UnboundLocalError: локальная переменная 'client' упоминается перед присвоением в задаче Celery

Я работаю над проектом Django с Celery и Channels для обработки MQTT-соединений. У меня есть задача Celery (mqtt_client_task), которая подключается к MQTT-брокеру, и я сталкиваюсь со следующей ошибкой:

File "/home/<user>/../../iot/tasks.py", line 30, in mqtt_client_task
    logger.error("Failed to connect, return code %d", rc)
UnboundLocalError: local variable 'client' referenced before assignment

Вот упрощенная версия соответствующего кода:

tasks.py:

# iot/task.py

from celery import shared_task
from paho.mqtt import client as mqtt_client_module
from asgiref import sync
from channels.layers import get_channel_layer
from django.conf import settings

import logging
from django.core.exceptions import ObjectDoesNotExist
from django.db import close_old_connections

logger = logging.getLogger(__name__)


@shared_task(bind=True)
def mqtt_client_task(self, device_client_id):
    from iot.models import Device

    try:
        device = Device.objects.get(client_id=device_client_id)
        topic = device.topic
    except ObjectDoesNotExist as e:
        logger.error(f"Device not found: {device_client_id}")
        return

    def on_connect(client, userdata, flags, rc):
        nonlocal mqtt_client_instance
        if rc == 0:
            logger.info("Connected to MQTT Broker!")
        else:
            logger.error("Failed to connect, return code %d", rc)

        client.subscribe(topic)

    def on_message(client, userdata, msg):
        logger.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

        channel_layer = get_channel_layer()
        sync.async_to_sync(channel_layer.group_send)(
            f"{device.id}", {"type": "device.message", "message": msg.payload.decode()}
        )

    mqtt_client_instance = mqtt_client_module.Client(device_client_id)
    mqtt_client_instance.on_connect = on_connect
    mqtt_client_instance.on_message = on_message

    try:
        mqtt_client_instance.connect(
            settings.MQTT_HOST, settings.MQTT_PORT, settings.MQTT_KEEPALIVE
        )
        mqtt_client_instance.loop_start()
    except Exception as e:
        logger.error(f"Failed to connect to MQTT Broker: {e}")
        return
    finally:
        close_old_connections()


Consumer.py

# iot/channels/consumers.py

import json
from channels.generic.websocket import AsyncWebsocketConsumer

from iot.tasks import mqtt_client_task


class DeviceConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.device_id = self.scope["url_route"]["kwargs"]["device_id"]
        self.room_group_name = f"{self.device_id}"

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)

        await self.accept()

        client_id = self.device_id

        self.run_mqtt_task = mqtt_client_task.delay(client_id)

    async def on_message(self, event):
        message = event["message"]
        await self.send(text_data=json.dumps({"message": message}))

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        if hasattr(self, "mqtt_client_task_result"):
            self.mqtt_client_task_result.revoke(terminate=True)

    async def on_receive(self, text_data):
        pass


Полный журнал ошибок:

[2024-02-03 08:35:15,194: INFO/MainProcess] Task iot.tasks.mqtt_client_task[68bfba44-85bc-4b81-b02c-118d956e4e61] received
[2024-02-03 08:35:15,208: ERROR/ForkPoolWorker-4] Task iot.tasks.mqtt_client_task[68bfba44-85bc-4b81-b02c-118d956e4e61] raised unexpected: UnboundLocalError("local variable 'client' referenced before assignment")
Traceback (most recent call last):
  File "/home/sabbir/zenith-sys/zenith/.venv/lib/python3.10/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/sabbir/zenith-sys/zenith/.venv/lib/python3.10/site-packages/celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/sabbir/zenith-sys/zenith/iot/tasks.py", line 30, in mqtt_client_task
    logger.error("Failed to connect, return code %d", rc)
UnboundLocalError: local variable 'client' referenced before assignment

Мне трудно понять, почему в функции on_connect возникает ошибка UnboundLocalError. Я подозреваю, что это может быть связано с областью видимости клиентской переменной.

Любые соображения или предложения по решению этой проблемы будут высоко оценены!

Дополнительная информация:

Версия Django: 4.2.7 Версия Celery: 5.3.6 Версия Paho MQTT: 1.6.1 Версия Python: 3.10.12 ОС: Ubuntu 22.04.3 LTS на Windows 10 x86_64 (WSL-2) Заранее благодарю за помощь!

Я пробовал: Проверил область видимости клиентской переменной в функции on_connect. Просмотрел документацию Celery и Paho MQTT на предмет наличия необходимой информации. Экспериментировал с различными именами переменных и реструктуризацией кода в функции on_connect. Убедились, что переменная mqtt_client_instance правильно определена и доступна в функции on_connect. Ожидаемый результат:

Я ожидал, что функция on_connect будет корректно ссылаться на клиентскую переменную в своей области видимости и записывать соответствующие сообщения, основываясь на коде возврата при подключении к MQTT-брокеру. Однако ошибка UnboundLocalError указывает на проблему с областью действия переменной, которую я не смог определить.

Вернуться на верх