Как правильно настроить KafkaBroker, KafkaProducer и KafkaConsumer, которые находятся в разных сетях

возникли сложности в настройки Kafka в моем приложение на Django. У меня есть файл docer-compose в котором контейнер с моим приложением, zookeper и kafka. Приложение будет выступать в роли Kafkaproducer, оно будет записывать в KafkaBroker необходимую информацию о новом пользователе.

version: '3'

services:
  app:
    build:
      context: .
    ports:
      - "8000:8000"
    volumes:
      - ./app:/app
    container_name: app
    command: >
      sh -c "python manage.py migrate &&
             python manage.py runserver 0.0.0.0:8000"
    env_file:
      - ./app/.env.dev
    depends_on:
      - db
      - kafka



  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    hostname: zookeeper
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://127.0.0.1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Данные в KafkaBroker записываются успешно

def send_data(data):
    producer = KafkaProducer(bootstrap_servers=['kafka:9093'],
                             api_version=(2, 8, 1),
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    data = {'1': 1}
    producer.send('topic',value=data)
    producer.flush()

В роли KafkaConsumer будет выступает другое приложение на FastAPI, приложение должно получать из брокера информацию о пользователях и совершать рассылку. Файл docker-compose. Не уверен правильно я все делаю на стороне KafkaConsumer

version: '3'

services:
  app:
    build:
      context: .
    ports:
      - "8010:8010"
    volumes:
      - .:/fast_api
    container_name: fast_api
    command: bash -c 'uvicorn fast_api.main:app --host 127.0.0.1 --port 8010'
    env_file:
      - ./fast_api/.env.dev
from fastapi import FastAPI
import consumer
app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/get")
async def root():
    consumer.get_data()
    return {"message": "Hello World"}
from kafka import KafkaConsumer
import json
import os

def get_data():
    consumer = KafkaConsumer('topic',
                            bootstrap_servers='127.0.0.1:9092', api_version=(2, 8, 1))
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                            message.offset, message.key,
                                            message.value))

Буду благодарен любой помощи. Может я что-то с портами напутал или что-то иное. Может неправильно понимаю как должна работать сторона Получателя

Вернуться на верх