"""Logging utilities."""
import logging
import numbers
import os
import sys
import threading
import traceback
from contextlib import contextmanager
from kombu.log import LOG_LEVELS
from kombu.log import get_logger as _get_logger
from kombu.utils.encoding import safe_str
from .term import colored
# Names exported by ``from <this module> import *``.
# NOTE(review): 'ColorFormatter' is listed here but its definition is not
# visible in this part of the file -- confirm it is defined elsewhere.
__all__ = (
'ColorFormatter', 'LoggingProxy', 'base_logger',
'set_in_sighandler', 'in_sighandler', 'get_logger',
'get_task_logger', 'mlevel',
'get_multiprocessing_logger', 'reset_multiprocessing_logger', 'LOG_LEVELS'
)
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably toggled by logging setup code elsewhere -- confirm before use.
_process_aware = False
# Flag written by set_in_sighandler() and read by LoggingProxy.write(), which
# prints to sys.__stderr__ instead of going through logging while it is set.
_in_sighandler = False
# Raw value of the MP_LOG environment variable (a string when the variable
# is set), or False when it is absent.
MP_LOG = os.environ.get('MP_LOG', False)
# Logger names that get_task_logger() refuses to hand out.
RESERVED_LOGGER_NAMES = {'celery', 'celery.task'}
# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
# ``base_logger`` and ``logger`` are two names for the same "celery" logger.
base_logger = logger = _get_logger('celery')
def set_in_sighandler(value):
    """Set the flag signifying that we're inside a signal handler.

    Arguments:
        value (bool): New value for the module-level ``_in_sighandler``
            flag.  It is stored as given, without conversion.
    """
    # The flag lives at module level so that every LoggingProxy sees it.
    global _in_sighandler
    _in_sighandler = value
def iter_open_logger_fds():
    """Yield the ``stream`` of every handler attached to a known logger.

    All entries registered with the logging manager plus the root logger
    are inspected.  A handler attached to several loggers has its stream
    yielded only once; handlers without a ``stream`` attribute are skipped.
    """
    yielded = set()
    candidates = list(logging.Logger.manager.loggerDict.values())
    candidates.append(logging.getLogger(None))
    for candidate in candidates:
        try:
            handlers = candidate.handlers
        except AttributeError:  # PlaceHolder does not have handlers
            continue
        for handler in handlers:
            if handler in yielded:  # pragma: no cover
                continue
            try:
                stream = handler.stream
            except AttributeError:
                # Not a stream-backed handler: nothing to report.
                continue
            yield stream
            yielded.add(handler)
@contextmanager
def in_sighandler():
    """Context that records that we are in a signal handler.

    The ``_in_sighandler`` flag is set to ``True`` while the ``with`` body
    runs.  When the body exits, normally or through an exception, the flag
    is restored to the value it had on entry, so leaving a nested context
    does not clear the flag while an outer one is still active.
    """
    # Remember the state on entry: a signal may be delivered while another
    # handler is still running, in which case the flag is already set.
    previous = _in_sighandler
    set_in_sighandler(True)
    try:
        yield
    finally:
        set_in_sighandler(previous)
def logger_isa(l, p, max=1000):
    """Return True if ``p`` is ``l`` itself or one of its ancestors.

    Arguments:
        l (~logging.Logger): Logger whose ``parent`` chain is walked.
        p (~logging.Logger): Logger to look for in that chain.
        max (int): Maximum number of loggers to visit.

    Raises:
        RuntimeError: If the parent chain loops back on itself, or if
            ``max`` loggers were visited without reaching the end.
    """
    visited = set()
    node = l
    for _ in range(max):
        if node == p:
            return True
        if node in visited:
            raise RuntimeError(
                f'Logger {l.name!r} parents recursive',
            )
        visited.add(node)
        node = node.parent
        if not node:
            # Ran off the top of the hierarchy without meeting ``p``.
            return False
    # Step budget used up before the chain ended.
    raise RuntimeError(f'Logger hierarchy exceeds {max}')  # pragma: no cover
def _using_logger_parent(parent_logger, logger_):
    """Make ``logger_`` a descendant of ``parent_logger`` and return it.

    The ``parent`` attribute of ``logger_`` is only reassigned when
    ``parent_logger`` is not already ``logger_`` or one of its ancestors.
    """
    already_descendant = logger_isa(logger_, parent_logger)
    if not already_descendant:
        logger_.parent = parent_logger
    return logger_
def get_logger(name):
    """Get logger by name.

    The logger is re-parented under ``base_logger`` unless it is the root
    logger, a direct child of the root logger, or ``base_logger`` itself.
    """
    found = _get_logger(name)
    if logging.root in (found, found.parent) or found is base_logger:
        # Root, a direct child of root, or the base logger: left untouched.
        return found
    return _using_logger_parent(base_logger, found)
# Parent that get_task_logger() attaches every task logger to.
task_logger = get_logger('celery.task')
# The 'celery.worker' logger, obtained through get_logger().
worker_logger = get_logger('celery.worker')
def get_task_logger(name):
    """Get logger for task module by name.

    The returned logger is made a descendant of ``task_logger``.

    Raises:
        RuntimeError: If ``name`` is one of ``RESERVED_LOGGER_NAMES``.
    """
    if name in RESERVED_LOGGER_NAMES:
        raise RuntimeError(f'Logger name {name!r} is reserved!')
    named = get_logger(name)
    return _using_logger_parent(task_logger, named)
def mlevel(level):
    """Convert level name/int to log level.

    Falsy values and integers are returned unchanged; anything else is
    upper-cased and looked up in ``LOG_LEVELS``.
    """
    if not level or isinstance(level, numbers.Integral):
        return level
    return LOG_LEVELS[level.upper()]
class LoggingProxy:
    """File-like object that forwards writes to a :class:`logging.Logger`.

    Arguments:
        logger (~logging.Logger): Logger instance to forward to.
        loglevel (int, str): Log level to use when logging messages.
    """

    mode = 'w'
    name = None
    closed = False
    loglevel = logging.ERROR
    # Class-level, so shared by all proxies: carries the per-thread
    # ``recurse_protection`` flag used by :meth:`write`.
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        # pylint: disable=redefined-outer-name
        # Note that the logger global is redefined here, be careful changing.
        self.logger = logger
        # First truthy of: explicit argument, the logger's own level,
        # the class default.
        chosen = loglevel or self.logger.level or self.loglevel
        self.loglevel = mlevel(chosen)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        # Make the logger handlers dump internal errors to
        # :data:`sys.__stderr__` instead of :data:`sys.stderr` to circumvent
        # infinite loops.

        def wrap_handler(handler):  # pragma: no cover
            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    try:
                        traceback.print_exc(None, sys.__stderr__)
                    except OSError:
                        pass  # see python issue 5971

            safe_instance = WithSafeHandleError()
            handler.handleError = safe_instance.handleError

        return list(map(wrap_handler, self.logger.handlers))

    def write(self, data):
        # type: (AnyStr) -> int
        """Write message to logging object."""
        if _in_sighandler:
            # Inside a signal handler: bypass logging, print directly.
            text = safe_str(data)
            print(text, file=sys.__stderr__)
            return len(text)
        guard = self._thread
        if getattr(guard, 'recurse_protection', False):
            # Logger is logging back to this file, so stop recursing.
            return 0
        if not data or self.closed:
            return 0
        guard.recurse_protection = True
        try:
            text = safe_str(data)
            self.logger.log(self.loglevel, text)
            return len(text)
        finally:
            guard.recurse_protection = False

    def writelines(self, sequence):
        # type: (Sequence[str]) -> None
        """Write list of strings to file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.
        """
        for chunk in sequence:
            self.write(chunk)

    def flush(self):
        # This object is not buffered so any :meth:`flush`
        # requests are ignored.
        pass

    def close(self):
        # when the object is closed, no write requests are
        # forwarded to the logging object anymore.
        self.closed = True

    def isatty(self):
        """Here for file support."""
        return False
def get_multiprocessing_logger():
    """Return the multiprocessing logger.

    Returns ``None`` when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        return None
    return util.get_logger()
def reset_multiprocessing_logger():
    """Reset multiprocessing logging setup.

    Sets ``billiard.util._logger`` to ``None`` when that attribute exists;
    does nothing when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        return
    if hasattr(util, '_logger'):  # pragma: no cover
        util._logger = None
def current_process():
    """Return ``billiard.process.current_process()``.

    Returns ``None`` when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import process
    except ImportError:  # pragma: no cover
        return None
    return process.current_process()
def current_process_index(base=1):
    """Return the ``index`` of the current process, offset by ``base``.

    Returns ``None`` when the object returned by :func:`current_process`
    has no ``index`` attribute or when that attribute is ``None``.
    """
    index = getattr(current_process(), 'index', None)
    if index is None:
        return None
    return index + base