- stamp is working again

This commit is contained in:
Mike Bayer 2014-11-18 17:38:32 -05:00
parent 6bbbde58f7
commit 20cc68dedc
4 changed files with 92 additions and 51 deletions

View File

@ -121,7 +121,6 @@ def merge(config, revisiona, revisionb, message=None, branch_name=None):
**template_args)
def upgrade(config, revision, sql=False, tag=None):
"""Upgrade to a later version."""

View File

@ -1,13 +1,12 @@
import logging
import sys
from contextlib import contextmanager
from collections import namedtuple
from sqlalchemy import MetaData, Table, Column, String, literal_column
from sqlalchemy import create_engine
from sqlalchemy.engine import url as sqla_url
from .compat import callable, EncodedIO, string_types
from .compat import callable, EncodedIO
from . import ddl, util
from .revision import tuple_rev_as_scalar
@ -509,6 +508,26 @@ class MigrationStep(object):
def downgrade_from_script(cls, revision_map, script):
return RevisionStep(revision_map, script, False)
@property
def is_downgrade(self):
return not self.is_upgrade
@property
def merge_branch_idents(self):
return (
# delete revs, update from rev, update to rev
list(self.from_revisions[0:-1]), self.from_revisions[-1],
self.to_revisions[0]
)
@property
def unmerge_branch_idents(self):
return (
# update from rev, update to rev, insert revs
self.from_revisions[0], self.to_revisions[-1],
list(self.to_revisions[0:-1])
)
@property
def short_log(self):
return "%s %s -> %s" % (
@ -559,10 +578,6 @@ class RevisionStep(MigrationStep):
else:
return self.revision._down_revision_tuple
@property
def is_downgrade(self):
return not self.is_upgrade
@property
def _has_scalar_down_revision(self):
return len(self.revision._down_revision_tuple) == 1
@ -672,22 +687,60 @@ class RevisionStep(MigrationStep):
def insert_version_num(self):
return self.revision.revision
@property
def merge_branch_idents(self):
return (
# delete revs, update from rev, update to rev
self.from_revisions[0:-1], self.from_revisions[-1],
self.to_revisions[0]
)
@property
def unmerge_branch_idents(self):
return (
# update from rev, update to rev, insert revs
self.from_revisions[0], self.to_revisions[-1],
self.to_revisions[0:-1]
)
class StampStep(MigrationStep):
pass
def __init__(self, from_, to_, is_upgrade, branch_move):
self.from_ = util.to_tuple(from_, default=())
self.to_ = util.to_tuple(to_, default=())
assert len(self.to_) <= 1
self.is_upgrade = is_upgrade
self.branch_move = branch_move
self.migration_fn = self.stamp_revision
doc = None
def stamp_revision(self, **kw):
return None
def __eq__(self, other):
return isinstance(other, StampStep) and \
other.from_revisions == self.revisions and \
other.to_revisions == self.to_revisions and \
other.branch_move == self.branch_move and \
self.is_upgrade == other.is_upgrade
@property
def from_revisions(self):
return self.from_
@property
def to_revisions(self):
return self.to_
@property
def delete_version_num(self):
assert len(self.from_) == 1
return self.from_[0]
@property
def insert_version_num(self):
assert len(self.to_) == 1
return self.to_[0]
@property
def update_version_num(self):
assert len(self.from_) == 1
assert len(self.to_) == 1
return self.from_[0], self.to_[0]
def should_delete_branch(self, heads):
return self.is_downgrade and self.branch_move
def should_create_branch(self, heads):
return self.is_upgrade and self.branch_move
def should_merge_branches(self, heads):
return len(self.from_) > 1
def should_unmerge_branches(self, heads):
return False

View File

@ -298,18 +298,13 @@ class ScriptDirectory(object):
filtered_heads = self.revision_map.filter_for_lineage(
heads, revision)
def stamp_revision(**kw):
return None
null_migration = stamp_revision
dest = self.get_revision(revision)
if dest is None:
# dest is 'base'. Return a "delete branch" migration
# for all applicable heads.
return [
migration.MigrationStep(
null_migration, head.revision, None, None, False, True)
migration.StampStep(head.revision, None, False, True)
for head in filtered_heads
]
elif dest in filtered_heads:
@ -326,23 +321,20 @@ class ScriptDirectory(object):
# we can treat them as a "merge", single step.
assert not ancestors.intersection(filtered_heads)
todo_heads = [head.revision for head in filtered_heads]
step = migration.MigrationStep(
null_migration, todo_heads,
dest.revision, None, False, False)
step = migration.StampStep(
todo_heads, dest.revision, False, False)
return [step]
elif ancestors.intersection(filtered_heads):
# heads are below the target, so this is an upgrade.
# we can treat them as a "merge", single step.
todo_heads = [head.revision for head in filtered_heads]
step = migration.MigrationStep(
null_migration, todo_heads,
dest.revision, None, True, False)
step = migration.StampStep(
todo_heads, dest.revision, True, False)
return [step]
else:
# destination is in a branch not represented,
# treat it as new branch
step = migration.MigrationStep(
null_migration, [None], dest.revision, None, True, True)
step = migration.StampStep([None], dest.revision, True, True)
return [step]
def run_env(self):

View File

@ -243,9 +243,10 @@ class BranchedPathTest(MigrationTest):
self.a, self.b, self.c1, self.d1, self.c2, self.d2
)
eq_(
self.env._upgrade_revs(d1.revision, b.revision),
[self.up_(c1), self.up_(d1)]
self._assert_upgrade(
d1.revision, b.revision,
[self.up_(c1), self.up_(d1)],
set([d1.revision])
)
def test_upgrade_multiple_branch(self):
@ -254,10 +255,11 @@ class BranchedPathTest(MigrationTest):
self.a, self.b, self.c1, self.d1, self.c2, self.d2
)
eq_(
self.env._upgrade_revs((d1.revision, d2.revision), a.revision),
self._assert_upgrade(
(d1.revision, d2.revision), a.revision,
[self.up_(b), self.up_(c2), self.up_(d2),
self.up_(c1), self.up_(d1)]
self.up_(c1), self.up_(d1)],
set([d1.revision, d2.revision])
)
def test_downgrade_multiple_branch(self):
@ -267,14 +269,9 @@ class BranchedPathTest(MigrationTest):
self._assert_downgrade(
a.revision, (d1.revision, d2.revision),
[self.down_(d1), self.down_(c1), self.down_(d2),
self.down_(c2), self.down_(b)]
self.down_(c2), self.down_(b)],
set([a.revision])
)
eq_(
self.env._downgrade_revs(a.revision, (d1.revision, d2.revision)),
[self.down_(d1), self.down_(c1), self.down_(d2),
self.down_(c2), self.down_(b)]
)
class ForestTest(MigrationTest):