From c06cdc8a8a08312727bca725942ae69edc2fa812 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Thu, 23 Mar 2017 14:43:41 -0400 Subject: [PATCH] Add MQTT support to the gearman worker This commit adds support to the gearman worker for publishing an mqtt message when processing a gearman job succeeds or fails. By default this is disabled since it requires extra configuration to tell the worker how to talk to the mqtt broker. Right now the payload of the message is just the build_uuid and whether it was written to the db or not. Eventually some details about the subunit2sql db entry will be added to the payload. But this requires changes to either subunit2sql or how the worker calls the subunit2sql api before it is feasible. Change-Id: Ibd13b737eccf52863a69d20843cb7d50242f7bb9 --- files/subunit-gearman-worker.py | 68 ++++++++++++++++++++++++++++++++- manifests/init.pp | 6 +++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/files/subunit-gearman-worker.py b/files/subunit-gearman-worker.py index 3c7001f..3a7be00 100644 --- a/files/subunit-gearman-worker.py +++ b/files/subunit-gearman-worker.py @@ -27,6 +27,7 @@ import time import urllib2 import yaml +import paho.mqtt.publish as publish from subunit2sql import read_subunit from subunit2sql import shell @@ -54,7 +55,7 @@ class FilterException(Exception): class SubunitRetriever(object): - def __init__(self, gearman_worker, filters, subunit2sql_conf): + def __init__(self, gearman_worker, filters, subunit2sql_conf, mqtt=None): super(SubunitRetriever, self).__init__() self.gearman_worker = gearman_worker self.filters = filters @@ -64,6 +65,7 @@ class SubunitRetriever(object): extensions = shell.get_extensions() shell.parse_args([], [self.config]) self.extra_targets = shell.get_targets(extensions) + self.mqtt = mqtt def _write_to_db(self, subunit): subunit_v2 = subunit.pop('subunit') @@ -132,9 +134,23 @@ class SubunitRetriever(object): out_event["subunit"] = subunit_io self._write_to_db(out_event) job.sendWorkComplete() + if self.mqtt: + msg = json.dumps({ + 'build_uuid': out_event.get('build_uuid'), + 'status': 'success', + }) + self.mqtt.publish_single(msg, out_event.get('project'), + out_event.get('build_change')) except Exception as e: logging.exception("Exception handling log event.") job.sendWorkException(str(e).encode('utf-8')) + if self.mqtt: + msg = json.dumps({ + 'build_uuid': out_event.get('build_uuid'), + 'status': 'failed', + }) + self.mqtt.publish_single(msg, out_event.get('project'), + out_event.get('build_change')) def _retrieve_subunit_v2(self, source_url, retry): # TODO (clarkb): This should check the content type instead of file @@ -200,12 +216,59 @@ class SubunitRetriever(object): return gzipped, raw_buf +class PushMQTT(object): + def __init__(self, hostname, base_topic, port=1883, client_id=None, + keepalive=60, will=None, auth=None, tls=None, qos=0): + self.hostname = hostname + self.port = port + self.client_id = client_id + self.keepalive = 60 + self.will = will + self.auth = auth + self.tls = tls + self.qos = qos + self.base_topic = base_topic + + def _generate_topic(self, project, job_id): + return '/'.join([self.base_topic, project, job_id]) + + def publish_single(self, msg, project, job_id): + topic = self._generate_topic(project, job_id) + publish.single(topic, msg, hostname=self.hostname, + port=self.port, client_id=self.client_id, + keepalive=self.keepalive, will=self.will, + auth=self.auth, tls=self.tls, qos=self.qos) + + class Server(object): def __init__(self, config, debuglog): # Config init. self.config = config self.gearman_host = self.config['gearman-host'] self.gearman_port = self.config['gearman-port'] + mqtt_host = getattr(self.config, 'mqtt-host') + mqtt_port = getattr(self.config, 'mqtt-port', 1883) + mqtt_user = getattr(self.config, 'mqtt-user') + mqtt_pass = getattr(self.config, 'mqtt-pass') + mqtt_topic = getattr(self.config, 'mqtt-topic', 'gearman-subunit') + mqtt_ca_certs = getattr(self.config, 'mqtt-ca-certs') + mqtt_certfile = getattr(self.config, 'mqtt-certfile') + mqtt_keyfile = getattr(self.config, 'mqtt-keyfile') + self.mqtt = None + if mqtt_host: + auth = None + if mqtt_user: + auth = {'username': mqtt_user} + if mqtt_pass: + auth['password'] = mqtt_pass + tls = None + if mqtt_ca_certs: + tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile, + 'keyfile': mqtt_keyfile} + + self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port, + auth=auth, tls=tls) + # Pythong logging output file. self.debuglog = debuglog self.retriever = None @@ -229,7 +292,8 @@ class Server(object): subunit2sql_conf = self.config['config'] self.retriever = SubunitRetriever(gearman_worker, self.filter_factories, - subunit2sql_conf) + subunit2sql_conf, + mqtt=self.mqtt) def main(self): self.setup_retriever() diff --git a/manifests/init.pp b/manifests/init.pp index 5e2302d..743707d 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -33,6 +33,12 @@ class subunit2sql ( require => Class['pip'], } + package {'paho-mqtt': + ensure => latest, + provider => openstack_pip, + require => Class['pip'], + } + package { 'subunit2sql': ensure => latest, provider => openstack_pip,