Source code for celery.app.defaults

"""Configuration introspection and defaults."""
from collections import deque, namedtuple
from datetime import timedelta

from celery.utils.functional import memoize
from celery.utils.serialization import strtobool

__all__ = ('Option', 'NAMESPACES', 'flatten', 'find')


DEFAULT_POOL = 'prefork'

DEFAULT_ACCEPT_CONTENT = ['json']
DEFAULT_PROCESS_LOG_FMT = """
    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
""".strip()
DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
%(task_name)s[%(task_id)s]: %(message)s"""

DEFAULT_SECURITY_DIGEST = 'sha256'


OLD_NS = {'celery_{0}'}
OLD_NS_BEAT = {'celerybeat_{0}'}
OLD_NS_WORKER = {'celeryd_{0}'}

searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))


def Namespace(__old__=None, **options):
    if __old__ is not None:
        for key, opt in options.items():
            if not opt.old:
                opt.old = {o.format(key) for o in __old__}
    return options


def old_ns(ns):
    return {f'{ns}_{{0}}'}


[docs]class Option: """Describes a Celery configuration option.""" alt = None deprecate_by = None remove_by = None old = set() typemap = {'string': str, 'int': int, 'float': float, 'any': lambda v: v, 'bool': strtobool, 'dict': dict, 'tuple': tuple} def __init__(self, default=None, *args, **kwargs): self.default = default self.type = kwargs.get('type') or 'string' for attr, value in kwargs.items(): setattr(self, attr, value)
[docs] def to_python(self, value): return self.typemap[self.type](value)
def __repr__(self): return '<Option: type->{} default->{!r}>'.format(self.type, self.default)
NAMESPACES = Namespace( accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS), result_accept_content=Option(None, type='list'), enable_utc=Option(True, type='bool'), imports=Option((), type='tuple', old=OLD_NS), include=Option((), type='tuple', old=OLD_NS), timezone=Option(type='string', old=OLD_NS), beat=Namespace( __old__=OLD_NS_BEAT, max_loop_interval=Option(0, type='float'), schedule=Option({}, type='dict'), scheduler=Option('celery.beat:PersistentScheduler'), schedule_filename=Option('celerybeat-schedule'), sync_every=Option(0, type='int'), ), broker=Namespace( url=Option(None, type='string'), read_url=Option(None, type='string'), write_url=Option(None, type='string'), transport=Option(type='string'), transport_options=Option({}, type='dict'), connection_timeout=Option(4, type='float'), connection_retry=Option(True, type='bool'), connection_max_retries=Option(100, type='int'), failover_strategy=Option(None, type='string'), heartbeat=Option(120, type='int'), heartbeat_checkrate=Option(3.0, type='int'), login_method=Option(None, type='string'), pool_limit=Option(10, type='int'), use_ssl=Option(False, type='bool'), host=Option(type='string'), port=Option(type='int'), user=Option(type='string'), password=Option(type='string'), vhost=Option(type='string'), ), cache=Namespace( __old__=old_ns('celery_cache'), backend=Option(), backend_options=Option({}, type='dict'), ), cassandra=Namespace( entry_ttl=Option(type='float'), keyspace=Option(type='string'), port=Option(type='string'), read_consistency=Option(type='string'), servers=Option(type='list'), table=Option(type='string'), write_consistency=Option(type='string'), auth_provider=Option(type='string'), auth_kwargs=Option(type='string'), options=Option({}, type='dict'), ), s3=Namespace( access_key_id=Option(type='string'), secret_access_key=Option(type='string'), bucket=Option(type='string'), base_path=Option(type='string'), endpoint_url=Option(type='string'), region=Option(type='string'), ), azureblockblob=Namespace( container_name=Option('celery', type='string'), retry_initial_backoff_sec=Option(2, type='int'), retry_increment_base=Option(2, type='int'), retry_max_attempts=Option(3, type='int'), base_path=Option('', type='string'), ), control=Namespace( queue_ttl=Option(300.0, type='float'), queue_expires=Option(10.0, type='float'), exchange=Option('celery', type='string'), ), couchbase=Namespace( __old__=old_ns('celery_couchbase'), backend_settings=Option(None, type='dict'), ), arangodb=Namespace( __old__=old_ns('celery_arangodb'), backend_settings=Option(None, type='dict') ), mongodb=Namespace( __old__=old_ns('celery_mongodb'), backend_settings=Option(type='dict'), ), cosmosdbsql=Namespace( database_name=Option('celerydb', type='string'), collection_name=Option('celerycol', type='string'), consistency_level=Option('Session', type='string'), max_retry_attempts=Option(9, type='int'), max_retry_wait_time=Option(30, type='int'), ), event=Namespace( __old__=old_ns('celery_event'), queue_expires=Option(60.0, type='float'), queue_ttl=Option(5.0, type='float'), queue_prefix=Option('celeryev'), serializer=Option('json'), exchange=Option('celeryev', type='string'), ), redis=Namespace( __old__=old_ns('celery_redis'), backend_use_ssl=Option(type='dict'), db=Option(type='int'), host=Option(type='string'), max_connections=Option(type='int'), username=Option(type='string'), password=Option(type='string'), port=Option(type='int'), socket_timeout=Option(120.0, type='float'), socket_connect_timeout=Option(None, type='float'), retry_on_timeout=Option(False, type='bool'), socket_keepalive=Option(False, type='bool'), ), result=Namespace( __old__=old_ns('celery_result'), backend=Option(type='string'), cache_max=Option( -1, type='int', old={'celery_max_cached_results'}, ), compression=Option(type='str'), exchange=Option('celeryresults'), exchange_type=Option('direct'), expires=Option( timedelta(days=1), type='float', old={'celery_task_result_expires'}, ), persistent=Option(None, type='bool'), extended=Option(False, type='bool'), serializer=Option('json'), backend_transport_options=Option({}, type='dict'), chord_retry_interval=Option(1.0, type='float'), chord_join_timeout=Option(3.0, type='float'), backend_max_sleep_between_retries_ms=Option(10000, type='int'), backend_max_retries=Option(float("inf"), type='float'), backend_base_sleep_between_retries_ms=Option(10, type='int'), backend_always_retry=Option(False, type='bool'), ), elasticsearch=Namespace( __old__=old_ns('celery_elasticsearch'), retry_on_timeout=Option(type='bool'), max_retries=Option(type='int'), timeout=Option(type='float'), save_meta_as_text=Option(True, type='bool'), ), security=Namespace( __old__=old_ns('celery_security'), certificate=Option(type='string'), cert_store=Option(type='string'), key=Option(type='string'), digest=Option(DEFAULT_SECURITY_DIGEST, type='string'), ), database=Namespace( url=Option(old={'celery_result_dburi'}), engine_options=Option( type='dict', old={'celery_result_engine_options'}, ), short_lived_sessions=Option( False, type='bool', old={'celery_result_db_short_lived_sessions'}, ), table_schemas=Option(type='dict'), table_names=Option(type='dict', old={'celery_result_db_tablenames'}), ), task=Namespace( __old__=OLD_NS, acks_late=Option(False, type='bool'), acks_on_failure_or_timeout=Option(True, type='bool'), always_eager=Option(False, type='bool'), annotations=Option(type='any'), compression=Option(type='string', old={'celery_message_compression'}), create_missing_queues=Option(True, type='bool'), inherit_parent_priority=Option(False, type='bool'), default_delivery_mode=Option(2, type='string'), default_queue=Option('celery'), default_exchange=Option(None, type='string'), # taken from queue default_exchange_type=Option('direct'), default_routing_key=Option(None, type='string'), # taken from queue default_rate_limit=Option(type='string'), default_priority=Option(None, type='string'), eager_propagates=Option( False, type='bool', old={'celery_eager_propagates_exceptions'}, ), ignore_result=Option(False, type='bool'), store_eager_result=Option(False, type='bool'), protocol=Option(2, type='int', old={'celery_task_protocol'}), publish_retry=Option( True, type='bool', old={'celery_task_publish_retry'}, ), publish_retry_policy=Option( {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}, type='dict', old={'celery_task_publish_retry_policy'}, ), queues=Option(type='dict'), queue_max_priority=Option(None, type='int'), reject_on_worker_lost=Option(type='bool'), remote_tracebacks=Option(False, type='bool'), routes=Option(type='any'), send_sent_event=Option( False, type='bool', old={'celery_send_task_sent_event'}, ), serializer=Option('json', old={'celery_task_serializer'}), soft_time_limit=Option( type='float', old={'celeryd_task_soft_time_limit'}, ), time_limit=Option( type='float', old={'celeryd_task_time_limit'}, ), store_errors_even_if_ignored=Option(False, type='bool'), track_started=Option(False, type='bool'), ), worker=Namespace( __old__=OLD_NS_WORKER, agent=Option(None, type='string'), autoscaler=Option('celery.worker.autoscale:Autoscaler'), cancel_long_running_tasks_on_connection_loss=Option( False, type='bool' ), concurrency=Option(0, type='int'), consumer=Option('celery.worker.consumer:Consumer', type='string'), direct=Option(False, type='bool', old={'celery_worker_direct'}), disable_rate_limits=Option( False, type='bool', old={'celery_disable_rate_limits'}, ), deduplicate_successful_tasks=Option( False, type='bool' ), enable_remote_control=Option( True, type='bool', old={'celery_enable_remote_control'}, ), hijack_root_logger=Option(True, type='bool'), log_color=Option(type='bool'), log_format=Option(DEFAULT_PROCESS_LOG_FMT), lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}), max_memory_per_child=Option(type='int'), max_tasks_per_child=Option(type='int'), pool=Option(DEFAULT_POOL), pool_putlocks=Option(True, type='bool'), pool_restarts=Option(False, type='bool'), proc_alive_timeout=Option(4.0, type='float'), prefetch_multiplier=Option(4, type='int'), redirect_stdouts=Option( True, type='bool', old={'celery_redirect_stdouts'}, ), redirect_stdouts_level=Option( 'WARNING', old={'celery_redirect_stdouts_level'}, ), send_task_events=Option( False, type='bool', old={'celery_send_events'}, ), state_db=Option(), task_log_format=Option(DEFAULT_TASK_LOG_FMT), timer=Option(type='string'), timer_precision=Option(1.0, type='float'), ), ) def _flatten_keys(ns, key, opt): return [(ns + key, opt)] def _to_compat(ns, key, opt): if opt.old: return [ (oldkey.format(key).upper(), ns + key, opt) for oldkey in opt.old ] return [((ns + key).upper(), ns + key, opt)]
[docs]def flatten(d, root='', keyfilter=_flatten_keys): """Flatten settings.""" stack = deque([(root, d)]) while stack: ns, options = stack.popleft() for key, opt in options.items(): if isinstance(opt, dict): stack.append((ns + key + '_', opt)) else: yield from keyfilter(ns, key, opt)
DEFAULTS = { key: opt.default for key, opt in flatten(NAMESPACES) } __compat = list(flatten(NAMESPACES, keyfilter=_to_compat)) _OLD_DEFAULTS = {old_key: opt.default for old_key, _, opt in __compat} _TO_OLD_KEY = {new_key: old_key for old_key, new_key, _ in __compat} _TO_NEW_KEY = {old_key: new_key for old_key, new_key, _ in __compat} __compat = None SETTING_KEYS = set(DEFAULTS.keys()) _OLD_SETTING_KEYS = set(_TO_NEW_KEY.keys()) def find_deprecated_settings(source): # pragma: no cover from celery.utils import deprecated for name, opt in flatten(NAMESPACES): if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None): deprecated.warn(description=f'The {name!r} setting', deprecation=opt.deprecate_by, removal=opt.remove_by, alternative=f'Use the {opt.alt} instead') return source
[docs]@memoize(maxsize=None) def find(name, namespace='celery'): """Find setting by name.""" # - Try specified name-space first. namespace = namespace.lower() try: return searchresult( namespace, name.lower(), NAMESPACES[namespace][name.lower()], ) except KeyError: # - Try all the other namespaces. for ns, opts in NAMESPACES.items(): if ns.lower() == name.lower(): return searchresult(None, ns, opts) elif isinstance(opts, dict): try: return searchresult(ns, name.lower(), opts[name.lower()]) except KeyError: pass # - See if name is a qualname last. return searchresult(None, name.lower(), DEFAULTS[name.lower()])
Back to Top