Confluent-kafka-python сохранение в базу данных
У меня есть проект Djnago и я пытаюсь сохранить объекты в базу данных внутри потребителя kafka. Я получаю записи из kafka следующим образом:
def sync_product_from_kafka_consumer():
topic = PRODUCT_KAFKA_TOPIC_NAME
schema_str = """*some_json*"""
json_deserializer = JSONDeserializer(schema_str, from_dict=dict_to_product)
string_deserializer = StringDeserializer('utf_8')
consumer_conf = {'bootstrap.servers': KAFKA_SERVERS,
'key.deserializer': string_deserializer,
'value.deserializer': json_deserializer,
'group.id': 'myproject.dev.group-k8s-dev',
'auto.offset.reset': "earliest"}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
product = msg.value()
if product is not None:
save_product(product)
# sync_to_async(save_product)(product)
consumer.close()
def save_product(product):
defaults = { *some_dict* }
Product.objects.update_or_create(id=product.id, defaults=defaults)
Но после выполнения этого кода я получаю исключение:
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
Как мне это исправить? Спасибо за любую помощь!