Ошибка: 'Вы не забыли импортировать модуль, содержащий эту задачу?'

У меня есть проект: https://github.com/LucasLeone/LaPanaSystem. Я создал задачу Celery, которая проверяет продажи и меняет их состояние, но она не работает. Ошибка:

lapanasystem_local_celeryworker  | [2024-09-10 23:47:20,238: ERROR/SpawnProcess-8] Received unregistered task of type 'check_sales_for_delivery'.

lapanasystem_local_celeryworker  | The message has been ignored and discarded.

lapanasystem_local_celeryworker  |

lapanasystem_local_celeryworker  | Did you remember to import the module containing this task?

lapanasystem_local_celeryworker  | Or maybe you're using relative imports?

lapanasystem_local_celeryworker  |

lapanasystem_local_celeryworker  | Please see

lapanasystem_local_celeryworker  | https://docs.celeryq.dev/en/latest/internals/protocol.html

lapanasystem_local_celeryworker  | for more information.

lapanasystem_local_celeryworker  |

lapanasystem_local_celeryworker  | The full contents of the message body was:

lapanasystem_local_celeryworker  | b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

lapanasystem_local_celeryworker  |

lapanasystem_local_celeryworker  | The full contents of the message headers:

lapanasystem_local_celeryworker  | {'lang': 'py', 'task': 'check_sales_for_delivery', 'id': '5cdd77f4-6b48-4274-8edb-0926ac3419e0', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '5cdd77f4-6b48-4274-8edb-0926ac3419e0', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen18@ace8c29fe6f1', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}`

lapanasystem_local_celeryworker  |

lapanasystem_local_celeryworker  | The delivery info for this task is:

lapanasystem_local_celeryworker  | {'exchange': '', 'routing_key': 'celery'}

lapanasystem_local_celeryworker  | Traceback (most recent call last):

lapanasystem_local_celeryworker  |   File "/usr/local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received

lapanasystem_local_celeryworker  |     strategy = strategies[type_]

lapanasystem_local_celeryworker  |                ~~~~~~~~~~^^^^^^^

lapanasystem_local_celeryworker  | KeyError: 'check_sales_for_delivery'

У меня есть 'lapanasystem.taskapp.tasks.py':

"""Celery tasks."""

# Django
from django.db.models import OuterRef, Subquery

# Models
from lapanasystem.sales.models import Sale, StateChange

# Celery
from celery import shared_task

# Utilities
from datetime import date, datetime


@shared_task(name='change_state_to_ready_for_delivery')
def change_state_to_ready_for_delivery(sale_id):
    """Close the sale's open state change and mark the sale as pending delivery.

    Registered as a Celery task so that it can be queued with
    ``change_state_to_ready_for_delivery.delay(sale_id)``; calling it directly
    still runs it synchronously.

    Args:
        sale_id: Primary key of the ``Sale`` whose state is updated.

    Raises:
        Sale.DoesNotExist: If no sale with ``sale_id`` exists.
    """
    # Local import: ``timezone.now()`` returns an aware datetime when
    # USE_TZ=True and a naive one otherwise, so it is always safe to store,
    # unlike ``datetime.now()`` which is always naive.
    from django.utils import timezone

    sale = Sale.objects.get(id=sale_id)

    # Most recent state change of this sale, or None if it has none yet.
    last_state_change = StateChange.objects.filter(sale=sale).order_by('-start_date').first()

    # Only close the previous state if it is still open.
    if last_state_change and last_state_change.end_date is None:
        last_state_change.end_date = timezone.now()
        last_state_change.save()

    StateChange.objects.create(sale=sale, state=StateChange.PENDIENTE_ENTREGA)


@shared_task(name='check_sales_for_delivery')
def check_sales_for_delivery():
    """Queue a 'pending delivery' state change for every active sale dated today.

    Selects active sales whose ``date`` falls on today's date and that have a
    state change matching their most recent state, then enqueues one
    ``change_state_to_ready_for_delivery`` task per sale.

    NOTE(review): the filter compares each sale's state changes with the
    sale's own latest state, so it does not restrict the result to any
    particular state -- confirm this is the intended condition.
    """
    today = date.today()

    # Correlated subquery: state changes of the outer sale, newest first.
    last_state_change = StateChange.objects.filter(
        sale=OuterRef('pk')
    ).order_by('-start_date')

    # The join through ``state_changes`` can yield the same sale several times
    # (one row per matching state change). Select distinct ids so each sale is
    # enqueued exactly once, and avoid loading full model instances.
    sale_ids = (
        Sale.objects.filter(
            date__date=today,
            is_active=True,
            state_changes__state=Subquery(last_state_change.values('state')[:1]),
        )
        .values_list('id', flat=True)
        .distinct()
    )

    for sale_id in sale_ids:
        # ``.delay`` requires change_state_to_ready_for_delivery to be
        # registered as a Celery task.
        change_state_to_ready_for_delivery.delay(sale_id)

Я также попытался добавить management-команду (класс Command), которая создаёт периодическую задачу:

"""Script to create a periodic task for checking and updating subscriptions daily"""

# Django
from django.core.management.base import BaseCommand
from django.utils.timezone import now

# Celery
from django_celery_beat.models import PeriodicTask, CrontabSchedule


class Command(BaseCommand):
    """Management command that registers the daily sales-check periodic task."""

    help = 'Create a periodic task for checking and updating sales daily'

    def handle(self, *args, **kwargs):
        """Create the 03:00 crontab schedule and the periodic task if missing.

        Writes a success message when the periodic task is created and a
        warning when a task with the same name already exists.
        """
        # Create or fetch the CrontabSchedule for 3:00 AM every day.
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute='0',
            hour='3',
            day_of_week='*',
            day_of_month='*',
            month_of_year='*',
        )

        # Look the task up by name and create it only when it does not exist.
        # ``task`` must match the name the Celery task is registered under
        # (``@shared_task(name='check_sales_for_delivery')``).
        _, created = PeriodicTask.objects.get_or_create(
            name='Check and update sales at 3:00 AM',
            defaults={
                'crontab': schedule,
                'task': 'check_sales_for_delivery',
                'start_time': now(),
            },
        )

        if created:
            self.stdout.write(self.style.SUCCESS('Periodic task created successfully'))
        else:
            self.stdout.write(self.style.WARNING('Periodic task already exists'))

Именно эта задача и команда приводят к указанной выше ошибке.

Вернуться наверх