Получение gzip-файла из конечной точки, извлечение данных и добавление в базу данных Улучшения

В настоящее время у меня есть сценарий, который выполняет запрос к конечной точке API, который возвращает файл csv.gzip, содержащий примерно 75 000 строк и 15 столбцов. Я загружаю эти файлы на дисковое хранилище веб-сервера, распаковываю файл до .csv, затем прохожусь по каждой строке и добавляю данные в базу данных. Наконец, я удаляю файлы, сохраненные на диске. В настоящее время процесс занимает от 5 до 10 минут.

Я уверен, что есть области для улучшения, но не уверен, как их реализовать. Вот некоторые из них:

  1. Сохранить данные csv в переменной, а не на диске.
  2. Массовый импорт данных в мою базу данных.

Я уверен, что есть и другие улучшения, которые можно сделать, поэтому любой совет будет принят с благодарностью.

response = oauth.get(realm)

content = ET.fromstring(response.content)
coded_string = (content.find('.//pricefile'))
decoded_string = base64.b64decode(coded_string.text)
with open('test.csv.gzip', 'wb') as f:
    f.write(decoded_string)
    with gzip.open('test.csv.gzip', 'rb') as f_in:
        with open('test.csv', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

with open('test.csv') as f:
    reader = csv.reader(f)
    next(reader)
    for row in reader:
        pricing.objects.update_or_create(
            product_id=row[0],
            date=datetime.now(),

            defaults={
                'average_price': Decimal(row[1] or 0),
                'low_price': Decimal(row[2] or 0),
                'high_price': Decimal(row[3] or 0),
                ...
            })

os.remove('test.csv')
os.remove('test.csv.gzip')

Вам, вероятно, следует использовать bulk_create. Что-то вроде:

pricing.objects.bulk_create(
    [
        pricing(
            product_id=row[0],
            date=datetime.now(),
            average_price=Decimal(row[1] or 0),
            …
        ),
        for row in reader
    ],
    update_conflicts=True,
    update_fields=["date", "average_price", …],
    unique_fields=["product_id"]
)

Пример, использующий psycopg2 напрямую. names.csv взят из базы данных имен социального обеспечения США и имеет 260000 строк на четыре колонки.

with open("/home/aklaver/software_projects/test_data/data_sets/names/names.csv", newline="") as csv_file:
   t1 = time.time()
   cur.execute('create temp table names_csv_tmp(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender,year))')
   cur.execute('create table if not exists names_csv(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender,year))')  
   cur.copy_expert("COPY names_csv_tmp FROM STDIN WITH CSV HEADER", csv_file)
   cur.execute("insert into names_csv select * from names_csv_tmp ON CONFLICT (name,gender, year) DO UPDATE SET name = EXCLUDED.name, gender=EXCLUDED.gender")
   cur.execute("drop table names_csv_tmp")
   con.commit()
   t2 = time.time()
   print(t2 - t1)

Для первого запуска, где таблица names_csv была пустой, а INSERT просто вставлял все записи из names_csv_tmp, время составило 3.66 секунд.

Для второго запуска, где ON CONFLICT сработали обновления, время выполнения составило 7.29 секунд.

Вернуться на верх