Расширения и бутстепы

Потребители пользовательских сообщений

Возможно, вы захотите встроить пользовательские потребители Kombu для ручной обработки сообщений.

Для этого существует специальный класс ConsumerStep bootstep, в котором необходимо определить только метод get_consumers, который должен возвращать список объектов kombu.Consumer, запускаемых при каждом установлении соединения:

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')

Примечание

Потребители Kombu могут использовать два различных механизма диспетчеризации обратных вызовов сообщений. Первый - это аргумент callbacks, который принимает список обратных вызовов с сигнатурой (body, message), второй - аргумент on_message, который принимает один обратный вызов с сигнатурой (message,). Последний не будет автоматически декодировать и десериализовывать полезную нагрузку.

def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]


def on_message(self, message):
    payload = message.decode()
    print(
        'Received message: {0!r} {props!r} rawlen={s}'.format(
        payload, props=message.properties, s=len(message.body),
    ))
    message.ack()

Схемы

Bootsteps - это техника добавления функциональности в рабочие. Bootstep - это пользовательский класс, который определяет крючки для выполнения пользовательских действий на разных этапах рабочего. Каждый бутстеп принадлежит к чертежу, и в настоящее время рабочий определяет два чертежа: Worker и Consumer.


Иллюстрация A: Этапы загрузки в схемах «Рабочий» и «Потребитель». Начало

снизу вверх первым шагом в рабочем чертеже является таймер, а последним шагом - запуск чертежа Consumer, который затем устанавливает соединение с брокером и начинает потреблять сообщения.

../../_images/worker_graph_full.png

Исполнитель

Worker - это первая запускаемая схема, и с ним запускаются основные компоненты, такие как цикл событий, пул обработки и таймер, используемый для задач ETA и других событий с определенным временем.

Когда исполнитель полностью запущен, он переходит к схеме Consumer, который устанавливает, как выполняются задания, подключается к брокеру и запускает потребителей сообщений.

WorkController является основной реализацией рабочего, и содержит несколько методов и атрибутов, которые вы можете использовать в своем bootstep.

Атрибуты

app

Текущий экземпляр приложения.

hostname

Имя рабочего узла (например, worker1@example.com)

blueprint

Это рабочий Blueprint.

hub

Объект цикла событий (Hub). Вы можете использовать его для регистрации обратных вызовов в цикле событий.

Это поддерживается только транспортами с поддержкой асинхронного ввода-вывода (amqp, redis), в этом случае следует установить атрибут worker.use_eventloop.

Ваш рабочий бутстеп должен требовать бутстеп концентратора, чтобы использовать его:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}
pool

Текущий пул процессов/событий/гентов/потоков. См. celery.concurrency.base.BasePool.

Ваш рабочий gevent должен требовать использования Pool bootstep:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}
timer

Timer используется для планирования функций.

Ваш рабочий бутстеп должен требовать бутстеп Timer, чтобы использовать его:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}
statedb

Database <celery.worker.state.Persistent>` для сохранения состояния между перезапусками рабочих.

Определяется только в том случае, если включен аргумент statedb.

Чтобы использовать это, ваш рабочий бутстеп должен требовать бутстеп Statedb:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Statedb'}
autoscaler

Autoscaler используется для автоматического увеличения и уменьшения количества процессов в пуле.

Определяется только в том случае, если включен аргумент autoscale.

Чтобы использовать это, ваш рабочий бутстеп должен требовать бутстеп Autoscaler:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler',)
autoreloader

Autoreloader используется для автоматической перезагрузки кода использования при изменении файловой системы.

Он определяется только в том случае, если включен аргумент autoreload. Ваш рабочий бутстеп должен требовать бутстеп Autoreloader, чтобы использовать это;

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader',)

Пример рабочего бутстепа

Примером Worker bootstep может быть:

from celery import bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))

    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self

    def start(self, worker):
        print('Called when the worker is started.')

    def stop(self, worker):
        print('Called when the worker shuts down.')

    def terminate(self, worker):
        print('Called when the worker terminates')

Каждому методу в качестве первого аргумента передается текущий экземпляр WorkController.

Другой пример - использование таймера для пробуждения через регулярные промежутки времени:

from celery import bootsteps


class DeadlockDetection(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, deadlock_timeout=3600):
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None

    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            30.0, self.detect, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None

    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()

Настройка журналов обработки задач

Рабочий Celery посылает сообщения в подсистему регистрации Python для различных событий в течение жизненного цикла задачи. Эти сообщения могут быть настроены путем переопределения строк формата LOG_<TYPE>, которые определены в celery/app/trace.py. Например:

import celery.app.trace

celery.app.trace.LOG_SUCCESS = "This is a custom message"

Все различные строки формата снабжены именем и ID задачи для форматирования %, а некоторые из них получают дополнительные поля, такие как возвращаемое значение или исключение, вызвавшее сбой задачи. Эти поля могут быть использованы в пользовательских строках форматирования следующим образом:

import celery.app.trace

celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"

Потребитель

Принтер Consumer устанавливает соединение с брокером и перезапускается каждый раз, когда это соединение теряется. Потребительские бутстрапы включают в себя рабочий пульс, потребитель команд удаленного управления и, что очень важно, потребитель задач.

При создании потребительских бутстепов необходимо учитывать, что должна быть возможность перезапуска вашего чертежа. Для потребительских бутстепов определен дополнительный метод „shutdown“, который вызывается, когда рабочий выключается.

Атрибуты

app

Текущий экземпляр приложения.

controller

Родительский WorkController объект, который создал этого потребителя.

hostname

Имя рабочего узла (например, worker1@example.com)

blueprint

Это рабочий Blueprint.

hub

Объект цикла событий (Hub). Вы можете использовать его для регистрации обратных вызовов в цикле событий.

Это поддерживается только транспортами с поддержкой асинхронного ввода-вывода (amqp, redis), в этом случае следует установить атрибут worker.use_eventloop.

Ваш рабочий бутстеп должен требовать бутстеп концентратора, чтобы использовать его:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}
connection

Текущее соединение брокера (kombu.Connection).

Потребительский бутстеп должен потребовать бутстеп „Connection“, чтобы использовать это:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.connection:Connection'}
event_dispatcher

Объект app.events.Dispatcher, который может быть использован для отправки событий.

Потребительский бутстеп должен требовать бутстеп Events, чтобы использовать это.

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.events:Events'}
gossip

Широковещательная связь между рабочими (Gossip).

Потребительский бутстеп должен требовать бутстеп Gossip, чтобы использовать его.

class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = {'celery.worker.consumer.gossip:Gossip'}

    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        self.tasks = [
            self.app.tasks['proj.tasks.add']
            self.app.tasks['proj.tasks.mul']
        ]
        self.last_size = None

    def on_cluster_size_change(self, worker):
        cluster_size = len(list(self.c.gossip.state.alive_workers()))
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit = 1.0 / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size

    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up soon
        # in order to see if the worker recovered.
        self.c.timer.call_after(10.0, self.on_cluster_size_change)

Отзывы

  • <set> gossip.on.node_join

    Вызывается каждый раз, когда новый узел присоединяется к кластеру, предоставляя экземпляр Worker.

  • <set> gossip.on.node_leave

    Вызывается всякий раз, когда новый узел покидает кластер (выключается), предоставляя экземпляр Worker.

  • <set> gossip.on.node_lost

    Вызывается всякий раз, когда сердцебиение было пропущено для рабочего экземпляра в кластере (сердцебиение не было получено или обработано вовремя), обеспечивая экземпляр Worker.

    Это не обязательно означает, что рабочий действительно находится в автономном режиме, поэтому используйте механизм тайм-аута, если стандартного тайм-аута сердцебиения недостаточно.

pool

Текущий пул процессов/событий/гентов/потоков. См. celery.concurrency.base.BasePool.

timer

Timer <celery.utils.timer2.Schedule используется для планирования функций.

heart

Отвечает за отправку пульса рабочих событий (Heart).

Ваш потребительский бутстеп должен требовать бутстеп Heart, чтобы использовать его:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.heart:Heart'}
task_consumer

Объект kombu.Consumer, используемый для потребления сообщений задачи.

Ваш потребительский бутстеп должен требовать бутстеп Tasks, чтобы использовать его:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}
strategies

Каждый зарегистрированный тип задачи имеет запись в этом отображении, где значение используется для выполнения входящего сообщения этого типа задачи (стратегия выполнения задачи). Это отображение генерируется шагом загрузки Tasks при запуске потребителя:

for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )

Ваш потребительский бутстеп должен требовать бутстеп Tasks, чтобы использовать его:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}
task_buckets

Диктант defaultdict используется для поиска ограничения скорости для задачи по типу. Записи в этом дикте могут быть None (для отсутствия ограничения) или экземпляром TokenBucket, реализующим consume(tokens) и << 3 >>>.

TokenBucket реализует token bucket algorithm, но можно использовать любой алгоритм, если он соответствует тому же интерфейсу и определяет два вышеуказанных метода.

qos

Объект QoS можно использовать для изменения текущего значения prefetch_count каналов задач:

# increment at next cycle
consumer.qos.increment_eventually(1)
# decrement at next cycle
consumer.qos.decrement_eventually(1)
consumer.qos.set(10)

Методы

consumer.reset_rate_limits()

Обновляет отображение task_buckets для всех зарегистрированных типов задач.

consumer.bucket_for_task(type, Bucket=TokenBucket)

Создает ведро ограничения скорости для задачи, используя ее атрибут task.rate_limit.

consumer.add_task_queue(name, exchange=None, exchange_type=None,
routing_key=None, \*\*options):

Добавляет новую очередь для потребления. Она будет сохраняться при перезапуске соединения.

consumer.cancel_task_queue(name)

Остановить потребление из очереди по имени. Это будет сохраняться при перезапуске соединения.

apply_eta_task(request)

Расписание выполнения задачи ETA на основе атрибута request.eta. (Request)

Установка шагов загрузки

app.steps['worker'] и app.steps['consumer'] могут быть изменены для добавления новых шагов загрузки:

>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)

>>> app.steps['consumer'].update([StepA, StepB])

>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}

Порядок шагов здесь не важен, так как порядок определяется результирующим графом зависимостей (Step.requires).

Чтобы проиллюстрировать, как можно установить bootsteps и как они работают, вот пример шага, который печатает некоторую бесполезную отладочную информацию. Он может быть добавлен как в качестве рабочего, так и потребительского бутстепа:

from celery import Celery
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

    app = Celery(broker='amqp://')
    app.steps['worker'].add(InfoStep)
    app.steps['consumer'].add(InfoStep)

Запуск рабочего с установленным этим шагом даст нам следующие журналы:

<Worker: w@example.com (initializing)> is in init
<Consumer: w@example.com (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <Worker: w@example.com (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <Consumer: w@example.com (running)> is starting
<Consumer: w@example.com (closing)> is stopping
<Worker: w@example.com (closing)> is stopping
<Consumer: w@example.com (terminating)> is shutting down

Утверждения print будут перенаправлены в подсистему протоколирования после инициализации рабочего, поэтому строки «is starting» имеют временную метку. Вы можете заметить, что при выключении этого больше не происходит, это потому, что методы stop и << 2 >>> вызываются внутри обработчика сигналов, а использовать логирование внутри такого обработчика небезопасно. Ведение журнала с помощью модуля Python logging не является shutdown: это означает, что вы не можете прервать функцию и вызвать ее снова позже. Важно, чтобы написанные вами методы reentrant и stop были также shutdown.

Запуск рабочего с --loglevel=debug покажет нам больше информации о процессе загрузки:

[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.

Программы командной строки

Добавление новых опций командной строки

Параметры, специфичные для команды

Вы можете добавить дополнительные опции командной строки к командам worker, beat и events, изменив атрибут user_options экземпляра приложения.

Команды Celery используют модуль click для разбора аргументов командной строки, поэтому для добавления пользовательских аргументов необходимо добавить экземпляры click.Option в соответствующий набор.

Пример добавления пользовательской опции к команде celery worker:

from celery import Celery
from click import Option

app = Celery(broker='amqp://')

app.user_options['worker'].add(Option(('--enable-my-option',),
                                      is_flag=True,
                                      help='Enable custom option.'))

Теперь все шаги загрузки будут получать этот аргумент в качестве аргумента ключевого слова для Bootstep.__init__:

from celery import bootsteps

class MyBootstep(bootsteps.Step):

    def __init__(self, parent, enable_my_option=False, **options):
        super().__init__(parent, **options)
        if enable_my_option:
            party()

app.steps['worker'].add(MyBootstep)

Параметры предварительной нагрузки

Команда celery umbrella поддерживает концепцию «опций предварительной загрузки». Это специальные опции, передаваемые всем подкомандам.

Вы можете добавить новые параметры предварительной загрузки, например, для указания шаблона конфигурации:

from celery import Celery
from celery import signals
from click import Option

app = Celery()

app.user_options['preload'].add(Option(('-Z', '--template'),
                                       default='default',
                                       help='Configuration template to use.'))

@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])

Добавление новых подкоманд celery

Новые команды могут быть добавлены к команде зонтика celery с помощью setuptools entry-points.

Точки входа - это специальные мета-данные, которые могут быть добавлены в ваши пакеты setup.py программой, а затем, после установки, прочитаны из системы с помощью модуля pkg_resources.

Celery распознает точки входа celery.commands для установки дополнительных подкоманд, где значение точки входа должно указывать на действительную команду click.

Вот как расширение мониторинга Flower может добавить команду celery flower, добавив точку входа в setup.py:

setup(
    name='flower',
    entry_points={
        'celery.commands': [
           'flower = flower.command:flower',
        ],
    }
)

Определение команды состоит из двух частей, разделенных знаком равенства, где первая часть - имя подкоманды (flower), а вторая часть - путь к функции, реализующей команду:

flower.command:flower

Путь к модулю и имя атрибута должны быть разделены двоеточием, как указано выше.

В модуле flower/command.py командная функция может быть определена следующим образом:

import click

@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
    print('Running our command')

Рабочий API

Hub - Рабочий асинхронный цикл событий

поддерживаемые транспорты:

amqp, redis

Добавлено в версии 3.0.

Исполнитель использует асинхронный ввод-вывод, когда используются транспорты брокера amqp или redis. В конечном итоге все транспорты должны использовать цикл событий, но это займет некоторое время, поэтому другие транспорты все еще используют решение на основе потоков.

hub.add(fd, callback, flags)
hub.add_reader(fd, callback, \*args)

Добавьте обратный вызов, который будет вызываться, когда fd будет доступен для чтения.

Обратный вызов будет оставаться зарегистрированным до тех пор, пока не будет явно удален с помощью hub.remove(fd), или пока дескриптор файла не будет автоматически отброшен, потому что он больше не действителен.

Обратите внимание, что для любого данного дескриптора файла одновременно может быть зарегистрирован только один обратный вызов, поэтому вызов add во второй раз удалит любой обратный вызов, который был ранее зарегистрирован для этого дескриптора файла.

Дескриптор файла - это любой файлоподобный объект, который поддерживает метод fileno, или это может быть номер дескриптора файла (int).

hub.add_writer(fd, callback, \*args)

Добавьте обратный вызов, который будет вызываться, когда fd становится доступным для записи. См. также примечания для hub.add_reader() выше.

hub.remove(fd)

Удалите из цикла все обратные вызовы для дескриптора файла fd.

Таймер - Планирование событий

timer.call_after(secs, callback, args=(), kwargs=(),
priority=0)
timer.call_repeatedly(secs, callback, args=(), kwargs=(),
priority=0)
timer.call_at(eta, callback, args=(), kwargs=(),
priority=0)
Вернуться на верх