Использование Amazon SQS

Установка

Для поддержки Amazon SQS необходимо установить дополнительные зависимости. Вы можете установить Celery и эти зависимости за один раз, используя celery[sqs] bundle:

$ pip install celery[sqs]

Конфигурация

Вы должны указать SQS в URL брокера:

broker_url = 'sqs://ABCDEFGHIJKLMNOPQRST:ZYXK7NiynGlTogH8Nj+P9nlE73sq3@'

где формат URL:

sqs://aws_access_key_id:aws_secret_access_key@

Обратите внимание, что вы должны не забыть включить знак @ в конце и закодировать пароль, чтобы он всегда мог быть правильно разобран. Например:

from kombu.utils.url import safequote

aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)

Учетные данные для входа также могут быть установлены с помощью переменных окружения AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY, в этом случае URL брокера может быть только sqs://.

Если вы используете роли IAM для экземпляров, вы можете установить BROKER_URL в значение: sqs:// и kombu попытается получить маркеры доступа из метаданных экземпляра.

Опции

Регион

По умолчанию используется регион us-east-1, но вы можете выбрать другой регион, настроив параметр broker_transport_options:

broker_transport_options = {'region': 'eu-west-1'}

См.также

Обзор регионов Amazon Web Services можно найти здесь:

Таймаут видимости

Таймаут видимости определяет количество секунд, в течение которых нужно ждать, пока рабочий подтвердит задание, прежде чем сообщение будет повторно передано другому рабочему. Также см. предостережения ниже.

Этот параметр устанавливается с помощью настройки broker_transport_options:

broker_transport_options = {'visibility_timeout': 3600}  # 1 hour.

По умолчанию тайм-аут видимости составляет 30 минут.

Интервал опроса

Интервал опроса определяет количество секунд сна между неудачными опросами. Это значение может быть либо int, либо float. По умолчанию значение равно одна секунда: это означает, что рабочий будет спать одну секунду, когда больше не будет сообщений для чтения.

Следует учитывать, что более частый опрос также более дорогостоящий, поэтому увеличение интервала опроса может сэкономить вам деньги.

Интервал опроса можно установить с помощью настройки broker_transport_options:

broker_transport_options = {'polling_interval': 0.3}

Очень частые интервалы опроса могут вызвать занятые циклы, в результате чего рабочий будет использовать много процессорного времени. Если вам нужна субмиллисекундная точность, вам следует рассмотреть возможность использования другого транспорта, например, RabbitMQ <broker-amqp> или Redis <broker-redis>.

Длительный опрос

SQS Long Polling включен по умолчанию, а параметр WaitTimeSeconds операции ReceiveMessage установлен на 10 секунд.

Значение параметра WaitTimeSeconds может быть установлено с помощью настройки broker_transport_options:

broker_transport_options = {'wait_time_seconds': 15}

Допустимые значения - от 0 до 20. Обратите внимание, что вновь созданные очереди (также если они созданы Celery) будут иметь значение 0 по умолчанию для свойства очереди «Receive Message Wait Time».

Префикс очереди

По умолчанию Celery не будет назначать никаких префиксов для имен очередей, если у вас есть другие сервисы, использующие SQS, вы можете настроить его на это с помощью параметра broker_transport_options:

broker_transport_options = {'queue_name_prefix': 'celery-'}

Предопределенные очереди

Если вы хотите, чтобы Celery использовал набор предопределенных очередей в AWS и никогда не пытался перечислить очереди SQS, не пытался создать или удалить их, передайте карту имен очередей в URL с помощью параметра predefined_queues:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
}

Политика резервного копирования

Политика резервного копирования использует механизм таймаута видимости SQS, изменяя разницу во времени между повторными попытками выполнения задачи. Механизм изменяет конкретное сообщение visibility timeout из очереди Default visibility timeout на настроенный политикой таймаут. Количество повторных попыток управляется SQS (в частности, атрибутом сообщения ApproximateReceiveCount), и от пользователя не требуется никаких дополнительных действий.

Настройка очередей и политики отката:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
            'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
            'backoff_tasks': ['svc.tasks.tasks.task1']
        }
    }
}

backoff_policy словарь, где ключ - количество повторных попыток, а значение - секунды задержки между повторными попытками (т.е. таймаут видимости SQS) backoff_tasks список имен задач для применения вышеуказанной политики

Вышеуказанная политика:

Ответственность

Задержка

2nd attempt

20 секунд

3rd attempt

40 секунд

4th attempt

80 секунд

5th attempt

320 секунд

6th attempt

640 секунд

Аутентификация с помощью маркера STS

https://docs.aws.amazon.com/cli/latest/reference/sts/assume-role.html

Аутентификация AWS STS поддерживается с помощью транспортных опций брокера sts_role_arn и sts_token_timeout. sts_role_arn - предполагаемая IAM-роль ARN, которую мы используем для авторизации доступа к SQS. sts_token_timeout - это таймаут токена, по умолчанию (и минимально) 900 секунд. По истечении указанного периода будет создан новый токен.

broker_transport_options = {
„predefined_queues“: {
„my-q“: {

„url“: „https://ap-southeast-2.queue.amazonaws.com/123456/my-q“, „access_key_id“: „xxx“, „secret_access_key“: „xxx“, „backoff_policy“: {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, „backoff_tasks“: [„svc.tasks.tasks.task1“].

}

},

„sts_role_arn“: „arn:aws:iam::<xxx>:role/STSTest“, # optional „sts_token_timeout“: 900 # optional }

Оговорки

  • Если задание не было подтверждено в течение visibility_timeout, оно будет повторно передано другому работнику и выполнено.

    Это вызывает проблемы с задачами ETA/countdown/retry, где время выполнения превышает таймаут видимости; фактически, если это произойдет, задача будет выполняться снова и снова в цикле.

    Поэтому вы должны увеличить таймаут видимости, чтобы он соответствовал времени самого длинного ETA, которое вы планируете использовать.

    Обратите внимание, что Celery будет повторно доставлять сообщения при отключении рабочих, поэтому длительный тайм-аут видимости только задержит повторную доставку «потерянных» заданий в случае сбоя питания или принудительного отключения рабочих.

    Периодические задачи не будут зависеть от тайм-аута видимости, поскольку это концепция, отдельная от ETA/countdown.

    Максимальный тайм-аут видимости, поддерживаемый AWS на данный момент, составляет 12 часов (43200 секунд):

    broker_transport_options = {'visibility_timeout': 43200}
    
  • SQS пока не поддерживает команды удаленного управления рабочими.

  • SQS пока не поддерживает события, поэтому его нельзя использовать с celery events, << 1 >>> или монитором Django Admin.

Результаты

Несколько продуктов из семейства Amazon Web Services могли бы стать хорошим кандидатом для хранения или публикации результатов, но на данный момент такой бэкенд для результатов не предусмотрен.

Предупреждение

Не используйте бэкенд результатов amqp с SQS.

Это создаст одну очередь для каждой задачи, и очереди не будут собираться. Это может стоить вам денег, которые лучше потратить на поддержку бэкенда хранилища результатов AWS обратно в Celery :)

Вернуться на верх