diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index fb001f10b..ab77f4f78 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -277,8 +277,8 @@ class AMQPDriverBase(base.BaseDriver): return self._reply_q - def send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=True): # FIXME(markmc): remove this temporary hack class Context(object): @@ -299,8 +299,8 @@ class AMQPDriverBase(base.BaseDriver): msg.update({'_reply_q': self._get_reply_q()}) - # FIXME(markmc): handle envelope param - msg = rpc_common.serialize_msg(msg) + if envelope: + msg = rpc_common.serialize_msg(msg) if wait_for_reply: self._waiter.listen(msg_id) @@ -324,6 +324,12 @@ class AMQPDriverBase(base.BaseDriver): if wait_for_reply: self._waiter.unlisten(msg_id) + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + return self._send(target, ctxt, message, wait_for_reply, timeout) + + def send_notification(self, target, ctxt, message, version): + return self._send(target, ctxt, message, envelope=(version == 2.0)) + def listen(self, target): conn = self._get_connection(pooled=False) diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 2b7b8e028..1dd49f4e6 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -65,6 +65,10 @@ class BaseDriver(object): wait_for_reply=None, timeout=None, envelope=False): """Send a message to the given target.""" + @abc.abstractmethod + def send_notification(self, target, ctxt, message, version): + """Send a notification message to the given target.""" + @abc.abstractmethod def listen(self, target): """Construct a Listener for the given target.""" diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 6afbfad37..3b4c99e18 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -112,8 +112,7 @@ class FakeDriver(base.BaseDriver): while self._exchanges_lock: return self._exchanges.setdefault(name, FakeExchange(name)) - def send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): self._check_serialize(message) exchange = self._get_exchange(target.exchange or @@ -137,6 +136,12 @@ class FakeDriver(base.BaseDriver): return None + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + return self._send(target, ctxt, message, wait_for_reply, timeout) + + def send_notification(self, target, ctxt, message, version): + self._send(target, ctxt, message) + def listen(self, target): exchange = self._get_exchange(target.exchange or self._default_exchange) diff --git a/oslo/messaging/notify/_impl_messaging.py b/oslo/messaging/notify/_impl_messaging.py index d422fbd6e..1eca98ec5 100644 --- a/oslo/messaging/notify/_impl_messaging.py +++ b/oslo/messaging/notify/_impl_messaging.py @@ -34,17 +34,17 @@ class MessagingDriver(notifier._Driver): deployed which do not support the 2.0 message format. """ - def __init__(self, conf, topics, transport, envelope=False): + def __init__(self, conf, topics, transport, version=1.0): super(MessagingDriver, self).__init__(conf, topics, transport) - self.envelope = envelope + self.version = version def notify(self, ctxt, message, priority): priority = priority.lower() for topic in self.topics: target = messaging.Target(topic='%s.%s' % (topic, priority)) try: - self.transport._send(target, ctxt, message, - envelope=self.envelope) + self.transport._send_notification(target, ctxt, message, + version=self.version) except Exception: LOG.exception("Could not send notification to %(topic)s. " "Payload=%(message)s", @@ -56,4 +56,4 @@ class MessagingV2Driver(MessagingDriver): "Send notifications using the 2.0 message format." def __init__(self, conf, **kwargs): - super(MessagingV2Driver, self).__init__(conf, envelope=True, **kwargs) + super(MessagingV2Driver, self).__init__(conf, version=2.0, **kwargs) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index f4667c83b..20083688f 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -75,15 +75,19 @@ class Transport(object): self.conf = driver.conf self._driver = driver - def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) return self._driver.send(target, ctxt, message, wait_for_reply=wait_for_reply, - timeout=timeout, - envelope=envelope) + timeout=timeout) + + def _send_notification(self, target, ctxt, message, version): + if not target.topic: + raise exceptions.InvalidTarget('A topic is required to send', + target) + self._driver.send(target, ctxt, message, version) def _listen(self, target): if not (target.topic and target.server): diff --git a/tests/test_notifier.py b/tests/test_notifier.py index a8ebc8691..4a7ed29c6 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -37,8 +37,7 @@ class _FakeTransport(object): def __init__(self, conf): self.conf = conf - def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send_notification(self, target, ctxt, message, version): pass @@ -139,7 +138,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase): notifier = messaging.Notifier(transport, 'test.localhost') - self.mox.StubOutWithMock(transport, '_send') + self.mox.StubOutWithMock(transport, '_send_notification') message_id = uuid.uuid4() self.mox.StubOutWithMock(uuid, 'uuid4') @@ -158,15 +157,16 @@ class TestMessagingNotifier(test_utils.BaseTestCase): sends = [] if self.v1: - sends.append(dict(envelope=False)) + sends.append(dict(version=1.0)) if self.v2: - sends.append(dict(envelope=True)) + sends.append(dict(version=2.0)) for send_kwargs in sends: for topic in self.topics: target = messaging.Target(topic='%s.%s' % (topic, self.priority)) - transport._send(target, self.ctxt, message, **send_kwargs) + transport._send_notification(target, self.ctxt, message, + **send_kwargs) self.mox.ReplayAll() diff --git a/tests/test_transport.py b/tests/test_transport.py index 55e581c37..0722ac554 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -36,6 +36,9 @@ class _FakeDriver(object): def send(self, *args, **kwargs): pass + def send_notification(self, *args, **kwargs): + pass + def listen(self, target): pass @@ -239,8 +242,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): self.mox.StubOutWithMock(t._driver, 'send') t._driver.send(self._target, 'ctxt', 'message', wait_for_reply=None, - timeout=None, - envelope=False) + timeout=None) self.mox.ReplayAll() t._send(self._target, 'ctxt', 'message') @@ -251,14 +253,21 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): self.mox.StubOutWithMock(t._driver, 'send') t._driver.send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout', - envelope='envelope') + timeout='timeout') self.mox.ReplayAll() t._send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout', - envelope='envelope') + timeout='timeout') + + def test_send_notification(self): + t = transport.Transport(_FakeDriver(cfg.CONF)) + + self.mox.StubOutWithMock(t._driver, 'send') + t._driver.send(self._target, 'ctxt', 'message', 1.0) + self.mox.ReplayAll() + + t._send_notification(self._target, 'ctxt', 'message', version=1.0) def test_listen(self): t = transport.Transport(_FakeDriver(cfg.CONF))