Kafka consumer is missing messages during deployment

My consumers inherit from `BasicKafkaConsumerV2`. During deployments, while the pods are rotating, I miss a few messages; this is visible in the offsets logged after the manual `commit()`. Kafka should not skip messages whose offsets have not been committed. What could be the problem here?

run command:

      - name: order-consumer
        image: KUSTOMIZE_PRIMARY
        imagePullPolicy: Always
        # NOTE(review): on pod shutdown Kubernetes sends SIGTERM to this
        # container's main process. It reaches python only if wait-for.sh and
        # ddtrace-run exec the next command rather than forking it - confirm.
        command:
          [
            # Invoke the wait-for script so the consumer starts only once
            # pgbouncer answers on localhost:6432 (-t 30: presumably a 30 s
            # timeout - confirm against wait-for.sh).
            "/wait-for.sh",
            "localhost:6432",
            "-s",
            "-t",
            "30",
            "--",
            # Start the main process under Datadog tracing.
            "ddtrace-run",
            "python",
            "manage.py",
            "run_order-consumer",
          ]

Consumer:

class BasicKafkaConsumerV2:
    """Base class for Kafka consumers that commit offsets manually.

    Subclasses configure ``group_id``, ``consumer_name`` and ``topic_handlers``
    (topic name -> object exposing ``handle_message(event, data, headers)``,
    e.g. a ``TopicEventHandler``). ``start_consumer`` handles messages one at a
    time and commits after each one.
    """

    group_id = None  # str
    consumer_name = None  # str
    newrelic_application = None

    topic_handlers = {}  # dict: topic name -> handler with handle_message()

    # Value passed to KafkaConsumer(auto_offset_reset=...). Kafka consults it
    # only when the group has no valid committed offset for a partition (new
    # group, new partition, or committed offset expired / out of range).
    # None means "derive it from the latest_offset constructor flag";
    # subclasses may pin it to "earliest" or "latest".
    auto_offset_reset = None

    DB_EXCEPTION_RETRY_TIMEOUT = 5  # seconds
    DLQ_TOPIC = None

    def __init__(self, latest_offset=False):
        """Create the KafkaConsumer and subscribe to the handled topics.

        Args:
            latest_offset: used only when ``auto_offset_reset`` is not set on
                the class. True starts partitions that have no committed
                offset at the log end ("latest"); False (the default) starts
                them at the beginning ("earliest").
        """
        if self.auto_offset_reset is None:
            # Honour the flag instead of hard-coding "latest": with "latest",
            # a partition whose committed offset is missing or expired skips
            # every message already in it, which shows up as "lost" messages
            # after consumers restart.
            self.auto_offset_reset = "latest" if latest_offset else "earliest"

        self.consumer = KafkaConsumer(
            bootstrap_servers=["broker1", "broker2"],
            group_id=self.group_id,
            # Offsets are committed explicitly in start_consumer().
            enable_auto_commit=False,
            auto_offset_reset=self.auto_offset_reset,
        )
        self.topics_list = list(self.topic_handlers.keys())
        self.consumer.subscribe(self.topics_list)
        self.newrelic_application = newrelic.agent.application()
        logger.info(
            f"[{self.consumer_name}] subscribed to {self.topics_list} with auto_offset_reset {self.auto_offset_reset}"
        )

    def message_handler_wrapped(
        self,
        topic: str,
        kafka_msg_value: bytes,
        headers: dict,
        consumed_message=None,
    ):
        """Decode and dispatch one message, retrying on DB errors.

        ``InterfaceError`` / ``OperationalError`` are retried every
        ``DB_EXCEPTION_RETRY_TIMEOUT`` seconds until the handler succeeds.
        This blocks further processing of events. Retries run in a loop
        rather than by recursion: recursion eventually raised RecursionError,
        which the generic handler swallowed, so the message was then committed
        and lost.

        Any other exception is logged, reported to Sentry and swallowed. The
        caller then commits the offset, so that message is effectively
        skipped.

        ``consumed_message`` is not used by this method.

        NOTE(review): make sure the imported InterfaceError/OperationalError
        are the classes actually raised (e.g. Django wraps driver errors in
        django.db.utils.*). Otherwise DB outages - such as the pgbouncer
        sidecar stopping during a deploy - fall into the generic branch and
        the message is committed unprocessed.
        NOTE(review): a long DB outage keeps this loop from returning to
        poll(); beyond max_poll_interval_ms the group rebalances and the
        following commit() may fail.
        """
        while True:
            with tracer.trace(
                settings.DD_KAFKA_RESOURCE_NAME,
                service=settings.DD_SERVICE,
                resource=self.group_id,
                span_type="consumer",
            ) as span:
                try:
                    json_data = json.loads(kafka_msg_value)
                    dict_headers = convert_tuple_to_dict(headers)

                    span.set_tag("topic", topic)
                    span.set_tag("event", self.get_event_name(json_data))

                    self.message_handler(topic, json_data, dict_headers)
                    return

                except (InterfaceError, OperationalError) as e:
                    # Sleep for some time to allow the DB to heal, then retry
                    # the same message in a fresh span (next loop iteration).
                    logger.info(f"[{self.consumer_name}] DB Exception: {e}")
                    span.set_tag("type", "retry")
                    span.set_exc_info(type(e), e, e.__traceback__)
                    time.sleep(self.DB_EXCEPTION_RETRY_TIMEOUT)

                except Exception as e:
                    logger.exception(f"[{self.consumer_name}] Exception: {e}")
                    span.set_tag("type", "error")
                    span.set_exc_info(type(e), e, e.__traceback__)
                    sentry_sdk.capture_exception(e)
                    return

    def message_handler(self, topic: str, data: dict, headers: dict):
        """Route decoded ``data`` to the handler registered for ``topic``.

        If ``topic`` has no entry in ``topic_handlers``, the ``None`` lookup
        result raises AttributeError. ``message_handler_wrapped`` logs and
        swallows that error.
        """
        event = self.get_event_name(data)
        topic_handler = self.topic_handlers.get(topic)
        topic_handler.handle_message(event, data, headers)

    def start_consumer(self):
        """Consume forever, handling and then committing one message at a time.

        The commit happens only after ``message_handler_wrapped`` returns. A
        process killed mid-handler should therefore get that message
        redelivered. A message whose handler raised a non-DB exception is
        still committed (see ``message_handler_wrapped``).

        NOTE(review): nothing here handles SIGTERM or calls
        ``self.consumer.close()``. On pod shutdown the process is presumably
        killed and the group notices only after the session timeout - confirm
        the shutdown path.
        """
        logger.info(f"Consumer [{self.consumer_name}] is starting consuming")

        for msg in self.consumer:
            with LogGuidSetter() as _:
                self.message_handler_wrapped(
                    msg.topic, msg.value, msg.headers, msg
                )
                # commit() without arguments commits the consumer's current
                # (consumed) position for every assigned partition, not only
                # msg's partition.
                self.consumer.commit()
                logger.info(
                    f"[{self.consumer_name}] Consumed message from partition: {msg.partition} offset: {msg.offset} with key: {msg.key}"
                )

    def get_event_name(self, data):
        """Return ``data["event"]``, falling back to ``data["event_name"]`` when falsy/missing."""
        return data.get("event") or data.get("event_name")

    # NOTE(review): as indented, this class is nested inside
    # BasicKafkaConsumerV2. If it is meant to be module-level, dedent it.
    class TopicEventHandler:
        """Dispatch a topic's events to methods named in ``event_handler_mapping``."""

        topic = None
        event_handler_mapping = {}  # event name -> name of a handler method on self

        def handle_message(self, event, data, headers):
            """Call the method mapped to ``event`` with ``(data, headers)``.

            Events with no mapping, or mapped to a method name that does not
            exist on this handler, are logged and ignored.
            """
            event_handler = getattr(
                self, self.event_handler_mapping.get(event, ""), None
            )
            if event_handler is None:
                logger.info(f"Topic <{self.topic}> unhandled event : {event}")
                return

            event_handler(data, headers)
Вернуться на верх