Выполнение нескольких задач celery внутри одной задачи celery
я хочу выполнить несколько связанных задач celery внутри другой задачи celery. Возможно ли это вообще?
Код ниже даст лучшее представление о том, чего я пытаюсь достичь
@shared_task(ignore_result=True)
def test_job():
try:
another_test_job.delay()
yet_another_test_job.delay()
except Exception as e:
logger.error(f'Something happened when collecting edr data ===> {e}')
raise
@shared_task(ignore_result=True)
def another_test_job():
try:
print('this is another test')
except Exception as e:
logger.error(f'Something happened when collecting edr data ===> {e}')
raise
@shared_task(ignore_result=True)
def yet_another_test_job():
try:
print('this is yet another test')
except Exception as e:
logger.error(f'Something happened when collecting edr data ===> {e}')
raise
Моя цель - попытаться консолидировать количество задач, которые я планирую, по существу распределяя мои задачи под одну задачу.
Я попробовал выполнить задание вручную, используя test_job.apply()
приводит к
In [2]: test_job.apply()
2021-09-30 13:57:49,196 amqp DEBUG Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'someclustername', 'copyright': 'Copyright (c) 2007-2020 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 22.3.4.11', 'product': 'RabbitMQ', 'version': '3.8.9'}, mechanisms: [b'PLAIN'], locales: ['en_US']
2021-09-30 13:57:49,259 amqp DEBUG using channel_id: 1
2021-09-30 13:57:49,307 amqp DEBUG Channel open
2021-09-30 13:57:49,431 celery.app.trace INFO Task myapp.tasks.jobs.test_job[3d0c3a93-157c-46bf-b592-ebce0850c49e] succeeded in 0.4412529049999989s: None
Out[2]: <EagerResult: 3d0c3a93-157c-46bf-b592-ebce0850c49e>
если я тестирую отдельные задания напрямую, они работают просто отлично... another_test_job.apply()
In [3]: another_test_job.apply()
this is another test
2021-09-30 13:59:38,601 celery.app.trace INFO Task myapp.tasks.jobs.another_test_job[6499aaff-bc88-4df1-9f77-78ff68f29915] succeeded in 0.00015368999999054722s: None
Out[3]: <EagerResult: 6499aaff-bc88-4df1-9f77-78ff68f29915>
Может кто-нибудь объяснить мне, что происходит?
К сожалению, это не работает таким образом. Вам нужно создать Celery Group и инициировать выполнение группы. Чтобы иметь возможность "связать" все выполнения, передайте переменные задачам при добавлении их в группу, чтобы вы могли заставить их все отчитываться перед центральным ресурсом, используя эту ссылку. Затем проверьте центральный ресурс, чтобы узнать, все ли задачи в группе уже отчитались.
Я вставлю сюда код, который я использовал для управления задачами Celery Group. Вам действительно нужен центральный ресурс (например, база данных) и сделать так, чтобы ваши задачи отчитывались перед ним, чтобы вы могли зайти туда и проверить ход выполнения. Но код, которым я делюсь, может помочь вам понять, как управлять Celery Group. Я использую их, чтобы не заниматься техническими вопросами Celery при кодировании.
Зависимости Django, которые вы можете удалить и поменять местами для нужд вашего проекта.