Как запустить задачи django celery с постоянным циклом событий и вызовами asyncio с правильным переподключением к БД?
У меня есть django и celery, postgresql с драйвером psycopg2 и внешняя зависимость с тяжелыми вызовами настройки, которую трудно модифицировать и которая сохраняет и повторно использует цикл событий asyncio после первого вызова. У меня также есть вызов asyncio, который мне нужно запустить в задаче celery. Этот метод использует эту внешнюю зависимость внутренне.
Похоже, что я не могу использовать вызов django async_to_sync, поскольку он не позволяет сохранить цикл. Поэтому при выполнении последующих задач возникает Event loop is closed
, когда я вызываю зависимые вызовы lib.
Я использовал следующий способ решения этой проблемы:
class ExampleModel(models.Model)
name = models.CharField()
async def my_method():
example_entry = await ExampleModel.objects.aget(id=1)
# external async dep call here
return example_entry
@shared_task(name="example_task", bind=True)
def example_task(self):
asyncio.get_event_loop().run_until_complete(my_method())
Он прекрасно работал несколько месяцев, пока не произошел перезапуск базы данных и соединение не было потеряно. Все дальнейшие задания приводили к django.db.utils.InterfaceError: connection already closed
ошибке.
Погрузившись в работу, я создал набор тестовых задач:
@shared_task(name="debug_task_async_to_sync")
def debug_task_async_to_sync(self):
import asgiref.sync
res = asgiref.sync.async_to_sync(my_method)()
print("async_to_sync", res)
@shared_task(name="debug_task_loop")
def debug_task_loop(self):
res = utils.get_event_loop().run_until_complete(my_method())
print("loop", res)
@shared_task(name="debug_task")
def debug_task():
res = models.ExampleModel.objects.get(id=1)
print("sync", res)
и перезапускали базу данных во время их выполнения.
Оба синхронизатора debug_task
и debug_task_async_to_sync
заново подключаются к базе данных. debug_task_loop
прекращает выполнение для всех последующих вызовов. Более того, это происходит одновременно (например, не "зацикленные" задачи воссоздают соединение в то же время, когда "зацикленная" продолжает терпеть неудачу).
Есть ли способ сохранить цикл событий для внешней библиотеки с помощью async_to_sync или корректно обрабатывать повторные подключения к базе данных?