Source code for celery.app.autoretry
"""Tasks auto-retry functionality."""
from vine.utils import wraps
from celery.exceptions import Ignore, Retry
from celery.utils.time import get_exponential_backoff_interval
[docs]def add_autoretry_behaviour(task, **options):
"""Wrap task's `run` method with auto-retry functionality."""
autoretry_for = tuple(
options.get('autoretry_for',
getattr(task, 'autoretry_for', ()))
)
retry_kwargs = options.get(
'retry_kwargs', getattr(task, 'retry_kwargs', {})
)
retry_backoff = int(
options.get('retry_backoff',
getattr(task, 'retry_backoff', False))
)
retry_backoff_max = int(
options.get('retry_backoff_max',
getattr(task, 'retry_backoff_max', 600))
)
retry_jitter = options.get(
'retry_jitter', getattr(task, 'retry_jitter', True)
)
if autoretry_for and not hasattr(task, '_orig_run'):
@wraps(task.run)
def run(*args, **kwargs):
try:
return task._orig_run(*args, **kwargs)
except Ignore:
# If Ignore signal occures task shouldn't be retried,
# even if it suits autoretry_for list
raise
except Retry:
raise
except autoretry_for as exc:
if retry_backoff:
retry_kwargs['countdown'] = \
get_exponential_backoff_interval(
factor=retry_backoff,
retries=task.request.retries,
maximum=retry_backoff_max,
full_jitter=retry_jitter)
# Override max_retries
if hasattr(task, 'override_max_retries'):
retry_kwargs['max_retries'] = getattr(task,
'override_max_retries',
task.max_retries)
ret = task.retry(exc=exc, **retry_kwargs)
# Stop propagation
if hasattr(task, 'override_max_retries'):
delattr(task, 'override_max_retries')
raise ret
task._orig_run, task.run = task.run, run