Юнит-тесты Django внезапно перестали работать

Мои модульные тесты работали нормально, но последние два дня они не работают без видимых причин. Никаких новых дополнений или серьезных изменений в код не вносилось, но они все равно не работают.

Та же логика работает в других тестовых случаях, но для пяти конкретных тестов она начала давать сбой.

Я попробовал запустить тесты, проверив предыдущую ветку, где тесты работали нормально, но теперь они перестали работать даже при тех условиях. Я совершенно не знаю, что делать дальше.

Ниже приведен сниппет, который был реализован старшим разработчиком, но он покинул компанию две недели назад. Я не знаю, может быть, это не фильтрует данные или может быть сценарий наложения даты для данных, которые мы хотим отфильтровать.

Добрая помощь.

class EntityBestDataSourceChange(models.Model):
    scope = models.TextField()
    scope_category = models.TextField()
    entity_id = models.IntegerField()
    child_id = models.IntegerField()
    year = models.IntegerField()
    source = models.TextField()
    operation = models.TextField()

    class Meta:
        unique_together = [
            'scope', 'scope_category', 'entity_id',
            'child_id', 'year', 'source', 'operation']

    @classmethod
    @transaction.atomic
    def perform_recalculate(cls):
        ebds_changes = (
            EntityBestDataSourceChange.objects
            .select_for_update(skip_locked=True)
            .all()
        )

        number_of_changes = len(ebds_changes)

        additions = set()
        removals = set()

        for ch in ebds_changes:
            key = (
                ch.scope, ch.scope_category, ch.entity_id,
                ch.child_id, ch.year,
                ACTIVITY_SOURCE_GROUPED_SOURCE[ch.source])
            if ch.operation == '-':
                removals.add(key)
            elif ch.operation == '+':
                additions.add(key)

        inter = additions & removals
        additions -= inter
        removals -= inter

        if additions:
            additions_d = defaultdict(set)
            for *key, source in additions:
                additions_d[scope_key(*key)].add(source)
            EntityBestDataSource.check_for_categories(additions_d)
        if removals:
            removals_d = defaultdict(set)
            for *key, source in removals:
                removals_d[scope_key(*key)].add(source)
            EntityBestDataSource.check_for_categories(removals_d, True)

        if ebds_changes:
            ebds_changes.delete()

        return number_of_changes

    @classmethod
    def recalculate(cls):
        if enable_aggregate_ebds_calculations.get() != 0:
            return
        return cls.perform_recalculate()
enable_aggregate_ebds_calculations = contextvars.ContextVar(
    'enable_aggregate_ebds_calculations', default=0)


@contextmanager
def aggregate_ebds_calculations():
    enable_aggregate_ebds_calculations.set(
        enable_aggregate_ebds_calculations.get() + 1)

    try:
        yield
        if enable_aggregate_ebds_calculations.get() == 1:
            EntityBestDataSourceChange.perform_recalculate()
    finally:
        enable_aggregate_ebds_calculations.set(
            enable_aggregate_ebds_calculations.get() - 1)

недавнее изменение:


class DateRangeModelManager(DataChangedBulkOperationsManager):
    def bulk_delete(self, object_ids):
        with aggregate_ebds_calculations():
            self.model.objects.filter(id__in=object_ids).delete()
    def upload_file(self, data_type, data, method='insert',
                    file_name='test.csv', file_type=None, **kwargs):
        if file_type is None:
            file_type = file_name.rsplit('.', 1)[-1]

        return self.post(
            ('internal:etl:upload',), {
                'file': self.get_uploaded_file(file_name, data),
                'data_type': data_type,
                'action': method,
                'file_type': file_type,
                **kwargs
            },
            format='multipart',
            user=self.internal_user_1
        )

    def export_and_import(self, export_data_type, import_data_type,
                          client_id, obj_id, updates):
        cols, rows = self.export(export_data_type, client_id)
        row = next(r for r in rows if r['id'] == obj_id)
        row.update(updates)

        return self.upload_file(import_data_type, [row], method='update')

    def export(self, export_data_type, client_id='', include_id='on', **kwargs):
        self.post(
            ('internal:export_data',), {
                'data_type': export_data_type,
                'include_id': include_id,
                'client': client_id,
                **kwargs
            },
            user=self.internal_user_1
        )

        de = DataExport.objects.latest('id')
        cols, rows = self.read_excel(de.resource.file)
        return cols, rows

выше приведен полный файл теста

это модульный тест:

Ниже приведена ошибка

class Consumption(BaseConsumption, HistoryModel, DataChangedSignalModel):
    objects = managers.ConsumptionManager()
    supplier = models.ForeignKey(
        Client,
        related_name='%(app_label)s_%(class)s_supplier',
        related_query_name='%(app_label)s_%(class)s_supplier_query',
        null=True,
        on_delete=models.CASCADE
    )
    create_time = models.DateTimeField(auto_now_add=True, null=True)
    update_time = models.DateTimeField(auto_now=True, null=True)

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=[
                    'entity_id',
                    'start_date',
                    'end_date',
                    'activity_category_id',
                    'estimated',
                    'market_emission_source',
                    'market_emission_factor',
                ],
                condition=Q(source=ActivitySource.DMS),
                name='dms_rows_not_duplicated_mef'
            ),
            UniqueConstraint(
                fields=[
                    'entity_id',
                    'start_date',
                    'end_date',
                    'activity_category_id',
                    'estimated',
                    'market_emission_source',
                ],
                condition=Q(
                    source=ActivitySource.DMS,
                    market_emission_factor=None
                ),
                name='dms_rows_not_duplicated'
            ),
        ]

    def save(self, ownership_type=OwnershipTypes.OPERATIONAL, owned_percentage=100, **kwargs):
        return super().save(data_changed_kwargs={
            'ownership_details': {0: {
                'ownership_type': ownership_type,
                'owned_percentage': owned_percentage,
            }}
        }, **kwargs)

недавнее изменение, сделанное sr dev:

class DateRangeModelManager(DataChangedBulkOperationsManager):
    def bulk_delete(self, object_ids):
        with aggregate_ebds_calculations():
            self.model.objects.filter(id__in=object_ids).delete()
Вернуться на верх