Django celery - перезапуск задания после его завершения
@app.task
def Task1():
print("this is task 1")
return "Task-1 Done"
Например, я хочу перезапустить задание после его завершения
Ручные последовательные звонки
Если вы хотите вызвать задачу несколько раз и заставить ее использовать один и тот же идентификатор задачи каждый раз, вы можете использовать аргумент task_id
в apply_async.
Обратите внимание, что это не применимо с delay в соответствии с документацией:
delay(*args, **kwargs)
Версия apply_async() для звездных аргументов.
Не поддерживает дополнительные опции, включенные apply_async().
@app.task(bind=True)
def Task1(self):
print(f"this is task 1 {self.request.id}")
>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>>
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4]
[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
- Идентификатор задачи одинаков для всех исполнений, который здесь
ba488582-9d7d-4bda-a19d-a2b0bf9b503f
(что видно и изAsyncResult
)
Автоматические последовательные вызовы
Если вы хотите постоянно перезапускать задачу, вот несколько вариантов. Все приведенные ниже варианты являются бесконечно рекурсивными. Возможно, вы захотите поставить некоторые условия внутри задачи о том, когда бесконечный цикл будет прерван, например, добавить вход в задачу и использовать его как основу, если выполнение уже должно остановиться.
Вариант 1: Вызов самой задачи асинхронно внутри одной задачи. Это в некоторой степени похоже на рекурсию. При этом будет использоваться тот же идентификатор задачи, что и при последовательных вызовах вручную (см. выше).
@app.task(bind=True)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
Task1.apply_async(task_id=self.request.id)
Вариант 2: Запустите механизм retry, предоставляемый celery. При этом будет использован тот же идентификатор задачи в той же очереди, как описано в этой ссылке:
При вызове повторной попытки будет отправлено новое сообщение, используя тот же идентификатор задачи, и позаботится о том, чтобы сообщение было доставлено в ту же очередь, что и исходная задача.
Мы можем проверить это, отобразив id задачи через self.request.id
.
@app.task(
bind=True,
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
raise self.retry()
Вариант 3: Повторять только для конкретного сценария (здесь RestartTaskNeeded
). То же самое с Вариантом 2, здесь также будет использоваться тот же самый идентификатор задачи в той же самой очереди.
class RestartTaskNeeded(Exception):
pass
@app.task(
bind=True,
autoretry_for=(RestartTaskNeeded,),
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
raise RestartTaskNeeded
Выход:
>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
- Задача всегда (автоматически) "перезапускается" после завершения
- Идентификатор задачи одинаков для всех выполнений, который здесь
999e9de0-292f-412d-a9a8-b5c0013bdab3
(что видно и вAsyncResult
)
Дальнейшее чтение
В зависимости от точной цели вопроса, вас также может заинтересовать Celery canvas, например, цепочка задач (вызов задачи после завершения другой задачи, задачи могут быть разными, а могут быть одинаковыми).