"""Click customizations for Celery."""
import json
from collections import OrderedDict
from functools import update_wrapper
from pprint import pformat
import click
from click import ParamType
from kombu.utils.objects import cached_property
from celery._state import get_current_app
from celery.signals import user_preload_options
from celery.utils import text
from celery.utils.log import mlevel
from celery.utils.time import maybe_iso8601
try:
from pygments import highlight
from pygments.formatters import Terminal256Formatter
from pygments.lexers import PythonLexer
except ImportError:
def highlight(s, *args, **kwargs):
"""Place holder function in case pygments is missing."""
return s
LEXER = None
FORMATTER = None
else:
LEXER = PythonLexer()
FORMATTER = Terminal256Formatter()
[docs]class CLIContext:
"""Context Object for the CLI."""
def __init__(self, app, no_color, workdir, quiet=False):
"""Initialize the CLI context."""
self.app = app or get_current_app()
self.no_color = no_color
self.quiet = quiet
self.workdir = workdir
@cached_property
def OK(self):
return self.style("OK", fg="green", bold=True)
@cached_property
def ERROR(self):
return self.style("ERROR", fg="red", bold=True)
[docs] def style(self, message=None, **kwargs):
if self.no_color:
return message
else:
return click.style(message, **kwargs)
[docs] def secho(self, message=None, **kwargs):
if self.no_color:
kwargs['color'] = False
click.echo(message, **kwargs)
else:
click.secho(message, **kwargs)
[docs] def echo(self, message=None, **kwargs):
if self.no_color:
kwargs['color'] = False
click.echo(message, **kwargs)
else:
click.echo(message, **kwargs)
[docs] def error(self, message=None, **kwargs):
kwargs['err'] = True
if self.no_color:
kwargs['color'] = False
click.echo(message, **kwargs)
else:
click.secho(message, **kwargs)
[docs] def pretty(self, n):
if isinstance(n, list):
return self.OK, self.pretty_list(n)
if isinstance(n, dict):
if 'ok' in n or 'error' in n:
return self.pretty_dict_ok_error(n)
else:
s = json.dumps(n, sort_keys=True, indent=4)
if not self.no_color:
s = highlight(s, LEXER, FORMATTER)
return self.OK, s
if isinstance(n, str):
return self.OK, n
return self.OK, pformat(n)
[docs] def pretty_list(self, n):
if not n:
return '- empty -'
return '\n'.join(
f'{self.style("*", fg="white")} {item}' for item in n
)
[docs] def pretty_dict_ok_error(self, n):
try:
return (self.OK,
text.indent(self.pretty(n['ok'])[1], 4))
except KeyError:
pass
return (self.ERROR,
text.indent(self.pretty(n['error'])[1], 4))
[docs] def say_chat(self, direction, title, body='', show_body=False):
if direction == '<-' and self.quiet:
return
dirstr = not self.quiet and f'{self.style(direction, fg="white", bold=True)} ' or ''
self.echo(f'{dirstr} {title}')
if body and show_body:
self.echo(body)
[docs]def handle_preload_options(f):
"""Extract preload options and return a wrapped callable."""
def caller(ctx, *args, **kwargs):
app = ctx.obj.app
preload_options = [o.name for o in app.user_options.get('preload', [])]
if preload_options:
user_options = {
preload_option: kwargs[preload_option]
for preload_option in preload_options
}
user_preload_options.send(sender=f, app=app, options=user_options)
return f(ctx, *args, **kwargs)
return update_wrapper(caller, f)
[docs]class CeleryOption(click.Option):
"""Customized option for Celery."""
[docs] def get_default(self, ctx):
if self.default_value_from_context:
self.default = ctx.obj[self.default_value_from_context]
return super().get_default(ctx)
def __init__(self, *args, **kwargs):
"""Initialize a Celery option."""
self.help_group = kwargs.pop('help_group', None)
self.default_value_from_context = kwargs.pop('default_value_from_context', None)
super().__init__(*args, **kwargs)
[docs]class CeleryCommand(click.Command):
"""Customized command for Celery."""
[docs]class CeleryDaemonCommand(CeleryCommand):
"""Daemon commands."""
def __init__(self, *args, **kwargs):
"""Initialize a Celery command with common daemon options."""
super().__init__(*args, **kwargs)
self.params.append(CeleryOption(('-f', '--logfile'), help_group="Daemonization Options"))
self.params.append(CeleryOption(('--pidfile',), help_group="Daemonization Options"))
self.params.append(CeleryOption(('--uid',), help_group="Daemonization Options"))
self.params.append(CeleryOption(('--uid',), help_group="Daemonization Options"))
self.params.append(CeleryOption(('--gid',), help_group="Daemonization Options"))
self.params.append(CeleryOption(('--umask',), help_group="Daemonization Options"))
self.params.append(CeleryOption(('--executable',), help_group="Daemonization Options"))
[docs]class CommaSeparatedList(ParamType):
"""Comma separated list argument."""
name = "comma separated list"
[docs] def convert(self, value, param, ctx):
return text.str_to_list(value)
[docs]class Json(ParamType):
"""JSON formatted argument."""
name = "json"
[docs] def convert(self, value, param, ctx):
try:
return json.loads(value)
except ValueError as e:
self.fail(str(e))
[docs]class ISO8601DateTime(ParamType):
"""ISO 8601 Date Time argument."""
name = "iso-86091"
[docs] def convert(self, value, param, ctx):
try:
return maybe_iso8601(value)
except (TypeError, ValueError) as e:
self.fail(e)
[docs]class ISO8601DateTimeOrFloat(ParamType):
"""ISO 8601 Date Time or float argument."""
name = "iso-86091 or float"
[docs] def convert(self, value, param, ctx):
try:
return float(value)
except (TypeError, ValueError):
pass
try:
return maybe_iso8601(value)
except (TypeError, ValueError) as e:
self.fail(e)
[docs]class LogLevel(click.Choice):
"""Log level option."""
def __init__(self):
"""Initialize the log level option with the relevant choices."""
super().__init__(('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'FATAL'))
[docs] def convert(self, value, param, ctx):
value = value.upper()
value = super().convert(value, param, ctx)
return mlevel(value)
JSON = Json()
ISO8601 = ISO8601DateTime()
ISO8601_OR_FLOAT = ISO8601DateTimeOrFloat()
LOG_LEVEL = LogLevel()
COMMA_SEPARATED_LIST = CommaSeparatedList()