Не выполняется корректно таска Celery (Django)

У меня есть небольшая логика вызова тасков, есть модель достижений для юзеров (условно сделай то и получишь опыт и сможешь поднять уровень профиля). Есть сигнал:

@receiver(post_save, sender=AchievementsProgressStatus)
def check_status(sender, instance, created, **kwargs):
    """From here we check the achievement status and set the actual user level"""
    if not created:
        check_achievements_status.delay(instance.id)

этот сигнал вызывает таску, которая обновляет прогресс и может быть флаг выполнено/не выполнено

@shared_task
def check_achievements_status(obj_id):
    """Tracking progress to set current is_achieved value"""
    from .models import AchievementsProgressStatus

    obj = (
        AchievementsProgressStatus.objects.select_related("user")
        .filter(id=obj_id)
        .first()
    )

    last_lvl = UserLevel.objects.last()

    if obj:
        with transaction.atomic():
            is_achieved = obj.progress_rn >= obj.achievement.final_value
            new_progress = min(obj.progress_rn, obj.achievement.final_value)

            AchievementsProgressStatus.objects.filter(id=obj_id).update(
                progress_rn=new_progress
            )

            # we are changing boolean field only when it has changed
            if obj.is_achieved != is_achieved:
                AchievementsProgressStatus.objects.filter(id=obj_id).update(
                    is_achieved=is_achieved
                )

                if obj.user.user_profile.user_level != last_lvl:
                    level_calculating.delay(obj.user.id)

Получаем наш обьект модели и джоиним юзера, после чего использую транзакцию и получаю 2 значения: is_achieved, new_progress. Если is_achieved изменил свое значение то меняем поле в модели и запускаем другую таску если юзер не на последнем уровне.

Таска имеет следующий вид

@shared_task
def level_calculating(user_id: int):
    from ..achievements.models import AchievementsProgressStatus
    from .models import UserLevel, UserProfile

    user = get_user_model().objects.select_related("user_profile").filter(id=user_id).first()

    if user:
        total_exp = (
            AchievementsProgressStatus.objects.select_related("achievement")
            .filter(user=user, is_achieved=True)
            .aggregate(total_exp=Sum("achievement__given_exp"))["total_exp"]
            or 0
        )

        current_level = UserLevel.objects.filter(
            low_range__lte=total_exp, top_range__gte=total_exp
        ).first()

        if current_level:
            UserProfile.objects.filter(user=user).update(user_level=current_level)

Получаем юзера по ид и агрегируем значения всех достижений которые выполнены, после чего это число сумы опыта всех выполненных достижений берем и просто проверяем какой у юзера уровень, если такой есть - ставим его юзеру.

На данном этапе все работает, пусть код и не самый лучший. И вот я решил написать сервис чтобы в таску не сувать логику.

class UserLevelCalculating:
    def __init__(self, user_id):
        self.user = get_user_model().objects.select_related("user_profile").filter(id=user_id).first()

    def get_total_exp(self):
        total_exp = (
            AchievementsProgressStatus.objects.select_related("achievement")
            .filter(user=self.user, is_achieved=True)
            .aggregate(total_exp=Sum("achievement__given_exp"))["total_exp"]
            or 0
        )
        return total_exp

    def get_level(self, total_exp):
        user_level = UserLevel.objects.filter(low_range__lte=total_exp, top_range__gte=total_exp).first()

        return user_level

    def level_updater(self):
        total_exp = self.get_total_exp()
        user_level = self.get_level(total_exp)

        if user_level:
            UserProfile.objects.filter(user=self.user).update(user_level=user_level)

Класс делает все идентично таске + так-же изменил таску чтобы она работала с классом, теперь она выглядит так

@shared_task
def level_calculating(user_id: int):
    from ..users.service.level_calculating_service import UserLevelCalculating

    obj = UserLevelCalculating(user_id)
    obj.level_updater()

Вроде все должно работать но увы, тест не проходит?

    def test_success_update_user_level(self):
        """
        Verifies that the achievement status is correctly updated when progress changes,
        and the Celery task runs successfully. Also checks that the user level is updated.
        """
        achievement_progress_status = AchievementsProgressStatus.objects.get(
            user=self.user, achievement__achievement_name=self.user_achievement.achievement_name
        )

        achievement_progress_status.progress_rn = 1
        achievement_progress_status.save()
        achievement_progress_status.refresh_from_db()

        result = check_achievements_status.delay(achievement_progress_status.id)

        self.assertTrue(result.successful())
        self.assertTrue(achievement_progress_status.is_achieved)

        self.user.user_profile.refresh_from_db()
        self.assertEqual(self.user.user_profile.user_level, self.second_lvl)

Сам тест очень простой, просто берем достижение, ставим прогресс на 1, обновляем значения в бд и проверяем что у юзера 2 уровень. И вот тут сама проблема, без разделения логики на сервис все работает отлично и тест проходит, а вот с ним не хочет. Над этим я сидел не мало и всеравно не вижу ошибку, буду рад любой подсказке!

Тест не проходит потому что не обновляется уровень:

...F......
======================================================================
FAIL: test_success_update_user_level (map_apps.achievements.tests.ModelAchievementsStatusTest)
Verifies that the achievement status is correctly updated when progress changes,
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/service/map_apps/achievements/tests.py", line 98, in test_success_update_user_level
    self.assertEqual(self.user.user_profile.user_level, self.second_lvl)
AssertionError: <UserLevel: Новачок> != <UserLevel: Продвинутий>

вдруг нужно будет пример моделей:

class Achievements(models.Model):
    """Models to save the achievements"""

    achievement_name = models.CharField(max_length=128)
    descr_achievement = models.CharField(max_length=256, blank=True)
    given_exp = models.PositiveSmallIntegerField(
        default=0, validators=[MinValueValidator(0)]
    )
    final_value = models.PositiveSmallIntegerField(
        default=0, validators=[MinValueValidator(1)]
    )
    for_organization = models.BooleanField(default=False)
    for_def_user = models.BooleanField(default=True)
    achievement_image = models.ImageField(upload_to="achiev_img/", blank=True)

    def __str__(self):
        return f"{self.achievement_name}"


class AchievementsProgressStatus(models.Model):
    """Models to track the user achievements progres"""

    user = models.ForeignKey(
        to=User, on_delete=models.CASCADE, related_name="achievementsprogressstatus"
    )
    achievement = models.ForeignKey(
        to=Achievements,
        on_delete=models.CASCADE,
        related_name="achievementsprogressstatus",
    )
    progress_rn = models.PositiveSmallIntegerField(
        default=0, validators=[MinValueValidator(0)]
    )
    is_achieved = models.BooleanField(default=False, db_index=True)

    def __str__(self):
        return (
            f"{self.user.get_full_name()} | {self.achievement.achievement_name} | Виконано = {self.is_achieved}, "
            f"{self.progress_rn}/{self.achievement.final_value}"
        )

Вернуться на верх