Source code for celery.worker.consumer.tasks

"""Worker Task Consumer Bootstep."""
from kombu.common import QoS, ignore_errors

from celery import bootsteps
from celery.utils.log import get_logger

from .mingle import Mingle

# Public API of this module: only the ``Tasks`` bootstep is exported.
__all__ = ('Tasks',)

# Module-level logger, named after this module via ``__name__``.
logger = get_logger(__name__)
# Shortcut to the logger's ``debug`` method; called by ``Tasks.stop`` and
# ``Tasks.shutdown`` to trace consumer teardown.
debug = logger.debug


class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer.

    Declares :class:`Mingle` as a required step and manages the
    ``task_consumer`` and ``qos`` attributes on the object ``c`` that is
    passed to every hook.
    """

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        # Both attributes remain ``None`` until :meth:`start` populates them.
        c.task_consumer = None
        c.qos = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer.

        Refreshes the task strategies, applies the initial prefetch count on
        the connection's default channel, creates the task consumer and
        installs a ``QoS`` object on ``c.qos`` for later prefetch updates.
        """
        c.update_strategies()

        # RabbitMQ 3.3 completely redefined how basic_qos works.  When the
        # connection reports that the qos semantics do not match the spec,
        # the 'apply_global' flag must be set on every qos update.
        apply_global = not c.connection.qos_semantics_matches_spec

        # Set the initial prefetch count (prefetch size is left at 0).
        channel = c.connection.default_channel
        channel.basic_qos(0, c.initial_prefetch_count, apply_global)

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection,
            on_decode_error=c.on_decode_error,
        )

        def _update_prefetch(prefetch_count):
            # ``c.task_consumer`` is looked up at call time, not captured here.
            consumer = c.task_consumer
            return consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=apply_global,
            )

        c.qos = QoS(_update_prefetch, c.initial_prefetch_count)

    def stop(self, c):
        """Stop task consumer.

        Does nothing when no task consumer is set; otherwise cancels it,
        with the call wrapped in ``ignore_errors``.
        """
        if not c.task_consumer:
            return
        debug('Canceling task consumer...')
        ignore_errors(c, c.task_consumer.cancel)

    def shutdown(self, c):
        """Shutdown task consumer.

        Does nothing when no task consumer is set; otherwise stops the
        consumer, closes it (wrapped in ``ignore_errors``) and resets
        ``c.task_consumer`` to ``None``.
        """
        if not c.task_consumer:
            return
        self.stop(c)
        debug('Closing consumer channel...')
        ignore_errors(c, c.task_consumer.close)
        c.task_consumer = None

    def info(self, c):
        """Return task consumer info.

        The result is a dict with a single ``'prefetch_count'`` key holding
        ``c.qos.value``, or ``'N/A'`` when ``c.qos`` is falsy.
        """
        if c.qos:
            prefetch_count = c.qos.value
        else:
            prefetch_count = 'N/A'
        return {'prefetch_count': prefetch_count}
Back to Top