# Source code for celery.contrib.testing.worker

"""Embedded workers for integration tests."""
import os
import threading
from contextlib import contextmanager

from celery import worker
from celery.result import _set_task_join_will_block, allow_join_result
from celery.utils.dispatch import Signal
from celery.utils.nodenames import anon_nodename

# Default log level for embedded test workers. Read once, at import time,
# from the WORKER_LOGLEVEL environment variable; falls back to 'error'.
WORKER_LOGLEVEL = os.environ.get('WORKER_LOGLEVEL', 'error')

# Sent by start_worker() (sender=app) before the embedded worker is created.
# Declares no extra arguments.
test_worker_starting = Signal(
    name='test_worker_starting',
    providing_args={},
)
# Sent by TestWorkController.on_consumer_ready() (sender=app) together with
# the worker instance and the consumer that became ready.
test_worker_started = Signal(
    name='test_worker_started',
    providing_args={'worker', 'consumer'},
)
# Sent by start_worker() (sender=app) together with the worker instance once
# the embedded worker context has been left.
test_worker_stopped = Signal(
    name='test_worker_stopped',
    providing_args={'worker'},
)


class TestWorkController(worker.WorkController):
    """Worker that can synchronize on being fully started."""

    # The class name matches pytest's default ``Test*`` collection pattern.
    # Opt out explicitly so importing it into a test module does not make
    # pytest try (and fail, with a warning) to collect it as a test class.
    __test__ = False

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """Create the started-event, then defer to ``WorkController``.

        The event is created before ``super().__init__`` runs so that it
        already exists by the time any base-class code could reach
        :meth:`on_consumer_ready`.
        """
        self._on_started = threading.Event()
        super().__init__(*args, **kwargs)

    def on_consumer_ready(self, consumer):
        # type: (celery.worker.consumer.Consumer) -> None
        """Callback called when the Consumer blueprint is fully started.

        Releases any thread blocked in :meth:`ensure_started` and then
        sends the ``test_worker_started`` signal with ``sender=self.app``,
        this worker and the ready consumer.
        """
        self._on_started.set()
        test_worker_started.send(
            sender=self.app, worker=self, consumer=consumer)

    def ensure_started(self):
        # type: () -> None
        """Wait for worker to be fully up and running.

        Blocks, without a timeout, until :meth:`on_consumer_ready` has set
        the internal started-event.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        """
        self._on_started.wait()
@contextmanager
def start_worker(
    app,  # type: Celery
    concurrency=1,  # type: int
    pool='solo',  # type: str
    loglevel=WORKER_LOGLEVEL,  # type: Union[str, int]
    logfile=None,  # type: str
    perform_ping_check=True,  # type: bool
    ping_task_timeout=10.0,  # type: float
    shutdown_timeout=10.0,  # type: float
    **kwargs  # type: Any
):
    # type: (...) -> Iterable
    """Start embedded worker.

    Sends ``test_worker_starting`` first, runs the worker via
    ``_start_worker_thread`` for the duration of the ``with`` body, and
    always sends ``test_worker_stopped`` on the way out.

    Arguments:
        app: Celery app the worker is started for; also used as the
            sender of the test-worker signals.
        concurrency, pool, loglevel, logfile, shutdown_timeout, **kwargs:
            Forwarded unchanged to ``_start_worker_thread``.
        perform_ping_check: When true, run the ``ping`` task from the
            sibling ``tasks`` module and require the reply ``'pong'``
            before yielding.
        ping_task_timeout: Timeout passed to ``.get()`` when waiting for
            the ping result.

    Yields:
        celery.app.worker.Worker: worker instance.

    Raises:
        AssertionError: If the ping task replies with anything other
            than ``'pong'``.
    """
    test_worker_starting.send(sender=app)

    # Stays None if start-up fails before a worker exists, so the
    # ``finally`` clause below always has a value to send.
    worker = None
    try:
        with _start_worker_thread(app,
                                  concurrency=concurrency,
                                  pool=pool,
                                  loglevel=loglevel,
                                  logfile=logfile,
                                  perform_ping_check=perform_ping_check,
                                  shutdown_timeout=shutdown_timeout,
                                  **kwargs) as worker:
            if perform_ping_check:
                from .tasks import ping
                with allow_join_result():
                    reply = ping.delay().get(timeout=ping_task_timeout)
                # Explicit check instead of ``assert``: under ``python -O``
                # an assert statement is stripped, which would skip the
                # ping round-trip altogether.
                if reply != 'pong':
                    raise AssertionError(
                        f"Ping check failed: expected 'pong', got {reply!r}"
                    )

            yield worker
    finally:
        # Sent even when the body, the ping check or worker shutdown
        # raises, so receivers relying on this signal for cleanup are
        # still notified.
        test_worker_stopped.send(sender=app, worker=worker)
@contextmanager
def _start_worker_thread(app,
                         concurrency=1,
                         pool='solo',
                         loglevel=WORKER_LOGLEVEL,
                         logfile=None,
                         WorkController=TestWorkController,
                         perform_ping_check=True,
                         shutdown_timeout=10.0,
                         **kwargs):
    # type: (Celery, int, str, Union[str, int], str, Any, bool, float, **Any) -> Iterable
    """Start Celery worker in a thread.

    Arguments:
        app: Celery app to run the worker for.
        concurrency, pool, loglevel, logfile: Passed to the
            ``WorkController`` constructor (``loglevel``/``logfile`` are
            also used to set up the app's logging).
        WorkController: Worker class to instantiate; it must provide
            ``start()`` and ``ensure_started()``.
        perform_ping_check: When true, require the ``celery.ping`` task to
            be registered on ``app`` before starting.
        shutdown_timeout: Seconds to wait for the worker thread to exit
            when the context is left.
        **kwargs: Extra ``WorkController`` keyword arguments.
            ``without_heartbeat`` defaults to ``True`` unless given here.

    Yields:
        celery.worker.Worker: worker instance.

    Raises:
        AssertionError: If ``perform_ping_check`` is true and
            ``celery.ping`` is not in ``app.tasks``.
        RuntimeError: If the worker thread is still alive after
            ``shutdown_timeout`` seconds.
    """
    setup_app_for_worker(app, loglevel, logfile)
    # Explicit check rather than ``assert`` so it is not stripped by
    # ``python -O``; the exception type is unchanged.
    if perform_ping_check and 'celery.ping' not in app.tasks:
        raise AssertionError(
            "perform_ping_check=True requires the 'celery.ping' task "
            "to be registered with the app."
        )
    # Make sure we can connect to the broker
    with app.connection(hostname=os.environ.get('TEST_BROKER')) as conn:
        # Attribute access only -- ``queue_declare`` is deliberately not
        # called. NOTE(review): presumably resolving ``default_channel`` is
        # what forces the connection to be established -- confirm.
        conn.default_channel.queue_declare

    worker = WorkController(
        app=app,
        concurrency=concurrency,
        hostname=anon_nodename(),
        pool=pool,
        loglevel=loglevel,
        logfile=logfile,
        # not allowed to override TestWorkController.on_consumer_ready
        ready_callback=None,
        # Popped so a caller-supplied value wins and the key is not passed
        # a second time through **kwargs.
        without_heartbeat=kwargs.pop("without_heartbeat", True),
        without_mingle=True,
        without_gossip=True,
        **kwargs)

    t = threading.Thread(target=worker.start, daemon=True)
    t.start()
    worker.ensure_started()
    # NOTE(review): presumably lets test code wait on task results from
    # this process without tripping the "join will block" guard -- confirm.
    _set_task_join_will_block(False)

    try:
        yield worker
    finally:
        # Runs even when the ``with`` body raises, so a failing test does
        # not leave the worker thread running behind it.
        from celery.worker import state

        # Ask the worker to terminate, then wait for its thread to exit.
        state.should_terminate = 0
        t.join(shutdown_timeout)
        if t.is_alive():
            raise RuntimeError(
                "Worker thread failed to exit within the allocated timeout. "
                "Consider raising `shutdown_timeout` if your tasks take longer "
                "to execute."
            )
        # Reset the flag only after a clean exit.
        state.should_terminate = None


@contextmanager
def _start_worker_process(app,
                          concurrency=1,
                          pool='solo',
                          loglevel=WORKER_LOGLEVEL,
                          logfile=None,
                          **kwargs):
    # type: (Celery, int, str, Union[int, str], str, **Any) -> Iterable
    """Start worker in separate process.

    Makes ``app`` the current app, starts a one-node cluster
    (``testworker1@%h``) and stops it again when the context is left,
    including when the ``with`` body raises.

    Note:
        ``concurrency``, ``pool``, ``loglevel``, ``logfile`` and
        ``**kwargs`` are accepted but not used by this function.

    Yields:
        None: no worker object is handed to the caller.
    """
    from celery.apps.multi import Cluster, Node

    app.set_current()
    cluster = Cluster([Node('testworker1@%h')])
    cluster.start()
    try:
        yield
    finally:
        # Always stop the cluster so the worker process is not leaked when
        # the managed body fails.
        cluster.stopwait()
def setup_app_for_worker(app, loglevel, logfile):
    # type: (Celery, Union[str, int], str) -> None
    """Setup the app to be used for starting an embedded worker.

    Finalizes ``app``, makes it both the current and the default app, and
    then configures its logging.

    Arguments:
        app: Celery app to prepare.
        loglevel: Log level passed to ``app.log.setup``.
        logfile: Log file passed to ``app.log.setup``.
    """
    app.finalize()
    app.set_current()
    app.set_default()
    # Clear the class-level ``_setup`` flag on the logging helper before
    # calling ``setup``. NOTE(review): presumably this lets ``setup`` run
    # again if logging was already configured earlier -- confirm.
    type(app.log)._setup = False
    app.log.setup(loglevel=loglevel, logfile=logfile)
# Back to Top