Django tests freeze when running async Celery tasks
I'm trying to create unit tests in Django that test the Celery tasks and verify whether the task creates the objects from the CSV data I give it.
For some weird reason my tests freeze when calling .get() on the AsyncResult objects.
However, when i monitor the Celery logs and the database, it looks like they work normally and return success-messages indicating the tasks are completed.
Outside the testing environment everything works fine; it's just when I'm running Celery in my tests. I have no idea what is going on.
Here's the relevant code:
stations/test.py
class StationsTests(TestCase):
    """Verifies that the upload_csv Celery task creates Station objects
    from a CSV fixture file."""

    @classmethod
    def setUpTestData(cls):
        """Resolve the CSV fixture path shipped with the 'csvimport' app
        and the task arguments shared by all tests in this class."""
        super().setUpTestData()
        dirname = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', 'csvimport'))
        cls.csv_data_type_station = 'station'
        cls.upload_type = 'safe_create'
        cls.file_station = os.path.join(dirname, 'CSVFiles/Station_test_csv.csv')

    # Run the task eagerly (synchronously, inside the test process) so it
    # shares the test database instead of being sent to a real worker that
    # writes to the development database — .get() would otherwise block.
    # NOTE: with app.config_from_object(..., namespace='CELERY') only the
    # new-style names are honored: CELERY_TASK_ALWAYS_EAGER and
    # CELERY_TASK_EAGER_PROPAGATES. The legacy names CELERY_ALWAYS_EAGER /
    # CELERY_EAGER_PROPAGATES_EXCEPTIONS are silently ignored.
    @override_settings(CELERY_TASK_ALWAYS_EAGER=True,
                       CELERY_TASK_EAGER_PROPAGATES=True)
    def test_station_geoJSON_data(self):
        self.result_upload_csv = upload_csv.delay(
            self.file_station, self.csv_data_type_station,
            self.upload_type).get(timeout=10)
        self.assertEqual(self.result_upload_csv,
                         '10 stations uploaded successfully')
        # count() issues SELECT COUNT(*) instead of fetching every row.
        self.assertEqual(Station.objects.count(), 10)
settings.py:
# Database configuration for the project.
# NOTE(review): credentials are hardcoded here — presumably a local dev
# setup; move them to environment variables before sharing/deploying.
DATABASES = {
    'default': {
        # PostgreSQL via psycopg2 (this pre-Django-2.0 engine alias still works).
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'db_hcbapp',
        'USER': 'postgres',
        'PASSWORD': 'postgres',
        'HOST': 'localhost',
        'PORT': '5432'
    },
}
.....
# Celery configuration (read via app.config_from_object(..., namespace='CELERY')).
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
# The trailing comma makes this a one-element tuple. Without it,
# ('csvimport.tasks') is just a parenthesized string, which Celery would
# iterate character by character when resolving modules to import.
CELERY_IMPORTS = ('csvimport.tasks',)
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
# Store task results in the Django database (django-celery-results backend).
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_EXTENDED = True
celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# Default to the project settings module if the caller has not set one.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')

app = Celery('myapp')

# Load all CELERY_*-prefixed settings from Django settings.
# NOTE: config_from_object() resets any configuration applied to app.conf
# beforehand, so per-app options such as enable_utc must be assigned AFTER
# this call — assigning them before (as originally done) silently loses them.
app.config_from_object(settings, namespace='CELERY')
app.conf.enable_utc = False

# Discover tasks.py modules in all installed Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    """Diagnostic task: prints its own request context for debugging."""
    print('Request: {0!r}'.format(self.request))
tasks.py:
@shared_task(bind=True)
def upload_csv(self, file_path, csv_data_type, upload_type):
    """Read a CSV file and bulk-create model objects from its rows.

    Args:
        file_path: path of the CSV file to import.
        csv_data_type: kind of rows in the file; currently only 'station'
            is handled here.
        upload_type: 'safe_create' (single bulk_create at the end) or
            'chunk_create' (bulk_create every 100 accumulated objects).

    Returns:
        A human-readable status message describing the outcome.
    """
    progress_recorder = ProgressRecorder(self)

    # Membership test replaces the original bool(...) | bool(...) construct.
    if upload_type not in ('safe_create', 'chunk_create'):
        return "The upload type needs to be either Safe Create or Chunk Create"

    csv_data = []  # Temporary list for storing the CSV-rows
    with open(file_path, 'r') as file:
        reader = csv.DictReader(file)
        if csv_data_type == 'station':
            # Validate the header before trusting the row contents. The
            # pattern contains no parentheses, so no escaping is needed.
            pattern = "FID,ID,Nimi,Namn,Name,Osoite,Adress,Kaupunki,Stad,Operaattor,Kapasiteet,x,y"
            if not re.search(pattern, ",".join(reader.fieldnames), re.IGNORECASE):
                raise Exception(
                    "CSV invalid as it does not contain the correct fieldnames '['FID','ID','Nimi','Namn','Name','Osoite','Adress','Kaupunki','Stad','Operaattor','Kapasiteet','x','y']' for a Station upload.")
            # Fieldnames are reassigned just in case so the object data can be
            # gathered from correctly named row key-names.
            reader.fieldnames = ['FID', 'ID', 'Nimi', 'Namn', 'Name', 'Osoite',
                                 'Adress', 'Kaupunki', 'Stad', 'Operaattor', 'Kapasiteet', 'x', 'y']
        print("Reading the csv")
        csv_data = list(reader)

    rowcount = len(csv_data)
    print(f"CSV read complete with {rowcount} rows. Creating objects...")
    try:
        object_list = []  # Temporary list for the objects about to be created in bulk
        if csv_data_type == 'station':
            print("Uploading a Station")
            for i, row in enumerate(csv_data):
                # Skip completely empty rows.
                if any(value for value in row.values()):
                    try:
                        # A dedicated name: the original reassigned csv_data
                        # here, clobbering the very list being iterated.
                        station = Station(
                            station_id=row['ID'],
                            fid=row['FID'],
                            name_fin=row['Nimi'],
                            name_swe=row['Namn'],
                            name_eng=row['Name'],
                            address_fin=row['Osoite'],
                            address_swe=row['Adress'],
                            city_fin=row['Kaupunki'],
                            city_swe=row['Stad'],
                            operator=row['Operaattor'],
                            capacity=int(row['Kapasiteet']),
                            geo_pos_x=Decimal(row['x']),
                            geo_pos_y=Decimal(row['y'])
                        )
                        # If the station is already in the db, skip to the next row.
                        if Station.objects.filter(station_id=station.station_id).exists():
                            print("Station already exists")
                            continue
                        object_list.append(station)
                        # Chunk create flushes the buffer every 100 objects.
                        if upload_type == 'chunk_create' and len(object_list) >= 100:
                            Station.objects.bulk_create(object_list)
                            object_list = []
                        print(
                            f"Row {i+1} : Station {row['ID']}.{row['Nimi']} created!")
                    except ValueError as e:
                        print(f"Row {i+1} : Value error in row {i+1}! ", e)
                    except Exception as e:
                        print(f"Row {i+1} : Exception: {e}")
                        continue
                else:
                    # In case a csv-row is empty
                    print(f"Row {i+1} : Field empty")
                # Report progress as a percentage after every row. The original
                # f-string was f'{round(x),2}%', which formatted the TUPLE
                # (round(x), 2); round() now receives 2 as its ndigits argument.
                progress_recorder.set_progress(
                    i + 1, rowcount, f'{round((i + 1) / rowcount * 100, 2)}%')
            # Flush any leftover objects (safe_create flushes everything here).
            if object_list:
                Station.objects.bulk_create(object_list)
            return f"{rowcount} stations uploaded successfully"
        # Just in case neither Journey nor Station upload type is chosen
        else:
            print("Select a CSV containing proper values of either Stations or Journeys")
    except Exception as e:
        print(f"Exception, something went wrong: {e}")
        return f"Something went wrong: {e}"
Like i said, outside testing the task and the db works perfectly. Redis and Celery are both up and running and task results are added to the PostgreSQL database.
Any ideas what could be causing the freeze?
I've tried mocking the test method using @patch('myapp.tasks.upload_csv.delay') but that does not seem to create any objects in my real database. Hence with
self.assertEqual(len(Station.objects.all()), 10)
I want to verify the task creates objects in my db. I've tried changing my database back to SQLite, but it still freezes up.
I've tried creating other simple temporary tasks in tasks.py. But even those freeze up.
Any ideas?
I figured this out, it might help others too!
Django was running the test in a separate test database, while the Celery worker used my real database. So even though Celery/Redis processed the tasks and returned success values, those results never reached my test environment — in hindsight, a bit obvious.
I recommend using @override_settings(CELERY_TASK_ALWAYS_EAGER=True) on the test method; that way the task runs synchronously inside the test process!