Как разбить задачу сельдерея внутри задачи
Я использую celery с django.внутри цикла forloop, каждый поворот устанавливает значение в db для отношения
lawyers=Lawyer.objects.filter(consultation_status=True)
for idx,lawyer in enumerate(lawyers):
if(consultation.lawyer):
break
change_offered_lawyer.apply_async((id,lawyer.id),countdown=idx*60)
каждый оборот в цикле внутри задачи я проверяю условие, и моя цель - если условие выполнено, то прервать все эти задачи.
@app.task
def change_offered_lawyer(consulation_id,consulator_id):
consulation=ConsultationOrder.objects.get(id=consulation_id)
consulator=Lawyer.objects.get(id=consulator_id)
if(consultation.lawyer):
#break all tasks
consultation.offered_lawyer=consulator
consultation.save()
Для более простого объяснения ниже мы бы просто использовали список чисел 1-10, который должен прерываться после 4-го числа, таким образом обрабатывая 1-3 и пропуская 4-10.
Решение 1: Использование цепочек задач
Краткое описание проекта:
Свяжите каждую задачу друг с другом. После возврата одной задачи вызывается следующая, и так далее. Задача должна вернуть значение, которое укажет, должна ли следующая задача продолжить выполнение или прерваться.
Продюсер:
from celery import chain
from task import change_offered_lawyer
tasks = []
lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
if lawyer_id == 1:
# If first item, manually set the initial value for should_continue
tasks.append(change_offered_lawyer.s(True, lawyer_id))
else:
tasks.append(change_offered_lawyer.s(lawyer_id))
chain(*tasks).apply_async()
Потребитель:
from celery import shared_task
@shared_task
def change_offered_lawyer(should_continue, lawyer_id):
if not should_continue:
print(f"{lawyer_id=} Break...")
return False
if lawyer_id == 4:
print(f"{lawyer_id=} Break now!")
return False
print(f"{lawyer_id=} Continue...")
return True
Logs:
Ссылки:
- https://docs.celeryproject.org/en/latest/getting-started/next-steps.html#chains
- https://docs.celeryproject.org/en/latest/userguide/canvas.html#canvas-chain .
- https://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks .
Решение 2: Использование общего хранилища для отслеживания состояния
Краткое описание проекта:
Используйте хранилище базы данных, которое будет отслеживать, должно ли выполнение продолжаться или прерваться. Здесь мы будем использовать систему кэширования django-caching для простоты использования. Обратите внимание, что если вы когда-нибудь собираетесь запускать задачи параллельно без countdown
, это решение может быть эффективным против условий гонки .
settings.py :
...
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.db.DatabaseCache",
"LOCATION": "my_cache_table",
}
}
...
- После этого выполните
python manage.py createcachetable
Продюсер:
from django.core.cache import cache
from task import change_offered_lawyer, reset_cache
cache.set(key='change_offered_lawyer', value='continue', timeout=None)
lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
if lawyer_id == lawyers_count:
# If last item, reset the cache
change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
else:
change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)
Потребитель:
from celery import shared_task
from django.core.cache import cache
@shared_task
def change_offered_lawyer(lawyer_id):
if cache.get('change_offered_lawyer') == 'break':
print(f"{lawyer_id=} Break...")
return
if lawyer_id == 4:
cache.set(key='change_offered_lawyer', value='break', timeout=None)
print(f"{lawyer_id=} Break now!")
return
print(f"{lawyer_id=} Continue...")
@shared_task
def reset_cache():
cache.delete('change_offered_lawyer')
Logs:
Ссылки:
Решение 3: Запускать задачи синхронно (не рекомендуется)
Краткое описание проекта:
Обновите change_offered_lawyer
, чтобы вернуть индикатор продолжения или прерывания. Затем после вызова apply_async()
вызовите get()
, чтобы синхронно получить результат и прерваться, если необходимо.