Enforce target preconditions outside of drivers

The target preconditions (e.g. you need at least a topic to send) are
the same for all drivers, so enforce them before we ever call into a
driver.

Change-Id: Ic4e9bd94bd9f060ec0662d2bb778c699903dddc4
This commit is contained in:
Mark McLoughlin 2013-08-01 23:09:28 +01:00
parent 89079c6ea1
commit 294c99a6d2
6 changed files with 28 additions and 32 deletions

View File

@ -307,7 +307,6 @@ class AMQPDriverBase(base.BaseDriver):
try:
with self._get_connection() as conn:
# FIXME(markmc): check that target.topic is set
if target.fanout:
conn.fanout_send(target.topic, msg)
else:
@ -326,8 +325,6 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.unlisten(msg_id)
def listen(self, target):
# FIXME(markmc): check that topic.target and topic.server is set
conn = self._get_connection(pooled=False)
listener = AMQPListener(self, target, conn)

View File

@ -25,14 +25,6 @@ from oslo.messaging._drivers import base
from oslo.messaging import _urls as urls
class InvalidTarget(base.TransportDriverError, ValueError):
def __init__(self, msg, target):
msg = msg + ":" + str(target)
super(InvalidTarget, self).__init__(msg)
self.target = target
class FakeIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, reply_q):
@ -122,13 +114,6 @@ class FakeDriver(base.BaseDriver):
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
if not target.topic:
raise InvalidTarget('A topic is required to send', target)
# FIXME(markmc): preconditions to enforce:
# - timeout and not wait_for_reply
# - target.fanout and (wait_for_reply or timeout)
self._check_serialize(message)
exchange = self._get_exchange(target.exchange or
@ -153,10 +138,6 @@ class FakeDriver(base.BaseDriver):
return None
def listen(self, target):
if not (target.topic and target.server):
raise InvalidTarget('Topic and server are required to listen',
target)
exchange = self._get_exchange(target.exchange or
self._default_exchange)

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
__all__ = ['MessagingException', 'MessagingTimeout']
__all__ = ['MessagingException', 'MessagingTimeout', 'InvalidTarget']
class MessagingException(Exception):
@ -28,3 +28,12 @@ class MessagingException(Exception):
class MessagingTimeout(MessagingException):
"""Raised if message sending times out."""
class InvalidTarget(MessagingException, ValueError):
"""Raised if a target does not meet certain pre-conditions."""
def __init__(self, msg, target):
msg = msg + ":" + str(target)
super(InvalidTarget, self).__init__(msg)
self.target = target

View File

@ -77,12 +77,19 @@ class Transport(object):
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply,
timeout=timeout,
envelope=envelope)
def _listen(self, target):
if not (target.topic and target.server):
raise exceptions.InvalidTarget('A server\'s target must have '
'topic and server names specified',
target)
return self._driver.listen(target)
def cleanup(self):

View File

@ -114,7 +114,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
try:
server.start()
except Exception as ex:
self.assertIsInstance(ex, messaging.ServerListenError, ex)
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
self.assertEqual(ex.target.topic, 'testtopic')
else:
self.assertTrue(False)
@ -126,7 +126,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
try:
server.start()
except Exception as ex:
self.assertIsInstance(ex, messaging.ServerListenError, ex)
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
self.assertEqual(ex.target.server, 'testserver')
else:
self.assertTrue(False)
@ -141,7 +141,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
try:
method({}, 'ping', arg='foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.ClientSendError, ex)
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
self.assertIsNotNone(ex.target)
else:
self.assertTrue(False)

View File

@ -231,29 +231,31 @@ class TestSetDefaults(test_utils.BaseTestCase):
class TestTransportMethodArgs(test_utils.BaseTestCase):
_target = messaging.Target(topic='topic', server='server')
def test_send_defaults(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send('target', 'ctxt', 'message',
t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply=None,
timeout=None,
envelope=False)
self.mox.ReplayAll()
t._send('target', 'ctxt', 'message')
t._send(self._target, 'ctxt', 'message')
def test_send_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send('target', 'ctxt', 'message',
t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout',
envelope='envelope')
self.mox.ReplayAll()
t._send('target', 'ctxt', 'message',
t._send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout',
envelope='envelope')
@ -262,7 +264,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'listen')
t._driver.listen('target')
t._driver.listen(self._target)
self.mox.ReplayAll()
t._listen('target')
t._listen(self._target)