Ftplib не отправляет файлы в celery bot
Я использовал код отсюда для загрузки файлов в Selenium Grid VMs, но, когда я использую FTP, он не показывает ошибку и не отправляет файлы.
Код от ftp-бота
def is_connected(ftp):
try:
ftp.retrlines('LIST')
except (socket.timeout, OSError, AttributeError):
return False
return True
@shared_task()
def send_binary(file_name : str, file_path):
ftp = FTP(
FTP_FAT_HOST,
FTP_FAT_USER,
FTP_FAT_PASSWORD,
timeout=30
)
ftp.set_pasv(True)
ftp.encoding='utf-8'
if(not is_connected(ftp)):
ftp.login()
upload_to_ftp(file_path, ftp)
def upload_to_ftp(local_file, ftp : FTP):
try:
with open(local_file, 'rb') as file:
ftp.storbinary(f'STOR {os.path.basename(local_file)}', file)
except Exception as e:
print(f'Erro ao enviar {local_file}: {e}')
Код, который я использую для загрузки pdf и txt файлов из моих источников
def __download_file(self, file_name: str, target_directory: str) -> None:
if not os.path.exists(target_directory):
os.makedirs(target_directory)
contents = self.driver.execute(Command.DOWNLOAD_FILE, {"name": file_name})["value"]["contents"]
zip_target_file = os.path.join(target_directory, f"{file_name}.zip")
with open(zip_target_file, "wb") as file:
file.write(base64.b64decode(contents))
with zipfile.ZipFile(zip_target_file, "r") as zip_ref:
zip_ref.extractall(target_directory)
os.remove(zip_target_file)
Кто-нибудь знает другой способ заставить его работать?
Я попробовал создать бота в celery только для ftp, до этого он был внутри моего основного кода. Я пробовал менять ftp-сервер, я пробовал менять пассивность ftp и таймаут. Ничего не изменилось.