Allow to requeue the notification message

This patch allow to requeue the notification received by the
notification listener.

Partial implements blueprint notification-subscriber-server

Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843
This commit is contained in:
Mehdi Abaakouk 2013-12-11 16:50:09 +01:00 committed by Mehdi Abaakouk
parent 35f6d588a3
commit d8d2ad95d7
13 changed files with 178 additions and 25 deletions

View File

@ -318,12 +318,16 @@ class _MsgIdCache(object):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
if UNIQUE_ID in message_data:
msg_id = message_data.get(UNIQUE_ID)
if msg_id in self.prev_msgids:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def add(self, message_data):
if UNIQUE_ID in message_data:
msg_id = message_data.pop(UNIQUE_ID)
if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
else:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def _add_unique_id(msg):

View File

@ -37,7 +37,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
self.msg_id = msg_id
self.reply_q = reply_q
self.acknowledge = message.acknowledge
self.acknowledge_callback = message.acknowledge
self.requeue_callback = message.requeue
def _send_reply(self, conn, reply=None, failure=None,
ending=False, log_failure=True):
@ -65,6 +66,19 @@ class AMQPIncomingMessage(base.IncomingMessage):
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
def acknowledge(self):
self.listener.msg_id_cache.add(self.message)
self.acknowledge_callback()
def requeue(self):
# NOTE(sileht): In case of the connection is lost between receiving the
# message and requeing it, this requeue call fail
# but because the message is not acknowledged and not added to the
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
self.requeue_callback()
class AMQPListener(base.Listener):

View File

@ -40,6 +40,10 @@ class IncomingMessage(object):
def acknowledge(self):
"Acknowledge the message."
@abc.abstractmethod
def requeue(self):
"Requeue the message."
@six.add_metaclass(abc.ABCMeta)
class Listener(object):

View File

@ -26,8 +26,9 @@ from oslo.messaging._drivers import base
class FakeIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, reply_q):
def __init__(self, listener, ctxt, message, reply_q, requeue):
super(FakeIncomingMessage, self).__init__(listener, ctxt, message)
self.requeue_callback = requeue
self._reply_q = reply_q
def reply(self, reply=None, failure=None, log_failure=True):
@ -35,6 +36,9 @@ class FakeIncomingMessage(base.IncomingMessage):
failure = failure[1] if failure else None
self._reply_q.put((reply, failure))
def requeue(self):
self.requeue_callback()
class FakeListener(base.Listener):
@ -46,10 +50,11 @@ class FakeListener(base.Listener):
def poll(self):
while True:
for target in self._targets:
(ctxt, message, reply_q) = self._exchange.poll(target)
(ctxt, message, reply_q, requeue) = \
self._exchange.poll(target)
if message is not None:
message = FakeIncomingMessage(self, ctxt, message, reply_q)
message.acknowledge()
message = FakeIncomingMessage(self, ctxt, message,
reply_q, requeue)
return message
time.sleep(.05)
@ -58,7 +63,7 @@ class FakeExchange(object):
def __init__(self, name):
self.name = name
self._queues_lock = threading.Lock()
self._queues_lock = threading.RLock()
self._topic_queues = {}
self._server_queues = {}
@ -78,8 +83,13 @@ class FakeExchange(object):
queues = [self._get_server_queue(topic, server)]
else:
queues = [self._get_topic_queue(topic)]
def requeue():
self.deliver_message(topic, ctxt, message, server=server,
fanout=fanout, reply_q=reply_q)
for queue in queues:
queue.append((ctxt, message, reply_q))
queue.append((ctxt, message, reply_q, requeue))
def poll(self, target):
with self._queues_lock:
@ -87,7 +97,7 @@ class FakeExchange(object):
queue = self._get_server_queue(target.topic, target.server)
else:
queue = self._get_topic_queue(target.topic)
return queue.pop(0) if queue else (None, None, None)
return queue.pop(0) if queue else (None, None, None, None)
class FakeDriver(base.BaseDriver):

View File

@ -99,6 +99,10 @@ class QpidMessage(dict):
def acknowledge(self):
self._session.acknowledge(self._raw_message)
def requeue(self):
raise NotImplementedError('The QPID driver does not yet support '
'requeuing messages')
class ConsumerBase(object):
"""Consumer base class."""

View File

@ -128,6 +128,9 @@ class RabbitMessage(dict):
def acknowledge(self):
self._raw_message.ack()
def requeue(self):
self._raw_message.requeue()
class ConsumerBase(object):
"""Consumer base class."""

View File

@ -843,6 +843,10 @@ class ZmqIncomingMessage(base.IncomingMessage):
with self.condition:
self.condition.notify()
def requeue(self):
raise NotImplementedError('The ZeroMQ driver does not yet support '
'requeuing messages')
class ZmqListener(base.Listener):
@ -960,6 +964,8 @@ class ZmqDriver(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities):
# NOTE(sileht): this listener implementation is limited
# because zeromq doesn't support requeing message
conn = create_connection(self.conf)
listener = ZmqListener(self, None)

View File

@ -15,8 +15,10 @@
__all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_listener']
'get_notification_listener',
'NotificationResult']
from .notifier import *
from .listener import *
from .logger import *
from .dispatcher import NotificationResult

View File

@ -28,6 +28,11 @@ LOG = logging.getLogger(__name__)
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
class NotificationResult(object):
HANDLED = 'handled'
REQUEUE = 'requeue'
class NotificationDispatcher(object):
"""A message dispatcher which understands Notification messages.
@ -59,8 +64,15 @@ class NotificationDispatcher(object):
@contextlib.contextmanager
def __call__(self, incoming):
yield lambda: self._dispatch_and_handle_error(incoming)
incoming.acknowledge()
result_wrapper = []
yield lambda: result_wrapper.append(
self._dispatch_and_handle_error(incoming))
if result_wrapper[0] == NotificationResult.HANDLED:
incoming.acknowledge()
else:
incoming.requeue()
def _dispatch_and_handle_error(self, incoming):
"""Dispatch a notification message to the appropriate endpoint method.
@ -69,12 +81,13 @@ class NotificationDispatcher(object):
:type ctxt: IncomingMessage
"""
try:
self._dispatch(incoming.ctxt, incoming.message)
return self._dispatch(incoming.ctxt, incoming.message)
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error('Exception during message handling',
exc_info=exc_info)
return NotificationResult.HANDLED
def _dispatch(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method.
@ -99,6 +112,10 @@ class NotificationDispatcher(object):
for callback in self._callbacks_by_priority.get(priority, []):
localcontext.set_local_context(ctxt)
try:
callback(ctxt, publisher_id, event_type, payload)
ret = callback(ctxt, publisher_id, event_type, payload)
ret = NotificationResult.HANDLED if ret is None else ret
if ret != NotificationResult.HANDLED:
return ret
finally:
localcontext.clear_local_context()
return NotificationResult.HANDLED

View File

@ -73,6 +73,24 @@ priority
Parameters to endpoint methods are the request context supplied by the client,
the publisher_id of the notification message, the event_type, the payload.
An endpoint method can return explicitly messaging.NotificationResult.HANDLED
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
message.
The message is acknowledge only if all endpoints return
messaging.NotificationResult.HANDLED
If nothing is returned by an endpoint, this is considered like
messaging.NotificationResult.HANDLED
messaging.NotificationResult values needs to be handled by drivers:
* HANDLED: supported by all drivers
* REQUEUE: supported by drivers: fake://, rabbit://
In case of an unsupported driver nothing is done to the message and a
NotImplementedError is raised and logged.
By supplying a serializer object, a listener can deserialize a request context
and arguments from - and serialize return values to - primitive types.
"""

View File

@ -41,24 +41,58 @@ class TestDispatcher(test_utils.BaseTestCase):
('no_endpoints',
dict(endpoints=[],
endpoints_expect_calls=[],
priority='info')),
priority='info',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('one_endpoints',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn')),
priority='warn',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('two_endpoints_only_one_match',
dict(endpoints=[['warn'], ['info']],
endpoints_expect_calls=[None, 'info'],
priority='info')),
priority='info',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('two_endpoints_both_match',
dict(endpoints=[['debug', 'info'], ['info', 'debug']],
endpoints_expect_calls=['debug', 'debug'],
priority='debug')),
priority='debug',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('no_return_value',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn',
ex=None, return_value=None)),
('requeue',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
ex=None,
return_value=messaging.NotificationResult.REQUEUE)),
('exception',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
ex=Exception,
return_value=messaging.NotificationResult.HANDLED)),
]
def test_dispatcher(self):
endpoints = [mock.Mock(spec=endpoint_methods)
for endpoint_methods in self.endpoints]
endpoints = []
for endpoint_methods in self.endpoints:
e = mock.Mock(spec=endpoint_methods)
endpoints.append(e)
for m in endpoint_methods:
method = getattr(e, m)
if self.ex:
method.side_effect = self.ex()
else:
method.return_value = self.return_value
msg = notification_msg.copy()
msg['priority'] = self.priority
@ -89,6 +123,17 @@ class TestDispatcher(test_utils.BaseTestCase):
else:
self.assertEqual(endpoints[i].call_count, 0)
if self.ex:
self.assertEqual(incoming.acknowledge.call_count, 1)
self.assertEqual(incoming.requeue.call_count, 0)
elif self.return_value == messaging.NotificationResult.HANDLED \
or self.return_value is None:
self.assertEqual(incoming.acknowledge.call_count, 1)
self.assertEqual(incoming.requeue.call_count, 0)
elif self.return_value == messaging.NotificationResult.REQUEUE:
self.assertEqual(incoming.acknowledge.call_count, 0)
self.assertEqual(incoming.requeue.call_count, 1)
@mock.patch('oslo.messaging.notify.dispatcher.LOG')
def test_dispatcher_unknown_prio(self, mylog):
msg = notification_msg.copy()

View File

@ -123,7 +123,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
endpoint.info.return_value = None
listener_thread = self._setup_listener(transport, [endpoint], 1)
notifier = self._setup_notifier(transport)
@ -138,7 +138,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
endpoint.info.return_value = None
topics = ["topic1", "topic2"]
listener_thread = self._setup_listener(transport, [endpoint], 2,
topics=topics)
@ -157,9 +157,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info = mock.Mock()
endpoint1.info.return_value = None
endpoint2 = mock.Mock()
endpoint2.info = mock.Mock()
endpoint2.info.return_value = messaging.NotificationResult.HANDLED
listener_thread = self._setup_listener(transport,
[endpoint1, endpoint2], 1)
notifier = self._setup_notifier(transport)
@ -171,3 +171,25 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
{}, 'testpublisher', 'an_event.start', 'test')
endpoint2.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test')
def test_requeue(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
def side_effect_requeue(*args, **kwargs):
if endpoint.info.call_count == 1:
return messaging.NotificationResult.REQUEUE
return messaging.NotificationResult.HANDLED
endpoint.info.side_effect = side_effect_requeue
listener_thread = self._setup_listener(transport,
[endpoint], 2)
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test')
self._stop_listener(listener_thread)
expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'),
mock.call({}, 'testpublisher', 'an_event.start', 'test')]
self.assertEqual(endpoint.info.call_args_list, expected)

View File

@ -206,6 +206,7 @@ class TestSendReceive(test_utils.BaseTestCase):
senders[i].start()
received = listener.poll()
received.message.pop('_unique_id')
self.assertIsNotNone(received)
self.assertEqual(received.ctxt, self.ctxt)
self.assertEqual(received.message, {'tx_id': i})
@ -302,12 +303,14 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
senders[0].start()
msgs.append(listener.poll())
msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 0})
# Start the second guy, receive his message
senders[1].start()
msgs.append(listener.poll())
msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 1})
# Reply to both in order, making the second thread queue
@ -602,6 +605,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
producer.publish(msg)
received = listener.poll()
received.message.pop('_unique_id')
self.assertIsNotNone(received)
self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message)