Как использовать прогресс-бар celery в функции в классе?

Я хочу создать индикатор выполнения для своего проекта. У меня есть класс, и этот класс имеет несколько функций. Особенно одна из них занимает много времени (def download_all), и это моя основная причина, по которой я хочу создать индикатор выполнения.

Я успешно установил celery, celery-progress и т.д., и они работают нормально. Моя проблема заключается в следующем: Я хочу интегрировать индикатор прогресса в функцию download_all. Я пробую многое, но это дает ошибки.

Это мой код. Как я могу это сделать? views.py

def setup_wizard(request):
   ...
   task = (functions.myClass(n_username, n_password,
                                          n_url, n_port, db_password,
                                          username=request.user.username)).delay(5)
   task_id = task.task_id
   ...
   context = {
     ...
     'task_id':task_id
    }

functions.py

@shared_task(bind=True)
class myClass():

    def __init__(self, nessus_user, nessus_password, nessus_url, nessus_port, db_password, username):
        self.location = zt_dosya_url
        self.static_fields = {}
        self.download_all(n_user, n_password, n_url, n_port, db_password)
        self.send(username)
        ...
     def download_all(self, n_user, n_password, n_url, n_port, db_password):
         ...
         for s in scans:
                i = 0
                scanner.scan_name = s['name']
                ...
                // I have to get the values of progress bar from here
                //for example
                // progress_recorder.set_progress(i + 1, len(scans), f'On iteration {i}')
                ...
                i += 1      

и вот какая ошибка : AttributeError: объект типа 'myClass' не имеет атрибута 'annotations'

Вернуться на верх