Обеспечение последовательного выполнения задач с помощью Celery в REST-фреймворке Django?
У меня есть проект Django REST Framework, в котором мне нужно последовательно вызывать две задачи Celery. В частности, мне нужно вызвать first_function
, а затем после некоторых операций вызвать second_function
, гарантируя, что second_function
будет запущен только после завершения first_function
.
# tasks.py
from celery import shared_task
@shared_task
def first_function():
# Logic for the first function
@shared_task
def second_function():
# Logic for the second function
Это мой view.py
first_function.delay()
# some codes ....
# now i need to call my second_function.delay()
second_function.delay()
Я не могу вызвать chain(first_function.s(), second_function.s()).delay()
, потому что мне нужно вызвать first_function
в начале моего кода. В случае исключения first_function
должен выполняться независимо. Однако, если все работает правильно, я хочу вызывать second_function
только после того, как убежусь, что first_function
завершился.
Меня беспокоит следующее:
- Если к представлению одновременно выполняется несколько запросов, я хочу, чтобы
second_function
корректно ожидал завершения соответствующегоfirst_function
. - Я немного запутался, как сделать так, чтобы
second_function
выполнялся после определенногоfirst_function
, связанного с тем же запросом? (Примечание: я не могу добавить sleep или любой блокирующий код в середине моего кода.)
Любые рекомендации или лучшие практики по работе с этим сценарием с помощью Celery были бы очень признательны!
Вы можете прочитать состояние первой_задачи в начале второй_задачи и подождать, пока состояние первой_задачи не станет 'SUCCESS'.
from celery.result import AsyncResult
first_task = first_function.delay()
# some codes ....
while True:
# Poll the first task until it's completed
result = AsyncResult(first_task.id)
if result.state == 'SUCCESS':
# If the first task is successful, call the second task
second_task = second_function.delay(result.result)
elif result.state in ['FAILURE', 'REVOKED']:
break
# Sleep for a short while before checking again
time.sleep(2)
Если вы не хотите использовать sleep, то можно ввести 3-ю задачу, чтобы не блокировать основной поток
from celery import shared_task
from celery.result import AsyncResult
@shared_task
def first_function():
# Your first function logic here
pass
@shared_task
def second_function(result):
# Your second function logic here
# 'result' is the result of first_function
pass
@shared_task(bind=True)
def monitor_tasks(self, first_task_id):
# Monitor the first task
result = AsyncResult(first_task_id)
if result.state == 'SUCCESS':
# If the first task is successful, call the second task
second_function.delay(result.result)
elif result.state in ['FAILURE', 'REVOKED']:
# If the first task failed or was revoked, stop monitoring
print("First task failed or was revoked. Not triggering the second task.")
else:
# Retry the monitoring task if the first task is not finished yet
self.retry(countdown=2, max_retries=None)
# Start the first function asynchronously
first_task = first_function.delay()
# some codes ....
# Start monitoring task
monitor_tasks.delay(first_task.id)