Django Celery task is received but does not update the database or S3
I am building a Django project with multiple apps. One of the apps processes the most recently uploaded data, and I want to run that work through Celery because it takes a while (around 20-30 seconds) to fetch the data from AWS S3, process it, create a new model instance in the database (PostgreSQL), and upload the result back to a processed folder on S3. It currently works fine on localhost without Celery, but Heroku's request timeout is 30 seconds, so I am moving the work to Celery. (For now I am testing everything on localhost; no Heroku is involved.)
My current code is as follows and works correctly without errors. In tasks.py (in the processes app folder):
from django.contrib.auth.models import User

from functions.income_processor import IncomeProcessor
from functions.expense_processor import ExpenseProcessor
from utils.s3_utils import get_static_data, get_latest_income_data, get_latest_expense_data, save_processed_data


def process_income_task(user_id):
    user = User.objects.get(id=user_id)
    try:
        static_data = get_static_data()
        income_data = get_latest_income_data(user)
        if income_data is None:
            raise ValueError("No income data available")
        process = IncomeProcessor(static_data, income_data)
        process.process()
        final_df = process.get_final_df()
        save_processed_data(user, final_df, 'INCOME')
    except Exception as e:
        print(f"Task failed: {str(e)}")
        raise e
In views.py:
@login_required
def initiate_income_process(request):
    process_income_task(request.user.id)
    return render(request, 'processes/processing_started.html', {'process_type': 'income'})


def display_income(request):
    user = request.user
    latest_processed_data = ProcessedData.objects.filter(user=user, data_type='INCOME').order_by('-upload_date').first()
    print(f"Latest processed data: {latest_processed_data.filename if latest_processed_data else 'None'}")

    df = get_cached_file_data('INCOME', user)
    if df is None:
        print("No cached data available")
        return render(request, 'uploads/error_template.html', {'message': 'No data available'})

    print(f"Displaying data from: {latest_processed_data.filename}")
The display_income function then continues on to prepare the data for the template.
Since those processes take time, I moved to Celery. The only change I made to tasks.py was adding the shared_task decorator:
from celery import shared_task
from django.contrib.auth.models import User

from functions.income_processor import IncomeProcessor
from functions.expense_processor import ExpenseProcessor
from utils.s3_utils import get_static_data, get_latest_income_data, get_latest_expense_data, save_processed_data


@shared_task
def process_income_task(user_id):
    user = User.objects.get(id=user_id)
    try:
        static_data = get_static_data()
        income_data = get_latest_income_data(user)
        if income_data is None:
            raise ValueError("No income data available")
        process = IncomeProcessor(static_data, income_data)
        process.process()
        final_df = process.get_final_df()
        save_processed_data(user, final_df, 'INCOME')
    except Exception as e:
        print(f"Task failed: {str(e)}")
        raise e
And I modified views.py by calling the task with delay():
from utils.s3_utils import get_file_from_s3, get_cached_file_data
from .models import ProcessedData
from django.contrib.auth.decorators import login_required
from django.shortcuts import render

from .tasks import process_income_task


@login_required
def initiate_income_process(request):
    process_income_task.delay(request.user.id)  # added .delay()
    return render(request, 'processes/processing_started.html', {'process_type': 'income'})


def display_income(request):
    user = request.user
    latest_processed_data = ProcessedData.objects.filter(user=user, data_type='INCOME').order_by('-upload_date').first()
    print(f"Latest processed data: {latest_processed_data.filename if latest_processed_data else 'None'}")

    df = get_cached_file_data('INCOME', user)
    if df is None:
        print("No cached data available")
        return render(request, 'uploads/error_template.html', {'message': 'No data available'})

    print(f"Displaying data from: {latest_processed_data.filename}")
Celery and the message broker are both set up. In the worker terminal, the task is being received:
[2024-08-27 14:36:22,120: INFO/MainProcess] Task processes.tasks.process_income_task[8232b00c-a2da-4ff7-b4bd-179421f2a607] received
However, nothing new appears in the database or on S3, and even when I add print statements to the task, nothing is printed. The only worker output is:
[2024-08-27 14:36:22,120: INFO/MainProcess] Task processes.tasks.process_income_task[8232b00c-a2da-4ff7-b4bd-179421f2a607] received
[2024-08-27 14:36:25,255: INFO/SpawnPoolWorker-58] child process 22104 calling self.run()
[2024-08-27 14:36:25,262: INFO/SpawnPoolWorker-52] child process 10768 calling self.run()
[2024-08-27 14:36:25,268: INFO/SpawnPoolWorker-55] child process 24804 calling self.run
I start the Celery worker with the following command (alfa is my project name):
celery -A alfa worker -l INFO
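For completeness, the Celery bootstrap follows the standard Django layout. This is only a sketch of it (module names assumed from the project name alfa; the actual broker settings live in my Django settings and are omitted here):

```python
# alfa/celery.py -- standard Django/Celery wiring (sketch; the real
# broker configuration is project-specific)
import os

from celery import Celery

# Point Celery at the Django settings module before creating the app.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'alfa.settings')

app = Celery('alfa')

# Read CELERY_* options from Django settings (e.g. CELERY_BROKER_URL).
app.config_from_object('django.conf:settings', namespace='CELERY')

# Discover tasks.py modules in all installed apps, including processes.tasks.
app.autodiscover_tasks()
```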
I would be grateful for any help, and I can provide the full code if something is unclear. The rest of the code is unchanged and, as far as I can tell, unaffected: all I did was add the shared_task decorator in tasks.py and the delay() call in views.py.
I am not sure why it is not working as expected.
Thanks in advance for the help.