Celery мгновенно убивает запущенную задачу

я хочу реализовать api конечную точку для кнопки старт/стоп с помощью celery (когда кнопка запущена, на бэкенде в цикле while(True) выполняется некоторая работа)

celery версии 4.4.6. В качестве брокера я использую rabbit

я застрял с отзывом задачи (мне нужно просто мгновенно убить задачу)

я использую этот код для этого (но он не работает)

app.control.revoke(started_task.id, terminate=True, signal='SIGKILL')

моя задача выглядит так

async def _grind():
    count = 1
    while True:
        count += 1
        print(count)
        # here are many parrallel aihttp requests and database writes...

@shared_task(bind=True)
def grind(self):
    asyncio.run(_grind())

и мне нужно состояние is_running, чтобы предоставить его моему фронтенду

мои тщетные попытки...

идея заключается в том, что при нажатии на стоп на входе в задачу, устанавливается сигнал стоп и при сохранении я вызываю исключение, которое прерывает выполнение задачи (но сигнал сохранения никогда не срабатывает)

class Task(models.Model):
    id = models.CharField(max_length=50, primary_key=True)
    type = models.CharField(max_length=20, choices=TaskType.choices)
    signal = models.CharField(max_length=20, choices=TaskSignal.choices, null=True, blank=True)
    user = models.ForeignKey(CustomUser, related_name='tasks', on_delete=CASCADE)


@shared_task(bind=True)
def grind(self):
    task_id = self.request.id

    def cancel_task(instance: Task, **kwargs):
        print('Never trigger')
        if instance.id == task_id and instance.signal == TaskSignal.STOP:
            post_save.disconnect(cancel_task, sender=Task)
            instance.delete()
            raise Ignore()

    post_save.connect(cancel_task, sender=Task)


    count = 1
    while True:
        count += 1
        print(count)

# ---- on start-stop click
def toggle_grind(user: CustomUser):
    started_task = Task.objects.filter(type=TaskType.GRIND, user=user).first()

    if started_task:
       started_task.signal = TaskSignal.STOP
       started_task.save()
       #  app.control.revoke(started_task.id, terminate=True, signal='SIGKILL')
       return False
    else:
        task_id = grind.apply_async().id
        Task.objects.create(
            id=task_id,
            type=TaskType.GRIND,
            user=user
        )
        return True
Вернуться на верх