Как загружать и обрабатывать большие файлы excel с помощью Celery в Django?
Я пытаюсь загрузить и обработать файл excel с помощью Django и DRF с Celery. Есть проблема, когда я пытаюсь передать файл в задачу Celery для обработки в фоновом режиме, я получаю следующую ошибку:
kombu.exceptions.EncodeError: Object of type InMemoryUploadedFile is not JSON serializable
Вот мой обработчик запроса на пост представления:
class FileUploadView(generics.CreateAPIView):
"""
POST: upload file to save data in the database
"""
parser_classes = [MultiPartParser]
serializer_class = FileSerializerXLSX
def post(self, request, format=None):
"""
Allows to upload file and lets it be handled by pandas
"""
serialized = FileSerializerXLSX(data=request.data)
if serialized.is_valid():
file_obj = request.data['file']
# file_bytes = file_obj.read()
print(file_obj)
import_excel_task.delay(file_obj)
print("its working")
return Response(status=204)
return Response(serialized._errors, status=status.HTTP_400_BAD_REQUEST)
И моя задача с сельдереем:
def import_excel_helper(file_obj):
df = extract_excel_to_dataframe(file_obj)
transform_df_to_clientmodel(df)
transform_df_to_productmodel(df)
transform_df_to_salesmodel(df)
@shared_task(name="import_excel_task")
def import_excel_task(file_obj):
"""Save excel file in the background"""
logger.info("Importing excel file")
import_excel_helper(file_obj)
Есть идеи, как обрабатывать импорт файлов Excel в задачу celery, чтобы они могли обрабатываться другими функциями в фоновом режиме?
В соответствии с ошибкой, тело запроса для вызова задачи celery должно быть JSON сериализуемым, так как это конфигурация по умолчанию. Затем, как документировано в kombu:
Основной недостаток JSON заключается в том, что он ограничивает вас следующими типами данных: строки, Unicode, плавающие числа, булевы, словари и списки. Десятичные числа и даты отсутствуют.
Допустим, это мой файл excel.
file.xlsx
Some | Value |
---|---|
Here | :) |
Решение 1
Перед вызовом задачи преобразуйте необработанные байты excel в строку Base64, чтобы их можно было сериализовать в JSON (поскольку строки являются допустимыми типами данных в документе JSON, а необработанные байты - нет). Затем, все остальное в конфигурациях Celery - те же значения по умолчанию.
tasks.py
import base64
import pandas
from celery import Celery
app = Celery('tasks')
@app.task
def add(excel_file_base64):
excel_file = base64.b64decode(excel_file_base64)
df = pandas.read_excel(excel_file)
print("Contents of excel file:", df)
views.py
import base64
from tasks import add
with open("file.xlsx", 'rb') as file: # Change this to be your <request.data['file']>
excel_raw_bytes = file.read()
excel_base64 = base64.b64encode(excel_raw_bytes).decode()
add.apply_async((excel_base64,))
Выход
[2021-08-19 20:40:28,904: INFO/MainProcess] Task tasks.add[d5373444-485d-4c50-8695-be2e68ef1c67] received
[2021-08-19 20:40:29,094: WARNING/ForkPoolWorker-4] Contents of excel file:
[2021-08-19 20:40:29,094: WARNING/ForkPoolWorker-4]
[2021-08-19 20:40:29,099: WARNING/ForkPoolWorker-4] Some Value
0 Here :)
[2021-08-19 20:40:29,099: WARNING/ForkPoolWorker-4]
[2021-08-19 20:40:29,099: INFO/ForkPoolWorker-4] Task tasks.add[d5373444-485d-4c50-8695-be2e68ef1c67] succeeded in 0.19386404199940444s: None
Решение 2:
Это более сложный путь. Реализуйте пользовательский сериализатор, который будет работать с файлами excel.
tasks.py
import ast
import base64
import pandas
from celery import Celery
from kombu.serialization import register
def my_custom_excel_encoder(obj):
"""Uncomment this block if you intend to pass it as a Base64 string:
file_base64 = base64.b64encode(obj[0][0]).decode()
obj = list(obj)
obj[0] = [file_base64]
"""
return str(obj)
def my_custom_excel_decoder(obj):
obj = ast.literal_eval(obj)
"""Uncomment this block if you passed it as a Base64 string (as commented above in the encoder):
obj[0][0] = base64.b64decode(obj[0][0])
"""
return obj
register(
'my_custom_excel',
my_custom_excel_encoder,
my_custom_excel_decoder,
content_type='application/x-my-custom-excel',
content_encoding='utf-8',
)
app = Celery('tasks')
app.conf.update(
accept_content=['json', 'my_custom_excel'],
)
@app.task
def add(excel_file):
df = pandas.read_excel(excel_file)
print("Contents of excel file:", df)
views.py
from tasks import add
with open("file.xlsx", 'rb') as excel_file: # Change this to be your <request.data['file']>
excel_raw_bytes = excel_file.read()
add.apply_async((excel_raw_bytes,), serializer='my_custom_excel')
Выход
- То же, что и решение 1
Решение 3
Вам может быть интересна эта документация Отправка сырых данных без сериализации