Django-celery-beat загружает задание, но celery worker не получает его

У меня возникли такие проблемы на моем celery beat и worker, где мой celery beat создает задачу, но celery worker не получает ее . Я использую elasticmq в качестве брокера. Логи моего celery beat docker exec -it app-1 celery -A app beat --loglevel info celery beat v5.3.6 (emerald-rush) is starting. __ - ... __ - _ LocalTime -> 2024-03-09 16:25:41 Configuration -> . broker -> sqs://user:**@sqs:9324// . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2024-03-09 16:25:41,744: INFO/MainProcess] beat: Starting... [2024-03-09 16:25:41,821: INFO/MainProcess] Scheduler: Sending due task beat_task (app.tasks.beat_task) [2024-03-09 16:25:51,782: INFO/MainProcess] Scheduler: Sending due task beat_task (app.tasks.beat_task)

Мой рабочий журнал сельдерея

docker exec -it kole-app-api-app-1 celery -A app worker --loglevel info/usr/local/lib/python3.11/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this isabsolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
-------------- celery@b4d9d70aafdc v5.3.6 (emerald-rush)--- ***** ------- ******* ---- Linux-5.10.16.3-microsoft-standard-WSL2-x86_64-with 2024-03-09 16:26:10
  • *** --- * ---
    
  • ** ---------- [config]
    
  • ** ---------- .> app:         app:0x7fd2fe122f90
    
  • ** ---------- .> transport:   sqs://user:**@sqs:9324//
    
  • ** ---------- .> results:     disabled://
    
  • *** --- * --- .> concurrency: 6 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ------------------- [queues].> celery           exchange=celery(direct) key=celery
    
[tasks]. app.tasks.beat_task. app.tasks.web_task

[2024-03-09 16:26:11,394: WARNING/MainProcess] /usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: Параметр конфигурации broker_connection_retry больше не будет определять будут ли выполняться повторные попытки соединения с брокером во время запуска в Celery 6.0 и выше. Если вы хотите сохранить существующее поведение для повторных попыток соединений при запуске, вам следует установить параметр broker_connection_retry_on_startup в значение True. warnings.warn(

)

[2024-03-09 16:26:11,450: INFO/MainProcess] Подключились к sqs://user:**@sqs:9324// [2024-03-09 16:26:11,451: WARNING/MainProcess] /usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: Параметр конфигурации broker_connection_retry больше не будет определять будут ли выполняться повторные попытки соединения с брокером во время запуска в Celery 6.0 и выше. Если вы хотите сохранить существующее поведение для повторных попыток соединений при запуске, вам следует установить параметр broker_connection_retry_on_startup в значение True. warnings.warn(

)

[2024-03-09 16:26:11,509: INFO/MainProcess] celery@b4d9d70aafdc ready.

Он не подхватывает задачи из celery beat, вот фрагмент задачи


app = Celery("app")
logger = get_task_logger(__name__)

CELERY_CONFIG = {
    "CELERY_TASK_SERIALIZER": "json",
    "CELERY_ACCEPT_CONTENT": ["json"],
    "CELERY_RESULT_SERIALIZER": "json",
    "CELERY_RESULT_BACKEND": None,
    "CELERY_ENABLE_UTC": True,
    "CELERY_ENABLE_REMOTE_CONTROL": False,
}

BROKER_URL = "sqs://user:password@sqs:9324/"

CELERY_CONFIG.update(
    {
        "BROKER_URL": BROKER_URL,
        "BROKER_TRANSPORT": "sqs",
        "BROKER_TRANSPORT_OPTIONS": {
            "region": "us-west-2",
            "use_ssl": False,
            "visibility_timeout": 3600,
            "polling_interval": 60,
        },
    }
)

app.conf.update(**CELERY_CONFIG)
app.config_from_object("django.conf:settings", namespace="CELERY")

и вот задача,

@celery.app.task()
def beat_task() -> None:
    logger.info("Starting beat task...")
    time.sleep(2)
    logger.info("Done beat task.")

Я не получаю информацию от регистратора, печатающего результат на рабочем

Пробовал подключить сельдерей рабочий и бить, не получилось

Объект

celery.app здесь не определен. Объект Celery здесь инициализируется с именем app, а не celery.app. поэтому измените декоратор на @app.task().

app = Celery("app")
logger = get_task_logger(__name__).....

@app.task()
def beat_task().....

обновите все остальные ссылки с celery.app в вашем коде на app

Вернуться на верх