Невозможно создать вторую очередь сельдерея
Я хочу создать две очереди celery (для разных типов задач)
Моя конфигурация сельдерея. Я ожидаю, что эта конфигурация создаст две очереди 'celery' and 'celery:1'
# celery.py
import os
from celery import Celery
from core_app.settings import INSTALLED_APPS
# this code copied from manage.py
# set the default Django settings module for the 'celery' app.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core_app.settings')
app = Celery("my_app")
# To start scheduling tasks based on priorities
# you need to configure queue_order_strategy transport option.
# ['celery', 'celery:1',] - two queues, the highest priority queue will be named celery
app.conf.broker_transport_options = {
'priority_steps': list(range(2)),
'sep': ':',
'queue_order_strategy': 'priority',
}
# read config from Django settings, the CELERY namespace would make celery
# config keys has `CELERY` prefix
app.config_from_object('django.conf:settings', namespace='CELERY')
# load tasks.py in django apps
app.autodiscover_tasks(lambda: INSTALLED_APPS)
Я определяю и запускаю задачу celery следующим образом:
@shared_task(queue="celery", soft_time_limit=600, time_limit=650)
def check_priority_urls(parsing_result_ids: List[int]):
check_urls(parsing_result_ids)
@shared_task(queue="celery:1", soft_time_limit=600, time_limit=650)
def check_common_urls(parsing_result_ids: List[int]):
check_urls(parsing_result_ids)
# running task
check_priority_urls.delay(parsing_results_ids)
Но я вижу только одну очередь
celery -A core_app inspect active_queues
-> celery@d1a287d1d3b1: OK
* {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None}
1 node online.
А цветок сельдерея видит только одну очередь
Мой сельдерей в docker-compose
celery:
build: ./project
command: celery -A core_app worker --loglevel=info --concurrency=15 --max-memory-per-child=1000000
volumes:
- ./project:/usr/src/app
- ./project/media:/project/media
- ./project/logs:/project/logs
env_file:
- .env
environment:
# environment variables declared in the environment section override env_file
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- django
- redis
Ваш случай означает, что ваш рабочий на самом деле не прослушивает вторую очередь,
По умолчанию рабочий Celery будет слушать только очередь «по умолчанию», если вы явно не настроите или не проинструктируете его делать иначе
вы можете сделать что-то вроде этого
from celery import Celery
from kombu import Queue, Exchange
app = Celery("my_app")
# Declare your queues explicitly
app.conf.task_queues = (
Queue('celery', Exchange('celery'), routing_key='celery'),
Queue('celery:1', Exchange('celery'), routing_key='celery:1'),
)
# (Optional) If you want to direct certain tasks to certain queues,
# you can specify task_routes.
app.conf.task_routes = {
'path.to.check_priority_urls': {'queue': 'celery'},
'path.to.check_common_urls': {'queue': 'celery:1'},
}
# You can keep the rest of your config as is
также вам нужно убедиться, что ваш рабочий прослушивает обе очереди
celery -A core_app worker --loglevel=info --concurrency=15 \
--max-memory-per-child=1000000 \
-Q celery,celery:1
и в вашем файле docker нужно добавить эту команду убедитесь, что вы добавили очереди в команде
command: >
celery -A core_app worker
--loglevel=info
--concurrency=15
--max-memory-per-child=1000000
-Q celery,celery:1