App.control.revoke() не изменяет состояние в CELERY, python
у меня есть такое условие
lst_of_active_tasks = insp.active()
print(lst_of_active_tasks)
for task in lst_of_active_tasks["celery@DESKTOP-HM2VGD6"]:
if task["args"] and task["args"][0] == psid:
app.control.revoke(task["id"])
проверка, если задание уже существует и обрабатывается рабочим. then:
process_and_send_message.apply_async(args=[psid, page.facebook_page_id, page.facebook_papge_token, conversation_dict])
, который должен отправить это задание на обработку рабочему:
@shared_task(bind=True)
def process_and_send_message(self,psid, page_id, page_token, conversation_dict):
for _ in range(10):
time.sleep(1)
result = AsyncResult(self.request.id)
if result.state == "REVOKED":
print("REVOKED")
return "REVOKED"
print(f"{result.id} :::::::::>>>>>>> {result.state}")
return("some text")
я хочу, чтобы задача возвращала REVOKED, если состояние задачи отменено. проблема в следующем:
evevn после того, как он был помечен как REVOKED, он все еще обрабатывается:
[2024-02-25 09:34:59,652: INFO/MainProcess] mingle: searching for neighbors
[2024-02-25 09:35:00,659: INFO/MainProcess] mingle: all alone
[2024-02-25 09:35:00,670: INFO/MainProcess] celery@DESKTOP-HM2VGD6 ready.
[2024-02-25 09:35:05,584: INFO/MainProcess] Task apis.tasks.process_and_send_message[13652c4a-9ec3-438e-946d-3b1d91ee592a] received
[2024-02-25 09:35:06,588: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>> PENDING
[2024-02-25 09:35:07,592: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>> PENDING
[2024-02-25 09:35:08,594: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>> PENDING
[2024-02-25 09:35:08,703: INFO/MainProcess] Tasks flagged as revoked: 13652c4a-9ec3-438e-946d-3b1d91ee592a
[2024-02-25 09:35:08,707: INFO/MainProcess] Task apis.tasks.process_and_send_message[bbbbfcea-0680-4f7b-9f4c-dd37ec291e89] received
[2024-02-25 09:35:09,597: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>> PENDING
[2024-02-25 09:35:09,716: WARNING/ForkPoolWorker-2] bbbbfcea-0680-4f7b-9f4c-dd37ec291e89 :::::::::>>>>>>> PENDING
[2024-02-25 09:35:10,600: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>> PENDING
[2024-02-25 09:35:10,719: WARNING/ForkPoolWorker-2] bbbbfcea-0680-4f7b-9f4c-dd37ec291e89 :::::::::>>>>>>> PENDING
я пытался:
app.conf.worker_pool_restarts = True
>>>> не работает .
app.control.revoke(task["id"], terminate=True)
>>>> terminate вызывает остановку всего рабочего. и я боюсь, что если рабочий остановится, то и другие задачи будут сброшены. я хочу, чтобы сбросилась только эта конкретная задача.