Sisipan massal dengan SQLAlchemy ORM

131

Apakah ada cara agar SQLAlchemy melakukan penyisipan massal daripada memasukkan setiap objek individual. yaitu,

perbuatan:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

daripada:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Saya baru saja mengonversi beberapa kode untuk menggunakan sqlalchemy daripada sql mentah dan meskipun sekarang jauh lebih baik untuk bekerja dengannya tampaknya lebih lambat sekarang (hingga faktor 10), saya bertanya-tanya apakah ini alasannya.

Mungkin saya bisa memperbaiki situasi menggunakan sesi dengan lebih efisien. Saat ini saya memiliki autoCommit=Falsedan melakukan session.commit()setelah saya menambahkan beberapa hal. Meskipun hal ini tampaknya menyebabkan data menjadi basi jika DB diubah di tempat lain, seperti meskipun saya melakukan kueri baru, saya masih mendapatkan hasil lama kembali?

Terima kasih atas bantuan Anda!

Nick Holden
sumber
1
Ini mungkin membantu: stackoverflow.com/questions/270879/…
Sean Vieira
1
Nick, saya mengerti ini adalah posting yang sangat lama. Apakah mungkin untuk memperbarui judul menjadi sesuatu yang benar seperti "penyisipan banyak catatan dengan SQLAlchemy ORM". Pernyataan penyisipan multi-rekaman seperti yang Anda berikan sangat berbeda dari operasi pemuatan massal di tingkat database. Sisipan massal ditujukan untuk unggahan 1k + data, biasanya dari kumpulan data besar dan dilakukan oleh manajer aplikasi, bukan operasi REST atau kode level aplikasi .... Mari gunakan nomenklatur kami dengan benar.
W4t3randWind
Bagi mereka yang tersandung pada pertanyaan ini saat mencari informasi tentang operasi massal di sqlalchemy Core (bukan ORM), lihat jawaban saya untuk pertanyaan lain .
Nickolay

Jawaban:

174

SQLAlchemy memperkenalkan itu dalam versi 1.0.0:

Operasi massal - dokumen SQLAlchemy

Dengan operasi ini, Anda sekarang dapat melakukan penyisipan atau pembaruan massal!

Misalnya, Anda dapat melakukan:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Di sini, penyisipan massal akan dibuat.

Pierre
sumber
30
Anda juga perlu s.commit () untuk benar-benar menyimpan catatan (saya perlu sedikit memikirkan yang ini).
horcle_buzz
3
Saya mencoba ini dengan sqlachemy 1.0.11 dan masih membuat 3 pernyataan penyisipan. Tetapi ini jauh lebih cepat daripada operasi orm normal.
zidarsk8
3
Meskipun tidak terkait dengan pertanyaan OP, perlu disebutkan bahwa ini merusak fitur tertentu dari ORM. docs.sqlalchemy.org/en/rel_1_0/orm/…
dangel
@dangel ya terima kasih telah memposting ini. Meskipun judul OP menyangkut "pemuatan massal", pertanyaannya tentang pernyataan penyisipan multi-rekaman tidak ada hubungannya dengan fitur pemuatan massal sqlalchemy.
W4t3randWind
Dibandingkan dengan memasukkan data yang sama dari CSV dengan \copydengan psql (dari klien yang sama ke server yang sama), saya melihat perbedaan besar dalam kinerja di sisi server yang menghasilkan sekitar 10x lebih banyak sisipan / s. Rupanya pemuatan massal menggunakan \copy(atau COPYdi server) menggunakan pengepakan dalam berkomunikasi dari klien-ke-server jauh lebih baik daripada menggunakan SQL melalui SQLAlchemy. Info lebih lanjut: bulk besar insert perbedaan kinerja PostgreSQL vs ... .
gertvdijk
42

Dokumentasi SQLAlchemy memiliki Langgan pada kinerja berbagai teknik yang dapat digunakan untuk menyisipkan massal:

ORM pada dasarnya tidak dimaksudkan untuk penyisipan massal berkinerja tinggi - inilah alasan utama SQLAlchemy menawarkan Core selain ORM sebagai komponen kelas satu.

Untuk kasus penggunaan sisipan massal cepat, pembuatan dan sistem eksekusi SQL yang di atasnya dibangun ORM adalah bagian dari Core. Dengan menggunakan sistem ini secara langsung, kita dapat menghasilkan INSERT yang kompetitif dengan menggunakan API database mentah secara langsung.

Alternatifnya, SQLAlchemy ORM menawarkan rangkaian metode Operasi Massal, yang menyediakan pengait ke subbagian unit proses kerja untuk memancarkan konstruksi INSERT dan UPDATE tingkat Inti dengan otomatisasi berbasis ORM tingkat kecil.

Contoh di bawah mengilustrasikan pengujian berbasis waktu untuk beberapa metode penyisipan baris yang berbeda, dari yang paling otomatis ke paling sedikit. Dengan cPython 2.7, runtime diamati:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Naskah:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Grant Humphries
sumber
1
Terima kasih. Sangat membantu dan menyeluruh.
Steve B.
Saya melihat contoh lain menggunakan bindparams. Sintaksnya terlihat ringkas, apakah itu bagus?
Jay
35

Sejauh yang saya tahu, tidak ada cara agar ORM mengeluarkan sisipan massal. Saya percaya alasan yang mendasarinya adalah bahwa SQLAlchemy perlu melacak identitas setiap objek (yaitu, kunci primer baru), dan sisipan massal mengganggu itu. Misalnya, dengan asumsi footabel Anda berisi idkolom dan dipetakan ke Fookelas:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Karena SQLAlchemy mengambil nilainya x.idtanpa mengeluarkan kueri lain, kita dapat menyimpulkan bahwa SQLAlchemy mendapatkan nilainya langsung dari INSERTpernyataan. Jika Anda tidak memerlukan akses berikutnya ke objek yang dibuat melalui instance yang sama , Anda dapat melewati lapisan ORM untuk penyisipan Anda:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy tidak dapat mencocokkan baris baru ini dengan objek yang sudah ada, jadi Anda harus menanyakannya lagi untuk operasi selanjutnya.

Sejauh menyangkut data usang, perlu diingat bahwa sesi tidak memiliki cara bawaan untuk mengetahui kapan database diubah di luar sesi. Untuk mengakses data yang diubah secara eksternal melalui instance yang ada, instance tersebut harus ditandai sebagai kedaluwarsa . Ini terjadi secara default aktif session.commit(), tetapi dapat dilakukan secara manual dengan menelepon session.expire_all()atau session.expire(instance). Contoh (SQL dihilangkan):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()kedaluwarsa x, jadi pernyataan cetak pertama secara implisit membuka transaksi baru dan xatribut kueri ulang . Jika Anda mengomentari pernyataan cetak pertama, Anda akan melihat bahwa yang kedua sekarang mengambil nilai yang benar, karena kueri baru tidak dikeluarkan hingga setelah pembaruan.

Ini masuk akal dari sudut pandang isolasi transaksional - Anda hanya boleh mengambil modifikasi eksternal antar transaksi. Jika hal ini menyebabkan masalah bagi Anda, saya sarankan untuk mengklarifikasi atau memikirkan ulang batasan transaksi aplikasi Anda daripada segera meraihnya session.expire_all().

dhaffey
sumber
Terima kasih atas balasan Anda, saya akan mencobanya. WRT masalah kedaluwarsa, apa yang saya lihat tidak persis sama. Saya menggunakan sesi terbatas di turbogears. Melakukan getSession (). Query (Foo) .filter .... all () mengembalikan hal yang berbeda tergantung pada permintaan, juga tidak mengembalikan catatan yang diperbarui yang ada di db sampai saya memulainya kembali. Saya memperbaiki masalah ini dengan melakukan autocommit = True dan menambahkan sesuatu yang .remove () d sesi setelah permintaan selesai (saya rasa Anda memang dimaksudkan untuk melakukannya).
Nick Holden
Saya kira itu mengembalikan hal yang berbeda tergantung pada permintaan karena memiliki sesi terbatas per utas di kumpulan dan sesi berada di status berbeda? Tampaknya agak aneh bahwa sa tidak akan mendapatkan data baru setelah permintaan baru. Saya rasa saya salah paham tentang apa yang dilakukan autocommit = False
Nick Holden
Dengan autocommit=False, saya yakin Anda harus menelepon session.commit()setelah permintaan selesai (saya tidak terbiasa dengan TurboGears, jadi abaikan ini jika itu ditangani untuk Anda di tingkat kerangka kerja). Selain memastikan perubahan Anda telah dilakukan ke database, ini akan membuat semua yang ada di sesi tidak berlaku lagi. Transaksi berikutnya tidak akan dimulai hingga penggunaan sesi itu berikutnya, jadi permintaan di masa mendatang pada utas yang sama tidak akan melihat data yang sudah usang.
dhaffey
10
Gaya alternatif:session.execute(Foo.__table__.insert(), values)
Joril
6
Perhatikan bahwa versi terbaru dari sqlalchemy memiliki kemampuan penyisipan massal: docs.sqlalchemy.org/en/latest/orm/…
Wayne Werner
18

Saya biasanya melakukannya dengan menggunakan add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
sumber
2
Apakah Anda yakin ini berhasil? Itu tidak hanya melakukan yang setara .adddengan memasukkan mereka ke sesi satu per satu?
Alec
Itu akan menjadi kontra intuitif mengingat nama metode, dokumen tidak menjelaskan secara rinci: Add the given collection of instances to this Session.Apakah Anda punya alasan untuk percaya itu tidak melakukan penyisipan massal?
reubano
3
Saya tidak berpikir itu terlalu berlawanan dengan intuisi - itu sebenarnya menambahkan semua hal yang Anda minta. Tidak ada tentang menambahkan semua hal ke sesi yang sepertinya akan menyiratkan apa yang mendasari pernyataan SQL yang dikeluarkan. Melihat sumbernya: github.com/zzzeek/sqlalchemy/blob/… sebenarnya tampaknya hanya .addsetiap item satu per satu.
Alec
Ini bekerja dengan baik, dibandingkan dengan bulk_save_objects(), dengan a flush(), kita bisa mendapatkan ID objek, tetapi bulk_save_objects()tidak bisa (acara dengan flush()panggilan).
coanor
14

Dukungan langsung telah ditambahkan ke SQLAlchemy pada versi 0.8

Sesuai dokumen , connection.execute(table.insert().values(data))harus melakukan trik. (Perhatikan bahwa ini tidak sama dengan connection.execute(table.insert(), data)yang menghasilkan banyak baris yang disisipkan melalui panggilan ke executemany). Dalam hal apa pun selain koneksi lokal, perbedaan kinerja bisa sangat besar.

pengguna3805082
sumber
10

SQLAlchemy memperkenalkan itu dalam versi 1.0.0:

Operasi massal - dokumen SQLAlchemy

Dengan operasi ini, Anda sekarang dapat melakukan penyisipan atau pembaruan massal!

Misalnya (jika Anda menginginkan overhead terendah untuk INSERT tabel sederhana), Anda dapat menggunakan Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Atau, jika Anda mau, lewati loadmetupel dan tulis kamus langsung ke dictsdalamnya (tetapi saya merasa lebih mudah untuk meninggalkan semua kata-kata dari data dan memuat daftar kamus dalam satu lingkaran).

juanitogan
sumber
7

Jawaban Piere benar tetapi satu masalah adalah bahwa bulk_save_objectssecara default tidak mengembalikan kunci utama objek, jika itu menjadi perhatian Anda. Atur return_defaultsuntuk Truemendapatkan perilaku ini.

Dokumentasinya ada di sini .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Matthew Moisen
sumber
2
Perhatian harus diambil dengan bendera. Ini akan memasukkan satu objek pada satu waktu secara berurutan dan perolehan kinerja yang signifikan mungkin tidak ada [1]. Dalam kasus saya, kinerja menurun yang saya duga karena biaya overhead. [1]: docs.sqlalchemy.org/en/13/orm/…
dhfromkorea
6

Semua Jalan Menuju Roma , tetapi beberapa di antaranya melintasi pegunungan, membutuhkan kapal feri tetapi jika ingin cepat ke sana, gunakan saja jalan raya.


Dalam hal ini jalan tol akan menggunakan fitur execute_batch () dari psycopg2 . Dokumentasi mengatakan yang terbaik:

Implementasi saat executemany()ini (menggunakan pernyataan yang sangat amal) tidak terlalu berhasil. Fungsi ini dapat digunakan untuk mempercepat pengulangan eksekusi pernyataan terhadap sekumpulan parameter. Dengan mengurangi jumlah perjalanan pulang pergi server, kinerja dapat menjadi lipat lebih baik daripada menggunakan executemany().

Dalam pengujian saya sendiri execute_batch()adalah sekitar dua kali lebih cepat sebagai executemany(), dan memberikan pilihan untuk mengkonfigurasi page_size untuk tweaker lebih lanjut (jika Anda ingin memeras terakhir 2-3% dari kinerja dari pengemudi).

Fitur yang sama dapat dengan mudah diaktifkan jika Anda menggunakan SQLAlchemy dengan menyetel use_batch_mode=Truesebagai parameter saat Anda membuat instance mesin dengancreate_engine()

chjortlund.dll
sumber
Catatan: psycopg2 ini execute_valuesadalah lebih cepat daripada psycopg2 ini execute_batchketika melakukan menyisipkan massal!
Fierr
5

Ini caranya:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Ini akan menyisipkan seperti ini:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Referensi: FAQ SQLAlchemy menyertakan tolok ukur untuk berbagai metode komit.

Eefret
sumber
3

Jawaban terbaik yang saya temukan sejauh ini adalah di dokumentasi sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Ada contoh lengkap dari tolok ukur solusi yang mungkin.

Seperti yang ditunjukkan dalam dokumentasi:

bulk_save_objects bukanlah solusi terbaik tetapi kinerjanya benar.

Implementasi terbaik kedua dalam hal keterbacaan yang saya pikir adalah dengan SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Konteks fungsi ini diberikan dalam artikel dokumentasi.

lelabo_m
sumber