App.control.revoke() doesn't change the state in CELERY, python

i have this conditional

lst_of_active_tasks = insp.active()
        print(lst_of_active_tasks)
        for task in lst_of_active_tasks["celery@DESKTOP-HM2VGD6"]:
            if task["args"] and task["args"][0] == psid:
                app.control.revoke(task["id"])

checking if the task allready exists and being processed by a worker. then:

process_and_send_message.apply_async(args=[psid, page.facebook_page_id, page.facebook_papge_token, conversation_dict])

which should send this task to be processed by a worker:

@shared_task(bind=True)
def process_and_send_message(self,psid, page_id, page_token, conversation_dict):
    for _ in range(10):
        time.sleep(1)
        result = AsyncResult(self.request.id)
        if result.state == "REVOKED":
            print("REVOKED")
            return "REVOKED"
        print(f"{result.id} :::::::::>>>>>>>  {result.state}")

    
    return("some text")

i want the task to return REVOKED if the task state is revoked. problem is:

evevn after it being flagged as REVOKED it still gets processed:

[2024-02-25 09:34:59,652: INFO/MainProcess] mingle: searching for neighbors
[2024-02-25 09:35:00,659: INFO/MainProcess] mingle: all alone
[2024-02-25 09:35:00,670: INFO/MainProcess] celery@DESKTOP-HM2VGD6 ready.
[2024-02-25 09:35:05,584: INFO/MainProcess] Task apis.tasks.process_and_send_message[13652c4a-9ec3-438e-946d-3b1d91ee592a] received
[2024-02-25 09:35:06,588: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:07,592: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:08,594: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:08,703: INFO/MainProcess] Tasks flagged as revoked: 13652c4a-9ec3-438e-946d-3b1d91ee592a
[2024-02-25 09:35:08,707: INFO/MainProcess] Task apis.tasks.process_and_send_message[bbbbfcea-0680-4f7b-9f4c-dd37ec291e89] received
[2024-02-25 09:35:09,597: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:09,716: WARNING/ForkPoolWorker-2] bbbbfcea-0680-4f7b-9f4c-dd37ec291e89 :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:10,600: WARNING/ForkPoolWorker-4] 13652c4a-9ec3-438e-946d-3b1d91ee592a :::::::::>>>>>>>  PENDING
[2024-02-25 09:35:10,719: WARNING/ForkPoolWorker-2] bbbbfcea-0680-4f7b-9f4c-dd37ec291e89 :::::::::>>>>>>>  PENDING

i tried:

  • app.conf.worker_pool_restarts = True >>>> didn't work
  • app.control.revoke(task["id"], terminate=True) >>>> terminate causes the whole worker to stop. and i'm afraid if the worker stops. other tasks will be dropped too. i want just that specific task to drop.
Back to Top