Django-celery-beat sends a task, the worker receives it but does not execute it

Here are my dependencies:

celery==5.3.6
django-celery-beat==2.5.0
django-celery-results==2.5.1
redis==5.0.1

settings.py:

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",

    "bot_users.apps.BotUsersConfig",
    "products.apps.ProductsConfig",
    "categories.apps.CategoriesConfig",
    "commands.apps.CommandsConfig",
    "stories_news.apps.StoriesNewsConfig",
    "posts.apps.PostsConfig",
    "polls.apps.PollsConfig",
    "bot_statistics",
    "xml_import",


    "rest_framework",
    "nested_admin",
    "import_export",
    "ckeditor",
    "imagekit",
    "django_celery_beat",
    "django_celery_results",
]


REDIS_HOST = '127.0.0.1'
REDIS_PORT = '6379'
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}'
CELERY_BROKER_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_RESULT_EXTENDED = True
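
(Side note: the worker log further down shows a CPendingDeprecationWarning about broker_connection_retry. I have not added it yet, but as far as I understand the warning, it can be silenced with one extra setting, e.g.:)

# Assumed: the CELERY_-prefixed form of broker_connection_retry_on_startup,
# which the worker warning below asks for; not part of my current settings.
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True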

celery.py:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'src.settings')

app = Celery('src')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()
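
As I understand it, autodiscover_tasks() with no arguments looks for a tasks.py module in every app from INSTALLED_APPS; my task lives in products/services.py and is decorated with @app.task instead. For comparison, the conventional layout would look roughly like this (a hypothetical tasks.py, not my actual code):

# products/tasks.py -- hypothetical example of the default convention
from celery import shared_task

@shared_task
def ping():
    return "pong"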

The task function (in services.py):

import logging


@app.task
def import_data_from_xml(*args):
    from .admin import ProductResource
    from xml_import.models import XMLImportSettings
    from .formats import XML

    xml_settings = XMLImportSettings.objects.all().first()
    logging.info(f'xml_settings:{xml_settings}')

    folder_path = xml_settings.folder_path
    logging.info(f'folder_path:{folder_path}')
    file_name = xml_settings.file_name
    logging.info(f'file_name:{file_name}')

    # Full path to the XML file
    xml_file_path = f"{folder_path}/{file_name}"
    logging.info(f'xml_file_path:{xml_file_path}')

    # Create a resource instance
    resource = ProductResource()
    logging.info(f'resource:{resource}')

    xml_formatter = XML()
    logging.info(f'xml_formatter:{xml_formatter}')
    with open(xml_file_path, 'r', encoding='utf-8') as xml_file:
        # Build a dataset from the XML data
        data = xml_formatter.create_dataset(xml_file.read())
        logging.info(f'data:{data}')

    # Import the data from the XML file
    dataset = resource.import_data(dataset=data, raise_errors=True)
    logging.info(f'dataset:{dataset}')

    # Return the import result
    return dataset
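
To rule out the task body itself, it can be triggered by hand from the Django shell; a sketch of what I mean (assuming the task is importable from products.services):

# python manage.py shell
from products.services import import_data_from_xml

# Run synchronously in the current process, bypassing the broker,
# to confirm the function itself works.
eager = import_data_from_xml.apply()
print(eager.status, eager.result)

# Send it through Redis to the worker, the same way beat does.
async_result = import_data_from_xml.delay()
print(async_result.id)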

My Celery configuration seems correct to me.

Here is the Celery console output. The worker:

[2024-03-15 15:04:56,108: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2024-03-15 15:04:56,116: WARNING/MainProcess] C:\Users\hp\desktop\job\venv\Lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-03-15 15:04:56,182: INFO/MainProcess] mingle: searching for neighbors
[2024-03-15 15:04:57,103: INFO/SpawnPoolWorker-3] child process 2004 calling self.run()
[2024-03-15 15:04:57,234: INFO/SpawnPoolWorker-1] child process 15900 calling self.run()
[2024-03-15 15:04:57,243: INFO/SpawnPoolWorker-4] child process 14456 calling self.run()
[2024-03-15 15:04:57,327: INFO/MainProcess] mingle: all alone
[2024-03-15 15:04:57,391: INFO/SpawnPoolWorker-2] child process 3280 calling self.run()
[2024-03-15 15:04:57,491: INFO/MainProcess] celery@LAPTOP-2554OM7H ready.
[2024-03-15 15:04:57,505: INFO/MainProcess] Task products.services.import_data_from_xml[917a7e74-79d0-47dd-b857-c9cb5c37995d] received
[2024-03-15 15:04:57,509: INFO/MainProcess] Task products.services.import_data_from_xml[5c056738-5974-4f3c-aed9-c824bf8bad0a] received
[2024-03-15 15:05:03,554: INFO/SpawnPoolWorker-5] child process 18000 calling self.run()
[2024-03-15 15:05:04,060: INFO/SpawnPoolWorker-6] child process 3016 calling self.run()
[2024-03-15 15:09:00,978: INFO/MainProcess] Task products.services.import_data_from_xml[1496d6c4-06e2-4117-9571-22d5314eb4b5] received
[2024-03-15 15:09:02,684: INFO/SpawnPoolWorker-7] child process 868 calling self.run()
[2024-03-15 15:09:02,684: INFO/SpawnPoolWorker-8] child process 8588 calling self.run()
[2024-03-15 15:09:02,684: INFO/SpawnPoolWorker-9] child process 9280 calling self.run()

and beat:

(venv) PS C:\Users\hp\desktop\job> celery -A src beat -l info
celery beat v5.3.6 (emerald-rush) is starting.
__    -    ... __   -        _
LocalTime -> 2024-03-15 15:05:07
Configuration ->
    . broker -> redis://127.0.0.1:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2024-03-15 15:05:07,522: INFO/MainProcess] beat: Starting...
[2024-03-15 15:06:20,632: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2024-03-15 15:06:36,583: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2024-03-15 15:09:00,336: INFO/MainProcess] Scheduler: Sending due task xml_import (products.services.import_data_from_xml)

The function imports data from an XML document into Django. I expect it to run (the function itself works, I have verified it), but Celery never executes it.

I think the problem is in the configuration, but I don't quite see how to track it down. Everything seems to be set up correctly: beat sends the task and the worker receives it.
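
For reference, the configuration the worker actually sees can be checked from the Django shell; a sketch, assuming the app object from src/celery.py:

from src.celery import app

# These should match the CELERY_* values in settings.py
print(app.conf.broker_url)       # redis://127.0.0.1:6379
print(app.conf.result_backend)   # django-db
print(app.conf.beat_scheduler)   # django_celery_beat.schedulers:DatabaseScheduler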

I don't see any obvious problems in your code, but I suspect you aren't showing everything. Maybe just set up a task logger with celery.utils.log.get_task_logger to see whether the function is actually being run. See the link below for details.

https://www.programcreek.com/python/example/88906/celery.utils.log.get_task_logger
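
A minimal sketch of what I have in mind, reusing your task from products/services.py:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True)
def import_data_from_xml(self, *args):
    # If this line never appears in the worker output,
    # the task body is not being entered at all.
    logger.info('import_data_from_xml started, task id=%s', self.request.id)
    ...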
