Как запустить on_success в Celery ТОЛЬКО на последнем рабочем процессе в группе?

Я уже несколько дней ломаю голову над этим вопросом. У меня есть фоновая задача, которая выполняется около 1 часа. Я обрабатываю 5000 изображений. Я создаю группу задач celery, и каждая задача обрабатывает по 1000 изображений.

У меня есть метод on_success, в котором я создаю запись базы данных 'Finished' и удаляю запись 'inProgress'.

Однако проблема в том, что даже при наличии кучи проверок в методе on_success, он все равно каким-то образом генерирует 2 записи базы данных 'Finished'. Звучит запутанно, смотрите код ниже:

 def on_success(self, retval, task_id, *args, **kwargs):
        self.image_job.refresh_from_db()
        total_nr_of_images = self.image_job.total_nr_of_images
        stored_results = ImageJobResults.objects.filter(job=self.image_job).count()
        if stored_results == total_nr_of_images:
            #It means all the workers have completed processing images
            # for this job, and we need to create a csv file, and a Finished BC Job
            with transaction.atomic(using='my_db'):
                image_job = ImageJob.objects.select_for_update().get(id=self.image_job.id)
                if not ImageJobFinished.objects.filter(job=image_job).exists():
                    job_in_progress = ImageJobInProgress.objects.get(job=image_job)
                    started_at = job_in_progress.started_at
                    image_job.generate_results(started_at=started_at,completed_at=now(),csv_rds_handle_job_id=self.image_job.pk)
                    #Now that a ImageJobFinished is created.
                    # We delete the ImageJobInProgress instance
                    job_in_progress.delete()

        return super().on_success(retval, task_id, *args, **kwargs)

Теперь теоретически stored_results должен быть равен total_nr_of_images только у последнего рабочего. Но оба работника каким-то образом завершают работу в одно и то же время, и здесь они получают одинаковые значения. Оба они затем создают объект ImageJobFinished, хотя я проверяю существование другого объекта, он проходит эту проверку. Я просто не понимаю, как такое может быть. Думаю, это связано с тем, как оба процесса обращаются к БД.

Я проверил celery chord, но это помогает только внутри одного представления для небольших задач, я думаю.

Чтобы заставить Celery выполнять on_success только для последнего рабочего процесса в группе, используйте код для группировки задач и укажите задачу обратного вызова: Когда все задачи в группе будут завершены, задача обратного вызова будет выполнена с on_success, и только после последней задачи.

Вернуться на верх