Механизм повторной темы потокового вещания Faust
У меня есть две темы
- главная_тема
- retry_topic
Я хочу, чтобы при сбое логики в main_topic он вызывал retry_topic и если это не удается, то должно быть максимум 3 повтора, которые он должен сделать.
Я пробовал использовать sink в потоковой передаче faust, что он делает, так это выдает результат из моей основной темы в retry_topic, но я все еще не могу ограничить его до 3 повторных попыток.
Есть ли способ сделать это в потоковой передаче faust/kafka, потому что я знаю, что у celery есть такая возможность.
Один из способов добиться этого - использовать заголовки.
@app.agent(topic)
async def topic_handler(stream: faust.Stream):
async for event in stream:
try:
await process_event(event)
except Exception as e:
logger.error(f'Error processing')
current_try = int(stream.current_event.headers.get('retries', b'0').decode()) + 1
await asyncio.sleep(min(current_try * 10, MAX_RETRY_WAIT))
await retry_topic.send(key=event['id'], value=event,
headers={'retries': str(current_try).encode(),
'error': repr(e).encode()})
Теперь, если вы хотите использовать один и тот же агент для двух тем, перед этим агентом вы можете определить topic
как один из main_topic
или retry_topic
, примерно так:
use_retry_topic = os.environ['USE_RETRY_TOPIC']
topic = app.topic(retry_topic_name if use_retry_topic else main_topic_name)
retry_topic = app.topic(retry_topic_name)
Таким образом, вам потребуется два процесса. Один начинается с USE_RETRY_TOPIC = False
, он читает основную тему и, если что-то идет не так, отправляет сообщение в retry_topic
с задержкой.
Другой процесс запускается с USE_RETRY_TOPIC = True
, он потребляет тему повторной попытки и, если что-то снова идет не так, снова отправляет сообщение в ту же тему, но с увеличенным retries
счетчиком.
При желании вы можете добавить условие, чтобы проверить счетчик retries
, если он больше <<<2>>
Обратите внимание, что такая логика задержки может быть не очень безопасной, например, если процесс неожиданно завершится в ожидании отправки сообщения в retry_topic
, это сообщение может быть потеряно.
Кроме того, такой подход может нарушить порядок сообщений, link