Django.fun

How to let Kombu consumer return or stream queued message

I am using Kombu and RabbitMq to queue the JPEG image data and stream or return the dequeued message back so it shows the video streaming on the webpage. Kombu consumer detects the message sent from the producer declared in detection.py file, but it does not save or return any message back so basically nothing is showing on the web. I read Kombu documentation but still I have trouble understanding the problem of my code. Any help would be greatly appreciated..

color_detection.py:

The following code declares Kombu producer and publish the message to the exchange called video-exchange.

from kombu import Connection
from kombu import Exchange
from kombu import Producer
from kombu import Queue

import numpy as np
import cv2
import sys

# Default RabbitMQ server URI
rabbit_url = 'amqp://guest:guest@localhost:5672//'
# Kombu Connection
conn = Connection(rabbit_url)
channel = conn.channel()

# Kombu Exchange
# - set delivery_mode to transient to prevent disk writes for faster delivery
exchange = Exchange("video-exchange", type="direct", delivery_mode=1)

# Kombu Producer
producer = Producer(exchange=exchange, channel=channel, routing_key="video")
# Kombu Queue
queue = Queue(name="video-queue", exchange=exchange, routing_key="video") 
queue.maybe_bind(conn)
queue.declare()

'''
ML object detection algo(haarcascade)used to identify objects. 
the XML file consists of trained Haar Cascade models.
'''
face_cascade = cv2.CascadeClassifier(
    'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml')
# 'accounts/personal_color/self_detection/haarcascade_frontalface_default.xml'
# initialize video from the webcam
video = cv2.VideoCapture(1)
# encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]
def color_detection(season):
    ret, frame = video.read()
    if frame is not None and video.isOpened():
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=6)
        if cv2.waitKey(1) == ord("q"):
            return;
        # Draw the rectangle around each face
        if len(faces)!=0:
            for (x, y, w, h) in faces:
                # Use the stcoordinates to find the center of the face and from that point draw a rectangle of radius w/2 or h/2.
                center_coordinates = x + w // 2, y + h // 2
                radius = w // 2  # or can be h / 2 or can be anything based on your requirements
                mask = np.zeros(frame.shape[:2], dtype="uint8")
                # Draw the desired region to crop out in white
                cv2.circle(mask, center_coordinates, radius, (255, 255, 255), -1)
                masked = cv2.bitwise_and(frame, frame, mask=mask)
                if season== "spring":  
                    masked[np.where((masked == [0, 0, 0]).all(axis=2))] = [121, 131, 248]
                ret, jpeg = cv2.imencode('.jpg', masked)

        producer.publish(jpeg.tobytes(), content_type='image/jpeg', content_encoding='binary') -> publish the video streaming data into the "video-exchange" queue.
    else:
        print("camera closed",ret)
    # video.release()
    # cv2.destroyAllWindows()


if __name__ == '__main__':
    arg = sys.argv[1]
    color_detection(arg)

views.py:

The code below calls task.py which deals with consuming messages from the queue. I passed frameRes variable to task.py to save the dequeued messages.

@api_view(['GET'])
def spring(request):
    try:
        frameRes=""
        color_detection.color_detection("spring")
        task.gen(frameRes)
        print("fr",frameRes) -> print nothing
        return StreamingHttpResponse(frameRes, content_type='multipart/x-mixed-replace; boundary=frame') -> nothing showing
    except Exception as ex:
        return HttpResponse(ex)

task.py

The code dequeues the message and frameRes save the data. From here, I can't figure out how to return this data back to views.py.

import os
import cv2
import numpy as np
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from django.http.response import StreamingHttpResponse

# Default RabbitMQ server URI
rabbit_url = 'amqp://guest:guest@localhost:5672//'

# Kombu Connection
conn = Connection(rabbit_url)
channel = conn.channel()


class Consumer(ConsumerMixin):
    def __init__(self, connection, queues, frameRes):
        self.connection = connection
        self.queues = queues
        self.frameRes=frameRes

    def get_consumers(self, Consumer, channel):
        print("consumer")
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message],
                         accept=['image/jpeg'])]

    def on_message(self, body, message):
        try:
            print("on_message")
            frameRes=message
            print("properties: ",message.properties)

            # print(frameRes)
            # print("frameRes",frameRes)
            # return StreamingHttpResponse(message, content_type='multipart/x-mixed-replace; boundary=frame')
            message.ack()
           
        except Exception as ex:
            print("exception",ex)


def gen(frameRes):
    print("gen: task")
    exchange = Exchange("video-exchange", type="direct")
    queues = [Queue("video-queue", exchange, routing_key="video")]
    with Connection(rabbit_url, heartbeat=5) as conn:
        consumer = Consumer(conn, queues,frameRes)
        # consumer.consume()
        consumer.run()

The following is printed on the console:

gen: task
Connected to amqp://guest:**@127.0.0.1:5672//
consumer
on_message
properties:  {'content_type': 'image/jpeg', 'content_encoding': 'binary', 'application_headers': {}, 'delivery_mode': 1, 'priority': 0}

Tutorials

Современный Python: начинаем проект с pyenv и poetry

Настройка проекта Python — виртуальные среды и управление пакетами

Использование requests в Python — тайм-ауты, повторы, хуки

Понимание декораторов в Python

ProcessPoolExecutor в Python: полное руководство

map() против submit() с ProcessPoolExecutor в Python

Понимание атрибутов, словарей и слотов в Python

Полное руководство по slice в Python

Выпуск Django 4.0

Безопасное развертывание приложения Django с помощью Gunicorn, Nginx и HTTPS

Автоматический повтор невыполненных задач Celery

Django REST Framework и Elasticsearch

Докеризация Django с помощью Postgres, Gunicorn и Nginx

Асинхронные задачи с Django и Celery

Релизы безопасности Django: 3.2.4, 3.1.12 и 2.2.24

Выпуски исправлений ошибок Django: 3.2.3, 3.1.11 и 2.2.23

Эффективное использование сериализаторов Django REST Framework

Выпуски безопасности Django: 3.2.2, 3.1.10 и 2.2.22

Выпущенные релизы безопасности Django: 3.2.1, 3.1.9 и 2.2.21

Обработка периодических задач в Django с помощью Celery и Docker

View all tutorials →