Сериализация задач Dask в django

Я пытаюсь настроить prefect 2 с dask для выполнения некоторых задач в django. Один простой пример с небольшим количеством кода работает нормально, но более крупный пример не работает с:

distributed.worker - ERROR - Could not deserialize task ****-****-****-****
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 2185, in execute
    function, args, kwargs = await self._maybe_deserialize_task(ts)
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 2158, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*ts.run_spec)
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 2835, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
  File "/tmp/tmpb11md_6wprefect/my_project/models/__init__.py", line 2, in <module>
    from my_project.models.my_model import *
  File "/tmp/tmpb11md_6wprefect/my_project/models/api_key.py", line 14, in <module>
    class ApiKey(models.Model):
  File "/usr/local/lib/python3.8/site-packages/django/db/models/base.py", line 108, in __new__
    app_config = apps.get_containing_app_config(module)
  File "/usr/local/lib/python3.8/site-packages/django/apps/registry.py", line 253, in get_containing_app_config
    self.check_apps_ready()
  File "/usr/local/lib/python3.8/site-packages/django/apps/registry.py", line 136, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

Проблема здесь в том, что я использую init.py в каталоге моделей вместо того, чтобы использовать несколько приложений и т.д. Чтобы обойти эту ошибку в других местах, я установил окружение django в начале каждой задачи, например:

import datetime
import xyz
import django

os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    os.environ.get("DJANGO_SETTINGS_MODULE", "core.settings"),
)

django.setup()

# No we import the models

from my_project.models import MyModel

До сих пор это работало нормально. Но десериализация в dask, похоже, не работает таким образом. Кажется, что он пытается импортировать каталог модели перед выполнением любого кода в сериализованной задаче.

Есть ли способ исправить это?

Я бы с удовольствием помог с вопросом о префекте, но я не смогу сильно помочь с вопросом о Django.

Одним из решений, которое вы можете попробовать, является создание потока, запускаемого из развертывания через вызов API, вместо запуска потока непосредственно в вашем приложении Django. Таким образом, поток будет выполняться в отдельном процессе, не вмешиваясь в работу вашего приложения Django. В этой теме Prefect Discourse показано, как это можно настроить.

Еще проще было бы сделать это с помощью Python requests, а не клиента Prefect - таким образом, вашему приложению Django даже не нужно будет устанавливать Prefect в вашей среде. Я думаю, это может быть лучшим решением здесь.

Вернуться на верх