Celery task called inside another task always goes to default queue even with queue specified

I’m running Celery with Django and Celery Beat. Celery Beat triggers an outer task every 30 minutes, and inside that task I enqueue another task per item. Both tasks are decorated to use the same custom queue, but the inner task still lands in the default queue.

from celery import shared_task
from django.db import transaction

@shared_task(queue="outer_queue")
def sync_all_items():
    """
    This outer task is triggered by Celery Beat every 30 minutes.
    It scans the DB for outdated items and enqueues a per-item task.
    """
    items = Item.objects.find_outdated_items()
    for item in items:
        # I expect this to enqueue on outer_queue as well
        process_item.apply_async_on_commit(args=(item.pk,))  


@shared_task(queue="outer_queue")
def process_item(item_id):
    do_some_processing(item_id=item_id)

Celery beat config:

CELERY_BEAT_SCHEDULE = {
    "sync_all_items": {
        "task": "myapp.tasks.sync_all_items",
        "schedule": crontab(minute="*/30"),
        # Beat is explicitly sending the outer task to outer_queue
        "options": {"queue": "outer_queue"},
    }
}

But, when I run the process_item task manually e.g. in the Django view, it respect the decorator and lands in expected queue.

I’ve tried:

  • Adding queue='outer_queue' to apply_async_on_commit

  • Calling process_item.delay(item.pk) instead

  • Using .apply_async(args=[item.pk], queue='outer_queue') inside transaction.on_commit

But no matter what, the inner tasks still show up in the default queue.

Use apply_async_on_commit properly

def apply_async_on_commit(task, *args, **kwargs):
    transaction.on_commit(lambda: task.apply_async(*args, **kwargs))

Then :
apply_async_on_commit(process_item, args=(item.pk,), queue="outer_queue")

If anyone needs to reach you regarding this, they can email doanthanhminhk55@gmail.com.

Вернуться на верх