Source code for celery.worker.consumer.mingle

"""Worker <-> Worker Sync at startup (Bootstep)."""
from celery import bootsteps
from celery.utils.log import get_logger

from .events import Events

__all__ = ('Mingle',)

logger = get_logger(__name__)
debug, info, exception = logger.debug, logger.info, logger.exception


[docs]class Mingle(bootsteps.StartStopStep): """Bootstep syncing state with neighbor workers. At startup, or upon consumer restart, this will: - Sync logical clocks. - Sync revoked tasks. """ label = 'Mingle' requires = (Events,) compatible_transports = {'amqp', 'redis'} def __init__(self, c, without_mingle=False, **kwargs): self.enabled = not without_mingle and self.compatible_transport(c.app) super().__init__( c, without_mingle=without_mingle, **kwargs)
[docs] def compatible_transport(self, app): with app.connection_for_read() as conn: return conn.transport.driver_type in self.compatible_transports
[docs] def start(self, c): self.sync(c)
[docs] def sync(self, c): info('mingle: searching for neighbors') replies = self.send_hello(c) if replies: info('mingle: sync with %s nodes', len([reply for reply, value in replies.items() if value])) [self.on_node_reply(c, nodename, reply) for nodename, reply in replies.items() if reply] info('mingle: sync complete') else: info('mingle: all alone')
[docs] def send_hello(self, c): inspect = c.app.control.inspect(timeout=1.0, connection=c.connection) our_revoked = c.controller.state.revoked replies = inspect.hello(c.hostname, our_revoked._data) or {} replies.pop(c.hostname, None) # delete my own response return replies
[docs] def on_node_reply(self, c, nodename, reply): debug('mingle: processing reply from %s', nodename) try: self.sync_with_node(c, **reply) except MemoryError: raise except Exception as exc: # pylint: disable=broad-except exception('mingle: sync with %s failed: %r', nodename, exc)
[docs] def sync_with_node(self, c, clock=None, revoked=None, **kwargs): self.on_clock_event(c, clock) self.on_revoked_received(c, revoked)
[docs] def on_clock_event(self, c, clock): c.app.clock.adjust(clock) if clock else c.app.clock.forward()
[docs] def on_revoked_received(self, c, revoked): if revoked: c.controller.state.revoked.update(revoked)
Back to Top