Как правильно настроить KafkaBroker, KafkaProducer и KafkaConsumer, которые находятся в разных сетях
возникли сложности в настройки Kafka в моем приложение на Django. У меня есть файл docer-compose в котором контейнер с моим приложением, zookeper и kafka. Приложение будет выступать в роли Kafkaproducer, оно будет записывать в KafkaBroker необходимую информацию о новом пользователе.
version: '3'
services:
app:
build:
context: .
ports:
- "8000:8000"
volumes:
- ./app:/app
container_name: app
command: >
sh -c "python manage.py migrate &&
python manage.py runserver 0.0.0.0:8000"
env_file:
- ./app/.env.dev
depends_on:
- db
- kafka
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
hostname: zookeeper
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9093"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://127.0.0.1:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Данные в KafkaBroker записываются успешно
def send_data(data):
producer = KafkaProducer(bootstrap_servers=['kafka:9093'],
api_version=(2, 8, 1),
value_serializer=lambda x: dumps(x).encode('utf-8'))
data = {'1': 1}
producer.send('topic',value=data)
producer.flush()
В роли KafkaConsumer будет выступает другое приложение на FastAPI, приложение должно получать из брокера информацию о пользователях и совершать рассылку. Файл docker-compose. Не уверен правильно я все делаю на стороне KafkaConsumer
version: '3'
services:
app:
build:
context: .
ports:
- "8010:8010"
volumes:
- .:/fast_api
container_name: fast_api
command: bash -c 'uvicorn fast_api.main:app --host 127.0.0.1 --port 8010'
env_file:
- ./fast_api/.env.dev
from fastapi import FastAPI
import consumer
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/get")
async def root():
consumer.get_data()
return {"message": "Hello World"}
from kafka import KafkaConsumer
import json
import os
def get_data():
consumer = KafkaConsumer('topic',
bootstrap_servers='127.0.0.1:9092', api_version=(2, 8, 1))
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
Буду благодарен любой помощи. Может я что-то с портами напутал или что-то иное. Может неправильно понимаю как должна работать сторона Получателя