Django unit tests suddenly stopped working
My unit tests were working fine, but for the past two days, they have been failing without any apparent reason. No new code additions or major changes have been made, yet they are failing.
The same logic is working in other test cases, but for five specific tests, it has started to fail.
I tried running the tests by checking out a previous branch where the tests were working fine, but now they have stopped working even under those conditions. I am completely clueless about what to do next.
Below is the snippet, which was implemented by a senior developer who left the company two weeks ago. I don’t know whether it is failing to filter the data correctly, or whether there is a scenario in which the date ranges of the data we want to filter overlap.
Kindly help.
class EntityBestDataSourceChange(models.Model):
    """Queued change record consumed by ``perform_recalculate``.

    Each row carries a (scope, scope_category, entity_id, child_id, year,
    source) combination plus an ``operation``. ``perform_recalculate`` acts
    on the operations ``'+'`` and ``'-'``; rows with any other operation are
    read and deleted but otherwise ignored.
    """

    scope = models.TextField()
    scope_category = models.TextField()
    entity_id = models.IntegerField()
    child_id = models.IntegerField()
    year = models.IntegerField()
    source = models.TextField()
    operation = models.TextField()

    class Meta:
        unique_together = [
            'scope', 'scope_category', 'entity_id',
            'child_id', 'year', 'source', 'operation']

    @classmethod
    @transaction.atomic
    def perform_recalculate(cls):
        """Process and remove the queued change rows.

        Rows are read with ``select_for_update(skip_locked=True)``. Keys
        that appear as both an addition and a removal cancel out; the
        remaining keys are grouped by ``scope_key(...)`` and handed to
        ``EntityBestDataSource.check_for_categories``.

        Returns:
            The number of change rows that were read (and deleted).

        Raises:
            KeyError: if a row's ``source`` is not a key of
                ``ACTIVITY_SOURCE_GROUPED_SOURCE``.
        """
        # Materialise the locked rows once so the exact same set is
        # processed, counted and deleted.
        pending = list(
            cls.objects
            .select_for_update(skip_locked=True)
            .all()
        )

        additions = set()
        removals = set()
        for change in pending:
            key = (
                change.scope, change.scope_category, change.entity_id,
                change.child_id, change.year,
                ACTIVITY_SOURCE_GROUPED_SOURCE[change.source])
            if change.operation == '-':
                removals.add(key)
            elif change.operation == '+':
                additions.add(key)

        # A key that was both added and removed nets out to no change.
        cancelled = additions & removals
        additions -= cancelled
        removals -= cancelled

        # Additions are reported first, then removals (flagged with True).
        for keys, extra_args in ((additions, ()), (removals, (True,))):
            if not keys:
                continue
            grouped = defaultdict(set)
            for *key, source in keys:
                grouped[scope_key(*key)].add(source)
            EntityBestDataSource.check_for_categories(grouped, *extra_args)

        if pending:
            # Delete only the rows read above. Calling .delete() on the
            # original queryset would re-run the query without the lock
            # clause and could remove rows that were skipped as locked or
            # committed after the read, i.e. rows that were never processed.
            cls.objects.filter(
                pk__in=[change.pk for change in pending]).delete()
        return len(pending)

    @classmethod
    def recalculate(cls):
        """Run ``perform_recalculate`` unless an aggregate block is active.

        Returns:
            The result of ``perform_recalculate()``, or ``None`` when
            ``enable_aggregate_ebds_calculations`` is non-zero (the work is
            left to the enclosing ``aggregate_ebds_calculations()`` block).
        """
        if enable_aggregate_ebds_calculations.get() != 0:
            return None
        return cls.perform_recalculate()
# Nesting depth of active ``aggregate_ebds_calculations()`` blocks in the
# current context. 0 (the default) means no block is active, in which case
# ``EntityBestDataSourceChange.recalculate`` runs immediately.
enable_aggregate_ebds_calculations = contextvars.ContextVar(
    'enable_aggregate_ebds_calculations', default=0)
@contextmanager
def aggregate_ebds_calculations():
    """Defer EBDS recalculation until the outermost block exits.

    Entering the block increments ``enable_aggregate_ebds_calculations``;
    leaving it decrements the counter again, even if the body raised. When
    the body finishes without raising and this is the outermost block
    (counter equal to 1), ``EntityBestDataSourceChange.perform_recalculate``
    is called once.
    """
    depth = enable_aggregate_ebds_calculations
    depth.set(depth.get() + 1)
    try:
        yield
        # Reached only when the body did not raise; nested blocks (counter
        # above 1) leave the recalculation to the outermost one.
        if depth.get() == 1:
            EntityBestDataSourceChange.perform_recalculate()
    finally:
        depth.set(depth.get() - 1)
Recent change:
class DateRangeModelManager(DataChangedBulkOperationsManager):
    """Manager that wraps bulk deletion in an aggregate EBDS block."""

    def bulk_delete(self, object_ids):
        """Delete the model rows whose ``id`` is in ``object_ids``.

        The delete runs inside ``aggregate_ebds_calculations()``.
        """
        with aggregate_ebds_calculations():
            matching = self.model.objects.filter(id__in=object_ids)
            matching.delete()
def upload_file(self, data_type, data, method='insert',
                file_name='test.csv', file_type=None, **kwargs):
    """POST ``data`` as an uploaded file to the ``internal:etl:upload`` view.

    Args:
        data_type: value sent as ``data_type``.
        data: content passed to ``self.get_uploaded_file``.
        method: value sent as ``action``.
        file_name: name given to the uploaded file.
        file_type: value sent as ``file_type``; when ``None`` it is the
            text after the last ``'.'`` of ``file_name``.
        **kwargs: extra form fields merged into the payload.

    Returns:
        Whatever ``self.post`` returns.
    """
    if file_type is None:
        # Text after the last dot, or the whole name when there is no dot.
        file_type = file_name.rpartition('.')[2]
    payload = {
        'file': self.get_uploaded_file(file_name, data),
        'data_type': data_type,
        'action': method,
        'file_type': file_type,
        **kwargs
    }
    return self.post(
        ('internal:etl:upload',),
        payload,
        format='multipart',
        user=self.internal_user_1,
    )
def export_and_import(self, export_data_type, import_data_type,
                      client_id, obj_id, updates):
    """Export data, modify one exported row, and upload it as an update.

    Args:
        export_data_type: data type passed to ``self.export``.
        import_data_type: data type passed to ``self.upload_file``.
        client_id: client passed to ``self.export``.
        obj_id: value of the ``'id'`` key identifying the row to modify.
        updates: mapping merged into the selected row before upload.

    Returns:
        Whatever ``self.upload_file`` returns.

    Raises:
        LookupError: if no exported row has ``row['id'] == obj_id``
            (previously this surfaced as a bare ``StopIteration``).
    """
    _cols, rows = self.export(export_data_type, client_id)
    row = next((r for r in rows if r['id'] == obj_id), None)
    if row is None:
        raise LookupError(
            f'No exported {export_data_type!r} row with id {obj_id!r}')
    row.update(updates)
    return self.upload_file(import_data_type, [row], method='update')
def export(self, export_data_type, client_id='', include_id='on', **kwargs):
    """Trigger a data export and read back the resulting spreadsheet.

    Posts to the ``internal:export_data`` view, then loads the
    ``DataExport`` with the highest ``id`` and parses its resource file
    with ``self.read_excel``.

    Returns:
        A ``(cols, rows)`` tuple as produced by ``self.read_excel``.
    """
    form = {
        'data_type': export_data_type,
        'include_id': include_id,
        'client': client_id,
        **kwargs
    }
    self.post(('internal:export_data',), form, user=self.internal_user_1)
    # NOTE(review): assumes the export just requested is the newest row.
    newest_export = DataExport.objects.latest('id')
    cols, rows = self.read_excel(newest_export.resource.file)
    return cols, rows
from ghg_reporting.tests.base_ghg_test_case import BaseGHGTestCase
from etl.tests.base_test import ExtendedBaseETLTest
from ghg_reporting.models import Consumption
from meters.models import Meter
class TestMeterConsumptionGHGUpdate(BaseGHGTestCase, ExtendedBaseETLTest):
    """Checks that ``Consumption`` rows follow meter and invoice changes."""

    @classmethod
    def setUpTestData(cls):
        """Create two fuel categories, two meters and one invoice per meter.

        ``meter_1`` is an electricity submeter; ``meter_2`` is a natural-gas
        meter created without the ``is_submeter`` flag. Both invoices end on
        2020-01-31.
        """
        super().setUpTestData()
        cls.elec_ac = cls.create_fuel_category(
            cls.fuel_electricity, 0.1, emission_year=2020)[0]
        cls.ng_ac = cls.create_fuel_category(
            cls.fuel_natural_gas, 0.2, emission_year=2020)[0]
        cls.meter_1 = cls.create_meter(
            'Submeter 1', '11-22-33', fuel=cls.fuel_electricity,
            is_submeter=True)
        cls.meter_2 = cls.create_meter(
            'Meter 1', '22-33-44', fuel=cls.fuel_natural_gas)
        cls.m1_invoice = cls.add_meter_invoice_consumption(
            cls.meter_1, end_date='2020-01-31')
        cls.m2_invoice = cls.add_meter_invoice_consumption(
            cls.meter_2, consumption=1000, end_date='2020-01-31')

    def test_consumption_data_copied_as_expected(self):
        """Only the non-submeter invoice is expected to yield a row."""
        self.assertEqual(Consumption.objects.count(), 1)
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.entity_id, self.meter_2.entity.pk)
        self.assertEqual(m_cons.activity_category_id, self.ng_ac.pk)
        self.assertAlmostEqual(m_cons.value, 1000)

    def test_invoice_change_reflected_in_consumption(self):
        """Saving a new invoice consumption updates the row's value."""
        self.m2_invoice.consumption = 50
        self.m2_invoice.save()
        m_cons = Consumption.objects.get(
            entity_id=self.meter_2.entity.pk)
        self.assertAlmostEqual(m_cons.value, 50)

    def test_invoice_delete_reflected_in_consumption(self):
        """Deleting the invoice removes the meter's consumption row."""
        self.m2_invoice.delete()
        with self.assertRaises(Consumption.DoesNotExist):
            Consumption.objects.get(
                entity_id=self.meter_2.entity.pk)

    def test_change_meter_fuel_update_consumption(self):
        """Changing the meter's fuel moves the row to the new category."""
        self.meter_2.fuel = self.fuel_electricity
        self.meter_2.save()
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_meter_fuel_via_etl_update_consumption(self):
        """Same as the fuel-change test, driven through export/import."""
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_2.pk, {
                'fuel': self.fuel_electricity.name,
            }
        )
        self.assertUploadedSuccessfully(resp)
        self.meter_2.refresh_from_db()
        self.assertEqual(self.meter_2.fuel_id, self.fuel_electricity.pk)
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_to_submeter_delete_consumption(self):
        """Flagging the meter as a submeter removes its consumption row."""
        self.meter_2.is_submeter = True
        self.meter_2.save()
        self.assertEqual(Consumption.objects.count(), 0)

    def test_change_to_main_meter_create_consumption(self):
        """Clearing the submeter flag creates a row for ``meter_1``."""
        self.meter_1.is_submeter = False
        self.meter_1.save()
        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        # presumably 100 is the helper's default invoice consumption
        self.assertAlmostEqual(cons.value, 100)

    def test_change_to_main_meter_via_etl_create_cons(self):
        """Same as the main-meter test, driven through export/import."""
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_1.pk, {
                'is_submeter': 'FALSE'
            }
        )
        self.assertUploadedSuccessfully(resp)
        self.meter_1.refresh_from_db()
        # Check the meter that was actually imported and refreshed;
        # meter_2 was never a submeter, so asserting on it proved nothing.
        self.assertFalse(self.meter_1.is_submeter)
        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        self.assertAlmostEqual(cons.value, 100)

    def test_delete_meter_delete_consumption(self):
        """Deleting the meter instance removes its consumption row."""
        self.meter_2.delete()
        self.assertEqual(Consumption.objects.count(), 0)

    def test_bulk_delete_meter_delete_consumption(self):
        """Deleting the meter through a queryset removes the row too."""
        Meter.objects.filter(pk=self.meter_2.pk).delete()
        self.assertEqual(Consumption.objects.count(), 0)

    def test_etl_delete_meter_delete_consumption(self):
        """Deleting the meter through an ETL upload removes the row too."""
        resp = self.upload_file(
            self.etl_types.METERS, [{
                'client_key': self.building.client.key,
                'id': self.meter_2.pk
            }],
            method='delete'
        )
        self.assertUploadedSuccessfully(resp)
        self.assertEqual(Consumption.objects.count(), 0)
Above is the complete test file.
This is the unit test:
from ghg_reporting.tests.base_ghg_test_case import BaseGHGTestCase
from etl.tests.base_test import ExtendedBaseETLTest
from ghg_reporting.models import Consumption
from meters.models import Meter
class TestMeterConsumptionGHGUpdate(BaseGHGTestCase, ExtendedBaseETLTest):
    """Checks that ``Consumption`` rows follow meter and invoice changes."""

    @classmethod
    def setUpTestData(cls):
        """Create two fuel categories, two meters and one invoice per meter.

        ``meter_1`` is an electricity submeter; ``meter_2`` is a natural-gas
        meter created without the ``is_submeter`` flag. Both invoices end on
        2020-01-31.
        """
        super().setUpTestData()
        cls.elec_ac = cls.create_fuel_category(
            cls.fuel_electricity, 0.1, emission_year=2020)[0]
        cls.ng_ac = cls.create_fuel_category(
            cls.fuel_natural_gas, 0.2, emission_year=2020)[0]
        cls.meter_1 = cls.create_meter(
            'Submeter 1', '11-22-33', fuel=cls.fuel_electricity,
            is_submeter=True)
        cls.meter_2 = cls.create_meter(
            'Meter 1', '22-33-44', fuel=cls.fuel_natural_gas)
        cls.m1_invoice = cls.add_meter_invoice_consumption(
            cls.meter_1, end_date='2020-01-31')
        cls.m2_invoice = cls.add_meter_invoice_consumption(
            cls.meter_2, consumption=1000, end_date='2020-01-31')

    def test_consumption_data_copied_as_expected(self):
        """Only the non-submeter invoice is expected to yield a row."""
        self.assertEqual(Consumption.objects.count(), 1)
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.entity_id, self.meter_2.entity.pk)
        self.assertEqual(m_cons.activity_category_id, self.ng_ac.pk)
        self.assertAlmostEqual(m_cons.value, 1000)

    def test_invoice_change_reflected_in_consumption(self):
        """Saving a new invoice consumption updates the row's value."""
        self.m2_invoice.consumption = 50
        self.m2_invoice.save()
        m_cons = Consumption.objects.get(
            entity_id=self.meter_2.entity.pk)
        self.assertAlmostEqual(m_cons.value, 50)

    def test_invoice_delete_reflected_in_consumption(self):
        """Deleting the invoice removes the meter's consumption row."""
        self.m2_invoice.delete()
        with self.assertRaises(Consumption.DoesNotExist):
            Consumption.objects.get(
                entity_id=self.meter_2.entity.pk)

    def test_change_meter_fuel_update_consumption(self):
        """Changing the meter's fuel moves the row to the new category."""
        self.meter_2.fuel = self.fuel_electricity
        self.meter_2.save()
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_meter_fuel_via_etl_update_consumption(self):
        """Same as the fuel-change test, driven through export/import."""
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_2.pk, {
                'fuel': self.fuel_electricity.name,
            }
        )
        self.assertUploadedSuccessfully(resp)
        self.meter_2.refresh_from_db()
        self.assertEqual(self.meter_2.fuel_id, self.fuel_electricity.pk)
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_to_submeter_delete_consumption(self):
        """Flagging the meter as a submeter removes its consumption row."""
        self.meter_2.is_submeter = True
        self.meter_2.save()
        self.assertEqual(Consumption.objects.count(), 0)

    def test_change_to_main_meter_create_consumption(self):
        """Clearing the submeter flag creates a row for ``meter_1``."""
        self.meter_1.is_submeter = False
        self.meter_1.save()
        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        # presumably 100 is the helper's default invoice consumption
        self.assertAlmostEqual(cons.value, 100)

    def test_change_to_main_meter_via_etl_create_cons(self):
        """Same as the main-meter test, driven through export/import."""
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_1.pk, {
                'is_submeter': 'FALSE'
            }
        )
        self.assertUploadedSuccessfully(resp)
        self.meter_1.refresh_from_db()
        # Check the meter that was actually imported and refreshed;
        # meter_2 was never a submeter, so asserting on it proved nothing.
        self.assertFalse(self.meter_1.is_submeter)
        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        self.assertAlmostEqual(cons.value, 100)

    def test_delete_meter_delete_consumption(self):
        """Deleting the meter instance removes its consumption row."""
        self.meter_2.delete()
        self.assertEqual(Consumption.objects.count(), 0)

    def test_bulk_delete_meter_delete_consumption(self):
        """Deleting the meter through a queryset removes the row too."""
        Meter.objects.filter(pk=self.meter_2.pk).delete()
        self.assertEqual(Consumption.objects.count(), 0)

    def test_etl_delete_meter_delete_consumption(self):
        """Deleting the meter through an ETL upload removes the row too."""
        resp = self.upload_file(
            self.etl_types.METERS, [{
                'client_key': self.building.client.key,
                'id': self.meter_2.pk
            }],
            method='delete'
        )
        self.assertUploadedSuccessfully(resp)
        self.assertEqual(Consumption.objects.count(), 0)
Below is the error output:
======================================================================
ERROR [0.204s]: test_change_meter_fuel_update_consumption (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_meter_fuel_update_consumption)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 53, in test_change_meter_fuel_update_consumption
m_cons = Consumption.objects.get()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/container/.local/lib/python3.11/site-packages/django/db/models/manager.py", line 87, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/container/.local/lib/python3.11/site-packages/django/db/models/query.py", line 637, in get
raise self.model.DoesNotExist(
ghg_reporting.models.Consumption.DoesNotExist: Consumption matching query does not exist.
======================================================================
ERROR [5.308s]: test_change_meter_fuel_via_etl_update_consumption (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_meter_fuel_via_etl_update_consumption)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 71, in test_change_meter_fuel_via_etl_update_consumption
m_cons = Consumption.objects.get()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/container/.local/lib/python3.11/site-packages/django/db/models/manager.py", line 87, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/container/.local/lib/python3.11/site-packages/django/db/models/query.py", line 637, in get
raise self.model.DoesNotExist(
ghg_reporting.models.Consumption.DoesNotExist: Consumption matching query does not exist.
======================================================================
FAIL [0.419s]: test_change_to_main_meter_create_consumption (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_to_main_meter_create_consumption)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 84, in test_change_to_main_meter_create_consumption
self.assertEqual(Consumption.objects.count(), 2)
File "/home/container/code/core/tests/base_test_case.py", line 1108, in assertEqual
self.assertKeysAlmostEqual(actual, expected, *args, **kwargs)
File "/home/container/code/core/tests/base_test_case.py", line 837, in assertKeysAlmostEqual
super().assertEqual(act_value, exp_value, *args, **kwargs)
AssertionError: 1 != 2
======================================================================
FAIL [3.557s]: test_change_to_main_meter_via_etl_create_cons (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_to_main_meter_via_etl_create_cons)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 104, in test_change_to_main_meter_via_etl_create_cons
self.assertEqual(Consumption.objects.count(), 2)
File "/home/container/code/core/tests/base_test_case.py", line 1108, in assertEqual
self.assertKeysAlmostEqual(actual, expected, *args, **kwargs)
File "/home/container/code/core/tests/base_test_case.py", line 837, in assertKeysAlmostEqual
super().assertEqual(act_value, exp_value, *args, **kwargs)
AssertionError: 1 != 2
----------------------------------------------------------------------
Ran 11 tests in 44.552s
FAILED (failures=2, errors=2)
Generating XML reports...
Preserving test database for alias 'default'...
class Consumption(BaseConsumption, HistoryModel, DataChangedSignalModel):
    """Consumption record with an optional supplier and audit timestamps.

    Two partial unique constraints apply to rows whose ``source`` is
    ``ActivitySource.DMS``: one that includes ``market_emission_factor``
    and one that covers rows where ``market_emission_factor`` is ``None``.
    """

    objects = managers.ConsumptionManager()

    supplier = models.ForeignKey(
        Client,
        related_name='%(app_label)s_%(class)s_supplier',
        related_query_name='%(app_label)s_%(class)s_supplier_query',
        null=True,
        on_delete=models.CASCADE,
    )
    create_time = models.DateTimeField(auto_now_add=True, null=True)
    update_time = models.DateTimeField(auto_now=True, null=True)

    class Meta:
        constraints = [
            # DMS rows: unique including the market emission factor.
            UniqueConstraint(
                name='dms_rows_not_duplicated_mef',
                fields=[
                    'entity_id',
                    'start_date',
                    'end_date',
                    'activity_category_id',
                    'estimated',
                    'market_emission_source',
                    'market_emission_factor',
                ],
                condition=Q(source=ActivitySource.DMS),
            ),
            # DMS rows without a market emission factor: unique on the
            # remaining fields.
            UniqueConstraint(
                name='dms_rows_not_duplicated',
                fields=[
                    'entity_id',
                    'start_date',
                    'end_date',
                    'activity_category_id',
                    'estimated',
                    'market_emission_source',
                ],
                condition=Q(
                    source=ActivitySource.DMS,
                    market_emission_factor=None
                ),
            ),
        ]

    def save(self, ownership_type=OwnershipTypes.OPERATIONAL,
             owned_percentage=100, **kwargs):
        """Save the row, forwarding ownership details to the parent ``save``.

        ``ownership_type`` and ``owned_percentage`` are wrapped as
        ``data_changed_kwargs={'ownership_details': {0: {...}}}``; any other
        keyword arguments are passed through unchanged.
        """
        ownership_details = {
            0: {
                'ownership_type': ownership_type,
                'owned_percentage': owned_percentage,
            },
        }
        return super().save(
            data_changed_kwargs={'ownership_details': ownership_details},
            **kwargs)
Recent change made by the senior developer:
class DateRangeModelManager(DataChangedBulkOperationsManager):
    """Manager that wraps bulk deletion in an aggregate EBDS block."""

    def bulk_delete(self, object_ids):
        """Delete the model rows whose ``id`` is in ``object_ids``.

        The delete runs inside ``aggregate_ebds_calculations()``.
        """
        with aggregate_ebds_calculations():
            matching = self.model.objects.filter(id__in=object_ids)
            matching.delete()