Support cross-repo-dependencies in dependent pipelines

Parse commit messages for "Depends-On: <changeid>" and treat
matching changes as changes that the given change depends on.

This will treat any changes in any branch of any project as
such.  If the projects share a dependent change queue, the
changes will be enqueued in order.  If they do not share a
change queue in a dependent pipeline, then the latter one will
be unable to be enqueued until the change it depends on merges.

If the dependencies result in a cycle, Zuul will log the error
but otherwise the problematic changes will be ignored.

Dependent changes in independent pipelines are not yet addressed.

Change-Id: I90c173f86d11e6c44d1f408646589b7c75b1cd52
This commit is contained in:
James E. Blair 2014-12-30 10:12:29 -08:00
parent 063672f8d3
commit 5ee2425651
4 changed files with 246 additions and 45 deletions

View File

@ -418,11 +418,15 @@ class FakeGerrit(object):
return {}
def simpleQuery(self, query):
# This is currently only used to return all open changes for a
# project
self.queries.append(query)
l = [change.query() for change in self.changes.values()]
l.append({"type": "stats", "rowCount": 1, "runTimeMilliseconds": 3})
if query.startswith('change:'):
# Query a specific changeid
changeid = query[len('change:'):]
l = [change.query() for change in self.changes.values()
if change.data['id'] == changeid]
else:
# Query all open changes
l = [change.query() for change in self.changes.values()]
return l
def startWatching(self, *args, **kw):

View File

@ -2890,3 +2890,127 @@ For CI problems and help debugging, contact ci@example.org"""
self.getJobFromHistory('experimental-project-test').result,
'SUCCESS')
self.assertEqual(A.reported, 1)
def test_crd_gate(self):
"Test cross-repo dependencies"
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
AM2 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM2')
AM1 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM1')
AM2.setMerged()
AM1.setMerged()
BM2 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM2')
BM1 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM1')
BM2.setMerged()
BM1.setMerged()
# A -> AM1 -> AM2
# B -> BM1 -> BM2
# A Depends-On: B
# M2 is here to make sure it is never queried. If it is, it
# means zuul is walking down the entire history of merged
# changes.
B.setDependsOn(BM1, 1)
BM1.setDependsOn(BM2, 1)
A.setDependsOn(AM1, 1)
AM1.setDependsOn(AM2, 1)
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
source = self.sched.layout.pipelines['gate'].source
source.maintainCache([])
self.worker.hold_jobs_in_build = True
B.addApproval('APRV', 1)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
self.assertEqual(AM2.queried, 0)
self.assertEqual(BM2.queried, 0)
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(self.history[-1].changes, '2,1 1,1')
def test_crd_unshared_gate(self):
"Test cross-repo dependencies in unshared gate queues"
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
# A Depends-On: B
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
# A and B do not share a queue, make sure that A is unable to
# enqueue B (and therefore, A is unable to be enqueued).
B.addApproval('APRV', 1)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(A.reported, 0)
self.assertEqual(B.reported, 0)
self.assertEqual(len(self.history), 0)
# Enqueue and merge B alone.
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
# Now that B is merged, A should be able to be enqueued and
# merged.
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
def test_crd_cycle(self):
"Test cross-repo dependency cycles"
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
# A -> B -> A (via commit-depends)
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
B.subject, A.data['id'])
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(A.reported, 0)
self.assertEqual(B.reported, 0)
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')

View File

@ -1064,10 +1064,12 @@ class BasePipelineManager(object):
def isChangeReadyToBeEnqueued(self, change):
return True
def enqueueChangesAhead(self, change, quiet, ignore_requirements):
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements):
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
change_queue):
return True
def checkForChangesNeededBy(self, change):
@ -1126,7 +1128,7 @@ class BasePipelineManager(object):
return False
def addChange(self, change, quiet=False, enqueue_time=None,
ignore_requirements=False):
ignore_requirements=False, change_queue=None):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@ -1144,7 +1146,16 @@ class BasePipelineManager(object):
"requirement %s" % (change, f))
return False
if not self.enqueueChangesAhead(change, quiet, ignore_requirements):
if not change_queue:
change_queue = self.getChangeQueue(change)
if not change_queue:
self.log.debug("Unable to find change queue for "
"change %s in project %s" %
(change, change.project))
return False
if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
change_queue):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
@ -1152,26 +1163,20 @@ class BasePipelineManager(object):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
change_queue = self.pipeline.getQueue(change.project)
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
if not quiet:
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
self.reportStats(item)
self.enqueueChangesBehind(change, quiet, ignore_requirements)
self.sched.triggers['zuul'].onChangeEnqueued(item.change,
self.pipeline)
return True
else:
self.log.debug("Unable to find change queue for "
"change %s in project %s" %
(change, change.project))
return False
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
if not quiet:
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
self.reportStats(item)
self.enqueueChangesBehind(change, quiet, ignore_requirements,
change_queue)
self.sched.triggers['zuul'].onChangeEnqueued(item.change,
self.pipeline)
return True
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
@ -1694,6 +1699,9 @@ class IndependentPipelineManager(BasePipelineManager):
self.pipeline.addQueue(change_queue)
def getChangeQueue(self, change):
return self.pipeline.getQueue(change.project)
class DependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.DependentPipelineManager")
@ -1754,6 +1762,9 @@ class DependentPipelineManager(BasePipelineManager):
new_change_queues.append(a)
return new_change_queues
def getChangeQueue(self, change):
return self.pipeline.getQueue(change.project)
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
@ -1761,34 +1772,46 @@ class DependentPipelineManager(BasePipelineManager):
return False
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements):
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
change_queue):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return
for needs in change.needed_by_changes:
if self.pipeline.source.canMerge(needs,
for other_change in change.needed_by_changes:
other_change_queue = self.getChangeQueue(other_change)
if other_change_queue != change_queue:
self.log.debug(" Change %s in project %s can not be enqueued "
"in the target queue %s" %
(other_change, other_change.project,
change_queue))
continue
if self.pipeline.source.canMerge(other_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
(other_change, change))
to_enqueue.append(other_change)
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change, quiet=quiet,
ignore_requirements=ignore_requirements)
ignore_requirements=ignore_requirements,
change_queue=change_queue)
def enqueueChangesAhead(self, change, quiet, ignore_requirements):
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Change %s must be merged ahead of %s" %
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
for needed_change in ret:
r = self.addChange(needed_change, quiet=quiet,
ignore_requirements=ignore_requirements)
ignore_requirements=ignore_requirements,
change_queue=change_queue)
if not r:
return False
return True
@ -1804,13 +1827,20 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" No changes needed")
return True
changes_needed = []
# TODO (jeblair): this is only correct for a list of 1 element
change_queue = self.getChangeQueue(change)
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
needed_change_queue = self.getChangeQueue(needed_change)
if needed_change_queue != change_queue:
self.log.debug(" Change %s in project %s does not share a "
"change queue with %s in project %s" %
(needed_change, needed_change.project,
change, change.project))
return False
if not needed_change.is_current_patchset:
self.log.debug(" Needed change is not the current patchset")
return False

View File

@ -13,6 +13,7 @@
# under the License.
import logging
import re
import threading
import time
import urllib2
@ -93,7 +94,6 @@ class GerritEventConnector(threading.Thread):
refresh=True)
self.sched.addEvent(event)
self.gerrit.eventDone()
def run(self):
while True:
@ -103,6 +103,8 @@ class GerritEventConnector(threading.Thread):
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
finally:
self.gerrit.eventDone()
class Gerrit(object):
@ -111,6 +113,9 @@ class Gerrit(object):
replication_timeout = 300
replication_retry_interval = 5
depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
re.MULTILINE | re.IGNORECASE)
def __init__(self, config, sched):
self._change_cache = {}
self.sched = sched
@ -304,7 +309,7 @@ class Gerrit(object):
change = NullChange(project)
return change
def _getChange(self, number, patchset, refresh=False):
def _getChange(self, number, patchset, refresh=False, history=None):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
@ -318,7 +323,7 @@ class Gerrit(object):
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
try:
self.updateChange(change)
self.updateChange(change, history)
except Exception:
del self._change_cache[key]
raise
@ -342,7 +347,22 @@ class Gerrit(object):
(record.get('number'),))
return changes
def updateChange(self, change):
def _getDependsOnFromCommit(self, message):
records = []
seen = set()
for match in self.depends_on_re.findall(message):
if match in seen:
self.log.debug("Ignoring duplicate Depends-On: %s" %
(match,))
continue
seen.add(match)
query = "change:%s" % (match,)
self.log.debug("Running query %s to find needed changes" %
(query,))
records.extend(self.gerrit.simpleQuery(query))
return records
def updateChange(self, change, history=None):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.gerrit.query(change.number)
@ -382,12 +402,35 @@ class Gerrit(object):
# for dependencies.
return change
if history is None:
history = []
else:
history = history[:]
history.append(change.number)
change.needs_changes = []
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self._getChange(dep_num, dep_ps)
if not dep.is_merged:
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting git-dependent change %s,%s" %
(dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
if (not dep.is_merged) and dep not in change.needs_changes:
change.needs_changes.append(dep)
for record in self._getDependsOnFromCommit(data['commitMessage']):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting commit-dependent change %s,%s" %
(dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
if (not dep.is_merged) and dep not in change.needs_changes:
change.needs_changes.append(dep)
change.needed_by_changes = []
@ -396,7 +439,7 @@ class Gerrit(object):
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self._getChange(dep_num, dep_ps)
if not dep.is_merged and dep.is_current_patchset:
if (not dep.is_merged) and dep.is_current_patchset:
change.needed_by_changes.append(dep)
return change