Фоновый демон в celery без взрывов задач
У меня всегда возникают проблемы, когда я пытаюсь заставить celery jobs сканировать базу данных на предмет работы, но при этом не допускать повторных действий или сбоев в отправке. Проблема заключается в том, что:
- периодические
celery-beatзадания просто продолжают отправляться, даже если в очередь уже добавлено много заданий, и нет возможности ограничить это количество. Например, не более 1 из них должно быть в очереди или активно. - Если вы не используете celery inspect для проверки того, существует ли задание, но эти вызовы могут быть заблокированы слишком надолго, я считаю, что это особенно часто происходит, если работники заняты.
- Это может привести к накоплению тонны проверок.
Это может привести к сбоям в работе, например, когда beat продолжает отправлять заявки, но работник занят или не работает какое-то время, а как только они появляются снова, их набирается миллион.
То, что я ищу, - это демоноподобный процесс или поток, который просто проверяет наличие в базе данных объектов, требующих обработки, и будет ждать, если их там нет (или запросит когда-нибудь 30' или что-то в этом роде). В этом случае, если рабочая нагрузка слишком высока, нет особого смысла планировать одно и то же множество раз.
Отслеживать в базе данных, что было отправлено, а что нет, также немного сложно, потому что задание может быть прервано (например, oom) до того, как будет выполнено снятие флажков.
Можно было бы легко запрограммировать что-то подобное, но мне кажется неприличным отказываться от таких фреймворков, как celery, для фоновых процессов. С другой стороны, celery, похоже, не позволяет мне ограничить частоту отправки чего-либо разумным способом.
Каков надежный способ, используя celery или нет, запустить что-то подобное в фоновом режиме без взрывов или ложных блоков проверки?
from celery import Celery
app = Celery(...)
app.autodiscover_tasks()
def not_already_enqueued(pdf_id):
# celery inspect magic on registered/active/reserved
# unfortunately spuriously blocks a lot
return True|False
@app.task
def create_pages_for_pdf(pdf_id):
# ...
pass
@app.task
def create_pages_for_pdfs():
# Queries the database for unprocessed jobs,
# in my case these are PDFs not converted to
# images for each page
unporcessed_pdfs = [...]
for pdf in unporcessed_pdfs:
if not_already_enqueued(pdf_id)
create_pages_for_pdf.apply_async([pdf.id])
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(30.0, create_pages_for_pdfs, name='creates pages for each unprocessed PDF')
оберните задачу в блокировку redis и игнорируйте задачу, если блокировка существует, я создал обработчик сельдерея, вы можете использовать это
import logging
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from django.conf import settings
class CeleryTaskHandler:
"""
- Utility class for managing the execution and exception handling of Celery tasks.
- Contains a locking mechanism to prevent multiple instances of the same task from running concurrently.
Args:
task_name (str): The name of the task.
lock_duration (int, optional): The duration of the lock in seconds. Defaults to 300.
"""
def __init__(self, task_name, lock_duration=300):
self.task_name = task_name
self.lock_key = f"{self.task_name}_lock"
self.lock_duration = lock_duration
self.REDIS_CLIENT = settings.REDIS_CLIENT
def acquire_lock(self):
return self.REDIS_CLIENT.set(self.lock_key, "True", nx=True, ex=self.lock_duration)
def release_lock(self):
self.REDIS_CLIENT.delete(self.lock_key)
def execute_task(self, task_func, *args, **kwargs):
"""
Execute a task function while acquiring and releasing a lock to prevent concurrent execution.
"""
if self.acquire_lock():
try:
task_func(*args, **kwargs)
except SoftTimeLimitExceeded as e:
logging.error(f"Notification Limit exceeded: {str(e)}")
except TimeLimitExceeded as e:
logging.error(f"Task shutdown hard time limit exceeded: {str(e)}")
except Exception as e:
logging.error(f"Unexpected error in {self.task_name}: {str(e)}", exc_info=True)
finally:
self.release_lock()
else:
logging.info(f"New task is ignored since a previous task is already in progress for {self.task_name}")
Вдохновленный ответом @shaf-shafiq и похожий на него, я создал аннотацию.
Я предпочитаю это, потому что
- проще обернуть функцию с помощью
run_once_or_cancel('bla {0["id"]}, format=True), чтобы аннотировать задание вместе с идентификатором первого аргумента - их решение, похоже, не зависит от того, что делает celery, все задачи по-прежнему запускаются, но просто отменяются, как только блокировка уже используется. Таким образом, ортогональность становится более понятной.
Но я, возможно, неправильно понял.
import logging
from functools import wraps
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from django.conf import settings
from redis import Redis
logger = logging.getLogger("tasklocks")
def get_redis_client():
logger.info('Setting up client')
return Redis(**settings.TASK_LOCK_REDIS_CLIENT)
def run_once_or_cancel(lock_name, format=False, redis_client=None, lock_duration=5):
"""Annotation for a function, connects to reddis and only executes if not already executing somethign with lock_name.
`format` uses `lockname.format(*args, **kwargs)` as a lock_name, e.g. allowing for lock names with the ID of the
object(s) the function processes in it. Still figuring out what `lock_duration` does, guess it allows re-execution
after that time in any case"""
if redis_client is None:
redis_client = get_redis_client()
def acquire_lock(lock_name_):
logger.debug(f"Acquire task lock {lock_name_}")
return redis_client.set(lock_name_, "True", nx=True, ex=lock_duration)
def release_lock(lock_name_):
logger.debug(f"Release task lock {lock_name_}")
redis_client.delete(lock_name_)
def f(task_func):
@wraps(task_func)
def g(*args, **kwargs):
lock_name_ = lock_name.format(*args, **kwargs) if format else lock_name
if acquire_lock(lock_name_):
try:
task_func(*args, **kwargs)
except SoftTimeLimitExceeded as e:
logger.error(f"Notification Limit exceeded: {str(e)}")
except TimeLimitExceeded as e:
logger.error(f"Task shutdown hard time limit exceeded: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in `{lock_name}`: {str(e)}", exc_info=True)
finally:
release_lock(lock_name_)
else:
logger.info(f"New task `{lock_name}` is ignored since a previous task is still in progress")
return g
return f
Я также настроил отдельные базы данных redis для этих блокировок, чтобы они не смешивались с теми, которые использует celery (settings.py). В моем случае их три, с базами данных 0 (websockets), 1 (celery) и 2 (the locks).
CELERY_BROKER_URL = (
'rediss://localhost:16379/1?'
'ssl_cert_reqs=CERT_REQUIRED'
'&ssl_ca_certs=docker/secrets/certificates/redis/ca.crt'
'ssl_keyfile=docker/secrets/certificates/redis/client.key'
'ssl_certfile=docker/secrets/certificates/redis/client.crt'
TASK_LOCK_REDIS_CLIENT = dict(
host="localhost", port=16379, ssl=True, db=2,
ssl_certfile='docker/secrets/certificates/redis/client.crt',
ssl_keyfile='docker/secrets/certificates/redis/client.key',
ssl_cert_reqs="required",
ssl_ca_certs='docker/secrets/certificates/redis/ca.crt',
)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [{
'address': 'rediss://localhost:16379/0',
'ssl_certfile': 'docker/secrets/certificates/redis/client.crt',
'ssl_keyfile': 'docker/secrets/certificates/redis/client.key',
'ssl_cert_reqs': "required",
'ssl_ca_certs': 'docker/secrets/certificates/redis/ca.crt',
}],
'capacity': 1500,
},
},
}
Тогда у меня есть две функции, одна из которых является задачей celery, а другая - просто функцией, которая может быть вызвана из нескольких мест.
Таким образом, необработанные элементы ищутся только один раз за раз, а элемент обрабатывается только 1 раз за раз.
@app.task
@run_once_or_cancel('process_items', lock_duration=300)
def process_items():
# ....
for item in unprocessed:
process_item(item)
@app.task
@run_once_or_cancel('process_item({0.id})', format=True, lock_duration=300)
def process_items(item):
# ....
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
10.0, process_items,
name='process unprocessed items',
) # every 10 seconds, or longer if previous run still busy
В обоих ответах вы фактически передаете логику "проверки" блокировкам Redis. Эта блокировка может быть ненадежной, особенно с учетом контекста предварительной проверки.
Насколько я понимаю, ваша задача не является такой уж "периодической", поскольку контекст выполнения сильно зависит от состояния системы.
То, что я ищу, - это демоноподобный процесс или поток, который просто проверяет наличие в базе данных объектов, требующих обработки, и будет ждать, если их там нет (или запросит когда-нибудь 30' или что-то в этом роде). В этом случае, если рабочая нагрузка слишком высока, нет особого смысла планировать одно и то же множество раз.
Здесь вы описали реальное решение. Для этого вам понадобится коляска, не связанная с сельдереем. Таким образом, вы сможете контролировать все. Я предлагаю вам уменьшить интервал "проверки" до минимума, чтобы не накапливать данные. В случае celery это неэффективно, так как в этом случае возникнет много задач, которые займут место в очереди.
но мне кажется неприличным отказываться от таких фреймворков, как celery, для фоновых процессов
Не предполагается, что Celery может бесконечно запускать фоновые процессы. На самом деле, чем более атомарной, идемпотентной и небольшой является задача, тем лучше.
Я мог бы дать более конкретные рекомендации, если бы вы могли предоставить мне подробную информацию о том, какие задачи вы выполняете и сколько их.