Отправка больших файлов base64 через RabbitMQ для использования рабочими

Я использую RabbitMQ и Celery для обработки вложений электронной почты с помощью gmail API. В моем первом celery task он извлекает пакеты электронных писем с большими вложениями в строках base64 размером более 25 Мб на файл. Текущее ограничение по умолчанию для RabbitMQ составляет 16 Мб, но я не хочу его повышать, потому что прочитал несколько статей о том, как лучше сохранять небольшой размер сообщения.

Как это лучше всего сделать? В то время как первой задачей является получение электронных писем, я хочу создать несколько других celery workers, которые обрабатывают эти файлы (с распознаванием текста и сохранением его в базе данных) одновременно, чтобы оптимизировать скорость процесса.

Несколько решений (я не уверен, что это хорошая практика, потому что я новичок) Я придумал:

  • Повышение лимита на размер сообщения RabbitMQ

  • Сохранение файла в памяти и ссылка на него во второй задаче celery (не уверен, что это хорошая идея, потому что на моем сервере, который я запускаю, 32 ГБ оперативной памяти)

  • В первой задаче celery, которая получает электронные письма, я могу напрямую загрузить их в облачный сервис хранения, а затем сослаться на этот URL-адрес файла во второй задаче celery. Но недостатком этого является то, что мне пришлось бы загрузить файл, а затем повторно загрузить его, чтобы выполнить распознавание текста, что не кажется эффективным. (Также увеличились затраты из-за использования полосы пропускания)

Есть ли здесь другое решение моей проблемы с дизайном?

В вашем случае, возможно, имеет смысл разделить вашу первую большую задачу, которая заключается в загрузке почтовых пакетов и вложений к ним, на две более мелкие задачи. Поскольку ваши вложения большие, их можно загрузить с отдельной конечной точки (во второй задаче).

В первом задании вы извлекаете и обрабатываете электронные письма из GmailAPI и передаете их следующему заданию: userId, messageId, attachmentId, access_token.

Во втором задании вы загружаете вложение и сразу же выполняете распознавание текста и сохраняете результаты в базе данных.

Вот упрощенный код для этого решения:

import requests  
from celery import shared_task  
  
API_URL = 'https://gmail.googleapis.com'  
  
  
@shared_task  
def process_user_mails(user_id, access_token):  
    def iter_user_mail_ids_from_first_page():  
        url = f'{API_URL}/gmail/v1/users/{user_id}/messages'  
        mails_data = requests.get(url=url, headers=headers).json()  
        for mail in mails_data['messages']:  
            yield mail['id']  
  
    def load_user_mail_message():  
        url = f'{API_URL}/gmail/v1/users/{user_id}/messages/{mail_id}' 
        return requests.get(url=url, headers=headers).json()  
  
    def iter_attachments_meta(payload):  
        if payload.get('filename'):  
            yield {  
                'mime_type': payload['mimeType'],  
                'filename': payload['filename'],  
                'attachment_id': payload['body']['attachmentId'],  
            }  
  
        for part in payload.get('parts', []):  
            yield from iter_attachments_meta(part)  
  
    headers = {'Authorization': f'Bearer {access_token}'}  
    for mail_id in iter_user_mail_ids_from_first_page():  
        mail_message = load_user_mail_message()  
        attachments_it = iter_attachments_meta(mail_message.get('payload', []))  
        for attachment_meta in attachments_it:  
            process_attachment.delay(  
                user_id=user_id,  
                access_token=access_token,  
                mail_id=mail_id,  
                attachment_meta=attachment_meta,  
            )  
  
  
@shared_task  
def process_attachment(user_id, access_token, mail_id, attachment_meta):  
    attachment_id = attachment_meta['attachment_id']  
    headers = {'Authorization': f'Bearer {access_token}'}  
    url = (  
        f'{API_URL}/gmail/v1/users/{user_id}/messages/'  
        f'{mail_id}/attachments/{attachment_id}'  
    )  
    attachment = requests.get(url=url, headers=headers).json()  
    attachment_data = attachment['data']  
    # Do the necessary OCR processing, save everything you need to the database.

Сам алгоритм максимально прост:

  1. Получить список сообщений можно по ссылке GmailApi.
  2. Загрузить данные каждого электронного сообщения.
  3. Извлекайте метаинформацию о каждом вложении из каждого сообщения.
  4. Передайте обработку вложения следующей задаче.
  5. Загрузите приложение, выполните дальнейшую обработку.

Все задействованные конечные точки:

  1. пользователи.сообщения.список
  2. пользователи.сообщения.получают
  3. пользователи.сообщения.вложения.получить

Преимущества такого подхода очевидны: вы передаете минимум необходимой информации во вторую задачу, вам не нужно где-то сохранять промежуточное состояние, возможность параллельной обработки вложений (Разделяй и властвуй).

<время работы/>

Второе Celery решение, которое приходит мне на ум, - это создать общую папку на сервере, в которую ваши сотрудники смогут сохранять и читать сохраненные вложения. Процесс будет выглядеть примерно так:

  1. В первом задании загрузите вложение. Сохраните данные в файл в общей папке, присвоив файлу уникальное имя. Передайте ссылку на файл во вторую задачу Celery.
  2. Во втором задании прочитайте данные из файла, выполните необходимую обработку распознавания текста и сохраните все необходимое в базе данных.
  3. Удалите файл с диска (в рамках той же задачи или передайте его следующему заданию).

Какие преимущества я вижу здесь:

  1. Вложенные данные загружаются в оперативную память только тогда, когда требуется выполнить некоторую обработку, например распознавание текста.
  2. В эпоху твердотельных накопителей чтение из файла и запись в него осуществляются очень быстро. Это будет значительно быстрее, чем при использовании промежуточного облачного хранилища, учитывая транспортные расходы на загрузку.
  3. Вы не увеличиваете ограничение на размер сообщения RabbitMQ.
  4. Вам не потребуется много места на диске, так как обработанные файлы будут удалены.
Вернуться на верх