Add a driver method specifically for sending notifications

Notifications are an unusual case in that we need users to manually opt
in to new incompatible message formats by editing configuration because
there may be external consumers expecting the old format.

Add a send_notification() method to the driver interface and add a
format version paramater to the method, to make it clear that this
version selection is specifically for notifications.

In the case of the rabbit/qpid drivers, the 2.0 format is where we added
the message envelope.

Change-Id: Ib4925c308b1252503749962aa16f043281f2b429
This commit is contained in:
Mark McLoughlin 2013-08-01 23:32:17 +01:00
parent 294c99a6d2
commit 206c19e99e
7 changed files with 55 additions and 27 deletions

View File

@ -277,8 +277,8 @@ class AMQPDriverBase(base.BaseDriver):
return self._reply_q return self._reply_q
def send(self, target, ctxt, message, def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False): wait_for_reply=None, timeout=None, envelope=True):
# FIXME(markmc): remove this temporary hack # FIXME(markmc): remove this temporary hack
class Context(object): class Context(object):
@ -299,8 +299,8 @@ class AMQPDriverBase(base.BaseDriver):
msg.update({'_reply_q': self._get_reply_q()}) msg.update({'_reply_q': self._get_reply_q()})
# FIXME(markmc): handle envelope param if envelope:
msg = rpc_common.serialize_msg(msg) msg = rpc_common.serialize_msg(msg)
if wait_for_reply: if wait_for_reply:
self._waiter.listen(msg_id) self._waiter.listen(msg_id)
@ -324,6 +324,12 @@ class AMQPDriverBase(base.BaseDriver):
if wait_for_reply: if wait_for_reply:
self._waiter.unlisten(msg_id) self._waiter.unlisten(msg_id)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send_notification(self, target, ctxt, message, version):
return self._send(target, ctxt, message, envelope=(version == 2.0))
def listen(self, target): def listen(self, target):
conn = self._get_connection(pooled=False) conn = self._get_connection(pooled=False)

View File

@ -65,6 +65,10 @@ class BaseDriver(object):
wait_for_reply=None, timeout=None, envelope=False): wait_for_reply=None, timeout=None, envelope=False):
"""Send a message to the given target.""" """Send a message to the given target."""
@abc.abstractmethod
def send_notification(self, target, ctxt, message, version):
"""Send a notification message to the given target."""
@abc.abstractmethod @abc.abstractmethod
def listen(self, target): def listen(self, target):
"""Construct a Listener for the given target.""" """Construct a Listener for the given target."""

View File

@ -112,8 +112,7 @@ class FakeDriver(base.BaseDriver):
while self._exchanges_lock: while self._exchanges_lock:
return self._exchanges.setdefault(name, FakeExchange(name)) return self._exchanges.setdefault(name, FakeExchange(name))
def send(self, target, ctxt, message, def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
wait_for_reply=None, timeout=None, envelope=False):
self._check_serialize(message) self._check_serialize(message)
exchange = self._get_exchange(target.exchange or exchange = self._get_exchange(target.exchange or
@ -137,6 +136,12 @@ class FakeDriver(base.BaseDriver):
return None return None
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send_notification(self, target, ctxt, message, version):
self._send(target, ctxt, message)
def listen(self, target): def listen(self, target):
exchange = self._get_exchange(target.exchange or exchange = self._get_exchange(target.exchange or
self._default_exchange) self._default_exchange)

View File

@ -34,17 +34,17 @@ class MessagingDriver(notifier._Driver):
deployed which do not support the 2.0 message format. deployed which do not support the 2.0 message format.
""" """
def __init__(self, conf, topics, transport, envelope=False): def __init__(self, conf, topics, transport, version=1.0):
super(MessagingDriver, self).__init__(conf, topics, transport) super(MessagingDriver, self).__init__(conf, topics, transport)
self.envelope = envelope self.version = version
def notify(self, ctxt, message, priority): def notify(self, ctxt, message, priority):
priority = priority.lower() priority = priority.lower()
for topic in self.topics: for topic in self.topics:
target = messaging.Target(topic='%s.%s' % (topic, priority)) target = messaging.Target(topic='%s.%s' % (topic, priority))
try: try:
self.transport._send(target, ctxt, message, self.transport._send_notification(target, ctxt, message,
envelope=self.envelope) version=self.version)
except Exception: except Exception:
LOG.exception("Could not send notification to %(topic)s. " LOG.exception("Could not send notification to %(topic)s. "
"Payload=%(message)s", "Payload=%(message)s",
@ -56,4 +56,4 @@ class MessagingV2Driver(MessagingDriver):
"Send notifications using the 2.0 message format." "Send notifications using the 2.0 message format."
def __init__(self, conf, **kwargs): def __init__(self, conf, **kwargs):
super(MessagingV2Driver, self).__init__(conf, envelope=True, **kwargs) super(MessagingV2Driver, self).__init__(conf, version=2.0, **kwargs)

View File

@ -75,15 +75,19 @@ class Transport(object):
self.conf = driver.conf self.conf = driver.conf
self._driver = driver self._driver = driver
def _send(self, target, ctxt, message, def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
wait_for_reply=None, timeout=None, envelope=False):
if not target.topic: if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send', raise exceptions.InvalidTarget('A topic is required to send',
target) target)
return self._driver.send(target, ctxt, message, return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply, wait_for_reply=wait_for_reply,
timeout=timeout, timeout=timeout)
envelope=envelope)
def _send_notification(self, target, ctxt, message, version):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
self._driver.send(target, ctxt, message, version)
def _listen(self, target): def _listen(self, target):
if not (target.topic and target.server): if not (target.topic and target.server):

View File

@ -37,8 +37,7 @@ class _FakeTransport(object):
def __init__(self, conf): def __init__(self, conf):
self.conf = conf self.conf = conf
def _send(self, target, ctxt, message, def _send_notification(self, target, ctxt, message, version):
wait_for_reply=None, timeout=None, envelope=False):
pass pass
@ -139,7 +138,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
notifier = messaging.Notifier(transport, 'test.localhost') notifier = messaging.Notifier(transport, 'test.localhost')
self.mox.StubOutWithMock(transport, '_send') self.mox.StubOutWithMock(transport, '_send_notification')
message_id = uuid.uuid4() message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4') self.mox.StubOutWithMock(uuid, 'uuid4')
@ -158,15 +157,16 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
sends = [] sends = []
if self.v1: if self.v1:
sends.append(dict(envelope=False)) sends.append(dict(version=1.0))
if self.v2: if self.v2:
sends.append(dict(envelope=True)) sends.append(dict(version=2.0))
for send_kwargs in sends: for send_kwargs in sends:
for topic in self.topics: for topic in self.topics:
target = messaging.Target(topic='%s.%s' % (topic, target = messaging.Target(topic='%s.%s' % (topic,
self.priority)) self.priority))
transport._send(target, self.ctxt, message, **send_kwargs) transport._send_notification(target, self.ctxt, message,
**send_kwargs)
self.mox.ReplayAll() self.mox.ReplayAll()

View File

@ -36,6 +36,9 @@ class _FakeDriver(object):
def send(self, *args, **kwargs): def send(self, *args, **kwargs):
pass pass
def send_notification(self, *args, **kwargs):
pass
def listen(self, target): def listen(self, target):
pass pass
@ -239,8 +242,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
self.mox.StubOutWithMock(t._driver, 'send') self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message', t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply=None, wait_for_reply=None,
timeout=None, timeout=None)
envelope=False)
self.mox.ReplayAll() self.mox.ReplayAll()
t._send(self._target, 'ctxt', 'message') t._send(self._target, 'ctxt', 'message')
@ -251,14 +253,21 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
self.mox.StubOutWithMock(t._driver, 'send') self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message', t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply', wait_for_reply='wait_for_reply',
timeout='timeout', timeout='timeout')
envelope='envelope')
self.mox.ReplayAll() self.mox.ReplayAll()
t._send(self._target, 'ctxt', 'message', t._send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply', wait_for_reply='wait_for_reply',
timeout='timeout', timeout='timeout')
envelope='envelope')
def test_send_notification(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message', 1.0)
self.mox.ReplayAll()
t._send_notification(self._target, 'ctxt', 'message', version=1.0)
def test_listen(self): def test_listen(self):
t = transport.Transport(_FakeDriver(cfg.CONF)) t = transport.Transport(_FakeDriver(cfg.CONF))