address stamp issues in on_version_apply callback

* add callback tests with stamp present
* fix bug in stamp MigrationInfo construction
* adjust MigrationInfo API to reflect existence of stamps with multiple
  up revisions

Change-Id: I308d1de7854542d4d12bcc743bb5ed7e8e2fbefc
Pull-request: https://bitbucket.org/zzzeek/alembic/pull-requests/68
This commit is contained in:
jpassaro 2017-07-10 15:51:46 -04:00 committed by Mike Bayer
parent 11b0e8a5b6
commit e88131f2c0
3 changed files with 81 additions and 22 deletions

View File

@ -560,7 +560,31 @@ class MigrationInfo(object):
it results in any actual database operations)."""
up_revision_id = None
"""Version string corresponding to :attr:`.Revision.revision`."""
"""Version string corresponding to :attr:`.Revision.revision`.
In the case of a stamp operation, it is advised to use the
:attr:`.MigrationInfo.up_revision_ids` tuple as a stamp operation can
make a single movement from one or more branches down to a single
branchpoint, in which case there will be multiple "up" revisions.
.. seealso::
:attr:`.MigrationInfo.up_revision_ids`
"""
up_revision_ids = None
"""Tuple of version strings corresponding to :attr:`.Revision.revision`.
In the majority of cases, this tuple will be a single value, synonomous
with the scalar value of :attr:`.MigrationInfo.up_revision_id`.
It can be multiple revision identifiers only in the case of an
``alembic stamp`` operation which is moving downwards from multiple
branches down to their common branch point.
.. versionadded:: 0.9.4
"""
down_revision_ids = None
"""Tuple of strings representing the base revisions of this migration step.
@ -573,13 +597,20 @@ class MigrationInfo(object):
revision_map = None
"""The revision map inside of which this operation occurs."""
def __init__(self, revision_map, is_upgrade, is_stamp, up_revision,
def __init__(self, revision_map, is_upgrade, is_stamp, up_revisions,
down_revisions):
self.revision_map = revision_map
self.is_upgrade = is_upgrade
self.is_stamp = is_stamp
self.up_revision_id = up_revision
self.down_revision_ids = util.to_tuple(down_revisions)
self.up_revision_ids = util.to_tuple(up_revisions, default=())
if self.up_revision_ids:
self.up_revision_id = self.up_revision_ids[0]
else:
# this should never be the case with
# "upgrade", "downgrade", or "stamp" as we are always
# measuring movement in terms of at least one upgrade version
self.up_revision_id = None
self.down_revision_ids = util.to_tuple(down_revisions, default=())
@property
def is_migration(self):
@ -594,25 +625,32 @@ class MigrationInfo(object):
@property
def source_revision_ids(self):
"""Active revisions before this migration step is applied."""
revs = self.down_revision_ids if self.is_upgrade \
else self.up_revision_id
return util.to_tuple(revs, default=())
return self.down_revision_ids if self.is_upgrade \
else self.up_revision_ids
@property
def destination_revision_ids(self):
"""Active revisions after this migration step is applied."""
revs = self.up_revision_id if self.is_upgrade \
return self.up_revision_ids if self.is_upgrade \
else self.down_revision_ids
return util.to_tuple(revs, default=())
@property
def up_revision(self):
"""Get :attr:`~MigrationInfo.up_revision_id` as a :class:`.Revision`."""
"""Get :attr:`~.MigrationInfo.up_revision_id` as a :class:`.Revision`."""
return self.revision_map.get_revision(self.up_revision_id)
@property
def up_revisions(self):
"""Get :attr:`~.MigrationInfo.up_revision_ids` as a :class:`.Revision`.
.. versionadded:: 0.9.4
"""
return self.revision_map.get_revisions(self.up_revision_ids)
@property
def down_revisions(self):
"""Get :attr:`~MigrationInfo.down_revision_ids` as a tuple of
"""Get :attr:`~.MigrationInfo.down_revision_ids` as a tuple of
:class:`Revisions <.Revision>`."""
return self.revision_map.get_revisions(self.down_revision_ids)
@ -857,7 +895,7 @@ class RevisionStep(MigrationStep):
@property
def info(self):
return MigrationInfo(revision_map=self.revision_map,
up_revision=self.revision.revision,
up_revisions=self.revision.revision,
down_revisions=self.revision._all_down_revisions,
is_upgrade=self.is_upgrade, is_stamp=False)
@ -944,5 +982,8 @@ class StampStep(MigrationStep):
def info(self):
up, down = (self.to_, self.from_) if self.is_upgrade \
else (self.from_, self.to_)
return MigrationInfo(self.revision_map, up, down, self.is_upgrade,
True)
return MigrationInfo(revision_map=self.revision_map,
up_revisions=up,
down_revisions=down,
is_upgrade=self.is_upgrade,
is_stamp=True)

View File

@ -6,6 +6,16 @@ Changelog
.. changelog::
:version: 0.9.4
.. change::
:tags: bug, runtime
Added an additional attribute to the new
:paramref:`.EnvironmentContext.configure.on_version_apply` API,
:attr:`.MigrationInfo.up_revision_ids`, to accommodate for the uncommon
case of the ``alembic stamp`` command being used to move from multiple
branches down to a common branchpoint; there will be multiple
"up" revisions in this one case.
.. changelog::
:version: 0.9.3
:released: July 6, 2017
@ -13,7 +23,7 @@ Changelog
.. change::
:tags: feature, runtime
Added a new callback hook
Added a new callback hook
:paramref:`.EnvironmentContext.configure.on_version_apply`,
which allows user-defined code to be invoked each time an individual
upgrade, downgrade, or stamp operation proceeds against a database.

View File

@ -36,6 +36,7 @@ class ApplyVersionsFunctionalTest(TestBase):
self._test_004_downgrade()
self._test_005_upgrade()
self._test_006_upgrade_again()
self._test_007_stamp_upgrade()
def _test_001_revisions(self):
self.a = a = util.rev_id()
@ -129,6 +130,13 @@ class ApplyVersionsFunctionalTest(TestBase):
assert db.dialect.has_table(db.connect(), 'bar')
assert not db.dialect.has_table(db.connect(), 'bat')
def _test_007_stamp_upgrade(self):
command.stamp(self.cfg, self.c)
db = self.bind
assert db.dialect.has_table(db.connect(), 'foo')
assert db.dialect.has_table(db.connect(), 'bar')
assert not db.dialect.has_table(db.connect(), 'bat')
class SourcelessApplyVersionsTest(ApplyVersionsFunctionalTest):
sourceless = True
@ -194,13 +202,13 @@ class CallbackEnvironmentTest(ApplyVersionsFunctionalTest):
assert hasattr(kw['ctx'], 'get_current_revision')
step = kw['step']
assert isinstance(getattr(step, 'is_upgrade', None), bool)
assert isinstance(getattr(step, 'is_stamp', None), bool)
assert isinstance(getattr(step, 'is_migration', None), bool)
assert isinstance(getattr(step, 'up_revision_id', None),
compat.string_types)
assert isinstance(getattr(step, 'up_revision', None), Script)
for revtype in 'down', 'source', 'destination':
assert isinstance(step.is_upgrade, bool)
assert isinstance(step.is_stamp, bool)
assert isinstance(step.is_migration, bool)
assert isinstance(step.up_revision_id, compat.string_types)
assert isinstance(step.up_revision, Script)
for revtype in 'up', 'down', 'source', 'destination':
revs = getattr(step, '%s_revisions' % revtype)
assert isinstance(revs, tuple)
for rev in revs: