Проблемы с шифрованием/дешифрованием файлов с помощью pycryptodome
Недавно мы обновили наш проект с Python 2.7 и Django 1.11 до Python 3.11 и Django 3.2. При этом мы перешли от использования библиотеки pycrypto к pycryptodome для криптографических операций.
Наше Django-приложение обрабатывает загрузку и скачивание файлов с шифрованием и дешифрованием следующим образом:
Загрузка файлов: Когда пользователь загружает файл, он шифруется и хранится в облачном хранилище Google (GCS) с расширением .enc. Загрузка файлов: Когда пользователь запрашивает загрузку файла, Django извлекает файл из GCS, расшифровывает его, а затем отправляет расшифрованный файл пользователю. После переноса мы столкнулись с проблемами, связанными с процессом шифрования и дешифрования. В частности, файлы неправильно шифруются и расшифровываются, что приводит к повреждению загружаемых файлов.
Вот краткое описание нашего текущего подхода:
Encryption: Использование pycryptodome для шифрования файлов перед загрузкой в GCS. Дешифрование: Использование pycryptodome для расшифровки файлов после загрузки с GCS. Кто-нибудь еще сталкивался с подобными проблемами после обновления до Python 3.11 и Django 3.2? Любые соображения или решения для обеспечения правильного шифрования и дешифрования с помощью pycryptodome в этой установке будут высоко оценены.
Вот мой crypto.py, который обрабатывает шифрование и дешифрование
from Crypto.Cipher import AES
from Crypto import Random
from django.conf import settings
import hashlib
class Crypto:
__password = settings.CRYPTO_KEY
__key = hashlib.md5(__password.encode())
__key = __key.hexdigest()
@staticmethod
def __pad(sa):
padding_size = AES.block_size - len(sa) % AES.block_size
if isinstance(sa, str):
sa = sa.encode('utf-8')
return (sa + b"\0" * (AES.block_size - len(sa) % AES.block_size)), padding_size
def encrypt(self, message):
"""
Encrypts a plaintext using the key that is initialised during the init of the class constructor.
The padding bytes are also attached in this along with the padding size.
:param message:
:return:
"""
message, padding_size = self.__pad(message)
iv = Random.new().read(AES.block_size)
self.__key = bytes(self.__key, 'utf-8')
cipher = AES.new(self.__key, AES.MODE_CFB, iv)
# python3migration - might cause an error here
enc_bytes = iv + cipher.encrypt(message) + bytes(bytearray([padding_size]))
return enc_bytes
def decrypt(self, ciphertext):
"""
Takes the encrypted content and decrypts using the key that has been initialised and returns the plaintext.
The padding size is also removed from the text here.
:param ciphertext: Encrypted content in bytes format
:return: Decrypted plaintext
"""
iv = ciphertext[:AES.block_size]
cipher = AES.new(self.__key.encode(), AES.MODE_CFB, iv)
plaintext = cipher.decrypt(ciphertext[AES.block_size:])
padding_size = plaintext[-1]
return plaintext[:-padding_size]
# the code was used before the python3 migration
# iv = ciphertext[:AES.block_size]
# cipher = AES.new(self.__key, AES.MODE_CFB, iv)
# plaintext = cipher.decrypt(ciphertext[AES.block_size:-1])
# padding_size = int(ciphertext[-1].encode('hex'), 16) * (-1)
# return plaintext[:padding_size]
Вот код, который загружает файл с GCS, расшифровывает его и отправляет загруженное содержимое пользователю:
def get_file(self, request):
"""
Takes input as request param and returns the response as attachment content.
This function is exposed to the public and does not need any permission. Any file which is shared through app
and is encrypted will need to be called through this function to be decrypted.
Any file that is encrypted and stored in GCS has a .enc extension. On the model the URL returned from the file
has a Signature, GoogleAccessId and Expiry attached to it.
This gets the file from GCS - where it validates the Expiry, AccessId and Signature
Then this file is decrypted via the crypto library, which is in common/crypto.
:param request:
:return:
"""
_filename = request.GET.get('file', None)
_expires = request.GET.get('Expires', None)
_google_access_id = request.GET.get('GoogleAccessId', None)
_signature = request.GET.get('Signature', None)
if _filename and _expires and _google_access_id and _signature:
# valid
crypto = Crypto()
gcs_token = 'Expires=' + _expires + 'GoogleAccessId=' + _google_access_id + '&Signature=' + urllib.request.pathname2url(
_signature)
file_url = settings.GCS_API_ENDPOINT + '/' + settings.GS_BUCKET_NAME + '/' + _filename + '?' + gcs_token
response = requests.get(file_url, stream=True)
if response.status_code == 200:
content = response.raw.read()
decrypted_content = crypto.decrypt(content)
_filename = _filename[:-4]
_filename = _filename.split('/')[-1]
mime = magic.Magic(mime=True)
file_to_send = ContentFile(decrypted_content)
content_type = mime.from_buffer(decrypted_content)
response = HttpResponse(file_to_send, content_type)
response['Content-Length'] = file_to_send.size
response['Content-Disposition'] = 'attachment; filename="' + _filename + '"'
return response
else:
return JsonResponse({'message': 'The Key is invalid'}, status=status.HTTP_400_BAD_REQUEST)
else:
return JsonResponse({'message': 'Invalid URL'}, status=status.HTTP_400_BAD_REQUEST)