Error: 'Did you remember to import the module containing this task?'
I have this project https://github.com/LucasLeone/LaPanaSystem. I created a task for check the sales for change the state but doesn't work. The error:
lapanasystem_local_celeryworker | [2024-09-10 23:47:20,238: ERROR/SpawnProcess-8] Received unregistered task of type 'check_sales_for_delivery'.
lapanasystem_local_celeryworker | The message has been ignored and discarded.
lapanasystem_local_celeryworker |
lapanasystem_local_celeryworker | Did you remember to import the module containing this task?
lapanasystem_local_celeryworker | Or maybe you're using relative imports?
lapanasystem_local_celeryworker |
lapanasystem_local_celeryworker | Please see
lapanasystem_local_celeryworker | https://docs.celeryq.dev/en/latest/internals/protocol.html
lapanasystem_local_celeryworker | for more information.
lapanasystem_local_celeryworker |
lapanasystem_local_celeryworker | The full contents of the message body was:
lapanasystem_local_celeryworker | b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
lapanasystem_local_celeryworker |
lapanasystem_local_celeryworker | The full contents of the message headers:
lapanasystem_local_celeryworker | {'lang': 'py', 'task': 'check_sales_for_delivery', 'id': '5cdd77f4-6b48-4274-8edb-0926ac3419e0', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '5cdd77f4-6b48-4274-8edb-0926ac3419e0', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen18@ace8c29fe6f1', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}`
lapanasystem_local_celeryworker |
lapanasystem_local_celeryworker | The delivery info for this task is:
lapanasystem_local_celeryworker | {'exchange': '', 'routing_key': 'celery'}
lapanasystem_local_celeryworker | Traceback (most recent call last):
lapanasystem_local_celeryworker | File "/usr/local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received
lapanasystem_local_celeryworker | strategy = strategies[type_]
lapanasystem_local_celeryworker | ~~~~~~~~~~^^^^^^^
lapanasystem_local_celeryworker | KeyError: 'check_sales_for_delivery'
I have 'lapanasystem.taskapp.tasks.py':
"""Celery tasks."""
# Django
from django.db.models import OuterRef, Subquery
# Models
from lapanasystem.sales.models import Sale, StateChange
# Celery
from celery import shared_task
# Utilities
from datetime import date, datetime
def change_state_to_ready_for_delivery(sale_id):
"""Change state to ready for delivery."""
sale = Sale.objects.get(id=sale_id)
last_state_change = StateChange.objects.filter(sale=sale).order_by('-start_date').first()
if last_state_change and last_state_change.end_date is None:
last_state_change.end_date = datetime.now()
last_state_change.save()
StateChange.objects.create(sale=sale, state=StateChange.PENDIENTE_ENTREGA)
@shared_task(name='check_sales_for_delivery')
def check_sales_for_delivery():
"""Check all sales with today's date and change their state to 'Pending for delivery'."""
today = date.today()
last_state_change = StateChange.objects.filter(
sale=OuterRef('pk')
).order_by('-start_date')
sales = Sale.objects.filter(
date__date=today,
is_active=True,
state_changes__state=Subquery(last_state_change.values('state')[:1])
)
for sale in sales:
change_state_to_ready_for_delivery.delay(sale.id)
I tried to add Command:
"""Script to create a periodic task for checking and updating subscriptions daily"""
# Django
from django.core.management.base import BaseCommand
from django.utils.timezone import now
# Celery
from django_celery_beat.models import PeriodicTask, CrontabSchedule
class Command(BaseCommand):
help = 'Create a periodic task for checking and updating subscriptions daily'
def handle(self, *args, **kwargs):
# Crear o obtener el CrontabSchedule para las 3:00 AM
schedule, created = CrontabSchedule.objects.get_or_create(
minute='0',
hour='3',
day_of_week='*',
day_of_month='*',
month_of_year='*',
)
# Verificar si ya existe la tarea periódica
if not PeriodicTask.objects.filter(name='Check and update sales at 3:00 AM').exists():
# Crear la tarea periódica
PeriodicTask.objects.create(
crontab=schedule,
name='Check and update sales at 3:00 AM',
task='check_sales_for_delivery',
start_time=now(),
)
self.stdout.write(self.style.SUCCESS('Periodic task created successfully'))
else:
self.stdout.write(self.style.WARNING('Periodic task already exists'))
that raise the error.