Завершите предыдущую задачу Celery с тем же идентификатором задачи и запустите ее снова, если она была создана

В моем проекте django я создал класс представления, используя класс TemplateView. Опять же, я использую django channels и сделал класс consumer. Теперь я пытаюсь использовать celery worker для извлечения данных quearyset каждый раз, когда пользователь возвращается на страницу. Но проблема в том, что если пользователь снова обновляет страницу до завершения задачи, то создается еще одна задача, что приводит к перегрузке. Поэтому я использовал revoke для завершения предыдущей задачи. Но я вижу, что отзыв навсегда аннулировал идентификатор задачи. Я не знаю, как это убрать. Потому что я хочу запускать задачу снова всякий раз, когда пользователь вызывает ее.

views.py

class Analytics(LoginRequiredMixin,TemplateView):
    template_name = 'app/analytics.html'
    login_url = '/user/login/'

    def get_context_data(self, **kwargs):
        app.control.terminate(task_id=self.request.user.username+'_analytics')
        print(app.control.inspect().revoked())
        context = super().get_context_data(**kwargs)
        context['sub_title'] = 'Analytics'
        return context

consumers.py

class AppConsumer(AsyncJsonWebsocketConsumer):
    
    async def connect(self):
        await self.accept()
        analytics_queryset_for_selected_devices.apply_async(
            args=[self.scope['user'].username],
            task_id=self.scope['user'].username+'_analytics'
            )

На данный момент я решаю проблему следующим образом. В consumers.py я сделал функцию disconnect, которая отзывает задачу, когда веб-сокет закрывается.

counter = 0 
       
class AppConsumer(AsyncJsonWebsocketConsumer):
    
    async def connect(self):
        await self.accept()
        analytics_queryset_for_selected_devices.apply_async(args=[self.scope['user'].username],
                                                task_id=self.scope['user'].username+str(counter))    

    async def disconnect(self, close_code):
        global counter
        app.control.terminate(task_id=self.scope['user'].username+str(counter), signal='SIGKILL')
        counter += 1
        await self.close()
Счетчик

используется для создания нового уникального идентификатора задачи. Но в этом методе для каждого запроса добавляется новый идентификатор задачи в список отзыва, что приводит к нагрузке на память. Чтобы минимизировать эту проблему, я ограничил размер списка отзыва до 20.

from celery.utils.collections import LimitedSet
from celery.worker import state

state.revoked = LimitedSet(maxlen=20, expires=3600)
Вернуться на верх