Запуск задания celery через django singnals
Я хотел бы использовать сигналы Django для запуска задачи celery следующим образом:
def delete_content(sender, instance, **kwargs):
task_id = uuid()
task = delete_libera_contents.apply_async(kwargs={"instance": instance}, task_id=task_id)
task.wait(timeout=300, interval=2)
Но я постоянно сталкиваюсь с kombu.exceptions.EncodeError: Object of type MusicTracks is not JSON serializable
Теперь я не уверен, как передать экземпляр MusicTracks, поскольку это экземпляр класса модели. Как я могу правильно передать такие экземпляры в мою задачу?
В моем tasks.py есть следующее:
@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance, **kwargs):
libera_backend = instance.file.libera_backend
...
Никогда не посылайте instance
в задачу celery, вы должны послать только variables
например instanse primary key
и затем внутри задачи celery через эту pk
найти эту instance
и затем сделать вашу логику
Ваш код должен выглядеть следующим образом:
views.py
def delete_content(sender, **kwargs):
task_id = uuid()
task = delete_libera_contents.apply_async(kwargs={"instance_pk": sender.pk}, task_id=task_id)
task.wait(timeout=300, interval=2)
task.py
@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance_pk, **kwargs):
instance = Instance.ojbects.get(pk = instance_pk)
libera_backend = instance.file.libera_backend
...
Вы можете найти это правило в документации по сельдерею (не могу найти ссылку), одна из причин представить себе ситуацию:
- вы отправляете свой экземпляр в celery tasks (он задерживается по какой-либо причине на 5 мин.)
- потом ваш проект делает логику с этим экземпляром,
before
ваша задача завершена - потом наступает время задачи celery, и он использует этот экземпляр старой версии, и этот экземпляр становится поврежденным
(это причина, как я думаю, а не из документации)
Во-первых, извините, что вопрос получился немного запутанным, особенно для людей, которые уже написали ответ. В моем случае, сигнал delete_content может быть вызван из трех различных моделей, так что на самом деле это выглядит следующим образом:
@receiver(pre_delete, sender=MusicTracks)
@receiver(pre_delete, sender=Movies)
@receiver(pre_delete, sender=TvShowEpisodes)
def delete_content(sender, instance, **kwargs):
delete_libera_contents.delay(instance_pk=instance.pk)
Таким образом, каждый раз, когда одна из этих моделей вызывает действие удаления, этот сигнал будет также вызывать задачу celery для фактического удаления материала в фоновом режиме (все хранится на S3).
Поскольку я не могу и не должен передавать экземпляры напрямую, как указал @oruchkin, я передаю instance.pk в задачу celery, который я затем должен найти в задаче celery, поскольку я не знаю в задаче celery, какая модель вызвала действие удаления:
@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance_pk, **kwargs):
if Movies.objects.filter(pk=instance_pk).exists():
instance = Movies.objects.get(pk=instance_pk)
elif MusicTracks.objects.filter(pk=instance_pk).exists():
instance = MusicTracks.objects.get(pk=instance_pk)
elif TvShowEpisodes.objects.filter(pk=instance_pk).exists():
instance = TvShowEpisodes.objects.get(pk=instance_pk)
else:
raise logger.exception("Task: 'Delete Libera Contents', reports: No instance found (code: JFN4LK) - Warning")
libera_backend = instance.file.libera_backend
Вы можете спросить, почему вы просто не передаете отправителя из сигнала в задачу celery. Я также попробовал это сделать и снова, как уже указывалось, я не могу передать экземпляры и терплю неудачу с:
kombu.exceptions.EncodeError: Object of type ModelBase is not JSON serializable
Итак, похоже, что мне действительно придется жестко получать экземпляр, используя пункты if-elif-else в задаче celery.