Невозможно отправить сообщения через Kafka-Python Producer
Я докеризировал kafka с помощью образа wurstmeister/kafka
. Я новичок в Kafka и интеграции с моим проектом django. Я создал команды управления для consumer.py и producer.py.
docker-compose.yml
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:29092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic_test:1:1"
restart: unless-stopped
volumes:
- /var/run/docker.sock:/var/run/docker.sock
consumer.py
import logging
from django.conf import settings
from django.core.management import BaseCommand
from kafka import KafkaConsumer
from json import loads
from time import sleep
logger = logging.getLogger(__name__)
class Command(BaseCommand):
def handle(self, *args, **options):
print("Loading Consumer Kafka......")
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8')),
api_version=(0, 11, 5),
)
for event in consumer:
event_data = event.value
logger.info(f'Data Received on Kafka .... .: {event_data}')
print(f'Data Received on Kafka .... .: {event_data} \n\n')
sleep(1)
producer.py
import logging
from time import sleep
from json import dumps
from django.conf import settings
from django.core.management import BaseCommand
from kafka import KafkaProducer
logger = logging.getLogger(__name__)
class Command(BaseCommand):
def on_send_success(self, record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(self, err):
logger.error(f'Failed to Send Message on Kafka Consumer : {err}')
# handle exception
def handle(self, *args, **options):
print(f'Sending Kafka Data .... {settings.KAFKA_HOST}:{settings.KAFKA_PORT}')
producer = KafkaProducer(
bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
value_serializer=lambda x: dumps(x).encode('utf-8'),
api_version=(0, 11, 5),
)
for _ in range(10):
data = {'data': 'Hello'}
producer.send(f'{settings.KAFKA_TOPIC}', value=data).add_callback(
self.on_send_success).add_errback(self.on_send_error)
logger.info(f'Sending Kafka Data .... {data}')
print(f'Sending Kafka Data .... {data}\n\n')
producer.flush()
sleep(0.5)
Рассмотрим значения этих переменных, которые я использовал в своих настройках.
settings.KAFKA_TOPIC = 'test_topic'
settings.KAFKA_HOST = 'kafka'
settings.KAFKA_PORT = '29092'
docker-compose up zookeeper
docker-compose up kafka
Затем я запускаю команду для производителя.
python manage.py producer
Я получил эту ошибку в журналах производителя,
** Произведены сообщения в topic-partition TopicPartition(topic='topic_test', partition=0) с базовым смещением -1 log start offset None и ошибкой None. Не удалось отправить сообщение на Kafka Consumer : KafkaTimeoutError: Пакет для TopicPartition(topic='topic_test', partition=0), содержащий 1 запись(и), истек: 30 секунд прошло с момента создания партии плюс время ожидания. В накопителе 1 пакет с истекшим сроком хранения KafkaTimeoutError: Истек срок действия пакета для TopicPartition(topic='topic_test', partition=0), содержащего 1 запись(и): 30 секунд прошло с момента создания пакета плюс время ожидания **