Django - Невозможно сделать celery worker и протестировать транзакции на тестовом файле без ошибки "You can't execute queries until the end of the 'atomic' block."

Вот в чем дело, я создаю тест, чтобы проверить, работает или нет улучшение, сделанное для повышения производительности, улучшение заключается в том, чтобы сделать некоторые сигналы логики асинхронной задачей вместо асинхронного процесса, я хотел проверить это, но когда я вручную создаю celery worker, я получаю

django.db.transaction.TransactionManagementError: В текущей транзакции произошла ошибка. Вы не можете выполнять запросы до конца 'атомарного' блока.

Чтобы иметь больше контекста... Я избегаю создания рабочего с самого начала, потому что конвейер не использует redis по умолчанию, или, по крайней мере, он избегает создания celery, поэтому мне нужно создать его вручную, но когда я создаю службу, я получаю ошибку

    def test_delete_optimal_deletion_time(self):
    """
    Verifies that the `deletion logic` is under an acceptable
    running time.
    :return:
    """
    # ... non-relevant code 
   # Start Celery worker in a separate context to avoid nested atomic transactions
    with start_worker(celery_app, perform_ping_check=False):
        data_delete_start = time.time()
        og_org.delete()
        data_delete_end = time.time()
        data_total_test_end = time.time()
        data_total_test_time = data_total_test_end - data_total_test_start
        data_delete_time = data_delete_end - data_delete_start
        print("time to create test data: ", data_creation_time)
        print("time to delete test data: ", data_delete_time)
        print("time to run all test data: ", data_total_test_time)
        print("settings.CELERY_TASK_ALWAYS_EAGER: ", settings.CELERY_TASK_ALWAYS_EAGER)

эта строка является "корнем" сбоя

            og_org.delete()

Это происходит потому, что метод удаления og_org использует transaction.atomic для предотвращения тупиковых ситуаций, Моя теория заключается в том, что определение "with" для использования агента действует как try catch, который в новых итерациях django считается атомарной транзакцией... поэтому возникает ошибка, я не смог найти решение этой проблемы...

для большего контекста это метод удаления экземпляра og_org:

def delete(self, *args, **kwargs):
    with transaction.atomic():
        return super().delete(*args, **kwargs)

То, что я пробовал

  1. завернув это в еще одну атомарную транзакцию
  2. определение celery worker в настройке класса
  3. попытка запустить эту og_org.delete() как отдельную задачу вне with
  4. пытался сделать это с помощью start_worker.... pass
  5. пробовал обойти с помощью библиотеки pytest

Я знаю, что самым простым решением было бы обновить некоторые конфигурации local_settings и разрешить создание агента в начале создания приложения для тестов, а не только для самого приложения, но я пытаюсь оставить это как последнее средство. Любая помощь будет оценена по достоинству

Вернуться на верх