From c55d63a16c0ec58d73da9f186c7a09c4120de8f0 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Mon, 5 Feb 2018 13:13:31 -0500 Subject: [PATCH] Add configurable notification subscriber and mqtt driver The previous commits adds support for a configurable notification driver and an mqtt publisher implementation. The subscriber interface was left hard coded to the pika implementation and respecting the configurable driver config setting was left as a future item. This commit addresses this and adds the missing subscriber interface for the configurable notification drivers with an mqtt implementation. Change-Id: I97aafc5fd94ac1a8c126fa1f463e33e772bf87f0 --- storyboard/notifications/mqtt/subscriber.py | 94 +++++++++++++++++++++ storyboard/notifications/pika/subscriber.py | 5 +- storyboard/notifications/subscriber.py | 30 +++++++ storyboard/plugin/event_worker.py | 4 +- 4 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 storyboard/notifications/mqtt/subscriber.py create mode 100644 storyboard/notifications/subscriber.py diff --git a/storyboard/notifications/mqtt/subscriber.py b/storyboard/notifications/mqtt/subscriber.py new file mode 100644 index 00000000..d804abe8 --- /dev/null +++ b/storyboard/notifications/mqtt/subscriber.py @@ -0,0 +1,94 @@ +# Copyright 2018 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +from oslo_config import cfg +import paho.mqtt.client as mqtt +from stevedore import enabled + +from storyboard.notifications.mqtt import conf + +CONF = cfg.CONF +CONF.register_opts(conf.MQTT_OPTS, group='mqtt_notifications') + + +def handle_event(ext, body): + """Handle an event from the queue. + + :param ext: The extension that's handling this event. + :param body: The body of the event. + :return: The result of the handler. + """ + payload = json.loads(body) + return ext.obj.event(author_id=payload['author_id'] or None, + method=payload['method'] or None, + url=payload['url'] or None, + path=payload['path'] or None, + query_string=payload['query_string'] or None, + status=payload['status'] or None, + resource=payload['resource'] or None, + resource_id=payload['resource_id'] or None, + sub_resource=payload['sub_resource'] or None, + sub_resource_id=payload['sub_resource_id'] or None, + resource_before=payload['resource_before'] or None, + resource_after=payload['resource_after'] or None) + + +def check_enabled(ext): + """Check to see whether an extension should be enabled. + + :param ext: The extension instance to check. + :return: True if it should be enabled. Otherwise false. + """ + return ext.obj.enabled() + + +def subscriber(topic=None): + if not topic: + topic = CONF.mqtt_notifications.base_topic + '/#' + auth = None + if CONF.mqtt_notifications.username: + auth = {'username': CONF.mqtt_notifications.username, + 'password': CONF.mqtt_notifications.password} + tls = None + if CONF.mqtt_notifications.ca_certs: + tls = {'ca_certs': CONF.mqtt_notifications.ca_certs, + 'certfile': CONF.mqtt_notifications.certfile, + 'keyfile': CONF.mqtt_notifications.keyfile} + client = mqtt.Client() + if tls: + client.tls_set(**tls) + if auth: + client.username_pw_set(auth['username'], + password=auth.get('password')) + + manager = enabled.EnabledExtensionManager( + namespace='storyboard.plugin.worker', + check_func=check_enabled, + invoke_on_load=True, + invoke_args=(CONF,)) + + def on_connect(client, userdata, flags, rc): + # If no topic is specified subscribe to all messages on base_topic + client.subscribe(topic, qos=CONF.mqtt_notifications.qos) + + def on_message(client, userdata, msg): + manager.map(handle_event, msg.payload) + + client.on_connect = on_connect + client.on_message = on_message + client.connect(CONF.mqtt_notifications.hostname, + CONF.mqtt_notifications.port) + client.loop_forever() diff --git a/storyboard/notifications/pika/subscriber.py b/storyboard/notifications/pika/subscriber.py index 73b7dcd4..3d317772 100644 --- a/storyboard/notifications/pika/subscriber.py +++ b/storyboard/notifications/pika/subscriber.py @@ -30,7 +30,7 @@ CONF = cfg.CONF LOG = log.getLogger(__name__) -def subscribe(): +def subscribe(topic=None): try: log.register_options(CONF) except cfg.ArgsAlreadyParsedError: @@ -39,6 +39,9 @@ def subscribe(): log.setup(CONF, 'storyboard') CONF(project='storyboard') CONF.register_opts(NOTIFICATION_OPTS, "notifications") + if topic: + LOG.warning("A subscription topic was specified, but the pika driver" + "doesn't use topics") subscriber = Subscriber(CONF.notifications) subscriber.start() diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/subscriber.py new file mode 100644 index 00000000..438f3842 --- /dev/null +++ b/storyboard/notifications/subscriber.py @@ -0,0 +1,30 @@ +# Copyright 2018 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_utils import importutils + +from storyboard.notifications import conf + +CONF = cfg.CONF +CONF.register_opts(conf.OPTS, 'notifications') + + +def subscribe(topic=None): + + subscriber_module = importutils.import_module( + 'storyboard.notifications.' + CONF.notifications.driver + + '.subscriber') + subscriber_module.subscribe(topic) diff --git a/storyboard/plugin/event_worker.py b/storyboard/plugin/event_worker.py index f71e00e8..b9e54ba0 100644 --- a/storyboard/plugin/event_worker.py +++ b/storyboard/plugin/event_worker.py @@ -22,7 +22,7 @@ from oslo_log import log import storyboard.db.api.base as db_api from storyboard.notifications.notification_hook import class_mappings -from storyboard.notifications.pika.subscriber import subscribe +from storyboard.notifications import subscriber from storyboard._i18n import _LI, _LW from storyboard.plugin.base import PluginBase @@ -55,7 +55,7 @@ def run_daemon(): signal.signal(signal.SIGTERM, terminate) signal.signal(signal.SIGINT, terminate) - MANAGER = DaemonManager(daemon_method=subscribe, + MANAGER = DaemonManager(daemon_method=subscriber.subscribe, child_process_count=CONF.worker_count) MANAGER.start()