diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 01428566a..9de09f8a8 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -132,6 +132,7 @@ import logging from oslo_messaging._i18n import _LE from oslo_messaging.notify import dispatcher as notify_dispatcher from oslo_messaging import server as msg_server +from oslo_messaging import transport as msg_transport LOG = logging.getLogger(__name__) @@ -163,6 +164,11 @@ class NotificationServerBase(msg_server.MessageHandlingServer): class NotificationServer(NotificationServerBase): def __init__(self, transport, targets, dispatcher, executor='blocking', allow_requeue=True, pool=None): + if not isinstance(transport, msg_transport.NotificationTransport): + LOG.warning("Using RPC transport for notifications. Please use " + "get_notification_transport to obtain a " + "notification transport instance.") + super(NotificationServer, self).__init__( transport, targets, dispatcher, executor, allow_requeue, pool, 1, None diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py index 7774deace..8bdb13caf 100644 --- a/oslo_messaging/notify/notifier.py +++ b/oslo_messaging/notify/notifier.py @@ -171,8 +171,9 @@ def get_notification_transport(conf, url=None, group='oslo_messaging_notifications') if url is None: url = conf.oslo_messaging_notifications.transport_url - return msg_transport._get_transport(conf, url, - allowed_remote_exmods, aliases) + return msg_transport._get_transport( + conf, url, allowed_remote_exmods, aliases, + transport_cls=msg_transport.NotificationTransport) class Notifier(object): @@ -245,6 +246,10 @@ class Notifier(object): conf.register_opts(_notifier_opts, group='oslo_messaging_notifications') + if not isinstance(transport, msg_transport.NotificationTransport): + _LOG.warning("Using RPC transport for notifications. Please use " + "get_notification_transport to obtain a " + "notification transport instance.") self.transport = transport self.publisher_id = publisher_id if retry is not None: diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 2a27a9ff8..4bf7fc443 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -24,6 +24,7 @@ __all__ = [ ] import abc +import logging from oslo_config import cfg import six @@ -32,6 +33,10 @@ from oslo_messaging._drivers import base as driver_base from oslo_messaging import _utils as utils from oslo_messaging import exceptions from oslo_messaging import serializer as msg_serializer +from oslo_messaging import transport as msg_transport + + +LOG = logging.getLogger(__name__) _client_opts = [ cfg.IntOpt('rpc_response_timeout', @@ -331,6 +336,11 @@ class RPCClient(_BaseCallContext): if serializer is None: serializer = msg_serializer.NoOpSerializer() + if not isinstance(transport, msg_transport.RPCTransport): + LOG.warning("Using notification transport for RPC. Please use " + "get_rpc_transport to obtain an RPC transport " + "instance.") + super(RPCClient, self).__init__( transport, target, serializer, timeout, version_cap, retry ) diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index c94669f32..b91fffecf 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -135,6 +135,7 @@ from debtcollector.updating import updated_kwarg_default_value from oslo_messaging._i18n import _LE from oslo_messaging.rpc import dispatcher as rpc_dispatcher from oslo_messaging import server as msg_server +from oslo_messaging import transport as msg_transport LOG = logging.getLogger(__name__) @@ -142,6 +143,10 @@ LOG = logging.getLogger(__name__) class RPCServer(msg_server.MessageHandlingServer): def __init__(self, transport, target, dispatcher, executor='blocking'): super(RPCServer, self).__init__(transport, dispatcher, executor) + if not isinstance(transport, msg_transport.RPCTransport): + LOG.warning("Using notification transport for RPC. Please use " + "get_rpc_transport to obtain an RPC transport " + "instance.") self._target = target def _create_listener(self): diff --git a/oslo_messaging/rpc/transport.py b/oslo_messaging/rpc/transport.py index e3abe3224..90815fc10 100644 --- a/oslo_messaging/rpc/transport.py +++ b/oslo_messaging/rpc/transport.py @@ -43,5 +43,6 @@ def get_rpc_transport(conf, url=None, from :type allowed_remote_exmods: list """ - return msg_transport._get_transport(conf, url, - allowed_remote_exmods) + return msg_transport._get_transport( + conf, url, allowed_remote_exmods, + transport_cls=msg_transport.RPCTransport) diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index 6582ccd03..13d91a04f 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -22,7 +22,6 @@ import oslo_messaging from oslo_messaging.notify import dispatcher from oslo_messaging.notify import notifier as msg_notifier from oslo_messaging.tests import utils as test_utils -import six from six.moves import mock load_tests = testscenarios.load_tests_apply_scenarios @@ -183,7 +182,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): self.assertTrue(False) def test_batch_timeout(self): - transport = oslo_messaging.get_transport(self.conf, url='fake:') + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') endpoint = mock.Mock() endpoint.info.return_value = None @@ -191,7 +191,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): batch=(5, 1)) notifier = self._setup_notifier(transport) - for i in six.moves.range(12): + for _ in range(12): notifier.info({}, 'an_event.start', 'test message') self.wait_for_messages(3) @@ -209,7 +209,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): mock.call(messages * 2)]) def test_batch_size(self): - transport = oslo_messaging.get_transport(self.conf, url='fake:') + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') endpoint = mock.Mock() endpoint.info.return_value = None @@ -217,7 +218,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): batch=(5, None)) notifier = self._setup_notifier(transport) - for i in six.moves.range(10): + for _ in range(10): notifier.info({}, 'an_event.start', 'test message') self.wait_for_messages(2) @@ -234,7 +235,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): mock.call(messages * 5)]) def test_batch_size_exception_path(self): - transport = oslo_messaging.get_transport(self.conf, url='fake:') + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') endpoint = mock.Mock() endpoint.info.side_effect = [None, Exception('boom!')] @@ -242,7 +244,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): batch=(5, None)) notifier = self._setup_notifier(transport) - for i in six.moves.range(10): + for _ in range(10): notifier.info({}, 'an_event.start', 'test message') self.wait_for_messages(2) @@ -506,3 +508,18 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): for call in mocked_endpoint1_calls: self.assertIn(call, endpoint2.info.mock_calls + endpoint3.info.mock_calls) + + +class TestListenerTransportWarning(test_utils.BaseTestCase): + + @mock.patch('oslo_messaging.notify.listener.LOG') + def test_warning_when_rpc_transport(self, log): + transport = oslo_messaging.get_rpc_transport(self.conf) + target = oslo_messaging.Target(topic='foo') + endpoints = [object()] + oslo_messaging.get_notification_listener( + transport, [target], endpoints) + log.warning.assert_called_once_with( + "Using RPC transport for notifications. Please use " + "get_notification_transport to obtain a " + "notification transport instance.") diff --git a/oslo_messaging/tests/notify/test_log_handler.py b/oslo_messaging/tests/notify/test_log_handler.py index 6ca5f683a..40d109451 100644 --- a/oslo_messaging/tests/notify/test_log_handler.py +++ b/oslo_messaging/tests/notify/test_log_handler.py @@ -16,7 +16,6 @@ import fixtures import oslo_messaging from oslo_messaging.notify import log_handler -from oslo_messaging.tests.notify import test_notifier from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -34,7 +33,7 @@ class PublishErrorsHandlerTestCase(test_utils.BaseTestCase): group='oslo_messaging_notifications') self.stub_flg = True - transport = test_notifier._FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf) notifier = oslo_messaging.Notifier(transport) def fake_notifier(*args, **kwargs): diff --git a/oslo_messaging/tests/notify/test_logger.py b/oslo_messaging/tests/notify/test_logger.py index e7cd868b8..37ce82e76 100644 --- a/oslo_messaging/tests/notify/test_logger.py +++ b/oslo_messaging/tests/notify/test_logger.py @@ -22,7 +22,6 @@ from oslo_utils import timeutils import testscenarios import oslo_messaging -from oslo_messaging.tests.notify import test_notifier from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -58,8 +57,9 @@ class TestLogNotifier(test_utils.BaseTestCase): @mock.patch('oslo_utils.timeutils.utcnow') def test_logger(self, mock_utcnow): + fake_transport = oslo_messaging.get_notification_transport(self.conf) with mock.patch('oslo_messaging.transport._get_transport', - return_value=test_notifier._FakeTransport(self.conf)): + return_value=fake_transport): self.logger = oslo_messaging.LoggingNotificationHandler('test://') mock_utcnow.return_value = datetime.datetime.utcnow() @@ -102,8 +102,9 @@ class TestLogNotifier(test_utils.BaseTestCase): @mock.patch('oslo_utils.timeutils.utcnow') def test_logging_conf(self, mock_utcnow): + fake_transport = oslo_messaging.get_notification_transport(self.conf) with mock.patch('oslo_messaging.transport._get_transport', - return_value=test_notifier._FakeTransport(self.conf)): + return_value=fake_transport): logging.config.dictConfig({ 'version': 1, 'handlers': { diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index 364803e06..02bb8a404 100755 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -47,15 +47,6 @@ class JsonMessageMatcher(object): return self.message == jsonutils.loads(other) -class _FakeTransport(object): - - def __init__(self, conf): - self.conf = conf - - def _send_notification(self, target, ctxt, message, version, retry=None): - pass - - class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture): """Record logged exceptions and re-raise in cleanup. @@ -73,6 +64,9 @@ class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture): def exception(self, msg, *args, **kwargs): self.exceptions.append(sys.exc_info()[1]) + def warning(self, msg, *args, **kwargs): + return + def setUp(self): super(_ReRaiseLoggedExceptionsFixture, self).setUp() @@ -170,7 +164,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase): topics=self.topics, group='oslo_messaging_notifications') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') if hasattr(self, 'ctor_pub_id'): notifier = oslo_messaging.Notifier(transport, @@ -241,7 +236,8 @@ class TestSerializer(test_utils.BaseTestCase): @mock.patch('oslo_utils.timeutils.utcnow') def test_serializer(self, mock_utcnow): - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') serializer = msg_serializer.NoOpSerializer() @@ -289,7 +285,8 @@ class TestNotifierTopics(test_utils.BaseTestCase): group='oslo_messaging_notifications') self.config(topics=['topic1', 'topic2'], group='oslo_messaging_notifications') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') notifier = oslo_messaging.Notifier(transport, 'test.localhost') self.assertEqual(['topic1', 'topic2'], notifier._topics) @@ -297,7 +294,8 @@ class TestNotifierTopics(test_utils.BaseTestCase): def test_topics_from_kwargs(self): self.config(driver=['log'], group='oslo_messaging_notifications') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') notifier = oslo_messaging.Notifier(transport, 'test.localhost', topics=['topic1', 'topic2']) @@ -311,7 +309,8 @@ class TestLogNotifier(test_utils.BaseTestCase): self.config(driver=['log'], group='oslo_messaging_notifications') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') notifier = oslo_messaging.Notifier(transport, 'test.localhost') @@ -386,7 +385,8 @@ class TestNotificationConfig(test_utils.BaseTestCase): group='oslo_messaging_notifications') conf.set_override('retry', 3, group='oslo_messaging_notifications') - transport = _FakeTransport(conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') notifier = oslo_messaging.Notifier(transport) self.assertEqual(3, notifier.retry) @@ -397,7 +397,8 @@ class TestNotificationConfig(test_utils.BaseTestCase): group='oslo_messaging_notifications') conf.set_override('retry', 3, group='oslo_messaging_notifications') - transport = _FakeTransport(conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') notifier = oslo_messaging.Notifier(transport, retry=5) self.assertEqual(5, notifier.retry) @@ -409,7 +410,8 @@ class TestRoutingNotifier(test_utils.BaseTestCase): self.config(driver=['routing'], group='oslo_messaging_notifications') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') self.notifier = oslo_messaging.Notifier(transport) self.router = self.notifier._driver_mgr['routing'].obj @@ -642,8 +644,21 @@ class TestNoOpNotifier(test_utils.BaseTestCase): self.config(driver=['noop'], group='oslo_messaging_notifications') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf, + url='fake:') notifier = oslo_messaging.Notifier(transport, 'test.localhost') self.assertFalse(notifier.is_enabled()) + + +class TestNotifierTransportWarning(test_utils.BaseTestCase): + + @mock.patch('oslo_messaging.notify.notifier._LOG') + def test_warning_when_rpc_transport(self, log): + transport = oslo_messaging.get_rpc_transport(self.conf) + oslo_messaging.Notifier(transport, 'test.localhost') + log.warning.assert_called_once_with( + "Using RPC transport for notifications. Please use " + "get_notification_transport to obtain a " + "notification transport instance.") diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index e955167fd..f57de543b 100755 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -25,15 +25,6 @@ from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios -class _FakeTransport(object): - - def __init__(self, conf): - self.conf = conf - - def _send(self, *args, **kwargs): - pass - - class TestCastCall(test_utils.BaseTestCase): scenarios = [ @@ -52,7 +43,7 @@ class TestCastCall(test_utils.BaseTestCase): def test_cast_call(self): self.config(rpc_response_timeout=None) - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, oslo_messaging.Target()) transport._send = mock.Mock() @@ -191,7 +182,7 @@ class TestCastToTarget(test_utils.BaseTestCase): target = oslo_messaging.Target(**self.ctor) expect_target = oslo_messaging.Target(**self.expect) - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, target) transport._send = mock.Mock() @@ -242,7 +233,7 @@ class TestCallTimeout(test_utils.BaseTestCase): def test_call_timeout(self): self.config(rpc_response_timeout=self.confval) - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), timeout=self.ctor) @@ -273,7 +264,7 @@ class TestCallRetry(test_utils.BaseTestCase): ] def test_call_retry(self): - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), retry=self.ctor) @@ -302,7 +293,7 @@ class TestCallFanout(test_utils.BaseTestCase): ] def test_call_fanout(self): - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(**self.target)) @@ -331,7 +322,7 @@ class TestSerializer(test_utils.BaseTestCase): def test_call_serializer(self): self.config(rpc_response_timeout=None) - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') serializer = msg_serializer.NoOpSerializer() client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), @@ -430,7 +421,7 @@ class TestVersionCap(test_utils.BaseTestCase): def test_version_cap(self): self.config(rpc_response_timeout=None) - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(version=self.version) client = oslo_messaging.RPCClient(transport, target, @@ -535,7 +526,7 @@ class TestCanSendVersion(test_utils.BaseTestCase): def test_version_cap(self): self.config(rpc_response_timeout=None) - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(version=self.version) client = oslo_messaging.RPCClient(transport, target, @@ -561,7 +552,7 @@ class TestCanSendVersion(test_utils.BaseTestCase): def test_invalid_version_type(self): target = oslo_messaging.Target(topic='sometopic') - transport = _FakeTransport(self.conf) + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, target) self.assertRaises(exceptions.MessagingException, client.prepare, version='5') @@ -569,3 +560,15 @@ class TestCanSendVersion(test_utils.BaseTestCase): client.prepare, version='5.a') self.assertRaises(exceptions.MessagingException, client.prepare, version='5.5.a') + + +class TestTransportWarning(test_utils.BaseTestCase): + + @mock.patch('oslo_messaging.rpc.client.LOG') + def test_warning_when_notifier_transport(self, log): + transport = oslo_messaging.get_notification_transport(self.conf) + oslo_messaging.RPCClient(transport, oslo_messaging.Target()) + log.warning.assert_called_once_with( + "Using notification transport for RPC. Please use " + "get_rpc_transport to obtain an RPC transport " + "instance.") diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 22793242b..d1369dde3 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -436,6 +436,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): self._stop_server(client, server_thread) + @mock.patch('oslo_messaging.rpc.server.LOG') + def test_warning_when_notifier_transport(self, log): + transport = oslo_messaging.get_notification_transport(self.conf) + target = oslo_messaging.Target(topic='foo', server='bar') + endpoints = [object()] + serializer = object() + + oslo_messaging.get_rpc_server(transport, target, + endpoints, serializer=serializer) + log.warning.assert_called_once_with( + "Using notification transport for RPC. Please use " + "get_rpc_transport to obtain an RPC transport " + "instance.") + class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin): diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index 71ff9ec4d..e0d327f94 100755 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -148,6 +148,7 @@ class GetTransportTestCase(test_utils.BaseTestCase): self.assertIsNotNone(transport_) self.assertIs(transport_.conf, self.conf) self.assertIs(transport_._driver, drvr) + self.assertTrue(isinstance(transport_, transport.RPCTransport)) driver.DriverManager.assert_called_once_with('oslo.messaging.drivers', self.expect['backend'], diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index f00c9013f..c2d0e25ae 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -153,6 +153,20 @@ class Transport(object): self._driver.cleanup() +class RPCTransport(Transport): + """Transport object for RPC.""" + + def __init__(self, driver): + super(RPCTransport, self).__init__(driver) + + +class NotificationTransport(Transport): + """Transport object for notifications.""" + + def __init__(self, driver): + super(NotificationTransport, self).__init__(driver) + + class InvalidTransportURL(exceptions.MessagingException): """Raised if transport URL is invalid.""" @@ -171,7 +185,8 @@ class DriverLoadFailure(exceptions.MessagingException): self.ex = ex -def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None): +def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None, + transport_cls=RPCTransport): allowed_remote_exmods = allowed_remote_exmods or [] conf.register_opts(_transport_opts) @@ -190,7 +205,7 @@ def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None): except RuntimeError as ex: raise DriverLoadFailure(url.transport, ex) - return Transport(mgr.driver) + return transport_cls(mgr.driver) @removals.remove( @@ -229,7 +244,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None): :type aliases: dict """ return _get_transport(conf, url, - allowed_remote_exmods, aliases) + allowed_remote_exmods, aliases, + transport_cls=RPCTransport) class TransportHost(object):