celery — Распределенная обработка


Этот модуль является основной точкой входа в Celery API. Он включает в себя необходимые вещи для вызова задач и создания Celery-приложений.

Celery

Экземпляр приложения Celery

group

группировать задачи вместе

chain

объединять задачи в цепочку

chord

аккорды включают обратные вызовы для групп

signature()

создать новую подпись задачи

Signature

объект, описывающий вызов задачи

current_app

прокси для текущего экземпляра приложения

current_task

прокси для текущего выполняемого задания

Celery объекты приложения

Добавлено в версии 2.5.

class celery.Celery(main=None, loader=None, backend=None, amqp=None, events=None, log=None, control=None, set_as_current=True, tasks=None, broker=None, include=None, changes=None, config_source=None, fixups=None, task_cls=None, autofinalize=True, namespace=None, strict_typing=True, **kwargs)[исходный код]

Celery application.

Параметры:

main (str) – Name of the main module if running as __main__. This is used as the prefix for auto-generated task names.

Именованные аргументы:
  • broker (str) – URL of the default broker used.

  • backend (Union[str, Type[celery.backends.base.Backend]]) –

    The result store backend class, or the name of the backend class to use.

    Default is the value of the result_backend setting.

  • autofinalize (bool) – If set to False a RuntimeError will be raised if the task registry or tasks are used before the app is finalized.

  • set_as_current (bool) – Make this the global current app.

  • include (List[str]) – List of modules every worker should import.

  • amqp (Union[str, Type[AMQP]]) – AMQP object or class name.

  • events (Union[str, Type[celery.app.events.Events]]) – Events object or class name.

  • log (Union[str, Type[Logging]]) – Log object or class name.

  • control (Union[str, Type[celery.app.control.Control]]) – Control object or class name.

  • tasks (Union[str, Type[TaskRegistry]]) – A task registry, or the name of a registry class.

  • fixups (List[str]) – List of fix-up plug-ins (e.g., see celery.fixups.django).

  • config_source (Union[str, class]) – Take configuration from a class, or object. Attributes may include any settings described in the documentation.

  • task_cls (Union[str, Type[celery.app.task.Task]]) – base task class to use. See this section for usage.

user_options = None

Custom options for command-line programs. See Добавление новых опций командной строки

steps = None

Custom bootsteps to extend and modify the worker. See Установка шагов загрузки.

current_task

Instance of task being executed, or None.

current_worker_task

The task currently being executed by a worker or None.

Differs from current_task in that it’s not affected by tasks calling other tasks directly, or eagerly.

amqp

amqp.

Type:

AMQP related functionality

backend

Current backend instance.

loader

Current loader instance.

control

control.

Type:

Remote control

events

events.

Type:

Consuming and sending events

log

log.

Type:

Logging

tasks

Task registry.

Предупреждение

Accessing this attribute will also auto-finalize the app.

pool

pool.

Примечание

This attribute is not related to the workers concurrency pool.

Type:

Broker connection pool

producer_pool
Task

Base task class for this app.

timezone

Current timezone for this app.

This is a cached property taking the time zone from the timezone setting.

builtin_fixups = {'celery.fixups.django:fixup'}
oid

Universally unique identifier for this app.

close()[исходный код]

Clean up after the application.

Only necessary for dynamically created apps, and you should probably use the with statement instead.

Пример

>>> with Celery(set_as_current=False) as app:
...     with app.connection_for_write() as conn:
...         pass
signature(*args, **kwargs)[исходный код]

Return a new Signature bound to this app.

bugreport()[исходный код]

Return information useful in bug reports.

config_from_object(obj, silent=False, force=False, namespace=None)[исходный код]

Read configuration from object.

Object is either an actual object or the name of a module to import.

Пример

>>> celery.config_from_object('myapp.celeryconfig')
>>> from myapp import celeryconfig
>>> celery.config_from_object(celeryconfig)
Параметры:
  • silent (bool) – If true then import errors will be ignored.

  • force (bool) – Force reading configuration immediately. By default the configuration will be read only when required.

config_from_envvar(variable_name, silent=False, force=False)[исходный код]

Read configuration from environment variable.

The value of the environment variable must be the name of a module to import.

Пример

>>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
>>> celery.config_from_envvar('CELERY_CONFIG_MODULE')
autodiscover_tasks(packages=None, related_name='tasks', force=False)[исходный код]

Auto-discover task modules.

Searches a list of packages for a «tasks.py» module (or use related_name argument).

If the name is empty, this will be delegated to fix-ups (e.g., Django).

For example if you have a directory layout like this:

foo/__init__.py
   tasks.py
   models.py

bar/__init__.py
    tasks.py
    models.py

baz/__init__.py
    models.py

Then calling app.autodiscover_tasks(['foo', 'bar', 'baz']) will result in the modules foo.tasks and bar.tasks being imported.

Параметры:
  • packages (List[str]) – List of packages to search. This argument may also be a callable, in which case the value returned is used (for lazy evaluation).

  • related_name (Optional[str]) – The name of the module to find. Defaults to «tasks»: meaning «look for „module.tasks“ for every module in packages.». If None will only try to import the package, i.e. «look for „module“».

  • force (bool) – By default this call is lazy so that the actual auto-discovery won’t happen until an application imports the default modules. Forcing will cause the auto-discovery to happen immediately.

add_defaults(fun)[исходный код]

Add default configuration from dict d.

If the argument is a callable function then it will be regarded as a promise, and it won’t be loaded until the configuration is actually needed.

This method can be compared to:

>>> celery.conf.update(d)

with a difference that 1) no copy will be made and 2) the dict will not be transferred when the worker spawns child processes, so it’s important that the same configuration happens at import time when pickle restores the object on the other side.

add_periodic_task(schedule, sig, args=(), kwargs=(), name=None, **opts)[исходный код]

Add a periodic task to beat schedule.

Celery beat store tasks based on sig or name if provided. Adding the same signature twice make the second task override the first one. To avoid the override, use distinct name for them.

setup_security(allowed_serializers=None, key=None, key_password=None, cert=None, store=None, digest='sha256', serializer='json')[исходный код]

Setup the message-signing serializer.

This will affect all application instances (a global operation).

Disables untrusted serializers and if configured to use the auth serializer will register the auth serializer with the provided settings into the Kombu serializer registry.

Параметры:
  • allowed_serializers (Set[str]) – List of serializer names, or content_types that should be exempt from being disabled.

  • key (str) – Name of private key file to use. Defaults to the security_key setting.

  • key_password (bytes) – Password to decrypt the private key. Defaults to the security_key_password setting.

  • cert (str) – Name of certificate file to use. Defaults to the security_certificate setting.

  • store (str) – Directory containing certificates. Defaults to the security_cert_store setting.

  • digest (str) – Digest algorithm used when signing messages. Default is sha256.

  • serializer (str) – Serializer used to encode messages after they’ve been signed. See task_serializer for the serializers supported. Default is json.

task(*args, **opts)[исходный код]

Decorator to create a task class out of any callable.

See Task options for a list of the arguments that can be passed to this decorator.

Примеры

@app.task
def refresh_feed(url):
    store_feed(feedparser.parse(url))

with setting extra options:

@app.task(exchange='feeds')
def refresh_feed(url):
    return store_feed(feedparser.parse(url))

Примечание

App Binding: For custom apps the task decorator will return a proxy object, so that the act of creating the task is not performed until the task is used or the task registry is accessed.

If you’re depending on binding to be deferred, then you must not access any attributes on the returned object until the application is fully set up (finalized).

send_task(name, args=None, kwargs=None, countdown=None, eta=None, task_id=None, producer=None, connection=None, router=None, result_cls=None, expires=None, publisher=None, link=None, link_error=None, add_to_parent=True, group_id=None, group_index=None, retries=0, chord=None, reply_to=None, time_limit=None, soft_time_limit=None, root_id=None, parent_id=None, route_name=None, shadow=None, chain=None, task_type=None, **options)[исходный код]

Send task by name.

Supports the same arguments as Task.apply_async().

Параметры:
  • name (str) – Name of task to call (e.g., «tasks.add»).

  • result_cls (AsyncResult) – Specify custom result class.

gen_task_name(name, module)[исходный код]
AsyncResult

Create new result instance.

См.также

celery.result.AsyncResult.

GroupResult

Create new group result instance.

См.также

celery.result.GroupResult.

Worker

Worker application.

См.также

Worker.

WorkController

Embeddable worker.

См.также

WorkController.

Beat

celery beat scheduler application.

См.также

Beat.

connection_for_read(url=None, **kwargs)[исходный код]

Establish connection used for consuming.

См.также

connection() for supported arguments.

connection_for_write(url=None, **kwargs)[исходный код]

Establish connection used for producing.

См.также

connection() for supported arguments.

connection(hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, connect_timeout=None, transport=None, transport_options=None, heartbeat=None, login_method=None, failover_strategy=None, **kwargs)[исходный код]

Establish a connection to the message broker.

Please use connection_for_read() and connection_for_write() instead, to convey the intent of use for this connection.

Параметры:
  • url – Either the URL or the hostname of the broker to use.

  • hostname (str) – URL, Hostname/IP-address of the broker. If a URL is used, then the other argument below will be taken from the URL instead.

  • userid (str) – Username to authenticate as.

  • password (str) – Password to authenticate with

  • virtual_host (str) – Virtual host to use (domain).

  • port (int) – Port to connect to.

  • ssl (bool, Dict) – Defaults to the broker_use_ssl setting.

  • transport (str) – defaults to the broker_transport setting.

  • transport_options (Dict) – Dictionary of transport specific options.

  • heartbeat (int) – AMQP Heartbeat in seconds (pyamqp only).

  • login_method (str) – Custom login method to use (AMQP only).

  • failover_strategy (str, Callable) – Custom failover strategy.

  • **kwargs – Additional arguments to kombu.Connection.

Результат:

the lazy connection instance.

Тип результата:

kombu.Connection

connection_or_acquire(connection=None, pool=True, *_, **__)[исходный код]

Context used to acquire a connection from the pool.

For use within a with statement to get a connection from the pool if one is not already provided.

Параметры:

connection (kombu.Connection) – If not provided, a connection will be acquired from the connection pool.

producer_or_acquire(producer=None)[исходный код]

Context used to acquire a producer from the pool.

For use within a with statement to get a producer from the pool if one is not already provided

Параметры:

producer (kombu.Producer) – If not provided, a producer will be acquired from the producer pool.

select_queues(queues=None)[исходный код]

Select subset of queues.

Параметры:

queues (Sequence[str]) – a list of queue names to keep.

now()[исходный код]

Return the current time and date as a datetime.

set_current()[исходный код]

Make this the current app for this thread.

set_default()[исходный код]

Make this the default app for all threads.

finalize(auto=False)[исходный код]

Finalize the app.

This loads built-in tasks, evaluates pending task decorators, reads configuration, etc.

on_init()[исходный код]

Optional callback called at init.

prepare_config(c)[исходный код]

Prepare configuration before it is merged with the defaults.

on_configure

Signal sent when app is loading configuration.

on_after_configure

Signal sent after app has prepared the configuration.

on_after_finalize

Signal sent after app has been finalized.

on_after_fork

Signal sent in child process after fork.

Примитивы холста

Подробнее о создании рабочих потоков задач см. в Холст: Проектирование рабочих потоков.

class celery.group(*tasks, **options)[исходный код]

Creates a group of tasks to be executed in parallel.

A group is lazy so you must call it to take action and evaluate the group.

Примечание

If only one argument is passed, and that argument is an iterable then that’ll be used as the list of tasks instead: this allows us to use group with generator expressions.

Пример

>>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
>>> promise = lazy_group()  # <-- evaluate: returns lazy result.
>>> promise.get()  # <-- will wait for the task to return
[4, 8]
Параметры:
  • *tasks (List[Signature]) – A list of signatures that this group will call. If there’s only one argument, and that argument is an iterable, then that’ll define the list of signatures instead.

  • **options (Any) – Execution options applied to all tasks in the group.

Результат:

signature that when called will then call all of the

tasks in the group (and return a GroupResult instance that can be used to inspect the state of the group).

Тип результата:

group

class celery.chain(*tasks, **kwargs)[исходный код]

Chain tasks together.

Each tasks follows one another, by being applied as a callback of the previous task.

Примечание

If called with only one argument, then that argument must be an iterable of tasks to chain: this allows us to use generator expressions.

Пример

This is effectively ((2 + 2) + 4):

>>> res = chain(add.s(2, 2), add.s(4))()
>>> res.get()
8

Calling a chain will return the result of the last task in the chain. You can get to the other tasks by following the result.parent’s:

>>> res.parent.get()
4

Using a generator expression:

>>> lazy_chain = chain(add.s(i) for i in range(10))
>>> res = lazy_chain(3)
Параметры:

*tasks (Signature) – List of task signatures to chain. If only one argument is passed and that argument is an iterable, then that’ll be used as the list of signatures to chain instead. This means that you can use a generator expression.

Результат:

A lazy signature that can be called to apply the first

task in the chain. When that task succeeds the next task in the chain is applied, and so on.

Тип результата:

chain

celery.chord

alias of _chord

celery.signature(varies, *args, **kwargs)[исходный код]

Create new signature.

  • if the first argument is a signature already then it’s cloned.

  • if the first argument is a dict, then a Signature version is returned.

Результат:

The resulting signature.

Тип результата:

Signature

class celery.Signature(task=None, args=None, kwargs=None, options=None, type=None, subtask_type=None, immutable=False, app=None, **ex)[исходный код]

Task Signature.

Class that wraps the arguments and execution options for a single task invocation.

Used as the parts in a group and other constructs, or to pass tasks around as callbacks while being compatible with serializers with a strict type subset.

Signatures can also be created from tasks:

  • Using the .signature() method that has the same signature as Task.apply_async:

    >>> add.signature(args=(1,), kwargs={'kw': 2}, options={})
    
  • or the .s() shortcut that works for star arguments:

    >>> add.s(1, kw=2)
    
  • the .s() shortcut does not allow you to specify execution options but there’s a chaining .set method that returns the signature:

    >>> add.s(2, 2).set(countdown=10).set(expires=30).delay()
    

Примечание

You should use signature() to create new signatures. The Signature class is the type returned by that function and should be used for isinstance checks for signatures.

Параметры:
  • task (Union[Type[celery.app.task.Task], str]) – Either a task class/instance, or the name of a task.

  • args (Tuple) – Positional arguments to apply.

  • kwargs (Dict) – Keyword arguments to apply.

  • options (Dict) – Additional options to Task.apply_async().

Примечание

If the first argument is a dict, the other arguments will be ignored and the values in the dict will be used instead:

>>> s = signature('tasks.add', args=(2, 2))
>>> signature(s)
{'task': 'tasks.add', args=(2, 2), kwargs={}, options={}}

Прокси

celery.current_app

Текущее установленное приложение для этого потока.

celery.current_task

Задача, выполняемая в данный момент (устанавливается только в рабочем или при использовании eager/apply).

Вернуться на верх