Вход пользователя в Celery Task в Django

У меня есть приложение, в котором нам нужно вести некий журнал аудита, который должен показывать, какой пользователь изменил какие данные в базе данных в какое время.

Я реализовал простое решение с использованием сигналов. Поэтому я создал приложение под названием "actions", в котором я регистрирую всю необходимую информацию. Для этого я подключил три сигнала (presave, postsave и predelete):

apps.py

class ActionsConfig(AppConfig):
    name = 'actions'

    def ready(self):

        apps_to_audit = ['app1', 'app2', 'app3']

        for app in apps_to_audit:
            app_models = apps.get_app_config(app).get_models()
            for model in app_models:
                pre_save.connect(pre_save_signal, sender=f'{app}.{model.__name__}', dispatch_uid=f'{app}_{model}_pre_save')
                post_save.connect(post_save_signal, sender=f'{app}.{model.__name__}', dispatch_uid=f'{app}_{model}_post_save')
                pre_delete.connect(pre_delete_signal, sender=f'{app}.{model.__name__}', dispatch_uid=f'{app}_{model}_pre_delete')

затем я создал функцию celery для хранения данных в actions-модели:

tasks.py

@shared_task
def log_action(action):
    from actions.models import Action
    from users.models import CustomUser

    app_label = action.get('app')
    model_name = action.get('object')
    user_pk = action.get('user')
    action_type = action.get('action_type')
    updated_fields = action.get('updated_fields')

    # Get Object
    Object = apps.get_model(app_label=app_label, model_name=model_name)
    model_pk = action.get('model_pk')

    # Get Object instance
    try:
        target = Object.objects.get(pk=model_pk)
    except Object.DoesNotExist:
        target = None

    # Get User
    try:
        user = CustomUser.objects.get(pk=user_pk)
    except CustomUser.DoesNotExist:
        user = None

    # LOGIC TO DEFINE VERB
    verb = ''
    if action_type == 1:
        verb = f' created new {model_name}'

    elif action_type == 2:
        verb = f' updated {model_name}'

    elif action_type == 3:
        verb = f' deleted {model_name}'

    if user:
        verb = f'{user}' + verb
    else:
        verb = 'Backend' + verb

    # Save Action
    action = Action(user=user, action_type=action_type, verb=verb, target=target, updated_fields=updated_fields)
    action.save()

и наконец, вот мои сигналы:

signals.py

def post_save_signal(sender, instance, **kwargs):
    if kwargs['created']:
        user = get_user()
        if user is None:
            user = None
        else:
            user = user.pk

        inital_instance = instance.__dict__.copy()
        updated_fields = {field: str(inital_instance[field]) for field in inital_instance}
        updated_fields.pop('_state')

        action = {'app': sender._meta.app_label,
                  'object': sender._meta.object_name,
                  'model_pk': instance.pk,
                  'user': user,
                  'action_type': 1,
                  'updated_fields': updated_fields}

        transaction.on_commit(lambda: log_action.apply_async(kwargs={'action': action}, ))


def pre_save_signal(sender, instance, **kwargs):

    if instance.pk:
        initial = sender.objects.get(pk=instance.pk)
        initial_json, final_json = initial.__dict__.copy(), instance.__dict__.copy()
        initial_json.pop('_state'), final_json.pop('_state')
        only_changed_fields = {k: {'from': str(initial_json[k]), 'to': str(final_json[k])} for k in initial_json
                               if final_json[k] != initial_json[k]}

        user = get_user()
        if user is None:
            user = None
        else:
            user = user.pk

        action = {'app': sender._meta.app_label,
                  'object': sender._meta.object_name,
                  'model_pk': instance.pk,
                  'user': user,
                  'action_type': 2,
                  'updated_fields': only_changed_fields}

        if len(only_changed_fields) > 0:
            transaction.on_commit(lambda: log_action.apply_async(kwargs={'action': action}))


def pre_delete_signal(sender, instance, **kwargs):
    user = get_user()

    if user is None:
        user = None
    else:
        user = user.pk

    final_instance = instance.__dict__.copy()
    updated_fields = {field: str(final_instance[field]) for field in final_instance}

    action = {'app': sender._meta.app_label,
              'object': sender._meta.object_name,
              'model_pk': instance.pk,
              'user': user,
              'action_type': 3,
              'updated_fields': updated_fields}

    transaction.on_commit(lambda: log_action.apply_async(kwargs={'action': action}))

Это работает как шарм. Но если мне нужно "передать" конкретную задачу задаче celery (потому что она занимает слишком много времени для выполнения в представлении и может быть легко запущена в фоновом режиме), я "теряю" информацию о пользователе, потому что celery не знает, какой пользователь на самом деле вызвал задачу.

Одним из решений может быть регистрация пользователя в задаче celery и выход из нее после завершения (функция get_user() - это вызов в пользовательском промежуточном ПО для извлечения текущего зарегистрированного пользователя). К сожалению, Django требует либо набор username &pw, либо пользователя и запроса. Передача запроса в задачи celery, вероятно, будет хлопотной (нелегко сериализуемой), и, очевидно, у меня нет доступа к pw пользователя. Я также не уверен, что это будет элегантным решением. В принципе, логирование работает, но я не вижу, какой пользователь на самом деле запустил задачу.

Это очень специфическая и уникальная ситуация, когда это происходит (внешний пинг к моему бэкенду), поэтому я мог бы реализовать исключение в моих сигналах, обрабатывающих обычные типы и эту специфическую ситуацию, но я не большой поклонник исключений и мне интересно, как можно решить это в любом случае.

Есть идеи, как это решить?

Вернуться на верх