Как настроить celery для параллельного выполнения с несколькими процессами?

У меня есть задача, которая общается с внешним API, ответ json довольно большой, и мне приходится выполнять этот вызов несколько раз с последующей обработкой в python. Чтобы сделать это менее трудоемким, я сначала попробовал:

def make_call(*args, **kwargs):
    pass

def make_another(*args, **kwargs):
    pass

def get_calls():
    return make_call, make_another

def task(*args, **kwargs):
    procs = [Process(target=get_calls()[i], args=(,), 
    kwargs={}) for i in range(3)]
    _start = [proc.start() for proc in procs]
    _join = [proc.join() for proc in procs]

# 
transaction.on_commit(lambda: task.delay()) 

Однако я столкнулся с ``AssertionError: daemonic processes are not allowed to have children. Каким будет мой лучший подход к ускорению задачи celery с помощью дополнительных процессов?

Рабочий процесс Celery уже создает множество процессов. Используйте преимущества множества рабочих процессов вместо создания дочерних процессов. Вместо этого вы можете делегировать работу между рабочими процессами celery. Это приведет к более стабильному/надежному выполнению.

Вы можете либо просто создать множество задач из вашего клиентского кода, либо использовать примитивы celery, такие как цепочки или хорды для распараллеливания работы. Их также можно комбинировать с другими примитивами, такими как группы и т.д.

Например, в вашем сценарии у вас может быть две задачи: одна - сделать вызов (вызовы) API make_api_call, а другая - разобрать ответ parse_response. Вы можете соединить их вместе .

# chain another task when a task completes successfully
res = make_api_call.apply_async((0,), link=parse_response.s())

# chain syntax 1
result_1 = chain(make_api_call.s(1), parse_response.s())
# syntax 2 with | operator
result_b = make_api_call.s(2) | parse_response.s()

# can group chains
job = group([
   chain(make_api_call.s(i), parse_response.s()) 
   for i in range(3)
   ]
)
result = job.apply_async()

Это всего лишь общий пример. Вы можете создавать задачи и компоновать их в соответствии с требованиями вашего рабочего процесса. См: Canvas: Проектирование рабочих потоков для получения дополнительной информации.

Вернуться на верх