Django Celery не обновляет базу данных и S3
Я создаю проект на django, в котором есть несколько приложений. Для одного из приложений, которое обрабатывает последние данные (загруженные), я собираюсь использовать celery, так как это занимает некоторое время (около 20-30 секунд) для получения и выборки данных из AWS s3, обработки данных, создания нового экземпляра модели на db (PostgreSQL) и загрузки его обратно в обработанную папку на s3. В настоящее время это хорошо работает на localhost без celery, но таймаут heroku составляет 30 секунд, поэтому я собираюсь использовать celery. (Сейчас я тестирую все это на моем локалхосте, без участия heroku, для информации)
Мой текущий код выглядит следующим образом и работает корректно и без ошибок: В файле tasks.py (в папке processes app):
from django.contrib.auth.models import User
from functions.income_processor import IncomeProcessor
from functions.expense_processor import ExpenseProcessor
from utils.s3_utils import get_static_data, get_latest_income_data, get_latest_expense_data, save_processed_data
def process_income_task(user_id):
user = User.objects.get(id=user_id)
try:
static_data = get_static_data()
income_data = get_latest_income_data(user)
if income_data is None:
raise ValueError("No income data available")
process = IncomeProcessor(static_data, income_data)
process.process()
final_df = process.get_final_df()
save_processed_data(user, final_df, 'INCOME')
except Exception as e:
print(f"Task failed: {str(e)}")
raise e
В файле views.py:
@login_required
def initiate_income_process(request):
process_income_task(request.user.id)
return render(request, 'processes/processing_started.html', {'process_type': 'income'})
def display_income(request):
user = request.user
latest_processed_data = ProcessedData.objects.filter(user=user, data_type='INCOME').order_by('-upload_date').first()
print(f"Latest processed data: {latest_processed_data.filename if latest_processed_data else 'None'}")
df = get_cached_file_data('INCOME', user)
if df is None:
print("No cached data available")
return render(request, 'uploads/error_template.html', {'message': 'No data available'})
print(f"Displaying data from: {latest_processed_data.filename}")
Затем моя функция display_income продолжает готовить данные для отображения на шаблоне.
Поскольку эти процессы занимают время, я перешел на использование celery. Поэтому я изменил свой код, фактически я только добавил декоратор общих задач:
from celery import shared_task
from django.contrib.auth.models import User
from functions.income_processor import IncomeProcessor
from functions.expense_processor import ExpenseProcessor
from utils.s3_utils import get_static_data, get_latest_income_data, get_latest_expense_data, save_processed_data
@shared_task
def process_income_task(user_id):
user = User.objects.get(id=user_id)
try:
static_data = get_static_data()
income_data = get_latest_income_data(user)
if income_data is None:
raise ValueError("No income data available")
process = IncomeProcessor(static_data, income_data)
process.process()
final_df = process.get_final_df()
save_processed_data(user, final_df, 'INCOME')
except Exception as e:
print(f"Task failed: {str(e)}")
raise e
И я изменил свой views.py, добавив команду delay в задачу:
from utils.s3_utils import get_file_from_s3, get_cached_file_data
from .models import ProcessedData
from django.contrib.auth.decorators import login_required
from django.shortcuts import render
from .tasks import process_income_task
@login_required
def initiate_income_process(request):
process_income_task.delay(request.user.id) # added delay
return render(request, 'processes/processing_started.html', {'process_type': 'income'})
def display_income(request):
user = request.user
latest_processed_data = ProcessedData.objects.filter(user=user, data_type='INCOME').order_by('-upload_date').first()
print(f"Latest processed data: {latest_processed_data.filename if latest_processed_data else 'None'}")
df = get_cached_file_data('INCOME', user)
if df is None:
print("No cached data available")
return render(request, 'uploads/error_template.html', {'message': 'No data available'})
print(f"Displaying data from: {latest_processed_data.filename}")
Все настроено для celery и брокера сообщений. На терминале задание принимается
[2024-08-27 14:36:22,120: INFO/MainProcess] Task processes.tasks.process_income_task[8232b00c-a2da-4ff7-b4bd-179421f2a607] received
Однако ни в базе данных, ни в s3 нет последних обновлений, и даже если я включаю в задачу несколько операторов печати, они не показывают ничего, кроме
[2024-08-27 14:36:22,120: INFO/MainProcess] Task processes.tasks.process_income_task[8232b00c-a2da-4ff7-b4bd-179421f2a607] received
[2024-08-27 14:36:25,255: INFO/SpawnPoolWorker-58] child process 22104 calling self.run()
[2024-08-27 14:36:25,262: INFO/SpawnPoolWorker-52] child process 10768 calling self.run()
[2024-08-27 14:36:25,268: INFO/SpawnPoolWorker-55] child process 24804 calling self.run
Я запускаю терминал celery со следующей командой, alfa - это мой проект:
celery -A alfa worker -l INFO
Я буду благодарен, если кто-нибудь сможет помочь, и я могу предоставить полный код, если он не понятен. Но остальной код не изменился и не был затронут этими кодами, я думаю. Все, что я сделал, это добавил задачу shaerd в tasks.py и команду delay в views.py
Я не уверен, почему он не работает так, как ожидалось.
Еще раз спасибо за помощь введите код здесь