Django unit tests suddenly stopped working

My unit tests were working fine, but for the past two days, they have been failing without any apparent reason. No new code additions or major changes have been made, yet they are failing.

The same logic is working in other test cases, but for five specific tests, it has started to fail.

I tried running the tests by checking out a previous branch where the tests were working fine, but now they have stopped working even under those conditions. I am completely clueless about what to do next.

Below is the snippet, which was implemented by a senior developer who left the company two weeks ago. I don't know whether it is failing to filter the data, or whether there is a scenario in which the dates of the data we want to filter overlap.

Kindly help.

class EntityBestDataSourceChange(models.Model):
    """Queue row describing one pending best-data-source change.

    Each row records an operation ('+' or '-') for a combination of
    scope, scope category, entity, child, year and source. Rows are
    consumed and deleted by ``perform_recalculate``.
    """

    scope = models.TextField()
    scope_category = models.TextField()
    entity_id = models.IntegerField()
    child_id = models.IntegerField()
    year = models.IntegerField()
    source = models.TextField()
    # Only '+' and '-' are acted on by perform_recalculate.
    operation = models.TextField()

    class Meta:
        unique_together = [
            'scope', 'scope_category', 'entity_id',
            'child_id', 'year', 'source', 'operation']

    @classmethod
    @transaction.atomic
    def perform_recalculate(cls):
        """Process the queued change rows and delete them.

        Rows are read with ``select_for_update(skip_locked=True)``, so rows
        locked by another transaction are left alone. A key that appears
        both as an addition and as a removal cancels out. The remaining
        keys are grouped by ``scope_key(...)`` and handed to
        ``EntityBestDataSource.check_for_categories``.

        Returns:
            The number of change rows that were read (rows with an
            operation other than '+' or '-' are counted and deleted but
            otherwise ignored).

        Raises:
            KeyError: if a row's ``source`` is missing from
                ``ACTIVITY_SOURCE_GROUPED_SOURCE``.
        """
        # Materialize once so that exactly these rows are counted,
        # processed and deleted.
        ebds_changes = list(
            EntityBestDataSourceChange.objects
            .select_for_update(skip_locked=True)
            .all()
        )

        number_of_changes = len(ebds_changes)

        additions = set()
        removals = set()

        for ch in ebds_changes:
            key = (
                ch.scope, ch.scope_category, ch.entity_id,
                ch.child_id, ch.year,
                ACTIVITY_SOURCE_GROUPED_SOURCE[ch.source])
            if ch.operation == '-':
                removals.add(key)
            elif ch.operation == '+':
                additions.add(key)

        # An addition and a removal of the same key cancel each other.
        inter = additions & removals
        additions -= inter
        removals -= inter

        if additions:
            additions_d = defaultdict(set)
            for *key, source in additions:
                additions_d[scope_key(*key)].add(source)
            EntityBestDataSource.check_for_categories(additions_d)
        if removals:
            removals_d = defaultdict(set)
            for *key, source in removals:
                removals_d[scope_key(*key)].add(source)
            # NOTE(review): the second positional argument presumably marks
            # these as removals - confirm against check_for_categories.
            EntityBestDataSource.check_for_categories(removals_d, True)

        if ebds_changes:
            # Delete only the rows that were read above. Calling .delete()
            # on the original queryset re-runs the unfiltered query, which
            # could also remove rows skipped as locked or inserted after
            # the read, without them ever being processed.
            EntityBestDataSourceChange.objects.filter(
                pk__in=[ch.pk for ch in ebds_changes]
            ).delete()

        return number_of_changes

    @classmethod
    def recalculate(cls):
        """Run ``perform_recalculate`` unless calculations are being batched.

        Returns None without doing anything while the
        ``enable_aggregate_ebds_calculations`` counter is non-zero;
        otherwise returns the result of ``perform_recalculate``.
        """
        if enable_aggregate_ebds_calculations.get() != 0:
            return
        return cls.perform_recalculate()
# Counter of currently active ``aggregate_ebds_calculations()`` blocks in the
# current context; the default of 0 means none is active.
enable_aggregate_ebds_calculations = contextvars.ContextVar(
    'enable_aggregate_ebds_calculations',
    default=0,
)


@contextmanager
def aggregate_ebds_calculations():
    """Batch best-data-source recalculation over the managed block.

    The ``enable_aggregate_ebds_calculations`` counter is incremented on
    entry and decremented on exit, whether or not the body raised. When the
    body finishes without raising and the counter reads 1 (this is the
    outermost active block), ``perform_recalculate`` is run once. If the
    body raises, no recalculation is triggered here.
    """
    depth_var = enable_aggregate_ebds_calculations
    depth_var.set(depth_var.get() + 1)

    try:
        yield
        # Only reached when the managed body did not raise.
        is_outermost = depth_var.get() == 1
        if is_outermost:
            EntityBestDataSourceChange.perform_recalculate()
    finally:
        depth_var.set(depth_var.get() - 1)

recent change:


class DateRangeModelManager(DataChangedBulkOperationsManager):
    """Manager whose bulk delete runs inside an aggregated-calculation block."""

    def bulk_delete(self, object_ids):
        """Delete the rows of ``self.model`` whose ``id`` is in ``object_ids``.

        The delete is wrapped in ``aggregate_ebds_calculations()``.
        Returns None.
        """
        with aggregate_ebds_calculations():
            doomed = self.model.objects.filter(id__in=object_ids)
            doomed.delete()
    def upload_file(self, data_type, data, method='insert',
                    file_name='test.csv', file_type=None, **kwargs):
        """POST ``data`` as an uploaded file to the ETL upload endpoint.

        ``file_type`` defaults to the text after the last '.' in
        ``file_name`` (the whole name when it contains no dot). Extra
        keyword arguments are merged into the form data last, so they can
        override the standard keys. Returns whatever ``self.post`` returns.
        """
        if file_type is None:
            extension = file_name.rpartition('.')[2]
        else:
            extension = file_type

        form = {
            'file': self.get_uploaded_file(file_name, data),
            'data_type': data_type,
            'action': method,
            'file_type': extension,
        }
        form.update(kwargs)

        return self.post(
            ('internal:etl:upload',),
            form,
            format='multipart',
            user=self.internal_user_1,
        )

    def export_and_import(self, export_data_type, import_data_type,
                          client_id, obj_id, updates):
        """Export data, patch one exported row, and upload it as an update.

        The first exported row whose ``'id'`` equals ``obj_id`` is updated
        in place with ``updates`` and sent through ``upload_file`` with
        ``method='update'``. Raises StopIteration when no row matches.
        """
        _columns, exported_rows = self.export(export_data_type, client_id)
        target = next(filter(lambda r: r['id'] == obj_id, exported_rows))
        target.update(updates)

        return self.upload_file(import_data_type, [target], method='update')

    def export(self, export_data_type, client_id='', include_id='on', **kwargs):
        """Trigger a data export and read back the newest export file.

        POSTs the export form (extra keyword arguments are merged in last),
        then loads the ``DataExport`` with the highest ``id`` and parses its
        file with ``self.read_excel``. Returns a ``(cols, rows)`` tuple.
        """
        form = {
            'data_type': export_data_type,
            'include_id': include_id,
            'client': client_id,
        }
        form.update(kwargs)
        self.post(('internal:export_data',), form, user=self.internal_user_1)

        # NOTE(review): presumably the POST above created this row - confirm.
        newest_export = DataExport.objects.latest('id')
        header, records = self.read_excel(newest_export.resource.file)
        return header, records
from ghg_reporting.tests.base_ghg_test_case import BaseGHGTestCase
from etl.tests.base_test import ExtendedBaseETLTest

from ghg_reporting.models import Consumption
from meters.models import Meter


class TestMeterConsumptionGHGUpdate(BaseGHGTestCase, ExtendedBaseETLTest):
    """Checks that Consumption rows follow changes to meters and invoices.

    Fixture: ``meter_1`` is an electricity meter created with
    ``is_submeter=True`` and ``meter_2`` is a natural-gas meter; each has
    one invoice ending 2020-01-31 (``meter_2`` with consumption 1000).
    """
    # Per-test explanations are `#` comments rather than docstrings so that
    # unittest keeps reporting tests by their method names.

    @classmethod
    def setUpTestData(cls):
        super().setUpTestData()

        cls.elec_ac = cls.create_fuel_category(
            cls.fuel_electricity, 0.1, emission_year=2020)[0]
        cls.ng_ac = cls.create_fuel_category(
            cls.fuel_natural_gas, 0.2, emission_year=2020)[0]
        cls.meter_1 = cls.create_meter(
            'Submeter 1', '11-22-33', fuel=cls.fuel_electricity,
            is_submeter=True)
        cls.meter_2 = cls.create_meter(
            'Meter 1', '22-33-44', fuel=cls.fuel_natural_gas)
        cls.m1_invoice = cls.add_meter_invoice_consumption(
            cls.meter_1, end_date='2020-01-31')
        cls.m2_invoice = cls.add_meter_invoice_consumption(
            cls.meter_2, consumption=1000, end_date='2020-01-31')

    def test_consumption_data_copied_as_expected(self):
        # Only meter_2's invoice is expected to produce a Consumption row.
        self.assertEqual(Consumption.objects.count(), 1)
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.entity_id, self.meter_2.entity.pk)
        self.assertEqual(m_cons.activity_category_id, self.ng_ac.pk)
        self.assertAlmostEqual(m_cons.value, 1000)

    def test_invoice_change_reflected_in_consumption(self):
        self.m2_invoice.consumption = 50
        self.m2_invoice.save()

        m_cons = Consumption.objects.get(
            entity_id=self.meter_2.entity.pk)

        self.assertAlmostEqual(m_cons.value, 50)

    def test_invoice_delete_reflected_in_consumption(self):
        self.m2_invoice.delete()

        with self.assertRaises(Consumption.DoesNotExist):
            Consumption.objects.get(
                entity_id=self.meter_2.entity.pk)

    def test_change_meter_fuel_update_consumption(self):
        # Switching the fuel should move the row to the electricity category.
        self.meter_2.fuel = self.fuel_electricity
        self.meter_2.save()
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_meter_fuel_via_etl_update_consumption(self):
        # Same as the test above, but the change goes through export + upload.
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_2.pk, {
                'fuel': self.fuel_electricity.name,
            }
        )

        self.assertUploadedSuccessfully(resp)

        self.meter_2.refresh_from_db()
        self.assertEqual(self.meter_2.fuel_id, self.fuel_electricity.pk)

        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_to_submeter_delete_consumption(self):
        self.meter_2.is_submeter = True
        self.meter_2.save()

        self.assertEqual(Consumption.objects.count(), 0)

    def test_change_to_main_meter_create_consumption(self):
        self.meter_1.is_submeter = False
        self.meter_1.save()

        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        # NOTE(review): 100 is presumably the default consumption used by
        # add_meter_invoice_consumption - confirm.
        self.assertAlmostEqual(cons.value, 100)

    def test_change_to_main_meter_via_etl_create_cons(self):
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_1.pk, {
                'is_submeter': 'FALSE'
            }
        )

        self.assertUploadedSuccessfully(resp)

        # Verify the meter that was actually uploaded and refreshed
        # (meter_1); meter_2 is not changed by this test.
        self.meter_1.refresh_from_db()
        self.assertFalse(self.meter_1.is_submeter)

        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        self.assertAlmostEqual(cons.value, 100)

    def test_delete_meter_delete_consumption(self):
        # Instance-level delete.
        self.meter_2.delete()

        self.assertEqual(Consumption.objects.count(), 0)

    def test_bulk_delete_meter_delete_consumption(self):
        # Queryset-level delete.
        Meter.objects.filter(pk=self.meter_2.pk).delete()

        self.assertEqual(Consumption.objects.count(), 0)

    def test_etl_delete_meter_delete_consumption(self):
        # Delete through the ETL upload endpoint.
        resp = self.upload_file(
            self.etl_types.METERS, [{
                'client_key': self.building.client.key,
                'id': self.meter_2.pk
            }],
            method='delete'
        )

        self.assertUploadedSuccessfully(resp)

        self.assertEqual(Consumption.objects.count(), 0)

Above is the complete test file.

This is the unit test (the same file, repeated):

from ghg_reporting.tests.base_ghg_test_case import BaseGHGTestCase
from etl.tests.base_test import ExtendedBaseETLTest

from ghg_reporting.models import Consumption
from meters.models import Meter


class TestMeterConsumptionGHGUpdate(BaseGHGTestCase, ExtendedBaseETLTest):
    """Checks that Consumption rows follow changes to meters and invoices.

    Fixture: ``meter_1`` is an electricity meter created with
    ``is_submeter=True`` and ``meter_2`` is a natural-gas meter; each has
    one invoice ending 2020-01-31 (``meter_2`` with consumption 1000).
    """
    # Per-test explanations are `#` comments rather than docstrings so that
    # unittest keeps reporting tests by their method names.

    @classmethod
    def setUpTestData(cls):
        super().setUpTestData()

        cls.elec_ac = cls.create_fuel_category(
            cls.fuel_electricity, 0.1, emission_year=2020)[0]
        cls.ng_ac = cls.create_fuel_category(
            cls.fuel_natural_gas, 0.2, emission_year=2020)[0]
        cls.meter_1 = cls.create_meter(
            'Submeter 1', '11-22-33', fuel=cls.fuel_electricity,
            is_submeter=True)
        cls.meter_2 = cls.create_meter(
            'Meter 1', '22-33-44', fuel=cls.fuel_natural_gas)
        cls.m1_invoice = cls.add_meter_invoice_consumption(
            cls.meter_1, end_date='2020-01-31')
        cls.m2_invoice = cls.add_meter_invoice_consumption(
            cls.meter_2, consumption=1000, end_date='2020-01-31')

    def test_consumption_data_copied_as_expected(self):
        # Only meter_2's invoice is expected to produce a Consumption row.
        self.assertEqual(Consumption.objects.count(), 1)
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.entity_id, self.meter_2.entity.pk)
        self.assertEqual(m_cons.activity_category_id, self.ng_ac.pk)
        self.assertAlmostEqual(m_cons.value, 1000)

    def test_invoice_change_reflected_in_consumption(self):
        self.m2_invoice.consumption = 50
        self.m2_invoice.save()

        m_cons = Consumption.objects.get(
            entity_id=self.meter_2.entity.pk)

        self.assertAlmostEqual(m_cons.value, 50)

    def test_invoice_delete_reflected_in_consumption(self):
        self.m2_invoice.delete()

        with self.assertRaises(Consumption.DoesNotExist):
            Consumption.objects.get(
                entity_id=self.meter_2.entity.pk)

    def test_change_meter_fuel_update_consumption(self):
        # Switching the fuel should move the row to the electricity category.
        self.meter_2.fuel = self.fuel_electricity
        self.meter_2.save()
        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_meter_fuel_via_etl_update_consumption(self):
        # Same as the test above, but the change goes through export + upload.
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_2.pk, {
                'fuel': self.fuel_electricity.name,
            }
        )

        self.assertUploadedSuccessfully(resp)

        self.meter_2.refresh_from_db()
        self.assertEqual(self.meter_2.fuel_id, self.fuel_electricity.pk)

        m_cons = Consumption.objects.get()
        self.assertEqual(m_cons.activity_category_id, self.elec_ac.pk)

    def test_change_to_submeter_delete_consumption(self):
        self.meter_2.is_submeter = True
        self.meter_2.save()

        self.assertEqual(Consumption.objects.count(), 0)

    def test_change_to_main_meter_create_consumption(self):
        self.meter_1.is_submeter = False
        self.meter_1.save()

        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        # NOTE(review): 100 is presumably the default consumption used by
        # add_meter_invoice_consumption - confirm.
        self.assertAlmostEqual(cons.value, 100)

    def test_change_to_main_meter_via_etl_create_cons(self):
        resp = self.export_and_import(
            self.export_types.METER,
            self.etl_types.METERS,
            self.building.client_id,
            self.meter_1.pk, {
                'is_submeter': 'FALSE'
            }
        )

        self.assertUploadedSuccessfully(resp)

        # Verify the meter that was actually uploaded and refreshed
        # (meter_1); meter_2 is not changed by this test.
        self.meter_1.refresh_from_db()
        self.assertFalse(self.meter_1.is_submeter)

        self.assertEqual(Consumption.objects.count(), 2)
        cons = Consumption.objects.get(
            entity=self.meter_1.entity)
        self.assertAlmostEqual(cons.value, 100)

    def test_delete_meter_delete_consumption(self):
        # Instance-level delete.
        self.meter_2.delete()

        self.assertEqual(Consumption.objects.count(), 0)

    def test_bulk_delete_meter_delete_consumption(self):
        # Queryset-level delete.
        Meter.objects.filter(pk=self.meter_2.pk).delete()

        self.assertEqual(Consumption.objects.count(), 0)

    def test_etl_delete_meter_delete_consumption(self):
        # Delete through the ETL upload endpoint.
        resp = self.upload_file(
            self.etl_types.METERS, [{
                'client_key': self.building.client.key,
                'id': self.meter_2.pk
            }],
            method='delete'
        )

        self.assertUploadedSuccessfully(resp)

        self.assertEqual(Consumption.objects.count(), 0)

Below is the error output:

======================================================================
ERROR [0.204s]: test_change_meter_fuel_update_consumption (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_meter_fuel_update_consumption)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 53, in test_change_meter_fuel_update_consumption
    m_cons = Consumption.objects.get()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/container/.local/lib/python3.11/site-packages/django/db/models/manager.py", line 87, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/container/.local/lib/python3.11/site-packages/django/db/models/query.py", line 637, in get
    raise self.model.DoesNotExist(
ghg_reporting.models.Consumption.DoesNotExist: Consumption matching query does not exist.

======================================================================
ERROR [5.308s]: test_change_meter_fuel_via_etl_update_consumption (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_meter_fuel_via_etl_update_consumption)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 71, in test_change_meter_fuel_via_etl_update_consumption
    m_cons = Consumption.objects.get()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/container/.local/lib/python3.11/site-packages/django/db/models/manager.py", line 87, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/container/.local/lib/python3.11/site-packages/django/db/models/query.py", line 637, in get
    raise self.model.DoesNotExist(
ghg_reporting.models.Consumption.DoesNotExist: Consumption matching query does not exist.

======================================================================
FAIL [0.419s]: test_change_to_main_meter_create_consumption (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_to_main_meter_create_consumption)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 84, in test_change_to_main_meter_create_consumption
    self.assertEqual(Consumption.objects.count(), 2)
  File "/home/container/code/core/tests/base_test_case.py", line 1108, in assertEqual
    self.assertKeysAlmostEqual(actual, expected, *args, **kwargs)
  File "/home/container/code/core/tests/base_test_case.py", line 837, in assertKeysAlmostEqual
    super().assertEqual(act_value, exp_value, *args, **kwargs)
AssertionError: 1 != 2

======================================================================
FAIL [3.557s]: test_change_to_main_meter_via_etl_create_cons (ghg_reporting.tests.test_meters_consumption_creation.TestMeterConsumptionGHGUpdate.test_change_to_main_meter_via_etl_create_cons)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/container/code/ghg_reporting/tests/test_meters_consumption_creation.py", line 104, in test_change_to_main_meter_via_etl_create_cons
    self.assertEqual(Consumption.objects.count(), 2)
  File "/home/container/code/core/tests/base_test_case.py", line 1108, in assertEqual
    self.assertKeysAlmostEqual(actual, expected, *args, **kwargs)
  File "/home/container/code/core/tests/base_test_case.py", line 837, in assertKeysAlmostEqual
    super().assertEqual(act_value, exp_value, *args, **kwargs)
AssertionError: 1 != 2

----------------------------------------------------------------------
Ran 11 tests in 44.552s

FAILED (failures=2, errors=2)

Generating XML reports...
Preserving test database for alias 'default'...
class Consumption(BaseConsumption, HistoryModel, DataChangedSignalModel):
    """Consumption record with an optional supplier and audit timestamps.

    Most fields come from the base classes, which are not shown here.
    """
    objects = managers.ConsumptionManager()
    # Optional link to a Client acting as supplier; deleting that client
    # cascades to this row.
    supplier = models.ForeignKey(
        Client,
        related_name='%(app_label)s_%(class)s_supplier',
        related_query_name='%(app_label)s_%(class)s_supplier_query',
        null=True,
        on_delete=models.CASCADE
    )
    # Set once on creation / refreshed on every save, respectively.
    create_time = models.DateTimeField(auto_now_add=True, null=True)
    update_time = models.DateTimeField(auto_now=True, null=True)

    class Meta:
        constraints = [
            # Rows whose source is DMS must be unique over these fields,
            # including the market emission factor.
            UniqueConstraint(
                fields=[
                    'entity_id',
                    'start_date',
                    'end_date',
                    'activity_category_id',
                    'estimated',
                    'market_emission_source',
                    'market_emission_factor',
                ],
                condition=Q(source=ActivitySource.DMS),
                name='dms_rows_not_duplicated_mef'
            ),
            # Same uniqueness for DMS rows that have no market emission
            # factor. NOTE(review): presumably needed because NULL values do
            # not collide in the constraint above - confirm.
            UniqueConstraint(
                fields=[
                    'entity_id',
                    'start_date',
                    'end_date',
                    'activity_category_id',
                    'estimated',
                    'market_emission_source',
                ],
                condition=Q(
                    source=ActivitySource.DMS,
                    market_emission_factor=None
                ),
                name='dms_rows_not_duplicated'
            ),
        ]

    def save(self, ownership_type=OwnershipTypes.OPERATIONAL, owned_percentage=100, **kwargs):
        """Save the row, passing ownership details on to the parent ``save``.

        ``ownership_type`` and ``owned_percentage`` are wrapped under key 0
        of ``ownership_details`` and forwarded as ``data_changed_kwargs``;
        any other keyword arguments are passed through unchanged. Returns
        the parent ``save`` result.
        """
        return super().save(data_changed_kwargs={
            'ownership_details': {0: {
                'ownership_type': ownership_type,
                'owned_percentage': owned_percentage,
            }}
        }, **kwargs)

Recent change made by the senior developer:

class DateRangeModelManager(DataChangedBulkOperationsManager):
    """Manager whose bulk delete runs inside an aggregated-calculation block."""

    def bulk_delete(self, object_ids):
        """Delete the rows of ``self.model`` whose ``id`` is in ``object_ids``.

        The delete is wrapped in ``aggregate_ebds_calculations()``.
        Returns None.
        """
        with aggregate_ebds_calculations():
            doomed = self.model.objects.filter(id__in=object_ids)
            doomed.delete()
Вернуться на верх