Django Celery Task: AttributeError: объект типа 'Command' не имеет атрибута '__annotations__'

Я пытаюсь установить задачу celery, которая в основном запускает скрипт для деактивации кучи объектов внутри json-файла. Раньше я запускал его вручную, и он работал нормально, но теперь я получаю следующую ошибку, когда Celery пытается запустить его как задачу:

AttributeError: type object 'Command' has no attribute '__annotations__'

Может ли кто-нибудь сказать мне, почему это не работает? Вот код, который я использую:

tasks.py

import requests
import json

from project.users.models import Facility
from django.core.management.base import BaseCommand
from config import celery_app

IMPORT_URL = 'https://example.com/file.json'

@celery_app.task()
class Command(BaseCommand):
    def delete_facility(self, data):
                    
                    data = data
                    

                    if Facility.objects.filter(UUID=data, ReportedStatusField='closed'):
                        
                        print("Facility status already set to closed")


                    else:
                        
                        facility, facility_closed = Facility.objects.update_or_create(UUID=data,
                            defaults={
                            'ReportedStatusField': 'closed'
                            }
                        )
                        print("Facility status set to closed")
                        

    def handle(self, *args, **options):

        headers = {'Content-Type': 'application/json'}
        response = requests.get(
            url=IMPORT_URL,
            headers=headers,
        )

        response.raise_for_status()
        data = response.json()


        for data, data_object in data.items():
            if data in ["deleted"]:
                for data in data_object:
                    self.delete_facility(data)

Кажется, я разобрался. Теперь все работает правильно. Я вызываю базовый командный файл из tasks.py вместо того, чтобы добавить код непосредственно в tasks.py:

from celery import shared_task
from django.core.management import call_command

@shared_task
def start_import():
    call_command("import_from_url", )
Вернуться на верх