Работа с транзакциями базы данных Celery и Django

В этом руководстве мы рассмотрим, как предотвратить выполнение задачи Celery, зависящей от транзакции базы данных Django, до того, как база данных зафиксирует транзакцию. Это довольно распространенная проблема.

Серия Django + Celery:

  1. Асинхронные задачи с Django и Celery
  2. Обработка периодических задач в Django с помощью Celery и Docker
  3. Автоматический повтор невыполненных задач Celery
  4. Работа с транзакциями базы данных Celery и Django (эта статья)

Цели

После прочтения вы должны уметь:

  1. Описать, что такое транзакция базы данных и как ее использовать в Django.
  2. Объясните, почему вы можете получить ошибку DoesNotExist в воркере Celery и как ее решить.
  3. Предотвратить выполнение задачи до того, как база данных зафиксирует транзакцию

Что такое транзакция базы данных?

Транзакция базы данных - это единица работы, которая либо фиксируется (применяется к базе данных), либо откатывается (отменяется из базы данных) как единое целое.

Большинство баз данных используют следующий шаблон:

  1. Начать транзакцию.
  2. Выполнение набора манипуляций с данными и/или запросов.
  3. Если ошибок не возникает, зафиксируйте транзакцию.
  4. Если произойдет ошибка, то откатите транзакцию.

Как видите, транзакция - это очень полезный способ сохранить ваши данные вдали от хаоса.

Как использовать транзакции баз данных в Django

Исходный код этого руководства вы можете найти на Github. Если вы решите использовать этот код в процессе работы над этим постом, помните, что имя пользователя должно быть уникальным. Вы можете использовать генератор случайных имен пользователей для тестирования, например Faker.

Давайте сначала посмотрим на это представление Django:

def test_view(request):
    user = User.objects.create_user('john', 'lennon@thebeatles.com', 'johnpassword')
    logger.info(f'create user {user.pk}')
    raise Exception('test')

Что происходит, когда вы посещаете этот вид?

Поведение по умолчанию

По умолчанию в Django используется autocommit: каждый запрос напрямую фиксируется в базе данных, если не активна транзакция. Другими словами, при автокоммите каждый запрос начинает транзакцию и либо фиксирует, либо откатывает транзакцию. Если у вас есть представление с тремя запросами, то каждый из них будет выполняться по очереди. Если один из них не сработает, два других будут зафиксированы.

Итак, в приведенном выше представлении исключение возникает после фиксации транзакции, создающей пользователя john.

Явный контроль

Если вы предпочитаете иметь больший контроль над транзакциями базы данных, вы можете переопределить поведение по умолчанию с помощью transaction.atomic.atomic. В этом режиме, перед вызовом функции представления, Django начинает транзакцию. Если ответ получен без проблем, Django фиксирует транзакцию. С другой стороны, если представление выдает исключение, Django откатывает транзакцию. Если у вас есть три запроса и один из них не сработал, то ни один из запросов не будет зафиксирован.

Итак, давайте перепишем представление, используя transaction.atomic:

def transaction_test(request):
    with transaction.atomic():
        user = User.objects.create_user('john1', 'lennon@thebeatles.com', 'johnpassword')
        logger.info(f'create user {user.pk}')
        raise Exception('force transaction to rollback')

Теперь операция user create будет откатываться при возникновении исключения, поэтому пользователь в итоге не будет создан.

transaction.atomic - это очень полезный инструмент, который может сохранить ваши данные организованными, особенно когда вам нужно манипулировать данными в моделях.

Он также может использоваться в качестве декоратора, например, так:

@transaction.atomic
def transaction_test2(request):
    user = User.objects.create_user('john1', 'lennon@thebeatles.com', 'johnpassword')
    logger.info(f'create user {user.pk}')
    raise Exception('force transaction to rollback')

Так что если в представлении возникнет какая-то ошибка, и мы не поймаем ее, то транзакция откатится назад.

Если вы хотите использовать transaction.atomic для всех функций представления, вы можете установить ATOMIC_REQUESTS на True в вашем файле настроек Django:

ATOMIC_REQUESTS=True

# or

DATABASES["default"]["ATOMIC_REQUESTS"] = True

Затем вы можете переопределить поведение, чтобы представление запускалось в режиме автокоммита:

@transaction.non_atomic_requests

Исключение

DoesNotExist

Если у вас нет твердого понимания того, как Django управляет транзакциями базы данных, это может привести вас в замешательство, когда вы столкнетесь со случайными ошибками, связанными с базой данных, в Celery worker.

Давайте рассмотрим пример:

@transaction.atomic
def transaction_celery(request):
    username = random_username()
    user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
    logger.info(f'create user {user.pk}')
    task_send_welcome_email.delay(user.pk)

    time.sleep(1)
    return HttpResponse('test')

Код задачи выглядит следующим образом:

@shared_task()
def task_send_welcome_email(user_pk):
    user = User.objects.get(pk=user_pk)
    logger.info(f'send email to {user.email} {user.pk}')
  1. Since the view uses the transaction.atomic decorator, all database operations are only committed if an error isn't raised in the view, including the Celery task.
  2. The task is fairly simple: We create a user and then pass the primary key to the task to send a welcome email.
  3. time.sleep(1) is used to introduce a race condition.

При запуске вы увидите следующую ошибку:

django.contrib.auth.models.User.DoesNotExist: User matching query does not exist.

Почему?

  1. We pause for 1 second after enqueueing the task.
  2. Since the task executes immediately, user = User.objects.get(pk=user_pk) fails as the user is not in the database because the transaction in Django has not yet been committed.

Решение

Есть три способа решения этой проблемы:

  1. Отключите транзакцию базы данных, чтобы Django использовал функцию автоматической фиксации. Для этого вы можете просто удалить декоратор transaction.atomic. Однако делать это не рекомендуется, поскольку атомарная транзакция базы данных является мощным инструментом.

  2. Принудительно запустить задачу Celery через определенный период времени.

    Например, чтобы сделать паузу на 10 секунд:

    task_send_welcome_email.apply_async(args=[user.pk], countdown=10)
    
  3. В Django есть функция обратного вызова transaction.on_commit, которая выполняется после успешной фиксации транзакции. Чтобы использовать это, обновите представление следующим образом:

    @transaction.atomic
    def transaction_celery2(request):
        username = random_username()
        user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
        logger.info(f'create user {user.pk}')
        # the task does not get called until after the transaction is committed
        transaction.on_commit(lambda: task_send_welcome_email.delay(user.pk))
    
        time.sleep(1)
        return HttpResponse('test')
    

    Теперь задача не вызывается до тех пор, пока не будет зафиксирована транзакция базы данных. Таким образом, когда рабочий процесс Celery находит пользователя, его можно найти, потому что код рабочего процесса всегда запускается после успешной фиксации транзакции базы данных Django.

    Это рекомендуемое решение.

Стоит отметить, что вы можете не захотеть, чтобы ваша транзакция была зафиксирована сразу же, особенно если вы работаете в высокомасштабной среде. Если база данных или экземпляр находятся в состоянии высокой загрузки, принудительная фиксация только увеличит существующее использование. В этом случае лучше использовать второе решение и подождать достаточное количество времени (возможно, 20 секунд), чтобы убедиться, что изменения внесены в базу данных до выполнения задачи.

Тестирование

В Django TestCase каждый тест обернут в транзакцию, которая после каждого теста откатывается. Поскольку транзакции никогда не фиксируются, on_commit() тоже никогда не запускается. Поэтому, если вам нужно протестировать код, запускаемый в обратном вызове on_commit, используйте TransactionTestCase в вашем тестовом коде.

Транзакция базы данных в задаче Celery

Если вашей задаче Celery необходимо обновить запись в базе данных, имеет смысл использовать транзакцию базы данных в задаче Celery.

Одним из простых способов является with transaction.atomic():

@shared_task()
def task_transaction_test():
    with transaction.atomic():
        from .views import random_username
        username = random_username()
        user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
        user.save()
        logger.info(f'send email to {user.pk}')
        raise Exception('test')

Лучше написать пользовательский decorator, который имеет transaction поддержку:

class custom_celery_task:
    """
    This is a decorator we can use to add custom logic to our Celery task
    such as retry or database transaction
    """
    def __init__(self, *args, **kwargs):
        self.task_args = args
        self.task_kwargs = kwargs

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper_func(*args, **kwargs):
            try:
                with transaction.atomic():
                    return func(*args, **kwargs)
            except Exception as e:
                # task_func.request.retries
                raise task_func.retry(exc=e, countdown=5)

        task_func = shared_task(*self.task_args, **self.task_kwargs)(wrapper_func)
        return task_func

...

@custom_celery_task(max_retries=5)
def task_transaction_test():
    # do something

Заключение

В этом руководстве рассматривалось, как заставить Celery хорошо работать с транзакциями базы данных Django.

Исходный код этого руководства можно найти на GitHub.

Спасибо за ваше чтение. Если у вас есть какие-либо вопросы, пожалуйста, не стесняйтесь связаться со мной .

Серия Django + Celery:

  1. Асинхронные задачи с Django и Celery
  2. Обработка периодических задач в Django с помощью Celery и Docker
  3. Автоматический повтор невыполненных задач Celery
  4. Работа с транзакциями базы данных Celery и Django (эта статья)
Вернуться на верх