"""Celery Command Line Interface."""
import os
import pathlib
import traceback
import click
import click.exceptions
from click.types import ParamType
from click_didyoumean import DYMGroup
from click_plugins import with_plugins
from pkg_resources import iter_entry_points
from celery import VERSION_BANNER
from celery.app.utils import find_app
from celery.bin.amqp import amqp
from celery.bin.base import CeleryCommand, CeleryOption, CLIContext
from celery.bin.beat import beat
from celery.bin.call import call
from celery.bin.control import control, inspect, status
from celery.bin.events import events
from celery.bin.graph import graph
from celery.bin.list import list_
from celery.bin.logtool import logtool
from celery.bin.migrate import migrate
from celery.bin.multi import multi
from celery.bin.purge import purge
from celery.bin.result import result
from celery.bin.shell import shell
from celery.bin.upgrade import upgrade
from celery.bin.worker import worker
UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND = click.style("""
Unable to load celery application.
The module {0} was not found.""", fg='red')
UNABLE_TO_LOAD_APP_ERROR_OCCURRED = click.style("""
Unable to load celery application.
While trying to load the module {0} the following error occurred:
{1}""", fg='red')
UNABLE_TO_LOAD_APP_APP_MISSING = click.style("""
Unable to load celery application.
{0}""")
[docs]class App(ParamType):
"""Application option."""
name = "application"
[docs] def convert(self, value, param, ctx):
try:
return find_app(value)
except ModuleNotFoundError as e:
if e.name != value:
exc = traceback.format_exc()
self.fail(
UNABLE_TO_LOAD_APP_ERROR_OCCURRED.format(value, exc)
)
self.fail(UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND.format(e.name))
except AttributeError as e:
attribute_name = e.args[0].capitalize()
self.fail(UNABLE_TO_LOAD_APP_APP_MISSING.format(attribute_name))
except Exception:
exc = traceback.format_exc()
self.fail(
UNABLE_TO_LOAD_APP_ERROR_OCCURRED.format(value, exc)
)
APP = App()
@with_plugins(iter_entry_points('celery.commands'))
@click.group(cls=DYMGroup, invoke_without_command=True)
@click.option('-A',
'--app',
envvar='APP',
cls=CeleryOption,
type=APP,
help_group="Global Options")
@click.option('-b',
'--broker',
envvar='BROKER_URL',
cls=CeleryOption,
help_group="Global Options")
@click.option('--result-backend',
envvar='RESULT_BACKEND',
cls=CeleryOption,
help_group="Global Options")
@click.option('--loader',
envvar='LOADER',
cls=CeleryOption,
help_group="Global Options")
@click.option('--config',
envvar='CONFIG_MODULE',
cls=CeleryOption,
help_group="Global Options")
@click.option('--workdir',
cls=CeleryOption,
type=pathlib.Path,
callback=lambda _, __, wd: os.chdir(wd) if wd else None,
is_eager=True,
help_group="Global Options")
@click.option('-C',
'--no-color',
envvar='NO_COLOR',
is_flag=True,
cls=CeleryOption,
help_group="Global Options")
@click.option('-q',
'--quiet',
is_flag=True,
cls=CeleryOption,
help_group="Global Options")
@click.option('--version',
cls=CeleryOption,
is_flag=True,
help_group="Global Options")
@click.pass_context
def celery(ctx, app, broker, result_backend, loader, config, workdir,
no_color, quiet, version):
"""Celery command entrypoint."""
if version:
click.echo(VERSION_BANNER)
ctx.exit()
elif ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
ctx.exit()
if loader:
# Default app takes loader from this env (Issue #1066).
os.environ['CELERY_LOADER'] = loader
if broker:
os.environ['CELERY_BROKER_URL'] = broker
if result_backend:
os.environ['CELERY_RESULT_BACKEND'] = result_backend
if config:
os.environ['CELERY_CONFIG_MODULE'] = config
ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir,
quiet=quiet)
# User options
worker.params.extend(ctx.obj.app.user_options.get('worker', []))
beat.params.extend(ctx.obj.app.user_options.get('beat', []))
events.params.extend(ctx.obj.app.user_options.get('events', []))
for command in celery.commands.values():
command.params.extend(ctx.obj.app.user_options.get('preload', []))
@celery.command(cls=CeleryCommand)
@click.pass_context
def report(ctx):
"""Shows information useful to include in bug-reports."""
app = ctx.obj.app
app.loader.import_default_modules()
ctx.obj.echo(app.bugreport())
celery.add_command(purge)
celery.add_command(call)
celery.add_command(beat)
celery.add_command(list_)
celery.add_command(result)
celery.add_command(migrate)
celery.add_command(status)
celery.add_command(worker)
celery.add_command(events)
celery.add_command(inspect)
celery.add_command(control)
celery.add_command(graph)
celery.add_command(upgrade)
celery.add_command(logtool)
celery.add_command(amqp)
celery.add_command(shell)
celery.add_command(multi)
# Monkey-patch click to display a custom error
# when -A or --app are used as sub-command options instead of as options
# of the global command.
previous_show_implementation = click.exceptions.NoSuchOption.show
WRONG_APP_OPTION_USAGE_MESSAGE = """You are using `{option_name}` as an option of the {info_name} sub-command:
celery {info_name} {option_name} celeryapp <...>
The support for this usage was removed in Celery 5.0. Instead you should use `{option_name}` as a global option:
celery {option_name} celeryapp {info_name} <...>"""
def _show(self, file=None):
if self.option_name in ('-A', '--app'):
self.ctx.obj.error(
WRONG_APP_OPTION_USAGE_MESSAGE.format(
option_name=self.option_name,
info_name=self.ctx.info_name),
fg='red'
)
previous_show_implementation(self, file=file)
click.exceptions.NoSuchOption.show = _show
[docs]def main() -> int:
"""Start celery umbrella command.
This function is the main entrypoint for the CLI.
:return: The exit code of the CLI.
"""
return celery(auto_envvar_prefix="CELERY")