Kafka consumer is missing messages during deployment
My consumers inherit from BasicKafkaConsumerV2 (below). During deployments, while the pods are rotating, a few messages go missing, which is visible from the offsets printed after the manual commit(). Kafka should not lose messages whose offsets were never committed, so what could be the problem here?
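To state my assumption explicitly, this is the at-least-once contract I expect from manual commits (a minimal kafka-python sketch; the broker, topic, and group id are placeholders, not my real config):

from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "orders",                            # placeholder topic
    bootstrap_servers=["broker1:9092"],  # placeholder broker
    group_id="demo-group",               # placeholder group id
    enable_auto_commit=False,            # offsets advance only via commit()
    auto_offset_reset="latest",
)

for msg in consumer:
    print(msg.partition, msg.offset)  # stand-in for real processing
    # If the process dies before this line runs, the offset was never
    # committed, so the message should be redelivered after a restart.
    consumer.commit()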
run command:
  - name: order-consumer
    image: KUSTOMIZE_PRIMARY
    imagePullPolicy: Always
    command:
      [
        # Invoking wait-for-pgbouncer script
        "/wait-for.sh",
        "localhost:6432",
        "-s",
        "-t",
        "30",
        "--",
        # Starting main process
        "ddtrace-run",
        "python",
        "manage.py",
        "run_order-consumer",
      ]
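For deployment context: when a pod rotates, Kubernetes sends SIGTERM to this command and force-kills the container after terminationGracePeriodSeconds. Nothing in my command or consumer traps that signal. This is the kind of handler I am wondering about adding (the flag and function names are hypothetical, not in my code):

import signal

shutting_down = False  # hypothetical flag; my consumer has no equivalent

def _handle_sigterm(signum, frame):
    # Ask the consume loop to stop after the in-flight message has been
    # handled and committed, instead of dying mid-message.
    global shutting_down
    shutting_down = True

signal.signal(signal.SIGTERM, _handle_sigterm)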
Consumer:
import json
import logging
import time

import newrelic.agent
import sentry_sdk
from ddtrace import tracer
from django.conf import settings
from django.db import InterfaceError, OperationalError
from kafka import KafkaConsumer

# convert_tuple_to_dict and LogGuidSetter are small project helpers
# (header-tuple-to-dict conversion and log-correlation-id setup).
logger = logging.getLogger(__name__)


class BasicKafkaConsumerV2:
    group_id = None  # str
    consumer_name = None  # str
    newrelic_application = None
    topic_handlers = {}  # dict: topic name -> TopicEventHandler instance
    DB_EXCEPTION_RETRY_TIMEOUT = 5  # seconds
    DLQ_TOPIC = None

    def __init__(self, latest_offset=False):
        """Inits the consumer and subscribes to the topics"""
        # NOTE: latest_offset is currently unused; the reset policy is
        # hardcoded to "latest".
        self.auto_offset_reset = "latest"
        self.consumer = KafkaConsumer(
            bootstrap_servers=["broker1", "broker2"],
            group_id=self.group_id,
            enable_auto_commit=False,  # offsets are committed manually
            auto_offset_reset=self.auto_offset_reset,
        )
        self.topics_list = list(self.topic_handlers.keys())
        self.consumer.subscribe(self.topics_list)
        self.newrelic_application = newrelic.agent.application()
        logger.info(
            f"[{self.consumer_name}] subscribed to {self.topics_list} "
            f"with auto_offset_reset {self.auto_offset_reset}"
        )
    def message_handler_wrapped(
        self,
        topic: str,
        kafka_msg_value: bytes,
        headers: list,  # kafka-python delivers headers as (key, value) tuples
        consumed_message=None,
    ):
        """Processes the message.

        Also handles any DB exception by retrying the event after a delay.
        """
        with tracer.trace(
            settings.DD_KAFKA_RESOURCE_NAME,
            service=settings.DD_SERVICE,
            resource=self.group_id,
            span_type="consumer",
        ) as span:
            try:
                json_data = json.loads(kafka_msg_value)
                dict_headers = convert_tuple_to_dict(headers)
                span.set_tag("topic", topic)
                span.set_tag("event", self.get_event_name(json_data))
                self.message_handler(topic, json_data, dict_headers)
            except (InterfaceError, OperationalError) as e:
                # Sleep for some time to let the DB heal, then retry the
                # same message; this effectively loops forever (further
                # processing of events is blocked until the DB recovers).
                logger.info(f"[{self.consumer_name}] DB Exception: {e}")
                span.set_tag("type", "retry")
                span.set_exc_info(type(e), e, e.__traceback__)
                time.sleep(self.DB_EXCEPTION_RETRY_TIMEOUT)
                self.message_handler_wrapped(
                    topic, kafka_msg_value, headers, consumed_message
                )
            except Exception as e:
                logger.exception(f"[{self.consumer_name}] Exception: {e}")
                span.set_tag("type", "error")
                span.set_exc_info(type(e), e, e.__traceback__)
                sentry_sdk.capture_exception(e)
    def message_handler(self, topic: str, data: dict, headers: dict):
        """Routes the message to the handler registered for its topic"""
        event = self.get_event_name(data)
        topic_handler = self.topic_handlers.get(topic)
        topic_handler.handle_message(event, data, headers)

    def start_consumer(self):
        """Starts consuming messages from the subscribed topics"""
        logger.info(f"Consumer [{self.consumer_name}] is starting to consume")
        for msg in self.consumer:
            with LogGuidSetter() as _:
                self.message_handler_wrapped(
                    msg.topic, msg.value, msg.headers, msg
                )
                # Commit only after the message has been handled
                # (at-least-once semantics).
                self.consumer.commit()
                logger.info(
                    f"[{self.consumer_name}] Consumed message from "
                    f"partition: {msg.partition} offset: {msg.offset} "
                    f"with key: {msg.key}"
                )

    def get_event_name(self, data):
        return data.get("event") or data.get("event_name")
class TopicEventHandler:
    topic = None
    event_handler_mapping = {}  # event name -> handler method name

    def handle_message(self, event, data, headers):
        """Dispatches the event to its mapped handler method"""
        event_handler = getattr(
            self, self.event_handler_mapping.get(event, ""), None
        )
        if event_handler is None:
            logger.info(f"Topic <{self.topic}> unhandled event: {event}")
            return
        event_handler(data, headers)
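For completeness, this is roughly how a concrete consumer wires the two classes together (the class, topic, and event names here are hypothetical stand-ins, not my real ones):

class OrderTopicHandler(TopicEventHandler):
    topic = "orders"  # hypothetical topic
    event_handler_mapping = {"order_created": "handle_order_created"}

    def handle_order_created(self, data, headers):
        ...  # business logic for the event

class OrderConsumer(BasicKafkaConsumerV2):
    group_id = "order-consumer-group"  # hypothetical group id
    consumer_name = "order-consumer"
    topic_handlers = {"orders": OrderTopicHandler()}

# manage.py run_order-consumer effectively does:
OrderConsumer().start_consumer()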