Celery. RuntimeError: невозможно начать новый поток

Я использую Celery (Redis в качестве брокера) для отправки сообщений в Telegram (используя Telegram Bot API).

На моем VPS есть 4 общих процессора. Я разделяю список получателей на группы по 5 человек; каждые 1-3 секунды бот отправляет сообщения этим группам.

Проблема в том, что при тестировании (~2000 ресиверов) в определенный момент мои задачи срываются из-за ошибки RuntimeError: can't start new thread (при этом загрузка процессора не превышает 15%, а при использовании ulimit -u я получаю 7795).

Пожалуйста, расскажите мне, в чем заключается проблема и как ее можно решить. enter image description here

models.py

class Post(models.Model):
    text = models.TextField(max_length=4096, blank=True, null=True, default=None, verbose_name="Text")

@receiver(post_save, sender=Post)
def instance_created(sender, instance, created, **kwargs):
    if created:
        pre_send_tg.apply_async((instance.id,), countdown=5)
      

tasks.py

def chunks(lst, n):
    res = []
    for i in range(0, len(lst), n):
        res.append(lst[i:i + n])
    return res


@celery_app.task(ignore_result=True)
def pre_send_tg(post_id):
    try:
        Post = apps.get_model('news.post')
        TelegramUser = apps.get_model('tools.telegramuser')

        post = Post.objects.get(id=post_id)
        users = [x.tg_id for x in TelegramUser.objects.all()]
            
        _start = datetime.datetime.now() + datetime.timedelta(seconds=5)
        count = 0

        for i in chunks(users, 5):
            for tg_id in i:          
                send_message.apply_async((tg_id, post.text), eta=_start)
          
            if count % 100 == 0:
                _start += datetime.timedelta(seconds=random.randint(50, 60))
            else:
                _start += datetime.timedelta(seconds=random.randint(2, 3))
            count += 1
    except Exception as e:
        print(e)


@celery_app.task(ignore_result=True, time_limit=10,  autoretry_for=(Exception,),
                retry_backoff=1800, retry_kwargs={'max_retries': 2})
def send_message(tg_id, text):
    bot = telebot.TeleBot(token)
    try:
        bot.send_message(chat_id=tg_id, text=text)
    except Exception as e:
        if e.args == (
                'A request to the Telegram API was unsuccessful. Error code: 403. Description: Forbidden: bot was blocked by the user',):
            pass
        elif e.args[0].startswith(
                "A request to the Telegram API was unsuccessful. Error code: 429."):
            raise Exception
        else:
            pass

celery commands

celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker1@%h --purge
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker2@%h --purge
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker3@%h --purge
Вернуться на верх