notification listener: add allow_requeue param

In commit d8d2ad9 we added support for notification listener endpoint
methods to return REQUEUE, but if a driver does not support this we
raise NotImplementedError when the application attempts to requeue
a message.

This requeuing behaviour might only be used by an application in
unusual, exceptional circumstances and catch users by surprise.

Instead, let's require the application to assert that it needs this
feature in advance and raise NotImplementError at that point if the
driver doesn't support it.

Change-Id: Id0bb0e57d2dcc1ec7d752e98c9b1e8e48d99f35c
This commit is contained in:
Mark McLoughlin 2014-03-03 07:35:29 -08:00
parent d8d2ad95d7
commit 5bd31315c2
10 changed files with 42 additions and 31 deletions

View File

@ -67,6 +67,11 @@ class BaseDriver(object):
self._default_exchange = default_exchange
self._allowed_remote_exmods = allowed_remote_exmods
def require_features(self, requeue=False):
if requeue:
raise NotImplementedError('Message requeueing not supported by '
'this transport driver')
@abc.abstractmethod
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):

View File

@ -112,6 +112,9 @@ class FakeDriver(base.BaseDriver):
self._exchanges_lock = threading.Lock()
self._exchanges = {}
def require_features(self, requeue=True):
pass
@staticmethod
def _check_serialize(message):
"""Make sure a message intended for rpc can be serialized.

View File

@ -100,8 +100,7 @@ class QpidMessage(dict):
self._session.acknowledge(self._raw_message)
def requeue(self):
raise NotImplementedError('The QPID driver does not yet support '
'requeuing messages')
pass
class ConsumerBase(object):

View File

@ -744,3 +744,6 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
connection_pool,
default_exchange,
allowed_remote_exmods)
def require_features(self, requeue=True):
pass

View File

@ -844,8 +844,7 @@ class ZmqIncomingMessage(base.IncomingMessage):
self.condition.notify()
def requeue(self):
raise NotImplementedError('The ZeroMQ driver does not yet support '
'requeuing messages')
pass
class ZmqListener(base.Listener):

View File

@ -44,10 +44,11 @@ class NotificationDispatcher(object):
message to the endpoints
"""
def __init__(self, targets, endpoints, serializer):
def __init__(self, targets, endpoints, serializer, allow_requeue):
self.targets = targets
self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer()
self.allow_requeue = allow_requeue
self._callbacks_by_priority = {}
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
@ -114,7 +115,7 @@ class NotificationDispatcher(object):
try:
ret = callback(ctxt, publisher_id, event_type, payload)
ret = NotificationResult.HANDLED if ret is None else ret
if ret != NotificationResult.HANDLED:
if self.allow_requeue and ret == NotificationResult.REQUEUE:
return ret
finally:
localcontext.clear_local_context()

View File

@ -73,26 +73,20 @@ priority
Parameters to endpoint methods are the request context supplied by the client,
the publisher_id of the notification message, the event_type, the payload.
An endpoint method can return explicitly messaging.NotificationResult.HANDLED
By supplying a serializer object, a listener can deserialize a request context
and arguments from - and serialize return values to - primitive types.
An endpoint method can explicitly return messaging.NotificationResult.HANDLED
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
message.
The message is acknowledge only if all endpoints return
messaging.NotificationResult.HANDLED
The message is acknowledged only if all endpoints either return
messaging.NotificationResult.HANDLED or None.
If nothing is returned by an endpoint, this is considered like
messaging.NotificationResult.HANDLED
messaging.NotificationResult values needs to be handled by drivers:
* HANDLED: supported by all drivers
* REQUEUE: supported by drivers: fake://, rabbit://
In case of an unsupported driver nothing is done to the message and a
NotImplementedError is raised and logged.
By supplying a serializer object, a listener can deserialize a request context
and arguments from - and serialize return values to - primitive types.
Note that not all transport drivers implement support for requeueing. In order
to use this feature, applications should assert that the feature is available
by passing allow_requeue=True to get_notification_listener(). If the driver
does not support requeueing, it will raise NotImplementedError at this point.
"""
from oslo.messaging.notify import dispatcher as notify_dispatcher
@ -100,7 +94,8 @@ from oslo.messaging import server as msg_server
def get_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None):
executor='blocking', serializer=None,
allow_requeue=False):
"""Construct a notification listener
The executor parameter controls how incoming messages will be received and
@ -117,7 +112,12 @@ def get_notification_listener(transport, targets, endpoints,
:type executor: str
:param serializer: an optional entity serializer
:type serializer: Serializer
:param allow_requeue: whether NotificationResult.REQUEUE support is needed
:type allow_requeue: bool
:raises: NotImplementedError
"""
transport._require_driver_features(requeue=allow_requeue)
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
serializer)
serializer,
allow_requeue)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)

View File

@ -78,6 +78,9 @@ class Transport(object):
self.conf = driver.conf
self._driver = driver
def _require_driver_features(self, requeue=False):
self._driver.require_features(requeue=requeue)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',

View File

@ -97,9 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase):
msg['priority'] = self.priority
targets = [messaging.Target(topic='notifications')]
dispatcher = notify_dispatcher.NotificationDispatcher(targets,
endpoints,
None)
dispatcher = notify_dispatcher.NotificationDispatcher(
targets, endpoints, None, allow_requeue=True)
# check it listen on wanted topics
self.assertEqual(sorted(dispatcher._targets_priorities),
@ -138,9 +137,8 @@ class TestDispatcher(test_utils.BaseTestCase):
def test_dispatcher_unknown_prio(self, mylog):
msg = notification_msg.copy()
msg['priority'] = 'what???'
dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()],
[mock.Mock()],
None)
dispatcher = notify_dispatcher.NotificationDispatcher(
[mock.Mock()], [mock.Mock()], None, allow_requeue=True)
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
callback()
mylog.warning.assert_called_once_with('Unknown priority "what???"')

View File

@ -35,7 +35,7 @@ class ListenerSetupMixin(object):
self._expect_messages = expect_messages
self._received_msgs = 0
self._listener = messaging.get_notification_listener(
transport, targets, endpoints + [self])
transport, targets, endpoints + [self], allow_requeue=True)
def info(self, ctxt, publisher_id, event_type, payload):
self._received_msgs += 1