Дублирующие задачи Celery, имеющие несколько task_args, не аннулируются
Я написал следующий метод для отзыва дублирующихся задач celery, которые имеют одинаковое имя задачи и аргументы задачи.
from celery import current_app
from celery.utils.log import get_task_logger
from django_celery_results.models import TaskResult
def revoke_duplicate_tasks(task_name, task_args=[], request_id=None):
# Get a list of task ids which are initiated for task_args and are in
# pending/received/started state
celery_logger = get_task_logger(__name__)
task_args = '"' + str(tuple(task_args)) + '"'
celery_logger.info(f'Current Task Args - {task_args}')
celery_logger.info(f'Request ID - {request_id}')
duplicate_tasks = list(TaskResult.objects.filter(
task_name=task_name,
status__in=['PENDING', 'RECEIVED', 'STARTED'],
task_args=task_args
).exclude(
task_id=request_id
).values_list('task_id', flat=True))
celery_logger.info(f'revoking following duplicate tasks - {duplicate_tasks}')
current_app.control.revoke(duplicate_tasks, terminate=True, signal='SIGKILL')
Если я запускаю этот метод через django shell, то duplicate_tasks
имеет список task_id и эти задачи отзываются, но если я запускаю тот же кусок кода в celery как сервис, то duplicate_tasks
имеет пустой список и задачи не отзываются.
Django - 3.2.8 Сельдерей - 5.1.2 Flower - 1.0.0
Я занимаюсь этим уже давно, поэтому любые подсказки будут очень полезны.