UnboundLocalError: локальная переменная 'client' упоминается перед присвоением в задаче Celery
Я работаю над проектом Django с Celery и Channels для обработки MQTT-соединений. У меня есть задача Celery (mqtt_client_task), которая подключается к MQTT-брокеру, и я сталкиваюсь со следующей ошибкой:
File "/home/<user>/../../iot/tasks.py", line 30, in mqtt_client_task
logger.error("Failed to connect, return code %d", rc)
UnboundLocalError: local variable 'client' referenced before assignment
Вот упрощенная версия соответствующего кода:
tasks.py:
# iot/task.py
from celery import shared_task
from paho.mqtt import client as mqtt_client_module
from asgiref import sync
from channels.layers import get_channel_layer
from django.conf import settings
import logging
from django.core.exceptions import ObjectDoesNotExist
from django.db import close_old_connections
logger = logging.getLogger(__name__)
@shared_task(bind=True)
def mqtt_client_task(self, device_client_id):
from iot.models import Device
try:
device = Device.objects.get(client_id=device_client_id)
topic = device.topic
except ObjectDoesNotExist as e:
logger.error(f"Device not found: {device_client_id}")
return
def on_connect(client, userdata, flags, rc):
nonlocal mqtt_client_instance
if rc == 0:
logger.info("Connected to MQTT Broker!")
else:
logger.error("Failed to connect, return code %d", rc)
client.subscribe(topic)
def on_message(client, userdata, msg):
logger.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
channel_layer = get_channel_layer()
sync.async_to_sync(channel_layer.group_send)(
f"{device.id}", {"type": "device.message", "message": msg.payload.decode()}
)
mqtt_client_instance = mqtt_client_module.Client(device_client_id)
mqtt_client_instance.on_connect = on_connect
mqtt_client_instance.on_message = on_message
try:
mqtt_client_instance.connect(
settings.MQTT_HOST, settings.MQTT_PORT, settings.MQTT_KEEPALIVE
)
mqtt_client_instance.loop_start()
except Exception as e:
logger.error(f"Failed to connect to MQTT Broker: {e}")
return
finally:
close_old_connections()
Consumer.py
# iot/channels/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from iot.tasks import mqtt_client_task
class DeviceConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.device_id = self.scope["url_route"]["kwargs"]["device_id"]
self.room_group_name = f"{self.device_id}"
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
client_id = self.device_id
self.run_mqtt_task = mqtt_client_task.delay(client_id)
async def on_message(self, event):
message = event["message"]
await self.send(text_data=json.dumps({"message": message}))
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
if hasattr(self, "mqtt_client_task_result"):
self.mqtt_client_task_result.revoke(terminate=True)
async def on_receive(self, text_data):
pass
Полный журнал ошибок:
[2024-02-03 08:35:15,194: INFO/MainProcess] Task iot.tasks.mqtt_client_task[68bfba44-85bc-4b81-b02c-118d956e4e61] received
[2024-02-03 08:35:15,208: ERROR/ForkPoolWorker-4] Task iot.tasks.mqtt_client_task[68bfba44-85bc-4b81-b02c-118d956e4e61] raised unexpected: UnboundLocalError("local variable 'client' referenced before assignment")
Traceback (most recent call last):
File "/home/sabbir/zenith-sys/zenith/.venv/lib/python3.10/site-packages/celery/app/trace.py", line 477, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/sabbir/zenith-sys/zenith/.venv/lib/python3.10/site-packages/celery/app/trace.py", line 760, in __protected_call__
return self.run(*args, **kwargs)
File "/home/sabbir/zenith-sys/zenith/iot/tasks.py", line 30, in mqtt_client_task
logger.error("Failed to connect, return code %d", rc)
UnboundLocalError: local variable 'client' referenced before assignment
Мне трудно понять, почему в функции on_connect возникает ошибка UnboundLocalError. Я подозреваю, что это может быть связано с областью видимости клиентской переменной.
Любые соображения или предложения по решению этой проблемы будут высоко оценены!
Дополнительная информация:
Версия Django: 4.2.7 Версия Celery: 5.3.6 Версия Paho MQTT: 1.6.1 Версия Python: 3.10.12 ОС: Ubuntu 22.04.3 LTS на Windows 10 x86_64 (WSL-2) Заранее благодарю за помощь!
Я пробовал: Проверил область видимости клиентской переменной в функции on_connect. Просмотрел документацию Celery и Paho MQTT на предмет наличия необходимой информации. Экспериментировал с различными именами переменных и реструктуризацией кода в функции on_connect. Убедились, что переменная mqtt_client_instance правильно определена и доступна в функции on_connect. Ожидаемый результат:
Я ожидал, что функция on_connect будет корректно ссылаться на клиентскую переменную в своей области видимости и записывать соответствующие сообщения, основываясь на коде возврата при подключении к MQTT-брокеру. Однако ошибка UnboundLocalError указывает на проблему с областью действия переменной, которую я не смог определить.