celery.contrib.testing.worker

Справочник по API

Embedded workers for integration tests.

class celery.contrib.testing.worker.TestWorkController(*args: Any, **kwargs: Any)[исходный код]

Worker that can synchronize on being fully started.

ensure_started() None[исходный код]

Wait for worker to be fully up and running.

Предупреждение

Worker must be started within a thread for this to work, or it will block forever.

on_consumer_ready(consumer: Consumer) None[исходный код]

Callback called when the Consumer blueprint is fully started.

celery.contrib.testing.worker.setup_app_for_worker(app: Celery, loglevel: Union[str, int], logfile: str) None[исходный код]

Setup the app to be used for starting an embedded worker.

celery.contrib.testing.worker.start_worker(app: Celery, concurrency: int = 1, pool: str = 'solo', loglevel: Union[str, int] = 'error', logfile: str = None, perform_ping_check: bool = True, ping_task_timeout: float = 10.0, shutdown_timeout: float = 10.0, **kwargs: Any) Iterable[исходный код]

Start embedded worker.

Yields:

celery.app.worker.Worker – worker instance.

Вернуться на верх