Source code for celery.utils.log

"""Logging utilities."""
import logging
import numbers
import os
import sys
import threading
import traceback
from contextlib import contextmanager

from kombu.log import LOG_LEVELS
from kombu.log import get_logger as _get_logger
from kombu.utils.encoding import safe_str

from .term import colored

# Names exported by ``from celery.utils.log import *``.
__all__ = (
    'ColorFormatter', 'LoggingProxy', 'base_logger',
    'set_in_sighandler', 'in_sighandler', 'get_logger',
    'get_task_logger', 'mlevel',
    'get_multiprocessing_logger', 'reset_multiprocessing_logger', 'LOG_LEVELS'
)

# NOTE(review): not read or written anywhere in the visible part of this
# module -- presumably toggled by logging setup code elsewhere; confirm.
_process_aware = False
# Module-wide flag written by set_in_sighandler() and consulted by
# LoggingProxy.write() to decide whether to bypass the logging machinery.
_in_sighandler = False

# Raw value of the ``MP_LOG`` environment variable (a string when set,
# ``False`` when the variable is absent).
MP_LOG = os.environ.get('MP_LOG', False)

# Logger names that get_task_logger() refuses to hand out.
RESERVED_LOGGER_NAMES = {'celery', 'celery.task'}

# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
base_logger = logger = _get_logger('celery')


def set_in_sighandler(value):
    """Set the flag signifying that we're inside a signal handler.

    Arguments:
        value (bool): New value stored in the module-level
            ``_in_sighandler`` flag.
    """
    global _in_sighandler
    _in_sighandler = value
def iter_open_logger_fds():
    """Yield the ``stream`` of every handler attached to a known logger.

    Walks every entry registered with the logging manager plus the root
    logger.  A handler object is yielded at most once, even when it is
    attached to several loggers; handlers that have no ``stream``
    attribute are skipped.
    """
    visited = set()
    candidates = list(logging.Logger.manager.loggerDict.values())
    candidates.append(logging.getLogger(None))
    for candidate in candidates:
        try:
            for handler in candidate.handlers:
                try:
                    if handler in visited:
                        continue
                    yield handler.stream  # pragma: no cover
                    # Only remember handlers whose stream was handed out.
                    visited.add(handler)
                except AttributeError:
                    pass
        except AttributeError:  # PlaceHolder does not have handlers
            pass
@contextmanager
def in_sighandler():
    """Context manager marking its body as running in a signal handler.

    The flag is raised through :func:`set_in_sighandler` on entry and is
    always lowered again on exit, including when the body raises.
    """
    set_in_sighandler(True)
    try:
        yield
    finally:
        set_in_sighandler(False)
def logger_isa(l, p, max=1000):
    """Return whether logger ``p`` is ``l`` itself or one of its ancestors.

    Arguments:
        l: Logger whose ``parent`` chain is walked.
        p: Logger to look for in that chain.
        max (int): Maximum number of loggers to inspect.

    Raises:
        RuntimeError: If the parent chain loops back on itself, or if it
            is still unresolved after ``max`` loggers were inspected.
    """
    current = l
    visited = set()
    for _ in range(max):
        if current == p:
            return True
        if current in visited:
            raise RuntimeError(
                f'Logger {l.name!r} parents recursive',
            )
        visited.add(current)
        current = current.parent
        if not current:
            # Ran off the top of the hierarchy without meeting ``p``.
            return False
    # pragma: no cover
    raise RuntimeError(f'Logger hierarchy exceeds {max}')


def _using_logger_parent(parent_logger, logger_):
    """Re-parent ``logger_`` under ``parent_logger`` unless already below it.

    Returns:
        The same ``logger_`` object that was passed in.
    """
    already_descendant = logger_isa(logger_, parent_logger)
    if not already_descendant:
        logger_.parent = parent_logger
    return logger_
def get_logger(name):
    """Get logger by name.

    The logger is re-parented under ``base_logger`` unless it is
    ``base_logger`` itself, is the root logger, or hangs directly off the
    root logger.
    """
    found = _get_logger(name)
    if logging.root in (found, found.parent) or found is base_logger:
        return found
    return _using_logger_parent(base_logger, found)
# Shared loggers created at import time.  ``task_logger`` is the parent
# that get_task_logger() attaches task loggers to.
task_logger = get_logger('celery.task')
# NOTE(review): not referenced in the visible part of this module --
# presumably imported by worker code; confirm.
worker_logger = get_logger('celery.worker')
def get_task_logger(name):
    """Get logger for task module by name.

    Raises:
        RuntimeError: If ``name`` is one of ``RESERVED_LOGGER_NAMES``.
    """
    if name in RESERVED_LOGGER_NAMES:
        raise RuntimeError(f'Logger name {name!r} is reserved!')
    named_logger = get_logger(name)
    return _using_logger_parent(task_logger, named_logger)
def mlevel(level):
    """Convert level name/int to log level.

    Falsy values and integers are returned unchanged; anything else is
    upper-cased and looked up in ``LOG_LEVELS``.
    """
    if not level or isinstance(level, numbers.Integral):
        return level
    return LOG_LEVELS[level.upper()]
class ColorFormatter(logging.Formatter):
    """Logging formatter that adds colors based on severity."""

    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = dict(
        DEBUG=COLORS['blue'],
        WARNING=COLORS['yellow'],
        ERROR=COLORS['red'],
        CRITICAL=COLORS['magenta'],
    )

    def __init__(self, fmt=None, use_color=True):
        """Store the format string and whether colors may be applied."""
        logging.Formatter.__init__(self, fmt)
        self.use_color = use_color

    def formatException(self, ei):
        """Format exception info, accepting a truthy non-tuple marker.

        A truthy ``ei`` that is not a tuple is replaced by the exception
        currently being handled before delegating to the base class.
        """
        if ei and not isinstance(ei, tuple):
            ei = sys.exc_info()
        return logging.Formatter.formatException(self, ei)

    def format(self, record):
        """Format ``record``, coloring the text when its level has a color.

        If coloring raises, the record is temporarily rewritten into an
        ``<Unrepresentable ...>`` message, formatted, and then restored.
        """
        msg = logging.Formatter.format(self, record)
        color = self.colors.get(record.levelname)

        # reset exception info later for other handlers...
        einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info

        if not (color and self.use_color):
            return safe_str(msg)

        try:
            # safe_str will repr the color object
            # and color will break on non-string objects
            # so need to reorder calls based on type.
            # Issue #427
            try:
                if isinstance(msg, str):
                    return str(color(safe_str(msg)))
                return safe_str(color(msg))
            except UnicodeDecodeError:  # pragma: no cover
                return safe_str(msg)  # skip colors
        except Exception as exc:  # pylint: disable=broad-except
            prev_msg = record.msg
            placeholder = '<Unrepresentable {!r}: {!r}>'.format(
                type(msg), exc
            )
            # Mutate the record only once the placeholder has been built.
            record.exc_info = 1
            record.msg = placeholder
            try:
                return logging.Formatter.format(self, record)
            finally:
                record.msg, record.exc_info = prev_msg, einfo
class LoggingProxy:
    """Forward file object to :class:`logging.Logger` instance.

    Arguments:
        logger (~logging.Logger): Logger instance to forward to.
        loglevel (int, str): Log level to use when logging messages.
    """

    mode = 'w'
    name = None
    closed = False
    loglevel = logging.ERROR
    # Class-level thread-local: the recursion guard used by write() is
    # therefore per thread but common to every proxy instance.
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        # pylint: disable=redefined-outer-name
        # Note that the logger global is redefined here, be careful changing.
        self.logger = logger
        # First truthy of: explicit argument, the logger's own level,
        # the class default.
        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        # Make the logger handlers dump internal errors to
        # :data:`sys.__stderr__` instead of :data:`sys.stderr` to circumvent
        # infinite loops.

        def wrap_handler(handler):  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    try:
                        traceback.print_exc(None, sys.__stderr__)
                    except OSError:
                        pass    # see python issue 5971

            handler.handleError = WithSafeHandleError().handleError

        return list(map(wrap_handler, self.logger.handlers))

    def write(self, data):
        # type: (AnyStr) -> int
        """Write message to logging object.

        Returns:
            int: Length of the text that was emitted, or 0 when nothing
            was written.
        """
        if _in_sighandler:
            # Inside a signal handler: print straight to the original
            # stderr rather than going through the logger.
            text = safe_str(data)
            print(text, file=sys.__stderr__)
            return len(text)
        if getattr(self._thread, 'recurse_protection', False):
            # Logger is logging back to this file, so stop recursing.
            return 0
        if not data or self.closed:
            return 0
        # NOTE(review): a bare "\n" chunk is forwarded as its own log
        # record here -- confirm whether that is intended.
        self._thread.recurse_protection = True
        try:
            text = safe_str(data)
            self.logger.log(self.loglevel, text)
            return len(text)
        finally:
            self._thread.recurse_protection = False

    def writelines(self, sequence):
        # type: (Sequence[str]) -> None
        """Write list of strings to file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.
        """
        for chunk in sequence:
            self.write(chunk)

    def flush(self):
        # This object is not buffered so any :meth:`flush`
        # requests are ignored.
        pass

    def close(self):
        # when the object is closed, no write requests are
        # forwarded to the logging object anymore.
        self.closed = True

    def isatty(self):
        """Here for file support."""
        return False
def get_multiprocessing_logger():
    """Return the multiprocessing logger.

    Returns ``None`` when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        return None
    return util.get_logger()
def reset_multiprocessing_logger():
    """Reset multiprocessing logging setup.

    Clears ``billiard.util._logger`` when that attribute exists; does
    nothing when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        return
    if hasattr(util, '_logger'):  # pragma: no cover
        util._logger = None
def current_process():
    """Return billiard's current process object.

    Returns ``None`` when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import process
    except ImportError:  # pragma: no cover
        return None
    return process.current_process()


def current_process_index(base=1):
    """Return the current process' ``index`` attribute offset by ``base``.

    Returns ``None`` when the process object has no ``index`` attribute
    or the attribute is ``None``.
    """
    index = getattr(current_process(), 'index', None)
    if index is None:
        return None
    return index + base
Back to Top