Add MQTT support to the gearman worker

This commit adds support to the gearman worker for publishing an mqtt
message when processing a gearman job succeeds or fails. It also adds
a message for when the processor passes the logs to logstash either via
stdout or over a socket. By default this is disabled since it requires
extra configuration to tell the worker how to talk to the mqtt broker.

Depends-On: Id0308d2d4d1843fcca73f459cffa2ae944bebd0c
Change-Id: I43be3562780c61591ebede61f3a8929e8217f199
This commit is contained in:
Matthew Treinish 2017-03-23 15:27:04 -04:00
parent 3aa0ef4305
commit b1a4357058
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
2 changed files with 113 additions and 7 deletions

View File

@ -33,6 +33,7 @@ import time
import urllib2
import yaml
import paho.mqtt.publish as publish
try:
import daemon.pidlockfile as pidfile_mod
@ -123,11 +124,12 @@ class CRM114FilterFactory(object):
class LogRetriever(threading.Thread):
def __init__(self, gearman_worker, filters, logq):
def __init__(self, gearman_worker, filters, logq, mqtt=None):
threading.Thread.__init__(self)
self.gearman_worker = gearman_worker
self.filters = filters
self.logq = logq
self.mqtt = mqtt
def run(self):
while True:
@ -137,6 +139,8 @@ class LogRetriever(threading.Thread):
logging.exception("Exception retrieving log event.")
def _handle_event(self):
fields = {}
source_url = ''
job = self.gearman_worker.getJob()
try:
arguments = json.loads(job.arguments.decode('utf-8'))
@ -180,9 +184,27 @@ class LogRetriever(threading.Thread):
for f in all_filters:
f.close()
job.sendWorkComplete()
if self.mqtt:
msg = json.dumps({
'build_uuid': fields.get('build_uuid'),
'source_url': source_url,
'status': 'success',
})
self.mqtt.publish_single(msg, fields.get('project'),
fields.get('build_change'),
'retrieve_logs')
except Exception as e:
logging.exception("Exception handling log event.")
job.sendWorkException(str(e).encode('utf-8'))
if self.mqtt:
msg = json.dumps({
'build_uuid': fields.get('build_uuid'),
'source_url': source_url,
'status': 'failure',
})
self.mqtt.publish_single(msg, fields.get('project'),
fields.get('build_change'),
'retrieve_logs')
def _retrieve_log(self, source_url, retry):
# TODO (clarkb): This should check the content type instead of file
@ -277,9 +299,10 @@ class LogRetriever(threading.Thread):
class StdOutLogProcessor(object):
def __init__(self, logq, pretty_print=False):
def __init__(self, logq, pretty_print=False, mqtt=None):
self.logq = logq
self.pretty_print = pretty_print
self.mqtt = mqtt
def handle_log_event(self):
log = self.logq.get()
@ -290,16 +313,26 @@ class StdOutLogProcessor(object):
print(json.dumps(log))
# Push each log event through to keep logstash up to date.
sys.stdout.flush()
if self.mqtt:
msg = json.dumps({
'build_uuid': log.get('build_uuid'),
'source_url': log.get('log_url'),
'status': 'success',
})
self.mqtt.publish_single(msg, log.get('project'),
log.get('build_change'),
'logs_to_logstash')
class INETLogProcessor(object):
socket_type = None
def __init__(self, logq, host, port):
def __init__(self, logq, host, port, mqtt=None):
self.logq = logq
self.host = host
self.port = port
self.socket = None
self.mqtt = mqtt
def _connect_socket(self):
logging.debug("Creating socket.")
@ -312,6 +345,15 @@ class INETLogProcessor(object):
if self.socket is None:
self._connect_socket()
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
if self.mqtt:
msg = json.dumps({
'build_uuid': log.get('build_uuid'),
'source_url': log.get('log_url'),
'status': 'success',
})
self.mqtt.publish_single(msg, log.get('project'),
log.get('build_change'),
'logs_to_logstash')
except:
logging.exception("Exception sending INET event.")
# Logstash seems to take about a minute to start again. Wait 90
@ -321,6 +363,14 @@ class INETLogProcessor(object):
semi_busy_wait(90)
self._connect_socket()
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
if self.mqtt:
msg = json.dumps({
'build_uuid': log.get('build_uuid'),
'status': 'success',
})
self.mqtt.publish_single(msg, log.get('project'),
log.get('build_change'),
'logs_to_logstash')
class UDPLogProcessor(INETLogProcessor):
@ -331,6 +381,30 @@ class TCPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_STREAM
class PushMQTT(object):
def __init__(self, hostname, base_topic, port=1883, client_id=None,
keepalive=60, will=None, auth=None, tls=None, qos=0):
self.hostname = hostname
self.port = port
self.client_id = client_id
self.keepalive = 60
self.will = will
self.auth = auth
self.tls = tls
self.qos = qos
self.base_topic = base_topic
def _generate_topic(self, project, job_id, action):
return '/'.join([self.base_topic, project, job_id, action])
def publish_single(self, msg, project, job_id, action):
topic = _generate_topic(project, job_id)
publish.single(topic, msg, hostname=self.hostname,
port=self.port, client_id=self.client_id,
keepalive=self.keepalive, will=self.will,
auth=self.auth, tls=self.tls, qos=self.qos)
class Server(object):
def __init__(self, config, debuglog):
# Config init.
@ -340,6 +414,14 @@ class Server(object):
self.output_host = self.config['output-host']
self.output_port = self.config['output-port']
self.output_mode = self.config['output-mode']
mqtt_host = self.config.get('mqtt-host')
mqtt_port = self.config.get('mqtt-port', 1883)
mqtt_user = self.config.get('mqtt-user')
mqtt_pass = self.config.get('mqtt-pass')
mqtt_topic = self.configget('mqtt-topic', 'gearman-subunit')
mqtt_ca_certs = self.config.get('mqtt-ca-certs')
mqtt_certfile = self.config.get('mqtt-certfile')
mqtt_keyfile = self.config.get('mqtt-keyfile')
# Pythong logging output file.
self.debuglog = debuglog
self.retriever = None
@ -351,6 +433,21 @@ class Server(object):
if crmscript and crmdata:
self.filter_factories.append(
CRM114FilterFactory(crmscript, crmdata))
# Setup MQTT
self.mqtt = None
if mqtt_host:
auth = None
if mqtt_user:
auth = {'username': mqtt_user}
if mqtt_pass:
auth['password'] = mqtt_pass
tls = None
if mqtt_ca_certs:
tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile,
'keyfile': mqtt_keyfile}
self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
auth=auth, tls=tls)
def setup_logging(self):
if self.debuglog:
@ -382,21 +479,23 @@ class Server(object):
self.gearman_port)
gearman_worker.registerFunction(b'push-log')
self.retriever = LogRetriever(gearman_worker, self.filter_factories,
self.logqueue)
self.logqueue, mqtt=self.mqtt)
def setup_processor(self):
if self.output_mode == "tcp":
self.processor = TCPLogProcessor(self.logqueue,
self.output_host,
self.output_port)
self.output_port,
mqtt=self.mqtt)
elif self.output_mode == "udp":
self.processor = UDPLogProcessor(self.logqueue,
self.output_host,
self.output_port)
self.output_port,
mqtt=self.mqtt)
else:
# Note this processor will not work if the process is run as a
# daemon. You must use the --foreground option.
self.processor = StdOutLogProcessor(self.logqueue)
self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt)
def main(self):
self.setup_retriever()

View File

@ -76,6 +76,13 @@ class log_processor (
provider => openstack_pip,
require => Class['pip'],
}
if ! defined(Package['paho-mqtt']) {
package { 'paho-mqtt':
ensure => latest,
provider => openstack_pip,
require => Class['pip'],
}
}
file { '/var/lib/crm114':
ensure => directory,