Холст: Проектирование рабочих потоков

Подписи

Добавлено в версии 2.0.

Вы только что узнали, как вызвать задачу с помощью метода tasks delay в руководстве calling, и часто это все, что вам нужно, но иногда вы можете захотеть передать сигнатуру вызова задачи другому процессу или в качестве аргумента другой функции.

signature() оборачивает аргументы, аргументы ключевых слов и параметры выполнения одного вызова задачи таким образом, что его можно передавать функциям или даже сериализовать и передавать по проводам.

  • Вы можете создать сигнатуру для задачи add, используя ее имя следующим образом:

    >>> from celery import signature
    >>> signature('tasks.add', args=(2, 2), countdown=10)
    tasks.add(2, 2)
    

    Эта задача имеет сигнатуру четности 2 (два аргумента): (2, 2), и устанавливает параметр выполнения обратного отсчета на 10.

  • или вы можете создать его, используя метод задачи signature:

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    
  • Существует также короткий путь с использованием аргументов звезды:

    >>> add.s(2, 2)
    tasks.add(2, 2)
    
  • Также поддерживаются аргументы с ключевыми словами:

    >>> add.s(2, 2, debug=True)
    tasks.add(2, 2, debug=True)
    
  • Из любого экземпляра подписи можно просмотреть различные поля:

    >>> s = add.signature((2, 2), {'debug': True}, countdown=10)
    >>> s.args
    (2, 2)
    >>> s.kwargs
    {'debug': True}
    >>> s.options
    {'countdown': 10}
    
  • Он поддерживает «API вызова» delay, apply_async и т.д., включая вызов напрямую (__call__).

    Вызов сигнатуры выполнит задание inline в текущем процессе:

    >>> add(2, 2)
    4
    >>> add.s(2, 2)()
    4
    

    delay - это наше любимое сокращение до apply_async, принимающее звездные аргументы:

    >>> result = add.delay(2, 2)
    >>> result.get()
    4
    

    apply_async принимает те же аргументы, что и метод app.Task.apply_async():

    >>> add.apply_async(args, kwargs, **options)
    >>> add.signature(args, kwargs, **options).apply_async()
    
    >>> add.apply_async((2, 2), countdown=1)
    >>> add.signature((2, 2), countdown=1).apply_async()
    
  • Вы не можете определить опции с помощью s(), но цепной вызов set позаботится об этом:

    >>> add.s(2, 2).set(countdown=1)
    proj.tasks.add(2, 2)
    

Частицы

С помощью подписи вы можете выполнить задание в рабочем:

>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)

Или вы можете вызвать его непосредственно в текущем процессе:

>>> add.s(2, 2)()
4

Указание дополнительных args, kwargs или опций к apply_async/delay создает партиции:

  • Любые добавленные аргументы будут добавлены к аргументам в сигнатуре:

    >>> partial = add.s(2)          # incomplete signature
    >>> partial.delay(4)            # 4 + 2
    >>> partial.apply_async((4,))  # same
    
  • Любые добавленные аргументы ключевых слов будут объединены с kwargs в сигнатуре, при этом новые аргументы ключевых слов будут иметь приоритет:

    >>> s = add.s(2, 2)
    >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
    >>> s.apply_async(kwargs={'debug': True})  # same
    
  • Любые добавленные опции будут объединены с опциями в подписи, при этом новые опции будут иметь приоритет:

    >>> s = add.signature((2, 2), countdown=10)
    >>> s.apply_async(countdown=1)  # countdown is now 1
    

Вы также можете клонировать подписи для создания производных:

>>> s = add.s(2)
proj.tasks.add(2)

>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)

Неизменность

Добавлено в версии 3.0.

Частицы предназначены для использования с обратными вызовами, любые связанные задачи или хордовые обратные вызовы будут применяться с результатом родительской задачи. Иногда вы хотите указать обратный вызов, который не принимает дополнительных аргументов, и в этом случае вы можете установить сигнатуру неизменяемой:

>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))

Ярлык .si() также может быть использован для создания неизменяемых подписей:

>>> add.apply_async((2, 2), link=reset_buffers.si())

Когда сигнатура неизменяема, можно установить только параметры выполнения, поэтому невозможно вызвать сигнатуру с частичными args/kwargs.

Примечание

В этом учебнике я иногда использую префиксный оператор ~ для подписей. Возможно, вам не стоит использовать его в своем производственном коде, но это удобное сокращение при экспериментах в оболочке Python:

>>> ~sig

>>> # is the same as
>>> sig.delay().get()

Обратные вызовы

Добавлено в версии 3.0.

Обратные вызовы могут быть добавлены к любой задаче с помощью аргумента link к apply_async:

add.apply_async((2, 2), link=other_task.s())

Обратный вызов будет применен только в случае успешного завершения задачи, и он будет применен с возвращаемым значением родительской задачи в качестве аргумента.

Как я уже упоминал ранее, любые аргументы, которые вы добавите к сигнатуре, будут добавлены к аргументам, указанным в самой сигнатуре!

Если у вас есть подпись:

>>> sig = add.s(10)

тогда sig.delay(result) становится:

>>> add.apply_async(args=(result, 10))

Теперь давайте вызовем нашу задачу add с обратным вызовом, используя частичные аргументы:

>>> add.apply_async((2, 2), link=add.s(8))

Как и ожидалось, сначала будет запущена одна задача, вычисляющая 2 + 2, затем другая, вычисляющая 4 + 8.

Примитивы

Добавлено в версии 3.0.

Примитивы также сами являются сигнатурными объектами, поэтому их можно комбинировать любым способом для составления сложных рабочих потоков.

Вот несколько примеров:

  • Простая цепь

    Вот простая цепочка, первая задача выполняется, передавая свое возвращаемое значение следующей задаче в цепочке, и так далее.

    >>> from celery import chain
    
    >>> # 2 + 2 + 4 + 8
    >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
    >>> res.get()
    16
    

    Это также можно записать с помощью труб:

    >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
    16
    
  • Неизменяемые подписи

    Подписи могут быть частичными, поэтому аргументы могут быть добавлены к существующим аргументам, но это не всегда нужно, например, если вам не нужен результат предыдущей задачи в цепочке.

    В этом случае вы можете пометить сигнатуру как неизменяемую, чтобы аргументы не могли быть изменены:

    >>> add.signature((2, 2), immutable=True)
    

    Для этого также существует сочетание клавиш .si(), и это предпочтительный способ создания подписей:

    >>> add.si(2, 2)
    

    Теперь вместо этого вы можете создать цепочку независимых задач:

    >>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
    >>> res.get()
    16
    
    >>> res.parent.get()
    8
    
    >>> res.parent.parent.get()
    4
    
  • Простая группа

    Вы можете легко создать группу задач для параллельного выполнения:

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in range(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    
  • Простой аккорд

    Примитив chord позволяет нам добавить обратный вызов, который будет вызван, когда все задачи в группе завершат выполнение. Это часто требуется для алгоритмов, которые не являются ужасно параллельными:

    >>> from celery import chord
    >>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
    >>> res.get()
    90
    

    Приведенный выше пример создает 10 задач, которые запускаются параллельно, и когда все они завершаются, возвращаемые значения объединяются в список и отправляются в задачу xsum.

    Тело аккорда также может быть неизменяемым, чтобы возвращаемое значение группы не передавалось обратному вызову:

    >>> chord((import_contact.s(c) for c in contacts),
    ...       notify_complete.si(import_id)).apply_async()
    

    Обратите внимание на использование .si выше; это создает неизменяемую сигнатуру, что означает, что любые новые переданные аргументы (включая возвращаемое значение предыдущей задачи) будут игнорироваться.

  • Взорвите свой разум, сочетая

    Цепочки тоже могут быть частичными:

    >>> c1 = (add.s(4) | mul.s(8))
    
    # (16 + 4) * 8
    >>> res = c1(16)
    >>> res.get()
    160
    

    это означает, что вы можете комбинировать цепочки:

    # ((4 + 16) * 2 + 4) * 8
    >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
    
    >>> res = c2()
    >>> res.get()
    352
    

    Соединяя группу с другой задачей, вы автоматически повышаете ее статус до аккорда:

    >>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())
    >>> res = c3()
    >>> res.get()
    90
    

    Группы и аккорды также принимают частичные аргументы, поэтому в цепочке возвращаемое значение предыдущей задачи передается всем задачам в группе:

    >>> new_user_workflow = (create_user.s() | group(
    ...                      import_contacts.s(),
    ...                      send_welcome_email.s()))
    ... new_user_workflow.delay(username='artv',
    ...                         first='Art',
    ...                         last='Vandelay',
    ...                         email='art@vandelay.com')
    

    Если вы не хотите передавать аргументы в группу, вы можете сделать подписи в группе неизменяемыми:

    >>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))()
    >>> res.get()
    <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
        bc01831b-9486-4e51-b046-480d7c9b78de,
        2650a1b8-32bf-4771-a645-b0a35dcc791b,
        dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
        59f92e0a-23ea-41ce-9fad-8645a0e7759c,
        26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
        2d10a5f4-37f0-41b2-96ac-a973b1df024d,
        e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
        104b2be0-7b75-44eb-ac8e-f9220bdfa140,
        c5c551a5-0386-4973-aa37-b65cbeb2624b,
        83f72d71-4b71-428e-b604-6f16599a9f37]>
    
    >>> res.parent.get()
    8
    

Цепи

Добавлено в версии 3.0.

Задачи могут быть связаны между собой: связанная задача вызывается при успешном завершении задачи:

>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4

Связанная задача будет применена с результатом ее родительской задачи в качестве первого аргумента. В приведенном выше случае, когда результатом было 4, это приведет к mul(4, 16).

Результаты будут отслеживать все подзадачи, вызванные исходной задачей, и к ним можно получить доступ из экземпляра результата:

>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

>>> res.children[0].get()
64

Экземпляр результата также имеет метод collect(), который рассматривает результат как граф, позволяя вам выполнять итерации по результатам:

>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
 (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

По умолчанию collect() вызовет исключение IncompleteStream, если граф не полностью сформирован (одна из задач еще не завершена), но вы можете получить и промежуточное представление графа:

>>> for result, value in res.collect(intermediate=True):
....

Вы можете связать вместе столько задач, сколько пожелаете, и подписи тоже могут быть связаны:

>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())

Вы также можете добавить обратные вызовы ошибок, используя метод on_error:

>>> add.s(2, 2).on_error(log_error.s()).delay()

Это приведет к следующему вызову .apply_async при применении сигнатуры:

>>> add.apply_async((2, 2), link_error=log_error.s())

Рабочий не будет вызывать errback как задачу, а вместо этого вызовет функцию errback напрямую, чтобы передать ей необработанные объекты запроса, исключения и трассировки.

Вот пример эррбэка:

from __future__ import print_function

import os

from proj.celery import app

@app.task
def log_error(request, exc, traceback):
    with open(os.path.join('/var/errors', request.id), 'a') as fh:
        print('--\n\n{0} {1} {2}'.format(
            request.id, exc, traceback), file=fh)

Чтобы еще больше упростить процесс соединения задач между собой, существует специальная сигнатура chain, которая позволяет соединять задачи в цепочку:

>>> from celery import chain
>>> from proj.tasks import add, mul

>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

Вызов цепочки вызовет задачи в текущем процессе и вернет результат последней задачи в цепочке:

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640

Он также устанавливает атрибуты parent, чтобы вы могли работать по цепочке вверх для получения промежуточных результатов:

>>> res.parent.get()
64

>>> res.parent.parent.get()
8

>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>

Цепочки также можно создавать с помощью оператора | (pipe):

>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()

Графики

Кроме того, вы можете работать с графиком результатов в виде DependencyGraph:

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
    463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)

Вы даже можете конвертировать эти графики в формат dot:

>>> with open('graph.dot', 'w') as fh:
...     res.parent.parent.graph.to_dot(fh)

и создавать образы:

$ dot -Tpng graph.dot -o graph.png
../../_images/result_graph.png

Группы

Добавлено в версии 3.0.

Группа может использоваться для параллельного выполнения нескольких задач.

Функция group принимает список сигнатур:

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))

Если вы вызовете группу, задачи будут применяться одна за другой в текущем процессе, и будет возвращен экземпляр GroupResult, который можно использовать для отслеживания результатов, или сообщить, сколько задач готово и так далее:

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]

Группа также поддерживает итераторы:

>>> group(add.s(i, i) for i in range(100))()

Группа - это объект подписи, поэтому ее можно использовать в сочетании с другими подписями.

Групповые обратные вызовы и обработка ошибок

Группы также могут иметь связанные с ними сигнатуры обратного вызова и возврата, однако их поведение может быть несколько неожиданным из-за того, что группы не являются реальными задачами и просто передают связанные задачи вниз к их инкапсулированным сигнатурам. Это означает, что возвращаемые значения группы не собираются для передачи в связанную сигнатуру обратного вызова. В качестве примера, следующий фрагмент, использующий простую задачу add(a, b), является ошибочным, поскольку связанная сигнатура add.s() не получит конечный результат группы, как можно было бы ожидать.

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> g.link(add.s())
>>> res = g()
[4, 8]

Обратите внимание, что возвращаются завершенные результаты первых двух задач, но сигнатура обратного вызова будет выполняться в фоновом режиме и вызовет исключение, поскольку она не получила два ожидаемых аргумента.

Групповые возвраты передаются в инкапсулированные сигнатуры, что открывает возможность того, что возврат, связанный только один раз, будет вызываться более одного раза, если несколько задач в группе окажутся неудачными. В качестве примера можно привести следующий фрагмент, использующий задачу fail(), которая вызывает исключение, и ожидается, что сигнатура log_error() будет вызываться один раз для каждой сбойной задачи, выполняемой в группе.

>>> g = group(fail.s(), fail.s())
>>> g.link_error(log_error.s())
>>> res = g()

Учитывая это, обычно рекомендуется создавать идемпотентные или счетные задачи, которые терпимы к многократным вызовам для использования в качестве возвратов.

Для этих случаев лучше использовать класс chord, который поддерживается в некоторых реализациях бэкенда.

Групповые результаты

Групповая задача также возвращает специальный результат, этот результат работает так же, как и результаты обычных задач, за исключением того, что он работает над группой в целом:

>>> from celery import group
>>> from tasks import add

>>> job = group([
...             add.s(2, 2),
...             add.s(4, 4),
...             add.s(8, 8),
...             add.s(16, 16),
...             add.s(32, 32),
... ])

>>> result = job.apply_async()

>>> result.ready()  # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]

GroupResult принимает список экземпляров AsyncResult и работает с ними так, как если бы это была одна задача.

Он поддерживает следующие операции:

  • successful()

    Возвращает True, если все подзадачи завершились успешно (например, не вызвали исключения).

  • failed()

    Верните True, если какая-либо из подзадач не выполнилась.

  • waiting()

    Возвращает True, если какая-либо из подзадач еще не готова.

  • ready()

    Верните True, если все подзадачи готовы.

  • completed_count()

    Возвращает количество выполненных подзадач.

  • revoke()

    Отмените все подзадачи.

  • join()

    Соберите результаты всех подзадач и верните их в том же порядке, в котором они были вызваны (в виде списка).

Аккорды

Добавлено в версии 2.3.

Примечание

Задачи, используемые внутри аккорда, не должны не игнорировать свои результаты. Если бэкенд результатов отключен для любой задачи (заголовка или тела) в вашем аккорде, вы должны прочитать «Важные замечания». В настоящее время аккорды не поддерживают бэкенд результатов RPC.

Аккорд - это задание, которое выполняется только после завершения выполнения всех заданий в группе.

Вычислим сумму выражения 1 + 1 + 2 + 2 + 3 + 3 ... n + n до ста цифр.

Сначала вам нужны две задачи, add() и tsum() (sum() уже является стандартной функцией):

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

Теперь вы можете использовать хорду для параллельного вычисления каждого шага сложения, а затем получить сумму получившихся чисел:

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in range(100))(tsum.s()).get()
9900

Очевидно, что это очень надуманный пример, накладные расходы на обмен сообщениями и синхронизацию делают его намного медленнее, чем его аналог в Python:

>>> sum(i + i for i in range(100))

Шаг синхронизации является дорогостоящим, поэтому следует по возможности избегать использования хордов. Тем не менее, аккорд - это мощный примитив, который необходимо иметь в своем инструментарии, поскольку синхронизация является необходимым шагом для многих параллельных алгоритмов.

Давайте разберем выражение аккорда на части:

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900

Помните, что обратный вызов может быть выполнен только после возврата всех заданий в заголовке. Каждый шаг в заголовке выполняется как задача, параллельно, возможно, на разных узлах. Затем применяется обратный вызов с возвращаемым значением каждой задачи в заголовке. Идентификатор задачи, возвращаемый командой chord(), является идентификатором обратного вызова, поэтому вы можете дождаться его завершения и получить окончательное возвращаемое значение (но не забудьте never have a task wait for other tasks).

Обработка ошибок

Что же произойдет, если одна из задач вызовет исключение?

Результат обратного вызова аккорда переходит в состояние отказа, а ошибка устанавливается как исключение ChordError:

>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "*/celery/result.py", line 120, in get
    interval=interval)
  File "*/celery/backends/amqp.py", line 150, in wait_for
    raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
    raised ValueError('something something',)

Хотя отслеживание может отличаться в зависимости от используемого бэкенда результатов, вы можете видеть, что описание ошибки включает идентификатор задачи, которая потерпела неудачу, и строковое представление исходного исключения. Вы также можете найти исходный traceback в result.traceback.

Обратите внимание, что остальные задачи все равно будут выполнены, поэтому третья задача (add.s(8, 8)) все равно будет выполнена, даже если средняя задача не выполнилась. Также ChordError показывает только ту задачу, которая не выполнилась первой (по времени): это не соблюдает порядок группы заголовков.

Чтобы выполнить действие при сбое аккорда, можно прикрепить к обратному вызову аккорда обратный вызов (errback):

@app.task
def on_chord_error(request, exc, traceback):
    print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
>>> c = (group(add.s(i, i) for i in range(10)) |
...      xsum.s().on_error(on_chord_error.s())).delay()

Аккорды могут иметь связанные с ними сигнатуры обратных вызовов и возвратов, что решает некоторые проблемы, связанные с привязкой сигнатур к группам. В этом случае предоставленная подпись будет связана с телом аккорда, которое, как можно ожидать, будет изящно вызывать обратные вызовы только один раз при завершении тела, или обратные вызовы только один раз, если какая-либо задача в заголовке или теле аккорда не выполнится.

Важные замечания

Задачи, используемые внутри аккорда, не должны не игнорировать свои результаты. На практике это означает, что вы должны включить result_backend, чтобы использовать аккорды. Кроме того, если в вашей конфигурации task_ignore_result установлено значение True, убедитесь, что отдельные задачи, используемые в аккорде, определены с помощью ignore_result=False. Это относится как к подклассам Task, так и к декорированным задачам.

Пример подкласса задачи:

class MyTask(Task):
    ignore_result = False

Пример оформленного задания:

@app.task(ignore_result=False)
def another_task(project):
    do_something()

По умолчанию шаг синхронизации реализуется с помощью повторяющейся задачи, которая каждую секунду опрашивает завершение группы и вызывает сигнатуру, когда она готова.

Пример реализации:

from celery import maybe_signature

@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
    if group.ready():
        return maybe_signature(callback).delay(group.join())
    raise self.retry(countdown=interval, max_retries=max_retries)

Это используется всеми бэкендами результатов, кроме Redis и Memcached: они увеличивают счетчик после каждой задачи в заголовке, а затем применяют обратный вызов, когда счетчик превышает количество задач в наборе.

Подход с использованием Redis и Memcached является гораздо лучшим решением, но его нелегко реализовать в других бэкендах (предложения приветствуются!).

Примечание

Аккорды не работают должным образом с Redis до версии 2.2; для их использования вам потребуется обновление, по крайней мере, до redis-server 2.2.

Примечание

Если вы используете аккорды с бэкендом результатов Redis, а также переопределяете метод Task.after_return(), вам нужно убедиться, что вы вызываете метод super, иначе обратный вызов аккорда не будет применен.

def after_return(self, *args, **kwargs):
    do_something()
    super().after_return(*args, **kwargs)

Карта и Стармап

map и starmap - это встроенные задачи, которые вызывают предусмотренную вызывающую задачу для каждого элемента в последовательности.

Они отличаются от group тем, что:

  • отправляется только одно сообщение о задаче.

  • операция является последовательной.

Например, используя map:

>>> from proj.tasks import add

>>> ~xsum.map([range(10), range(100)])
[45, 4950]

это то же самое, что и выполнение задачи:

@app.task
def temp():
    return [xsum(range(10)), xsum(range(100))]

и используя starmap:

>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

это то же самое, что и выполнение задачи:

@app.task
def temp():
    return [add(i, i) for i in range(10)]

И map, и starmap являются сигнатурными объектами, поэтому их можно использовать как другие сигнатуры и объединять в группы и т.д., например, для вызова starmap через 10 секунд:

>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)

Куски

Chunking позволяет разделить итерабельную работу на части, так что если у вас есть один миллион объектов, вы можете создать 10 задач со ста тысячами объектов каждая.

Некоторые могут беспокоиться, что разбиение задач на части приводит к снижению параллелизма, но это редко бывает верно для загруженного кластера, а на практике, поскольку вы избегаете накладных расходов на обмен сообщениями, это может значительно повысить производительность.

Для создания сигнатуры чанков можно использовать app.Task.chunks():

>>> add.chunks(zip(range(100), range(100)), 10)

Как и в случае с group, акт отправки сообщений для чанков будет происходить в текущем процессе при вызове:

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

в то время как вызов .apply_async создаст выделенную задачу, так что отдельные задачи будут применяться в рабочем вместо этого:

>>> add.chunks(zip(range(100), range(100)), 10).apply_async()

Вы также можете преобразовать фрагменты в группу:

>>> group = add.chunks(zip(range(100), range(100)), 10).group()

и вместе с группой изменить отсчет времени выполнения каждого задания на единицу:

>>> group.skew(start=1, stop=10)()

Это означает, что первая задача будет иметь обратный отсчет одной секунды, вторая задача - двух секунд и так далее.

Вернуться на верх