Celery task called inside another task always goes to default queue even with queue specified
I’m running Celery with Django and Celery Beat. Celery Beat triggers an outer task every 30 minutes, and inside that task I enqueue another task per item. Both tasks are decorated to use the same custom queue, but the inner task still lands in the default queue.
from celery import shared_task
from django.db import transaction


@shared_task(queue="outer_queue")
def sync_all_items():
    """
    This outer task is triggered by Celery Beat every 30 minutes.
    It scans the DB for outdated items and enqueues a per-item task.
    """
    items = Item.objects.find_outdated_items()
    for item in items:
        # I expect this to enqueue on outer_queue as well
        process_item.apply_async_on_commit(args=(item.pk,))


@shared_task(queue="outer_queue")
def process_item(item_id):
    do_some_processing(item_id=item_id)
Celery Beat config:
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "sync_all_items": {
        "task": "myapp.tasks.sync_all_items",
        "schedule": crontab(minute="*/30"),
        # Beat is explicitly sending the outer task to outer_queue
        "options": {"queue": "outer_queue"},
    }
}
However, when I enqueue the process_item task manually, e.g. from a Django view, it respects the decorator and lands in the expected queue.
I’ve tried:
- Adding queue='outer_queue' to apply_async_on_commit
- Calling process_item.delay(item.pk) instead
- Using .apply_async(args=[item.pk], queue='outer_queue') inside transaction.on_commit
But no matter what, the inner tasks still show up in the default queue.
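Use apply_async_on_commit properly. Define it as a helper that defers apply_async until the transaction commits:
from django.db import transaction

def apply_async_on_commit(task, *args, **kwargs):
    transaction.on_commit(lambda: task.apply_async(*args, **kwargs))
Then pass the queue explicitly when calling it: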
apply_async_on_commit(process_item, args=(item.pk,), queue="outer_queue")
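To check that the queue option actually survives the deferral, the helper can be exercised without a broker or a database. The following is only a rough sketch under stated assumptions: FakeTask and pending_hooks are hypothetical stand-ins for a Celery task and a Django transaction, and the on_commit parameter is an addition that is not in the helper above (in a real project you would pass django.db.transaction.on_commit there). It also swaps the lambda for functools.partial, which behaves the same here.

```python
from functools import partial


def apply_async_on_commit(task, *args, on_commit, **kwargs):
    """Defer task.apply_async(*args, **kwargs) until on_commit fires.

    on_commit is injected so this sketch runs without Django; in a real
    project it would be django.db.transaction.on_commit.
    """
    on_commit(partial(task.apply_async, *args, **kwargs))


class FakeTask:
    """Hypothetical stand-in for a Celery task; records apply_async calls."""

    def __init__(self):
        self.calls = []

    def apply_async(self, args=None, kwargs=None, **options):
        self.calls.append({"args": args, "kwargs": kwargs, "options": options})


# Hypothetical stand-in for the transaction: hooks run when we "commit".
pending_hooks = []
process_item = FakeTask()

for pk in (1, 2):
    apply_async_on_commit(
        process_item,
        args=(pk,),
        queue="outer_queue",
        on_commit=pending_hooks.append,
    )

# Nothing has been sent before the commit.
sent_before_commit = len(process_item.calls)

# Simulate the commit by running the registered hooks in order.
for hook in pending_hooks:
    hook()
```

After the simulated commit, process_item.calls holds one entry per item, each carrying queue="outer_queue" in its options. If the same check passes against the real task but messages still land in the default queue, the cause is more likely in routing or worker configuration than in the helper.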