Получение gzip-файла из конечной точки, извлечение данных и добавление в базу данных Улучшения
В настоящее время у меня есть сценарий, который выполняет запрос к конечной точке API, который возвращает файл csv.gzip
, содержащий примерно 75 000 строк и 15 столбцов. Я загружаю эти файлы на дисковое хранилище веб-сервера, распаковываю файл до .csv
, затем прохожусь по каждой строке и добавляю данные в базу данных. Наконец, я удаляю файлы, сохраненные на диске. В настоящее время процесс занимает от 5 до 10 минут.
Я уверен, что есть области для улучшения, но не уверен, как их реализовать. Вот некоторые из них:
- Сохранить данные csv в переменной, а не на диске.
- Массовый импорт данных в мою базу данных.
Я уверен, что есть и другие улучшения, которые можно сделать, поэтому любой совет будет принят с благодарностью.
response = oauth.get(realm)
content = ET.fromstring(response.content)
coded_string = (content.find('.//pricefile'))
decoded_string = base64.b64decode(coded_string.text)
with open('test.csv.gzip', 'wb') as f:
f.write(decoded_string)
with gzip.open('test.csv.gzip', 'rb') as f_in:
with open('test.csv', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
with open('test.csv') as f:
reader = csv.reader(f)
next(reader)
for row in reader:
pricing.objects.update_or_create(
product_id=row[0],
date=datetime.now(),
defaults={
'average_price': Decimal(row[1] or 0),
'low_price': Decimal(row[2] or 0),
'high_price': Decimal(row[3] or 0),
...
})
os.remove('test.csv')
os.remove('test.csv.gzip')
Вам, вероятно, следует использовать bulk_create. Что-то вроде:
pricing.objects.bulk_create(
[
pricing(
product_id=row[0],
date=datetime.now(),
average_price=Decimal(row[1] or 0),
…
),
for row in reader
],
update_conflicts=True,
update_fields=["date", "average_price", …],
unique_fields=["product_id"]
)
Пример, использующий psycopg2
напрямую. names.csv
взят из базы данных имен социального обеспечения США и имеет 260000 строк на четыре колонки.
with open("/home/aklaver/software_projects/test_data/data_sets/names/names.csv", newline="") as csv_file:
t1 = time.time()
cur.execute('create temp table names_csv_tmp(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender,year))')
cur.execute('create table if not exists names_csv(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender,year))')
cur.copy_expert("COPY names_csv_tmp FROM STDIN WITH CSV HEADER", csv_file)
cur.execute("insert into names_csv select * from names_csv_tmp ON CONFLICT (name,gender, year) DO UPDATE SET name = EXCLUDED.name, gender=EXCLUDED.gender")
cur.execute("drop table names_csv_tmp")
con.commit()
t2 = time.time()
print(t2 - t1)
Для первого запуска, где таблица names_csv
была пустой, а INSERT
просто вставлял все записи из names_csv_tmp
, время составило 3.66
секунд.
Для второго запуска, где ON CONFLICT
сработали обновления, время выполнения составило 7.29
секунд.