Как загружать и обрабатывать большие файлы excel с помощью Celery в Django?

Я пытаюсь загрузить и обработать файл excel с помощью Django и DRF с Celery. Есть проблема, когда я пытаюсь передать файл в задачу Celery для обработки в фоновом режиме, я получаю следующую ошибку:

kombu.exceptions.EncodeError: Object of type InMemoryUploadedFile is not JSON serializable

Вот мой обработчик запроса на пост представления:

class FileUploadView(generics.CreateAPIView):
    """
    POST: upload file to save data in the database
    """
    parser_classes = [MultiPartParser]
    serializer_class = FileSerializerXLSX

    def post(self, request, format=None):
        """
        Allows to upload file and lets it be handled by pandas
        """

        serialized = FileSerializerXLSX(data=request.data)
        if serialized.is_valid():
            file_obj = request.data['file']
            # file_bytes = file_obj.read()
            print(file_obj)
            import_excel_task.delay(file_obj)
            print("its working")
            return Response(status=204)

        return Response(serialized._errors, status=status.HTTP_400_BAD_REQUEST)

И моя задача с сельдереем:

def import_excel_helper(file_obj):
    df = extract_excel_to_dataframe(file_obj)
    transform_df_to_clientmodel(df)
    transform_df_to_productmodel(df)
    transform_df_to_salesmodel(df)


@shared_task(name="import_excel_task")
def import_excel_task(file_obj):
    """Save excel file in the background"""
    logger.info("Importing excel file")
    import_excel_helper(file_obj)

Есть идеи, как обрабатывать импорт файлов Excel в задачу celery, чтобы они могли обрабатываться другими функциями в фоновом режиме?

В соответствии с ошибкой, тело запроса для вызова задачи celery должно быть JSON сериализуемым, так как это конфигурация по умолчанию. Затем, как документировано в kombu:

Основной недостаток JSON заключается в том, что он ограничивает вас следующими типами данных: строки, Unicode, плавающие числа, булевы, словари и списки. Десятичные числа и даты отсутствуют.

Допустим, это мой файл excel.

file.xlsx

Some Value
Here :)

Решение 1

Перед вызовом задачи преобразуйте необработанные байты excel в строку Base64, чтобы их можно было сериализовать в JSON (поскольку строки являются допустимыми типами данных в документе JSON, а необработанные байты - нет). Затем, все остальное в конфигурациях Celery - те же значения по умолчанию.

tasks.py

import base64

import pandas

from celery import Celery

app = Celery('tasks')


@app.task
def add(excel_file_base64):
    excel_file = base64.b64decode(excel_file_base64)
    df = pandas.read_excel(excel_file)
    print("Contents of excel file:", df)

views.py

import base64

from tasks import add


with open("file.xlsx", 'rb') as file:  # Change this to be your <request.data['file']>
    excel_raw_bytes = file.read()
    excel_base64 = base64.b64encode(excel_raw_bytes).decode()
    add.apply_async((excel_base64,))

Выход

[2021-08-19 20:40:28,904: INFO/MainProcess] Task tasks.add[d5373444-485d-4c50-8695-be2e68ef1c67] received
[2021-08-19 20:40:29,094: WARNING/ForkPoolWorker-4] Contents of excel file:
[2021-08-19 20:40:29,094: WARNING/ForkPoolWorker-4]  
[2021-08-19 20:40:29,099: WARNING/ForkPoolWorker-4]    Some Value
0  Here    :)
[2021-08-19 20:40:29,099: WARNING/ForkPoolWorker-4] 

[2021-08-19 20:40:29,099: INFO/ForkPoolWorker-4] Task tasks.add[d5373444-485d-4c50-8695-be2e68ef1c67] succeeded in 0.19386404199940444s: None

Решение 2:

Это более сложный путь. Реализуйте пользовательский сериализатор, который будет работать с файлами excel.

tasks.py

import ast
import base64

import pandas

from celery import Celery
from kombu.serialization import register


def my_custom_excel_encoder(obj):
    """Uncomment this block if you intend to pass it as a Base64 string:
    file_base64 = base64.b64encode(obj[0][0]).decode()
    obj = list(obj)
    obj[0] = [file_base64]
    """
    return str(obj)


def my_custom_excel_decoder(obj):
    obj = ast.literal_eval(obj)
    """Uncomment this block if you passed it as a Base64 string (as commented above in the encoder):
    obj[0][0] = base64.b64decode(obj[0][0])
    """
    return obj


register(
    'my_custom_excel',
    my_custom_excel_encoder,
    my_custom_excel_decoder,
    content_type='application/x-my-custom-excel',
    content_encoding='utf-8',
)


app = Celery('tasks')

app.conf.update(
    accept_content=['json', 'my_custom_excel'],
)


@app.task
def add(excel_file):
    df = pandas.read_excel(excel_file)
    print("Contents of excel file:", df)

views.py

from tasks import add


with open("file.xlsx", 'rb') as excel_file:  # Change this to be your <request.data['file']>
    excel_raw_bytes = excel_file.read()
    add.apply_async((excel_raw_bytes,), serializer='my_custom_excel')

Выход

  • То же, что и решение 1

Решение 3

Вам может быть интересна эта документация Отправка сырых данных без сериализации

Вернуться на верх