Refactor sources out of triggers

This is to further differentiate between sources and triggers.
Eventually allowing for multiple triggers per pipeline.

Still to come is separating connections from everything.

Change-Id: I1d680dbed5f650165643842af450f16b32ec5ed9
This commit is contained in:
Joshua Hesketh 2014-11-27 11:31:02 +11:00
parent 66c8e52e42
commit 850ccb6022
12 changed files with 569 additions and 511 deletions

View File

@ -52,6 +52,7 @@ import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.source.gerrit
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
@ -382,12 +383,12 @@ class FakeGerrit(object):
log = logging.getLogger("zuul.test.FakeGerrit")
def __init__(self, hostname, username, port=29418, keyfile=None,
changes_dbs={}):
changes_dbs={}, queues_dbs={}):
self.hostname = hostname
self.username = username
self.port = port
self.keyfile = keyfile
self.event_queue = Queue.Queue()
self.event_queue = queues_dbs.get(hostname, {})
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_dbs.get(hostname, {})
@ -499,13 +500,14 @@ class FakeURLOpener(object):
return ret
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
class FakeGerritSource(zuul.source.gerrit.Gerrit):
name = 'gerrit'
def __init__(self, upstream_root, *args):
super(FakeGerritTrigger, self).__init__(*args)
super(FakeGerritSource, self).__init__(*args)
self.upstream_root = upstream_root
self.gerrit_connector.delay = 0.0
self.replication_timeout = 1.5
self.replication_retry_interval = 0.5
def getGitUrl(self, project):
return os.path.join(self.upstream_root, project.name)
@ -958,11 +960,6 @@ class ZuulTestCase(BaseTestCase):
old_urlopen = urllib2.urlopen
urllib2.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.smtp_messages = []
def FakeSMTPFactory(*args, **kw):
@ -971,12 +968,16 @@ class ZuulTestCase(BaseTestCase):
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
self.gerrit_queues_dbs = {
self.config.get('gerrit', 'server'): Queue.Queue()
}
self.gerrit_changes_dbs = {
self.config.get('gerrit', 'server'): {}
}
def FakeGerritFactory(*args, **kw):
kw['changes_dbs'] = self.gerrit_changes_dbs
kw['queues_dbs'] = self.gerrit_queues_dbs
return FakeGerrit(*args, **kw)
self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
@ -984,32 +985,23 @@ class ZuulTestCase(BaseTestCase):
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
self.gerrit = FakeGerritTrigger(
self.upstream_root, self.config, self.sched)
self.gerrit.replication_timeout = 1.5
self.gerrit.replication_retry_interval = 0.5
self.fake_gerrit = self.gerrit.gerrit
self.fake_gerrit.upstream_root = self.upstream_root
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
self.sched.registerTrigger(self.gerrit)
self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
self.sched.registerTrigger(self.timer)
self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
self.sched)
self.sched.registerTrigger(self.zuultrigger)
self.sched.registerReporter(
zuul.reporter.gerrit.Reporter(self.gerrit))
self.smtp_reporter = zuul.reporter.smtp.Reporter(
self.config.get('smtp', 'default_from'),
self.config.get('smtp', 'default_to'),
self.config.get('smtp', 'server'))
self.sched.registerReporter(self.smtp_reporter)
self.register_sources()
self.fake_gerrit = self.gerrit_source.gerrit
self.fake_gerrit.upstream_root = self.upstream_root
self.register_triggers()
self.register_reporters()
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.start()
self.sched.reconfigure(self.config)
@ -1024,6 +1016,38 @@ class ZuulTestCase(BaseTestCase):
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
def register_sources(self):
    """Instantiate the fake Gerrit source and register it with the scheduler."""
    # Register the available sources
    self.gerrit_source = FakeGerritSource(
        self.upstream_root, self.config, self.sched)
    # Short timeouts so tests don't wait minutes for fake replication.
    self.gerrit_source.replication_timeout = 1.5
    self.gerrit_source.replication_retry_interval = 0.5
    self.sched.registerSource(self.gerrit_source)
def register_triggers(self):
    """Register the gerrit, timer and zuul triggers with the scheduler."""
    # Register the available triggers
    # The gerrit trigger is handed the fake gerrit lib and the gerrit
    # source (the source is needed so the change cache is refreshed on
    # trigger events).
    self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(
        self.fake_gerrit, self.config, self.sched, self.gerrit_source)
    # No dispatch delay for fake events in tests.
    self.gerrit_trigger.gerrit_connector.delay = 0.0
    self.sched.registerTrigger(self.gerrit_trigger)
    self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
    self.sched.registerTrigger(self.timer)
    self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                            self.sched)
    self.sched.registerTrigger(self.zuultrigger)
def register_reporters(self):
    """Register the gerrit and smtp reporters with the scheduler."""
    # Register the available reporters
    self.sched.registerReporter(
        zuul.reporter.gerrit.Reporter(self.fake_gerrit))
    self.smtp_reporter = zuul.reporter.smtp.Reporter(
        self.config.get('smtp', 'default_from'),
        self.config.get('smtp', 'default_to'),
        self.config.get('smtp', 'server'))
    self.sched.registerReporter(self.smtp_reporter)
def setup_config(self):
"""Per test config object. Override to set different config."""
self.config = ConfigParser.ConfigParser()
@ -1050,7 +1074,7 @@ class ZuulTestCase(BaseTestCase):
self.merge_server.join()
self.merge_client.stop()
self.worker.shutdown()
self.gerrit.stop()
self.gerrit_trigger.stop()
self.timer.stop()
self.sched.stop()
self.sched.join()

View File

@ -729,8 +729,8 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(self.history[6].changes,
'1,1 2,1 3,1 4,1 5,1 6,1 7,1')
def test_trigger_cache(self):
"Test that the trigger cache operates correctly"
def test_source_cache(self):
"Test that the source cache operates correctly"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@ -762,9 +762,9 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
self.log.debug("len %s" % self.gerrit._change_cache.keys())
self.log.debug("len %s" % self.gerrit_source._change_cache.keys())
# there should still be changes in the cache
self.assertNotEqual(len(self.gerrit._change_cache.keys()), 0)
self.assertNotEqual(len(self.gerrit_source._change_cache.keys()), 0)
self.worker.hold_jobs_in_build = False
self.worker.release()
@ -1469,7 +1469,7 @@ class TestScheduler(ZuulTestCase):
"Test that the merger works with large changes after a repack"
# https://bugs.launchpad.net/zuul/+bug/1078946
# This test assumes the repo is already cloned; make sure it is
url = self.sched.triggers['gerrit'].getGitUrl(
url = self.sched.sources['gerrit'].getGitUrl(
self.sched.layout.projects['org/project1'])
self.merge_server.merger.addProject('org/project1', url)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@ -2165,6 +2165,7 @@ class TestScheduler(ZuulTestCase):
def test_test_config(self):
"Test that we can test the config"
sched = zuul.scheduler.Scheduler()
sched.registerSource(None, 'gerrit')
sched.registerTrigger(None, 'gerrit')
sched.registerTrigger(None, 'timer')
sched.registerTrigger(None, 'zuul')

View File

@ -144,17 +144,58 @@ class Server(zuul.cmd.ZuulApp):
if self.gear_server_pid:
os.kill(self.gear_server_pid, signal.SIGKILL)
def register_sources(self):
    """Instantiate the Gerrit source and register it with the scheduler."""
    # Register the available sources
    # See comment at top of file about zuul imports
    import zuul.source.gerrit
    self.gerrit_source = zuul.source.gerrit.Gerrit(self.config, self.sched)
    self.sched.registerSource(self.gerrit_source)
def register_triggers(self):
    """Instantiate and register the gerrit, timer and zuul triggers."""
    # Register the available triggers
    # See comment at top of file about zuul imports
    import zuul.trigger.gerrit
    import zuul.trigger.timer
    import zuul.trigger.zuultrigger
    # self.gerrit is the gerrit lib grabbed from the source in main();
    # the source itself is also passed so the trigger can refresh the
    # source's change cache on events.
    self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(self.gerrit,
                                                     self.config,
                                                     self.sched,
                                                     self.gerrit_source)
    timer = zuul.trigger.timer.Timer(self.config, self.sched)
    zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                       self.sched)
    self.sched.registerTrigger(self.gerrit_trigger)
    self.sched.registerTrigger(timer)
    self.sched.registerTrigger(zuultrigger)
def register_reporters(self):
    """Instantiate and register the gerrit and smtp reporters."""
    # Register the available reporters
    # See comment at top of file about zuul imports
    import zuul.reporter.gerrit
    import zuul.reporter.smtp
    gerrit_reporter = zuul.reporter.gerrit.Reporter(self.gerrit)
    # Fall back to sensible defaults for any smtp option not configured.
    smtp_reporter = zuul.reporter.smtp.Reporter(
        self.config.get('smtp', 'default_from')
        if self.config.has_option('smtp', 'default_from') else 'zuul',
        self.config.get('smtp', 'default_to')
        if self.config.has_option('smtp', 'default_to') else 'zuul',
        self.config.get('smtp', 'server')
        if self.config.has_option('smtp', 'server') else 'localhost',
        self.config.get('smtp', 'port')
        if self.config.has_option('smtp', 'port') else 25
    )
    self.sched.registerReporter(gerrit_reporter)
    self.sched.registerReporter(smtp_reporter)
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.gearman
import zuul.merger.client
import zuul.lib.swift
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
import zuul.webapp
import zuul.rpclistener
@ -172,35 +213,22 @@ class Server(zuul.cmd.ZuulApp):
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
self.sched)
if self.config.has_option('zuul', 'status_expiry'):
cache_expiry = self.config.getint('zuul', 'status_expiry')
else:
cache_expiry = 1
webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
smtp_reporter = zuul.reporter.smtp.Reporter(
self.config.get('smtp', 'default_from')
if self.config.has_option('smtp', 'default_from') else 'zuul',
self.config.get('smtp', 'default_to')
if self.config.has_option('smtp', 'default_to') else 'zuul',
self.config.get('smtp', 'server')
if self.config.has_option('smtp', 'server') else 'localhost',
self.config.get('smtp', 'port')
if self.config.has_option('smtp', 'port') else 25
)
self.sched.setLauncher(gearman)
self.sched.setMerger(merger)
self.sched.registerTrigger(gerrit)
self.sched.registerTrigger(timer)
self.sched.registerTrigger(zuultrigger)
self.sched.registerReporter(gerrit_reporter)
self.sched.registerReporter(smtp_reporter)
self.register_sources()
# TODO(jhesketh): Use connections instead of grabbing the gerrit lib
# from the source
self.gerrit = self.gerrit_source.gerrit
self.register_triggers()
self.register_reporters()
self.log.info('Starting scheduler')
self.sched.start()

View File

@ -301,11 +301,11 @@ class ActionReporter(object):
self.reporter = reporter
self.params = params
def report(self, change, message):
def report(self, source, change, message):
"""Sends the built message off to the configured reporter.
Takes the change and message and adds the configured parameters.
"""
return self.reporter.report(change, message, self.params)
return self.reporter.report(source, change, message, self.params)
def getSubmitAllowNeeds(self):
"""Gets the submit allow needs from the reporter based off the

View File

@ -21,20 +21,20 @@ class Reporter(object):
name = 'gerrit'
log = logging.getLogger("zuul.reporter.gerrit.Reporter")
def __init__(self, trigger):
def __init__(self, gerrit):
"""Set up the reporter."""
self.gerrit = trigger.gerrit
self.trigger = trigger
# TODO: make default_gerrit come from a connection
self.default_gerrit = gerrit
def report(self, change, message, params):
def report(self, source, change, message, params):
"""Send a message to gerrit."""
self.log.debug("Report change %s, params %s, message: %s" %
(change, params, message))
changeid = '%s,%s' % (change.number, change.patchset)
change._ref_sha = self.trigger.getRefSha(change.project.name,
'refs/heads/' + change.branch)
return self.gerrit.review(change.project.name, changeid, message,
params)
change._ref_sha = source.getRefSha(change.project.name,
'refs/heads/' + change.branch)
return self.default_gerrit.review(
change.project.name, changeid, message, params)
def getSubmitAllowNeeds(self, params):
"""Get a list of code review labels that are allowed to be

View File

@ -35,7 +35,7 @@ class Reporter(object):
self.smtp_default_from = smtp_default_from
self.smtp_default_to = smtp_default_to
def report(self, change, message, params):
def report(self, source, change, message, params):
"""Send the compiled report message via smtp."""
self.log.debug("Report change %s, params %s, message: %s" %
(change, params, message))

View File

@ -189,6 +189,7 @@ class Scheduler(threading.Thread):
self._stopped = False
self.launcher = None
self.merger = None
self.sources = dict()
self.triggers = dict()
self.reporters = dict()
self.config = None
@ -265,8 +266,8 @@ class Scheduler(threading.Thread):
pipeline = Pipeline(conf_pipeline['name'])
pipeline.description = conf_pipeline.get('description')
# TODO(jeblair): remove backwards compatibility:
pipeline.source = self.triggers[conf_pipeline.get('source',
'gerrit')]
pipeline.source = self.sources[conf_pipeline.get('source',
'gerrit')]
precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf_pipeline.get('failure-message',
@ -513,6 +514,11 @@ class Scheduler(threading.Thread):
def setMerger(self, merger):
self.merger = merger
def registerSource(self, source, name=None):
    """Add a source to the scheduler, keyed by name.

    If no name is given, the source's own ``name`` attribute is used
    (tests pass ``source=None`` together with an explicit name).
    """
    if name is None:
        name = source.name
    self.sources[name] = source
def registerTrigger(self, trigger, name=None):
if name is None:
name = trigger.name
@ -756,6 +762,8 @@ class Scheduler(threading.Thread):
self.maintainTriggerCache()
for trigger in self.triggers.values():
trigger.postConfig()
for source in self.sources.values():
source.postConfig()
if statsd:
try:
for pipeline in self.layout.pipelines.values():
@ -887,8 +895,8 @@ class Scheduler(threading.Thread):
relevant.update(item.change.getRelatedChanges())
self.log.debug("End maintain trigger cache for: %s" % pipeline)
self.log.debug("Trigger cache size: %s" % len(relevant))
for trigger in self.triggers.values():
trigger.maintainCache(relevant)
for source in self.sources.values():
source.maintainCache(relevant)
def process_event_queue(self):
self.log.debug("Fetching trigger event")
@ -1131,14 +1139,14 @@ class BasePipelineManager(object):
if self.sched.config.has_option('zuul', 'status_url'):
msg += "\n" + self.sched.config.get('zuul', 'status_url')
ret = self.sendReport(self.pipeline.start_actions,
change, msg)
self.pipeline.source, change, msg)
if ret:
self.log.error("Reporting change start %s received: %s" %
(change, ret))
except:
self.log.exception("Exception while reporting start:")
def sendReport(self, action_reporters, change, message):
def sendReport(self, action_reporters, source, change, message):
"""Sends the built message off to configured reporters.
Takes the action_reporters, change, message and extra options and
@ -1147,7 +1155,7 @@ class BasePipelineManager(object):
report_errors = []
if len(action_reporters) > 0:
for action_reporter in action_reporters:
ret = action_reporter.report(change, message)
ret = action_reporter.report(source, change, message)
if ret:
report_errors.append(ret)
if len(report_errors) == 0:
@ -1604,7 +1612,8 @@ class BasePipelineManager(object):
try:
self.log.info("Reporting change %s, actions: %s" %
(item.change, actions))
ret = self.sendReport(actions, item.change, report)
ret = self.sendReport(actions, self.pipeline.source,
item.change, report)
if ret:
self.log.error("Reporting change %s received: %s" %
(item.change, ret))

0
zuul/source/__init__.py Normal file
View File

408
zuul/source/gerrit.py Normal file
View File

@ -0,0 +1,408 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import re
import time
import urllib2
from zuul.lib import gerrit
from zuul.model import Change, Ref, NullChange
class Gerrit(object):
    """Gerrit source for Zuul.

    A source supplies information about changes (as opposed to a trigger,
    which reacts to events).  This class wraps the ``zuul.lib.gerrit`` SSH
    client: it answers merge-state queries, resolves git and commit-message
    (Depends-On) dependencies, and keeps a cache of Change objects keyed by
    ``'<number>,<patchset>'``.
    """

    name = 'gerrit'
    log = logging.getLogger("zuul.source.Gerrit")

    # How long (seconds) to wait for a merged change to be replicated to
    # the canonical git repository, and the polling interval while waiting.
    replication_timeout = 300
    replication_retry_interval = 5

    # Matches "Depends-On: I<40 hex digits>" footers in commit messages.
    depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
                               re.MULTILINE | re.IGNORECASE)

    def __init__(self, config, sched):
        self._change_cache = {}
        self.sched = sched
        self.config = config
        self.server = config.get('gerrit', 'server')
        if config.has_option('gerrit', 'baseurl'):
            self.baseurl = config.get('gerrit', 'baseurl')
        else:
            self.baseurl = 'https://%s' % self.server
        user = config.get('gerrit', 'user')
        if config.has_option('gerrit', 'sshkey'):
            sshkey = config.get('gerrit', 'sshkey')
        else:
            sshkey = None
        if config.has_option('gerrit', 'port'):
            port = int(config.get('gerrit', 'port'))
        else:
            port = 29418
        self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
        self.gerrit.startWatching()

    def _getInfoRefs(self, project):
        """Fetch and parse the ref advertisement for *project*.

        Speaks the smart-HTTP pkt-line protocol against Gerrit's
        ``info/refs`` endpoint and returns a dict of ref name -> sha.
        """
        url = "%s/p/%s/info/refs?service=git-upload-pack" % (
            self.baseurl, project)
        try:
            data = urllib2.urlopen(url).read()
        except:
            self.log.error("Cannot get references from %s" % url)
            raise  # keeps urllib2 error informations
        ret = {}
        read_headers = False
        read_advertisement = False
        if data[4] != '#':
            raise Exception("Gerrit repository does not support "
                            "git-upload-pack")
        i = 0
        while i < len(data):
            if len(data) - i < 4:
                raise Exception("Invalid length in info/refs")
            plen = int(data[i:i + 4], 16)
            i += 4
            # It's the length of the packet, including the 4 bytes of the
            # length itself, unless it's null, in which case the length is
            # not included.
            if plen > 0:
                plen -= 4
            if len(data) - i < plen:
                raise Exception("Invalid data in info/refs")
            line = data[i:i + plen]
            i += plen
            if not read_headers:
                if plen == 0:
                    read_headers = True
                continue
            if not read_advertisement:
                read_advertisement = True
                continue
            if plen == 0:
                # The terminating null
                continue
            line = line.strip()
            revision, ref = line.split()
            ret[ref] = revision
        return ret

    def getRefSha(self, project, ref):
        """Return the sha of *ref* in *project*, or '' if unavailable."""
        refs = {}
        try:
            refs = self._getInfoRefs(project)
        except:
            self.log.exception("Exception looking for ref %s" %
                               ref)
        sha = refs.get(ref, '')
        return sha

    def waitForRefSha(self, project, ref, old_sha=''):
        """Poll until *ref*'s sha differs from *old_sha*.

        Returns True if the ref changed within ``replication_timeout``
        seconds, False otherwise.
        """
        # Wait for the ref to show up in the repo
        start = time.time()
        while time.time() - start < self.replication_timeout:
            sha = self.getRefSha(project.name, ref)
            if old_sha != sha:
                return True
            time.sleep(self.replication_retry_interval)
        return False

    def isMerged(self, change, head=None):
        """Return whether *change* is merged in Gerrit.

        If *head* is given, additionally wait for the merged change to be
        replicated to the git repository before returning True.
        """
        self.log.debug("Checking if change %s is merged" % change)
        if not change.number:
            self.log.debug("Change has no number; considering it merged")
            # Good question.  It's probably ref-updated, which, ah,
            # means it's merged.
            return True

        data = self.gerrit.query(change.number)
        change._data = data
        change.is_merged = self._isMerged(change)
        if not head:
            return change.is_merged
        if not change.is_merged:
            return False

        ref = 'refs/heads/' + change.branch
        self.log.debug("Waiting for %s to appear in git repo" % (change))
        if self.waitForRefSha(change.project, ref, change._ref_sha):
            self.log.debug("Change %s is in the git repo" %
                           (change))
            return True
        self.log.debug("Change %s did not appear in the git repo" %
                       (change))
        return False

    def _isMerged(self, change):
        """Return True if the cached query data says *change* is MERGED."""
        data = change._data
        if not data:
            return False
        status = data.get('status')
        if not status:
            return False
        self.log.debug("Change %s status: %s" % (change, status))
        if status == 'MERGED':
            return True
        return False

    def canMerge(self, change, allow_needs):
        """Return whether Gerrit would accept a submit of *change*.

        *allow_needs* is a collection of lower-case label names whose
        NEED/REJECT status may be ignored (typically labels this pipeline
        itself will supply).
        """
        if not change.number:
            self.log.debug("Change has no number; considering it merged")
            # Good question.  It's probably ref-updated, which, ah,
            # means it's merged.
            return True
        data = change._data
        if not data:
            return False
        if 'submitRecords' not in data:
            return False
        try:
            for sr in data['submitRecords']:
                if sr['status'] == 'OK':
                    return True
                elif sr['status'] == 'NOT_READY':
                    for label in sr['labels']:
                        if label['status'] in ['OK', 'MAY']:
                            continue
                        elif label['status'] in ['NEED', 'REJECT']:
                            # It may be our own rejection, so we ignore
                            if label['label'].lower() not in allow_needs:
                                return False
                            continue
                        else:
                            # IMPOSSIBLE
                            return False
                else:
                    # CLOSED, RULE_ERROR
                    return False
        except:
            # Bug fix: the original message read "change%s" (no space).
            self.log.exception("Exception determining whether change "
                               "%s can merge:" % change)
            return False
        return True

    def maintainCache(self, relevant):
        # This lets the user supply a list of change objects that are
        # still in use.  Anything in our cache that isn't in the supplied
        # list should be safe to remove from the cache.
        remove = []
        for key, change in self._change_cache.items():
            if change not in relevant:
                remove.append(key)
        for key in remove:
            del self._change_cache[key]

    def postConfig(self):
        """Hook invoked after (re)configuration; nothing to do here."""
        pass

    def getChange(self, event, project):
        """Return a Change, Ref or NullChange for a trigger event."""
        if event.change_number:
            change = self._getChange(event.change_number, event.patch_number)
        elif event.ref:
            change = Ref(project)
            change.ref = event.ref
            change.oldrev = event.oldrev
            change.newrev = event.newrev
            change.url = self.getGitwebUrl(project, sha=event.newrev)
        else:
            change = NullChange(project)
        return change

    def _getChange(self, number, patchset, refresh=False, history=None):
        """Return the cached Change for number,patchset, querying Gerrit
        on a cache miss (or unconditionally when *refresh* is True)."""
        key = '%s,%s' % (number, patchset)
        change = None
        if key in self._change_cache:
            change = self._change_cache.get(key)
            if not refresh:
                return change
        if not change:
            change = Change(None)
            change.number = number
            change.patchset = patchset
        key = '%s,%s' % (change.number, change.patchset)
        self._change_cache[key] = change
        try:
            self.updateChange(change, history)
        except Exception:
            # Don't keep a half-updated change in the cache.
            del self._change_cache[key]
            raise
        return change

    def getProjectOpenChanges(self, project):
        """Return the open changes for *project*.

        This is a best-effort function in case Gerrit is unable to return
        a particular change.  It happens.
        """
        query = "project:%s status:open" % (project.name,)
        self.log.debug("Running query %s to get project open changes" %
                       (query,))
        data = self.gerrit.simpleQuery(query)
        changes = []
        for record in data:
            try:
                changes.append(
                    self._getChange(record['number'],
                                    record['currentPatchSet']['number']))
            except Exception:
                self.log.exception("Unable to query change %s" %
                                   (record.get('number'),))
        return changes

    def _getDependsOnFromCommit(self, message):
        """Return query records for each unique Depends-On in *message*."""
        records = []
        seen = set()
        for match in self.depends_on_re.findall(message):
            if match in seen:
                self.log.debug("Ignoring duplicate Depends-On: %s" %
                               (match,))
                continue
            seen.add(match)
            query = "change:%s" % (match,)
            self.log.debug("Running query %s to find needed changes" %
                           (query,))
            records.extend(self.gerrit.simpleQuery(query))
        return records

    def _getNeededByFromCommit(self, change_id):
        """Return query records for changes whose commit messages declare
        a Depends-On footer referencing *change_id*."""
        records = []
        seen = set()
        query = 'message:%s' % change_id
        self.log.debug("Running query %s to find changes needed-by" %
                       (query,))
        results = self.gerrit.simpleQuery(query)
        for result in results:
            for match in self.depends_on_re.findall(
                    result['commitMessage']):
                if match != change_id:
                    continue
                key = (result['number'], result['currentPatchSet']['number'])
                if key in seen:
                    continue
                self.log.debug("Found change %s,%s needs %s from commit" %
                               (key[0], key[1], change_id))
                seen.add(key)
                records.append(result)
        return records

    def updateChange(self, change, history=None):
        """Refresh *change* from a Gerrit query, including its dependency
        relationships (needs_changes / needed_by_changes).

        *history* is the list of change numbers already visited while
        recursing through dependencies; it is used to detect cycles.
        """
        self.log.info("Updating information for %s,%s" %
                      (change.number, change.patchset))
        data = self.gerrit.query(change.number)
        change._data = data

        if change.patchset is None:
            change.patchset = data['currentPatchSet']['number']

        if 'project' not in data:
            raise Exception("Change %s,%s not found" % (change.number,
                                                        change.patchset))
        # If updated changed came as a dependent on
        # and its project is not defined,
        # then create a 'foreign' project for it in layout
        change.project = self.sched.getProject(data['project'],
                                               create_foreign=bool(history))
        change.branch = data['branch']
        change.url = data['url']
        max_ps = 0
        files = []
        for ps in data['patchSets']:
            if ps['number'] == change.patchset:
                change.refspec = ps['ref']
                for f in ps.get('files', []):
                    files.append(f['file'])
            if int(ps['number']) > int(max_ps):
                max_ps = ps['number']
        if max_ps == change.patchset:
            change.is_current_patchset = True
        else:
            change.is_current_patchset = False
        change.files = files

        change.is_merged = self._isMerged(change)
        change.approvals = data['currentPatchSet'].get('approvals', [])
        change.open = data['open']
        change.status = data['status']
        change.owner = data['owner']

        if change.is_merged:
            # This change is merged, so we don't need to look any further
            # for dependencies.
            return change

        if history is None:
            history = []
        else:
            history = history[:]
        history.append(change.number)

        needs_changes = []
        if 'dependsOn' in data:
            parts = data['dependsOn'][0]['ref'].split('/')
            dep_num, dep_ps = parts[3], parts[4]
            if dep_num in history:
                raise Exception("Dependency cycle detected: %s in %s" % (
                    dep_num, history))
            self.log.debug("Getting git-dependent change %s,%s" %
                           (dep_num, dep_ps))
            dep = self._getChange(dep_num, dep_ps, history=history)
            if (not dep.is_merged) and dep not in needs_changes:
                needs_changes.append(dep)

        for record in self._getDependsOnFromCommit(data['commitMessage']):
            dep_num = record['number']
            dep_ps = record['currentPatchSet']['number']
            if dep_num in history:
                raise Exception("Dependency cycle detected: %s in %s" % (
                    dep_num, history))
            self.log.debug("Getting commit-dependent change %s,%s" %
                           (dep_num, dep_ps))
            dep = self._getChange(dep_num, dep_ps, history=history)
            if (not dep.is_merged) and dep not in needs_changes:
                needs_changes.append(dep)
        change.needs_changes = needs_changes

        needed_by_changes = []
        if 'neededBy' in data:
            for needed in data['neededBy']:
                parts = needed['ref'].split('/')
                dep_num, dep_ps = parts[3], parts[4]
                dep = self._getChange(dep_num, dep_ps)
                if (not dep.is_merged) and dep.is_current_patchset:
                    needed_by_changes.append(dep)

        for record in self._getNeededByFromCommit(data['id']):
            dep_num = record['number']
            dep_ps = record['currentPatchSet']['number']
            self.log.debug("Getting commit-needed change %s,%s" %
                           (dep_num, dep_ps))
            # Because a commit needed-by may be a cross-repo
            # dependency, cause that change to refresh so that it will
            # reference the latest patchset of its Depends-On (this
            # change).
            dep = self._getChange(dep_num, dep_ps, refresh=True)
            if (not dep.is_merged) and dep.is_current_patchset:
                needed_by_changes.append(dep)
        change.needed_by_changes = needed_by_changes

        return change

    def getGitUrl(self, project):
        """Return the ssh URL used to fetch *project* from Gerrit."""
        server = self.config.get('gerrit', 'server')
        user = self.config.get('gerrit', 'user')
        if self.config.has_option('gerrit', 'port'):
            port = int(self.config.get('gerrit', 'port'))
        else:
            port = 29418
        url = 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
        return url

    def getGitwebUrl(self, project, sha=None):
        """Return the gitweb URL for *project*, optionally deep-linking to
        the commitdiff view for *sha*."""
        url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
        if sha:
            url += ';a=commitdiff;h=' + sha
        return url

View File

@ -13,13 +13,10 @@
# under the License.
import logging
import re
import threading
import time
import urllib2
import voluptuous
from zuul.lib import gerrit
from zuul.model import TriggerEvent, Change, Ref, NullChange
from zuul.model import TriggerEvent
class GerritEventConnector(threading.Thread):
@ -28,12 +25,13 @@ class GerritEventConnector(threading.Thread):
log = logging.getLogger("zuul.GerritEventConnector")
delay = 5.0
def __init__(self, gerrit, sched, trigger):
def __init__(self, gerrit, sched, trigger, source):
super(GerritEventConnector, self).__init__()
self.daemon = True
self.gerrit = gerrit
self.sched = sched
self.trigger = trigger
self.source = source
self._stopped = False
def stop(self):
@ -98,9 +96,9 @@ class GerritEventConnector(threading.Thread):
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
self.trigger._getChange(event.change_number,
event.patch_number,
refresh=True)
self.source._getChange(event.change_number,
event.patch_number,
refresh=True)
self.sched.addEvent(event)
@ -118,398 +116,28 @@ class GerritEventConnector(threading.Thread):
class Gerrit(object):
name = 'gerrit'
log = logging.getLogger("zuul.Gerrit")
replication_timeout = 300
replication_retry_interval = 5
log = logging.getLogger("zuul.trigger.Gerrit")
depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
re.MULTILINE | re.IGNORECASE)
def __init__(self, config, sched):
self._change_cache = {}
def __init__(self, gerrit, config, sched, source):
self.sched = sched
# TODO(jhesketh): Make 'gerrit' come from a connection (rather than the
# source)
# TODO(jhesketh): Remove the requirement for a gerrit source (currently
# it is needed so on a trigger event the cache is
# updated. However if we share a connection object the
# cache could be stored there)
self.config = config
self.server = config.get('gerrit', 'server')
if config.has_option('gerrit', 'baseurl'):
self.baseurl = config.get('gerrit', 'baseurl')
else:
self.baseurl = 'https://%s' % self.server
user = config.get('gerrit', 'user')
if config.has_option('gerrit', 'sshkey'):
sshkey = config.get('gerrit', 'sshkey')
else:
sshkey = None
if config.has_option('gerrit', 'port'):
port = int(config.get('gerrit', 'port'))
else:
port = 29418
self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
self.gerrit.startWatching()
self.gerrit_connector = GerritEventConnector(
self.gerrit, sched, self)
self.gerrit_connector = GerritEventConnector(gerrit, sched, self,
source)
self.gerrit_connector.start()
def stop(self):
self.gerrit_connector.stop()
self.gerrit_connector.join()
def _getInfoRefs(self, project):
url = "%s/p/%s/info/refs?service=git-upload-pack" % (
self.baseurl, project)
try:
data = urllib2.urlopen(url).read()
except:
self.log.error("Cannot get references from %s" % url)
raise # keeps urllib2 error informations
ret = {}
read_headers = False
read_advertisement = False
if data[4] != '#':
raise Exception("Gerrit repository does not support "
"git-upload-pack")
i = 0
while i < len(data):
if len(data) - i < 4:
raise Exception("Invalid length in info/refs")
plen = int(data[i:i + 4], 16)
i += 4
# It's the length of the packet, including the 4 bytes of the
# length itself, unless it's null, in which case the length is
# not included.
if plen > 0:
plen -= 4
if len(data) - i < plen:
raise Exception("Invalid data in info/refs")
line = data[i:i + plen]
i += plen
if not read_headers:
if plen == 0:
read_headers = True
continue
if not read_advertisement:
read_advertisement = True
continue
if plen == 0:
# The terminating null
continue
line = line.strip()
revision, ref = line.split()
ret[ref] = revision
return ret
def getRefSha(self, project, ref):
refs = {}
try:
refs = self._getInfoRefs(project)
except:
self.log.exception("Exception looking for ref %s" %
ref)
sha = refs.get(ref, '')
return sha
def waitForRefSha(self, project, ref, old_sha=''):
# Wait for the ref to show up in the repo
start = time.time()
while time.time() - start < self.replication_timeout:
sha = self.getRefSha(project.name, ref)
if old_sha != sha:
return True
time.sleep(self.replication_retry_interval)
return False
def isMerged(self, change, head=None):
    """Check with Gerrit whether *change* has merged.

    If *head* is supplied, additionally wait for the merged commit to
    appear on the change's target branch in the git repository
    (replication check).
    """
    self.log.debug("Checking if change %s is merged" % change)
    if not change.number:
        self.log.debug("Change has no number; considering it merged")
        # Good question.  It's probably ref-updated, which, ah,
        # means it's merged.
        return True

    # Refresh the cached query data and derive merged status from it.
    change._data = self.gerrit.query(change.number)
    change.is_merged = self._isMerged(change)
    if not head:
        return change.is_merged
    if not change.is_merged:
        return False

    branch_ref = 'refs/heads/' + change.branch
    self.log.debug("Waiting for %s to appear in git repo" % (change))
    if self.waitForRefSha(change.project, branch_ref, change._ref_sha):
        self.log.debug("Change %s is in the git repo" %
                       (change))
        return True
    self.log.debug("Change %s did not appear in the git repo" %
                   (change))
    return False
def _isMerged(self, change):
data = change._data
if not data:
return False
status = data.get('status')
if not status:
return False
self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED':
return True
return False
def canMerge(self, change, allow_needs):
    """Return whether Gerrit would allow *change* to be submitted.

    Evaluates the ``submitRecords`` from the change's cached query
    data (``change._data``).

    :param change: change whose ``_data`` holds a prior query result.
    :param allow_needs: iterable of lower-cased label names whose
        NEED/REJECT status should be ignored (typically labels zuul
        itself will set, e.g. its own verification vote).
    :returns: True if submittable (or the change has no number, e.g.
        a ref-update), False otherwise.
    """
    if not change.number:
        self.log.debug("Change has no number; considering it merged")
        # Good question.  It's probably ref-updated, which, ah,
        # means it's merged.
        return True
    data = change._data
    if not data:
        return False
    if 'submitRecords' not in data:
        return False
    try:
        for sr in data['submitRecords']:
            if sr['status'] == 'OK':
                return True
            elif sr['status'] == 'NOT_READY':
                for label in sr['labels']:
                    if label['status'] in ['OK', 'MAY']:
                        continue
                    elif label['status'] in ['NEED', 'REJECT']:
                        # It may be our own rejection, so we ignore
                        if label['label'].lower() not in allow_needs:
                            return False
                        continue
                    else:
                        # IMPOSSIBLE
                        return False
            else:
                # CLOSED, RULE_ERROR
                return False
    except Exception:
        # Narrowed from a bare ``except:``; also fixed the missing
        # space in the concatenated log message ("change%s").
        self.log.exception("Exception determining whether change "
                           "%s can merge:" % change)
        return False
    return True
def maintainCache(self, relevant):
    """Prune cached changes that are no longer in use.

    This lets the user supply a list of change objects that are still
    in use; anything in our cache that isn't in the supplied list is
    removed from the cache.
    """
    stale_keys = [key for key, cached in self._change_cache.items()
                  if cached not in relevant]
    for key in stale_keys:
        del self._change_cache[key]
def postConfig(self):
    """Hook invoked after (re-)configuration; nothing to do for Gerrit."""
    pass
def getChange(self, event, project):
    """Map a Gerrit *event* to a change-like object for *project*.

    Patchset events yield a (cached) Change, ref-updated events yield
    a Ref, and anything else yields a NullChange.
    """
    if event.change_number:
        return self._getChange(event.change_number, event.patch_number)
    if event.ref:
        ref_change = Ref(project)
        ref_change.ref = event.ref
        ref_change.oldrev = event.oldrev
        ref_change.newrev = event.newrev
        ref_change.url = self.getGitwebUrl(project, sha=event.newrev)
        return ref_change
    return NullChange(project)
def _getChange(self, number, patchset, refresh=False, history=None):
    """Return the Change for *number*,*patchset*, caching the result.

    A cache hit is returned as-is unless *refresh* is set, in which
    case the cached object is re-queried in place.  On an update
    failure the (possibly new) cache entry is discarded and the error
    re-raised.  *history* is threaded through to updateChange for
    dependency-cycle detection.
    """
    key = '%s,%s' % (number, patchset)
    change = self._change_cache.get(key)
    if change and not refresh:
        return change
    if not change:
        change = Change(None)
        change.number = number
        change.patchset = patchset
        key = '%s,%s' % (change.number, change.patchset)
        self._change_cache[key] = change
    try:
        self.updateChange(change, history)
    except Exception:
        del self._change_cache[key]
        raise
    return change
def getProjectOpenChanges(self, project):
    """Return Change objects for every open change in *project*.

    This is a best-effort function in case Gerrit is unable to return
    a particular change: records that cannot be resolved are logged
    and skipped rather than aborting the whole listing.
    """
    query = "project:%s status:open" % (project.name,)
    self.log.debug("Running query %s to get project open changes" %
                   (query,))
    open_changes = []
    for record in self.gerrit.simpleQuery(query):
        try:
            number = record['number']
            patchset = record['currentPatchSet']['number']
            open_changes.append(self._getChange(number, patchset))
        except Exception:
            self.log.exception("Unable to query change %s" %
                               (record.get('number'),))
    return open_changes
def _getDependsOnFromCommit(self, message):
records = []
seen = set()
for match in self.depends_on_re.findall(message):
if match in seen:
self.log.debug("Ignoring duplicate Depends-On: %s" %
(match,))
continue
seen.add(match)
query = "change:%s" % (match,)
self.log.debug("Running query %s to find needed changes" %
(query,))
records.extend(self.gerrit.simpleQuery(query))
return records
def _getNeededByFromCommit(self, change_id):
records = []
seen = set()
query = 'message:%s' % change_id
self.log.debug("Running query %s to find changes needed-by" %
(query,))
results = self.gerrit.simpleQuery(query)
for result in results:
for match in self.depends_on_re.findall(
result['commitMessage']):
if match != change_id:
continue
key = (result['number'], result['currentPatchSet']['number'])
if key in seen:
continue
self.log.debug("Found change %s,%s needs %s from commit" %
(key[0], key[1], change_id))
seen.add(key)
records.append(result)
return records
def updateChange(self, change, history=None):
    """Refresh *change* in place from a live Gerrit query.

    Populates project/branch/url/refspec/files/approvals/open/status/
    owner from the query result, then (unless the change is merged)
    resolves its dependency graph: git parents ('dependsOn'),
    Depends-On commit-message footers, reverse git children
    ('neededBy') and reverse commit-message references.

    :param change: Change object to update; must have at least a
        ``number`` set.
    :param history: list of change numbers already visited while
        recursing through dependencies (cycle detection); None at the
        top of the recursion.
    :returns: the same *change* object.
    :raises Exception: if the change is not found or a dependency
        cycle is detected.
    """
    self.log.info("Updating information for %s,%s" %
                  (change.number, change.patchset))
    data = self.gerrit.query(change.number)
    change._data = data
    # A change fetched by number alone defaults to its current patchset.
    if change.patchset is None:
        change.patchset = data['currentPatchSet']['number']
    # Gerrit answers even for unknown changes; a missing 'project'
    # field is how we detect "not found".
    if 'project' not in data:
        raise Exception("Change %s,%s not found" % (change.number,
                                                    change.patchset))
    # If this change was reached as a dependency and its project is
    # not part of the layout, have the scheduler create a 'foreign'
    # project entry for it.
    change.project = self.sched.getProject(data['project'],
                                           create_foreign=bool(history))
    change.branch = data['branch']
    change.url = data['url']
    # Locate this change's patchset (for refspec/files) while tracking
    # the highest patchset number to decide currency.
    # NOTE(review): 'number' is compared as a string here but via
    # int() below — relies on Gerrit returning consistent values.
    max_ps = 0
    files = []
    for ps in data['patchSets']:
        if ps['number'] == change.patchset:
            change.refspec = ps['ref']
            for f in ps.get('files', []):
                files.append(f['file'])
        if int(ps['number']) > int(max_ps):
            max_ps = ps['number']
    if max_ps == change.patchset:
        change.is_current_patchset = True
    else:
        change.is_current_patchset = False
    change.files = files
    change.is_merged = self._isMerged(change)
    change.approvals = data['currentPatchSet'].get('approvals', [])
    change.open = data['open']
    change.status = data['status']
    change.owner = data['owner']
    if change.is_merged:
        # This change is merged, so we don't need to look any further
        # for dependencies.
        return change
    # Copy the history so sibling branches of the recursion don't see
    # each other's entries, then record this change for cycle checks.
    if history is None:
        history = []
    else:
        history = history[:]
    history.append(change.number)
    needs_changes = []
    # Git parent dependency: only the first (direct) parent matters.
    if 'dependsOn' in data:
        parts = data['dependsOn'][0]['ref'].split('/')
        dep_num, dep_ps = parts[3], parts[4]
        if dep_num in history:
            raise Exception("Dependency cycle detected: %s in %s" % (
                dep_num, history))
        self.log.debug("Getting git-dependent change %s,%s" %
                       (dep_num, dep_ps))
        dep = self._getChange(dep_num, dep_ps, history=history)
        if (not dep.is_merged) and dep not in needs_changes:
            needs_changes.append(dep)
    # Depends-On footers in the commit message (cross-repo deps).
    for record in self._getDependsOnFromCommit(data['commitMessage']):
        dep_num = record['number']
        dep_ps = record['currentPatchSet']['number']
        if dep_num in history:
            raise Exception("Dependency cycle detected: %s in %s" % (
                dep_num, history))
        self.log.debug("Getting commit-dependent change %s,%s" %
                       (dep_num, dep_ps))
        dep = self._getChange(dep_num, dep_ps, history=history)
        if (not dep.is_merged) and dep not in needs_changes:
            needs_changes.append(dep)
    change.needs_changes = needs_changes
    needed_by_changes = []
    # Reverse direction: git children of this change.
    if 'neededBy' in data:
        for needed in data['neededBy']:
            parts = needed['ref'].split('/')
            dep_num, dep_ps = parts[3], parts[4]
            dep = self._getChange(dep_num, dep_ps)
            if (not dep.is_merged) and dep.is_current_patchset:
                needed_by_changes.append(dep)
    # Reverse direction: changes whose messages Depend-On this one.
    for record in self._getNeededByFromCommit(data['id']):
        dep_num = record['number']
        dep_ps = record['currentPatchSet']['number']
        self.log.debug("Getting commit-needed change %s,%s" %
                       (dep_num, dep_ps))
        # Because a commit needed-by may be a cross-repo
        # dependency, cause that change to refresh so that it will
        # reference the latest patchset of its Depends-On (this
        # change).
        dep = self._getChange(dep_num, dep_ps, refresh=True)
        if (not dep.is_merged) and dep.is_current_patchset:
            needed_by_changes.append(dep)
    change.needed_by_changes = needed_by_changes
    return change
def getGitUrl(self, project):
    """Return the ssh URL used to fetch *project* from Gerrit."""
    server = self.config.get('gerrit', 'server')
    user = self.config.get('gerrit', 'user')
    # 29418 is Gerrit's conventional ssh port when none is configured.
    port = (int(self.config.get('gerrit', 'port'))
            if self.config.has_option('gerrit', 'port') else 29418)
    return 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
def getGitwebUrl(self, project, sha=None):
    """Return the gitweb URL for *project*, optionally at commit *sha*."""
    base = '%s/gitweb?p=%s.git' % (self.baseurl, project)
    if not sha:
        return base
    return base + ';a=commitdiff;h=' + sha
def validate_trigger(trigger_data):
"""Validates the layout's trigger data."""

View File

@ -41,17 +41,6 @@ class Timer(object):
def stop(self):
self.apsched.shutdown()
def isMerged(self, change, head=None):
raise Exception("Timer trigger does not support checking if "
"a change is merged.")
def canMerge(self, change, allow_needs):
raise Exception("Timer trigger does not support checking if "
"a change can merge.")
def maintainCache(self, relevant):
return
def postConfig(self):
for job in self.apsched.get_jobs():
self.apsched.unschedule_job(job)
@ -81,12 +70,3 @@ class Timer(object):
second=second,
args=(pipeline.name,
timespec,))
def getChange(self, event, project):
raise Exception("Timer trigger does not support changes.")
def getGitUrl(self, project):
raise Exception("Timer trigger does not support changes.")
def getGitwebUrl(self, project, sha=None):
raise Exception("Timer trigger does not support changes.")

View File

@ -30,17 +30,6 @@ class ZuulTrigger(object):
def stop(self):
pass
def isMerged(self, change, head=None):
raise Exception("Zuul trigger does not support checking if "
"a change is merged.")
def canMerge(self, change, allow_needs):
raise Exception("Zuul trigger does not support checking if "
"a change can merge.")
def maintainCache(self, relevant):
return
def onChangeMerged(self, change):
# Called each time zuul merges a change
if self._handle_project_change_merged_events:
@ -62,7 +51,7 @@ class ZuulTrigger(object):
"%s in %s" % (change, pipeline))
def _createProjectChangeMergedEvents(self, change):
changes = self.sched.triggers['gerrit'].getProjectOpenChanges(
changes = self.sched.sources['gerrit'].getProjectOpenChanges(
change.project)
for open_change in changes:
self._createProjectChangeMergedEvent(open_change)
@ -111,12 +100,3 @@ class ZuulTrigger(object):
self._handle_parent_change_enqueued_events = True
elif 'project-change-merged' in ef._types:
self._handle_project_change_merged_events = True
def getChange(self, number, patchset, refresh=False):
raise Exception("Zuul trigger does not support changes.")
def getGitUrl(self, project):
raise Exception("Zuul trigger does not support changes.")
def getGitwebUrl(self, project, sha=None):
raise Exception("Zuul trigger does not support changes.")