Как запустить задачи django celery с постоянным циклом событий и вызовами asyncio с правильным переподключением к БД?

У меня есть django и celery, postgresql с драйвером psycopg2 и внешняя зависимость с тяжелыми вызовами настройки, которую трудно модифицировать и которая сохраняет и повторно использует цикл событий asyncio после первого вызова. У меня также есть вызов asyncio, который мне нужно запустить в задаче celery. Этот метод использует эту внешнюю зависимость внутренне.

Похоже, что я не могу использовать вызов django async_to_sync, поскольку он не позволяет сохранить цикл. Поэтому при выполнении последующих задач возникает Event loop is closed, когда я вызываю зависимые вызовы lib.

Я использовал следующий способ решения этой проблемы:

class ExampleModel(models.Model)
    name = models.CharField()

async def my_method():
    example_entry = await ExampleModel.objects.aget(id=1)
    # external async dep call here
    return example_entry

@shared_task(name="example_task", bind=True)
def example_task(self):
    asyncio.get_event_loop().run_until_complete(my_method())

Он прекрасно работал несколько месяцев, пока не произошел перезапуск базы данных и соединение не было потеряно. Все дальнейшие задания приводили к django.db.utils.InterfaceError: connection already closed ошибке.

Погрузившись в работу, я создал набор тестовых задач:

@shared_task(name="debug_task_async_to_sync")
def debug_task_async_to_sync(self):
    import asgiref.sync
    res = asgiref.sync.async_to_sync(my_method)()
    print("async_to_sync", res)


@shared_task(name="debug_task_loop")
def debug_task_loop(self):
    res = utils.get_event_loop().run_until_complete(my_method())
    print("loop", res)


@shared_task(name="debug_task")
def debug_task():
    res = models.ExampleModel.objects.get(id=1)
    print("sync", res)

и перезапускали базу данных во время их выполнения.

Оба синхронизатора debug_task и debug_task_async_to_sync заново подключаются к базе данных. debug_task_loop прекращает выполнение для всех последующих вызовов. Более того, это происходит одновременно (например, не "зацикленные" задачи воссоздают соединение в то же время, когда "зацикленная" продолжает терпеть неудачу).

Есть ли способ сохранить цикл событий для внешней библиотеки с помощью async_to_sync или корректно обрабатывать повторные подключения к базе данных?

Вернуться на верх