Возможно ли отправить MQTT сообщение с помощью mqttasgi внутри Celery Worker, который использует Redis Backend

Я использую библиотеку mqttasgi в Django для получения большого количества сообщений, обрабатываю их с помощью очереди REDIS и хочу опубликовать эту информацию в другом топике. Возможно ли это? Если да, то как я могу это сделать? На данный момент я только переопределяю функцию publish в моем потребителе, как показано ниже.

from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json

class MyMqttConsumer(MqttConsumer):

async def connect(self):
    await self.subscribe('application/5/device/+/event/up', 2)

async def receive(self, mqtt_message):
    print('Received a message at topic:', mqtt_message['topic'])
    print('With payload', mqtt_message['payload'])
    print('And QOS:', mqtt_message['qos'])
    print(type(mqtt_message['payload']))
    dictresult = json.loads(mqtt_message['payload'])
    print(type(dictresult))
    print(dictresult)
    jsonresult = json.dumps(dictresult)
    print(type(jsonresult))
    print(jsonresult)
    processmqttmessage.delay(jsonresult)
    print("test")
    pass

async def publish(self, topic, payload, qos=1, retain=False):
    await self.send({
        'type': 'mqtt.pub',
        'mqtt': {
            'topic': topic,
            'payload': payload,
            'qos': qos,
            'retain': retain,
        }
    })

async def disconnect(self):
    await self.unsubscribe('application/5/device/+/event/up')

Я хочу иметь возможность публиковать, но изнутри моей задачи processmqttmessage.

Спасибо.

Pd: @Santiago Ivulich, возможно, вы сможете помочь мне с этим.

Да, это возможно, нет необходимости переопределять публикацию базового потребителя. Я бы рекомендовал возвращать результат, который должен быть опубликован обратно в MQTTAsgi, чтобы поддерживать одно MQTT-соединение. Для этого вы можете использовать группу в канальном уровне, чтобы отправить обратно в mqttasgi то, что нужно отправить.

from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json

class MyMqttConsumer(MqttConsumer):
    async def connect(self):
        await self.subscribe('application/5/device/+/event/up', 2)
        # Subscribe consumer to channel layer group.
        await self.channel_layer.group_add("my.group", self.channel_name)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])
        print(type(mqtt_message['payload']))
        dictresult = json.loads(mqtt_message['payload'])
        print(type(dictresult))
        print(dictresult)
        jsonresult = json.dumps(dictresult)
        print(type(jsonresult))
        print(jsonresult)
        processmqttmessage.delay(jsonresult)
        print("test")
        pass

    async def publish_results(self, event):
        data = event['text']
        self.publish('my/publish/topic', data, qos=2, retain=False)


    async def disconnect(self):
        await self.unsubscribe('application/5/device/+/event/up')

Из задачи про сельдерей:

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

def processmqttmessage():
    ...
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)("my.group", 
    {"type": "publish.results", "text":"Hi from outside of the consumer"})

Если несколько потребителей будут работать одновременно, можно программно сгенерировать имя для группы и передать его задаче в качестве параметра.

ВАЖНОЕ ПРИМЕЧАНИЕ: Убедитесь, что вы используете один и тот же бэкенд каналов в проекте celery и mqttasgi.

Вернуться на верх