Celery apply_async возвращает ошибку отсутствия аргумента
У меня есть функция celery с определением -
@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
Ранее это не было задачей сельдерея, он назывался так -
n.publish_msg_from_lock(addr, unhexlify(payload), gateway_euid)
После того, как я сделал его задачей celery, я обновил вызов таким образом -
n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid),)
Я также пробовал -
n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid), kwargs={})
и
n.publish_msg_from_lock.apply_async(kwargs={"mac": addr, "data": unhexlify(payload), "gateway_euid": gateway_euid})
Но я получаю ошибку - ** Файл "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", строка 531, in apply_async check_arguments(*(args или ()), **(kwargs или {})) TypeError: publish_msg_from_lock() missing 1 required positional argument: 'gateway_euid' **
Можете ли вы помочь исправить это?
Ваша задача не выполняется, потому что она не привязана. Чтобы использовать параметр self в сигнатуре функции, нужно добавить bind=True в декоратор celery.
Пример:
@async_worker.task(ignore_result=True, queue="data_path", bind=True)
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
Это позволяет получить доступ к некоторой функциональности celery, как описано здесь https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks
Вы также можете удалить параметр self, если он вам не нужен.
@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(mac: str, data: bytes, gateway_euid: str):