Confluent-kafka-python сохранение в базу данных

У меня есть проект Djnago и я пытаюсь сохранить объекты в базу данных внутри потребителя kafka. Я получаю записи из kafka следующим образом:

def sync_product_from_kafka_consumer():
    topic = PRODUCT_KAFKA_TOPIC_NAME
    schema_str = """*some_json*"""
    json_deserializer = JSONDeserializer(schema_str, from_dict=dict_to_product)
    string_deserializer = StringDeserializer('utf_8')

    consumer_conf = {'bootstrap.servers': KAFKA_SERVERS,
                     'key.deserializer': string_deserializer,
                     'value.deserializer': json_deserializer,
                     'group.id': 'myproject.dev.group-k8s-dev',
                     'auto.offset.reset': "earliest"}

    consumer = DeserializingConsumer(consumer_conf)
    consumer.subscribe([topic])
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        product = msg.value()
        if product is not None:
            save_product(product)
            # sync_to_async(save_product)(product)

    consumer.close()


def save_product(product):
    defaults = { *some_dict* }
    Product.objects.update_or_create(id=product.id, defaults=defaults)

Но после выполнения этого кода я получаю исключение:

django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

Как мне это исправить? Спасибо за любую помощь!

Вернуться на верх