Правильно ли использовать Celery в Kafka Consumer?
Я выполняю некоторые длительные задачи в Kafka Consumer. Я хочу знать, что я делаю правильно или неправильно? Я использую Kafka Consumer для получения сообщений с другого сервера, и сообщения обрабатываются так, как я хочу. Я помещаю полученные сообщения в очередь Celery. И все работает хорошо. Нужен ли здесь Celery? Или Kafka справится с этим как система очередей?
_consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=['{}:{}'.format(HOST, PORT)],auto_offset_reset="earliest", value_deserializer=lambda x: ReadHelper().json_deserializer(x), group_id="mygroupZ1")
for msg in _consumer:
payload = msg.value
print("data fetched payload------------------")
long_running_task.delay(payload) # Does here need Celery task to put in?
Потребителям Kafka необходимо периодически опрашивать Kafka. Если вы запустите какую-то блокирующую функцию, то она в конечном итоге остановит этого потребителя и перебалансирует группу потребителей, а затем, возможно, перезапустится, пока другой потребитель не возьмет на себя эту функцию. Поэтому, если очереди локальны в памяти, то не гарантируется, что потребитель будет обрабатывать данные, которые он только что потребил. Другими словами, если произойдет ребалансировка, вы можете обрабатывать дублирующиеся события.
В противном случае, если вы передаете данные в другую очередь, задерживаете это событие, а затем не ждете, вы рискуете переполнить эту очередь, и вам необходимо реализовать обратное давление, например _consumer.pause()
и подождать, пока очередь иссякнет, а затем возобновить опрос из Kafka. У вас также нет возможности управлять смещениями записей Kafka, например, определять сбои процесса очереди, поэтому существует вероятность потери данных, поскольку это просто "запустить и забыть"