Add MQTT notification publisher

This commit adds an alternate notification publisher driver for mqtt.
The intent here is to use this driver with firehose.openstack.org, which
is infra's unified event bus. [1] To use this driver you first need to
set it as the notification driver with the driver option in the
notifications config group. Then set the required config options in the
mqtt_notifications section.

[1] https://docs.openstack.org/infra/system-config/firehose.html

Change-Id: Ie82e625fcce7d5b5b794a46f6456ccff1dc5ec3e
This commit is contained in:
Matthew Treinish 2018-01-28 02:40:42 -05:00
parent c7e0e458ff
commit d84e80ec99
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
5 changed files with 157 additions and 1 deletions

View File

@ -30,3 +30,4 @@ apscheduler>=3.0.1,<3.1.0
python_dateutil>=2.4.0
oslo.concurrency>=3.8.0 # Apache-2.0
oslo.i18n>=2.1.0 # Apache-2.0
paho-mqtt>=1.3.1

View File

@ -18,6 +18,6 @@ from oslo_config import cfg
CONF = cfg.CONF
OPTS = [
cfg.StrOpt('driver', choices=['pika'],
cfg.StrOpt('driver', choices=['pika', 'mqtt'],
help='The notification driver to use', default='pika')
]

View File

@ -0,0 +1,51 @@
# Copyright 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
MQTT_OPTS = [
cfg.StrOpt('hostname', help="MQTT broker address/name"),
cfg.IntOpt('port', default=1883, help='MQTT broker port'),
cfg.StrOpt('username',
help="Username to authenticate against the broker."),
cfg.StrOpt('password', secret=True,
help='Password to authenticate against the broker.'),
cfg.IntOpt('qos', default=0, min=0, max=2,
help='Max MQTT QoS available on messages. This can be 0, 1, '
'or 2'),
cfg.StrOpt('client_id',
help='MQTT client identifier, default is hostname + pid'),
cfg.StrOpt('base_topic', default='storyboard',
help='The base MQTT topic to publish to'),
cfg.StrOpt('ca_certs',
help="The path to the Certificate Authority certificate files "
"that are to be treated as trusted. If this is the only "
"certificate option given then the client will operate in "
"a similar manner to a web browser. That is to say it will"
"require the broker to have a certificate signed by the "
"Certificate Authorities in ca_certs and will communicate "
"using TLS v1, but will not attempt any form of TLS"
"certificate based authentication."),
cfg.StrOpt('certfile',
help="The path pointing to the PEM encoded client certificate. "
"If this is set it will be used as client information for "
"TLS based authentication. Support for this feature is "
"broker dependent."),
cfg.StrOpt('keyfile',
help="The path pointing to the PEM encoded client private key."
"If this is set it will be used as client information for "
"TLS based authentication. Support for this feature is "
"broker dependent"),
]

View File

@ -0,0 +1,104 @@
# Copyright 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_config import cfg
import paho.mqtt.publish as mqtt_publish
from storyboard.notifications.mqtt import conf
CONF = cfg.CONF
CONF.register_opts(conf.MQTT_OPTS, group='mqtt_notifications')
class PushMQTT(object):
def __init__(self, hostname, port=1883, client_id=None,
keepalive=60, will=None, auth=None, tls=None, qos=0):
self.hostname = hostname
self.port = port
self.client_id = client_id
self.keepalive = 60
self.will = will
self.auth = auth
self.tls = tls
self.qos = qos
def publish_single(self, topic, msg):
mqtt_publish.single(topic, msg, hostname=self.hostname,
port=self.port, client_id=self.client_id,
keepalive=self.keepalive, will=self.will,
auth=self.auth, tls=self.tls, qos=self.qos)
def publish_multiple(self, topic, msg):
mqtt_publish.multiple(topic, msg, hostname=self.hostname,
port=self.port, client_id=self.client_id,
keepalive=self.keepalive, will=self.will,
auth=self.auth, tls=self.tls, qos=self.qos)
def config_publisher():
auth = None
if CONF.mqtt_notifications.username:
auth = {'username': CONF.mqtt_notifications.username,
'password': CONF.mqtt_notifications.password}
tls = None
if CONF.mqtt_notifications.ca_certs:
tls = {'ca_certs': CONF.mqtt_notifications.ca_certs,
'certfile': CONF.mqtt_notifications.certfile,
'keyfile': CONF.mqtt_notifications.keyfile}
return PushMQTT(CONF.mqtt_notifications.hostname,
port=CONF.mqtt_notifications.port,
client_id=CONF.mqtt_notifications.client_id,
auth=auth, tls=tls, qos=CONF.mqtt_notifications.qos)
def _generate_topic(resource, resource_id=None, author_id=None,
sub_resource=None, sub_resource_id=None):
topic = [CONF.mqtt_notifications.base_topic]
if resource:
topic.append(resource)
if resource_id:
topic.append(resource_id)
if author_id:
topic.append(author_id)
if sub_resource:
topic.extend(['sub_resource', sub_resource])
if sub_resource_id:
topic.append(sub_resource_id)
return '/'.join(topic)
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
mqtt_publish = config_publisher()
topic = _generate_topic(resource, resource_id, author_id, sub_resource,
sub_resource_id)
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
mqtt_publish.publish_single(topic, json.dumps(payload))