Запуск прослушивания kafka в параллельном потоке при запуске проекта django

Я хочу запустить файл, который слушает kafka в параллельном потоке с проектом django. Мой файл manage.py

import asyncio
import os
import sys
import multiprocessing as mt

from kafka.run_kafka import run_kafka


def main():
    """Run administrative tasks."""
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
    try:
        from django.core.management import execute_from_command_line
    except ImportError as exc:
        raise ImportError(
            "Couldn't import Django. Are you sure it's installed and "
            "available on your PYTHONPATH environment variable? Did you "
            "forget to activate a virtual environment?"
        ) from exc
    execute_from_command_line(sys.argv)


if __name__ == '__main__':
    kafka_process = mt.Process(target=asyncio.run(run_kafka()))
    django_process = mt.Process(target=main())

    kafka_process.start()
    django_process.start()

    kafka_process.join()
    django_process.join()

Мой файл run_kafka.py использует Confluent Kafka Python

import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
django.setup()

import asyncio

from business_logic.settings import KAFKA_CONF, TOPIC_PROD
from kafka.kafka_consuners import KafkaConsumerBL

async def run_kafka():
    """
    Запуск прослушивания Kafka на всех топиках на все ответы
    """

    consumer = KafkaConsumerBL(KAFKA_CONF)
    consumer.indicate_topic([TOPIC_PROD])
    await consumer.run_consumer(consumer)

if __name__ == '__main__':
    asyncio.run(run_kafka())

Я пытался решить проблему с помощью библиотек threading и multiprocessing. После использования любой из библиотек запускается либо проект django, либо kafka.

При использовании библиотеки многопроцессорной обработки запускается один процесс, но не оба manage.py

...
if __name__ == '__main__':
    kafka_process = mt.Process(target=asyncio.run(run_kafka()))
    django_process = mt.Process(target=main())

    kafka_process.start()
    django_process.start()

    kafka_process.join()
    django_process.join()

При использовании библиотеки потоков снова запускается только один процесс manage.py

...
if __name__ == '__main__':
    threading.Thread(target=asyncio.run(run_kafka())).start()
    threading.Thread(target=main()).start()

Подскажите, где я допустил ошибку, неправильно ли я использовал библиотеку, или мне вообще нужно использовать другой метод?

Решили задачу следующим образом:

if __name__ == "__main__":
    process = subprocess.Popen(['python3', 'kafka_run.py'], stdout=subprocess.PIPE)
    uvicorn.run(app=application, host='0.0.0.0', port=8000)
Вернуться на верх