From 9f58e2c3fe371e851a17dd3b4b172439a4608848 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 2 Dec 2013 09:27:29 +0100 Subject: [PATCH] Implements notification listener and dispatcher This patch allows to quickly create a listener to receive notification messages. Example of the api: class Endpoint(object): def warn(self, ctxt, publisher_id, event_type, payload): do_something(payload) target = messaging.Target(topic='notifications', exchange='cinder') listener = notify.get_notification_listener(transport, [target], [Endpoint()], executor, serializer) Implements blueprint notification-subscriber-server Change-Id: I434bc487c382a2048670df726d9bebd640150bb9 --- doc/source/index.rst | 1 + doc/source/notification_listener.rst | 14 +++ oslo/messaging/_drivers/amqpdriver.py | 10 ++ oslo/messaging/_drivers/base.py | 6 + oslo/messaging/_drivers/impl_fake.py | 10 ++ oslo/messaging/_drivers/impl_zmq.py | 14 +++ oslo/messaging/notify/__init__.py | 4 +- oslo/messaging/notify/dispatcher.py | 83 ++++++++++++ oslo/messaging/notify/listener.py | 105 ++++++++++++++++ oslo/messaging/transport.py | 8 ++ tests/test_notify_dispatcher.py | 98 +++++++++++++++ tests/test_notify_listener.py | 173 ++++++++++++++++++++++++++ tests/test_rabbit.py | 5 + 13 files changed, 530 insertions(+), 1 deletion(-) create mode 100644 doc/source/notification_listener.rst create mode 100644 oslo/messaging/notify/dispatcher.py create mode 100644 oslo/messaging/notify/listener.py create mode 100644 tests/test_notify_dispatcher.py create mode 100644 tests/test_notify_listener.py diff --git a/doc/source/index.rst b/doc/source/index.rst index c4ff4301c..19156babe 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -15,6 +15,7 @@ Contents server rpcclient notifier + notification_listener serializer exceptions opts diff --git a/doc/source/notification_listener.rst b/doc/source/notification_listener.rst new file mode 100644 index 000000000..4fa06617d --- /dev/null +++ b/doc/source/notification_listener.rst @@ -0,0 +1,14 @@ +--------------------- +Notification Listener +--------------------- + +.. automodule:: oslo.messaging.notify.listener + +.. currentmodule:: oslo.messaging + +.. autofunction:: get_notification_listener + +.. autoclass:: MessageHandlingServer + :members: + +.. autofunction:: get_local_context diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 9935690d5..8e9813207 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -404,6 +404,16 @@ class AMQPDriverBase(base.BaseDriver): return listener + def listen_for_notifications(self, targets_and_priorities): + conn = self._get_connection(pooled=False) + + listener = AMQPListener(self, conn) + for target, priority in targets_and_priorities: + conn.declare_topic_consumer('%s.%s' % (target.topic, priority), + callback=listener, + exchange_name=target.exchange) + return listener + def cleanup(self): if self._connection_pool: self._connection_pool.empty() diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index b34997f4a..9803e1935 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -73,6 +73,12 @@ class BaseDriver(object): def listen(self, target): """Construct a Listener for the given target.""" + @abc.abstractmethod + def listen_for_notifications(self, targets_and_priorities): + """Construct a notification Listener for the given list of + tuple of (target, priority). + """ + @abc.abstractmethod def cleanup(self): """Release all resources.""" diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 06943f6a2..b43d8ee9d 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -161,5 +161,15 @@ class FakeDriver(base.BaseDriver): messaging.Target(topic=target.topic)]) return listener + def listen_for_notifications(self, targets_and_priorities): + # TODO(sileht): Handle the target.exchange + exchange = self._get_exchange(self._default_exchange) + + targets = [messaging.Target(topic='%s.%s' % (target.topic, priority)) + for target, priority in targets_and_priorities] + listener = FakeListener(self, exchange, targets) + + return listener + def cleanup(self): pass diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 0e65f9e66..a3c53c42b 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -959,5 +959,19 @@ class ZmqDriver(base.BaseDriver): return listener + def listen_for_notifications(self, targets_and_priorities): + conn = create_connection(self.conf) + + listener = ZmqListener(self, None) + for target, priority in targets_and_priorities: + # NOTE(ewindisch): dot-priority in rpc notifier does not + # work with our assumptions. + # NOTE(sileht): create_consumer doesn't support target.exchange + conn.create_consumer('%s-%s' % (target.topic, priority), + listener) + conn.consume_in_thread() + + return listener + def cleanup(self): cleanup() diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py index b368e337a..4b87d72c3 100644 --- a/oslo/messaging/notify/__init__.py +++ b/oslo/messaging/notify/__init__.py @@ -14,7 +14,9 @@ # under the License. __all__ = ['Notifier', - 'LoggingNotificationHandler'] + 'LoggingNotificationHandler', + 'get_notification_listener'] from .notifier import * +from .listener import * from .logger import * diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py new file mode 100644 index 000000000..f36c3925f --- /dev/null +++ b/oslo/messaging/notify/dispatcher.py @@ -0,0 +1,83 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 eNovance +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools +import logging + +from oslo.messaging import localcontext +from oslo.messaging import serializer as msg_serializer + + +LOG = logging.getLogger(__name__) + +PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample'] + + +class NotificationDispatcher(object): + """A message dispatcher which understands Notification messages. + + A MessageHandlingServer is constructed by passing a callable dispatcher + which is invoked with context and message dictionaries each time a message + is received. + + NotifcationDispatcher is one such dispatcher which pass a raw notification + message to the endpoints + """ + + def __init__(self, targets, endpoints, serializer): + self.targets = targets + self.endpoints = endpoints + self.serializer = serializer or msg_serializer.NoOpSerializer() + + self._callbacks_by_priority = {} + for endpoint, prio in itertools.product(endpoints, PRIORITIES): + if hasattr(endpoint, prio): + method = getattr(endpoint, prio) + self._callbacks_by_priority.setdefault(prio, []).append(method) + + priorities = self._callbacks_by_priority.keys() + self._targets_priorities = set(itertools.product(self.targets, + priorities)) + + def _listen(self, transport): + return transport._listen_for_notifications(self._targets_priorities) + + def __call__(self, ctxt, message): + """Dispatch an RPC message to the appropriate endpoint method. + + :param ctxt: the request context + :type ctxt: dict + :param message: the message payload + :type message: dict + """ + ctxt = self.serializer.deserialize_context(ctxt) + + publisher_id = message.get('publisher_id') + event_type = message.get('event_type') + priority = message.get('priority', '').lower() + if priority not in PRIORITIES: + LOG.warning('Unknown priority "%s"' % priority) + return + + payload = self.serializer.deserialize_entity(ctxt, + message.get('payload')) + + for callback in self._callbacks_by_priority.get(priority, []): + localcontext.set_local_context(ctxt) + try: + callback(ctxt, publisher_id, event_type, payload) + finally: + localcontext.clear_local_context() diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py new file mode 100644 index 000000000..f7384c148 --- /dev/null +++ b/oslo/messaging/notify/listener.py @@ -0,0 +1,105 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 eNovance +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +A notification listener exposes a number of endpoints, each of which +contain a set of methods. Each method corresponds to a notification priority. + +To create a notification listener, you supply a transport, list of targets and +a list of endpoints. + +A transport can be obtained simply by calling the get_transport() method:: + + transport = messaging.get_transport(conf) + +which will load the appropriate transport driver according to the user's +messaging configuration configuration. See get_transport() for more details. + +The target supplied when creating a notification listener expresses the topic +and - optionally - the exchange to listen on. See Target for more details +on these attributes. + +Notification listener have start(), stop() and wait() messages to begin +handling requests, stop handling requests and wait for all in-process +requests to complete. + +Each notification listener is associated with an executor which integrates the +listener with a specific I/O handling framework. Currently, there are blocking +and eventlet executors available. + +A simple example of a notification listener with multiple endpoints might be:: + + from oslo.config import cfg + from oslo import messaging + + class NotificationEndpoint(object): + def warn(self, ctxt, publisher_id, event_type, payload): + do_something(payload) + + class ErrorEndpoint(object): + def error(self, ctxt, publisher_id, event_type, payload): + do_something(payload) + + transport = messaging.get_transport(cfg.CONF) + targets = [ + messaging.Target(topic='notifications') + messaging.Target(topic='notifications_bis') + ] + endpoints = [ + NotificationEndpoint(), + ErrorEndpoint(), + ] + server = messaging.get_notification_listener(transport, targets, endpoints) + server.start() + server.wait() + +A notifier sends a notification on a topic with a priority, the notification +listener will receive this notification if the topic of this one have been set +in one of the targets and if an endpoint implements the method named like the +priority + +Parameters to endpoint methods are the request context supplied by the client, +the publisher_id of the notification message, the event_type, the payload. + +By supplying a serializer object, a listener can deserialize a request context +and arguments from - and serialize return values to - primitive types. +""" + +from oslo.messaging.notify import dispatcher as notify_dispatcher +from oslo.messaging import server as msg_server + + +def get_notification_listener(transport, targets, endpoints, + executor='blocking', serializer=None): + """Construct a notification listener + + The executor parameter controls how incoming messages will be received and + dispatched. By default, the most simple executor is used - the blocking + executor. + + :param transport: the messaging transport + :type transport: Transport + :param targets: the exchanges and topics to listen on + :type targets: list of Target + :param endpoints: a list of endpoint objects + :type endpoints: list + :param executor: name of a message executor - e.g. 'eventlet', 'blocking' + :type executor: str + :param serializer: an optional entity serializer + :type serializer: Serializer + """ + dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints, + serializer) + return msg_server.MessageHandlingServer(transport, dispatcher, executor) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index 9e8e9d78c..7c8a3be97 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -99,6 +99,14 @@ class Transport(object): target) return self._driver.listen(target) + def _listen_for_notifications(self, targets_and_priorities): + for target, priority in targets_and_priorities: + if not target.topic: + raise exceptions.InvalidTarget('A target must have ' + 'topic specified', + target) + return self._driver.listen_for_notifications(targets_and_priorities) + def cleanup(self): """Release all resources associated with this transport.""" self._driver.cleanup() diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py new file mode 100644 index 000000000..e1c1f9abb --- /dev/null +++ b/tests/test_notify_dispatcher.py @@ -0,0 +1,98 @@ + +# Copyright 2013 eNovance +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools + +import mock +import testscenarios + +from oslo import messaging +from oslo.messaging.notify import dispatcher as notify_dispatcher +from oslo.messaging.openstack.common import timeutils +from tests import utils as test_utils + +load_tests = testscenarios.load_tests_apply_scenarios + + +notification_msg = dict( + publisher_id="publisher_id", + event_type="compute.start", + payload={"info": "fuu"}, + message_id="uuid", + timestamp=str(timeutils.utcnow()) +) + + +class TestDispatcher(test_utils.BaseTestCase): + + scenarios = [ + ('no_endpoints', + dict(endpoints=[], + endpoints_expect_calls=[], + priority='info')), + ('one_endpoints', + dict(endpoints=[['warn']], + endpoints_expect_calls=['warn'], + priority='warn')), + ('two_endpoints_only_one_match', + dict(endpoints=[['warn'], ['info']], + endpoints_expect_calls=[None, 'info'], + priority='info')), + ('two_endpoints_both_match', + dict(endpoints=[['debug', 'info'], ['info', 'debug']], + endpoints_expect_calls=['debug', 'debug'], + priority='debug')), + ] + + def test_dispatcher(self): + endpoints = [mock.Mock(spec=endpoint_methods) + for endpoint_methods in self.endpoints] + msg = notification_msg.copy() + msg['priority'] = self.priority + + targets = [messaging.Target(topic='notifications')] + dispatcher = notify_dispatcher.NotificationDispatcher(targets, + endpoints, + None) + + # check it listen on wanted topics + self.assertEqual(sorted(dispatcher._targets_priorities), + sorted(set((targets[0], prio) + for prio in itertools.chain.from_iterable( + self.endpoints)))) + + dispatcher({}, msg) + + # check endpoint callbacks are called or not + for i, endpoint_methods in enumerate(self.endpoints): + for m in endpoint_methods: + if m == self.endpoints_expect_calls[i]: + method = getattr(endpoints[i], m) + expected = [mock.call({}, msg['publisher_id'], + msg['event_type'], + msg['payload'])] + self.assertEqual(method.call_args_list, expected) + else: + self.assertEqual(endpoints[i].call_count, 0) + + @mock.patch('oslo.messaging.notify.dispatcher.LOG') + def test_dispatcher_unknown_prio(self, mylog): + msg = notification_msg.copy() + msg['priority'] = 'what???' + dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()], + [mock.Mock()], + None) + dispatcher({}, msg) + mylog.warning.assert_called_once_with('Unknown priority "what???"') diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py new file mode 100644 index 000000000..7bb30b41b --- /dev/null +++ b/tests/test_notify_listener.py @@ -0,0 +1,173 @@ + +# Copyright 2013 eNovance +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading + +import mock +from oslo.config import cfg +import testscenarios + +from oslo import messaging +from oslo.messaging.notify import dispatcher +from tests import utils as test_utils + +load_tests = testscenarios.load_tests_apply_scenarios + + +class ListenerSetupMixin(object): + + class Listener(object): + def __init__(self, transport, topics, endpoints, expect_messages): + targets = [messaging.Target(topic=topic) + for topic in topics] + self._expect_messages = expect_messages + self._received_msgs = 0 + self._listener = messaging.get_notification_listener( + transport, targets, endpoints + [self]) + + def info(self, ctxt, publisher_id, event_type, payload): + self._received_msgs += 1 + if self._expect_messages == self._received_msgs: + # Check start() does nothing with a running listener + self._listener.start() + self._listener.stop() + self._listener.wait() + + def start(self): + self._listener.start() + + def _setup_listener(self, transport, endpoints, expect_messages, + topics=None): + listener = self.Listener(transport, + topics=topics or ['testtopic'], + expect_messages=expect_messages, + endpoints=endpoints) + + thread = threading.Thread(target=listener.start) + thread.daemon = True + thread.start() + return thread + + def _stop_listener(self, thread): + thread.join(timeout=5) + + def _setup_notifier(self, transport, topic='testtopic', + publisher_id='testpublisher'): + return messaging.Notifier(transport, topic=topic, + driver='messaging', + publisher_id=publisher_id) + + +class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): + + def __init__(self, *args): + super(TestNotifyListener, self).__init__(*args) + ListenerSetupMixin.__init__(self) + + def setUp(self): + super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts()) + + def test_constructor(self): + transport = messaging.get_transport(self.conf, url='fake:') + target = messaging.Target(topic='foo') + endpoints = [object()] + + listener = messaging.get_notification_listener(transport, [target], + endpoints) + + self.assertIs(listener.conf, self.conf) + self.assertIs(listener.transport, transport) + self.assertIsInstance(listener.dispatcher, + dispatcher.NotificationDispatcher) + self.assertIs(listener.dispatcher.endpoints, endpoints) + self.assertIs(listener.executor, 'blocking') + + def test_no_target_topic(self): + transport = messaging.get_transport(self.conf, url='fake:') + + listener = messaging.get_notification_listener(transport, + [messaging.Target()], + [mock.Mock()]) + try: + listener.start() + except Exception as ex: + self.assertIsInstance(ex, messaging.InvalidTarget, ex) + else: + self.assertTrue(False) + + def test_unknown_executor(self): + transport = messaging.get_transport(self.conf, url='fake:') + + try: + messaging.get_notification_listener(transport, [], [], + executor='foo') + except Exception as ex: + self.assertIsInstance(ex, messaging.ExecutorLoadFailure) + self.assertEqual(ex.executor, 'foo') + else: + self.assertTrue(False) + + def test_one_topic(self): + transport = messaging.get_transport(self.conf, url='fake:') + + endpoint = mock.Mock() + endpoint.info = mock.Mock() + listener_thread = self._setup_listener(transport, [endpoint], 1) + + notifier = self._setup_notifier(transport) + notifier.info({}, 'an_event.start', 'test message') + + self._stop_listener(listener_thread) + + endpoint.info.assert_called_once_with( + {}, 'testpublisher', 'an_event.start', 'test message') + + def test_two_topics(self): + transport = messaging.get_transport(self.conf, url='fake:') + + endpoint = mock.Mock() + endpoint.info = mock.Mock() + topics = ["topic1", "topic2"] + listener_thread = self._setup_listener(transport, [endpoint], 2, + topics=topics) + notifier = self._setup_notifier(transport, topic='topic1') + notifier.info({}, 'an_event.start1', 'test') + notifier = self._setup_notifier(transport, topic='topic2') + notifier.info({}, 'an_event.start2', 'test') + + self._stop_listener(listener_thread) + + expected = [mock.call({}, 'testpublisher', 'an_event.start1', 'test'), + mock.call({}, 'testpublisher', 'an_event.start2', 'test')] + self.assertEqual(sorted(endpoint.info.call_args_list), expected) + + def test_two_endpoints(self): + transport = messaging.get_transport(self.conf, url='fake:') + + endpoint1 = mock.Mock() + endpoint1.info = mock.Mock() + endpoint2 = mock.Mock() + endpoint2.info = mock.Mock() + listener_thread = self._setup_listener(transport, + [endpoint1, endpoint2], 1) + notifier = self._setup_notifier(transport) + notifier.info({}, 'an_event.start', 'test') + + self._stop_listener(listener_thread) + + endpoint1.info.assert_called_once_with( + {}, 'testpublisher', 'an_event.start', 'test') + endpoint2.info.assert_called_once_with( + {}, 'testpublisher', 'an_event.start', 'test') diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 4e1273cb6..8c0874191 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -108,6 +108,11 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): self._driver.listen(self._target) self.assertEqual(self._server_params[0], self.expected) + def test_transport_url_listen_for_notification(self): + self._driver.listen_for_notifications( + [(messaging.Target(topic='topic'), 'info')]) + self.assertEqual(self._server_params[0], self.expected) + def test_transport_url_send(self): self._driver.send(self._target, {}, {}) self.assertEqual(self._server_params[0], self.expected)