Add configurable notification subscriber and mqtt driver

The previous commits adds support for a configurable notification driver
and an mqtt publisher implementation. The subscriber interface was left
hard coded to the pika implementation and respecting the configurable
driver config setting was left as a future item. This commit addresses
this and adds the missing subscriber interface for the configurable
notification drivers with an mqtt implementation.

Change-Id: I97aafc5fd94ac1a8c126fa1f463e33e772bf87f0
This commit is contained in:
Matthew Treinish 2018-02-05 13:13:31 -05:00
parent d84e80ec99
commit c55d63a16c
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
4 changed files with 130 additions and 3 deletions

View File

@ -0,0 +1,94 @@
# Copyright 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_config import cfg
import paho.mqtt.client as mqtt
from stevedore import enabled
from storyboard.notifications.mqtt import conf
CONF = cfg.CONF
CONF.register_opts(conf.MQTT_OPTS, group='mqtt_notifications')
def handle_event(ext, body):
"""Handle an event from the queue.
:param ext: The extension that's handling this event.
:param body: The body of the event.
:return: The result of the handler.
"""
payload = json.loads(body)
return ext.obj.event(author_id=payload['author_id'] or None,
method=payload['method'] or None,
url=payload['url'] or None,
path=payload['path'] or None,
query_string=payload['query_string'] or None,
status=payload['status'] or None,
resource=payload['resource'] or None,
resource_id=payload['resource_id'] or None,
sub_resource=payload['sub_resource'] or None,
sub_resource_id=payload['sub_resource_id'] or None,
resource_before=payload['resource_before'] or None,
resource_after=payload['resource_after'] or None)
def check_enabled(ext):
"""Check to see whether an extension should be enabled.
:param ext: The extension instance to check.
:return: True if it should be enabled. Otherwise false.
"""
return ext.obj.enabled()
def subscriber(topic=None):
if not topic:
topic = CONF.mqtt_notifications.base_topic + '/#'
auth = None
if CONF.mqtt_notifications.username:
auth = {'username': CONF.mqtt_notifications.username,
'password': CONF.mqtt_notifications.password}
tls = None
if CONF.mqtt_notifications.ca_certs:
tls = {'ca_certs': CONF.mqtt_notifications.ca_certs,
'certfile': CONF.mqtt_notifications.certfile,
'keyfile': CONF.mqtt_notifications.keyfile}
client = mqtt.Client()
if tls:
client.tls_set(**tls)
if auth:
client.username_pw_set(auth['username'],
password=auth.get('password'))
manager = enabled.EnabledExtensionManager(
namespace='storyboard.plugin.worker',
check_func=check_enabled,
invoke_on_load=True,
invoke_args=(CONF,))
def on_connect(client, userdata, flags, rc):
# If no topic is specified subscribe to all messages on base_topic
client.subscribe(topic, qos=CONF.mqtt_notifications.qos)
def on_message(client, userdata, msg):
manager.map(handle_event, msg.payload)
client.on_connect = on_connect
client.on_message = on_message
client.connect(CONF.mqtt_notifications.hostname,
CONF.mqtt_notifications.port)
client.loop_forever()

View File

@ -30,7 +30,7 @@ CONF = cfg.CONF
LOG = log.getLogger(__name__)
def subscribe():
def subscribe(topic=None):
try:
log.register_options(CONF)
except cfg.ArgsAlreadyParsedError:
@ -39,6 +39,9 @@ def subscribe():
log.setup(CONF, 'storyboard')
CONF(project='storyboard')
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
if topic:
LOG.warning("A subscription topic was specified, but the pika driver"
"doesn't use topics")
subscriber = Subscriber(CONF.notifications)
subscriber.start()

View File

@ -0,0 +1,30 @@
# Copyright 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_utils import importutils
from storyboard.notifications import conf
CONF = cfg.CONF
CONF.register_opts(conf.OPTS, 'notifications')
def subscribe(topic=None):
subscriber_module = importutils.import_module(
'storyboard.notifications.' + CONF.notifications.driver +
'.subscriber')
subscriber_module.subscribe(topic)

View File

@ -22,7 +22,7 @@ from oslo_log import log
import storyboard.db.api.base as db_api
from storyboard.notifications.notification_hook import class_mappings
from storyboard.notifications.pika.subscriber import subscribe
from storyboard.notifications import subscriber
from storyboard._i18n import _LI, _LW
from storyboard.plugin.base import PluginBase
@ -55,7 +55,7 @@ def run_daemon():
signal.signal(signal.SIGTERM, terminate)
signal.signal(signal.SIGINT, terminate)
MANAGER = DaemonManager(daemon_method=subscribe,
MANAGER = DaemonManager(daemon_method=subscriber.subscribe,
child_process_count=CONF.worker_count)
MANAGER.start()