I am using Kombu and RabbitMQ to queue JPEG image data and then stream the dequeued messages back so that the video is shown on a web page. The Kombu consumer detects the messages sent by the producer declared in color_detection.py, but it does not save or return any message, so nothing is shown on the page. I have read the Kombu documentation, but I still have trouble understanding what is wrong with my code. Any help would be greatly appreciated.
color_detection.py:
The following code declares a Kombu producer and publishes each message to the exchange named video-exchange.
from kombu import Connection
from kombu import Exchange
from kombu import Producer
from kombu import Queue
import numpy as np
import cv2
import sys
# Default RabbitMQ server URI (guest account on localhost, port 5672).
rabbit_url = 'amqp://guest:guest@localhost:5672//'
# Kombu connection and a channel on it. Both are module-level objects that
# are created when this module is imported; the producer below publishes
# through `channel`.
conn = Connection(rabbit_url)
channel = conn.channel()
# Kombu exchange: a direct exchange named "video-exchange".
# - delivery_mode is set to transient (1) to prevent disk writes for faster
#   delivery.
exchange = Exchange("video-exchange", type="direct", delivery_mode=1)
# Kombu producer: publishes to the exchange above with the routing key
# "video" unless a call overrides it.
producer = Producer(exchange=exchange, channel=channel, routing_key="video")
# Kombu queue: "video-queue" is tied to the same exchange and the same
# routing key ("video") that the producer uses.
queue = Queue(name="video-queue", exchange=exchange, routing_key="video")
# Bind the queue object to the connection, then declare it.
# NOTE(review): presumably this guarantees the queue exists on the broker
# before the first frame is published -- confirm against the Kombu docs.
queue.maybe_bind(conn)
queue.declare()
# The string literal below is a free-standing note about the detector; it is
# evaluated and discarded at import time.
'''
ML object detection algo(haarcascade)used to identify objects.
the XML file consists of trained Haar Cascade models.
'''
# Load the frontal-face Haar cascade used by color_detection().
# NOTE(review): the path is relative -- presumably it is resolved against the
# process working directory; confirm for the way this module is launched.
face_cascade = cv2.CascadeClassifier(
'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml')
# Initialize video capture from the webcam (device index 1), at import time.
# NOTE(review): confirm that index 1, not 0, is the intended camera.
video = cv2.VideoCapture(1)
# Unused JPEG quality setting kept for reference:
# encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]
def color_detection(season):
    """Read one webcam frame and publish a JPEG for each detected face.

    A single frame is read from the module-level ``video`` capture. For every
    face reported by ``face_cascade`` a filled circle centred on the face is
    used as a mask, everything outside the circle is blacked out (and, for
    the "spring" season, recoloured), and the result is JPEG-encoded and
    published through the module-level Kombu ``producer``.

    Args:
        season: Season name. Only "spring" changes the output: pixels that
            are fully black after masking are set to [121, 131, 248].

    Returns:
        None. Frames leave this function through ``producer.publish``.
    """
    ret, frame = video.read()
    if frame is None or not video.isOpened():
        print("camera closed", ret)
        return

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(
        gray, scaleFactor=1.1, minNeighbors=6)

    # NOTE(review): no window is opened in this module, so waitKey presumably
    # never reports a key press -- confirm whether this check is still needed.
    if cv2.waitKey(1) == ord("q"):
        return

    # Iterating directly also covers the "no faces" case (empty result), so
    # no separate length check is needed.
    for (x, y, w, h) in faces:
        # Centre of the face box; the circle radius is half the box width.
        # Plain ints are passed on so cv2.circle does not depend on how the
        # detector's numeric type is converted.
        center_coordinates = (int(x + w // 2), int(y + h // 2))
        radius = int(w // 2)

        # Single-channel mask: white inside the face circle, black elsewhere.
        mask = np.zeros(frame.shape[:2], dtype="uint8")
        cv2.circle(mask, center_coordinates, radius, (255, 255, 255), -1)
        masked = cv2.bitwise_and(frame, frame, mask=mask)

        if season == "spring":
            # Recolour every pixel that is black in all three channels.
            background = (masked == [0, 0, 0]).all(axis=2)
            masked[background] = [121, 131, 248]

        encoded, jpeg = cv2.imencode('.jpg', masked)
        if not encoded:
            # Do not publish a buffer the encoder reported as failed.
            print("jpeg encoding failed")
            continue

        # Publish the JPEG bytes to the "video-exchange" exchange.
        producer.publish(jpeg.tobytes(), content_type='image/jpeg',
                         content_encoding='binary')
if __name__ == '__main__':
    # The season name is a required command-line argument; exit with a usage
    # message instead of an IndexError traceback when it is missing.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <season>".format(sys.argv[0]))
    color_detection(sys.argv[1])
views.py:
The code below calls task.py, which consumes messages from the queue. I pass the frameRes variable to task.py so that it can store the dequeued messages.
@api_view(['GET'])
def spring(request):
    """Publish one "spring" frame, run the queue consumer and stream the result.

    Args:
        request: The incoming GET request (not inspected here).

    Returns:
        A StreamingHttpResponse built from ``frameRes`` on success, or an
        HttpResponse with status 500 carrying the error text on failure.
    """
    try:
        frameRes = ""
        color_detection.color_detection("spring")
        # NOTE: frameRes is a str, which is immutable, and the return value
        # of task.gen is not used, so frameRes is still "" after this call.
        task.gen(frameRes)
        print("fr", frameRes)
        return StreamingHttpResponse(
            frameRes,
            content_type='multipart/x-mixed-replace; boundary=frame')
    except Exception as ex:
        # Report failures as a server error rather than a 200 response whose
        # body happens to be the exception text.
        return HttpResponse(str(ex), status=500)
task.py
This code dequeues the messages, and frameRes is supposed to hold the dequeued data. From here, I can't figure out how to return this data to views.py.
import os
import cv2
import numpy as np
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from django.http.response import StreamingHttpResponse
# Default RabbitMQ server URI (guest account on localhost, port 5672).
# gen() reads this value when it opens its own connection.
rabbit_url = 'amqp://guest:guest@localhost:5672//'
# Module-level Kombu connection and channel, created when this module is
# imported.
# NOTE(review): nothing visible in this file uses `conn` or `channel` after
# this point (gen() opens a separate Connection), so these may be leftovers;
# confirm that no other module imports them before removing.
conn = Connection(rabbit_url)
channel = conn.channel()
class Consumer(ConsumerMixin):
    """Kombu consumer that keeps the latest payload received from its queues.

    After each delivered message, ``frameRes`` holds that message's payload,
    so code that owns the consumer can read the most recent frame from the
    instance.
    """

    def __init__(self, connection, queues, frameRes):
        """Remember the connection, the queues to consume and the initial frame.

        Args:
            connection: Kombu connection the consumer runs on.
            queues: Queues passed to the consumer built in get_consumers().
            frameRes: Initial value of ``self.frameRes``; it is replaced by
                the payload of every message that arrives.
        """
        self.connection = connection
        self.queues = queues
        self.frameRes = frameRes

    def get_consumers(self, Consumer, channel):
        """Return the single consumer for ``self.queues``.

        The ``Consumer`` parameter is the consumer factory handed in by the
        caller (it shadows this class's name inside the method). Only the
        'image/jpeg' content type is accepted.
        """
        print("consumer")
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message],
                         accept=['image/jpeg'])]

    def on_message(self, body, message):
        """Store the payload of a delivered message and acknowledge it.

        Args:
            body: Message payload.
                NOTE(review): presumably the raw JPEG bytes sent by the
                producer -- confirm how Kombu decodes 'binary' content.
            message: The Kombu message wrapper; used for its properties and
                for acknowledging the delivery.
        """
        try:
            print("on_message")
            # Keep the payload itself (not the Message wrapper) and keep it
            # on the instance: a local variable would be discarded as soon as
            # this callback returns.
            self.frameRes = body
            print("properties: ", message.properties)
            message.ack()
        except Exception as ex:
            print("exception", ex)
def gen(frameRes):
    """Run a Consumer for "video-queue" on a fresh broker connection.

    The queue is described the same way as on the publishing side: a direct
    exchange named "video-exchange" with the routing key "video".

    Args:
        frameRes: Value handed to the Consumer as its initial ``frameRes``.

    Returns:
        None.
        NOTE(review): ``run()`` presumably blocks until the consumer is told
        to stop -- confirm against the Kombu ConsumerMixin documentation.
    """
    print("gen: task")
    video_exchange = Exchange("video-exchange", type="direct")
    video_queue = Queue("video-queue", video_exchange, routing_key="video")
    # The connection is opened with a 5 second heartbeat and is closed again
    # when the with-block exits.
    with Connection(rabbit_url, heartbeat=5) as broker:
        Consumer(broker, [video_queue], frameRes).run()
The following is printed on the console:
gen: task
Connected to amqp://guest:**@127.0.0.1:5672//
consumer
on_message
properties: {'content_type': 'image/jpeg', 'content_encoding': 'binary', 'application_headers': {}, 'delivery_mode': 1, 'priority': 0}