Задача Celery, вызываемая внутри другой задачи, всегда попадает в очередь по умолчанию, даже если указана очередь
Я запускаю Celery с помощью Django и Celery Beat. Celery Beat запускает внешнюю задачу каждые 30 минут, и внутри этой задачи я ставлю в очередь другую задачу для каждого элемента. Обе задачи оформлены так, чтобы использовать одну и ту же пользовательскую очередь, но внутренняя задача по-прежнему попадает в очередь по умолчанию.
from celery import shared_task
from django.db import transaction
@shared_task(queue="outer_queue")
def sync_all_items():
"""
This outer task is triggered by Celery Beat every 30 minutes.
It scans the DB for outdated items and enqueues a per-item task.
"""
items = Item.objects.find_outdated_items()
for item in items:
# I expect this to enqueue on outer_queue as well
process_item.apply_async_on_commit(args=(item.pk,))
@shared_task(queue="outer_queue")
def process_item(item_id):
do_some_processing(item_id=item_id)
Конфигурация Celery beat:
CELERY_BEAT_SCHEDULE = {
"sync_all_items": {
"task": "myapp.tasks.sync_all_items",
"schedule": crontab(minute="*/30"),
# Beat is explicitly sending the outer task to outer_queue
"options": {"queue": "outer_queue"},
}
}
Но, когда я запускаю задачу process_item
вручную, например, в представлении Django, она соответствует декоратору и попадает в ожидаемую очередь.
Я пробовал:
Добавление queue='outer_queue' в apply_async_on_commit
Вызов process_item.задержка(item.pk) вместо
Используя .apply_async(args=[item.pk], queue='внешняя очередь') внутри transaction.on_commit
Но, несмотря ни на что, внутренние задачи по-прежнему отображаются в очереди по умолчанию.
Используйте apply_async_on_commit
правильно
def apply_async_on_commit(task, *args, **kwargs):
transaction.on_commit(lambda: task.apply_async(*args, **kwargs))
Затем:
применяем_async_on_commit(process_item, args=(item.pk,), queue="внешняя очередь")
Если кому-то понадобится связаться с вами по этому поводу, они могут написать по электронной почте doanthanhminhk55@gmail.com.