Celery apply_async возвращает ошибку отсутствия аргумента

У меня есть функция celery с определением -

@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str): 

Ранее это не было задачей сельдерея, он назывался так -

n.publish_msg_from_lock(addr, unhexlify(payload), gateway_euid) 

После того, как я сделал его задачей celery, я обновил вызов таким образом -

n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid),)

Я также пробовал -

n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid), kwargs={})

и

n.publish_msg_from_lock.apply_async(kwargs={"mac": addr, "data": unhexlify(payload), "gateway_euid": gateway_euid})

Но я получаю ошибку - ** Файл "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", строка 531, in apply_async check_arguments(*(args или ()), **(kwargs или {})) TypeError: publish_msg_from_lock() missing 1 required positional argument: 'gateway_euid' **

Можете ли вы помочь исправить это?

Ваша задача не выполняется, потому что она не привязана. Чтобы использовать параметр self в сигнатуре функции, нужно добавить bind=True в декоратор celery.

Пример:

@async_worker.task(ignore_result=True, queue="data_path", bind=True)
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str): 

Это позволяет получить доступ к некоторой функциональности celery, как описано здесь https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks

Вы также можете удалить параметр self, если он вам не нужен.

@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(mac: str, data: bytes, gateway_euid: str): 
Вернуться на верх