From d84e80ec99863f6a0c1bc66f7558bd80d083a66f Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sun, 28 Jan 2018 02:40:42 -0500 Subject: [PATCH] Add MQTT notification publisher This commit adds an alternate notification publisher driver for mqtt. The intent here is to use this driver with firehose.openstack.org, which is infra's unified event bus. [1] To use this driver you first need to set it as the notification driver with the driver option in the notifications config group. Then set the required config options in the mqtt_notifications section. [1] https://docs.openstack.org/infra/system-config/firehose.html Change-Id: Ie82e625fcce7d5b5b794a46f6456ccff1dc5ec3e --- requirements.txt | 1 + storyboard/notifications/conf.py | 2 +- storyboard/notifications/mqtt/__init__.py | 0 storyboard/notifications/mqtt/conf.py | 51 ++++++++++ storyboard/notifications/mqtt/publisher.py | 104 +++++++++++++++++++++ 5 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 storyboard/notifications/mqtt/__init__.py create mode 100644 storyboard/notifications/mqtt/conf.py create mode 100644 storyboard/notifications/mqtt/publisher.py diff --git a/requirements.txt b/requirements.txt index 522f2c9f..d063bf13 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,4 @@ apscheduler>=3.0.1,<3.1.0 python_dateutil>=2.4.0 oslo.concurrency>=3.8.0 # Apache-2.0 oslo.i18n>=2.1.0 # Apache-2.0 +paho-mqtt>=1.3.1 diff --git a/storyboard/notifications/conf.py b/storyboard/notifications/conf.py index e6d96591..ae089eb5 100644 --- a/storyboard/notifications/conf.py +++ b/storyboard/notifications/conf.py @@ -18,6 +18,6 @@ from oslo_config import cfg CONF = cfg.CONF OPTS = [ - cfg.StrOpt('driver', choices=['pika'], + cfg.StrOpt('driver', choices=['pika', 'mqtt'], help='The notification driver to use', default='pika') ] diff --git a/storyboard/notifications/mqtt/__init__.py b/storyboard/notifications/mqtt/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/notifications/mqtt/conf.py b/storyboard/notifications/mqtt/conf.py new file mode 100644 index 00000000..60b7fca6 --- /dev/null +++ b/storyboard/notifications/mqtt/conf.py @@ -0,0 +1,51 @@ +# Copyright 2018 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + + +MQTT_OPTS = [ + cfg.StrOpt('hostname', help="MQTT broker address/name"), + cfg.IntOpt('port', default=1883, help='MQTT broker port'), + cfg.StrOpt('username', + help="Username to authenticate against the broker."), + cfg.StrOpt('password', secret=True, + help='Password to authenticate against the broker.'), + cfg.IntOpt('qos', default=0, min=0, max=2, + help='Max MQTT QoS available on messages. This can be 0, 1, ' + 'or 2'), + cfg.StrOpt('client_id', + help='MQTT client identifier, default is hostname + pid'), + cfg.StrOpt('base_topic', default='storyboard', + help='The base MQTT topic to publish to'), + cfg.StrOpt('ca_certs', + help="The path to the Certificate Authority certificate files " + "that are to be treated as trusted. If this is the only " + "certificate option given then the client will operate in " + "a similar manner to a web browser. That is to say it will" + "require the broker to have a certificate signed by the " + "Certificate Authorities in ca_certs and will communicate " + "using TLS v1, but will not attempt any form of TLS" + "certificate based authentication."), + cfg.StrOpt('certfile', + help="The path pointing to the PEM encoded client certificate. " + "If this is set it will be used as client information for " + "TLS based authentication. Support for this feature is " + "broker dependent."), + cfg.StrOpt('keyfile', + help="The path pointing to the PEM encoded client private key." + "If this is set it will be used as client information for " + "TLS based authentication. Support for this feature is " + "broker dependent"), +] diff --git a/storyboard/notifications/mqtt/publisher.py b/storyboard/notifications/mqtt/publisher.py new file mode 100644 index 00000000..50ef3cb2 --- /dev/null +++ b/storyboard/notifications/mqtt/publisher.py @@ -0,0 +1,104 @@ +# Copyright 2018 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +from oslo_config import cfg +import paho.mqtt.publish as mqtt_publish + +from storyboard.notifications.mqtt import conf + +CONF = cfg.CONF +CONF.register_opts(conf.MQTT_OPTS, group='mqtt_notifications') + + +class PushMQTT(object): + def __init__(self, hostname, port=1883, client_id=None, + keepalive=60, will=None, auth=None, tls=None, qos=0): + self.hostname = hostname + self.port = port + self.client_id = client_id + self.keepalive = 60 + self.will = will + self.auth = auth + self.tls = tls + self.qos = qos + + def publish_single(self, topic, msg): + mqtt_publish.single(topic, msg, hostname=self.hostname, + port=self.port, client_id=self.client_id, + keepalive=self.keepalive, will=self.will, + auth=self.auth, tls=self.tls, qos=self.qos) + + def publish_multiple(self, topic, msg): + mqtt_publish.multiple(topic, msg, hostname=self.hostname, + port=self.port, client_id=self.client_id, + keepalive=self.keepalive, will=self.will, + auth=self.auth, tls=self.tls, qos=self.qos) + + +def config_publisher(): + auth = None + if CONF.mqtt_notifications.username: + auth = {'username': CONF.mqtt_notifications.username, + 'password': CONF.mqtt_notifications.password} + tls = None + if CONF.mqtt_notifications.ca_certs: + tls = {'ca_certs': CONF.mqtt_notifications.ca_certs, + 'certfile': CONF.mqtt_notifications.certfile, + 'keyfile': CONF.mqtt_notifications.keyfile} + return PushMQTT(CONF.mqtt_notifications.hostname, + port=CONF.mqtt_notifications.port, + client_id=CONF.mqtt_notifications.client_id, + auth=auth, tls=tls, qos=CONF.mqtt_notifications.qos) + + +def _generate_topic(resource, resource_id=None, author_id=None, + sub_resource=None, sub_resource_id=None): + topic = [CONF.mqtt_notifications.base_topic] + if resource: + topic.append(resource) + if resource_id: + topic.append(resource_id) + if author_id: + topic.append(author_id) + if sub_resource: + topic.extend(['sub_resource', sub_resource]) + if sub_resource_id: + topic.append(sub_resource_id) + return '/'.join(topic) + + +def publish(resource, author_id=None, method=None, url=None, path=None, + query_string=None, status=None, resource_id=None, + sub_resource=None, sub_resource_id=None, resource_before=None, + resource_after=None): + mqtt_publish = config_publisher() + topic = _generate_topic(resource, resource_id, author_id, sub_resource, + sub_resource_id) + payload = { + "author_id": author_id, + "method": method, + "url": url, + "path": path, + "query_string": query_string, + "status": status, + "resource": resource, + "resource_id": resource_id, + "sub_resource": sub_resource, + "sub_resource_id": sub_resource_id, + "resource_before": resource_before, + "resource_after": resource_after + } + mqtt_publish.publish_single(topic, json.dumps(payload))