Обратный вызов аккорда сельдерея не всегда запускается
Я пытаюсь использовать аккорд для запуска обновления отчета после завершения обновления.
@shared_task(autoretry_for=(Exception,), retry_backoff=True, retry_kwargs {'max_retries': 5})
def upload(df: pd.DataFrame, **kwargs):
ed = EntityDataPoint(df, **kwargs)
uploadtasks, source, subtype = ed.upload_database()
chord(uploadtasks)(final_report.si(logger=ed.logger, source=source, subtype=subtype, index=ed.index))
При том, что uploadtask является :
g = group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size])
for i in range(0, len(self.data), chunk_size))
Когда заголовок аккорда содержит более 2 элементов, первые две подзадачи выполняются, а остальные задачи в группе и обратный вызов не запускаются, при этом нигде не отправляется ошибка и нет никакой информации в логах celery workers. После осмотра рабочих, с активным celery inspect, запланированных, кажется, нет ни одной ожидающей задачи в очереди.
Если заголовок (группа) имеет 2 или менее элементов, то проблем нет, задания группы завершаются, вызывается обратный вызов.
Похоже, это не зависит от размера элементов (если каждая подзадача в группе отправляет 100 строк, мы все еще имеем такое же поведение для 1000 строк).
Если я просто запускаю групповые задачи, без аккорда и обратного вызова, задачи выполняются без ошибок.
Я пробовал использовать разные синтаксисы для аккорда, и, похоже, это ничего не меняет.
Я попробовал использовать функцию group.link, чтобы посмотреть, что произойдет, и группа, кажется, завершает работу, но обратный вызов не происходит после завершения всех задач группы, конечно, поскольку он не предназначен для этого, как я понял из документации, так что это не совсем то поведение, которое я хочу.
Я использую Celery 5.2.3 с брокером Redis 7.0.0 и бэкендом Django 3.2.13 с psql, с python 3.9. Все работает на отдельных контейнерах docker.
Похоже, что использование группы непосредственно в качестве заголовка аккорда создавало проблему. Вероятно, в качестве заголовка использовалась первая задача в группе, а в качестве обратного вызова - вторая (хотя я не могу понять, почему это не вызвало ошибку с аргументами этих задач). Вместо того, чтобы возвращать :
group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size])
for i in range(0, len(self.data), chunk_size))
Теперь я возвращаю :
[unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size])
for i in range(0, len(self.data), chunk_size)]
И он работает, как и ожидалось.