Celery. RuntimeError: can't start new thread
I use Celery (Redis as brocker) to send telegram messages (using Telegram Bot API).
My VPS has 4 shared CPUs. I divide the list of recipients into groups of 5 people; every 1-3 seconds, the bot sends messages to these groups.
The problem is that when testing (~2000
receivers) at a certain point, my tasks fail due to a RuntimeError: can't start new thread
error (while the CPU load does not exceed 15% and when using ulimit -u
I get 7795
).
Please tell me what the problem is and how it can be solved.
models.py
class Post(models.Model):
text = models.TextField(max_length=4096, blank=True, null=True, default=None, verbose_name="Text")
@receiver(post_save, sender=Post)
def instance_created(sender, instance, created, **kwargs):
if created:
pre_send_tg.apply_async((instance.id,), countdown=5)
tasks.py
def chunks(lst, n):
res = []
for i in range(0, len(lst), n):
res.append(lst[i:i + n])
return res
@celery_app.task(ignore_result=True)
def pre_send_tg(post_id):
try:
Post = apps.get_model('news.post')
TelegramUser = apps.get_model('tools.telegramuser')
post = Post.objects.get(id=post_id)
users = [x.tg_id for x in TelegramUser.objects.all()]
_start = datetime.datetime.now() + datetime.timedelta(seconds=5)
count = 0
for i in chunks(users, 5):
for tg_id in i:
send_message.apply_async((tg_id, post.text), eta=_start)
if count % 100 == 0:
_start += datetime.timedelta(seconds=random.randint(50, 60))
else:
_start += datetime.timedelta(seconds=random.randint(2, 3))
count += 1
except Exception as e:
print(e)
@celery_app.task(ignore_result=True, time_limit=10, autoretry_for=(Exception,),
retry_backoff=1800, retry_kwargs={'max_retries': 2})
def send_message(tg_id, text):
bot = telebot.TeleBot(token)
try:
bot.send_message(chat_id=tg_id, text=text)
except Exception as e:
if e.args == (
'A request to the Telegram API was unsuccessful. Error code: 403. Description: Forbidden: bot was blocked by the user',):
pass
elif e.args[0].startswith(
"A request to the Telegram API was unsuccessful. Error code: 429."):
raise Exception
else:
pass
celery commands
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker1@%h --purge
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker2@%h --purge
celery -A Bot worker --loglevel=INFO --concurrency=10 -n worker3@%h --purge