Django celery - перезапуск задания после его завершения


    @app.task
    def Task1():
        print("this is task 1")
        return  "Task-1 Done"

Например, я хочу перезапустить задание после его завершения

Ручные последовательные звонки

Если вы хотите вызвать задачу несколько раз и заставить ее использовать один и тот же идентификатор задачи каждый раз, вы можете использовать аргумент task_id в apply_async.

Обратите внимание, что это не применимо с delay в соответствии с документацией:

delay(*args, **kwargs)

  • Версия apply_async() для звездных аргументов.

  • Не поддерживает дополнительные опции, включенные apply_async().

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> 
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
  • Идентификатор задачи одинаков для всех исполнений, который здесь ba488582-9d7d-4bda-a19d-a2b0bf9b503f (что видно и из AsyncResult)

Автоматические последовательные вызовы

Если вы хотите постоянно перезапускать задачу, вот несколько вариантов. Все приведенные ниже варианты являются бесконечно рекурсивными. Возможно, вы захотите поставить некоторые условия внутри задачи о том, когда бесконечный цикл будет прерван, например, добавить вход в задачу и использовать его как основу, если выполнение уже должно остановиться.

Вариант 1: Вызов самой задачи асинхронно внутри одной задачи. Это в некоторой степени похоже на рекурсию. При этом будет использоваться тот же идентификатор задачи, что и при последовательных вызовах вручную (см. выше).

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    Task1.apply_async(task_id=self.request.id)

Вариант 2: Запустите механизм retry, предоставляемый celery. При этом будет использован тот же идентификатор задачи в той же очереди, как описано в этой ссылке:

При вызове повторной попытки будет отправлено новое сообщение, используя тот же идентификатор задачи, и позаботится о том, чтобы сообщение было доставлено в ту же очередь, что и исходная задача.

Мы можем проверить это, отобразив id задачи через self.request.id.

@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise self.retry()

Вариант 3: Повторять только для конкретного сценария (здесь RestartTaskNeeded). То же самое с Вариантом 2, здесь также будет использоваться тот же самый идентификатор задачи в той же самой очереди.

class RestartTaskNeeded(Exception):
    pass


@app.task(
    bind=True,
    autoretry_for=(RestartTaskNeeded,),
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise RestartTaskNeeded

Выход:

>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
  • Задача всегда (автоматически) "перезапускается" после завершения
  • Идентификатор задачи одинаков для всех выполнений, который здесь 999e9de0-292f-412d-a9a8-b5c0013bdab3 (что видно и в AsyncResult)

Дальнейшее чтение

В зависимости от точной цели вопроса, вас также может заинтересовать Celery canvas, например, цепочка задач (вызов задачи после завершения другой задачи, задачи могут быть разными, а могут быть одинаковыми).

Вернуться на верх