Механизм повторной темы потокового вещания Faust

У меня есть две темы

  • главная_тема
  • retry_topic

Я хочу, чтобы при сбое логики в main_topic он вызывал retry_topic и если это не удается, то должно быть максимум 3 повтора, которые он должен сделать.

Я пробовал использовать sink в потоковой передаче faust, что он делает, так это выдает результат из моей основной темы в retry_topic, но я все еще не могу ограничить его до 3 повторных попыток.

Есть ли способ сделать это в потоковой передаче faust/kafka, потому что я знаю, что у celery есть такая возможность.

Один из способов добиться этого - использовать заголовки.

@app.agent(topic)
async def topic_handler(stream: faust.Stream):
    async for event in stream:
        try:
            await process_event(event)
        except Exception as e:
            logger.error(f'Error processing')
            current_try = int(stream.current_event.headers.get('retries', b'0').decode()) + 1
            await asyncio.sleep(min(current_try * 10, MAX_RETRY_WAIT))
            await retry_topic.send(key=event['id'], value=event,
                                   headers={'retries': str(current_try).encode(),
                                            'error': repr(e).encode()})

Теперь, если вы хотите использовать один и тот же агент для двух тем, перед этим агентом вы можете определить topic как один из main_topic или retry_topic, примерно так:

use_retry_topic = os.environ['USE_RETRY_TOPIC']
topic = app.topic(retry_topic_name if use_retry_topic else main_topic_name)
retry_topic = app.topic(retry_topic_name)

Таким образом, вам потребуется два процесса. Один начинается с USE_RETRY_TOPIC = False, он читает основную тему и, если что-то идет не так, отправляет сообщение в retry_topic с задержкой. Другой процесс запускается с USE_RETRY_TOPIC = True, он потребляет тему повторной попытки и, если что-то снова идет не так, снова отправляет сообщение в ту же тему, но с увеличенным retries счетчиком.

При желании вы можете добавить условие, чтобы проверить счетчик retries, если он больше <<<2>>

>

Обратите внимание, что такая логика задержки может быть не очень безопасной, например, если процесс неожиданно завершится в ожидании отправки сообщения в retry_topic, это сообщение может быть потеряно.

Кроме того, такой подход может нарушить порядок сообщений, link

Вернуться на верх