Повышение эффективности ORM и сериализации Django при импорте внешних данных

У меня есть celery worker, который периодически получает данные из API, с которым мне нужно синхронизироваться. Я создаю или обновляю конкретный экземпляр модели в зависимости от того, хранятся ли у меня уже внешние данные id.

Однако по мере роста API, с которым я синхронизирую, эта задача становится все более дорогостоящей.

Вот соответствующий код:

    for d in api.fetch_items():
        attempted_count += 1
        try:
            existing_item = MyModel.objects.get(item_id=d["item"])
            serializer = MySerializer(
                existing_item,
                data=d,
                partial=True,
                context={"company": company},
            )
        except MyModel.DoesNotExist:
            serializer = MySerializer(data=d, context={"company": company})
        if serializer.is_valid():
            serializer.save()
            imported_count += 1
        else:
            logger.error(f"Invalid data when deserializing item: {serializer.errors}\nBased on: {d}")

Я думаю, что поскольку ответ очень длинный (часто 50,000+ элементов), эти запросы заставляют задачу съедать много времени. Я надеюсь получить несколько предложений по повышению эффективности этого потока.

Некоторые вещи, которые я рассматривал:

  • Сравнивая экземпляры MyModel через iterator() с ответом, который получает задача, чтобы определить, какие внутренние элементы нужно обновить или создать (я даже не знаю, будет ли это быстрее)

  • Массовое обновление, хотя я не уверен, как делать массовые обновления, когда значения, которые нужно обновить для каждого элемента, разные.

  • Может быть, атомарная транзакция?

Я даже не совсем уверен, являются ли те вещи, о которых я подумал, хорошими подходами. Надеюсь, кто-нибудь поделится своими мыслями!

Мне приходилось сталкиваться с похожей проблемой - которая, честно говоря, связана с наличием никудышного API, не предоставляющего разумного способа внесения изменений в данные, и заставляющего вас периодически итерировать весь набор данных для самостоятельного выявления изменений.

Каким бы способом вы это ни делали, если каждый раз обновлять все объекты в базе данных, это будет медленно, неэффективно и плохо масштабируется. Я использовал следующий подход:

  1. При итерации по входящим данным генерируйте хэш каждой записи.

  2. Храните этот хэш в кэше, с ID объекта в качестве ключа.

  3. При повторной итерации данных проверьте хэш, и если он совпадает с тем, что у вас есть в кэше, пропустите обновление, потому что вы знаете, что ничего не изменилось с момента последнего сохранения.

В коде это будет выглядеть примерно так:

import hashlib
from django.core.cache import cache

for d in api.fetch_items():
    data_str = json.dumps(d)  # Convert the data to a string so you can hash it
    data_hash = hashlib.md5(data_str.encode('utf-8')).hexdigest() # Generate md5 hash, which is quick
    cache_key = 'some_prefix_' + data_hash
    if not cache.get(cache_key):
        cache.set(cache_key, 1)  # Just set a simple value - we only care about the key
        # Perform your logic here for updating the item in the database
    else:
        # The data hasn't changed since the last time you checked - skip it
        continue

По-прежнему существует некоторая внутренняя неэффективность в итерации по всему - но, по крайней мере, этот подход устраняет ненужные записи в базу данных.

Вернуться на верх