Celery. RuntimeError: невозможно начать новый поток
Я использую Celery (Redis в качестве брокера) для отправки сообщений в Telegram (используя Telegram Bot API).
На моем VPS есть 4 общих процессора. Я разделяю список получателей на группы по 5 человек; каждые 1-3 секунды бот отправляет сообщения этим группам.
Проблема в том, что при тестировании (~2000
ресиверов) в определенный момент мои задачи срываются из-за ошибки RuntimeError: can't start new thread
(при этом загрузка процессора не превышает 15%, а при использовании ulimit -u
я получаю 7795
).
Пожалуйста, расскажите мне, в чем заключается проблема и как ее можно решить.
models.py
class Post(models.Model):
text = models.TextField(max_length=4096, blank=True, null=True, default=None, verbose_name="Text")
@receiver(post_save, sender=Post)
def instance_created(sender, instance, created, **kwargs):
if created:
pre_send_tg.apply_async((instance.id,), countdown=5)
tasks.py
def chunks(lst, n):
res = []
for i in range(0, len(lst), n):
res.append(lst[i:i + n])
return res
@celery_app.task(ignore_result=True)
def pre_send_tg(post_id):
try:
Post = apps.get_model('news.post')
TelegramUser = apps.get_model('tools.telegramuser')
post = Post.objects.get(id=post_id)
users = [x.tg_id for x in TelegramUser.objects.all()]
_start = datetime.datetime.now() + datetime.timedelta(seconds=5)
count = 0
for i in chunks(users, 5):
for tg_id in i:
send_message.apply_async((tg_id, post.text), eta=_start)
if count % 100 == 0:
_start += datetime.timedelta(seconds=random.randint(50, 60))
else:
_start += datetime.timedelta(seconds=random.randint(2, 3))
count += 1
except Exception as e:
print(e)
@celery_app.task(ignore_result=True, time_limit=10, autoretry_for=(Exception,),
retry_backoff=1800, retry_kwargs={'max_retries': 2})
def send_message(tg_id, text):
bot = telebot.TeleBot(token)
try:
bot.send_message(chat_id=tg_id, text=text)
except Exception as e:
if e.args == (
'A request to the Telegram API was unsuccessful. Error code: 403. Description: Forbidden: bot was blocked by the user',):
pass
elif e.args[0].startswith(
"A request to the Telegram API was unsuccessful. Error code: 429."):
raise Exception
else:
pass
celery commands
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker1@%h --purge
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker2@%h --purge
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker3@%h --purge