diff --git a/alembic/command.py b/alembic/command.py index 586d103..b69124a 100644 --- a/alembic/command.py +++ b/alembic/command.py @@ -208,7 +208,7 @@ def heads(config, verbose=False): script = ScriptDirectory.from_config(config) for rev in script.get_revisions("heads"): - config.print_stdout(rev.cmd_format(verbose)) + config.print_stdout(rev.cmd_format(verbose, short_head_status=False)) def branches(config): diff --git a/alembic/revision.py b/alembic/revision.py index af0771e..b079927 100644 --- a/alembic/revision.py +++ b/alembic/revision.py @@ -12,6 +12,16 @@ class RevisionError(Exception): pass +class RangeNotAncestorError(RevisionError): + def __init__(self, lower, upper): + self.lower = lower + self.upper = upper + super(RangeNotAncestorError, self).__init__( + "Revision %s is not an ancestor of revision %s" % + (lower or "base", upper or "base") + ) + + class MultipleHeads(RevisionError): pass @@ -245,7 +255,8 @@ class RevisionMap(object): if branch_rev: revs = self.filter_for_lineage(revs, check_branch) if not revs: - raise ResolutionError("No such revision '%s'" % resolved_id) + raise ResolutionError( + "No such revision or branch '%s'" % resolved_id) elif len(revs) > 1: raise ResolutionError( "Multiple revisions start " @@ -440,14 +451,7 @@ class RevisionMap(object): rev.revision for rev in self._get_descendant_nodes(lowers) ) if not total_space: - raise RevisionError( - "Revision(s) %s is not an ancestor of revision(s) %s" % ( - (", ".join(r.revision for r in lowers) - if lowers else "base"), - (", ".join(r.revision for r in uppers) - if uppers else "base") - ) - ) + raise RangeNotAncestorError(lower, upper) branch_endpoints = set( rev.revision for rev in diff --git a/alembic/script.py b/alembic/script.py index 8e318a9..eeefc99 100644 --- a/alembic/script.py +++ b/alembic/script.py @@ -7,6 +7,8 @@ from . import compat from . import revision from . import migration +from contextlib import contextmanager + _sourceless_rev_file = re.compile(r'(?!__init__)(.*\.py)(c|o)?$') _only_source_rev_file = re.compile(r'(?!__init__)(.*\.py)$') _legacy_rev = re.compile(r'([a-f0-9]+)\.py$') @@ -85,6 +87,34 @@ class ScriptDirectory(object): output_encoding=config.get_main_option("output_encoding", "utf-8") ) + @contextmanager + def _catch_revision_errors( + self, + ancestor=None, multiple_heads=None, start=None, end=None): + try: + yield + except revision.RangeNotAncestorError as rna: + if start is None: + start = rna.lower + if end is None: + end = rna.upper + if not ancestor: + ancestor = ( + "Requested range %(start)s:%(end)s does not refer to " + "ancestor/descendant revisions along the same branch" + ) + ancestor = ancestor % {"start": start, "end": end} + compat.raise_from_cause(util.CommandError(ancestor)) + except revision.MultipleHeads: + if not multiple_heads: + multiple_heads = ( + "Multiple head revisions are present; please " + "specify a specific target revision or specify 'heads' to " + "refer to all branch heads at once") + compat.raise_from_cause(util.CommandError(multiple_heads)) + except revision.RevisionError as err: + compat.raise_from_cause(util.CommandError(err.message)) + def walk_revisions(self, base="base", head="heads"): """Iterate through all revisions. @@ -100,14 +130,9 @@ class ScriptDirectory(object): refer to the set of all head branches simultaneously. """ - try: + with self._catch_revision_errors(start=base, end=head): for rev in self.iterate_revisions(head, base): yield rev - except revision.MultipleHeads: - raise util.CommandError( - "Revision '%s' corresponds to multiple revisions; " - "please specify 'heads' for all heads, or a specific " - "revision or branch" % head) def get_revisions(self, id_): """Return the :class:`.Script` instance with the given rev identifier, @@ -133,17 +158,14 @@ class ScriptDirectory(object): """Convert a symbolic revision, i.e. 'head' or 'base', into an actual revision number.""" - try: + with self._catch_revision_errors(id_): rev, branch_name = self.revision_map._resolve_revision_number(id_) - except revision.MultipleHeads: - raise util.CommandError( - "Revision %s corresponds to multiple revisions" % id_) + + if not rev: + # convert () to None + return None else: - if not rev: - # convert () to None - return None - else: - return rev[0] + return rev[0] def iterate_revisions(self, upper, lower): """Iterate through script revisions, starting at the given @@ -178,14 +200,12 @@ class ScriptDirectory(object): :meth:`.ScriptDirectory.get_heads` """ - try: - return self.revision_map.get_current_head() - except revision.MultipleHeads: - raise util.CommandError( + with self._catch_revision_errors(multiple_heads=( 'The script directory has multiple heads (due to branching).' 'Please use get_heads(), or merge the branches using ' 'alembic merge.' - ) + )): + return self.revision_map.get_current_head() def get_heads(self): """Return all "head" revisions as strings. @@ -210,7 +230,9 @@ class ScriptDirectory(object): yield rev, dupe def _upgrade_revs(self, destination, current_rev): - try: + with self._catch_revision_errors( + ancestor="Destination %(end)s is not a valid upgrade " + "target from current head(s)", end=destination): revs = self.revision_map.iterate_revisions( destination, current_rev, implicit_base=True) return [ @@ -220,11 +242,11 @@ class ScriptDirectory(object): for script, new_branch in self._flag_branch_changes(reversed(list(revs))) ] - except revision.RevisionError as err: - compat.raise_from_cause(util.CommandError(err.message)) def _downgrade_revs(self, destination, current_rev): - try: + with self._catch_revision_errors( + ancestor="Destination %(end)s is not a valid downgrade " + "target from current head(s)", end=destination): revs = self.revision_map.iterate_revisions( current_rev, destination) return [ @@ -233,8 +255,6 @@ class ScriptDirectory(object): ) for script, delete_branch in self._flag_branch_changes(revs) ] - except revision.RevisionError as err: - compat.raise_from_cause(util.CommandError(err.message)) def run_env(self): """Run the script environment. @@ -298,13 +318,12 @@ class ScriptDirectory(object): if head is None: head = "head" - try: + with self._catch_revision_errors(multiple_heads=( + "Multiple heads are present; please specify the head " + "revision on which the new revision should be based, " + "or perform a merge." + )): heads = self.revision_map.get_revisions(head) - except revision.MultipleHeads: - raise util.CommandError( - "Multiple heads are present; please specify the head " - "revision on which the new revision should be based, " - "or perform a merge.") create_date = datetime.datetime.now() path = self._rev_path(revid, message, create_date) @@ -430,11 +449,13 @@ class Script(revision.Revision): " (mergepoint)" if self.is_merge_point else "", ) - def cmd_format(self, verbose): + def cmd_format(self, verbose, short_head_status=True): if verbose: return self.log_entry - else: + elif short_head_status: return self._head_only() + else: + return self.revision def _format_down_revision(self): if not self.down_revision: diff --git a/tests/test_revision.py b/tests/test_revision.py index e642884..6dde218 100644 --- a/tests/test_revision.py +++ b/tests/test_revision.py @@ -207,7 +207,7 @@ class NamedBranchTest(DownIterateTest): def test_no_revision_exists(self): assert_raises_message( RevisionError, - "No such revision 'q'", + "No such revision or branch 'q'", self.map.get_revision, "abranch@q" ) @@ -286,7 +286,7 @@ class MultipleBranchTest(DownIterateTest): # db1cb1 is the descendant of b1 assert_raises_message( RevisionError, - r"Revision\(s\) d1cb1 is not an ancestor of revision\(s\) b1", + r"Revision d1cb1 is not an ancestor of revision b1", list, self.map._iterate_revisions('b1', 'd1cb1') ) @@ -295,7 +295,7 @@ class MultipleBranchTest(DownIterateTest): # nodes db2cb2 and b1 have no path to each other assert_raises_message( RevisionError, - r"Revision\(s\) b1 is not an ancestor of revision\(s\) d2cb2", + r"Revision b1 is not an ancestor of revision d2cb2", list, self.map._iterate_revisions('d2cb2', 'b1') ) @@ -303,14 +303,14 @@ class MultipleBranchTest(DownIterateTest): def test_wrong_direction_to_base(self): assert_raises_message( RevisionError, - r"Revision\(s\) d1cb1 is not an ancestor of revision\(s\) base", + r"Revision d1cb1 is not an ancestor of revision base", list, self.map._iterate_revisions(None, 'd1cb1') ) assert_raises_message( RevisionError, - r"Revision\(s\) d1cb1 is not an ancestor of revision\(s\) base", + r"Revision d1cb1 is not an ancestor of revision base", list, self.map._iterate_revisions((), 'd1cb1') ) diff --git a/tests/test_version_traversal.py b/tests/test_version_traversal.py index 7ef710e..3fd31a5 100644 --- a/tests/test_version_traversal.py +++ b/tests/test_version_traversal.py @@ -121,8 +121,8 @@ class RevisionPathTest(TestBase): a, b, c, d, e = self.a, self.b, self.c, self.d, self.e assert_raises_message( util.CommandError, - r"Revision\(s\) %s is not an ancestor " - "of revision\(s\) base" % b.revision, + r"Destination %s is not a valid downgrade " + "target from current head\(s\)" % b.revision[0:3], self.env._downgrade_revs, b.revision[0:3], None ) @@ -131,8 +131,8 @@ class RevisionPathTest(TestBase): assert_raises_message( util.CommandError, - r"Revision\(s\) %s is not an ancestor " - "of revision\(s\) %s" % (c.revision, b.revision), + r"Destination %s is not a valid downgrade " + "target from current head\(s\)" % c.revision[0:4], self.env._downgrade_revs, c.revision[0:4], b.revision )