Невозможно отправить сообщения через Kafka-Python Producer

Я докеризировал kafka с помощью образа wurstmeister/kafka. Я новичок в Kafka и интеграции с моим проектом django. Я создал команды управления для consumer.py и producer.py.

docker-compose.yml

services:
   zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"

    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic_test:1:1"
      restart: unless-stopped
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

consumer.py

import logging

from django.conf import settings
from django.core.management import BaseCommand

from kafka import KafkaConsumer
from json import loads
from time import sleep

logger = logging.getLogger(__name__)


class Command(BaseCommand):

    def handle(self, *args, **options):
        print("Loading Consumer Kafka......")
        consumer = KafkaConsumer(
            'topic_test',
            bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group-id',
            value_deserializer=lambda x: loads(x.decode('utf-8')),
            api_version=(0, 11, 5),
        )

        for event in consumer:
            event_data = event.value
            logger.info(f'Data Received on Kafka .... .: {event_data}')
            print(f'Data Received on Kafka .... .: {event_data} \n\n')
            sleep(1)

producer.py

import logging
from time import sleep
from json import dumps

from django.conf import settings
from django.core.management import BaseCommand
from kafka import KafkaProducer

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    def on_send_success(self, record_metadata):
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)

    def on_send_error(self, err):
        logger.error(f'Failed to Send Message on Kafka Consumer : {err}')
        # handle exception

    def handle(self, *args, **options):
        print(f'Sending Kafka Data .... {settings.KAFKA_HOST}:{settings.KAFKA_PORT}')
        producer = KafkaProducer(
            bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
            value_serializer=lambda x: dumps(x).encode('utf-8'),
            api_version=(0, 11, 5),
        )

        for _ in range(10):
            data = {'data': 'Hello'}
            producer.send(f'{settings.KAFKA_TOPIC}', value=data).add_callback(
                self.on_send_success).add_errback(self.on_send_error)
            logger.info(f'Sending Kafka Data .... {data}')
            print(f'Sending Kafka Data .... {data}\n\n')
            producer.flush()
            sleep(0.5)

Рассмотрим значения этих переменных, которые я использовал в своих настройках.

settings.KAFKA_TOPIC = 'test_topic'
settings.KAFKA_HOST = 'kafka'
settings.KAFKA_PORT = '29092'
  • docker-compose up zookeeper
  • docker-compose up kafka

Затем я запускаю команду для производителя. python manage.py producer

Я получил эту ошибку в журналах производителя,

enter image description here

** Произведены сообщения в topic-partition TopicPartition(topic='topic_test', partition=0) с базовым смещением -1 log start offset None и ошибкой None. Не удалось отправить сообщение на Kafka Consumer : KafkaTimeoutError: Пакет для TopicPartition(topic='topic_test', partition=0), содержащий 1 запись(и), истек: 30 секунд прошло с момента создания партии плюс время ожидания. В накопителе 1 пакет с истекшим сроком хранения KafkaTimeoutError: Истек срок действия пакета для TopicPartition(topic='topic_test', partition=0), содержащего 1 запись(и): 30 секунд прошло с момента создания пакета плюс время ожидания **

Вернуться на верх