- finish up most features

This commit is contained in:
Mike Bayer 2014-11-08 17:51:34 -05:00
parent e2c46aa37a
commit be274ae288
3 changed files with 247 additions and 67 deletions

View File

@ -5,7 +5,8 @@ from sqlalchemy.util import OrderedDict
class BatchOperationsImpl(object):
def __init__(self, operations, table_name, schema, recreate, copy_from):
def __init__(self, operations, table_name, schema, recreate,
copy_from, table_args, table_kwargs):
self.operations = operations
self.table_name = table_name
self.schema = schema
@ -14,6 +15,8 @@ class BatchOperationsImpl(object):
"recreate may be one of 'auto', 'always', or 'never'.")
self.recreate = recreate
self.copy_from = copy_from
self.table_args = table_args
self.table_kwargs = table_kwargs
self.batch = []
@property
@ -45,7 +48,8 @@ class BatchOperationsImpl(object):
self.table_name, m1, schema=self.schema,
autoload=True, autoload_with=self.operations.get_bind())
batch_impl = ApplyBatchImpl(existing_table)
batch_impl = ApplyBatchImpl(
existing_table, self.table_args, self.table_kwargs)
for opname, arg, kw in self.batch:
fn = getattr(batch_impl, opname)
fn(*arg, **kw)
@ -84,11 +88,13 @@ class BatchOperationsImpl(object):
class ApplyBatchImpl(object):
def __init__(self, table):
def __init__(self, table, table_args, table_kwargs):
self.table = table # this is a Table object
self.table_args = table_args
self.table_kwargs = table_kwargs
self.new_table = None
self.column_transfers = OrderedDict(
(c.name, {}) for c in self.table.c
(c.name, {'expr': c}) for c in self.table.c
)
self._grab_table_elements()
@ -116,11 +122,17 @@ class ApplyBatchImpl(object):
m = MetaData()
schema = self.table.schema
self.new_table = new_table = Table(
'_alembic_batch_temp', m, *self.columns.values(), schema=schema)
'_alembic_batch_temp', m,
*(list(self.columns.values()) + list(self.table_args)),
schema=schema,
**self.table_kwargs)
for const in list(self.named_constraints.values()) + \
self.unnamed_constraints:
const_columns = set([c.key for c in const.columns])
const_columns = set([
c.key for c in self._constraint_columns(const)])
if not const_columns.issubset(self.column_transfers):
continue
const_copy = const.copy(schema=schema, target_table=new_table)
@ -134,6 +146,12 @@ class ApplyBatchImpl(object):
*[new_table.c[col] for col in index.columns.keys()],
**index.kwargs)
def _constraint_columns(self, constraint):
if isinstance(constraint, ForeignKeyConstraint):
return [fk.parent for fk in constraint.elements]
else:
return list(constraint.columns)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
@ -156,10 +174,12 @@ class ApplyBatchImpl(object):
op_impl._exec(
self.new_table.insert(inline=True).from_select(
list(self.column_transfers.keys()),
list(k for k, transfer in
self.column_transfers.items() if 'expr' in transfer),
select([
self.table.c[key]
for key in self.column_transfers
transfer['expr']
for transfer in self.column_transfers.values()
if 'expr' in transfer
])
)
)
@ -200,16 +220,24 @@ class ApplyBatchImpl(object):
def add_column(self, table_name, column, **kw):
self.columns[column.name] = column
self.column_transfers[column.name] = {}
def drop_column(self, table_name, column, **kw):
del self.columns[column.name]
del self.column_transfers[column.name]
def add_constraint(self, const):
raise NotImplementedError("TODO")
if not const.name:
raise ValueError("Constraint must have a name")
self.named_constraints[const.name] = const
def drop_constraint(self, const):
raise NotImplementedError("TODO")
if not const.name:
raise ValueError("Constraint must have a name")
try:
del self.named_constraints[const.name]
except KeyError:
raise ValueError("No such constraint: '%s'" % const.name)
def rename_table(self, *arg, **kw):
raise NotImplementedError("TODO")

View File

@ -191,7 +191,8 @@ class Operations(object):
@contextmanager
def batch_alter_table(
self, table_name, schema=None, recreate="auto", copy_from=None):
self, table_name, schema=None, recreate="auto", copy_from=None,
table_args=(), table_kwargs=util.immutabledict()):
"""Invoke a series of per-table migrations in batch.
Batch mode allows a series of operations specific to a table
@ -250,6 +251,16 @@ class Operations(object):
:param copy_from: optional :class:`~sqlalchemy.schema.Table` object
that will act as the structure of the table being copied. If omitted,
table reflection is used to retrieve the structure of the table.
:param table_args: a sequence of additional positional arguments that
will be applied to the new :class:`~sqlalchemy.schema.Table` when
created, in addition to those copied from the source table.
This may be used to provide additional constraints such as CHECK
constraints that may not be reflected.
:param table_kwargs: a dictionary of additional keyword arguments
that will be applied to the new :class:`~sqlalchemy.schema.Table`
when created, in addition to those copied from the source table.
This may be used to provide for additional table options that may
not be reflected.
.. versionadded:: 0.7.0
@ -259,7 +270,8 @@ class Operations(object):
"""
impl = batch.BatchOperationsImpl(
self, table_name, schema, recreate, copy_from)
self, table_name, schema, recreate,
copy_from, table_args, table_kwargs)
batch_op = BatchOperations(self.migration_context, impl=impl)
yield batch_op
impl.flush()
@ -1265,6 +1277,11 @@ class BatchOperations(Operations):
"""
def _noop(self, operation):
raise NotImplementedError(
"The %s method does not apply to a batch table alter operation."
% operation)
def add_column(self, column):
"""Issue an "add column" instruction using the current
batch migration context.
@ -1316,11 +1333,8 @@ class BatchOperations(Operations):
"""
def create_foreign_key(self, name, referent, local_cols,
remote_cols, onupdate=None, ondelete=None,
deferrable=None, initially=None, match=None,
referent_schema=None,
**dialect_kw):
def create_foreign_key(
self, name, referent, local_cols, remote_cols, **kw):
"""Issue a "create foreign key" instruction using the
current batch migration context.
@ -1339,6 +1353,8 @@ class BatchOperations(Operations):
:meth:`.Operations.create_foreign_key`
"""
return super(BatchOperations, self).create_foreign_key(
name, self.impl.table_name, referent, local_cols, remote_cols,)
def create_unique_constraint(self, name, local_cols, **kw):
"""Issue a "create unique constraint" instruction using the
@ -1352,6 +1368,8 @@ class BatchOperations(Operations):
:meth:`.Operations.create_unique_constraint`
"""
return super(BatchOperations, self).create_unique_constraint(
name, self.impl.table_name, local_cols, **kw)
def create_check_constraint(self, name, condition, **kw):
"""Issue a "create check constraint" instruction using the
@ -1366,33 +1384,6 @@ class BatchOperations(Operations):
"""
def create_index(self, name, table_name, columns, schema=None,
unique=False, quote=None, **kw):
"""Issue a "create index" instruction using the
current batch migration context.
The batch form of this call omits the ``table_name`` and ``schema``
arguments from the call.
.. seealso::
:meth:`.Operations.create_index`
"""
def drop_index(self, name):
"""Issue a "drop index" instruction using the
current batch migration context.
The batch form of this call omits the ``table_name`` and ``schema``
arguments from the call.
.. seealso::
:meth:`.Operations.drop_index`
"""
def drop_constraint(self, name, type_=None):
"""Issue a "drop constraint" instruction using the
current batch migration context.
@ -1405,3 +1396,14 @@ class BatchOperations(Operations):
:meth:`.Operations.drop_constraint`
"""
return super(BatchOperations, self).drop_constraint(
name, self.impl.table_name, type_=type_)
def create_index(self, *arg, **kw):
"""Not implemented for batch table operations."""
self._noop("create_index")
def drop_index(self, name):
"""Not implemented for batch table operations."""
self._noop("drop_index")

View File

@ -8,24 +8,37 @@ from alembic.operations import Operations
from alembic.batch import ApplyBatchImpl
from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
UniqueConstraint, Index, CheckConstraint, PrimaryKeyConstraint, \
ForeignKeyConstraint
UniqueConstraint, ForeignKeyConstraint
from sqlalchemy.sql import column
from sqlalchemy.schema import CreateTable
class BatchApplyTest(TestBase):
def _simple_fixture(self):
def setUp(self):
self.op = Operations(mock.Mock(opts={}))
def _simple_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
Column('id', Integer, primary_key=True),
Column('x', String(10)),
Column('y', Integer)
)
return ApplyBatchImpl(t, table_args, table_kwargs)
def _uq_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
Column('id', Integer, primary_key=True),
Column('x', String()),
Column('y', Integer)
Column('y', Integer),
UniqueConstraint('y', name='uq1')
)
return ApplyBatchImpl(t)
return ApplyBatchImpl(t, table_args, table_kwargs)
def _fk_fixture(self):
def _fk_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
@ -33,9 +46,19 @@ class BatchApplyTest(TestBase):
Column('email', String()),
Column('user_id', Integer, ForeignKey('user.id'))
)
return ApplyBatchImpl(t)
return ApplyBatchImpl(t, table_args, table_kwargs)
def _selfref_fk_fixture(self):
def _named_fk_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
Column('id', Integer, primary_key=True),
Column('email', String()),
Column('user_id', Integer, ForeignKey('user.id', name='ufk'))
)
return ApplyBatchImpl(t, table_args, table_kwargs)
def _selfref_fk_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
@ -43,10 +66,12 @@ class BatchApplyTest(TestBase):
Column('parent_id', ForeignKey('tname.id')),
Column('data', String)
)
return ApplyBatchImpl(t)
return ApplyBatchImpl(t, table_args, table_kwargs)
def _assert_impl(self, impl, colnames=None):
context = op_fixture()
def _assert_impl(self, impl, colnames=None,
ddl_contains=None, ddl_not_contains=None,
dialect='default'):
context = op_fixture(dialect=dialect)
impl._create(context.impl)
@ -60,19 +85,22 @@ class BatchApplyTest(TestBase):
create_stmt = str(
CreateTable(impl.new_table).compile(dialect=context.dialect))
create_stmt = re.sub(r'[\n\t]', '', create_stmt)
if pk_cols:
assert "PRIMARY KEY" in create_stmt
else:
assert "PRIMARY KEY" not in create_stmt
if ddl_contains:
assert ddl_contains in create_stmt
if ddl_not_contains:
assert ddl_not_contains not in create_stmt
context.assert_(
create_stmt,
'INSERT INTO _alembic_batch_temp (%(colnames)s) '
'SELECT %(tname_colnames)s FROM tname' % {
"colnames": ", ".join([
impl.new_table.c[name].name for name in colnames]),
impl.new_table.c[name].name
for name in colnames if name in impl.table.c]),
"tname_colnames":
", ".join("tname.%s" % name for name in colnames)
", ".join("tname.%s" % name
for name in colnames if name in impl.table.c)
},
'DROP TABLE tname',
'ALTER TABLE _alembic_batch_temp RENAME TO tname'
@ -94,7 +122,8 @@ class BatchApplyTest(TestBase):
def test_rename_col_pk(self):
impl = self._simple_fixture()
impl.alter_column('tname', 'id', new_column_name='foobar')
new_table = self._assert_impl(impl)
new_table = self._assert_impl(
impl, ddl_contains="PRIMARY KEY (foobar)")
eq_(new_table.c.id.name, 'foobar')
eq_(list(new_table.primary_key), [new_table.c.id])
@ -102,7 +131,8 @@ class BatchApplyTest(TestBase):
impl = self._fk_fixture()
impl.alter_column('tname', 'user_id', new_column_name='foobar')
new_table = self._assert_impl(
impl, colnames=['id', 'email', 'user_id'])
impl, colnames=['id', 'email', 'user_id'],
ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)')
eq_(new_table.c.user_id.name, 'foobar')
eq_(
list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
@ -119,7 +149,8 @@ class BatchApplyTest(TestBase):
def test_drop_col_remove_pk(self):
impl = self._simple_fixture()
impl.drop_column('tname', column('id'))
new_table = self._assert_impl(impl, colnames=['x', 'y'])
new_table = self._assert_impl(
impl, colnames=['x', 'y'], ddl_not_contains="PRIMARY KEY")
assert 'y' in new_table.c
assert 'id' not in new_table.c
assert not new_table.primary_key
@ -127,14 +158,17 @@ class BatchApplyTest(TestBase):
def test_drop_col_remove_fk(self):
impl = self._fk_fixture()
impl.drop_column('tname', column('user_id'))
new_table = self._assert_impl(impl, colnames=['id', 'email'])
new_table = self._assert_impl(
impl, colnames=['id', 'email'], ddl_not_contains="FOREIGN KEY")
assert 'user_id' not in new_table.c
assert not new_table.foreign_keys
def test_drop_col_retain_fk(self):
impl = self._fk_fixture()
impl.drop_column('tname', column('email'))
new_table = self._assert_impl(impl, colnames=['id', 'user_id'])
new_table = self._assert_impl(
impl, colnames=['id', 'user_id'],
ddl_contains='FOREIGN KEY(user_id) REFERENCES "user" (id)')
assert 'email' not in new_table.c
assert new_table.c.user_id.foreign_keys
@ -145,6 +179,63 @@ class BatchApplyTest(TestBase):
assert 'data' not in new_table.c
assert new_table.c.parent_id.foreign_keys
def test_add_fk(self):
impl = self._simple_fixture()
impl.add_column('tname', Column('user_id', Integer))
fk = self.op._foreign_key_constraint(
'fk1', 'tname', 'user',
['user_id'], ['id'])
impl.add_constraint(fk)
new_table = self._assert_impl(
impl, colnames=['id', 'x', 'y', 'user_id'],
ddl_contains='CONSTRAINT fk1 FOREIGN KEY(user_id) '
'REFERENCES "user" (id)')
eq_(
list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
'user.id'
)
def test_drop_fk(self):
impl = self._named_fk_fixture()
fk = ForeignKeyConstraint([], [], name='ufk')
impl.drop_constraint(fk)
new_table = self._assert_impl(
impl, colnames=['id', 'email', 'user_id'],
ddl_not_contains="CONSTRANT fk1")
eq_(
list(new_table.foreign_keys),
[]
)
def test_add_uq(self):
impl = self._simple_fixture()
uq = self.op._unique_constraint(
'uq1', 'tname', ['y']
)
impl.add_constraint(uq)
self._assert_impl(
impl, colnames=['id', 'x', 'y'],
ddl_contains="CONSTRAINT uq1 UNIQUE")
def test_drop_uq(self):
impl = self._uq_fixture()
uq = self.op._unique_constraint(
'uq1', 'tname', ['y']
)
impl.drop_constraint(uq)
self._assert_impl(
impl, colnames=['id', 'x', 'y'],
ddl_not_contains="CONSTRAINT uq1 UNIQUE")
def test_add_table_opts(self):
impl = self._simple_fixture(table_kwargs={'mysql_engine': 'InnoDB'})
self._assert_impl(
impl, ddl_contains="ENGINE=InnoDB",
dialect='mysql'
)
class BatchAPITest(TestBase):
@contextmanager
@ -179,3 +270,62 @@ class BatchAPITest(TestBase):
[mock.call.add_column(
'tname', column, schema=None)]
)
def test_create_fk(self):
with self._fixture() as batch:
batch.create_foreign_key('myfk', 'user', ['x'], ['y'])
eq_(
self.mock_schema.ForeignKeyConstraint.mock_calls,
[
mock.call(
['x'], ['user.y'],
onupdate=None, ondelete=None, name='myfk',
initially=None, deferrable=None, match=None)
]
)
eq_(
batch.impl.operations.impl.mock_calls,
[mock.call.add_constraint(
self.mock_schema.ForeignKeyConstraint())]
)
def test_create_uq(self):
with self._fixture() as batch:
batch.create_unique_constraint('uq1', ['a', 'b'])
eq_(
self.mock_schema.Table().c.__getitem__.mock_calls,
[mock.call('a'), mock.call('b')]
)
eq_(
self.mock_schema.UniqueConstraint.mock_calls,
[
mock.call(
self.mock_schema.Table().c.__getitem__(),
self.mock_schema.Table().c.__getitem__(),
name='uq1'
)
]
)
eq_(
batch.impl.operations.impl.mock_calls,
[mock.call.add_constraint(
self.mock_schema.UniqueConstraint())]
)
def test_drop_constraint(self):
with self._fixture() as batch:
batch.drop_constraint('uq1')
eq_(
self.mock_schema.Constraint.mock_calls,
[
mock.call(name='uq1')
]
)
eq_(
batch.impl.operations.impl.mock_calls,
[mock.call.drop_constraint(self.mock_schema.Constraint())]
)