Source code for celery.worker.pidbox
"""Worker Pidbox (remote control)."""
import socket
import threading
from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str
from celery.utils.collections import AttributeDict
from celery.utils.functional import pass1
from celery.utils.log import get_logger
from . import control
__all__ = ('Pidbox', 'gPidbox')
logger = get_logger(__name__)
debug, error, info = logger.debug, logger.error, logger.info
[docs]class Pidbox:
"""Worker mailbox."""
consumer = None
def __init__(self, c):
self.c = c
self.hostname = c.hostname
self.node = c.app.control.mailbox.Node(
safe_str(c.hostname),
handlers=control.Panel.data,
state=AttributeDict(
app=c.app,
hostname=c.hostname,
consumer=c,
tset=pass1 if c.controller.use_eventloop else set),
)
self._forward_clock = self.c.app.clock.forward
[docs] def on_message(self, body, message):
# just increase clock as clients usually don't
# have a valid clock to adjust with.
self._forward_clock()
try:
self.node.handle_message(body, message)
except KeyError as exc:
error('No such control command: %s', exc)
except Exception as exc:
error('Control command error: %r', exc, exc_info=True)
self.reset()
[docs] def start(self, c):
self.node.channel = c.connection.channel()
self.consumer = self.node.listen(callback=self.on_message)
self.consumer.on_decode_error = c.on_decode_error
[docs] def on_stop(self):
pass
[docs] def stop(self, c):
self.on_stop()
self.consumer = self._close_channel(c)
[docs] def reset(self):
self.stop(self.c)
self.start(self.c)
def _close_channel(self, c):
if self.node and self.node.channel:
ignore_errors(c, self.node.channel.close)
[docs] def shutdown(self, c):
self.on_stop()
if self.consumer:
debug('Canceling broadcast consumer...')
ignore_errors(c, self.consumer.cancel)
self.stop(self.c)
[docs]class gPidbox(Pidbox):
"""Worker pidbox (greenlet)."""
_node_shutdown = None
_node_stopped = None
_resets = 0
[docs] def start(self, c):
c.pool.spawn_n(self.loop, c)
[docs] def on_stop(self):
if self._node_stopped:
self._node_shutdown.set()
debug('Waiting for broadcast thread to shutdown...')
self._node_stopped.wait()
self._node_stopped = self._node_shutdown = None
[docs] def reset(self):
self._resets += 1
def _do_reset(self, c, connection):
self._close_channel(c)
self.node.channel = connection.channel()
self.consumer = self.node.listen(callback=self.on_message)
self.consumer.consume()
[docs] def loop(self, c):
resets = [self._resets]
shutdown = self._node_shutdown = threading.Event()
stopped = self._node_stopped = threading.Event()
try:
with c.connection_for_read() as connection:
info('pidbox: Connected to %s.', connection.as_uri())
self._do_reset(c, connection)
while not shutdown.is_set() and c.connection:
if resets[0] < self._resets:
resets[0] += 1
self._do_reset(c, connection)
try:
connection.drain_events(timeout=1.0)
except socket.timeout:
pass
finally:
stopped.set()