Add get_rpc_transport call

The get_rpc_transport wraps get_transport to unify the API in
anticipation of comprehensive separation of RPC and Notification
messaging backends.

Related-Bug: 1680192
Change-Id: Ic6af07b98ff43806c2af38a3ba129991f1e0ec86
This commit is contained in:
Andrew Smith 2017-04-06 09:47:34 -04:00
parent e569c92cd9
commit ec4d6639bc
10 changed files with 144 additions and 54 deletions

View File

@ -4,8 +4,6 @@ Transport
.. currentmodule:: oslo_messaging
.. autofunction:: get_transport
.. autoclass:: Transport
.. autoclass:: TransportURL

View File

@ -171,8 +171,8 @@ def get_notification_transport(conf, url=None,
group='oslo_messaging_notifications')
if url is None:
url = conf.oslo_messaging_notifications.transport_url
return msg_transport.get_transport(conf, url,
allowed_remote_exmods, aliases)
return msg_transport._get_transport(conf, url,
allowed_remote_exmods, aliases)
class Notifier(object):

View File

@ -28,10 +28,12 @@ __all__ = [
'RemoteError',
'UnsupportedVersion',
'expected_exceptions',
'get_rpc_transport',
'get_rpc_server',
'expose'
]
from .client import *
from .dispatcher import *
from .transport import *
from .server import *

View File

@ -282,7 +282,7 @@ class RPCClient(_BaseCallContext):
However, this class can be used directly without wrapping it another class.
For example::
transport = messaging.get_transport(cfg.CONF)
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target)
client.call(ctxt, 'test', arg=arg)
@ -440,12 +440,12 @@ class RPCClient(_BaseCallContext):
method are handled are quite subtle.
Firstly, if the remote exception is contained in one of the modules
listed in the allow_remote_exmods messaging.get_transport() parameter,
then it this exception will be re-raised by call(). However, such
locally re-raised remote exceptions are distinguishable from the same
exception type raised locally because re-raised remote exceptions are
modified such that their class name ends with the '_Remote' suffix so
you may do::
listed in the allow_remote_exmods messaging.get_rpc_transport()
parameter, then it this exception will be re-raised by call(). However,
such locally re-raised remote exceptions are distinguishable from the
same exception type raised locally because re-raised remote exceptions
are modified such that their class name ends with the '_Remote' suffix
so you may do::
if ex.__class__.__name__.endswith('_Remote'):
# Some special case for locally re-raised remote exceptions

View File

@ -20,12 +20,12 @@ methods which may be invoked remotely by clients over a given transport.
To create an RPC server, you supply a transport, target and a list of
endpoints.
A transport can be obtained simply by calling the get_transport() method::
A transport can be obtained simply by calling the get_rpc_transport() method::
transport = messaging.get_transport(conf)
transport = messaging.get_rpc_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration. See get_transport() for more details.
messaging configuration. See get_rpc_transport() for more details.
The target supplied when creating an RPC server expresses the topic, server
name and - optionally - the exchange to listen on. See Target for more details
@ -98,7 +98,7 @@ A simple example of an RPC server with multiple endpoints might be::
def test(self, ctx, arg):
return arg
transport = oslo_messaging.get_transport(cfg.CONF)
transport = oslo_messaging.get_rpc_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(None),

View File

@ -0,0 +1,47 @@
# Copyright 2017 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'get_rpc_transport'
]
from oslo_messaging import transport as msg_transport
def get_rpc_transport(conf, url=None,
allowed_remote_exmods=None):
"""A factory method for Transport objects for RPCs.
This method should be used to ensure the correct messaging functionality
for RPCs. RPCs and Notifications may use separate messaging systems
that utilize different drivers, different access permissions,
message delivery, etc.
Presently, this function works exactly the same as get_transport. It's
use is recommended as disambiguates the intended use for the transport
and may in the future extend functionality related to the separation of
messaging backends.
:param conf: the user configuration
:type conf: cfg.ConfigOpts
:param url: a transport URL
:type url: str or TransportURL
:param allowed_remote_exmods: a list of modules which a client using this
transport will deserialize remote exceptions
from
:type allowed_remote_exmods: list
"""
return msg_transport._get_transport(conf, url,
allowed_remote_exmods)

View File

@ -58,7 +58,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
self.logger = oslo_messaging.LoggingNotificationHandler('test://')
@ -102,7 +102,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
logging.config.dictConfig({
'version': 1,

View File

@ -113,7 +113,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
def test_constructor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
@ -135,7 +135,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual('blocking', server.executor_type)
def test_constructor_without_explicit_RPCAccessPolicy(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
@ -148,7 +148,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(FutureWarning, w.category)
def test_server_wait_method(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
@ -180,7 +180,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
server = oslo_messaging.get_rpc_server(
transport,
@ -195,7 +195,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False)
def test_no_server_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(server='testserver')
server = oslo_messaging.get_rpc_server(transport, target, [])
try:
@ -207,7 +207,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False)
def _test_no_client_topic(self, call=True):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = self._setup_client(transport, topic=None)
@ -228,7 +228,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._test_no_client_topic(call=False)
def test_client_call_timeout(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
finished = False
wait = threading.Condition()
@ -256,7 +256,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_unknown_executor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
try:
oslo_messaging.get_rpc_server(transport, None, [], executor='foo')
@ -267,7 +267,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False)
def test_cast(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def __init__(self):
@ -288,7 +288,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
def test_call(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
@ -307,7 +307,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_direct_call(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
@ -327,7 +327,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_context(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ctxt_check(self, ctxt, key):
@ -344,7 +344,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_failure(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
@ -384,7 +384,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_expected_failure(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
debugs = []
errors = []
@ -529,9 +529,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
url1 = 'fake:///' + (self.exchange1 or '')
url2 = 'fake:///' + (self.exchange2 or '')
transport1 = oslo_messaging.get_transport(self.conf, url=url1)
transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
if url1 != url2:
transport2 = oslo_messaging.get_transport(self.conf, url=url1)
transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
else:
transport2 = transport1

View File

@ -77,8 +77,33 @@ class Transport(object):
This is a mostly opaque handle for an underlying messaging transport
driver.
It has a single 'conf' property which is the cfg.ConfigOpts instance used
to construct the transport object.
RPCs and Notifications may use separate messaging systems that utilize
different drivers, access permissions, message delivery, etc. To ensure
the correct messaging functionality, the corresponding method should be
used to construct a Transport object from transport configuration
gleaned from the user's configuration and, optionally, a transport URL.
The factory method for RPC Transport objects::
def get_rpc_transport(conf, url=None,
allowed_remote_exmods=None)
If a transport URL is supplied as a parameter, any transport configuration
contained in it takes precedence. If no transport URL is supplied, but
there is a transport URL supplied in the user's configuration then that
URL will take the place of the URL parameter.
The factory method for Notification Transport objects::
def get_notification_transport(conf, url=None,
allowed_remote_exmods=None)
If no transport URL is provided, the URL in the notifications section of
the config file will be used. If that URL is also absent, the same
transport as specified in the user's default section will be used.
The Transport has a single 'conf' property which is the cfg.ConfigOpts
instance used to construct the transport object.
"""
def __init__(self, driver):
@ -146,6 +171,31 @@ class DriverLoadFailure(exceptions.MessagingException):
self.ex = ex
def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
allowed_remote_exmods = allowed_remote_exmods or []
conf.register_opts(_transport_opts)
if not isinstance(url, TransportURL):
url = TransportURL.parse(conf, url, aliases)
kwargs = dict(default_exchange=conf.control_exchange,
allowed_remote_exmods=allowed_remote_exmods)
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
@removals.remove(
message='use get_rpc_transport or get_notification_transport'
)
@removals.removed_kwarg('aliases',
'Parameter aliases is deprecated for removal.')
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
@ -178,25 +228,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
:param aliases: DEPRECATED: A map of transport alias to transport name
:type aliases: dict
"""
allowed_remote_exmods = allowed_remote_exmods or []
conf.register_opts(_transport_opts)
if not isinstance(url, TransportURL):
url = TransportURL.parse(conf, url, aliases)
kwargs = dict(default_exchange=conf.control_exchange,
allowed_remote_exmods=allowed_remote_exmods)
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
return _get_transport(conf, url,
allowed_remote_exmods, aliases)
class TransportHost(object):

View File

@ -0,0 +1,10 @@
---
features:
- |
Add get_rpc_transport call to make the API clear for the separation
of RPC and Notification messaging backends.
deprecations:
- |
Deprecate get_transport and use get_rpc_transport or
get_notification_transport to make the API usage clear for the
separation of RPC and Notification messaging backends.