Django: как проверить наличие параллелизма при использовании заданий cron
У меня есть приложение Django, размещенное на удаленном сервере, которое запускает некоторые задания cron через относительно короткие промежутки времени. Одно из этих заданий выполняет команду, которая извлекает набор запросов из базы данных, вызывает внешний API и изменяет модели на основе ответа от API. Если я не позабочусь об этом, задание cron будет выполняться несколько раз, прежде чем API ответит, что приведет к проблемам параллелизма и одновременному обновлению нескольких экземпляров одних и тех же моделей
У меня есть разные стратегии, чтобы избежать этой проблемы, но я хотел бы написать тесты, которые я могу запустить локально, подражая вызову API и гарантируя, что две задачи cron не будут одновременно пытаться работать с объектом. Как мне это сделать?
Мой код выглядит примерно так (иллюстрация с целью показать проблему):
def task():
qs = MyModel.objects.filter(task_ran=False)
for model in qs:
resp = api_call(model.foo)
model.bar = resp
model.task_ran = True
model.save()
Итак, как я могу написать тест, который проверяет, что если task()
вызывается во второй раз до завершения первого вызова, то он не будет обновлять модель снова и API не будет вызван снова? Ниже приведен набросок теста, и я пытался поместить вызовы task()
в отдельные потоки, но это приводит к зависанию теста и - после KeyboardInterrupt
- к отказу с сообщением
django.db.utils.OperationalError: к базе данных "test_db" обращаются другие пользователи
DETAIL: Существует еще 1 сессия, использующая базу данных.``
.
@patch("api_call")
def test_task(self, mock_api_call):
def side_effect(number):
time.sleep(2)
return number + 1
mock_api_call.side_effect = side_effect
# how to call these simultaneously? threading causes Django to get mad
task()
task()
mock_api_call.assert_called_once()
Хорошо, я нашел ответ, основанный на этом ответе. В принципе, тестирование может быть сделано в Django с помощью потоков, но для этого требуется пара вещей:
- Во-первых, сам тестовый класс должен быть подклассом
TransactionTestCase
(по крайней мере, если он включает в себя какие-либо махинации с базой данных или использование.select_for_update
, что делает мой код). - Во-вторых, соединения с базой данных, открытые в каждом потоке, должны быть снова закрыты при завершении потока . Это можно сделать с помощью
ThreadPoolExecutor
для созданияFuture
, а затем добавить функцию обратного вызова для завершения потока через.add_done_callback
.
Таким образом, тест можно записать следующим образом:
import concurrent.futures
from django.db import connections
from django.test import TransactionTestCase
class CronTestCase(TransactionTestCase):
def on_done(self, future):
connections.close_all()
@patch("api_call")
def test_task(self, mock_api_call):
# setup the test
num_threads = 5
with concurrent.futures.ThreadPoolExecutor() as executor:
for _ in range(num_threads):
future = executor.submit(task)
future.add_done_callback(self.on_done)
mock_api_call.assert_called_once()