Celery "Ни один узел не ответил в течение ограниченного времени"

Я создаю веб-приложение django для дневной торговли, и мне нужно управлять множеством рабочих процессов celery, которые будут выполнять сделки для пользователей. Рабочие процессы Celery создаются как отделенный процесс оболочки с помощью модуля python subprocess. В директории django app я создаю фиктивные .sh bash-файлы в качестве целей, которые будут инициализировать celery workers непосредственно после запуска окна терминала. f"celery -A MyDjangoServer4_0_4 worker --pool solo -Q bot{id}_queue -n bot{id} -l INFO -E" запускает celery worker на терминале, а cmd /k в конце предотвращает немедленное закрытие окна терминала для отладки. Каждый celery worker получает свое уникальное имя и уникальную очередь, поэтому они должны работать независимо и параллельно друг от друга. Я могу порождать и создавать celery worker с новыми именами и даже успешно передавать им задания. Но я не могу воссоздать или повторно инициализировать их, используя то же имя цели, даже после закрытия окна подпроцесса. Я полагаю, что мне нужно как-то выключить/прервать узел, чтобы я мог перезапустить/реинициализировать рабочий подпроцесс, но я получаю ошибку при использовании команды celery control terminate, которая мешает мне это сделать.

Приведенный ниже код start_process запустит окно подпроцесса, используя параметр ID для назначения имени/очереди в целевом сценарии .sh. Он запустит celery worker в новом окне, если имя не было использовано ранее.

def start_process(id, args=None):
    # https://stackoverflow.com/questions/11585168/launch-an-independent-process-with-python
    kwargs = {}

    if platform.system() == 'Windows':
        DETACHED_PROCESS = 0x00000008
        kwargs.update(creationflags=DETACHED_PROCESS)
        kwargs.update(shell=True)
        kwargs.update(close_fds=True)
    elif sys.version_info < (3, 2):  # assume posix
        kwargs.update(preexec_fn=os.setsid)
    else:  # Python 3.2+ and Unix
        kwargs.update(start_new_session=True)

    f = open(f'start_celery_consumer{id}.sh', 'w')
    f.write(f"""
    celery -A MyDjangoServer4_0_4 worker --pool solo -Q bot{id}_queue -n bot{id} -l INFO -E
    cmd /k
    """)
    f.close()

    process = subprocess.Popen([f'start_celery_consumer{id}.sh'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    CryptoBotsApp.shared.console_ids.append(process)

Если я пытаюсь воссоздать celery worker, я получаю следующую ошибку:

[2022-08-13 18:00:17,927: WARNING/MainProcess] C:\Users\admin\PycharmProjects\MyDjangoServer4_0_4\venv\lib\site-packages\kombu\pidbox.py:70: UserWarning: A node named celery@bot1 is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!

  warnings.warn(W_PIDBOX_IN_USE.format(node=self))

Когда я набираю celery -A MyDjangoServer4_0_4 status, я вижу, что некоторые рабочие инициализированы и работают, несмотря на уничтожение подпроцессов.

->  celery@bot2: OK
->  celery@bot1: OK
->  celery@JOHN-WORKSTATION: OK

НО у меня нет способа сказать узлам, чтобы они закрылись или завершились. Каждый вызов типа celery control terminate celery@bot1 или celery control shutdown celery@bot2 приводит к следующему:

Error: No nodes replied within time constraint

Раскрытие платформы: Я разрабатываю в среде windows, но позже буду развертывать на дистрибутиве linux, так как большинство веб-хостингов используют именно его. Моей программой по умолчанию при открытии .sh файлов является git bash, который, похоже, отлично поддерживает запуск python и celery, так что пока я придерживаюсь этого. Предоставленный код должен быть переносимым и отлично работать на unix-системе, но я не тестировал фрагмент этого кода для подпроцессов на других платформах. Я не могу представить, что "отсоединение" терминала может вызвать какие-либо помехи в проблеме отключения связи с узлом, но подумал, что об этом стоит упомянуть.

Вернуться на верх