Source code for celery.bootsteps

"""A directed acyclic graph of reusable components."""

from collections import deque
from threading import Event

from kombu.common import ignore_errors
from kombu.utils.encoding import bytes_to_str
from kombu.utils.imports import symbol_by_name

from .utils.graph import DependencyGraph, GraphFormatter
from .utils.imports import instantiate, qualname
from .utils.log import get_logger

try:
    from greenlet import GreenletExit
except ImportError:  # pragma: no cover
    IGNORE_ERRORS = ()
else:
    IGNORE_ERRORS = (GreenletExit,)

__all__ = ('Blueprint', 'Step', 'StartStopStep', 'ConsumerStep')

#: States
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

logger = get_logger(__name__)


def _pre(ns, fmt):
    return f'| {ns.alias}: {fmt}'


def _label(s):
    return s.name.rsplit('.', 1)[-1]


class StepFormatter(GraphFormatter):
    """Graph formatter for :class:`Blueprint`."""

    blueprint_prefix = '⧉'
    conditional_prefix = '∘'
    blueprint_scheme = {
        'shape': 'parallelogram',
        'color': 'slategray4',
        'fillcolor': 'slategray3',
    }

    def label(self, step):
        return step and '{}{}'.format(
            self._get_prefix(step),
            bytes_to_str(
                (step.label or _label(step)).encode('utf-8', 'ignore')),
        )

    def _get_prefix(self, step):
        if step.last:
            return self.blueprint_prefix
        if step.conditional:
            return self.conditional_prefix
        return ''

    def node(self, obj, **attrs):
        scheme = self.blueprint_scheme if obj.last else self.node_scheme
        return self.draw_node(obj, scheme, attrs)

    def edge(self, a, b, **attrs):
        if a.last:
            attrs.update(arrowhead='none', color='darkseagreen3')
        return self.draw_edge(a, b, self.edge_scheme, attrs)


[docs]class Blueprint: """Blueprint containing bootsteps that can be applied to objects. Arguments: steps Sequence[Union[str, Step]]: List of steps. name (str): Set explicit name for this blueprint. on_start (Callable): Optional callback applied after blueprint start. on_close (Callable): Optional callback applied before blueprint close. on_stopped (Callable): Optional callback applied after blueprint stopped. """ GraphFormatter = StepFormatter name = None state = None started = 0 default_steps = set() state_to_name = { 0: 'initializing', RUN: 'running', CLOSE: 'closing', TERMINATE: 'terminating', } def __init__(self, steps=None, name=None, on_start=None, on_close=None, on_stopped=None): self.name = name or self.name or qualname(type(self)) self.types = set(steps or []) | set(self.default_steps) self.on_start = on_start self.on_close = on_close self.on_stopped = on_stopped self.shutdown_complete = Event() self.steps = {}
[docs] def start(self, parent): self.state = RUN if self.on_start: self.on_start() for i, step in enumerate(s for s in parent.steps if s is not None): self._debug('Starting %s', step.alias) self.started = i + 1 step.start(parent) logger.debug('^-- substep ok')
[docs] def human_state(self): return self.state_to_name[self.state or 0]
[docs] def info(self, parent): info = {} for step in parent.steps: info.update(step.info(parent) or {}) return info
[docs] def close(self, parent): if self.on_close: self.on_close() self.send_all(parent, 'close', 'closing', reverse=False)
[docs] def restart(self, parent, method='stop', description='restarting', propagate=False): self.send_all(parent, method, description, propagate=propagate)
[docs] def send_all(self, parent, method, description=None, reverse=True, propagate=True, args=()): description = description or method.replace('_', ' ') steps = reversed(parent.steps) if reverse else parent.steps for step in steps: if step: fun = getattr(step, method, None) if fun is not None: self._debug('%s %s...', description.capitalize(), step.alias) try: fun(parent, *args) except Exception as exc: # pylint: disable=broad-except if propagate: raise logger.exception( 'Error on %s %s: %r', description, step.alias, exc)
[docs] def stop(self, parent, close=True, terminate=False): what = 'terminating' if terminate else 'stopping' if self.state in (CLOSE, TERMINATE): return if self.state != RUN or self.started != len(parent.steps): # Not fully started, can safely exit. self.state = TERMINATE self.shutdown_complete.set() return self.close(parent) self.state = CLOSE self.restart( parent, 'terminate' if terminate else 'stop', description=what, propagate=False, ) if self.on_stopped: self.on_stopped() self.state = TERMINATE self.shutdown_complete.set()
[docs] def join(self, timeout=None): try: # Will only get here if running green, # makes sure all greenthreads have exited. self.shutdown_complete.wait(timeout=timeout) except IGNORE_ERRORS: pass
[docs] def apply(self, parent, **kwargs): """Apply the steps in this blueprint to an object. This will apply the ``__init__`` and ``include`` methods of each step, with the object as argument:: step = Step(obj) ... step.include(obj) For :class:`StartStopStep` the services created will also be added to the objects ``steps`` attribute. """ self._debug('Preparing bootsteps.') order = self.order = [] steps = self.steps = self.claim_steps() self._debug('Building graph...') for S in self._finalize_steps(steps): step = S(parent, **kwargs) steps[step.name] = step order.append(step) self._debug('New boot order: {%s}', ', '.join(s.alias for s in self.order)) for step in order: step.include(parent) return self
[docs] def connect_with(self, other): self.graph.adjacent.update(other.graph.adjacent) self.graph.add_edge(type(other.order[0]), type(self.order[-1]))
def __getitem__(self, name): return self.steps[name] def _find_last(self): return next((C for C in self.steps.values() if C.last), None) def _firstpass(self, steps): for step in steps.values(): step.requires = [symbol_by_name(dep) for dep in step.requires] stream = deque(step.requires for step in steps.values()) while stream: for node in stream.popleft(): node = symbol_by_name(node) if node.name not in self.steps: steps[node.name] = node stream.append(node.requires) def _finalize_steps(self, steps): last = self._find_last() self._firstpass(steps) it = ((C, C.requires) for C in steps.values()) G = self.graph = DependencyGraph( it, formatter=self.GraphFormatter(root=last), ) if last: for obj in G: if obj != last: G.add_edge(last, obj) try: return G.topsort() except KeyError as exc: raise KeyError('unknown bootstep: %s' % exc)
[docs] def claim_steps(self): return dict(self.load_step(step) for step in self.types)
[docs] def load_step(self, step): step = symbol_by_name(step) return step.name, step
def _debug(self, msg, *args): return logger.debug(_pre(self, msg), *args) @property def alias(self): return _label(self)
class StepType(type): """Meta-class for steps.""" name = None requires = None def __new__(cls, name, bases, attrs): module = attrs.get('__module__') qname = f'{module}.{name}' if module else name attrs.update( __qualname__=qname, name=attrs.get('name') or qname, ) return super().__new__(cls, name, bases, attrs) def __str__(cls): return cls.name def __repr__(cls): return 'step:{0.name}{{{0.requires!r}}}'.format(cls)
[docs]class Step(metaclass=StepType): """A Bootstep. The :meth:`__init__` method is called when the step is bound to a parent object, and can as such be used to initialize attributes in the parent object at parent instantiation-time. """ #: Optional step name, will use ``qualname`` if not specified. name = None #: Optional short name used for graph outputs and in logs. label = None #: Set this to true if the step is enabled based on some condition. conditional = False #: List of other steps that that must be started before this step. #: Note that all dependencies must be in the same blueprint. requires = () #: This flag is reserved for the workers Consumer, #: since it is required to always be started last. #: There can only be one object marked last #: in every blueprint. last = False #: This provides the default for :meth:`include_if`. enabled = True def __init__(self, parent, **kwargs): pass
[docs] def include_if(self, parent): """Return true if bootstep should be included. You can define this as an optional predicate that decides whether this step should be created. """ return self.enabled
[docs] def instantiate(self, name, *args, **kwargs): return instantiate(name, *args, **kwargs)
def _should_include(self, parent): if self.include_if(parent): return True, self.create(parent) return False, None
[docs] def include(self, parent): return self._should_include(parent)[0]
[docs] def create(self, parent): """Create the step."""
def __repr__(self): return f'<step: {self.alias}>' @property def alias(self): return self.label or _label(self)
[docs] def info(self, obj): pass
[docs]class StartStopStep(Step): """Bootstep that must be started and stopped in order.""" #: Optional obj created by the :meth:`create` method. #: This is used by :class:`StartStopStep` to keep the #: original service object. obj = None
[docs] def start(self, parent): if self.obj: return self.obj.start()
[docs] def stop(self, parent): if self.obj: return self.obj.stop()
[docs] def close(self, parent): pass
[docs] def terminate(self, parent): if self.obj: return getattr(self.obj, 'terminate', self.obj.stop)()
[docs] def include(self, parent): inc, ret = self._should_include(parent) if inc: self.obj = ret parent.steps.append(self) return inc
[docs]class ConsumerStep(StartStopStep): """Bootstep that starts a message consumer.""" requires = ('celery.worker.consumer:Connection',) consumers = None
[docs] def get_consumers(self, channel): raise NotImplementedError('missing get_consumers')
[docs] def start(self, c): channel = c.connection.channel() self.consumers = self.get_consumers(channel) for consumer in self.consumers or []: consumer.consume()
[docs] def stop(self, c): self._close(c, True)
[docs] def shutdown(self, c): self._close(c, False)
def _close(self, c, cancel_consumers=True): channels = set() for consumer in self.consumers or []: if cancel_consumers: ignore_errors(c.connection, consumer.cancel) if consumer.channel: channels.add(consumer.channel) for channel in channels: ignore_errors(c.connection, channel.close)
Back to Top