App.control.revoke() не изменяет состояние в CELERY, python

у меня есть такое условие

lst_of_active_tasks = insp.active()
        print(lst_of_active_tasks)
        for task in lst_of_active_tasks["celery@DESKTOP-HM2VGD6"]:
            if task["args"] and task["args"][0] == psid:
                app.control.revoke(task["id"])

проверка, если задание уже существует и обрабатывается рабочим. then:

process_and_send_message.apply_async(args=[psid, page.facebook_page_id, page.facebook_papge_token, conversation_dict])

, который должен отправить это задание на обработку рабочему:

@shared_task(bind=True)
def process_and_send_message(self,psid, page_id, page_token, conversation_dict):
    for _ in range(10):
        time.sleep(1)
        result = AsyncResult(self.request.id)
        if result.state == "REVOKED":
            print("REVOKED")
            return "REVOKED"
        print(f"{result.id} :::::::::>>>>>>>  {result.state}")

    
    return("some text")

я хочу, чтобы задача возвращала REVOKED, если состояние задачи отменено. проблема в следующем:

evevn после того, как он был помечен как REVOKED, он все еще обрабатывается:

[2024-02-25 09:34:59,652: INFO/MainProcess] mingle: searching for neighbors
[2024-02-25 09:35:00,659: INFO/MainProcess] mingle: all alone
[2024-02-25 09:35:00,670: INFO/MainProcess] celery@DESKTOP-HM2VGD6 ready.
[2024-02-25 09:35:05,584: INFO/MainProcess] Task apis.tasks.process_and_send_message[13652c4a-9ec3-438e-946d-3b1d91ee592a] received
[2024-02-25 09:35:06,588: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:07,592: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:08,594: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:08,703: INFO/MainProcess] Tasks flagged as revoked: 13652c4a-9ec3-438e-946d-3b1d91ee592a
[2024-02-25 09:35:08,707: INFO/MainProcess] Task apis.tasks.process_and_send_message[bbbbfcea-0680-4f7b-9f4c-dd37ec291e89] received
[2024-02-25 09:35:09,597: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:09,716: WARNING/ForkPoolWorker-2] bbbbfcea-0680-4f7b-9f4c-dd37ec291e89 :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:10,600: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:10,719: WARNING/ForkPoolWorker-2] bbbbfcea-0680-4f7b-9f4c-dd37ec291e89 :::::::::>>>>>>>  PENDING

я пытался:

  • app.conf.worker_pool_restarts = True >>>> не работает
  • .
  • app.control.revoke(task["id"], terminate=True) >>>> terminate вызывает остановку всего рабочего. и я боюсь, что если рабочий остановится, то и другие задачи будут сброшены. я хочу, чтобы сбросилась только эта конкретная задача.
Вернуться на верх