Как запустить on_success в Celery ТОЛЬКО на последнем рабочем процессе в группе?
Я уже несколько дней ломаю голову над этим вопросом. У меня есть фоновая задача, которая выполняется около 1 часа. Я обрабатываю 5000 изображений. Я создаю группу задач celery, и каждая задача обрабатывает по 1000 изображений.
У меня есть метод on_success
, в котором я создаю запись базы данных 'Finished' и удаляю запись 'inProgress'.
Однако проблема в том, что даже при наличии кучи проверок в методе on_success, он все равно каким-то образом генерирует 2 записи базы данных 'Finished'. Звучит запутанно, смотрите код ниже:
def on_success(self, retval, task_id, *args, **kwargs):
self.image_job.refresh_from_db()
total_nr_of_images = self.image_job.total_nr_of_images
stored_results = ImageJobResults.objects.filter(job=self.image_job).count()
if stored_results == total_nr_of_images:
#It means all the workers have completed processing images
# for this job, and we need to create a csv file, and a Finished BC Job
with transaction.atomic(using='my_db'):
image_job = ImageJob.objects.select_for_update().get(id=self.image_job.id)
if not ImageJobFinished.objects.filter(job=image_job).exists():
job_in_progress = ImageJobInProgress.objects.get(job=image_job)
started_at = job_in_progress.started_at
image_job.generate_results(started_at=started_at,completed_at=now(),csv_rds_handle_job_id=self.image_job.pk)
#Now that a ImageJobFinished is created.
# We delete the ImageJobInProgress instance
job_in_progress.delete()
return super().on_success(retval, task_id, *args, **kwargs)
Теперь теоретически stored_results
должен быть равен total_nr_of_images
только у последнего рабочего. Но оба работника каким-то образом завершают работу в одно и то же время, и здесь они получают одинаковые значения. Оба они затем создают объект ImageJobFinished, хотя я проверяю существование другого объекта, он проходит эту проверку. Я просто не понимаю, как такое может быть. Думаю, это связано с тем, как оба процесса обращаются к БД.
Я проверил celery chord, но это помогает только внутри одного представления для небольших задач, я думаю.
Чтобы заставить Celery выполнять on_success только для последнего рабочего процесса в группе, используйте код для группировки задач и укажите задачу обратного вызова: Когда все задачи в группе будут завершены, задача обратного вызова будет выполнена с on_success, и только после последней задачи.