Обеспечение последовательного выполнения задач с помощью Celery в REST-фреймворке Django?

У меня есть проект Django REST Framework, в котором мне нужно последовательно вызывать две задачи Celery. В частности, мне нужно вызвать first_function, а затем после некоторых операций вызвать second_function, гарантируя, что second_function будет запущен только после завершения first_function.

# tasks.py
from celery import shared_task

@shared_task
def first_function():
    # Logic for the first function

@shared_task
def second_function():
    # Logic for the second function

Это мой view.py

first_function.delay()

# some codes ....
# now i need to call my second_function.delay()
   
second_function.delay()

Я не могу вызвать chain(first_function.s(), second_function.s()).delay(), потому что мне нужно вызвать first_function в начале моего кода. В случае исключения first_function должен выполняться независимо. Однако, если все работает правильно, я хочу вызывать second_function только после того, как убежусь, что first_function завершился.

Меня беспокоит следующее:

  • Если к представлению одновременно выполняется несколько запросов, я хочу, чтобы second_function корректно ожидал завершения соответствующего first_function.
  • Я немного запутался, как сделать так, чтобы second_function выполнялся после определенного first_function, связанного с тем же запросом? (Примечание: я не могу добавить sleep или любой блокирующий код в середине моего кода.)

Любые рекомендации или лучшие практики по работе с этим сценарием с помощью Celery были бы очень признательны!

Вы можете прочитать состояние первой_задачи в начале второй_задачи и подождать, пока состояние первой_задачи не станет 'SUCCESS'.

from celery.result import AsyncResult

first_task = first_function.delay()

# some codes ....

while True:
    # Poll the first task until it's completed
    result = AsyncResult(first_task.id)
    if result.state == 'SUCCESS':
        # If the first task is successful, call the second task
        second_task = second_function.delay(result.result)
    elif result.state in ['FAILURE', 'REVOKED']:
        break
    
    # Sleep for a short while before checking again
    time.sleep(2)

Если вы не хотите использовать sleep, то можно ввести 3-ю задачу, чтобы не блокировать основной поток

from celery import shared_task
from celery.result import AsyncResult

@shared_task
def first_function():
    # Your first function logic here
    pass

@shared_task
def second_function(result):
    # Your second function logic here
    # 'result' is the result of first_function
    pass

@shared_task(bind=True)
def monitor_tasks(self, first_task_id):
    # Monitor the first task
    result = AsyncResult(first_task_id)

    if result.state == 'SUCCESS':
        # If the first task is successful, call the second task
        second_function.delay(result.result)
    elif result.state in ['FAILURE', 'REVOKED']:
        # If the first task failed or was revoked, stop monitoring
        print("First task failed or was revoked. Not triggering the second task.")
    else:
        # Retry the monitoring task if the first task is not finished yet
        self.retry(countdown=2, max_retries=None)


# Start the first function asynchronously
first_task = first_function.delay()

# some codes ....

# Start monitoring task
monitor_tasks.delay(first_task.id)

Вернуться на верх