Celery неожиданно закрывает TCP-соединение
Я использую RabbitMQ 3.8.2 с Erlang 22.2.7 и столкнулся с проблемой при потреблении задач. Моя конфигурация - django-celery-rabbitmq. При публикации сообщений в очередь все идет нормально, пока длина очереди не достигнет 1200 сообщений. После этого RabbitMQ начинает закрывать AMQP соединение со следующими ошибками:
...
2022-11-01 09:35:25.327 [info] <0.20608.9> accepting AMQP connection <0.20608.9> (185.121.83.107:60447 -> 185.121.83.116:5672)
2022-11-01 09:35:25.483 [info] <0.20608.9> connection <0.20608.9> (185.121.83.107:60447 -> 185.121.83.116:5672): user 'rabbit_admin' authenticated and granted access to vhost '/'
...
2022-11-01 09:36:59.129 [warning] <0.19994.9> closing AMQP connection <0.19994.9> (185.121.83.108:36149 -> 185.121.83.116:5672, vhost: '/', user: 'rabbit_admin'):
client unexpectedly closed TCP connection
...
[error] <0.11162.9> closing AMQP connection <0.11162.9> (185.121.83.108:57631 -> 185.121.83.116:5672):
{writer,send_failed,{error,enotconn}}
...
2022-11-01 09:35:48.256 [error] <0.20201.9> closing AMQP connection <0.20201.9> (185.121.83.108:50058 -> 185.121.83.116:5672):
{inet_error,enotconn}
...
Затем потребитель django-celery исчезает из списка очередей, сообщения становятся "готовыми", а капсулы celery не могут получить сообщение после завершения работы со следующей ошибкой:
ERROR: [2022-11-01 09:20:23] /usr/src/app/project/celery.py:114 handle_message Error while handling Rabbit task: [Errno 104] Connection reset by peer
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 514, in channel
return self.channels[channel_id]
KeyError: None
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/src/app/project/celery.py", line 76, in handle_message
message.ack()
File "/usr/local/lib/python3.10/site-packages/kombu/message.py", line 125, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "/usr/local/lib/python3.10/site-packages/amqp/channel.py", line 1407, in basic_ack
return self.send_method(
File "/usr/local/lib/python3.10/site-packages/amqp/abstract_channel.py", line 70, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python3.10/site-packages/amqp/method_framing.py", line 186, in write_frame
write(buffer_store.view[:offset])
File "/usr/local/lib/python3.10/site-packages/amqp/transport.py", line 347, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
Я заметил, что размер сообщения также влияет на это поведение. В приведенном выше случае в каждом сообщении примерно 1000-1500 символов. Если я уменьшу его до 50 символов, то порог, при котором RabbitMQ начинает закрывать AMQP соединение, сдвинется до 4000-5000 сообщений.
Я подозреваю, что проблема в нехватке ресурсов для RabbitMQ, но не знаю, как найти, что именно идет не так. Если я запущу htop на сервере, я увижу, что 2 доступных CPU не имеют высокой нагрузки в любое время (загружены менее чем на 20% каждый) и RAM используется 400mb / 3840mb. Так что, похоже, ничего не случилось. Есть ли какая-нибудь команда проверки ресурсов для RabbitMQ? Задания не требуют много времени для выполнения, около 10 секунд каждое.
Также, возможно, есть некоторые пропущенные сердцебиения от клиента (у меня была такая проблема раньше, но не сейчас, в настоящее время нет никаких сообщений об ошибках).
Также если я запускаю sudo journalctl --system | grep rabbitmq
, я получаю следующий вывод:
......
Мау 24 05:15:49 oms-git.omsystem sshd[809111]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=43.154.63.169 user=rabbitmq
Мау 24 05:15:51 oms-git.omsystem sshd[809111]: Failed password for rabbitmq from 43.154.63.169 port 37010 ssh2
Мау 24 05:15:51 oms-git.omsystem sshd[809111]: Disconnected from authenticating user rabbitmq 43.154.63.169 port 37010 [preauth]
Мау 24 16:12:32 oms-git.omsystem sudo[842182]: ad : TTY=pts/3 ; PWD=/var/log/rabbitmq ; USER=root ; COMMAND=/usr/bin/tail -f -n 1000 rabbit@XXX-git.log
......
Возможно, здесь другая проблема с брандмауэром, но я не вижу никаких сообщений об этом в /var/log/rabbitmq/rabbit@XXX.log
.
Моя конфигурация Celery на клиенте выглядит следующим образом:
CELERY_TASK_IGNORE_RESULT = True
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_SEND_EVENTS = False
CELERY_BROKER_POOL_LIMIT = 30
CELERY_BROKER_HEARTBEAT = 30
CELERY_BROKER_CONNECTION_TIMEOUT = 600
CELERY_PREFETCH_MULTIPLIER = 1
CELERY_SEND_EVENTS = False
CELERY_WORKER_CONCURRENCY = 1
CELERY_TASK_ACKS_LATE = True
В настоящее время я запускаю капсулу с помощью следующей команды:
celery -A project.celery worker -l info -f /var/log/celery/celery.log -Ofair
Также я пытался использовать различные аргументы для ограничения префетча или отключения heartbit, но это не сработало:
celery -A project.celery worker -l info -f /var/log/celery/celery.log --without-heartbeat --without-gossip --without-mingle
celery -A project.celery worker -l info -f /var/log/celery/celery.log --prefetch-multiplier=1 --pool=solo --
Я ожидаю, что нет никаких ограничений на длину очереди и каждый celery pod в моем кластере kubernetes потребляет и принимает сообщения без ошибок.