Публикация сообщений в MQTT с помощью запланированных задач Celery-Beat

Я разрабатываю веб-приложение на Django+Vue вместе с MQTT-сервером. Я хочу реализовать Celery Beat для выполнения запланированных задач, которые я определяю в модели "calendarios", где я ввожу задачи в соответствии с датой, интервалом, повторением и другими. В "eventosCalendarios" я сохраняю каждую задачу с указанием времени и даты выполнения. Моя цель состоит в том, чтобы после выполнения каждой задачи я мог опубликовать ее в определенной теме на сервере MQTT.

Models.py

class calendarios(models.Model):
    nombre=models.CharField(max_length= 100, verbose_name='Nombre', unique=True)
    acciones=models.ForeignKey(acciones, null=True,blank=True,on_delete=models.CASCADE,verbose_name='Acciones')
    fecha_inicio = models.DateField(null=True, blank=True)
    fecha_fin = models.DateField(null=True, blank=True)
    repeticion= models.CharField(max_length=1, choices=repeticionChoices, default='D') # Ej: 'diaria', 'semanal', 'mensual'
    intervalo = models.IntegerField(null=True, blank=True)  # Número de días, semanas o meses dependiendo de la repeticion
    todoCultivo= models.CharField(max_length=1, choices=SioNO, default='S')
    hora_repeticion_1 = models.TimeField(null=True, blank=True)  # Hora de la primera repetición diaria
    hora_repeticion_2 = models.TimeField(null=True, blank=True)  # Hora de la segunda repetición diaria
    hora_repeticion_3 = models.TimeField(null=True, blank=True)
    cultivo=models.ManyToManyField(cultivo, verbose_name="Cultivos",blank=True)
    plantas=models.ManyToManyField(plantas, verbose_name="Plantas",blank=True)
    


    def __str__(self):
        return self.nombre

    class Meta:
        verbose_name = 'Calendario'
        verbose_name_plural = 'Calendarios'
        db_table = 'calendarios'
        ordering = ['id']

class eventosCalendarios(models.Model):
    
    title = models.CharField(max_length=100)
    start = models.DateTimeField()
    calendario = models.ForeignKey(calendarios, on_delete=models.CASCADE)
    allDay = models.BooleanField(default=False)
    
    def __str__(self):
        return self.title
 
    class Meta:
        verbose_name = 'Evento calendario'
        verbose_name_plural = 'Eventos calendarios'
        db_table = 'eventos_calendarios'
        ordering = ['id']

Я уже реализовал метод в views.py, который позволяет мне публиковать данные при обращении к следующему URL из браузера: "http://localhost:8000/publish/".

View.py

from django.views.decorators.csrf import csrf_exempt
import paho.mqtt.publish as publish
from django.http import HttpResponse
import os 
import json

@csrf_exempt
def publish_message(request):
    message = {
            'interface': 'send_aio'
        }
    # Definir las credenciales del cliente MQTT
    auth = {
        'username': '***',
        'password': '***'
    }
    publish.single("comandos", json.dumps(message), hostname=os.environ['MQTT_SERVER'], auth=auth)
    return HttpResponse("Published")

Urls.py

path('publish/', publish_message, name= 'publish' )

Теперь я хочу реализовать это с помощью Celery-Beat в tasks.py. На данный момент у меня получилось, что при выполнении задачи она возвращает сообщение в назначенные дату и время. Мне нужно выполнять publish_message каждый раз, когда выполняется задача. Буду признателен за помощь.

Tasks.py

from django.utils import timezone
from .models import eventosCalendarios
from celery import shared_task

@shared_task
def ejecutar_evento_programado():
    ahora = timezone.localtime(timezone.now())  # Convertir la hora actual a la zona horaria local

    # Obtener el evento más cercano a la hora actual que aún no ha ocurrido
    evento_mas_cercano = eventosCalendarios.objects.filter(start__gte=ahora).order_by('start').first()
    

    if evento_mas_cercano:
        # Si la fecha y hora del evento coinciden con la fecha y hora actual, ejecutar el evento
        if evento_mas_cercano.start == ahora:
            ejecutar_evento.delay(evento_mas_cercano.id)  # Usamos .delay() para programar la tarea
            
            evento_mas_cercano.delete()  # Eliminar el evento después de ejecutarlo
            
        # Calcular el tiempo de espera hasta la hora de inicio del evento
        tiempo_espera = evento_mas_cercano.start - ahora

        # Programar la ejecución de la tarea para la hora de inicio del evento
        ejecutar_evento.apply_async(args=[evento_mas_cercano.id], countdown=tiempo_espera.total_seconds())
        # Convertir la hora del evento a la zona horaria local
        hora_evento_local = evento_mas_cercano.start.astimezone(timezone.get_current_timezone())

        return f"La proxima tarea a ejecutar es '{evento_mas_cercano.title}' a las {hora_evento_local}"

    else:
        return "No hay eventos pendientes en este momento"

@shared_task
def ejecutar_evento(evento_id):
    evento = eventosCalendarios.objects.get(id=evento_id)
    # Aquí ejecuta la acción asociada al evento
    return f"Ejecutando evento '{evento.title}'"
Вернуться на верх