Add MQTT support to the gearman worker

This commit adds support to the gearman worker for publishing an mqtt
message when processing a gearman job succeeds or fails. By default this
is disabled since it requires extra configuration to tell the worker how
to talk to the mqtt broker.

Right now the payload of the message is just the build_uuid and whether
it was written to the db or not. Eventually some details about the
subunit2sql db entry will be added to the payload. But this requires
changes to either subunit2sql or how the worker calls the subunit2sql
api before it is feasible.

Change-Id: Ibd13b737eccf52863a69d20843cb7d50242f7bb9
This commit is contained in:
Matthew Treinish 2017-03-23 14:43:41 -04:00
parent 9b5753123e
commit c06cdc8a8a
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
2 changed files with 72 additions and 2 deletions

View File

@ -27,6 +27,7 @@ import time
import urllib2
import yaml
import paho.mqtt.publish as publish
from subunit2sql import read_subunit
from subunit2sql import shell
@ -54,7 +55,7 @@ class FilterException(Exception):
class SubunitRetriever(object):
def __init__(self, gearman_worker, filters, subunit2sql_conf):
def __init__(self, gearman_worker, filters, subunit2sql_conf, mqtt=None):
super(SubunitRetriever, self).__init__()
self.gearman_worker = gearman_worker
self.filters = filters
@ -64,6 +65,7 @@ class SubunitRetriever(object):
extensions = shell.get_extensions()
shell.parse_args([], [self.config])
self.extra_targets = shell.get_targets(extensions)
self.mqtt = mqtt
def _write_to_db(self, subunit):
subunit_v2 = subunit.pop('subunit')
@ -132,9 +134,23 @@ class SubunitRetriever(object):
out_event["subunit"] = subunit_io
self._write_to_db(out_event)
job.sendWorkComplete()
if self.mqtt:
msg = json.dumps({
'build_uuid': out_event.get('build_uuid'),
'status': 'success',
})
self.mqtt.publish_single(msg, out_event.get('project'),
out_event.get('build_change'))
except Exception as e:
logging.exception("Exception handling log event.")
job.sendWorkException(str(e).encode('utf-8'))
if self.mqtt:
msg = json.dumps({
'build_uuid': out_event.get('build_uuid'),
'status': 'failed',
})
self.mqtt.publish_single(msg, out_event.get('project'),
out_event.get('build_change'))
def _retrieve_subunit_v2(self, source_url, retry):
# TODO (clarkb): This should check the content type instead of file
@ -200,12 +216,59 @@ class SubunitRetriever(object):
return gzipped, raw_buf
class PushMQTT(object):
def __init__(self, hostname, base_topic, port=1883, client_id=None,
keepalive=60, will=None, auth=None, tls=None, qos=0):
self.hostname = hostname
self.port = port
self.client_id = client_id
self.keepalive = 60
self.will = will
self.auth = auth
self.tls = tls
self.qos = qos
self.base_topic = base_topic
def _generate_topic(self, project, job_id):
return '/'.join([self.base_topic, project, job_id])
def publish_single(self, msg, project, job_id):
topic = self._generate_topic(project, job_id)
publish.single(topic, msg, hostname=self.hostname,
port=self.port, client_id=self.client_id,
keepalive=self.keepalive, will=self.will,
auth=self.auth, tls=self.tls, qos=self.qos)
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.gearman_host = self.config['gearman-host']
self.gearman_port = self.config['gearman-port']
mqtt_host = getattr(self.config, 'mqtt-host')
mqtt_port = getattr(self.config, 'mqtt-port', 1883)
mqtt_user = getattr(self.config, 'mqtt-user')
mqtt_pass = getattr(self.config, 'mqtt-pass')
mqtt_topic = getattr(self.config, 'mqtt-topic', 'gearman-subunit')
mqtt_ca_certs = getattr(self.config, 'mqtt-ca-certs')
mqtt_certfile = getattr(self.config, 'mqtt-certfile')
mqtt_keyfile = getattr(self.config, 'mqtt-keyfile')
self.mqtt = None
if mqtt_host:
auth = None
if mqtt_user:
auth = {'username': mqtt_user}
if mqtt_pass:
auth['password'] = mqtt_pass
tls = None
if mqtt_ca_certs:
tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile,
'keyfile': mqtt_keyfile}
self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
auth=auth, tls=tls)
# Pythong logging output file.
self.debuglog = debuglog
self.retriever = None
@ -229,7 +292,8 @@ class Server(object):
subunit2sql_conf = self.config['config']
self.retriever = SubunitRetriever(gearman_worker,
self.filter_factories,
subunit2sql_conf)
subunit2sql_conf,
mqtt=self.mqtt)
def main(self):
self.setup_retriever()

View File

@ -33,6 +33,12 @@ class subunit2sql (
require => Class['pip'],
}
package {'paho-mqtt':
ensure => latest,
provider => openstack_pip,
require => Class['pip'],
}
package { 'subunit2sql':
ensure => latest,
provider => openstack_pip,