Django Celery: Задача с часами не выполняется
В приложении Django у меня есть форма, которая планирует отправку электронного письма. Она имеет четыре поля: имя, email, body, send_date. Я хочу динамически создать задачу Celery (email) для запуска другой задачи Celery в назначенное время.
Я смог отправить письмо через регулярные промежутки времени (каждые 30 секунд) на основе формы, используя следующий код:
schedule, _ = IntervalSchedule.objects.update_or_create(every=30, period=IntervalSchedule.SECONDS)
@shared_task(name="schedule_interval_email")
def schedule_email_interval(name, email, body, send_date):
PeriodicTask.objects.update_or_create(
defaults={
"interval": schedule,
"task": "email_task"
},
name="Send message at interval",
args=json.dumps(['name', 'test@test.com', 'body']),
)
Однако, когда я пытался запланировать выполнение задачи в определенное время (через 3 минуты от текущего времени) с помощью ClockedSchedule, Celery beat записывает задачи и сохраняет все соответствующие настройки. Задача отображается активной в области администратора Django. Однако письмо так и не отправляется.
clocked = ClockedSchedule.objects.create(clocked_time=datetime.now() + timedelta(minutes=3))
@shared_task(name="schedule_clock_email")
def schedule_email_clocked(name, email, body, send_date):
PeriodicTask.objects.create(
clocked=clocked,
name="Send message at specific date/time",
task="email_task",
one_off=True,
args=json.dumps(['name', 'test@test.com', 'body']),
)
В конечном итоге я хочу динамически устанавливать поле clocked на основе времени даты, которое пользователь вводит в форму, поэтому текущий код просто пытается проверить, как работает Celery. Мне кажется, что я что-то упускаю в том, как это работает. Любые мысли будут высоко оценены.
Думаю, дело в часовом поясе.
Если ваша конфигурация не вступает в силу.База данных celery хранит время UTC по умолчанию.
Вы можете проверить это, выполнив следующий код.
В среде оболочки Django
import datetime
import json
from ProjectName.celerytest import test
execution_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=2)
schedule, created = ClockedSchedule.objects.get_or_create(clocked_time=execution_time)
PeriodicTask.objects.create(clocked=schedule,one_off=True,name="test input",task="projectName.celerytest.test",args=json.dumps(['112233']))