Как я могу использовать uWSGI Spooler в Django?
Я пытаюсь запустить параллельные задачи, используя uWSGI Spooler вместо Celery в Django 3.2.3, Python 3.7.9. Я нашел несколько ресурсов, таких как this, this и this, но ничего не работает. Я столкнулся с множеством ошибок в этом путешествии, и я исправил их, используя решения, которые я нашел в Интернете, и сейчас вот что у меня есть:
Настройка
uwsgi.ini
[uwsgi]
pythonpath = /path/to/djproj
wsgi-file = /path/to/djproj/wsgi.py
uid = myuid
module = wsgi:application
master = true
processes = 1
threads = 10
lazy-apps = true
http = 0.0.0.0:8080
vacuum = true
log-format = %(ltime) Worker: %(wid) %(status) %(method) %(uri) Size: %(size)
log-date = %%Y %%m %%d %%H:%%M:%%S.000
# Let django handle most of the logging
disable-logging = true
log-5xx = true
harakiri = 60
harakiri-verbose = true
stats = /tmp/djproj_stats.socket
# Spooling
spooler = /path/to/tasks
spooler-harakiri = 600
import = djproj.tasks
tasks.py
import logging
logger = logging.getLogger(__name__)
try:
from uwsgidecorators import spool
logger.warning("Imported spool successfully.")
except Exception:
logger.warning("Couldn't import spool.")
def spool(func):
def func_wrapper(**arguments):
return func(arguments)
return func_wrapper
@spool
def run_task(arguments):
logger.warning("Running in spool.")
from djproj.myapp.models import MyModel
obj = MyModel.objects.get(id=arguments["obj_id"])
obj.run()
djproj/myapp/models.py
# ...
def prepare_spooler_args(**kwargs):
args = {}
for name, value in kwargs.items():
args[name.encode("utf-8")] = str(value).encode("utf-8")
return args
class MyModel(models.Model):
# ...
def start_run_in_spooler(self):
args = prepare_spooler_args(task_id=self.id)
run_task(args)
Results
Когда я запускаю uwsgi --ini uwsgi.ini
и обращаюсь к конечной точке, которая запускает этот код, я получаю:
...
2021 09 20 12:56:20.000 - *** Stats server enabled on /tmp/djproj_stats.socket fd: 16 ***
2021 09 20 12:56:20.000 - spawned uWSGI http 1 (pid: 919)
2021 09 20 12:56:20.000 - [spooler /path/to/tasks pid: 917] managing request uwsgi_spoolfile_on_5a22c167ad32_826_2_189168444_1632124468_886229 ...
2021 09 20 12:56:20.000 - unable to find the spooler function, have you loaded it into the spooler process ?
Мне кажется очень странным, что в сети так мало ресурсов о том, как заставить это работать. Каждый раз, когда я ищу решения, все рекомендуют Celery, как будто это серебряная пуля для параллелизма в любом приложении на Python, хотя Django + uWSGI - очень распространенная комбинация, а Spooler кажется простым и легким решением. Если у кого-то есть советы, как заставить это работать, было бы здорово.
С помощью коллеги нам наконец удалось заставить его работать. Вот изменения, которые были необходимы:
uwsgi.ini
import = djproj.tasks
был изменен на spooler-import = djproj.tasks
.
tasks.py
Импорты должны были быть перенесены из функции спула, а Django должен быть инициализирован, чтобы импорты работали. Вот окончательная версия кода:
import logging
from functools import wraps
import django
logger = logging.getLogger(__name__)
try:
from djproj.myapp.models import MyModel
except Exception:
logger.warning(f"Django is not loaded yet! Setting up...")
try:
django.setup(set_prefix=False)
except Exception:
pass
from djproj.myapp.models import MyModel
try:
from uwsgidecorators import spool
except Exception:
logger.warning("Couldn't import uwsgidecorators!")
def spool(pass_arguments):
def decorator(method):
if callable(pass_arguments):
method.gw_method = method.__name__
else:
method.gw_method = pass_arguments
@wraps(method)
def wrapper(*args, **kwargs):
method(*args, **kwargs)
return wrapper
if callable(pass_arguments):
return decorator(pass_arguments)
return decorator
@spool(pass_arguments=True)
def run_task(task_id):
task = MyModel.objects.get(id=task_id)
task.run()
models.py и views.py
Метод модели, который вызывал спулированную функцию, был удален. Вместо этого мы просто вызываем спулированную функцию непосредственно в представлении:
class MyModelViewSet(viewsets.ModelViewSet):
queryset = models.MyModel.objects.all()
serializer_class = serializers.MyModelSerializer
def create(self, request, *args, **kwargs):
# ...
run_task(task_id=task.id)
# ...
return Response(serializer.data, status=status_code, headers=headers)
Надеюсь, это будет полезно всем, кто сталкивается с подобными проблемами.