Как запустить скрипт rabbitMQ в проекте django rest framework
Итак, у меня есть проект django rest framework, и что я хочу сделать, это передать в очередь rabbitMQ некоторые значения из файла с помощью скрипта python, и как только мой сервер django запустится, я хочу, чтобы проект начал потреблять сообщения, доступные в очереди, и с этими значениями вставлять сущности в мою базу данных. Итак, вот мой продюсерский python скрипт:
import pika, csv, time
url = 'queue_url'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='hello')
with open('sensor.csv', newline='') as csvfile:
reader = csv.reader(csvfile, delimiter='\n')
for row in reader:
channel.basic_publish(exchange='', routing_key='hello', body=row[0])
print(row[0])
time.sleep(1)
connection.close()
Этот скрипт не принадлежит проекту django, я буду запускать этот скрипт вне его. Это мой потребительский скрипт:
import pika
from app.models import Measurement
url = 'queue_url'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
measurement = Measurement.objects.create(sensor=3, energy_consumption=float(str(body)))
measurement.save()
channel.basic_consume('hello',
callback,
auto_ack=True)
channel.start_consuming()
connection.close()
Я хочу интегрировать этот потребительский скрипт в мой проект django, чтобы как только я начинаю помещать сообщения в очередь с помощью скрипта procuder, потребительский скрипт был готов к ним, перехватывал их и вставлял новую сущность для каждого значения. Я поместил сценарий-потребитель внутри одного из моих приложений в моем проекте, но он, похоже, не связан с моим проектом. Например, когда я сохраняю изменения, сервер вообще не обновляется, но когда я сохраняю ченджес в любой другой файл, сервер обновляется. Вот моя модель измерений:
class Measurement(models.Model):
timestamp = models.DateTimeField(auto_now_add=True)
energy_consumption = models.FloatField()
sensor = models.ForeignKey(Sensor, on_delete=models.CASCADE, null=True)
Значения в очереди - это потребление_энергии, и когда я читаю значение из очереди, я хочу создать новый экземпляр измерения в моей базе данных.