diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py
index 74acc00..fdaecdf 100644
--- a/files/log-gearman-worker.py
+++ b/files/log-gearman-worker.py
@@ -33,6 +33,7 @@ import time
 import urllib2
 import yaml
 
+import paho.mqtt.publish as publish
 
 try:
     import daemon.pidlockfile as pidfile_mod
@@ -123,11 +124,12 @@ class CRM114FilterFactory(object):
 
 
 class LogRetriever(threading.Thread):
-    def __init__(self, gearman_worker, filters, logq):
+    def __init__(self, gearman_worker, filters, logq, mqtt=None):
         threading.Thread.__init__(self)
         self.gearman_worker = gearman_worker
         self.filters = filters
         self.logq = logq
+        self.mqtt = mqtt
 
     def run(self):
         while True:
@@ -137,6 +139,8 @@ class LogRetriever(threading.Thread):
             logging.exception("Exception retrieving log event.")
 
     def _handle_event(self):
+        fields = {}
+        source_url = ''
         job = self.gearman_worker.getJob()
         try:
             arguments = json.loads(job.arguments.decode('utf-8'))
@@ -180,9 +184,27 @@ class LogRetriever(threading.Thread):
             for f in all_filters:
                 f.close()
             job.sendWorkComplete()
+            if self.mqtt:
+                msg = json.dumps({
+                    'build_uuid': fields.get('build_uuid'),
+                    'source_url': source_url,
+                    'status': 'success',
+                })
+                self.mqtt.publish_single(msg, fields.get('project'),
+                                         fields.get('build_change'),
+                                         'retrieve_logs')
         except Exception as e:
             logging.exception("Exception handling log event.")
             job.sendWorkException(str(e).encode('utf-8'))
+            if self.mqtt:
+                msg = json.dumps({
+                    'build_uuid': fields.get('build_uuid'),
+                    'source_url': source_url,
+                    'status': 'failure',
+                })
+                self.mqtt.publish_single(msg, fields.get('project'),
+                                         fields.get('build_change'),
+                                         'retrieve_logs')
 
     def _retrieve_log(self, source_url, retry):
         # TODO (clarkb): This should check the content type instead of file
@@ -277,9 +299,10 @@ class LogRetriever(threading.Thread):
 
 
 class StdOutLogProcessor(object):
-    def __init__(self, logq, pretty_print=False):
+    def __init__(self, logq, pretty_print=False, mqtt=None):
         self.logq = logq
         self.pretty_print = pretty_print
+        self.mqtt = mqtt
 
     def handle_log_event(self):
         log = self.logq.get()
@@ -290,16 +313,26 @@ class StdOutLogProcessor(object):
             print(json.dumps(log))
         # Push each log event through to keep logstash up to date.
         sys.stdout.flush()
+        if self.mqtt:
+            msg = json.dumps({
+                'build_uuid': log.get('build_uuid'),
+                'source_url': log.get('log_url'),
+                'status': 'success',
+            })
+            self.mqtt.publish_single(msg, log.get('project'),
+                                     log.get('build_change'),
+                                     'logs_to_logstash')
 
 
 class INETLogProcessor(object):
     socket_type = None
 
-    def __init__(self, logq, host, port):
+    def __init__(self, logq, host, port, mqtt=None):
         self.logq = logq
         self.host = host
         self.port = port
         self.socket = None
+        self.mqtt = mqtt
 
     def _connect_socket(self):
         logging.debug("Creating socket.")
@@ -312,6 +345,15 @@ class INETLogProcessor(object):
             if self.socket is None:
                 self._connect_socket()
             self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
+            if self.mqtt:
+                msg = json.dumps({
+                    'build_uuid': log.get('build_uuid'),
+                    'source_url': log.get('log_url'),
+                    'status': 'success',
+                })
+                self.mqtt.publish_single(msg, log.get('project'),
+                                         log.get('build_change'),
+                                         'logs_to_logstash')
         except:
             logging.exception("Exception sending INET event.")
             # Logstash seems to take about a minute to start again. Wait 90
@@ -321,6 +363,14 @@ class INETLogProcessor(object):
             semi_busy_wait(90)
             self._connect_socket()
             self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
+            if self.mqtt:
+                msg = json.dumps({
+                    'build_uuid': log.get('build_uuid'),
+                    'status': 'success',
+                })
+                self.mqtt.publish_single(msg, log.get('project'),
+                                         log.get('build_change'),
+                                         'logs_to_logstash')
 
 
 class UDPLogProcessor(INETLogProcessor):
@@ -331,6 +381,30 @@ class TCPLogProcessor(INETLogProcessor):
     socket_type = socket.SOCK_STREAM
 
 
+class PushMQTT(object):
+    def __init__(self, hostname, base_topic, port=1883, client_id=None,
+                 keepalive=60, will=None, auth=None, tls=None, qos=0):
+        self.hostname = hostname
+        self.port = port
+        self.client_id = client_id
+        self.keepalive = keepalive
+        self.will = will
+        self.auth = auth
+        self.tls = tls
+        self.qos = qos
+        self.base_topic = base_topic
+
+    def _generate_topic(self, project, job_id, action):
+        return '/'.join([self.base_topic, project, job_id, action])
+
+    def publish_single(self, msg, project, job_id, action):
+        topic = self._generate_topic(project, job_id, action)
+        publish.single(topic, msg, hostname=self.hostname,
+                       port=self.port, client_id=self.client_id,
+                       keepalive=self.keepalive, will=self.will,
+                       auth=self.auth, tls=self.tls, qos=self.qos)
+
+
 class Server(object):
     def __init__(self, config, debuglog):
         # Config init.
@@ -340,6 +414,14 @@ class Server(object):
         self.output_host = self.config['output-host']
         self.output_port = self.config['output-port']
         self.output_mode = self.config['output-mode']
+        mqtt_host = self.config.get('mqtt-host')
+        mqtt_port = self.config.get('mqtt-port', 1883)
+        mqtt_user = self.config.get('mqtt-user')
+        mqtt_pass = self.config.get('mqtt-pass')
+        mqtt_topic = self.config.get('mqtt-topic', 'gearman-subunit')
+        mqtt_ca_certs = self.config.get('mqtt-ca-certs')
+        mqtt_certfile = self.config.get('mqtt-certfile')
+        mqtt_keyfile = self.config.get('mqtt-keyfile')
         # Pythong logging output file.
         self.debuglog = debuglog
         self.retriever = None
@@ -351,6 +433,21 @@ class Server(object):
         if crmscript and crmdata:
             self.filter_factories.append(
                 CRM114FilterFactory(crmscript, crmdata))
+        # Setup MQTT
+        self.mqtt = None
+        if mqtt_host:
+            auth = None
+            if mqtt_user:
+                auth = {'username': mqtt_user}
+                if mqtt_pass:
+                    auth['password'] = mqtt_pass
+            tls = None
+            if mqtt_ca_certs:
+                tls = {'ca_certs': mqtt_ca_certs, 'certfile': mqtt_certfile,
+                       'keyfile': mqtt_keyfile}
+
+            self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
+                                 auth=auth, tls=tls)
 
     def setup_logging(self):
         if self.debuglog:
@@ -382,21 +479,23 @@ class Server(object):
                                  self.gearman_port)
         gearman_worker.registerFunction(b'push-log')
         self.retriever = LogRetriever(gearman_worker, self.filter_factories,
-                                      self.logqueue)
+                                      self.logqueue, mqtt=self.mqtt)
 
     def setup_processor(self):
         if self.output_mode == "tcp":
             self.processor = TCPLogProcessor(self.logqueue,
                                              self.output_host,
-                                             self.output_port)
+                                             self.output_port,
+                                             mqtt=self.mqtt)
         elif self.output_mode == "udp":
             self.processor = UDPLogProcessor(self.logqueue,
                                              self.output_host,
-                                             self.output_port)
+                                             self.output_port,
+                                             mqtt=self.mqtt)
         else:
             # Note this processor will not work if the process is run as a
             # daemon. You must use the --foreground option.
-            self.processor = StdOutLogProcessor(self.logqueue)
+            self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt)
 
     def main(self):
         self.setup_retriever()
diff --git a/manifests/init.pp b/manifests/init.pp
index 78e0018..a4396cf 100644
--- a/manifests/init.pp
+++ b/manifests/init.pp
@@ -76,6 +76,13 @@ class log_processor (
     provider => openstack_pip,
     require  => Class['pip'],
   }
+  if ! defined(Package['paho-mqtt']) {
+    package { 'paho-mqtt':
+      ensure   => latest,
+      provider => openstack_pip,
+      require  => Class['pip'],
+    }
+  }
 
   file { '/var/lib/crm114':
     ensure => directory,
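
For reference, the new mqtt-* options above are read from the worker's existing YAML
config file via self.config.get(); MQTT publishing stays disabled unless mqtt-host is
set. A minimal sketch of what enabling it might look like -- the hostname, credentials,
topic, and file paths below are purely illustrative, only the key names come from this
change, and the TLS keys are optional:

    mqtt-host: mqtt.example.org
    mqtt-port: 8883
    mqtt-user: logworker
    mqtt-pass: secret
    mqtt-topic: gearman-subunit
    mqtt-ca-certs: /etc/ssl/certs/ca.pem
    mqtt-certfile: /etc/logprocessor/mqtt.crt
    mqtt-keyfile: /etc/logprocessor/mqtt.key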