Сериализация задач Dask в django
Я пытаюсь настроить prefect 2 с dask для выполнения некоторых задач в django. Один простой пример с небольшим количеством кода работает нормально, но более крупный пример не работает с:
distributed.worker - ERROR - Could not deserialize task ****-****-****-****
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 2185, in execute
function, args, kwargs = await self._maybe_deserialize_task(ts)
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 2158, in _maybe_deserialize_task
function, args, kwargs = _deserialize(*ts.run_spec)
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 2835, in _deserialize
kwargs = pickle.loads(kwargs)
File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
return pickle.loads(x)
File "/tmp/tmpb11md_6wprefect/my_project/models/__init__.py", line 2, in <module>
from my_project.models.my_model import *
File "/tmp/tmpb11md_6wprefect/my_project/models/api_key.py", line 14, in <module>
class ApiKey(models.Model):
File "/usr/local/lib/python3.8/site-packages/django/db/models/base.py", line 108, in __new__
app_config = apps.get_containing_app_config(module)
File "/usr/local/lib/python3.8/site-packages/django/apps/registry.py", line 253, in get_containing_app_config
self.check_apps_ready()
File "/usr/local/lib/python3.8/site-packages/django/apps/registry.py", line 136, in check_apps_ready
raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Проблема здесь в том, что я использую init.py в каталоге моделей вместо того, чтобы использовать несколько приложений и т.д. Чтобы обойти эту ошибку в других местах, я установил окружение django в начале каждой задачи, например:
import datetime
import xyz
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
os.environ.get("DJANGO_SETTINGS_MODULE", "core.settings"),
)
django.setup()
# No we import the models
from my_project.models import MyModel
До сих пор это работало нормально. Но десериализация в dask, похоже, не работает таким образом. Кажется, что он пытается импортировать каталог модели перед выполнением любого кода в сериализованной задаче.
Есть ли способ исправить это?
Я бы с удовольствием помог с вопросом о префекте, но я не смогу сильно помочь с вопросом о Django.
Одним из решений, которое вы можете попробовать, является создание потока, запускаемого из развертывания через вызов API, вместо запуска потока непосредственно в вашем приложении Django. Таким образом, поток будет выполняться в отдельном процессе, не вмешиваясь в работу вашего приложения Django. В этой теме Prefect Discourse показано, как это можно настроить.
Еще проще было бы сделать это с помощью Python requests
, а не клиента Prefect - таким образом, вашему приложению Django даже не нужно будет устанавливать Prefect в вашей среде. Я думаю, это может быть лучшим решением здесь.