bulk insert

This commit is contained in:
Mike Bayer 2014-09-13 15:47:30 -04:00
parent 4a34fd8027
commit 62885198ac
3 changed files with 187 additions and 184 deletions

View File

@ -1,4 +1,4 @@
from .fixtures import TestBase
from .assertions import eq_, ne_
from .assertions import eq_, ne_, assert_raises_message
from sqlalchemy.testing import config

View File

@ -24,6 +24,7 @@ from contextlib import contextmanager
from sqlalchemy.testing.fixtures import TestBase
from .assertions import _get_dialect, eq_
from . import mock
testing_config = configparser.ConfigParser()
testing_config.read(['test.cfg'])

View File

@ -6,197 +6,198 @@ from sqlalchemy.sql import table, column
from sqlalchemy import Table, Column, MetaData
from sqlalchemy.types import TypeEngine
from . import op_fixture, eq_, assert_raises_message
from alembic.testing.fixtures import op_fixture, TestBase
from alembic.testing import eq_, assert_raises_message, config
def _table_fixture(dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
column('v2', String()),
)
return context, t1
class BulkInsertTest(TestBase):
def _table_fixture(self, dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
column('v2', String()),
)
return context, t1
def _big_t_table_fixture(self, dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = Table("ins_table", MetaData(),
Column('id', Integer, primary_key=True),
Column('v1', String()),
Column('v2', String()),
)
return context, t1
def _test_bulk_insert(self, dialect, as_sql):
context, t1 = self._table_fixture(dialect, as_sql)
op.bulk_insert(t1, [
{'id': 1, 'v1': 'row v1', 'v2': 'row v5'},
{'id': 2, 'v1': 'row v2', 'v2': 'row v6'},
{'id': 3, 'v1': 'row v3', 'v2': 'row v7'},
{'id': 4, 'v1': 'row v4', 'v2': 'row v8'},
])
return context
def _test_bulk_insert_single(self, dialect, as_sql):
context, t1 = self._table_fixture(dialect, as_sql)
op.bulk_insert(t1, [
{'id': 1, 'v1': 'row v1', 'v2': 'row v5'},
])
return context
def _test_bulk_insert_single_bigt(self, dialect, as_sql):
context, t1 = self._big_t_table_fixture(dialect, as_sql)
op.bulk_insert(t1, [
{'id': 1, 'v1': 'row v1', 'v2': 'row v5'},
])
return context
def test_bulk_insert(self):
context = self._test_bulk_insert('default', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (:id, :v1, :v2)'
)
def test_bulk_insert_wrong_cols(self):
context = op_fixture('postgresql')
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
column('v2', String()),
)
op.bulk_insert(t1, [
{'v1': 'row v1', },
])
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
def test_bulk_insert_no_rows(self):
context, t1 = self._table_fixture('default', False)
op.bulk_insert(t1, [])
context.assert_()
def test_bulk_insert_pg(self):
context = self._test_bulk_insert('postgresql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) '
'VALUES (%(id)s, %(v1)s, %(v2)s)'
)
def test_bulk_insert_pg_single(self):
context = self._test_bulk_insert_single('postgresql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) '
'VALUES (%(id)s, %(v1)s, %(v2)s)'
)
def test_bulk_insert_pg_single_as_sql(self):
context = self._test_bulk_insert_single('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
)
def test_bulk_insert_pg_single_big_t_as_sql(self):
context = self._test_bulk_insert_single_bigt('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (1, 'row v1', 'row v5')"
)
def test_bulk_insert_mssql(self):
context = self._test_bulk_insert('mssql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (:id, :v1, :v2)'
)
def test_bulk_insert_inline_literal_as_sql(self):
context = op_fixture('postgresql', True)
class MyType(TypeEngine):
pass
t1 = table('t', column('id', Integer), column('data', MyType()))
op.bulk_insert(t1, [
{'id': 1, 'data': op.inline_literal('d1')},
{'id': 2, 'data': op.inline_literal('d2')},
])
context.assert_(
"INSERT INTO t (id, data) VALUES (1, 'd1')",
"INSERT INTO t (id, data) VALUES (2, 'd2')"
)
def test_bulk_insert_as_sql(self):
context = self._test_bulk_insert('default', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (1, 'row v1', 'row v5')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (2, 'row v2', 'row v6')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (3, 'row v3', 'row v7')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (4, 'row v4', 'row v8')"
)
def test_bulk_insert_as_sql_pg(self):
context = self._test_bulk_insert('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (1, 'row v1', 'row v5')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (2, 'row v2', 'row v6')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (3, 'row v3', 'row v7')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (4, 'row v4', 'row v8')"
)
def test_bulk_insert_as_sql_mssql(self):
context = self._test_bulk_insert('mssql', True)
# SQL server requires IDENTITY_INSERT
# TODO: figure out if this is safe to enable for a table that
# doesn't have an IDENTITY column
context.assert_(
'SET IDENTITY_INSERT ins_table ON',
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (1, 'row v1', 'row v5')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (2, 'row v2', 'row v6')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (3, 'row v3', 'row v7')",
"INSERT INTO ins_table (id, v1, v2) "
"VALUES (4, 'row v4', 'row v8')",
'SET IDENTITY_INSERT ins_table OFF'
)
def test_invalid_format(self):
context, t1 = self._table_fixture("sqlite", False)
assert_raises_message(
TypeError,
"List expected",
op.bulk_insert, t1, {"id": 5}
)
assert_raises_message(
TypeError,
"List of dictionaries expected",
op.bulk_insert, t1, [(5, )]
)
def _big_t_table_fixture(dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = Table("ins_table", MetaData(),
Column('id', Integer, primary_key=True),
Column('v1', String()),
Column('v2', String()),
)
return context, t1
def _test_bulk_insert(dialect, as_sql):
context, t1 = _table_fixture(dialect, as_sql)
op.bulk_insert(t1, [
{'id': 1, 'v1': 'row v1', 'v2': 'row v5'},
{'id': 2, 'v1': 'row v2', 'v2': 'row v6'},
{'id': 3, 'v1': 'row v3', 'v2': 'row v7'},
{'id': 4, 'v1': 'row v4', 'v2': 'row v8'},
])
return context
def _test_bulk_insert_single(dialect, as_sql):
context, t1 = _table_fixture(dialect, as_sql)
op.bulk_insert(t1, [
{'id': 1, 'v1': 'row v1', 'v2': 'row v5'},
])
return context
def _test_bulk_insert_single_bigt(dialect, as_sql):
context, t1 = _big_t_table_fixture(dialect, as_sql)
op.bulk_insert(t1, [
{'id': 1, 'v1': 'row v1', 'v2': 'row v5'},
])
return context
def test_bulk_insert():
context = _test_bulk_insert('default', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (:id, :v1, :v2)'
)
def test_bulk_insert_wrong_cols():
context = op_fixture('postgresql')
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
column('v2', String()),
)
op.bulk_insert(t1, [
{'v1': 'row v1', },
])
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
def test_bulk_insert_no_rows():
context, t1 = _table_fixture('default', False)
op.bulk_insert(t1, [])
context.assert_()
def test_bulk_insert_pg():
context = _test_bulk_insert('postgresql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
def test_bulk_insert_pg_single():
context = _test_bulk_insert_single('postgresql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
def test_bulk_insert_pg_single_as_sql():
context = _test_bulk_insert_single('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
)
def test_bulk_insert_pg_single_big_t_as_sql():
context = _test_bulk_insert_single_bigt('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
)
def test_bulk_insert_mssql():
context = _test_bulk_insert('mssql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (:id, :v1, :v2)'
)
def test_bulk_insert_inline_literal_as_sql():
context = op_fixture('postgresql', True)
class MyType(TypeEngine):
pass
t1 = table('t', column('id', Integer), column('data', MyType()))
op.bulk_insert(t1, [
{'id': 1, 'data': op.inline_literal('d1')},
{'id': 2, 'data': op.inline_literal('d2')},
])
context.assert_(
"INSERT INTO t (id, data) VALUES (1, 'd1')",
"INSERT INTO t (id, data) VALUES (2, 'd2')"
)
def test_bulk_insert_as_sql():
context = _test_bulk_insert('default', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')",
"INSERT INTO ins_table (id, v1, v2) VALUES (2, 'row v2', 'row v6')",
"INSERT INTO ins_table (id, v1, v2) VALUES (3, 'row v3', 'row v7')",
"INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')"
)
def test_bulk_insert_as_sql_pg():
context = _test_bulk_insert('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')",
"INSERT INTO ins_table (id, v1, v2) VALUES (2, 'row v2', 'row v6')",
"INSERT INTO ins_table (id, v1, v2) VALUES (3, 'row v3', 'row v7')",
"INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')"
)
def test_bulk_insert_as_sql_mssql():
context = _test_bulk_insert('mssql', True)
# SQL server requires IDENTITY_INSERT
# TODO: figure out if this is safe to enable for a table that
# doesn't have an IDENTITY column
context.assert_(
'SET IDENTITY_INSERT ins_table ON',
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')",
"INSERT INTO ins_table (id, v1, v2) VALUES (2, 'row v2', 'row v6')",
"INSERT INTO ins_table (id, v1, v2) VALUES (3, 'row v3', 'row v7')",
"INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')",
'SET IDENTITY_INSERT ins_table OFF'
)
def test_invalid_format():
context, t1 = _table_fixture("sqlite", False)
assert_raises_message(
TypeError,
"List expected",
op.bulk_insert, t1, {"id": 5}
)
assert_raises_message(
TypeError,
"List of dictionaries expected",
op.bulk_insert, t1, [(5, )]
)
class RoundTripTest(TestCase):
class RoundTripTest(TestBase):
__only_on__ = "sqlite"
def setUp(self):
from sqlalchemy import create_engine
from alembic.migration import MigrationContext
self.conn = create_engine("sqlite://").connect()
self.conn = config.db.connect()
self.conn.execute("""
create table foo(
id integer primary key,
@ -213,6 +214,7 @@ class RoundTripTest(TestCase):
)
def tearDown(self):
self.conn.execute("drop table foo")
self.conn.close()
def test_single_insert_round_trip(self):