Юнит-тесты Django внезапно перестали работать
Мои модульные тесты работали нормально, но последние два дня они не работают без видимых причин. Никаких новых дополнений или серьезных изменений в код не вносилось, но они все равно не работают.
Та же логика работает в других тестовых случаях, но для пяти конкретных тестов она начала давать сбой.
Я попробовал запустить тесты, проверив предыдущую ветку, где тесты работали нормально, но теперь они перестали работать даже при тех условиях. Я совершенно не знаю, что делать дальше.
Ниже приведен сниппет, который был реализован старшим разработчиком, но он покинул компанию две недели назад. Я не знаю, может быть, это не фильтрует данные или может быть сценарий наложения даты для данных, которые мы хотим отфильтровать.
Добрая помощь.
class EntityBestDataSourceChange(models.Model):
scope = models.TextField()
scope_category = models.TextField()
entity_id = models.IntegerField()
child_id = models.IntegerField()
year = models.IntegerField()
source = models.TextField()
operation = models.TextField()
class Meta:
unique_together = [
'scope', 'scope_category', 'entity_id',
'child_id', 'year', 'source', 'operation']
@classmethod
@transaction.atomic
def perform_recalculate(cls):
ebds_changes = (
EntityBestDataSourceChange.objects
.select_for_update(skip_locked=True)
.all()
)
number_of_changes = len(ebds_changes)
additions = set()
removals = set()
for ch in ebds_changes:
key = (
ch.scope, ch.scope_category, ch.entity_id,
ch.child_id, ch.year,
ACTIVITY_SOURCE_GROUPED_SOURCE[ch.source])
if ch.operation == '-':
removals.add(key)
elif ch.operation == '+':
additions.add(key)
inter = additions & removals
additions -= inter
removals -= inter
if additions:
additions_d = defaultdict(set)
for *key, source in additions:
additions_d[scope_key(*key)].add(source)
EntityBestDataSource.check_for_categories(additions_d)
if removals:
removals_d = defaultdict(set)
for *key, source in removals:
removals_d[scope_key(*key)].add(source)
EntityBestDataSource.check_for_categories(removals_d, True)
if ebds_changes:
ebds_changes.delete()
return number_of_changes
@classmethod
def recalculate(cls):
if enable_aggregate_ebds_calculations.get() != 0:
return
return cls.perform_recalculate()
enable_aggregate_ebds_calculations = contextvars.ContextVar(
'enable_aggregate_ebds_calculations', default=0)
@contextmanager
def aggregate_ebds_calculations():
enable_aggregate_ebds_calculations.set(
enable_aggregate_ebds_calculations.get() + 1)
try:
yield
if enable_aggregate_ebds_calculations.get() == 1:
EntityBestDataSourceChange.perform_recalculate()
finally:
enable_aggregate_ebds_calculations.set(
enable_aggregate_ebds_calculations.get() - 1)
недавнее изменение:
class DateRangeModelManager(DataChangedBulkOperationsManager):
def bulk_delete(self, object_ids):
with aggregate_ebds_calculations():
self.model.objects.filter(id__in=object_ids).delete()
def upload_file(self, data_type, data, method='insert',
file_name='test.csv', file_type=None, **kwargs):
if file_type is None:
file_type = file_name.rsplit('.', 1)[-1]
return self.post(
('internal:etl:upload',), {
'file': self.get_uploaded_file(file_name, data),
'data_type': data_type,
'action': method,
'file_type': file_type,
**kwargs
},
format='multipart',
user=self.internal_user_1
)
def export_and_import(self, export_data_type, import_data_type,
client_id, obj_id, updates):
cols, rows = self.export(export_data_type, client_id)
row = next(r for r in rows if r['id'] == obj_id)
row.update(updates)
return self.upload_file(import_data_type, [row], method='update')
def export(self, export_data_type, client_id='', include_id='on', **kwargs):
self.post(
('internal:export_data',), {
'data_type': export_data_type,
'include_id': include_id,
'client': client_id,
**kwargs
},
user=self.internal_user_1
)
de = DataExport.objects.latest('id')
cols, rows = self.read_excel(de.resource.file)
return cols, rows
выше приведен полный файл теста
это модульный тест:
Ниже приведена ошибка
class Consumption(BaseConsumption, HistoryModel, DataChangedSignalModel):
objects = managers.ConsumptionManager()
supplier = models.ForeignKey(
Client,
related_name='%(app_label)s_%(class)s_supplier',
related_query_name='%(app_label)s_%(class)s_supplier_query',
null=True,
on_delete=models.CASCADE
)
create_time = models.DateTimeField(auto_now_add=True, null=True)
update_time = models.DateTimeField(auto_now=True, null=True)
class Meta:
constraints = [
UniqueConstraint(
fields=[
'entity_id',
'start_date',
'end_date',
'activity_category_id',
'estimated',
'market_emission_source',
'market_emission_factor',
],
condition=Q(source=ActivitySource.DMS),
name='dms_rows_not_duplicated_mef'
),
UniqueConstraint(
fields=[
'entity_id',
'start_date',
'end_date',
'activity_category_id',
'estimated',
'market_emission_source',
],
condition=Q(
source=ActivitySource.DMS,
market_emission_factor=None
),
name='dms_rows_not_duplicated'
),
]
def save(self, ownership_type=OwnershipTypes.OPERATIONAL, owned_percentage=100, **kwargs):
return super().save(data_changed_kwargs={
'ownership_details': {0: {
'ownership_type': ownership_type,
'owned_percentage': owned_percentage,
}}
}, **kwargs)
недавнее изменение, сделанное sr dev:
class DateRangeModelManager(DataChangedBulkOperationsManager):
def bulk_delete(self, object_ids):
with aggregate_ebds_calculations():
self.model.objects.filter(id__in=object_ids).delete()