Создавайте задачи asyncio бесконечно без блокировки (django)

Итак, я пытаюсь создать систему опроса внутри команд django для развлечения и для изучения async/django.

Я использую django_tenants, хотя это не слишком важно. Идея заключается в том, что есть таблица, в которой хранятся "арендаторы". Я хочу перебрать всех арендаторов в бесконечном цикле. Для каждого арендатора, который еще не находится в очереди async (возможно, потому что процесс, когда этот арендатор был найден в последний раз, еще не завершился), я хочу добавить новую задачу asyncio для запуска.

В моем тестовом примере есть один арендатор, которого я жду 5 секунд, чтобы убедиться, что у меня работает внутренняя очередь/цикл.

Идея заключается в том, что, скажем, есть 3 арендатора: t_a, t_b и t_c, и t_a и t_c занимают 2 и 1 секунду соответственно, а t_b занимает 5.

В конечном итоге я бы увидел бесконечный цикл, работающий следующим образом:

t_c Готово t_c Выполнено t_a Выполнено t_c Готово t_c Готово t_a Готово t_c Выполнено t_b Сделано t_c Готово --- Долгоиграющий t_a Готово ....

Таким образом, t_c не задерживает повторное выполнение других арендаторов.

При каждой итерации арендаторов извлекаются НОВЫЕ данные, так как арендаторы могут создаваться между запусками.

class Command(BaseCommand):
    help = 'Processes the webhook queue'

    def __init__(self):
        self.tenant_queue = []
        super().__init__()

    def add_arguments(self, parser):
        parser.add_argument(
            '--schema',
            action='store_true',
            help='Process on a single schema',
        )

    async def get_tenant_queue_count(self, tenant):
        with tenant_context(tenant):
            count = await sync_to_async(Queue.objects.count)()
            if tenant.schema_name == 't_b':
                random_int = 10
            else:
                random_int = randint(1, 3)

            await asyncio.sleep(random_int)
            print(self.tenant_queue)
            print(f'{tenant.name}: {count}, time: {random_int}')
            self.tenant_queue.pop(self.tenant_queue.index(tenant.id))

    def handle(self, *args, **options):
        def _get_tenants():
            return Tenant.objects.all()

        async def _main_routine():

            while True:
                tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
                for t in tenants:
                    if t.id not in self.tenant_queue:
                        self.tenant_queue.append(t.id) 

                processing_schemas = [self.get_tenant_queue_count(t) for t in tenants if t.id not in self.tenant_queue]

                await asyncio.gather(*processing_schemas)

        asyncio.run(_main_routine())

Теперь это умирает каждый раз с

django.core.exceptions.SynchronousOnlyOperation: Вы не можете вызвать это из асинхронного контекста - используйте поток или sync_to_async.

File "/app/app/webhook/management/commands/process_webhooks.py", line 48, in _main_routine
for t in tenants:
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 280, in __iter__
self._fetch_all()
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 1324, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 51, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py", line 1173, in execute_sql
cursor = self.connection.cursor()
File "/usr/local/lib/python3.8/site-packages/django/utils/asyncio.py", line 31, in inner
raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

Как мне перебрать всех жильцов?

Аххх разобрался... Чтобы действия над QuerySet не заставляли ORM пытаться запустить действия синхронизации в асинхронной функции, мне нужно было преобразовать QuerySet в список внутри функции sync_to_async


    async def get_queue_count(self, tenant):
        with tenant_context(tenant):
            count = await sync_to_async(Queue.objects.count)()
            if tenant.schema_name == 'company_xyz':
                random_int = 10
            else:
                random_int = randint(1, 3)

            await asyncio.sleep(random_int)
            print(self.queue)
            print(f'{tenant.name}: {count}, time: {random_int}')
            self.queue.pop(self.queue.index(tenant.id))

    def handle(self, *args, **options):
        def _get_tenants():
            # THIS LINE WRAPPED IN LIST
            return list(Tenant.objects.all())

        async def _main_routine():

            while True:
                tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
                for tenant in tenants:
                    if tenant.id not in self.queue:
                        self.queue.append(tenant.id)
                        asyncio.create_task(self.get_queue_count(tenant))

        asyncio.run(_main_routine())

Затем, чтобы заставить его работать бесконечно и без блокировки, я просто создаю задачи одну за другой в цикле и отслеживаю их с помощью простой целочисленной проверки в self.queue

Теперь все работает так, как ожидалось! Надеюсь, это кому-нибудь поможет.

Вернуться на верх