Отправка больших файлов base64 через RabbitMQ для использования рабочими
Я использую RabbitMQ и Celery для обработки вложений электронной почты с помощью gmail API. В моем первом celery task
он извлекает пакеты электронных писем с большими вложениями в строках base64 размером более 25 Мб на файл. Текущее ограничение по умолчанию для RabbitMQ составляет 16 Мб, но я не хочу его повышать, потому что прочитал несколько статей о том, как лучше сохранять небольшой размер сообщения.
Как это лучше всего сделать? В то время как первой задачей является получение электронных писем, я хочу создать несколько других celery workers
, которые обрабатывают эти файлы (с распознаванием текста и сохранением его в базе данных) одновременно, чтобы оптимизировать скорость процесса.
Несколько решений (я не уверен, что это хорошая практика, потому что я новичок) Я придумал:
Повышение лимита на размер сообщения RabbitMQ
Сохранение файла в памяти и ссылка на него во второй задаче celery (не уверен, что это хорошая идея, потому что на моем сервере, который я запускаю, 32 ГБ оперативной памяти)
В первой задаче celery, которая получает электронные письма, я могу напрямую загрузить их в облачный сервис хранения, а затем сослаться на этот URL-адрес файла во второй задаче celery. Но недостатком этого является то, что мне пришлось бы загрузить файл, а затем повторно загрузить его, чтобы выполнить распознавание текста, что не кажется эффективным. (Также увеличились затраты из-за использования полосы пропускания)
Есть ли здесь другое решение моей проблемы с дизайном?
В вашем случае, возможно, имеет смысл разделить вашу первую большую задачу, которая заключается в загрузке почтовых пакетов и вложений к ним, на две более мелкие задачи. Поскольку ваши вложения большие, их можно загрузить с отдельной конечной точки (во второй задаче).
В первом задании вы извлекаете и обрабатываете электронные письма из GmailAPI
и передаете их следующему заданию: userId
, messageId
, attachmentId
, access_token
.
Во втором задании вы загружаете вложение и сразу же выполняете распознавание текста и сохраняете результаты в базе данных.
Вот упрощенный код для этого решения:
import requests
from celery import shared_task
API_URL = 'https://gmail.googleapis.com'
@shared_task
def process_user_mails(user_id, access_token):
def iter_user_mail_ids_from_first_page():
url = f'{API_URL}/gmail/v1/users/{user_id}/messages'
mails_data = requests.get(url=url, headers=headers).json()
for mail in mails_data['messages']:
yield mail['id']
def load_user_mail_message():
url = f'{API_URL}/gmail/v1/users/{user_id}/messages/{mail_id}'
return requests.get(url=url, headers=headers).json()
def iter_attachments_meta(payload):
if payload.get('filename'):
yield {
'mime_type': payload['mimeType'],
'filename': payload['filename'],
'attachment_id': payload['body']['attachmentId'],
}
for part in payload.get('parts', []):
yield from iter_attachments_meta(part)
headers = {'Authorization': f'Bearer {access_token}'}
for mail_id in iter_user_mail_ids_from_first_page():
mail_message = load_user_mail_message()
attachments_it = iter_attachments_meta(mail_message.get('payload', []))
for attachment_meta in attachments_it:
process_attachment.delay(
user_id=user_id,
access_token=access_token,
mail_id=mail_id,
attachment_meta=attachment_meta,
)
@shared_task
def process_attachment(user_id, access_token, mail_id, attachment_meta):
attachment_id = attachment_meta['attachment_id']
headers = {'Authorization': f'Bearer {access_token}'}
url = (
f'{API_URL}/gmail/v1/users/{user_id}/messages/'
f'{mail_id}/attachments/{attachment_id}'
)
attachment = requests.get(url=url, headers=headers).json()
attachment_data = attachment['data']
# Do the necessary OCR processing, save everything you need to the database.
Сам алгоритм максимально прост:
- Получить список сообщений можно по ссылке
GmailApi
. - Загрузить данные каждого электронного сообщения.
- Извлекайте метаинформацию о каждом вложении из каждого сообщения.
- Передайте обработку вложения следующей задаче.
- Загрузите приложение, выполните дальнейшую обработку.
Все задействованные конечные точки:
- пользователи.сообщения.список
- пользователи.сообщения.получают
- пользователи.сообщения.вложения.получить
Преимущества такого подхода очевидны: вы передаете минимум необходимой информации во вторую задачу, вам не нужно где-то сохранять промежуточное состояние, возможность параллельной обработки вложений (Разделяй и властвуй).
<время работы/>Второе Celery
решение, которое приходит мне на ум, - это создать общую папку на сервере, в которую ваши сотрудники смогут сохранять и читать сохраненные вложения. Процесс будет выглядеть примерно так:
- В первом задании загрузите вложение. Сохраните данные в файл в общей папке, присвоив файлу уникальное имя. Передайте ссылку на файл во вторую задачу
Celery
. - Во втором задании прочитайте данные из файла, выполните необходимую обработку распознавания текста и сохраните все необходимое в базе данных.
- Удалите файл с диска (в рамках той же задачи или передайте его следующему заданию).
Какие преимущества я вижу здесь:
- Вложенные данные загружаются в оперативную память только тогда, когда требуется выполнить некоторую обработку, например распознавание текста.
- В эпоху твердотельных накопителей чтение из файла и запись в него осуществляются очень быстро. Это будет значительно быстрее, чем при использовании промежуточного облачного хранилища, учитывая транспортные расходы на загрузку.
- Вы не увеличиваете ограничение на размер сообщения
RabbitMQ
. - Вам не потребуется много места на диске, так как обработанные файлы будут удалены.