diff --git a/alembic/batch.py b/alembic/batch.py index 1006739..6f6306b 100644 --- a/alembic/batch.py +++ b/alembic/batch.py @@ -23,7 +23,7 @@ class BatchOperationsImpl(object): self.recreate = recreate self.copy_from = copy_from self.table_args = table_args - self.table_kwargs = table_kwargs + self.table_kwargs = dict(table_kwargs) self.reflect_args = reflect_args self.reflect_kwargs = reflect_kwargs self.naming_convention = naming_convention @@ -139,11 +139,15 @@ class ApplyBatchImpl(object): for idx in self.table.indexes: self.indexes[idx.name] = idx + for k in self.table.kwargs: + self.table_kwargs.setdefault(k, self.table.kwargs[k]) + def _transfer_elements_to_new_table(self): assert self.new_table is None, "Can only create new table once" m = MetaData() schema = self.table.schema + self.new_table = new_table = Table( '_alembic_batch_temp', m, *(list(self.columns.values()) + list(self.table_args)), @@ -264,6 +268,10 @@ class ApplyBatchImpl(object): def add_constraint(self, const): if not const.name: raise ValueError("Constraint must have a name") + if isinstance(const, sql_schema.PrimaryKeyConstraint): + if self.table.primary_key in self.unnamed_constraints: + self.unnamed_constraints.remove(self.table.primary_key) + self.named_constraints[const.name] = const def drop_constraint(self, const): diff --git a/alembic/ddl/postgresql.py b/alembic/ddl/postgresql.py index 9f97b34..faeabad 100644 --- a/alembic/ddl/postgresql.py +++ b/alembic/ddl/postgresql.py @@ -23,7 +23,8 @@ class PostgresqlImpl(DefaultImpl): def prep_table_for_batch(self, table): for constraint in table.constraints: - self.drop_constraint(constraint) + if constraint.name is not None: + self.drop_constraint(constraint) def compare_server_default(self, inspector_column, metadata_column, diff --git a/alembic/operations.py b/alembic/operations.py index 2bf8060..dcacd06 100644 --- a/alembic/operations.py +++ b/alembic/operations.py @@ -67,11 +67,12 @@ class Operations(object): def _primary_key_constraint(self, name, table_name, cols, schema=None): m = self._metadata() columns = [sa_schema.Column(n, NULLTYPE) for n in cols] - t1 = sa_schema.Table(table_name, m, - *columns, - schema=schema) - p = sa_schema.PrimaryKeyConstraint(*columns, name=name) - t1.append_constraint(p) + t = sa_schema.Table( + table_name, m, + *columns, + schema=schema) + p = sa_schema.PrimaryKeyConstraint(*[t.c[n] for n in cols], name=name) + t.append_constraint(p) return p def _foreign_key_constraint(self, name, source, referent, @@ -1356,7 +1357,7 @@ class BatchOperations(Operations): return super(BatchOperations, self).drop_column( self.impl.table_name, column_name, schema=self.impl.schema) - def create_primary_key(self, name, cols): + def create_primary_key(self, name, cols, **kw): """Issue a "create primary key" instruction using the current batch migration context. @@ -1368,7 +1369,9 @@ class BatchOperations(Operations): :meth:`.Operations.create_primary_key` """ - raise NotImplementedError("not yet implemented") + kw['schema'] = self.impl.schema + return super(BatchOperations, self).create_primary_key( + name, self.impl.table_name, cols, **kw) def create_foreign_key( self, name, referent, local_cols, remote_cols, **kw): @@ -1422,7 +1425,9 @@ class BatchOperations(Operations): :meth:`.Operations.create_check_constraint` """ - raise NotImplementedError("not yet implemented") + kw['schema'] = self.impl.schema + return super(BatchOperations, self).create_check_constraint( + name, self.impl.table_name, condition, **kw) def drop_constraint(self, name, type_=None): """Issue a "drop constraint" instruction using the diff --git a/alembic/testing/requirements.py b/alembic/testing/requirements.py index b981951..2889ea5 100644 --- a/alembic/testing/requirements.py +++ b/alembic/testing/requirements.py @@ -31,6 +31,17 @@ class SuiteRequirements(Requirements): "MATCH for foreign keys added in SQLAlchemy 0.8.0" ) + @property + def check_constraints_w_enforcement(self): + """Target database must support check constraints + and also enforce them.""" + + return exclusions.open() + + @property + def reflects_pk_names(self): + return exclusions.closed() + @property def fail_before_sqla_079(self): return exclusions.fails_if( diff --git a/docs/build/changelog.rst b/docs/build/changelog.rst index 193c87f..473a1eb 100644 --- a/docs/build/changelog.rst +++ b/docs/build/changelog.rst @@ -6,6 +6,16 @@ Changelog .. changelog:: :version: 0.7.7 + .. change:: + :tags: feature, batch + :tickets: 305 + + Implemented support for :meth:`.BatchOperations.create_primary_key` + and :meth:`.BatchOperations.create_check_constraint`. Additionally, + table keyword arguments are copied from the original reflected table, + such as the "mysql_engine" keyword argument. + + .. change:: :tags: bug, environment :tickets: 300 diff --git a/tests/requirements.py b/tests/requirements.py index 0304919..c5d538d 100644 --- a/tests/requirements.py +++ b/tests/requirements.py @@ -40,6 +40,10 @@ class DefaultRequirements(SuiteRequirements): and not util.sqla_100 ) + @property + def check_constraints_w_enforcement(self): + return exclusions.fails_on("mysql") + @property def unnamed_constraints(self): """constraints without names are supported.""" @@ -53,3 +57,10 @@ class DefaultRequirements(SuiteRequirements): @property def reflects_unique_constraints_unambiguously(self): return exclusions.fails_on("mysql") + + @property + def reflects_pk_names(self): + """Target driver reflects the name of primary key constraints.""" + + return exclusions.fails_on_everything_except( + 'postgresql', 'oracle', 'mssql', 'sybase') diff --git a/tests/test_batch.py b/tests/test_batch.py index a498c36..ce16422 100644 --- a/tests/test_batch.py +++ b/tests/test_batch.py @@ -4,6 +4,7 @@ import re import io from alembic.testing import exclusions +from alembic.testing import assert_raises_message from alembic.testing import TestBase, eq_, config from alembic.testing.fixtures import op_fixture from alembic.testing import mock @@ -16,8 +17,9 @@ from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \ UniqueConstraint, ForeignKeyConstraint, Index, Boolean, CheckConstraint, \ Enum from sqlalchemy.engine.reflection import Inspector -from sqlalchemy.sql import column +from sqlalchemy.sql import column, text from sqlalchemy.schema import CreateTable, CreateIndex +from sqlalchemy import exc class BatchApplyTest(TestBase): @@ -627,6 +629,50 @@ class BatchAPITest(TestBase): self.mock_schema.UniqueConstraint())] ) + def test_create_pk(self): + with self._fixture() as batch: + batch.create_primary_key('pk1', ['a', 'b']) + + eq_( + self.mock_schema.Table().c.__getitem__.mock_calls, + [mock.call('a'), mock.call('b')] + ) + + eq_( + self.mock_schema.PrimaryKeyConstraint.mock_calls, + [ + mock.call( + self.mock_schema.Table().c.__getitem__(), + self.mock_schema.Table().c.__getitem__(), + name='pk1' + ) + ] + ) + eq_( + batch.impl.operations.impl.mock_calls, + [mock.call.add_constraint( + self.mock_schema.PrimaryKeyConstraint())] + ) + + def test_create_check(self): + expr = text("a > b") + with self._fixture() as batch: + batch.create_check_constraint('ck1', expr) + + eq_( + self.mock_schema.CheckConstraint.mock_calls, + [ + mock.call( + expr, name="ck1" + ) + ] + ) + eq_( + batch.impl.operations.impl.mock_calls, + [mock.call.add_constraint( + self.mock_schema.CheckConstraint())] + ) + def test_drop_constraint(self): with self._fixture() as batch: batch.drop_constraint('uq1') @@ -795,6 +841,25 @@ class BatchRoundTripTest(TestBase): context = MigrationContext.configure(self.conn) self.op = Operations(context) + def _no_pk_fixture(self): + nopk = Table( + 'nopk', self.metadata, + Column('a', Integer), + Column('b', Integer), + Column('c', Integer), + mysql_engine='InnoDB' + ) + nopk.create(self.conn) + self.conn.execute( + nopk.insert(), + [ + {"a": 1, "b": 2, "c": 3}, + {"a": 2, "b": 4, "c": 5}, + ] + + ) + return nopk + def tearDown(self): self.metadata.drop_all(self.conn) self.conn.close() @@ -854,6 +919,32 @@ class BatchRoundTripTest(TestBase): {"id": 5, "x": 9} ]) + def test_add_pk_constraint(self): + self._no_pk_fixture() + with self.op.batch_alter_table("nopk", recreate="always") as batch_op: + batch_op.create_primary_key('newpk', ['a', 'b']) + + pk_const = Inspector.from_engine(self.conn).get_pk_constraint('nopk') + with config.requirements.reflects_pk_names.fail_if(): + eq_(pk_const['name'], 'newpk') + eq_(pk_const['constrained_columns'], ['a', 'b']) + + @config.requirements.check_constraints_w_enforcement + def test_add_ck_constraint(self): + with self.op.batch_alter_table("foo", recreate="always") as batch_op: + batch_op.create_check_constraint("newck", text("x > 0")) + + # we dont support reflection of CHECK constraints + # so test this by just running invalid data in + foo = self.metadata.tables['foo'] + + assert_raises_message( + exc.IntegrityError, + "newck", + self.conn.execute, + foo.insert(), {"id": 6, "data": 5, "x": -2} + ) + @config.requirements.sqlalchemy_094 @config.requirements.unnamed_constraints def test_drop_foreign_key(self):