Как разбить задачу сельдерея внутри задачи

Я использую celery с django.внутри цикла forloop, каждый поворот устанавливает значение в db для отношения

lawyers=Lawyer.objects.filter(consultation_status=True)
for idx,lawyer in enumerate(lawyers):
    if(consultation.lawyer):
        break
    change_offered_lawyer.apply_async((id,lawyer.id),countdown=idx*60)

каждый оборот в цикле внутри задачи я проверяю условие, и моя цель - если условие выполнено, то прервать все эти задачи.

@app.task
def change_offered_lawyer(consulation_id,consulator_id):
    consulation=ConsultationOrder.objects.get(id=consulation_id)
    consulator=Lawyer.objects.get(id=consulator_id)
    if(consultation.lawyer):
        #break all tasks 
    consultation.offered_lawyer=consulator
    consultation.save()

Для более простого объяснения ниже мы бы просто использовали список чисел 1-10, который должен прерываться после 4-го числа, таким образом обрабатывая 1-3 и пропуская 4-10.

Решение 1: Использование цепочек задач

Краткое описание проекта:

Свяжите каждую задачу друг с другом. После возврата одной задачи вызывается следующая, и так далее. Задача должна вернуть значение, которое укажет, должна ли следующая задача продолжить выполнение или прерваться.

Продюсер:

from celery import chain

from task import change_offered_lawyer

tasks = []

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == 1:
        # If first item, manually set the initial value for should_continue
        tasks.append(change_offered_lawyer.s(True, lawyer_id))
    else:
        tasks.append(change_offered_lawyer.s(lawyer_id))

chain(*tasks).apply_async()

Потребитель:

from celery import shared_task


@shared_task
def change_offered_lawyer(should_continue, lawyer_id):
    if not should_continue:
        print(f"{lawyer_id=} Break...")
        return False

    if lawyer_id == 4:
        print(f"{lawyer_id=} Break now!")
        return False

    print(f"{lawyer_id=} Continue...")
    return True

Logs:

Ссылки:

Решение 2: Использование общего хранилища для отслеживания состояния

Краткое описание проекта:

Используйте хранилище базы данных, которое будет отслеживать, должно ли выполнение продолжаться или прерваться. Здесь мы будем использовать систему кэширования django-caching для простоты использования. Обратите внимание, что если вы когда-нибудь собираетесь запускать задачи параллельно без countdown, это решение может быть эффективным против условий гонки .

settings.py :

...
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "my_cache_table",
    }
}
...
  • После этого выполните python manage.py createcachetable

Продюсер:

from django.core.cache import cache

from task import change_offered_lawyer, reset_cache

cache.set(key='change_offered_lawyer', value='continue', timeout=None)

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == lawyers_count:
        # If last item, reset the cache
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
    else:
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)

Потребитель:

from celery import shared_task
from django.core.cache import cache


@shared_task
def change_offered_lawyer(lawyer_id):
    if cache.get('change_offered_lawyer') == 'break':
        print(f"{lawyer_id=} Break...")
        return

    if lawyer_id == 4:
        cache.set(key='change_offered_lawyer', value='break', timeout=None)
        print(f"{lawyer_id=} Break now!")
        return

    print(f"{lawyer_id=} Continue...")


@shared_task
def reset_cache():
    cache.delete('change_offered_lawyer')

Logs:

Ссылки:

Решение 3: Запускать задачи синхронно (не рекомендуется)

Краткое описание проекта:

Обновите change_offered_lawyer, чтобы вернуть индикатор продолжения или прерывания. Затем после вызова apply_async() вызовите get(), чтобы синхронно получить результат и прерваться, если необходимо.

Вернуться на верх