Как получить данные из БД Django перед отправкой задания Celery удаленному работнику?
У меня есть сельдерей shared_task, который планируется запускать через определенные промежутки времени. Каждый раз, когда эта задача выполняется, ей необходимо сначала получить данные из Django DB, чтобы завершить вычисления. Эта задача может быть или не быть отправлена на celery worker, который находится на отдельной машине, поэтому в задаче celery я не могу делать никаких запросов к локальной базе данных celery
До сих пор я пытался использовать сигналы для достижения этой цели, поскольку я знаю, что функции с оберткой @before_task_publish выполняются еще до того, как задача будет опубликована в очереди сообщений. Однако я не знаю, как я могу фактически получить данные для задачи.
@shared_task
def run_computation(data):
perform_computation(data)
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
data = create_data()
# How do I get the data to the task from here?
Правильный ли это способ решения проблемы? Или мне лучше сделать API маршрут, который задача celery может получить для получения данных?
Можно изменить данные задачи на месте, из обработчика before_task_publish, так, чтобы они были переданы задаче. Сразу скажу, что есть множество причин почему это не очень хорошая идея:
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
data = create_data()
# Modify the body of the task data.
# Body is a tuple, the first entry of which is a tuple of arguments to the task.
# So we replace the first argument (data) with our own.
body[0][0] = data
# Alternatively modify the kwargs, which is a bit more explicit
body[1]['data'] = data
Это работает, но должно быть очевидно, почему это рискованно и чревато поломками. Предполагая, что у вас есть контроль над сайтами вызова задач, я думаю, что было бы лучше вообще отказаться от сигнала и просто иметь простую функцию, которая делает работу за вас, т.е.:
def create_task(data):
data = create_data()
run_computation.delay(data)
А затем в вызывающем коде просто вызовите create_task(data) вместо прямого вызова задачи (что вы, предположительно, делаете прямо сейчас).
Я размещаю решение, которое сработало для меня, спасибо за помощь @solarissmoke. То, что работает лучше всего для меня, это использование "цепных" функций обратного вызова Celery и отдельных очередей RabbitMQ для обозначения того, что будет вычисляться локально, а что на удаленном рабочем месте. Мое решение выглядит примерно так:
@app.task
def create_data_task():
# this creates the data to be passed to the analysis function
return create_data()
@app.task
def perform_computation_task(data):
# This performs the computation with given data
return perform_computation(data)
@app.task
def store_results(result):
# This would store the result in the DB here, but for now we just print it
print(result)
@app.task
def run_all_computation():
task = signature("path.to.tasks.create_data_task", queue="default") | signature("path.to.tasks.perform_computation_task", queue="remote_computation") | signature("path.to.tasks.store", queue="default")
task()
Здесь важно отметить, что эти задачи не выполнялись последовательно; на самом деле это отдельные задачи, которые выполняются рабочими и поэтому не блокируют ни один поток. Другие задачи активируются только функцией обратного вызова от других. Я объявил две очереди celery в RabbitMQ, одну по умолчанию под названием default, и одну специально для удаленных вычислений под названием "remote_computation". Это описано в явном виде здесь , включая то, как направлять celery workers на созданные очереди.