Запуск задания celery через django singnals

Я хотел бы использовать сигналы Django для запуска задачи celery следующим образом:

def delete_content(sender, instance, **kwargs):
    task_id = uuid()
    task = delete_libera_contents.apply_async(kwargs={"instance": instance}, task_id=task_id)
    task.wait(timeout=300, interval=2)

Но я постоянно сталкиваюсь с kombu.exceptions.EncodeError: Object of type MusicTracks is not JSON serializable

Теперь я не уверен, как передать экземпляр MusicTracks, поскольку это экземпляр класса модели. Как я могу правильно передать такие экземпляры в мою задачу?

В моем tasks.py есть следующее:

@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance, **kwargs):

    libera_backend = instance.file.libera_backend
    ...

Никогда не посылайте instance в задачу celery, вы должны послать только variables например instanse primary key и затем внутри задачи celery через эту pk найти эту instance и затем сделать вашу логику

Ваш код должен выглядеть следующим образом:

views.py

def delete_content(sender, **kwargs):
    task_id = uuid()
    task = delete_libera_contents.apply_async(kwargs={"instance_pk": sender.pk}, task_id=task_id)
    task.wait(timeout=300, interval=2)

task.py

@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance_pk, **kwargs):

    instance = Instance.ojbects.get(pk = instance_pk)
    libera_backend = instance.file.libera_backend
    ...

Вы можете найти это правило в документации по сельдерею (не могу найти ссылку), одна из причин представить себе ситуацию:

  1. вы отправляете свой экземпляр в celery tasks (он задерживается по какой-либо причине на 5 мин.)
  2. потом ваш проект делает логику с этим экземпляром, before ваша задача завершена
  3. потом наступает время задачи celery, и он использует этот экземпляр старой версии, и этот экземпляр становится поврежденным

(это причина, как я думаю, а не из документации)

Во-первых, извините, что вопрос получился немного запутанным, особенно для людей, которые уже написали ответ. В моем случае, сигнал delete_content может быть вызван из трех различных моделей, так что на самом деле это выглядит следующим образом:

@receiver(pre_delete, sender=MusicTracks)
@receiver(pre_delete, sender=Movies)
@receiver(pre_delete, sender=TvShowEpisodes)
def delete_content(sender, instance, **kwargs):
    delete_libera_contents.delay(instance_pk=instance.pk)

Таким образом, каждый раз, когда одна из этих моделей вызывает действие удаления, этот сигнал будет также вызывать задачу celery для фактического удаления материала в фоновом режиме (все хранится на S3).

Поскольку я не могу и не должен передавать экземпляры напрямую, как указал @oruchkin, я передаю instance.pk в задачу celery, который я затем должен найти в задаче celery, поскольку я не знаю в задаче celery, какая модель вызвала действие удаления:

@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance_pk, **kwargs):

    if Movies.objects.filter(pk=instance_pk).exists():
        instance = Movies.objects.get(pk=instance_pk)
    elif MusicTracks.objects.filter(pk=instance_pk).exists():
        instance = MusicTracks.objects.get(pk=instance_pk)
    elif TvShowEpisodes.objects.filter(pk=instance_pk).exists():
        instance = TvShowEpisodes.objects.get(pk=instance_pk)
    else:
        raise logger.exception("Task: 'Delete Libera Contents', reports: No instance found (code: JFN4LK) - Warning")

    libera_backend = instance.file.libera_backend

Вы можете спросить, почему вы просто не передаете отправителя из сигнала в задачу celery. Я также попробовал это сделать и снова, как уже указывалось, я не могу передать экземпляры и терплю неудачу с:

kombu.exceptions.EncodeError: Object of type ModelBase is not JSON serializable

Итак, похоже, что мне действительно придется жестко получать экземпляр, используя пункты if-elif-else в задаче celery.

Вернуться на верх