Создавайте задачи asyncio бесконечно без блокировки (django)
Итак, я пытаюсь создать систему опроса внутри команд django для развлечения и для изучения async/django.
Я использую django_tenants, хотя это не слишком важно. Идея заключается в том, что есть таблица, в которой хранятся "арендаторы". Я хочу перебрать всех арендаторов в бесконечном цикле. Для каждого арендатора, который еще не находится в очереди async (возможно, потому что процесс, когда этот арендатор был найден в последний раз, еще не завершился), я хочу добавить новую задачу asyncio для запуска.
В моем тестовом примере есть один арендатор, которого я жду 5 секунд, чтобы убедиться, что у меня работает внутренняя очередь/цикл.
Идея заключается в том, что, скажем, есть 3 арендатора: t_a, t_b и t_c, и t_a и t_c занимают 2 и 1 секунду соответственно, а t_b занимает 5.
В конечном итоге я бы увидел бесконечный цикл, работающий следующим образом:
t_c Готово t_c Выполнено t_a Выполнено t_c Готово t_c Готово t_a Готово t_c Выполнено t_b Сделано t_c Готово --- Долгоиграющий t_a Готово ....
Таким образом, t_c не задерживает повторное выполнение других арендаторов.
При каждой итерации арендаторов извлекаются НОВЫЕ данные, так как арендаторы могут создаваться между запусками.
class Command(BaseCommand):
help = 'Processes the webhook queue'
def __init__(self):
self.tenant_queue = []
super().__init__()
def add_arguments(self, parser):
parser.add_argument(
'--schema',
action='store_true',
help='Process on a single schema',
)
async def get_tenant_queue_count(self, tenant):
with tenant_context(tenant):
count = await sync_to_async(Queue.objects.count)()
if tenant.schema_name == 't_b':
random_int = 10
else:
random_int = randint(1, 3)
await asyncio.sleep(random_int)
print(self.tenant_queue)
print(f'{tenant.name}: {count}, time: {random_int}')
self.tenant_queue.pop(self.tenant_queue.index(tenant.id))
def handle(self, *args, **options):
def _get_tenants():
return Tenant.objects.all()
async def _main_routine():
while True:
tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
for t in tenants:
if t.id not in self.tenant_queue:
self.tenant_queue.append(t.id)
processing_schemas = [self.get_tenant_queue_count(t) for t in tenants if t.id not in self.tenant_queue]
await asyncio.gather(*processing_schemas)
asyncio.run(_main_routine())
Теперь это умирает каждый раз с
django.core.exceptions.SynchronousOnlyOperation: Вы не можете вызвать это из асинхронного контекста - используйте поток или sync_to_async.
File "/app/app/webhook/management/commands/process_webhooks.py", line 48, in _main_routine
for t in tenants:
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 280, in __iter__
self._fetch_all()
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 1324, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 51, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py", line 1173, in execute_sql
cursor = self.connection.cursor()
File "/usr/local/lib/python3.8/site-packages/django/utils/asyncio.py", line 31, in inner
raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
Как мне перебрать всех жильцов?
Аххх разобрался... Чтобы действия над QuerySet не заставляли ORM пытаться запустить действия синхронизации в асинхронной функции, мне нужно было преобразовать QuerySet в список внутри функции sync_to_async
async def get_queue_count(self, tenant):
with tenant_context(tenant):
count = await sync_to_async(Queue.objects.count)()
if tenant.schema_name == 'company_xyz':
random_int = 10
else:
random_int = randint(1, 3)
await asyncio.sleep(random_int)
print(self.queue)
print(f'{tenant.name}: {count}, time: {random_int}')
self.queue.pop(self.queue.index(tenant.id))
def handle(self, *args, **options):
def _get_tenants():
# THIS LINE WRAPPED IN LIST
return list(Tenant.objects.all())
async def _main_routine():
while True:
tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
for tenant in tenants:
if tenant.id not in self.queue:
self.queue.append(tenant.id)
asyncio.create_task(self.get_queue_count(tenant))
asyncio.run(_main_routine())
Затем, чтобы заставить его работать бесконечно и без блокировки, я просто создаю задачи одну за другой в цикле и отслеживаю их с помощью простой целочисленной проверки в self.queue
Теперь все работает так, как ожидалось! Надеюсь, это кому-нибудь поможет.