Add MQTT support to the gearman worker
This commit adds support to the gearman worker for publishing an mqtt message when processing a gearman job succeeds or fails. It also adds a message for when the processor passes the logs to logstash either via stdout or over a socket. By default this is disabled since it requires extra configuration to tell the worker how to talk to the mqtt broker. Depends-On: Id0308d2d4d1843fcca73f459cffa2ae944bebd0c Change-Id: I43be3562780c61591ebede61f3a8929e8217f199
This commit is contained in:
parent
3aa0ef4305
commit
b1a4357058
|
@ -33,6 +33,7 @@ import time
|
|||
import urllib2
|
||||
import yaml
|
||||
|
||||
import paho.mqtt.publish as publish
|
||||
|
||||
try:
|
||||
import daemon.pidlockfile as pidfile_mod
|
||||
|
@ -123,11 +124,12 @@ class CRM114FilterFactory(object):
|
|||
|
||||
|
||||
class LogRetriever(threading.Thread):
|
||||
def __init__(self, gearman_worker, filters, logq):
|
||||
def __init__(self, gearman_worker, filters, logq, mqtt=None):
|
||||
threading.Thread.__init__(self)
|
||||
self.gearman_worker = gearman_worker
|
||||
self.filters = filters
|
||||
self.logq = logq
|
||||
self.mqtt = mqtt
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -137,6 +139,8 @@ class LogRetriever(threading.Thread):
|
|||
logging.exception("Exception retrieving log event.")
|
||||
|
||||
def _handle_event(self):
|
||||
fields = {}
|
||||
source_url = ''
|
||||
job = self.gearman_worker.getJob()
|
||||
try:
|
||||
arguments = json.loads(job.arguments.decode('utf-8'))
|
||||
|
@ -180,9 +184,27 @@ class LogRetriever(threading.Thread):
|
|||
for f in all_filters:
|
||||
f.close()
|
||||
job.sendWorkComplete()
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': fields.get('build_uuid'),
|
||||
'source_url': source_url,
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, fields.get('project'),
|
||||
fields.get('build_change'),
|
||||
'retrieve_logs')
|
||||
except Exception as e:
|
||||
logging.exception("Exception handling log event.")
|
||||
job.sendWorkException(str(e).encode('utf-8'))
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': fields.get('build_uuid'),
|
||||
'source_url': source_url,
|
||||
'status': 'failure',
|
||||
})
|
||||
self.mqtt.publish_single(msg, fields.get('project'),
|
||||
fields.get('build_change'),
|
||||
'retrieve_logs')
|
||||
|
||||
def _retrieve_log(self, source_url, retry):
|
||||
# TODO (clarkb): This should check the content type instead of file
|
||||
|
@ -277,9 +299,10 @@ class LogRetriever(threading.Thread):
|
|||
|
||||
|
||||
class StdOutLogProcessor(object):
|
||||
def __init__(self, logq, pretty_print=False):
|
||||
def __init__(self, logq, pretty_print=False, mqtt=None):
|
||||
self.logq = logq
|
||||
self.pretty_print = pretty_print
|
||||
self.mqtt = mqtt
|
||||
|
||||
def handle_log_event(self):
|
||||
log = self.logq.get()
|
||||
|
@ -290,16 +313,26 @@ class StdOutLogProcessor(object):
|
|||
print(json.dumps(log))
|
||||
# Push each log event through to keep logstash up to date.
|
||||
sys.stdout.flush()
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': log.get('build_uuid'),
|
||||
'source_url': log.get('log_url'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, log.get('project'),
|
||||
log.get('build_change'),
|
||||
'logs_to_logstash')
|
||||
|
||||
|
||||
class INETLogProcessor(object):
|
||||
socket_type = None
|
||||
|
||||
def __init__(self, logq, host, port):
|
||||
def __init__(self, logq, host, port, mqtt=None):
|
||||
self.logq = logq
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.socket = None
|
||||
self.mqtt = mqtt
|
||||
|
||||
def _connect_socket(self):
|
||||
logging.debug("Creating socket.")
|
||||
|
@ -312,6 +345,15 @@ class INETLogProcessor(object):
|
|||
if self.socket is None:
|
||||
self._connect_socket()
|
||||
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': log.get('build_uuid'),
|
||||
'source_url': log.get('log_url'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, log.get('project'),
|
||||
log.get('build_change'),
|
||||
'logs_to_logstash')
|
||||
except:
|
||||
logging.exception("Exception sending INET event.")
|
||||
# Logstash seems to take about a minute to start again. Wait 90
|
||||
|
@ -321,6 +363,14 @@ class INETLogProcessor(object):
|
|||
semi_busy_wait(90)
|
||||
self._connect_socket()
|
||||
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': log.get('build_uuid'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, log.get('project'),
|
||||
log.get('build_change'),
|
||||
'logs_to_logstash')
|
||||
|
||||
|
||||
class UDPLogProcessor(INETLogProcessor):
|
||||
|
@ -331,6 +381,30 @@ class TCPLogProcessor(INETLogProcessor):
|
|||
socket_type = socket.SOCK_STREAM
|
||||
|
||||
|
||||
class PushMQTT(object):
|
||||
def __init__(self, hostname, base_topic, port=1883, client_id=None,
|
||||
keepalive=60, will=None, auth=None, tls=None, qos=0):
|
||||
self.hostname = hostname
|
||||
self.port = port
|
||||
self.client_id = client_id
|
||||
self.keepalive = 60
|
||||
self.will = will
|
||||
self.auth = auth
|
||||
self.tls = tls
|
||||
self.qos = qos
|
||||
self.base_topic = base_topic
|
||||
|
||||
def _generate_topic(self, project, job_id, action):
|
||||
return '/'.join([self.base_topic, project, job_id, action])
|
||||
|
||||
def publish_single(self, msg, project, job_id, action):
|
||||
topic = _generate_topic(project, job_id)
|
||||
publish.single(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls, qos=self.qos)
|
||||
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, config, debuglog):
|
||||
# Config init.
|
||||
|
@ -340,6 +414,14 @@ class Server(object):
|
|||
self.output_host = self.config['output-host']
|
||||
self.output_port = self.config['output-port']
|
||||
self.output_mode = self.config['output-mode']
|
||||
mqtt_host = self.config.get('mqtt-host')
|
||||
mqtt_port = self.config.get('mqtt-port', 1883)
|
||||
mqtt_user = self.config.get('mqtt-user')
|
||||
mqtt_pass = self.config.get('mqtt-pass')
|
||||
mqtt_topic = self.configget('mqtt-topic', 'gearman-subunit')
|
||||
mqtt_ca_certs = self.config.get('mqtt-ca-certs')
|
||||
mqtt_certfile = self.config.get('mqtt-certfile')
|
||||
mqtt_keyfile = self.config.get('mqtt-keyfile')
|
||||
# Pythong logging output file.
|
||||
self.debuglog = debuglog
|
||||
self.retriever = None
|
||||
|
@ -351,6 +433,21 @@ class Server(object):
|
|||
if crmscript and crmdata:
|
||||
self.filter_factories.append(
|
||||
CRM114FilterFactory(crmscript, crmdata))
|
||||
# Setup MQTT
|
||||
self.mqtt = None
|
||||
if mqtt_host:
|
||||
auth = None
|
||||
if mqtt_user:
|
||||
auth = {'username': mqtt_user}
|
||||
if mqtt_pass:
|
||||
auth['password'] = mqtt_pass
|
||||
tls = None
|
||||
if mqtt_ca_certs:
|
||||
tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile,
|
||||
'keyfile': mqtt_keyfile}
|
||||
|
||||
self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
|
||||
auth=auth, tls=tls)
|
||||
|
||||
def setup_logging(self):
|
||||
if self.debuglog:
|
||||
|
@ -382,21 +479,23 @@ class Server(object):
|
|||
self.gearman_port)
|
||||
gearman_worker.registerFunction(b'push-log')
|
||||
self.retriever = LogRetriever(gearman_worker, self.filter_factories,
|
||||
self.logqueue)
|
||||
self.logqueue, mqtt=self.mqtt)
|
||||
|
||||
def setup_processor(self):
|
||||
if self.output_mode == "tcp":
|
||||
self.processor = TCPLogProcessor(self.logqueue,
|
||||
self.output_host,
|
||||
self.output_port)
|
||||
self.output_port,
|
||||
mqtt=self.mqtt)
|
||||
elif self.output_mode == "udp":
|
||||
self.processor = UDPLogProcessor(self.logqueue,
|
||||
self.output_host,
|
||||
self.output_port)
|
||||
self.output_port,
|
||||
mqtt=self.mqtt)
|
||||
else:
|
||||
# Note this processor will not work if the process is run as a
|
||||
# daemon. You must use the --foreground option.
|
||||
self.processor = StdOutLogProcessor(self.logqueue)
|
||||
self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt)
|
||||
|
||||
def main(self):
|
||||
self.setup_retriever()
|
||||
|
|
|
@ -76,6 +76,13 @@ class log_processor (
|
|||
provider => openstack_pip,
|
||||
require => Class['pip'],
|
||||
}
|
||||
if ! defined(Package['paho-mqtt']) {
|
||||
package { 'paho-mqtt':
|
||||
ensure => latest,
|
||||
provider => openstack_pip,
|
||||
require => Class['pip'],
|
||||
}
|
||||
}
|
||||
|
||||
file { '/var/lib/crm114':
|
||||
ensure => directory,
|
||||
|
|
Loading…
Reference in New Issue