diff --git a/files/subunit-gearman-worker.py b/files/subunit-gearman-worker.py index 3c7001f..3a7be00 100644 --- a/files/subunit-gearman-worker.py +++ b/files/subunit-gearman-worker.py @@ -27,6 +27,7 @@ import time import urllib2 import yaml +import paho.mqtt.publish as publish from subunit2sql import read_subunit from subunit2sql import shell @@ -54,7 +55,7 @@ class FilterException(Exception): class SubunitRetriever(object): - def __init__(self, gearman_worker, filters, subunit2sql_conf): + def __init__(self, gearman_worker, filters, subunit2sql_conf, mqtt=None): super(SubunitRetriever, self).__init__() self.gearman_worker = gearman_worker self.filters = filters @@ -64,6 +65,7 @@ class SubunitRetriever(object): extensions = shell.get_extensions() shell.parse_args([], [self.config]) self.extra_targets = shell.get_targets(extensions) + self.mqtt = mqtt def _write_to_db(self, subunit): subunit_v2 = subunit.pop('subunit') @@ -132,9 +134,23 @@ class SubunitRetriever(object): out_event["subunit"] = subunit_io self._write_to_db(out_event) job.sendWorkComplete() + if self.mqtt: + msg = json.dumps({ + 'build_uuid': out_event.get('build_uuid'), + 'status': 'success', + }) + self.mqtt.publish_single(msg, out_event.get('project'), + out_event.get('build_change')) except Exception as e: logging.exception("Exception handling log event.") job.sendWorkException(str(e).encode('utf-8')) + if self.mqtt: + msg = json.dumps({ + 'build_uuid': out_event.get('build_uuid'), + 'status': 'failed', + }) + self.mqtt.publish_single(msg, out_event.get('project'), + out_event.get('build_change')) def _retrieve_subunit_v2(self, source_url, retry): # TODO (clarkb): This should check the content type instead of file @@ -200,12 +216,59 @@ class SubunitRetriever(object): return gzipped, raw_buf +class PushMQTT(object): + def __init__(self, hostname, base_topic, port=1883, client_id=None, + keepalive=60, will=None, auth=None, tls=None, qos=0): + self.hostname = hostname + self.port = port + self.client_id = client_id + self.keepalive = 60 + self.will = will + self.auth = auth + self.tls = tls + self.qos = qos + self.base_topic = base_topic + + def _generate_topic(self, project, job_id): + return '/'.join([self.base_topic, project, job_id]) + + def publish_single(self, msg, project, job_id): + topic = self._generate_topic(project, job_id) + publish.single(topic, msg, hostname=self.hostname, + port=self.port, client_id=self.client_id, + keepalive=self.keepalive, will=self.will, + auth=self.auth, tls=self.tls, qos=self.qos) + + class Server(object): def __init__(self, config, debuglog): # Config init. self.config = config self.gearman_host = self.config['gearman-host'] self.gearman_port = self.config['gearman-port'] + mqtt_host = getattr(self.config, 'mqtt-host') + mqtt_port = getattr(self.config, 'mqtt-port', 1883) + mqtt_user = getattr(self.config, 'mqtt-user') + mqtt_pass = getattr(self.config, 'mqtt-pass') + mqtt_topic = getattr(self.config, 'mqtt-topic', 'gearman-subunit') + mqtt_ca_certs = getattr(self.config, 'mqtt-ca-certs') + mqtt_certfile = getattr(self.config, 'mqtt-certfile') + mqtt_keyfile = getattr(self.config, 'mqtt-keyfile') + self.mqtt = None + if mqtt_host: + auth = None + if mqtt_user: + auth = {'username': mqtt_user} + if mqtt_pass: + auth['password'] = mqtt_pass + tls = None + if mqtt_ca_certs: + tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile, + 'keyfile': mqtt_keyfile} + + self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port, + auth=auth, tls=tls) + # Pythong logging output file. self.debuglog = debuglog self.retriever = None @@ -229,7 +292,8 @@ class Server(object): subunit2sql_conf = self.config['config'] self.retriever = SubunitRetriever(gearman_worker, self.filter_factories, - subunit2sql_conf) + subunit2sql_conf, + mqtt=self.mqtt) def main(self): self.setup_retriever() diff --git a/manifests/init.pp b/manifests/init.pp index 5e2302d..743707d 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -33,6 +33,12 @@ class subunit2sql ( require => Class['pip'], } + package {'paho-mqtt': + ensure => latest, + provider => openstack_pip, + require => Class['pip'], + } + package { 'subunit2sql': ensure => latest, provider => openstack_pip,