Django celery beat не может найти приложение для планирования задач

Я пытаюсь запланировать задачу в celery.

celery.py внутри основной директории проекта

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE','example_api.settings')

app = Celery('example_api')

app.config_from_object('django.conf:settings',namespace="CELERY")

app.conf.beat_schedule = {
    'add_trades_to_database_periodically': {
        'task': 'transactions.tasks.add_trades_to_database',
        'schedule': crontab(minute='*/1'),
        # 'args': (16,16),
    },
}

app.autodiscover_tasks()

В проекте есть одно приложение, называемое транзакциями.

функция внутри transactions/tasks.py

@task(name="add_trades_to_database")
def add_trades_to_database():
    start_date = '20000101' #YYYYDDMM
    end_date = '20150101'
    url = f'https://api.example.com/trade-retriever-api/v1/fx/trades?fromDate={start_date}&toDate={end_date}'
    content = get_json(url)
    print(content)
    save_data_to_model(content,BulkTrade)

settings.py

Я использую rabbitmq-server для taskqueues.

  • Мой rabbitmq-сервер активен все время.
  • Другие задачи celery работают абсолютно нормально. (Я пробовал реализовать функцию электронной почты, которая хорошо работает с помощью celery).

Я запускаю сельдерейный рабочий и бью, используя

celery -A project worker -l info
celery -A project beat -l info

В рабочем терминале появляется следующая ошибка

The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/home/......./env/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 581, in on_task_received
    strategy = strategies[type_]
KeyError: 'transactions.tasks.add_trades_to_database'

Вы явно назвали задачу "add_trades_to_database"

@task(name="add_trades_to_database")
def add_trades_to_database():
    ...

Однако вы планируете задачу под именем "transactions.tasks.add_trades_to_database"

app.conf.beat_schedule = {
    'add_trades_to_database_periodically': {
        'task': 'transactions.tasks.add_trades_to_database',
        'schedule': crontab(minute='*/1'),
    },
}

Решения, которые вы можете выбрать:

  1. Не задавайте имя задачи в явном виде. Celery установит для нее имя по умолчанию, основываясь на именах модулей и имени функции, как документировано. Расписание тактов остается прежним (при условии, что add_trades_to_database находится в my_proj/transactions/tasks.py::add_trades_to_database)
    @task
    def add_trades_to_database():
        ...
    
  2. Или вы можете просто изменить расписание тактов, чтобы оно ссылалось на явно заданное имя.
    app.conf.beat_schedule = {
        'add_trades_to_database_periodically': {
            'task': 'add_trades_to_database',
            'schedule': crontab(minute='*/1'),
        },
    }
    

Подчеркните, что из этих двух решений выберите только одно, потому что оба решения, очевидно, все равно не будут работать по той же причине.

Кроме того, обратите внимание, что при использовании Django принято использовать декоратор @shared_task, поэтому вы можете изменить ваши задачи на:

from celery import shared_task


@shared_task
def add_trades_to_database():
   ...
Вернуться на верх