Add MQTT support to the gearman worker
This commit adds support to the gearman worker for publishing an mqtt message when processing a gearman job succeeds or fails. By default this is disabled since it requires extra configuration to tell the worker how to talk to the mqtt broker. Right now the payload of the message is just the build_uuid and whether it was written to the db or not. Eventually some details about the subunit2sql db entry will be added to the payload. But this requires changes to either subunit2sql or how the worker calls the subunit2sql api before it is feasible. Change-Id: Ibd13b737eccf52863a69d20843cb7d50242f7bb9
This commit is contained in:
parent
9b5753123e
commit
c06cdc8a8a
|
@ -27,6 +27,7 @@ import time
|
|||
import urllib2
|
||||
import yaml
|
||||
|
||||
import paho.mqtt.publish as publish
|
||||
from subunit2sql import read_subunit
|
||||
from subunit2sql import shell
|
||||
|
||||
|
@ -54,7 +55,7 @@ class FilterException(Exception):
|
|||
|
||||
|
||||
class SubunitRetriever(object):
|
||||
def __init__(self, gearman_worker, filters, subunit2sql_conf):
|
||||
def __init__(self, gearman_worker, filters, subunit2sql_conf, mqtt=None):
|
||||
super(SubunitRetriever, self).__init__()
|
||||
self.gearman_worker = gearman_worker
|
||||
self.filters = filters
|
||||
|
@ -64,6 +65,7 @@ class SubunitRetriever(object):
|
|||
extensions = shell.get_extensions()
|
||||
shell.parse_args([], [self.config])
|
||||
self.extra_targets = shell.get_targets(extensions)
|
||||
self.mqtt = mqtt
|
||||
|
||||
def _write_to_db(self, subunit):
|
||||
subunit_v2 = subunit.pop('subunit')
|
||||
|
@ -132,9 +134,23 @@ class SubunitRetriever(object):
|
|||
out_event["subunit"] = subunit_io
|
||||
self._write_to_db(out_event)
|
||||
job.sendWorkComplete()
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': out_event.get('build_uuid'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, out_event.get('project'),
|
||||
out_event.get('build_change'))
|
||||
except Exception as e:
|
||||
logging.exception("Exception handling log event.")
|
||||
job.sendWorkException(str(e).encode('utf-8'))
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': out_event.get('build_uuid'),
|
||||
'status': 'failed',
|
||||
})
|
||||
self.mqtt.publish_single(msg, out_event.get('project'),
|
||||
out_event.get('build_change'))
|
||||
|
||||
def _retrieve_subunit_v2(self, source_url, retry):
|
||||
# TODO (clarkb): This should check the content type instead of file
|
||||
|
@ -200,12 +216,59 @@ class SubunitRetriever(object):
|
|||
return gzipped, raw_buf
|
||||
|
||||
|
||||
class PushMQTT(object):
|
||||
def __init__(self, hostname, base_topic, port=1883, client_id=None,
|
||||
keepalive=60, will=None, auth=None, tls=None, qos=0):
|
||||
self.hostname = hostname
|
||||
self.port = port
|
||||
self.client_id = client_id
|
||||
self.keepalive = 60
|
||||
self.will = will
|
||||
self.auth = auth
|
||||
self.tls = tls
|
||||
self.qos = qos
|
||||
self.base_topic = base_topic
|
||||
|
||||
def _generate_topic(self, project, job_id):
|
||||
return '/'.join([self.base_topic, project, job_id])
|
||||
|
||||
def publish_single(self, msg, project, job_id):
|
||||
topic = self._generate_topic(project, job_id)
|
||||
publish.single(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls, qos=self.qos)
|
||||
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, config, debuglog):
|
||||
# Config init.
|
||||
self.config = config
|
||||
self.gearman_host = self.config['gearman-host']
|
||||
self.gearman_port = self.config['gearman-port']
|
||||
mqtt_host = getattr(self.config, 'mqtt-host')
|
||||
mqtt_port = getattr(self.config, 'mqtt-port', 1883)
|
||||
mqtt_user = getattr(self.config, 'mqtt-user')
|
||||
mqtt_pass = getattr(self.config, 'mqtt-pass')
|
||||
mqtt_topic = getattr(self.config, 'mqtt-topic', 'gearman-subunit')
|
||||
mqtt_ca_certs = getattr(self.config, 'mqtt-ca-certs')
|
||||
mqtt_certfile = getattr(self.config, 'mqtt-certfile')
|
||||
mqtt_keyfile = getattr(self.config, 'mqtt-keyfile')
|
||||
self.mqtt = None
|
||||
if mqtt_host:
|
||||
auth = None
|
||||
if mqtt_user:
|
||||
auth = {'username': mqtt_user}
|
||||
if mqtt_pass:
|
||||
auth['password'] = mqtt_pass
|
||||
tls = None
|
||||
if mqtt_ca_certs:
|
||||
tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile,
|
||||
'keyfile': mqtt_keyfile}
|
||||
|
||||
self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
|
||||
auth=auth, tls=tls)
|
||||
|
||||
# Pythong logging output file.
|
||||
self.debuglog = debuglog
|
||||
self.retriever = None
|
||||
|
@ -229,7 +292,8 @@ class Server(object):
|
|||
subunit2sql_conf = self.config['config']
|
||||
self.retriever = SubunitRetriever(gearman_worker,
|
||||
self.filter_factories,
|
||||
subunit2sql_conf)
|
||||
subunit2sql_conf,
|
||||
mqtt=self.mqtt)
|
||||
|
||||
def main(self):
|
||||
self.setup_retriever()
|
||||
|
|
|
@ -33,6 +33,12 @@ class subunit2sql (
|
|||
require => Class['pip'],
|
||||
}
|
||||
|
||||
package {'paho-mqtt':
|
||||
ensure => latest,
|
||||
provider => openstack_pip,
|
||||
require => Class['pip'],
|
||||
}
|
||||
|
||||
package { 'subunit2sql':
|
||||
ensure => latest,
|
||||
provider => openstack_pip,
|
||||
|
|
Loading…
Reference in New Issue