Как позволить потребителю Kombu вернуть или передать поставленное в очередь сообщение

Я использую Kombu и RabbitMq для постановки в очередь данных JPEG-изображения и передачи или возврата декеированного сообщения обратно, чтобы показать потоковое видео на веб-странице. Потребитель Kombu обнаруживает сообщение, отправленное от производителя, объявленного в файле detection.py, но он не сохраняет и не возвращает никакого сообщения обратно, так что в основном ничего не отображается на веб-странице. Я прочитал документацию Kombu, но мне все еще трудно понять проблему моего кода. Любая помощь будет очень признательна...

color_detection.py:

Следующий код объявляет производителя Kombu и публикует сообщение на биржу, называемую video-exchange.

from kombu import Connection
from kombu import Exchange
from kombu import Producer
from kombu import Queue

import numpy as np
import cv2
import sys

# Default RabbitMQ server URI
rabbit_url = 'amqp://guest:guest@localhost:5672//'
# Kombu Connection
conn = Connection(rabbit_url)
channel = conn.channel()

# Kombu Exchange
# - set delivery_mode to transient to prevent disk writes for faster delivery
exchange = Exchange("video-exchange", type="direct", delivery_mode=1)

# Kombu Producer
producer = Producer(exchange=exchange, channel=channel, routing_key="video")
# Kombu Queue
queue = Queue(name="video-queue", exchange=exchange, routing_key="video") 
queue.maybe_bind(conn)
queue.declare()

'''
ML object detection algo(haarcascade)used to identify objects. 
the XML file consists of trained Haar Cascade models.
'''
face_cascade = cv2.CascadeClassifier(
    'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml')
# 'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml'
# initialize video from the webcam
video = cv2.VideoCapture(1)
# encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]
def color_detection(season):
    ret, frame = video.read()
    if frame is not None and video.isOpened():
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=6)
        if cv2.waitKey(1) == ord("q"):
            return;
        # Draw the rectangle around each face
        if len(faces)!=0:
            for (x, y, w, h) in faces:
                # Use the stcoordinates to find the center of the face and from that point draw a rectangle of radius w/2 or h/2.
                center_coordinates = x + w // 2, y + h // 2
                radius = w // 2  # or can be h / 2 or can be anything based on your requirements
                mask = np.zeros(frame.shape[:2], dtype="uint8")
                # Draw the desired region to crop out in white
                cv2.circle(mask, center_coordinates, radius, (255, 255, 255), -1)
                masked = cv2.bitwise_and(frame, frame, mask=mask)
                if season== "spring":  
                    masked[np.where((masked == [0, 0, 0]).all(axis=2))] = [121, 131, 248]
                ret, jpeg = cv2.imencode('.jpg', masked)

        producer.publish(jpeg.tobytes(), content_type='image/jpeg', content_encoding='binary') -> publish the video streaming data into the "video-exchange" queue.
    else:
        print("camera closed",ret)
    # video.release()
    # cv2.destroyAllWindows()


if __name__ == '__main__':
    arg = sys.argv[1]
    color_detection(arg)

views.py:

Приведенный ниже код вызывает task.py, который занимается потреблением сообщений из очереди. Я передал переменную frameRes в task.py для сохранения выгруженных сообщений.

@api_view(['GET'])
def spring(request):
    try:
        frameRes=""
        color_detection.color_detection("spring")
        task.gen(frameRes)
        print("fr",frameRes) -> print nothing
        return StreamingHttpResponse(frameRes, content_type='multipart/x-mixed-replace; boundary=frame') -> nothing showing
    except Exception as ex:
        return HttpResponse(ex)
<

Код декеузирует сообщение и сохраняет данные в frameRes. Отсюда я не могу понять, как вернуть эти данные обратно в views.py.

import os
import cv2
import numpy as np
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from django.http.response import StreamingHttpResponse

# Default RabbitMQ server URI
rabbit_url = 'amqp://guest:guest@localhost:5672//'

# Kombu Connection
conn = Connection(rabbit_url)
channel = conn.channel()


class Consumer(ConsumerMixin):
    def __init__(self, connection, queues, frameRes):
        self.connection = connection
        self.queues = queues
        self.frameRes=frameRes

    def get_consumers(self, Consumer, channel):
        print("consumer")
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message],
                         accept=['image/jpeg'])]

    def on_message(self, body, message):
        try:
            print("on_message")
            frameRes=message
            print("properties: ",message.properties)

            # print(frameRes)
            # print("frameRes",frameRes)
            # return StreamingHttpResponse(message, content_type='multipart/x-mixed-replace; boundary=frame')
            message.ack()
           
        except Exception as ex:
            print("exception",ex)


def gen(frameRes):
    print("gen: task")
    exchange = Exchange("video-exchange", type="direct")
    queues = [Queue("video-queue", exchange, routing_key="video")]
    with Connection(rabbit_url, heartbeat=5) as conn:
        consumer = Consumer(conn, queues,frameRes)
        # consumer.consume()
        consumer.run()

На консоль выводится следующее:

gen: task
Connected to amqp://guest:**@127.0.0.1:5672//
consumer
on_message
properties:  {'content_type': 'image/jpeg', 'content_encoding': 'binary', 'application_headers': {}, 'delivery_mode': 1, 'priority': 0}
Вернуться на верх