From c7e0e458fff4a317789fea192fb5cf5f2fb0fe5c Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sun, 28 Jan 2018 00:44:35 -0500 Subject: [PATCH] Make notification driver configurable This commit makes the notification driver configurable. Currently there is only one notification backend available, rabbit/pika, but to support emitting notifications to firehose.openstack.org in the future a MQTT backend will be added. This commit lays to groundwork for doing this. Note this only effects the notification publisher, the subscriber calls will be updated in the future. (there is only one use of the subscriber currently so that is updated manually to directly use the new path) The external api for registering a pecan notification hook remains the same, but the underlying publish call being made is set by the configuration option. This enables a different publish() method to be called depending on configuration. To facilitate this change the current pika notification options are deprecated in the 'notifications' group and are instead added to the 'pika-notifications' configuration group. This will be removed in the future. A new 'driver' option is added to the ntofications group to specify which driver to use which defaults to the current pika driver. Change-Id: I5235ae4368f8bff13c767b87ced83173d52b5155 --- storyboard/db/api/timeline_events.py | 20 +- storyboard/notifications/conf.py | 32 +--- storyboard/notifications/notification_hook.py | 26 +-- storyboard/notifications/pika/__init__.py | 0 storyboard/notifications/pika/conf.py | 56 ++++++ .../{ => pika}/connection_service.py | 0 storyboard/notifications/pika/publisher.py | 181 ++++++++++++++++++ .../notifications/{ => pika}/subscriber.py | 4 +- storyboard/notifications/publisher.py | 167 ++-------------- storyboard/plugin/event_worker.py | 2 +- .../notifications/test_notification_hook.py | 4 +- 11 files changed, 282 insertions(+), 210 deletions(-) create mode 100644 storyboard/notifications/pika/__init__.py create mode 100644 storyboard/notifications/pika/conf.py rename storyboard/notifications/{ => pika}/connection_service.py (100%) create mode 100644 storyboard/notifications/pika/publisher.py rename storyboard/notifications/{ => pika}/subscriber.py (97%) diff --git a/storyboard/db/api/timeline_events.py b/storyboard/db/api/timeline_events.py index 9ee4713e..ab177ec2 100644 --- a/storyboard/db/api/timeline_events.py +++ b/storyboard/db/api/timeline_events.py @@ -26,7 +26,7 @@ from storyboard.db.api import base as api_base from storyboard.db.api import stories as stories_api from storyboard.db.api import tasks as tasks_api from storyboard.db import models -from storyboard.notifications.publisher import publish +from storyboard.notifications import publisher CONF = cfg.CONF @@ -112,15 +112,15 @@ def event_create(values): event_dict = tojson(TimeLineEvent, TimeLineEvent.from_db_model(new_event)) - publish(author_id=request.current_user_id or None, - method="POST", - url=request.headers.get('Referer') or None, - path=request.path or None, - query_string=request.query_string or None, - status=response.status_code or None, - resource="timeline_event", - resource_id=new_event.id or None, - resource_after=event_dict or None) + publisher.publish(author_id=request.current_user_id or None, + method="POST", + url=request.headers.get('Referer') or None, + path=request.path or None, + query_string=request.query_string or None, + status=response.status_code or None, + resource="timeline_event", + resource_id=new_event.id or None, + resource_after=event_dict or None) return new_event diff --git a/storyboard/notifications/conf.py b/storyboard/notifications/conf.py index 325be8e1..e6d96591 100644 --- a/storyboard/notifications/conf.py +++ b/storyboard/notifications/conf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# Copyright (c) 2018 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,31 +17,7 @@ from oslo_config import cfg CONF = cfg.CONF -NOTIFICATION_OPTS = [ - cfg.StrOpt("rabbit_exchange_name", default="storyboard", - help="The name of the topic exchange which storyboard will " - "use to broadcast its events."), - cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events", - help="The name of the queue that will be created for " - "API events."), - cfg.StrOpt("rabbit_application_name", default="storyboard", - help="The rabbit application identifier for storyboard's " - "connection."), - cfg.StrOpt("rabbit_host", default="localhost", - help="Host of the rabbitmq server."), - cfg.StrOpt("rabbit_login_method", default="AMQPLAIN", - help="The RabbitMQ login method."), - cfg.StrOpt("rabbit_userid", default="storyboard", - help="The RabbitMQ userid."), - cfg.StrOpt("rabbit_password", default="storyboard", - help="The RabbitMQ password."), - cfg.IntOpt("rabbit_port", default=5672, - help="The RabbitMQ broker port where a single node is used."), - cfg.StrOpt("rabbit_virtual_host", default="/", - help="The virtual host within which our queues and exchanges " - "live."), - cfg.IntOpt("rabbit_connection_attempts", default=6, - help="The number of connection attempts before giving-up"), - cfg.IntOpt("rabbit_retry_delay", default=10, - help="The interval between connection attempts (in seconds)") +OPTS = [ + cfg.StrOpt('driver', choices=['pika'], + help='The notification driver to use', default='pika') ] diff --git a/storyboard/notifications/notification_hook.py b/storyboard/notifications/notification_hook.py index aab541c6..11018ad2 100644 --- a/storyboard/notifications/notification_hook.py +++ b/storyboard/notifications/notification_hook.py @@ -23,7 +23,7 @@ from storyboard.api.v1 import wmodels import storyboard.common.hook_priorities as priority from storyboard.db.api import base as api_base from storyboard.db import models -from storyboard.notifications.publisher import publish +from storyboard.notifications import publisher class_mappings = {'task': [models.Task, wmodels.Task], @@ -110,18 +110,18 @@ class NotificationHook(hooks.PecanHook): # Build the payload. Use of None is included to ensure that we don't # accidentally blow up the API call, but we don't anticipate it # happening. - publish(author_id=request.current_user_id, - method=request.method, - url=request.headers.get('Referer'), - path=request.path, - query_string=request.query_string, - status=response.status_code, - resource=resource, - resource_id=resource_id, - sub_resource=subresource, - sub_resource_id=subresource_id, - resource_before=old_resource, - resource_after=new_resource) + publisher.publish(author_id=request.current_user_id, + method=request.method, + url=request.headers.get('Referer'), + path=request.path, + query_string=request.query_string, + status=response.status_code, + resource=resource, + resource_id=resource_id, + sub_resource=subresource, + sub_resource_id=subresource_id, + resource_before=old_resource, + resource_after=new_resource) def get_original_resource(self, resource, resource_id): """Given a resource name and ID, will load that resource and map it diff --git a/storyboard/notifications/pika/__init__.py b/storyboard/notifications/pika/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/notifications/pika/conf.py b/storyboard/notifications/pika/conf.py new file mode 100644 index 00000000..f5c323d5 --- /dev/null +++ b/storyboard/notifications/pika/conf.py @@ -0,0 +1,56 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +CONF = cfg.CONF + +NOTIFICATION_OPTS = [ + cfg.StrOpt("rabbit_exchange_name", default="storyboard", + help="The name of the topic exchange which storyboard will " + "use to broadcast its events.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events", + help="The name of the queue that will be created for " + "API events.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_application_name", default="storyboard", + help="The rabbit application identifier for storyboard's " + "connection.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_host", default="localhost", + help="Host of the rabbitmq server.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_login_method", default="AMQPLAIN", + help="The RabbitMQ login method.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_userid", default="storyboard", + help="The RabbitMQ userid.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_password", default="storyboard", + help="The RabbitMQ password.", + deprecated_group='notifications'), + cfg.IntOpt("rabbit_port", default=5672, + help="The RabbitMQ broker port where a single node is used."), + cfg.StrOpt("rabbit_virtual_host", default="/", + help="The virtual host within which our queues and exchanges " + "live.", deprecated_group='notifications'), + cfg.IntOpt("rabbit_connection_attempts", default=6, + help="The number of connection attempts before giving-up", + deprecated_group='notifications'), + cfg.IntOpt("rabbit_retry_delay", default=10, + help="The interval between connection attempts (in seconds)", + deprecated_group='notifications') +] diff --git a/storyboard/notifications/connection_service.py b/storyboard/notifications/pika/connection_service.py similarity index 100% rename from storyboard/notifications/connection_service.py rename to storyboard/notifications/pika/connection_service.py diff --git a/storyboard/notifications/pika/publisher.py b/storyboard/notifications/pika/publisher.py new file mode 100644 index 00000000..11b882fd --- /dev/null +++ b/storyboard/notifications/pika/publisher.py @@ -0,0 +1,181 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from oslo_config import cfg +from oslo_log import log +from pika.exceptions import ConnectionClosed + +from storyboard.notifications.pika.conf import NOTIFICATION_OPTS +from storyboard.notifications.pika.connection_service import ConnectionService +from storyboard._i18n import _, _LW, _LE + + +CONF = cfg.CONF +LOG = log.getLogger(__name__) +PUBLISHER = None + + +class Publisher(ConnectionService): + """A generic message publisher that uses delivery confirmation to ensure + that messages are delivered, and will keep a running cache of unsent + messages while the publisher is attempting to reconnect. + """ + + def __init__(self, conf): + """Setup the publisher instance based on our configuration. + + :param conf A configuration object. + """ + super(Publisher, self).__init__(conf) + + self._pending = list() + + self.add_open_hook(self._publish_pending) + + def _publish_pending(self): + """Publishes any pending messages that were broadcast while the + publisher was connecting. + """ + + # Shallow copy, so we can iterate over it without having it be modified + # out of band. + pending = list(self._pending) + + for payload in pending: + self._publish(payload) + + def _publish(self, payload): + """Publishes a payload to the passed exchange. If it encounters a + failure, will store the payload for later. + + :param Payload payload: The payload to send. + """ + LOG.debug(_("Sending message to %(name)s [%(topic)s]") % + {'name': self._exchange_name, 'topic': payload.topic}) + + # First check, are we closing? + if self._closing: + LOG.warning(_LW("Cannot send message, publisher is closing.")) + if payload not in self._pending: + self._pending.append(payload) + return + + # Second check, are we open? + if not self._open: + LOG.debug(_("Cannot send message, publisher is connecting.")) + if payload not in self._pending: + self._pending.append(payload) + self._reconnect() + return + + # Third check, are we in a sane state? This should never happen, + # but just in case... + if not self._connection or not self._channel: + LOG.error(_LE("Cannot send message, publisher is " + "an unexpected state.")) + if payload not in self._pending: + self._pending.append(payload) + self._reconnect() + return + + # Try to send a message. If we fail, schedule a reconnect and store + # the message. + try: + self._channel.basic_publish(self._exchange_name, + payload.topic, + json.dumps(payload.payload, + ensure_ascii=False), + self._properties) + if payload in self._pending: + self._pending.remove(payload) + return True + except (ConnectionClosed, AttributeError) as cc: + LOG.warning(_LW("Attempted to send message on closed connection.")) + LOG.debug(cc) + self._open = False + if payload not in self._pending: + self._pending.append(payload) + self._reconnect() + return False + + def publish_message(self, topic, payload): + """Publishes a message to RabbitMQ. + """ + self._publish(Payload(topic, payload)) + + +class Payload(object): + def __init__(self, topic, payload): + """Setup the example publisher object, passing in the URL we will use + to connect to RabbitMQ. + + :param topic string The exchange topic to broadcast on. + :param payload string The message payload to send. + """ + + self.topic = topic + self.payload = payload + + +def publish(resource, author_id=None, method=None, url=None, path=None, + query_string=None, status=None, resource_id=None, + sub_resource=None, sub_resource_id=None, resource_before=None, + resource_after=None): + """Send a message for an API event to the storyboard exchange. + + The message will be automatically JSON encoded. + + :param resource: The extrapolated resource type (project, story, etc). + :param author_id: The ID of the author who performed this action. + :param method: The HTTP Method used. + :param url: The Referer header from the request. + :param path: The HTTP Path used. + :param query_string: The HTTP query string used. + :param status: The HTTP Status code of the response. + :param resource_id: The ID of the resource. + :param sub_resource: The extracted subresource (user_token, etc) + :param sub_resource_id: THe ID of the subresource. + :param resource_before: The resource state before this event occurred. + :param resource_after: The resource state after this event occurred. + """ + global PUBLISHER + + if not PUBLISHER: + CONF.register_opts(NOTIFICATION_OPTS, "pika-notifications") + PUBLISHER = Publisher(CONF.pika_notifications) + PUBLISHER.start() + + payload = { + "author_id": author_id, + "method": method, + "url": url, + "path": path, + "query_string": query_string, + "status": status, + "resource": resource, + "resource_id": resource_id, + "sub_resource": sub_resource, + "sub_resource_id": sub_resource_id, + "resource_before": resource_before, + "resource_after": resource_after + } + + if resource: + PUBLISHER.publish_message(resource, payload) + else: + LOG.warning("Attempted to send payload with no destination " + "resource.") diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/pika/subscriber.py similarity index 97% rename from storyboard/notifications/subscriber.py rename to storyboard/notifications/pika/subscriber.py index 236cf6d6..73b7dcd4 100644 --- a/storyboard/notifications/subscriber.py +++ b/storyboard/notifications/pika/subscriber.py @@ -21,8 +21,8 @@ from oslo_log import log from pika.exceptions import ConnectionClosed from stevedore import enabled -from storyboard.notifications.conf import NOTIFICATION_OPTS -from storyboard.notifications.connection_service import ConnectionService +from storyboard.notifications.pika.conf import NOTIFICATION_OPTS +from storyboard.notifications.pika.connection_service import ConnectionService from storyboard._i18n import _, _LW diff --git a/storyboard/notifications/publisher.py b/storyboard/notifications/publisher.py index 3425497c..23539dcc 100644 --- a/storyboard/notifications/publisher.py +++ b/storyboard/notifications/publisher.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# Copyright (c) 2018 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,167 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json - from oslo_config import cfg -from oslo_log import log -from pika.exceptions import ConnectionClosed - -from storyboard.notifications.conf import NOTIFICATION_OPTS -from storyboard.notifications.connection_service import ConnectionService -from storyboard._i18n import _, _LW, _LE +from oslo_utils import importutils +from storyboard.notifications import conf CONF = cfg.CONF -LOG = log.getLogger(__name__) -PUBLISHER = None - - -class Publisher(ConnectionService): - """A generic message publisher that uses delivery confirmation to ensure - that messages are delivered, and will keep a running cache of unsent - messages while the publisher is attempting to reconnect. - """ - - def __init__(self, conf): - """Setup the publisher instance based on our configuration. - - :param conf A configuration object. - """ - super(Publisher, self).__init__(conf) - - self._pending = list() - - self.add_open_hook(self._publish_pending) - - def _publish_pending(self): - """Publishes any pending messages that were broadcast while the - publisher was connecting. - """ - - # Shallow copy, so we can iterate over it without having it be modified - # out of band. - pending = list(self._pending) - - for payload in pending: - self._publish(payload) - - def _publish(self, payload): - """Publishes a payload to the passed exchange. If it encounters a - failure, will store the payload for later. - - :param Payload payload: The payload to send. - """ - LOG.debug(_("Sending message to %(name)s [%(topic)s]") % - {'name': self._exchange_name, 'topic': payload.topic}) - - # First check, are we closing? - if self._closing: - LOG.warning(_LW("Cannot send message, publisher is closing.")) - if payload not in self._pending: - self._pending.append(payload) - return - - # Second check, are we open? - if not self._open: - LOG.debug(_("Cannot send message, publisher is connecting.")) - if payload not in self._pending: - self._pending.append(payload) - self._reconnect() - return - - # Third check, are we in a sane state? This should never happen, - # but just in case... - if not self._connection or not self._channel: - LOG.error(_LE("Cannot send message, publisher is " - "an unexpected state.")) - if payload not in self._pending: - self._pending.append(payload) - self._reconnect() - return - - # Try to send a message. If we fail, schedule a reconnect and store - # the message. - try: - self._channel.basic_publish(self._exchange_name, - payload.topic, - json.dumps(payload.payload, - ensure_ascii=False), - self._properties) - if payload in self._pending: - self._pending.remove(payload) - return True - except (ConnectionClosed, AttributeError) as cc: - LOG.warning(_LW("Attempted to send message on closed connection.")) - LOG.debug(cc) - self._open = False - if payload not in self._pending: - self._pending.append(payload) - self._reconnect() - return False - - def publish_message(self, topic, payload): - """Publishes a message to RabbitMQ. - """ - self._publish(Payload(topic, payload)) - - -class Payload(object): - def __init__(self, topic, payload): - """Setup the example publisher object, passing in the URL we will use - to connect to RabbitMQ. - - :param topic string The exchange topic to broadcast on. - :param payload string The message payload to send. - """ - - self.topic = topic - self.payload = payload +CONF.register_opts(conf.OPTS, 'notifications') def publish(resource, author_id=None, method=None, url=None, path=None, query_string=None, status=None, resource_id=None, sub_resource=None, sub_resource_id=None, resource_before=None, resource_after=None): - """Send a message for an API event to the storyboard exchange. The message - will be automatically JSON encoded. - :param resource: The extrapolated resource type (project, story, etc). - :param author_id: The ID of the author who performed this action. - :param method: The HTTP Method used. - :param url: The Referer header from the request. - :param path: The HTTP Path used. - :param query_string: The HTTP query string used. - :param status: The HTTP Status code of the response. - :param resource_id: The ID of the resource. - :param sub_resource: The extracted subresource (user_token, etc) - :param sub_resource_id: THe ID of the subresource. - :param resource_before: The resource state before this event occurred. - :param resource_after: The resource state after this event occurred. - """ - global PUBLISHER - - if not PUBLISHER: - CONF.register_opts(NOTIFICATION_OPTS, "notifications") - PUBLISHER = Publisher(CONF.notifications) - PUBLISHER.start() - - payload = { - "author_id": author_id, - "method": method, - "url": url, - "path": path, - "query_string": query_string, - "status": status, - "resource": resource, - "resource_id": resource_id, - "sub_resource": sub_resource, - "sub_resource_id": sub_resource_id, - "resource_before": resource_before, - "resource_after": resource_after - } - - if resource: - PUBLISHER.publish_message(resource, payload) - else: - LOG.warning("Attempted to send payload with no destination resource.") + publisher_module = importutils.import_module( + 'storyboard.notifications.' + CONF.notifications.driver + '.publisher') + publisher_module.publish(resource, author_id=author_id, method=method, + url=url, path=path, query_string=query_string, + status=status, resource_id=resource_id, + sub_resource=sub_resource, + sub_resource_id=sub_resource_id, + resource_before=resource_before, + resource_after=resource_after) diff --git a/storyboard/plugin/event_worker.py b/storyboard/plugin/event_worker.py index 93572d2e..f71e00e8 100644 --- a/storyboard/plugin/event_worker.py +++ b/storyboard/plugin/event_worker.py @@ -22,7 +22,7 @@ from oslo_log import log import storyboard.db.api.base as db_api from storyboard.notifications.notification_hook import class_mappings -from storyboard.notifications.subscriber import subscribe +from storyboard.notifications.pika.subscriber import subscribe from storyboard._i18n import _LI, _LW from storyboard.plugin.base import PluginBase diff --git a/storyboard/tests/notifications/test_notification_hook.py b/storyboard/tests/notifications/test_notification_hook.py index 29314b62..6148a90b 100644 --- a/storyboard/tests/notifications/test_notification_hook.py +++ b/storyboard/tests/notifications/test_notification_hook.py @@ -221,7 +221,7 @@ class TestNotificationHook(base.BaseDbTestCase): self.assertEqual(mock_state.old_entity_values['priority'], sample_task_wmodel.priority) - @patch('storyboard.notifications.notification_hook.publish') + @patch('storyboard.notifications.notification_hook.publisher') @patch.object(NotificationHook, 'get_original_resource') def test_after_publishes_payload(self, mock_get_original_resource, mock_publish): @@ -261,7 +261,7 @@ class TestNotificationHook(base.BaseDbTestCase): mock_get_original_resource.return_value = smt_json n.after(mock_state) - mock_publish.assert_called_with( + mock_publish.publish.assert_called_with( author_id=mock_state.request.current_user_id, method=mock_state.request.method, url=mock_state.request.headers['Referer'],