Source code for celery.concurrency.base

"""Base Execution Pool."""
import logging
import os
import sys
import time

from billiard.einfo import ExceptionInfo
from billiard.exceptions import WorkerLostError
from kombu.utils.encoding import safe_repr

from celery.exceptions import WorkerShutdown, WorkerTerminate, reraise
from celery.utils import timer2
from celery.utils.log import get_logger
from celery.utils.text import truncate

__all__ = ('BasePool', 'apply_target')

logger = get_logger('celery.pool')


[docs]def apply_target(target, args=(), kwargs=None, callback=None, accept_callback=None, pid=None, getpid=os.getpid, propagate=(), monotonic=time.monotonic, **_): """Apply function within pool context.""" kwargs = {} if not kwargs else kwargs if accept_callback: accept_callback(pid or getpid(), monotonic()) try: ret = target(*args, **kwargs) except propagate: raise except Exception: raise except (WorkerShutdown, WorkerTerminate): raise except BaseException as exc: try: reraise(WorkerLostError, WorkerLostError(repr(exc)), sys.exc_info()[2]) except WorkerLostError: callback(ExceptionInfo()) else: callback(ret)
[docs]class BasePool: """Task pool.""" RUN = 0x1 CLOSE = 0x2 TERMINATE = 0x3 Timer = timer2.Timer #: set to true if the pool can be shutdown from within #: a signal handler. signal_safe = True #: set to true if pool uses greenlets. is_green = False _state = None _pool = None _does_debug = True #: only used by multiprocessing pool uses_semaphore = False task_join_will_block = True body_can_be_buffer = False def __init__(self, limit=None, putlocks=True, forking_enable=True, callbacks_propagate=(), app=None, **options): self.limit = limit self.putlocks = putlocks self.options = options self.forking_enable = forking_enable self.callbacks_propagate = callbacks_propagate self.app = app
[docs] def on_start(self): pass
[docs] def did_start_ok(self): return True
[docs] def flush(self): pass
[docs] def on_stop(self): pass
[docs] def register_with_event_loop(self, loop): pass
[docs] def on_apply(self, *args, **kwargs): pass
[docs] def on_terminate(self): pass
[docs] def on_soft_timeout(self, job): pass
[docs] def on_hard_timeout(self, job): pass
[docs] def maintain_pool(self, *args, **kwargs): pass
[docs] def terminate_job(self, pid, signal=None): raise NotImplementedError( f'{type(self)} does not implement kill_job')
[docs] def restart(self): raise NotImplementedError( f'{type(self)} does not implement restart')
[docs] def stop(self): self.on_stop() self._state = self.TERMINATE
[docs] def terminate(self): self._state = self.TERMINATE self.on_terminate()
[docs] def start(self): self._does_debug = logger.isEnabledFor(logging.DEBUG) self.on_start() self._state = self.RUN
[docs] def close(self): self._state = self.CLOSE self.on_close()
[docs] def on_close(self): pass
[docs] def apply_async(self, target, args=None, kwargs=None, **options): """Equivalent of the :func:`apply` built-in function. Callbacks should optimally return as soon as possible since otherwise the thread which handles the result will get blocked. """ kwargs = {} if not kwargs else kwargs args = [] if not args else args if self._does_debug: logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)', target, truncate(safe_repr(args), 1024), truncate(safe_repr(kwargs), 1024)) return self.on_apply(target, args, kwargs, waitforslot=self.putlocks, callbacks_propagate=self.callbacks_propagate, **options)
def _get_info(self): return { 'max-concurrency': self.limit, } @property def info(self): return self._get_info() @property def active(self): return self._state == self.RUN @property def num_processes(self): return self.limit
Back to Top