Warn when wrong transport instance is used

Since RPC and notifications can have different backends, it is
useful to warn users if they use a notification transport in RPC
and vice versa. This patch introduces RPCTransport and
NotificationTransport subclasses of Transport, so it's easier to
add different behavior for them if need be.

Related-Bug: #1680192
Change-Id: Iab60544d69053c8e74c28a2d5c84665be749013f
This commit is contained in:
Rajath Agasthya 2017-09-06 14:58:32 -07:00
parent 338b85eb4e
commit 03b6f18f80
13 changed files with 148 additions and 55 deletions

View File

@ -132,6 +132,7 @@ import logging
from oslo_messaging._i18n import _LE
from oslo_messaging.notify import dispatcher as notify_dispatcher
from oslo_messaging import server as msg_server
from oslo_messaging import transport as msg_transport
LOG = logging.getLogger(__name__)
@ -163,6 +164,11 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
class NotificationServer(NotificationServerBase):
def __init__(self, transport, targets, dispatcher, executor='blocking',
allow_requeue=True, pool=None):
if not isinstance(transport, msg_transport.NotificationTransport):
LOG.warning("Using RPC transport for notifications. Please use "
"get_notification_transport to obtain a "
"notification transport instance.")
super(NotificationServer, self).__init__(
transport, targets, dispatcher, executor, allow_requeue, pool, 1,
None

View File

@ -171,8 +171,9 @@ def get_notification_transport(conf, url=None,
group='oslo_messaging_notifications')
if url is None:
url = conf.oslo_messaging_notifications.transport_url
return msg_transport._get_transport(conf, url,
allowed_remote_exmods, aliases)
return msg_transport._get_transport(
conf, url, allowed_remote_exmods, aliases,
transport_cls=msg_transport.NotificationTransport)
class Notifier(object):
@ -245,6 +246,10 @@ class Notifier(object):
conf.register_opts(_notifier_opts,
group='oslo_messaging_notifications')
if not isinstance(transport, msg_transport.NotificationTransport):
_LOG.warning("Using RPC transport for notifications. Please use "
"get_notification_transport to obtain a "
"notification transport instance.")
self.transport = transport
self.publisher_id = publisher_id
if retry is not None:

View File

@ -24,6 +24,7 @@ __all__ = [
]
import abc
import logging
from oslo_config import cfg
import six
@ -32,6 +33,10 @@ from oslo_messaging._drivers import base as driver_base
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import transport as msg_transport
LOG = logging.getLogger(__name__)
_client_opts = [
cfg.IntOpt('rpc_response_timeout',
@ -331,6 +336,11 @@ class RPCClient(_BaseCallContext):
if serializer is None:
serializer = msg_serializer.NoOpSerializer()
if not isinstance(transport, msg_transport.RPCTransport):
LOG.warning("Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport "
"instance.")
super(RPCClient, self).__init__(
transport, target, serializer, timeout, version_cap, retry
)

View File

@ -135,6 +135,7 @@ from debtcollector.updating import updated_kwarg_default_value
from oslo_messaging._i18n import _LE
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_messaging import server as msg_server
from oslo_messaging import transport as msg_transport
LOG = logging.getLogger(__name__)
@ -142,6 +143,10 @@ LOG = logging.getLogger(__name__)
class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'):
super(RPCServer, self).__init__(transport, dispatcher, executor)
if not isinstance(transport, msg_transport.RPCTransport):
LOG.warning("Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport "
"instance.")
self._target = target
def _create_listener(self):

View File

@ -43,5 +43,6 @@ def get_rpc_transport(conf, url=None,
from
:type allowed_remote_exmods: list
"""
return msg_transport._get_transport(conf, url,
allowed_remote_exmods)
return msg_transport._get_transport(
conf, url, allowed_remote_exmods,
transport_cls=msg_transport.RPCTransport)

View File

@ -22,7 +22,6 @@ import oslo_messaging
from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import notifier as msg_notifier
from oslo_messaging.tests import utils as test_utils
import six
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
@ -183,7 +182,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertTrue(False)
def test_batch_timeout(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@ -191,7 +191,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
batch=(5, 1))
notifier = self._setup_notifier(transport)
for i in six.moves.range(12):
for _ in range(12):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(3)
@ -209,7 +209,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
mock.call(messages * 2)])
def test_batch_size(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@ -217,7 +218,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
batch=(5, None))
notifier = self._setup_notifier(transport)
for i in six.moves.range(10):
for _ in range(10):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(2)
@ -234,7 +235,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
mock.call(messages * 5)])
def test_batch_size_exception_path(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
endpoint = mock.Mock()
endpoint.info.side_effect = [None, Exception('boom!')]
@ -242,7 +244,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
batch=(5, None))
notifier = self._setup_notifier(transport)
for i in six.moves.range(10):
for _ in range(10):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(2)
@ -506,3 +508,18 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
for call in mocked_endpoint1_calls:
self.assertIn(call, endpoint2.info.mock_calls +
endpoint3.info.mock_calls)
class TestListenerTransportWarning(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.notify.listener.LOG')
def test_warning_when_rpc_transport(self, log):
transport = oslo_messaging.get_rpc_transport(self.conf)
target = oslo_messaging.Target(topic='foo')
endpoints = [object()]
oslo_messaging.get_notification_listener(
transport, [target], endpoints)
log.warning.assert_called_once_with(
"Using RPC transport for notifications. Please use "
"get_notification_transport to obtain a "
"notification transport instance.")

View File

@ -16,7 +16,6 @@ import fixtures
import oslo_messaging
from oslo_messaging.notify import log_handler
from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@ -34,7 +33,7 @@ class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
self.stub_flg = True
transport = test_notifier._FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf)
notifier = oslo_messaging.Notifier(transport)
def fake_notifier(*args, **kwargs):

View File

@ -22,7 +22,6 @@ from oslo_utils import timeutils
import testscenarios
import oslo_messaging
from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@ -58,8 +57,9 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
fake_transport = oslo_messaging.get_notification_transport(self.conf)
with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
return_value=fake_transport):
self.logger = oslo_messaging.LoggingNotificationHandler('test://')
mock_utcnow.return_value = datetime.datetime.utcnow()
@ -102,8 +102,9 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
fake_transport = oslo_messaging.get_notification_transport(self.conf)
with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
return_value=fake_transport):
logging.config.dictConfig({
'version': 1,
'handlers': {

View File

@ -47,15 +47,6 @@ class JsonMessageMatcher(object):
return self.message == jsonutils.loads(other)
class _FakeTransport(object):
def __init__(self, conf):
self.conf = conf
def _send_notification(self, target, ctxt, message, version, retry=None):
pass
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
"""Record logged exceptions and re-raise in cleanup.
@ -73,6 +64,9 @@ class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
def exception(self, msg, *args, **kwargs):
self.exceptions.append(sys.exc_info()[1])
def warning(self, msg, *args, **kwargs):
return
def setUp(self):
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
@ -170,7 +164,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
topics=self.topics,
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
if hasattr(self, 'ctor_pub_id'):
notifier = oslo_messaging.Notifier(transport,
@ -241,7 +236,8 @@ class TestSerializer(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_serializer(self, mock_utcnow):
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
serializer = msg_serializer.NoOpSerializer()
@ -289,7 +285,8 @@ class TestNotifierTopics(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
self.config(topics=['topic1', 'topic2'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
self.assertEqual(['topic1', 'topic2'], notifier._topics)
@ -297,7 +294,8 @@ class TestNotifierTopics(test_utils.BaseTestCase):
def test_topics_from_kwargs(self):
self.config(driver=['log'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost',
topics=['topic1', 'topic2'])
@ -311,7 +309,8 @@ class TestLogNotifier(test_utils.BaseTestCase):
self.config(driver=['log'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
@ -386,7 +385,8 @@ class TestNotificationConfig(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
conf.set_override('retry', 3, group='oslo_messaging_notifications')
transport = _FakeTransport(conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
notifier = oslo_messaging.Notifier(transport)
self.assertEqual(3, notifier.retry)
@ -397,7 +397,8 @@ class TestNotificationConfig(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
conf.set_override('retry', 3, group='oslo_messaging_notifications')
transport = _FakeTransport(conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
notifier = oslo_messaging.Notifier(transport, retry=5)
self.assertEqual(5, notifier.retry)
@ -409,7 +410,8 @@ class TestRoutingNotifier(test_utils.BaseTestCase):
self.config(driver=['routing'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
self.notifier = oslo_messaging.Notifier(transport)
self.router = self.notifier._driver_mgr['routing'].obj
@ -642,8 +644,21 @@ class TestNoOpNotifier(test_utils.BaseTestCase):
self.config(driver=['noop'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_notification_transport(self.conf,
url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
self.assertFalse(notifier.is_enabled())
class TestNotifierTransportWarning(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.notify.notifier._LOG')
def test_warning_when_rpc_transport(self, log):
transport = oslo_messaging.get_rpc_transport(self.conf)
oslo_messaging.Notifier(transport, 'test.localhost')
log.warning.assert_called_once_with(
"Using RPC transport for notifications. Please use "
"get_notification_transport to obtain a "
"notification transport instance.")

View File

@ -25,15 +25,6 @@ from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeTransport(object):
def __init__(self, conf):
self.conf = conf
def _send(self, *args, **kwargs):
pass
class TestCastCall(test_utils.BaseTestCase):
scenarios = [
@ -52,7 +43,7 @@ class TestCastCall(test_utils.BaseTestCase):
def test_cast_call(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
transport._send = mock.Mock()
@ -191,7 +182,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
target = oslo_messaging.Target(**self.ctor)
expect_target = oslo_messaging.Target(**self.expect)
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target)
transport._send = mock.Mock()
@ -242,7 +233,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
def test_call_timeout(self):
self.config(rpc_response_timeout=self.confval)
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
timeout=self.ctor)
@ -273,7 +264,7 @@ class TestCallRetry(test_utils.BaseTestCase):
]
def test_call_retry(self):
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
retry=self.ctor)
@ -302,7 +293,7 @@ class TestCallFanout(test_utils.BaseTestCase):
]
def test_call_fanout(self):
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport,
oslo_messaging.Target(**self.target))
@ -331,7 +322,7 @@ class TestSerializer(test_utils.BaseTestCase):
def test_call_serializer(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
serializer = msg_serializer.NoOpSerializer()
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
@ -430,7 +421,7 @@ class TestVersionCap(test_utils.BaseTestCase):
def test_version_cap(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target,
@ -535,7 +526,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_version_cap(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target,
@ -561,7 +552,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_invalid_version_type(self):
target = oslo_messaging.Target(topic='sometopic')
transport = _FakeTransport(self.conf)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target)
self.assertRaises(exceptions.MessagingException,
client.prepare, version='5')
@ -569,3 +560,15 @@ class TestCanSendVersion(test_utils.BaseTestCase):
client.prepare, version='5.a')
self.assertRaises(exceptions.MessagingException,
client.prepare, version='5.5.a')
class TestTransportWarning(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.rpc.client.LOG')
def test_warning_when_notifier_transport(self, log):
transport = oslo_messaging.get_notification_transport(self.conf)
oslo_messaging.RPCClient(transport, oslo_messaging.Target())
log.warning.assert_called_once_with(
"Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport "
"instance.")

View File

@ -436,6 +436,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
@mock.patch('oslo_messaging.rpc.server.LOG')
def test_warning_when_notifier_transport(self, log):
transport = oslo_messaging.get_notification_transport(self.conf)
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
oslo_messaging.get_rpc_server(transport, target,
endpoints, serializer=serializer)
log.warning.assert_called_once_with(
"Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport "
"instance.")
class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):

View File

@ -148,6 +148,7 @@ class GetTransportTestCase(test_utils.BaseTestCase):
self.assertIsNotNone(transport_)
self.assertIs(transport_.conf, self.conf)
self.assertIs(transport_._driver, drvr)
self.assertTrue(isinstance(transport_, transport.RPCTransport))
driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
self.expect['backend'],

View File

@ -153,6 +153,20 @@ class Transport(object):
self._driver.cleanup()
class RPCTransport(Transport):
"""Transport object for RPC."""
def __init__(self, driver):
super(RPCTransport, self).__init__(driver)
class NotificationTransport(Transport):
"""Transport object for notifications."""
def __init__(self, driver):
super(NotificationTransport, self).__init__(driver)
class InvalidTransportURL(exceptions.MessagingException):
"""Raised if transport URL is invalid."""
@ -171,7 +185,8 @@ class DriverLoadFailure(exceptions.MessagingException):
self.ex = ex
def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None,
transport_cls=RPCTransport):
allowed_remote_exmods = allowed_remote_exmods or []
conf.register_opts(_transport_opts)
@ -190,7 +205,7 @@ def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
return transport_cls(mgr.driver)
@removals.remove(
@ -229,7 +244,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
:type aliases: dict
"""
return _get_transport(conf, url,
allowed_remote_exmods, aliases)
allowed_remote_exmods, aliases,
transport_cls=RPCTransport)
class TransportHost(object):