- ensure that all ops have full backwards/forwards maintenance of

the original object passed in from autogenerate.  In particular this
ensures that the diff structure from compare_metadata is fully backwards compatible
with no chance of synthesized objects.
This commit is contained in:
Mike Bayer 2015-08-07 22:25:41 -04:00
parent 1c6be577b8
commit 1d3fac669f
4 changed files with 169 additions and 10 deletions

View File

@ -179,11 +179,12 @@ class CreatePrimaryKeyOp(AddConstraintOp):
def __init__(
self, constraint_name, table_name, columns,
schema=None, **kw):
schema=None, _orig_constraint=None, **kw):
self.constraint_name = constraint_name
self.table_name = table_name
self.columns = columns
self.schema = schema
self._orig_constraint = _orig_constraint
self.kw = kw
@classmethod
@ -193,11 +194,15 @@ class CreatePrimaryKeyOp(AddConstraintOp):
return cls(
constraint.name,
constraint_table.name,
constraint.columns,
schema=constraint_table.schema,
*constraint.columns
_orig_constraint=constraint
)
def to_constraint(self, migration_context=None):
if self._orig_constraint is not None:
return self._orig_constraint
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.primary_key_constraint(
self.constraint_name, self.table_name,
@ -289,11 +294,12 @@ class CreateUniqueConstraintOp(AddConstraintOp):
def __init__(
self, constraint_name, table_name,
columns, schema=None, **kw):
columns, schema=None, _orig_constraint=None, **kw):
self.constraint_name = constraint_name
self.table_name = table_name
self.columns = columns
self.schema = schema
self._orig_constraint = _orig_constraint
self.kw = kw
@classmethod
@ -311,10 +317,14 @@ class CreateUniqueConstraintOp(AddConstraintOp):
constraint_table.name,
[c.name for c in constraint.columns],
schema=constraint_table.schema,
_orig_constraint=constraint,
**kw
)
def to_constraint(self, migration_context=None):
if self._orig_constraint is not None:
return self._orig_constraint
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.unique_constraint(
self.constraint_name, self.table_name, self.columns,
@ -421,12 +431,13 @@ class CreateForeignKeyOp(AddConstraintOp):
def __init__(
self, constraint_name, source_table, referent_table, local_cols,
remote_cols, **kw):
remote_cols, _orig_constraint=None, **kw):
self.constraint_name = constraint_name
self.source_table = source_table
self.referent_table = referent_table
self.local_cols = local_cols
self.remote_cols = remote_cols
self._orig_constraint = _orig_constraint
self.kw = kw
def to_diff_tuple(self):
@ -459,10 +470,13 @@ class CreateForeignKeyOp(AddConstraintOp):
target_table,
source_columns,
target_columns,
_orig_constraint=constraint,
**kw
)
def to_constraint(self, migration_context=None):
if self._orig_constraint is not None:
return self._orig_constraint
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.foreign_key_constraint(
self.constraint_name,
@ -606,11 +620,13 @@ class CreateCheckConstraintOp(AddConstraintOp):
constraint_type = "check"
def __init__(
self, constraint_name, table_name, condition, schema=None, **kw):
self, constraint_name, table_name,
condition, schema=None, _orig_constraint=None, **kw):
self.constraint_name = constraint_name
self.table_name = table_name
self.condition = condition
self.schema = schema
self._orig_constraint = _orig_constraint
self.kw = kw
@classmethod
@ -620,11 +636,14 @@ class CreateCheckConstraintOp(AddConstraintOp):
return cls(
constraint.name,
constraint_table.name,
constraint.condition,
constraint.sqltext,
schema=constraint_table.schema,
_orig_constraint=constraint
)
def to_constraint(self, migration_context=None):
if self._orig_constraint is not None:
return self._orig_constraint
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.check_constraint(
self.constraint_name, self.table_name,
@ -1444,6 +1463,9 @@ class AddColumnOp(AlterTableOp):
def to_diff_tuple(self):
return ("add_column", self.schema, self.table_name, self.column)
def to_column(self):
return self.column
@classmethod
def from_column(cls, col):
return cls(col.table.name, col, schema=col.table.schema)
@ -1558,6 +1580,8 @@ class DropColumnOp(AlterTableOp):
return cls(tname, col.name, schema=schema, _orig_column=col)
def to_column(self, migration_context=None):
if self._orig_column is not None:
return self._orig_column
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.column(self.column_name, NULLTYPE)

View File

@ -1,5 +1,5 @@
from .fixtures import TestBase
from .assertions import eq_, ne_, is_, assert_raises_message, \
from .assertions import eq_, ne_, is_, is_not_, assert_raises_message, \
eq_ignore_whitespace, assert_raises
from .util import provide_metadata

View File

@ -25,6 +25,10 @@ if not util.sqla_094:
"""Assert a is b, with repr messaging on failure."""
assert a is b, msg or "%r is not %r" % (a, b)
def is_not_(a, b, msg=None):
"""Assert a is not b, with repr messaging on failure."""
assert a is not b, msg or "%r is %r" % (a, b)
def assert_raises(except_cls, callable_, *args, **kw):
try:
callable_(*args, **kw)
@ -45,7 +49,7 @@ if not util.sqla_094:
print(text_type(e).encode('utf-8'))
else:
from sqlalchemy.testing.assertions import eq_, ne_, is_, \
from sqlalchemy.testing.assertions import eq_, ne_, is_, is_not_, \
assert_raises_message, assert_raises

View File

@ -2,7 +2,8 @@ import sys
from sqlalchemy import MetaData, Column, Table, Integer, String, Text, \
Numeric, CHAR, ForeignKey, INTEGER, Index, UniqueConstraint, \
TypeDecorator, CheckConstraint, text, PrimaryKeyConstraint
TypeDecorator, CheckConstraint, text, PrimaryKeyConstraint, \
ForeignKeyConstraint
from sqlalchemy.types import NULLTYPE
from sqlalchemy.engine.reflection import Inspector
@ -13,7 +14,7 @@ from alembic.testing import TestBase
from alembic.testing import config
from alembic.testing import assert_raises_message
from alembic.testing.mock import Mock
from alembic.testing import eq_
from alembic.testing import eq_, is_, is_not_
from alembic.util import CommandError
from ._autogen_fixtures import AutogenTest, AutogenFixtureTest
@ -324,6 +325,10 @@ class AutogenerateDiffTest(ModelOne, AutogenTest, TestBase):
eq_(diffs[10][0], 'remove_column')
eq_(diffs[10][3].name, 'pw')
eq_(diffs[10][3].table.name, 'user')
assert isinstance(
diffs[10][3].type, String
)
def test_include_symbol(self):
@ -963,3 +968,129 @@ class PGCompareMetaData(ModelOne, AutogenTest, TestBase):
eq_(diffs[5][0][0], 'modify_nullable')
eq_(diffs[5][0][5], False)
eq_(diffs[5][0][6], True)
class OrigObjectTest(TestBase):
def setUp(self):
self.metadata = m = MetaData()
t = Table(
't', m,
Column('id', Integer(), primary_key=True),
Column('x', Integer())
)
self.ix = Index('ix1', t.c.id)
fk = ForeignKeyConstraint(['t_id'], ['t.id'])
q = Table(
'q', m,
Column('t_id', Integer()),
fk
)
self.table = t
self.fk = fk
self.ck = CheckConstraint(t.c.x > 5)
self.uq = UniqueConstraint(q.c.t_id)
self.pk = t.primary_key
def test_drop_fk(self):
fk = self.fk
op = ops.DropConstraintOp.from_constraint(fk)
is_(op.to_constraint(), fk)
is_(op.reverse().to_constraint(), fk)
def test_add_fk(self):
fk = self.fk
op = ops.AddConstraintOp.from_constraint(fk)
is_(op.to_constraint(), fk)
is_(op.reverse().to_constraint(), fk)
is_not_(None, op.to_constraint().table)
def test_add_check(self):
ck = self.ck
op = ops.AddConstraintOp.from_constraint(ck)
is_(op.to_constraint(), ck)
is_(op.reverse().to_constraint(), ck)
is_not_(None, op.to_constraint().table)
def test_drop_check(self):
ck = self.ck
op = ops.DropConstraintOp.from_constraint(ck)
is_(op.to_constraint(), ck)
is_(op.reverse().to_constraint(), ck)
is_not_(None, op.to_constraint().table)
def test_add_unique(self):
uq = self.uq
op = ops.AddConstraintOp.from_constraint(uq)
is_(op.to_constraint(), uq)
is_(op.reverse().to_constraint(), uq)
is_not_(None, op.to_constraint().table)
def test_drop_unique(self):
uq = self.uq
op = ops.DropConstraintOp.from_constraint(uq)
is_(op.to_constraint(), uq)
is_(op.reverse().to_constraint(), uq)
is_not_(None, op.to_constraint().table)
def test_add_pk_no_orig(self):
op = ops.CreatePrimaryKeyOp('pk1', 't', ['x', 'y'])
pk = op.to_constraint()
eq_(pk.name, 'pk1')
eq_(pk.table.name, 't')
def test_add_pk(self):
pk = self.pk
op = ops.AddConstraintOp.from_constraint(pk)
is_(op.to_constraint(), pk)
is_(op.reverse().to_constraint(), pk)
is_not_(None, op.to_constraint().table)
def test_drop_pk(self):
pk = self.pk
op = ops.DropConstraintOp.from_constraint(pk)
is_(op.to_constraint(), pk)
is_(op.reverse().to_constraint(), pk)
is_not_(None, op.to_constraint().table)
def test_drop_column(self):
t = self.table
op = ops.DropColumnOp.from_column_and_tablename(None, 't', t.c.x)
is_(op.to_column(), t.c.x)
is_(op.reverse().to_column(), t.c.x)
is_not_(None, op.to_column().table)
def test_add_column(self):
t = self.table
op = ops.AddColumnOp.from_column_and_tablename(None, 't', t.c.x)
is_(op.to_column(), t.c.x)
is_(op.reverse().to_column(), t.c.x)
is_not_(None, op.to_column().table)
def test_drop_table(self):
t = self.table
op = ops.DropTableOp.from_table(t)
is_(op.to_table(), t)
is_(op.reverse().to_table(), t)
is_(self.metadata, op.to_table().metadata)
def test_add_table(self):
t = self.table
op = ops.CreateTableOp.from_table(t)
is_(op.to_table(), t)
is_(op.reverse().to_table(), t)
is_(self.metadata, op.to_table().metadata)
def test_drop_index(self):
op = ops.DropIndexOp.from_index(self.ix)
is_(op.to_index(), self.ix)
is_(op.reverse().to_index(), self.ix)
def test_create_index(self):
op = ops.CreateIndexOp.from_index(self.ix)
is_(op.to_index(), self.ix)
is_(op.reverse().to_index(), self.ix)