Как позволить потребителю Kombu вернуть или передать поставленное в очередь сообщение
Я использую Kombu и RabbitMq для постановки в очередь данных JPEG-изображения и передачи или возврата декеированного сообщения обратно, чтобы показать потоковое видео на веб-странице. Потребитель Kombu обнаруживает сообщение, отправленное от производителя, объявленного в файле detection.py, но он не сохраняет и не возвращает никакого сообщения обратно, так что в основном ничего не отображается на веб-странице. Я прочитал документацию Kombu, но мне все еще трудно понять проблему моего кода. Любая помощь будет очень признательна...
color_detection.py:
Следующий код объявляет производителя Kombu и публикует сообщение на биржу, называемую video-exchange.
from kombu import Connection
from kombu import Exchange
from kombu import Producer
from kombu import Queue
import numpy as np
import cv2
import sys
# Default RabbitMQ server URI
rabbit_url = 'amqp://guest:guest@localhost:5672//'
# Kombu Connection
conn = Connection(rabbit_url)
channel = conn.channel()
# Kombu Exchange
# - set delivery_mode to transient to prevent disk writes for faster delivery
exchange = Exchange("video-exchange", type="direct", delivery_mode=1)
# Kombu Producer
producer = Producer(exchange=exchange, channel=channel, routing_key="video")
# Kombu Queue
queue = Queue(name="video-queue", exchange=exchange, routing_key="video")
queue.maybe_bind(conn)
queue.declare()
'''
ML object detection algo(haarcascade)used to identify objects.
the XML file consists of trained Haar Cascade models.
'''
face_cascade = cv2.CascadeClassifier(
'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml')
# 'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml'
# initialize video from the webcam
video = cv2.VideoCapture(1)
# encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]
def color_detection(season):
ret, frame = video.read()
if frame is not None and video.isOpened():
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = face_cascade.detectMultiScale(
gray, scaleFactor=1.1, minNeighbors=6)
if cv2.waitKey(1) == ord("q"):
return;
# Draw the rectangle around each face
if len(faces)!=0:
for (x, y, w, h) in faces:
# Use the stcoordinates to find the center of the face and from that point draw a rectangle of radius w/2 or h/2.
center_coordinates = x + w // 2, y + h // 2
radius = w // 2 # or can be h / 2 or can be anything based on your requirements
mask = np.zeros(frame.shape[:2], dtype="uint8")
# Draw the desired region to crop out in white
cv2.circle(mask, center_coordinates, radius, (255, 255, 255), -1)
masked = cv2.bitwise_and(frame, frame, mask=mask)
if season== "spring":
masked[np.where((masked == [0, 0, 0]).all(axis=2))] = [121, 131, 248]
ret, jpeg = cv2.imencode('.jpg', masked)
producer.publish(jpeg.tobytes(), content_type='image/jpeg', content_encoding='binary') -> publish the video streaming data into the "video-exchange" queue.
else:
print("camera closed",ret)
# video.release()
# cv2.destroyAllWindows()
if __name__ == '__main__':
arg = sys.argv[1]
color_detection(arg)
views.py:
Приведенный ниже код вызывает task.py, который занимается потреблением сообщений из очереди. Я передал переменную frameRes в task.py для сохранения выгруженных сообщений.
@api_view(['GET'])
def spring(request):
try:
frameRes=""
color_detection.color_detection("spring")
task.gen(frameRes)
print("fr",frameRes) -> print nothing
return StreamingHttpResponse(frameRes, content_type='multipart/x-mixed-replace; boundary=frame') -> nothing showing
except Exception as ex:
return HttpResponse(ex)
<
Код декеузирует сообщение и сохраняет данные в frameRes. Отсюда я не могу понять, как вернуть эти данные обратно в views.py.
import os
import cv2
import numpy as np
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from django.http.response import StreamingHttpResponse
# Default RabbitMQ server URI
rabbit_url = 'amqp://guest:guest@localhost:5672//'
# Kombu Connection
conn = Connection(rabbit_url)
channel = conn.channel()
class Consumer(ConsumerMixin):
def __init__(self, connection, queues, frameRes):
self.connection = connection
self.queues = queues
self.frameRes=frameRes
def get_consumers(self, Consumer, channel):
print("consumer")
return [Consumer(queues=self.queues,
callbacks=[self.on_message],
accept=['image/jpeg'])]
def on_message(self, body, message):
try:
print("on_message")
frameRes=message
print("properties: ",message.properties)
# print(frameRes)
# print("frameRes",frameRes)
# return StreamingHttpResponse(message, content_type='multipart/x-mixed-replace; boundary=frame')
message.ack()
except Exception as ex:
print("exception",ex)
def gen(frameRes):
print("gen: task")
exchange = Exchange("video-exchange", type="direct")
queues = [Queue("video-queue", exchange, routing_key="video")]
with Connection(rabbit_url, heartbeat=5) as conn:
consumer = Consumer(conn, queues,frameRes)
# consumer.consume()
consumer.run()
На консоль выводится следующее:
gen: task
Connected to amqp://guest:**@127.0.0.1:5672//
consumer
on_message
properties: {'content_type': 'image/jpeg', 'content_encoding': 'binary', 'application_headers': {}, 'delivery_mode': 1, 'priority': 0}