Публикация сообщений в MQTT с помощью запланированных задач Celery-Beat
Я разрабатываю веб-приложение на Django+Vue вместе с MQTT-сервером. Я хочу реализовать Celery Beat для выполнения запланированных задач, которые я определяю в модели "calendarios", где я ввожу задачи в соответствии с датой, интервалом, повторением и другими. В "eventosCalendarios" я сохраняю каждую задачу с указанием времени и даты выполнения. Моя цель состоит в том, чтобы после выполнения каждой задачи я мог опубликовать ее в определенной теме на сервере MQTT.
Models.py
class calendarios(models.Model):
nombre=models.CharField(max_length= 100, verbose_name='Nombre', unique=True)
acciones=models.ForeignKey(acciones, null=True,blank=True,on_delete=models.CASCADE,verbose_name='Acciones')
fecha_inicio = models.DateField(null=True, blank=True)
fecha_fin = models.DateField(null=True, blank=True)
repeticion= models.CharField(max_length=1, choices=repeticionChoices, default='D') # Ej: 'diaria', 'semanal', 'mensual'
intervalo = models.IntegerField(null=True, blank=True) # Número de días, semanas o meses dependiendo de la repeticion
todoCultivo= models.CharField(max_length=1, choices=SioNO, default='S')
hora_repeticion_1 = models.TimeField(null=True, blank=True) # Hora de la primera repetición diaria
hora_repeticion_2 = models.TimeField(null=True, blank=True) # Hora de la segunda repetición diaria
hora_repeticion_3 = models.TimeField(null=True, blank=True)
cultivo=models.ManyToManyField(cultivo, verbose_name="Cultivos",blank=True)
plantas=models.ManyToManyField(plantas, verbose_name="Plantas",blank=True)
def __str__(self):
return self.nombre
class Meta:
verbose_name = 'Calendario'
verbose_name_plural = 'Calendarios'
db_table = 'calendarios'
ordering = ['id']
class eventosCalendarios(models.Model):
title = models.CharField(max_length=100)
start = models.DateTimeField()
calendario = models.ForeignKey(calendarios, on_delete=models.CASCADE)
allDay = models.BooleanField(default=False)
def __str__(self):
return self.title
class Meta:
verbose_name = 'Evento calendario'
verbose_name_plural = 'Eventos calendarios'
db_table = 'eventos_calendarios'
ordering = ['id']
Я уже реализовал метод в views.py, который позволяет мне публиковать данные при обращении к следующему URL из браузера: "http://localhost:8000/publish/".
View.py
from django.views.decorators.csrf import csrf_exempt
import paho.mqtt.publish as publish
from django.http import HttpResponse
import os
import json
@csrf_exempt
def publish_message(request):
message = {
'interface': 'send_aio'
}
# Definir las credenciales del cliente MQTT
auth = {
'username': '***',
'password': '***'
}
publish.single("comandos", json.dumps(message), hostname=os.environ['MQTT_SERVER'], auth=auth)
return HttpResponse("Published")
Urls.py
path('publish/', publish_message, name= 'publish' )
Теперь я хочу реализовать это с помощью Celery-Beat в tasks.py. На данный момент у меня получилось, что при выполнении задачи она возвращает сообщение в назначенные дату и время. Мне нужно выполнять publish_message каждый раз, когда выполняется задача. Буду признателен за помощь.
Tasks.py
from django.utils import timezone
from .models import eventosCalendarios
from celery import shared_task
@shared_task
def ejecutar_evento_programado():
ahora = timezone.localtime(timezone.now()) # Convertir la hora actual a la zona horaria local
# Obtener el evento más cercano a la hora actual que aún no ha ocurrido
evento_mas_cercano = eventosCalendarios.objects.filter(start__gte=ahora).order_by('start').first()
if evento_mas_cercano:
# Si la fecha y hora del evento coinciden con la fecha y hora actual, ejecutar el evento
if evento_mas_cercano.start == ahora:
ejecutar_evento.delay(evento_mas_cercano.id) # Usamos .delay() para programar la tarea
evento_mas_cercano.delete() # Eliminar el evento después de ejecutarlo
# Calcular el tiempo de espera hasta la hora de inicio del evento
tiempo_espera = evento_mas_cercano.start - ahora
# Programar la ejecución de la tarea para la hora de inicio del evento
ejecutar_evento.apply_async(args=[evento_mas_cercano.id], countdown=tiempo_espera.total_seconds())
# Convertir la hora del evento a la zona horaria local
hora_evento_local = evento_mas_cercano.start.astimezone(timezone.get_current_timezone())
return f"La proxima tarea a ejecutar es '{evento_mas_cercano.title}' a las {hora_evento_local}"
else:
return "No hay eventos pendientes en este momento"
@shared_task
def ejecutar_evento(evento_id):
evento = eventosCalendarios.objects.get(id=evento_id)
# Aquí ejecuta la acción asociada al evento
return f"Ejecutando evento '{evento.title}'"