commit ee743613ce5b3aee11d12e91e932d7876bc0b40c Author: James E. Blair Date: Tue May 29 14:49:32 2012 -0700 Initial commit. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000..84dbe1d0de --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +build +dist +zuul.egg-info +MANIFEST +.tox + diff --git a/.gitreview b/.gitreview new file mode 100644 index 0000000000..cf39bc601a --- /dev/null +++ b/.gitreview @@ -0,0 +1,4 @@ +[gerrit] +host=review.openstack.org +port=29418 +project=openstack-ci/zuul.git diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000000..1fd7b21b8c --- /dev/null +++ b/AUTHORS @@ -0,0 +1 @@ +James E. Blair diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000..75b52484ea --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README b/README new file mode 100644 index 0000000000..52383b50fe --- /dev/null +++ b/README @@ -0,0 +1 @@ +This is a trunk gating system developed for the OpenStack Project. diff --git a/setup.py b/setup.py new file mode 100644 index 0000000000..8456cc4bbe --- /dev/null +++ b/setup.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from setuptools import find_packages +from setuptools.command.sdist import sdist +from setuptools import setup +import subprocess + +setup(name='zuul', + version='1.0', + description="Trunk gating system", + license='Apache License (2.0)', + author='Hewlett-Packard Development Company, L.P.', + author_email='openstack@lists.launchpad.net', + url='http://launchpad.net/zuul', + scripts=['zuul-server'], + include_package_data=True, + zip_safe=False, + ) diff --git a/zuul-server b/zuul-server new file mode 100755 index 0000000000..e3e1926713 --- /dev/null +++ b/zuul-server @@ -0,0 +1,53 @@ +import argparse +import ConfigParser +import os + +import zuul.scheduler +import zuul.launcher.jenkins +import zuul.trigger.gerrit + +import logging.config + +def parse_arguments(): + parser = argparse.ArgumentParser(description='Project gating system.') + parser.add_argument('-c', dest='config', + help='specify the config file') + return parser.parse_args() + +def read_config(args): + config=ConfigParser.ConfigParser() + if args.config: + locations = [args.config] + else: + locations = ['/etc/zuul/zuul.conf', + '~/zuul.conf'] + for fp in locations: + if os.path.exists(os.path.expanduser(fp)): + config.read(fp) + return config + raise Exception("Unable to locate config file in %s" % locations) + +def setup_logging(config): + if config.has_option('zuul', 'log_config'): + fp = os.path.expanduser(config.get('zuul', 'log_config')) + if not os.path.exists(fp): + raise Exception("Unable to read logging config file at %s" % fp) + logging.config.fileConfig(fp) + else: + logging.basicConfig(level=logging.DEBUG) + +def main(config): + sched = zuul.scheduler.Scheduler(config) + + jenkins = zuul.launcher.jenkins.Jenkins(config, sched) + gerrit = zuul.trigger.gerrit.Gerrit(config, sched) + + sched.setLauncher(jenkins) + sched.setTrigger(gerrit) + sched.run() + +if __name__ == '__main__': + args = parse_arguments() + config = read_config(args) + setup_logging(config) + main(config) diff --git a/zuul/__init__.py b/zuul/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/launcher/__init__.py b/zuul/launcher/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py new file mode 100644 index 0000000000..cd9e19d55f --- /dev/null +++ b/zuul/launcher/jenkins.py @@ -0,0 +1,217 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# So we can name this module "jenkins" and still load the "jenkins" +# system module +from __future__ import absolute_import + +import threading +from webob import Request +from paste import httpserver +from uuid import uuid1 +import jenkins +import json +import urllib2 # for extending jenkins lib +import logging +import pprint + +from zuul.model import Build + +class JenkinsCallback(threading.Thread): + log = logging.getLogger("zuul.JenkinsCallback") + + def __init__(self, jenkins): + threading.Thread.__init__(self) + self.jenkins = jenkins + + def run(self): + httpserver.serve(self.app, host='0.0.0.0', port='8080') + + def app(self, environ, start_response): + request = Request(environ) + if request.path == '/jenkins_endpoint': + self.jenkins_endpoint(request) + start_response('200 OK', [('content-type', 'text/html')]) + return ['Zuul good.'] + + def jenkins_endpoint(self, request): + data = json.loads(request.body) + if data: + self.log.debug("Received data from Jenkins: \n%s" % ( + pprint.pformat(data))) + build = data.get('build') + if build: + phase = build.get('phase') + status = build.get('status') + url = build.get('full_url') + number = build.get('number') + params = build.get('parameters') + if params: + uuid = params.get('UUID') + if (status and url and uuid and phase + and phase == 'COMPLETED'): + self.jenkins.onBuildCompleted(uuid, status, url, number) + if (phase and phase == 'STARTED'): + self.jenkins.onBuildStarted(uuid, url, number) + + +STOP_BUILD = 'job/%(name)s/%(number)s/stop' +CANCEL_QUEUE = 'queue/item/%(number)s/cancelQueue' +BUILD_INFO = 'job/%(name)s/%(number)s/api/json?depth=0' + +class ExtendedJenkins(jenkins.Jenkins): + def jenkins_open(self, req): + ''' + Utility routine for opening an HTTP request to a Jenkins server. + ''' + try: + if self.auth: + req.add_header('Authorization', self.auth) + return urllib2.urlopen(req).read() + except urllib2.HTTPError, e: + print e.msg + print e.fp.read() + raise + + def stop_build(self, name, number): + ''' + Stop a running Jenkins build. + + @param name: Name of Jenkins job + @type name: str + @param number: Jenkins build number for the job + @type number: int + ''' + self.jenkins_open(urllib2.Request(self.server + STOP_BUILD%locals())) + + def cancel_queue(self, number): + ''' + Cancel a queued build. + + @param number: Jenkins queue number for the build + @type number: int + ''' + # Jenkins returns a 302 from this URL, unless Referer is not set, + # then you get a 404. + self.jenkins_open(urllib2.Request(self.server + CANCEL_QUEUE%locals(), + headers={'Referer': self.server})) + + + def get_build_info(self, name, number): + ''' + Get information for a build. + + @param name: Name of Jenkins job + @type name: str + @param number: Jenkins build number for the job + @type number: int + @return: dictionary + ''' + return json.loads(self.jenkins_open(urllib2.Request(self.server + BUILD_INFO%locals()))) + +class Jenkins(object): + log = logging.getLogger("zuul.Jenkins") + + def __init__(self, config, sched): + self.sched = sched + self.builds = {} + server = config.get('jenkins', 'server') + user = config.get('jenkins', 'user') + apikey = config.get('jenkins', 'apikey') + self.jenkins = ExtendedJenkins(server, user, apikey) + self.callback_thread = JenkinsCallback(self) + self.callback_thread.start() + + def launch(self, job, change, dependent_changes = []): + self.log.info("Launch job %s for change %s with dependent changes %s" % ( + job, change, dependent_changes)) + uuid = str(uuid1()) + params = dict(UUID=uuid) + build = Build(job, uuid) + self.builds[uuid] = build + # We can get the started notification on another thread before this is done + # so we add the build even before we trigger the job on Jenkins. We should + # be careful to clean it up if it doesn't actually kick off. + try: + self.jenkins.build_job(job.name, parameters=params) + except: + self.log.exception("Exception launching build %s for job %s for change %s:" % ( + build, job, change)) + # Whoops. Remove that build we added. + del self.builds[uuid] + raise + return build + + def cancel(self, build): + self.log.info("Cancel build %s for job %s" % (build, build.job)) + if build.number: + self.log.debug("Build %s has already started" % build) + self.jenkins.stop_build(build.job.name, build.number) + self.log.debug("Canceled running build %s" % build) + return + else: + self.log.debug("Build %s has not started yet" % build) + + self.log.debug("Looking for build %s in queue" % build) + for item in self.jenkins.get_queue_info(): + if not item.has_key('actions'): + continue + for action in item['actions']: + if not action.has_key('parameters'): + continue + parameters = action['parameters'] + for param in parameters: + if (param['name'] == 'UUID' and build.uuid == param['value']): + self.log.debug("Found queue item %s for build %s" % ( + item['id'], build)) + try: + self.jenkins.cancel_queue(item['id']) + self.log.debug("Canceled queue item %s for build %s" % ( + item['id'], build)) + return + except: + self.log.exception("Exception canceling queue item %s for build %s" % ( + item['id'], build)) + + self.log.debug("Still unable to find build %s to cancel" % build) + if build.number: + self.log.debug("Build %s has just started" % build) + self.jenkins.stop_build(build.job.name, build.number) + self.log.debug("Canceled just running build %s" % build) + else: + self.log.error("Build %s has not started but was not found in queue" % build) + + def onBuildCompleted(self, uuid, status, url, number): + self.log.info("Build %s #%s complete, status %s" % ( + uuid, number, status)) + build = self.builds.get(uuid) + if build: + self.log.debug("Found build %s" % build) + del self.builds[uuid] + build.result = status + build.url = url + build.number = number + self.sched.onBuildCompleted(build) + else: + self.log.error("Unable to find build %s" % uuid) + + def onBuildStarted(self, uuid, url, number): + self.log.info("Build %s #%s started, url: %s" % (uuid, number, url)) + build = self.builds.get(uuid) + if build: + self.log.debug("Found build %s" % build) + build.url = url + build.number = number + else: + self.log.error("Unable to find build %s" % uuid) diff --git a/zuul/lib/__init__.py b/zuul/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/lib/gerrit.py b/zuul/lib/gerrit.py new file mode 100644 index 0000000000..9c72d4436b --- /dev/null +++ b/zuul/lib/gerrit.py @@ -0,0 +1,171 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import select +import json +import time +import subprocess +import Queue +import paramiko +import logging +import pprint + +# TODO: switch this to paramiko? + +class GerritWatcher(threading.Thread): + log = logging.getLogger("gerrit.GerritWatcher") + + def __init__(self, gerrit, username, server, port=29418, keyfile=None): + threading.Thread.__init__(self) + self.username = username + self.keyfile = keyfile + self.server = server + self.port = port + self.proc = None + self.poll = select.poll() + self.gerrit = gerrit + + def _open(self): + self.log.debug("Opening ssh connection to %s" % self.server) + cmd = ['/usr/bin/ssh', '-p', str(self.port)] + if self.keyfile: + cmd += ['-i', self.keyfile] + cmd += ['-l', self.username, self.server, + 'gerrit', 'stream-events'] + self.proc = subprocess.Popen(cmd, + bufsize=1, + stdin=None, + stdout=subprocess.PIPE, + stderr=None, + ) + self.poll.register(self.proc.stdout) + + def _close(self): + self.log.debug("Closing ssh connection") + try: + self.poll.unregister(self.proc.stdout) + except: + pass + try: + self.proc.kill() + except: + pass + self.proc = None + + def _read(self): + l = self.proc.stdout.readline() + data = json.loads(l) + self.log.debug("Received data from Gerrit event stream: \n%s" % pprint.pformat(data)) + self.gerrit.addEvent(data) + + def _listen(self): + while True: + ret = self.poll.poll() + for (fd, event) in ret: + if fd == self.proc.stdout.fileno(): + if event == select.POLLIN: + self._read() + else: + raise Exception("event on ssh connection") + + def _run(self): + try: + if not self.proc: + self._open() + self._listen() + except: + self.log.exception("Exception on ssh event stream:") + self._close() + time.sleep(5) + + def run(self): + while True: + self._run() + +class Gerrit(object): + log = logging.getLogger("gerrit.Gerrit") + + def __init__(self, hostname, username, keyfile=None): + self.username = username + self.hostname = hostname + self.keyfile = keyfile + self.watcher_thread = None + self.event_queue = None + + def startWatching(self): + self.event_queue = Queue.Queue() + self.watcher_thread = GerritWatcher( + self, + self.username, + self.hostname, + keyfile=self.keyfile) + self.watcher_thread.start() + + def addEvent(self, data): + return self.event_queue.put(data) + + def getEvent(self): + return self.event_queue.get() + + def review(self, project, change, message, action={}): + cmd = 'gerrit review --project %s --message "%s"' % ( + project, message) + for k,v in action.items(): + if v == True: + cmd += ' --%s' % k + else: + cmd += ' --%s %s' % (k, v) + cmd += ' %s' % change + out, err = self._ssh(cmd) + return err + + def query(self, change): + cmd = 'gerrit query --format json %s"' % ( + change) + out, err = self._ssh(cmd) + if not out: + return False + lines = out.split('\n') + if not lines: + return False + data = json.loads(lines[0]) + if not data: + return False + self.log.debug("Received data from Gerrit query: \n%s" % ( + pprint.pformat(data))) + return data + + def _ssh(self, command): + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy()) + client.connect(self.hostname, + username=self.username, + port=29418) + + self.log.debug("SSH command:\n%s" % command) + stdin, stdout, stderr = client.exec_command(command) + + out = stdout.read() + self.log.debug("SSH received stdout:\n%s" % out) + + ret = stdout.channel.recv_exit_status() + self.log.debug("SSH exit status: %s" % ret) + + err = stderr.read() + self.log.debug("SSH received stderr:\n%s" % err) + if ret: + raise Exception("Gerrit error executing %s" % command) + return (out, err) diff --git a/zuul/model.py b/zuul/model.py new file mode 100644 index 0000000000..ac64761d18 --- /dev/null +++ b/zuul/model.py @@ -0,0 +1,323 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +class ChangeQueue(object): + def __init__(self, queue_name): + self.name = '' + self.queue_name = queue_name + self.projects = [] + self._jobs = set() + self.queue = [] + + def __str__(self): + return '' % (self.queue_name, self.name) + + def getJobs(self): + return self._jobs + + def addProject(self, project): + if project not in self.projects: + self.projects.append(project) + names = [x.name for x in self.projects] + names.sort() + self.name = ', '.join(names) + self._jobs |= set(project.getJobs(self.queue_name)) + + def enqueueChange(self, change): + if self.queue: + self.queue[-1].change_behind = change + change.change_ahead = self.queue[-1] + self.queue.append(change) + change.queue = self + + def dequeueChange(self, change): + if change in self.queue: + self.queue.remove(change) + + def mergeChangeQueue(self, other): + for project in other.projects: + self.addProject(project) + +class Job(object): + def __init__(self, name): + self.name = name + self.failure_message = None + self.success_message = None + self.event_filters = [] + + def __str__(self): + return self.name + + def __repr__(self): + return '' % (self.name) + +class Build(object): + def __init__(self, job, uuid): + self.job = job + self.uuid = uuid + self.status = None + self.url = None + self.number = None + + def __repr__(self): + return '' % (self.uuid, self.job.name) + +class JobTree(object): + """ A JobTree represents an instance of one Job, and holds JobTrees + whose jobs should be run if that Job succeeds. A root node of a + JobTree will have no associated Job. """ + + def __init__(self, job): + self.job = job + self.job_trees = [] + + def addJob(self, job): + if job not in [x.job for x in self.job_trees]: + t = JobTree(job) + self.job_trees.append(t) + return t + + def getJobs(self): + jobs = [] + for x in self.job_trees: + jobs.append(x.job) + jobs.extend(x.getJobs()) + return jobs + + def getJobTreeForJob(self, job): + if self.job == job: + return self + for tree in self.job_trees: + ret = tree.getJobTreeForJob(job) + if ret: + return ret + return None + +class Project(object): + def __init__(self, name): + self.name = name + self.job_trees = {} # Queue -> JobTree + + def __str__(self): + return self.name + + def __repr__(self): + return '' % (self.name) + + def addQueue(self, name): + self.job_trees[name] = JobTree(None) + return self.job_trees[name] + + def hasQueue(self, name): + if self.job_trees.has_key(name): + return True + return False + + def getJobTreeForQueue(self, name): + return self.job_trees.get(name, None) + + def getJobs(self, queue_name): + tree = self.getJobTreeForQueue(queue_name) + if not tree: + return [] + return tree.getJobs() + +class Change(object): + def __init__(self, queue_name, project, number, patchset): + self.queue_name = queue_name + self.project = project + self.number = number + self.patchset = patchset + self.jobs = {} + self.job_urls = {} + self.change_ahead = None + self.change_behind = None + self.running_builds = [] + + def __str__(self): + return '' % (id(self), self.number, self.patchset) + + def formatStatus(self, indent=0): + indent_str = ' '*indent + ret = '' + ret += '%sProject %s change %s,%s\n' % (indent_str, + self.project.name, + self.number, + self.patchset) + for job in self.project.getJobs(self.queue_name): + result = self.jobs.get(job.name) + ret += '%s %s: %s\n' % (indent_str, job.name, result) + if self.change_ahead: + ret += '%sWaiting on:\n' % (indent_str) + ret += self.change_ahead.formatStatus(indent+2) + return ret + + def formatReport(self): + ret = '' + if self.didAllJobsSucceed(): + ret += 'Build successful\n\n' + else: + ret += 'Build failed\n\n' + + for job in self.project.getJobs(self.queue_name): + result = self.jobs.get(job.name) + url = self.job_urls.get(job.name, job.name) + ret += '- %s : %s\n' % (url, result) + return ret + + def resetAllBuilds(self): + self.jobs = {} + self.job_urls = {} + self.running_builds = [] + + def addBuild(self, build): + self.running_builds.append(build) + + def setResult(self, build): + self.running_builds.remove(build) + self.jobs[build.job.name] = build.result + self.job_urls[build.job.name] = build.url + if build.result != 'SUCCESS': + # Get a JobTree from a Job so we can find only its dependent jobs + root = self.project.getJobTreeForQueue(self.queue_name) + tree = root.getJobTreeForJob(build.job) + for job in tree.getJobs(): + self.jobs[job.name] = 'SKIPPED' + + def _findJobsToRun(self, job_trees): + torun = [] + for tree in job_trees: + job = tree.job + if job: + result = self.jobs.get(job.name, None) + else: + # This is a null job tree, run all of its jobs + result = 'SUCCESS' + if not result: + if job not in [b.job for b in self.running_builds]: + torun.append(job) + elif result == 'SUCCESS': + torun.extend(self._findJobsToRun(tree.job_trees)) + return torun + + def findJobsToRun(self): + tree = self.project.getJobTreeForQueue(self.queue_name) + return self._findJobsToRun(tree.job_trees) + + def areAllJobsComplete(self): + tree = self.project.getJobTreeForQueue(self.queue_name) + for job in tree.getJobs(): + if not self.jobs.has_key(job.name): + return False + return True + + def didAllJobsSucceed(self): + for result in self.jobs.values(): + if result != 'SUCCESS': + return False + return True + + def delete(self): + if self.change_behind: + self.change_behind.change_ahead = None + +class TriggerEvent(object): + def __init__(self): + self.data = None + self.type = None + self.project_name = None + self.change_number = None + self.patch_number = None + self.approvals = [] + self.branch = None + self.ref = None + + def __str__(self): + ret = '" % (self.__class__.__name__, self.name) + + def _postConfig(self): + self.log.info("Configured Queue Manager %s" % self.name) + self.log.info(" Events:") + for e in self.event_filters: + self.log.info(" %s" % e) + self.log.info(" Projects:") + def log_jobs(tree, indent=0): + istr = ' '+' '*indent + if tree.job: + efilters = '' + for e in tree.job.event_filters: + efilters += str(e) + if efilters: + efilters = ' '+efilters + self.log.info("%s%s%s" % (istr, repr(tree.job), efilters)) + for x in tree.job_trees: + log_jobs(x, indent+2) + for p in self.sched.projects.values(): + if p.hasQueue(self.name): + self.log.info(" %s" % p) + log_jobs(p.getJobTreeForQueue(self.name)) + if self.success_action: + self.log.info(" On success:") + self.log.info(" %s" % self.success_action) + if self.failure_action: + self.log.info(" On failure:") + self.log.info(" %s" % self.failure_action) + + + def eventMatches(self, event): + for ef in self.event_filters: + print ef + if ef.matches(event): + return True + return False + + + def addChange(self, change): + self.log.debug("Adding change %s" % change) + self.launchJobs(change) + + def launchJobs(self, change): + self.log.debug("Launching jobs for change %s" % change) + for job in change.findJobsToRun(): + self.log.debug("Found job %s for change %s" % (job, change)) + try: + build = self.sched.launcher.launch(job, change) + self.building_jobs[build] = change + self.log.debug("Adding build %s of job %s to change %s" % ( + build, job, change)) + change.addBuild(build) + except: + self.log.exception("Exception while launching job %s for change %s:" % ( + job, change)) + + def onBuildCompleted(self, build): + self.log.debug("Build %s completed" % build) + if not self.building_jobs.has_key(build): + self.log.warning("Build %s not found (may have been canceled)" % ( + build)) + # Or triggered externally, or triggered before zuul started, + # or restarted + return False + change = self.building_jobs[build] + self.log.debug("Found change %s which triggered completed build %s" % ( + change, build)) + + del self.building_jobs[build] + + change.setResult(build) + self.log.info("Change %s status is now:\n %s" % ( + change, change.formatStatus())) + + if change.areAllJobsComplete(): + self.log.debug("All jobs for change %s are complete" % change) + self.possiblyReportChange(change) + else: + # There may be jobs that depended on jobs that are now complete + self.log.debug("All jobs for change %s are not yet complete" % ( + change)) + self.launchJobs(change) + return True + + def possiblyReportChange(self, change): + self.log.debug("Possibly reporting change %s" % change) + self.reportChange(change) + + def reportChange(self, change): + self.log.debug("Reporting change %s" % change) + ret = None + if change.didAllJobsSucceed(): + action = self.success_action + else: + action = self.failure_action + try: + self.log.info("Reporting change %s, action: %s" % ( + change, action)) + ret = self.sched.trigger.report(change, change.formatReport(), + action) + if ret: + self.log.error("Reporting change %s received: %s" % ( + change, ret)) + print ret + except: + self.log.exception("Exception while reporting:") + return ret + +class IndependentQueueManager(BaseQueueManager): + log = logging.getLogger("zuul.IndependentQueueManager") + pass + +class DependentQueueManager(BaseQueueManager): + log = logging.getLogger("zuul.DependentQueueManager") + + def __init__(self, *args, **kwargs): + super(DependentQueueManager, self).__init__(*args, **kwargs) + self.change_queues = [] + + def _postConfig(self): + super(DependentQueueManager, self)._postConfig() + self.buildChangeQueues() + + def buildChangeQueues(self): + self.log.debug("Building shared change queues") + change_queues = [] + + for project in self.sched.projects.values(): + if project.hasQueue(self.name): + change_queue = ChangeQueue(self.name) + change_queue.addProject(project) + change_queues.append(change_queue) + self.log.debug("Created queue: %s" % change_queue) + + self.log.debug("Combining shared queues") + new_change_queues = [] + for a in change_queues: + merged_a = False + for b in new_change_queues: + if not a.getJobs().isdisjoint(b.getJobs()): + self.log.debug("Merging queue %s into %s" % (a, b)) + b.mergeChangeQueue(a) + merged_a = True + break # this breaks out of 'for b' and continues 'for a' + if not merged_a: + self.log.debug("Keeping queue %s" % (a)) + new_change_queues.append(a) + + self.change_queues = new_change_queues + self.log.info(" Shared change queues:") + for x in self.change_queues: + self.log.info(" %s" % x) + + def getQueue(self, project): + for queue in self.change_queues: + if project in queue.projects: + return queue + self.log.error("Unable to find change queue for project %s" % project) + + def addChange(self, change): + self.log.debug("Adding change %s" % change) + change_queue = self.getQueue(change.project) + self.log.debug("Adding change %s to queue %s" % (change, change_queue)) + change_queue.enqueueChange(change) + super(DependentQueueManager, self).addChange(change) + + def _getDependentChanges(self, change): + changes = [] + while change.change_ahead: + changes.append(change.change_ahead) + change = change.change_ahead + self.log.info("Change %s depends on changes %s" % (change, changes)) + return changes + + def launchJobs(self, change): + self.log.debug("Launching jobs for change %s" % change) + dependent_changes = self._getDependentChanges(change) + for job in change.findJobsToRun(): + self.log.debug("Found job %s for change %s" % (job, change)) + try: + build = self.sched.launcher.launch(job, change, + dependent_changes) + self.building_jobs[build] = change + self.log.debug("Adding build %s of job %s to change %s" % ( + build, job, change)) + change.addBuild(build) + except: + self.log.exception("Exception while launching job %s for change %s:" % ( + job, change)) + if change.change_behind: + self.log.debug("Launching jobs for change %s, behind change %s" % ( + change.change_behind, change)) + self.launchJobs(change.change_behind) + + def cancelJobs(self, change): + self.log.debug("Cancel jobs for change %s" % change) + to_remove = [] + change.resetAllBuilds() + for build, build_change in self.building_jobs.items(): + if build_change == change: + self.log.debug("Found build %s for change %s to cancel" % ( + build, change)) + try: + self.sched.launcher.cancel(build) + except: + self.log.exception("Exception while canceling build %s for change %s" % ( + build, change)) + to_remove.append(build) + for build in to_remove: + self.log.debug("Removing build %s from running builds" % build) + del self.building_jobs[build] + if change.change_behind: + self.log.debug("Canceling jobs for change %s, behind change %s" % ( + change.change_behind, change)) + self.cancelJobs(change.change_behind) + + def possiblyReportChange(self, change): + self.log.debug("Possibly reporting change %s" % change) + if not change.change_ahead: + self.log.debug("Change %s is at the front of the queue, reporting" % ( + change)) + ret = self.reportChange(change) + self.log.debug("Removing reported change %s from queue" % change) + change.delete() + change.queue.dequeueChange(change) + merged = (not ret) + if merged: + merged = self.sched.trigger.isMerged(change) + succeeded = change.didAllJobsSucceed() + self.log.info("Reported change %s status: all-succeeded: %s, merged: %s" % ( + change, succeeded, merged)) + + if not (succeeded and merged): + self.log.debug("Reported change %s failed tests or failed to merge" % ( + change)) + # The merge or test failed, re-run all jobs behind this one + if change.change_behind: + self.log.info("Canceling/relaunching jobs for change %s behind failed change %s" % ( + change.change_behind, change)) + self.cancelJobs(change.change_behind) + self.launchJobs(change.change_behind) + # If the change behind this is ready, notify + if (change.change_behind and + change.change_behind.areAllJobsComplete()): + self.log.info("Change %s behind change %s is ready, possibly reporting" % ( + change.change_behind, change)) + self.possiblyReportChange(change.change_behind) diff --git a/zuul/trigger/__init__.py b/zuul/trigger/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py new file mode 100644 index 0000000000..8708207270 --- /dev/null +++ b/zuul/trigger/gerrit.py @@ -0,0 +1,92 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import logging +from zuul.lib import gerrit +from zuul.model import TriggerEvent + +class GerritEventConnector(threading.Thread): + """Move events from Gerrit to the scheduler.""" + + log = logging.getLogger("zuul.GerritEventConnector") + + def __init__(self, gerrit, sched): + super(GerritEventConnector, self).__init__() + self.gerrit = gerrit + self.sched = sched + + + def _handleEvent(self): + data = self.gerrit.getEvent() + event = TriggerEvent() + event.type = data.get('type') + change = data.get('change') + if change: + event.project_name = change.get('project') + event.branch = change.get('branch') + event.change_number = change.get('number') + patchset = data.get('patchSet') + if patchset: + event.patch_number = patchset.get('number') + event.approvals = data.get('approvals') + self.sched.addEvent(event) + + + def run(self): + while True: + try: + self._handleEvent() + except: + self.log.exception("Exception moving Gerrit event:") + + +class Gerrit(object): + log = logging.getLogger("zuul.Gerrit") + + def __init__(self, config, sched): + self.sched = sched + server = config.get('gerrit', 'server') + user = config.get('gerrit', 'user') + if config.has_option('gerrit', 'sshkey'): + sshkey = config.get('gerrit', 'sshkey') + else: + sshkey = None + self.gerrit = gerrit.Gerrit(server, user, sshkey) + self.gerrit.startWatching() + self.gerrit_connector = GerritEventConnector( + self.gerrit, sched) + self.gerrit_connector.start() + + + def report(self, change, message, action): + self.log.debug("Report change %s, action %s, message: %s" % + (change, action, message)) + changeid = '%s,%s' % (change.number, change.patchset) + return self.gerrit.review(change.project.name, changeid, + message, action) + + + def isMerged(self, change): + self.log.debug("Checking if change %s is merged", change) + data = self.gerrit.query(change.number) + if not data: + return False + status = data.get('status') + if not status: + return False + self.log.debug("Change %s status: %s" % (change, status)) + if status == 'MERGED' or status == 'SUBMITTED': + return True +