Celery AttributeError : объект 'download_all' не имеет атрибута 'location'
Я хочу создать индикатор выполнения для своего проекта. У меня есть класс, и этот класс имеет несколько функций. Особенно одна из них занимает много времени (def download_all
), и это моя основная причина, по которой я хочу создать индикатор выполнения.
Я успешно установил celery, celery-progress и т.д., и они работают нормально. Моя проблема заключается в следующем: Я хочу интегрировать индикатор прогресса в функцию download_all
. I
Выдает ошибку: AttributeError: у объекта 'download_all' нет атрибута 'location'. Я уверен, что эта ошибка из-за задачи celery, потому что когда я запускаю ее без celery, она работает.
Это мой код. Как я могу это сделать? views.py
def setup_wizard(request):
...
task = (functions.myClass(n_username, n_password,
n_url, n_port, db_password,
username=request.user.username)).delay(5)
task_id = task.download_all(request.user.username, setup.nessus_username, setup.nessus_password,
setup.nessus_url, setup.nessus_port, setup.db_password,
username=request.user.username).task_id
// in task_id download_all parameters, I had to user request.user.username instead of "self" parameter.
...
context = {
...
'task_id':task_id
}
functions.py
@shared_task(bind=True)
class myClass():
def __init__(self, nessus_user, nessus_password, nessus_url, nessus_port, db_password, username):
self.location = zt_dosya_url
self.static_fields = {}
self.download_all(n_user, n_password, n_url, n_port, db_password)
self.send(username)
...
def download_all(self, n_user, n_password, n_url, n_port, db_password):
if not os.path.exists(self.location):
os.mkdir(self.location)
else:
shutil.rmtree(self.location, ignore_errors=True)
os.mkdir(self.location)
...
for s in scans:
i = 0
scanner.scan_name = s['name']
...
// I have to get the values of progress bar from here
//for example
// progress_recorder.set_progress(i + 1, len(scans), f'On iteration {i}')
...
i += 1
и вот какая ошибка : AttributeError: объект типа 'myClass' не имеет атрибута 'annotations'