Port all rpcapi modules to oslo.messaging interface
Add a temporary nova.rpcclient.RPCClient helper class which translates oslo.messaging.rpc.RPCClient compatible calls into calls on a RpcProxy object. Use this new class to port all of the rpcapi modules over to the new RPCClient so that the final port of Nova over to oslo.messaging will be smaller and easier to review. This patch contains no functional changes at all, except that all client side RPCs go through this temporary helper class. blueprint: oslo-messaging Change-Id: Iee86c36bcc474a604993618b8a2255af8c3d2f48
This commit is contained in:
parent
78410c874f
commit
5668e57616
|
@ -21,8 +21,7 @@ Base RPC client and server common to all services.
|
|||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import rpc
|
||||
import nova.openstack.common.rpc.proxy as rpc_proxy
|
||||
from nova import rpcclient
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -34,7 +33,7 @@ CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
|
|||
_NAMESPACE = 'baseapi'
|
||||
|
||||
|
||||
class BaseAPI(rpc_proxy.RpcProxy):
|
||||
class BaseAPI(rpcclient.RpcProxy):
|
||||
"""Client side of the base rpc API.
|
||||
|
||||
API version history:
|
||||
|
@ -63,18 +62,17 @@ class BaseAPI(rpc_proxy.RpcProxy):
|
|||
super(BaseAPI, self).__init__(topic=topic,
|
||||
default_version=self.BASE_RPC_API_VERSION,
|
||||
version_cap=version_cap)
|
||||
self.namespace = _NAMESPACE
|
||||
|
||||
self.client = self.get_client(namespace=_NAMESPACE)
|
||||
|
||||
def ping(self, context, arg, timeout=None):
|
||||
arg_p = jsonutils.to_primitive(arg)
|
||||
msg = self.make_namespaced_msg('ping', self.namespace, arg=arg_p)
|
||||
return self.call(context, msg, timeout=timeout)
|
||||
cctxt = self.client.prepare(timeout=timeout)
|
||||
return cctxt.call(context, 'ping', arg=arg_p)
|
||||
|
||||
def get_backdoor_port(self, context, host):
|
||||
msg = self.make_namespaced_msg('get_backdoor_port', self.namespace)
|
||||
return self.call(context, msg,
|
||||
topic=rpc.queue_get_for(context, self.topic, host),
|
||||
version='1.1')
|
||||
cctxt = self.client.prepare(server=host, version='1.1')
|
||||
return cctxt.call(context, 'get_backdoor_port')
|
||||
|
||||
|
||||
class BaseRPCAPI(object):
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A temporary helper which emulates oslo.messaging.rpc.RPCClient.
|
||||
|
||||
The most tedious part of porting to oslo.messaging is porting the code which
|
||||
sub-classes RpcProxy to use RPCClient instead.
|
||||
|
||||
This helper method allows us to do that tedious porting as a standalone commit
|
||||
so that the commit which switches us to oslo.messaging is smaller and easier
|
||||
to review. This file will be removed as part of that commit.
|
||||
"""
|
||||
|
||||
from nova.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
|
||||
def __init__(self, proxy, namespace=None, server_params=None):
|
||||
super(RPCClient, self).__init__()
|
||||
self.proxy = proxy
|
||||
self.namespace = namespace
|
||||
self.server_params = server_params
|
||||
self.kwargs = {}
|
||||
self.fanout = None
|
||||
|
||||
def prepare(self, **kwargs):
|
||||
# Clone ourselves
|
||||
ret = self.__class__(self.proxy, self.namespace, self.server_params)
|
||||
ret.kwargs.update(self.kwargs)
|
||||
ret.fanout = self.fanout
|
||||
|
||||
# Update according to supplied kwargs
|
||||
ret.kwargs.update(kwargs)
|
||||
server = ret.kwargs.pop('server', None)
|
||||
if server:
|
||||
ret.kwargs['topic'] = '%s.%s' % (self.proxy.topic, server)
|
||||
fanout = ret.kwargs.pop('fanout', None)
|
||||
if fanout:
|
||||
ret.fanout = True
|
||||
|
||||
return ret
|
||||
|
||||
def _invoke(self, cast_or_call, ctxt, method, **kwargs):
|
||||
try:
|
||||
msg = self.proxy.make_namespaced_msg(method,
|
||||
self.namespace,
|
||||
**kwargs)
|
||||
return cast_or_call(ctxt, msg, **self.kwargs)
|
||||
finally:
|
||||
self.kwargs = {}
|
||||
self.fanout = None
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
if self.server_params:
|
||||
def cast_to_server(ctxt, msg, **kwargs):
|
||||
if self.fanout:
|
||||
return self.proxy.fanout_cast_to_server(
|
||||
ctxt, self.server_params, msg, **kwargs)
|
||||
else:
|
||||
return self.proxy.cast_to_server(
|
||||
ctxt, self.server_params, msg, **kwargs)
|
||||
|
||||
caster = cast_to_server
|
||||
else:
|
||||
caster = self.proxy.fanout_cast if self.fanout else self.proxy.cast
|
||||
|
||||
self._invoke(caster, ctxt, method, **kwargs)
|
||||
|
||||
def call(self, ctxt, method, **kwargs):
|
||||
return self._invoke(self.proxy.call, ctxt, method, **kwargs)
|
||||
|
||||
def can_send_version(self, version):
|
||||
return self.proxy.can_send_version(version)
|
||||
|
||||
|
||||
class RpcProxy(proxy.RpcProxy):
|
||||
|
||||
def get_client(self, namespace=None, server_params=None):
|
||||
return RPCClient(self,
|
||||
namespace=namespace,
|
||||
server_params=server_params)
|
|
@ -21,7 +21,7 @@ Client side of the scheduler manager RPC API.
|
|||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common import jsonutils
|
||||
import nova.openstack.common.rpc.proxy
|
||||
from nova import rpcclient
|
||||
|
||||
rpcapi_opts = [
|
||||
cfg.StrOpt('scheduler_topic',
|
||||
|
@ -37,7 +37,7 @@ rpcapi_cap_opt = cfg.StrOpt('scheduler',
|
|||
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
|
||||
|
||||
|
||||
class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
class SchedulerAPI(rpcclient.RpcProxy):
|
||||
'''Client side of the scheduler rpc API.
|
||||
|
||||
API version history:
|
||||
|
@ -94,11 +94,12 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
|||
super(SchedulerAPI, self).__init__(topic=CONF.scheduler_topic,
|
||||
default_version=self.BASE_RPC_API_VERSION,
|
||||
version_cap=version_cap)
|
||||
self.client = self.get_client()
|
||||
|
||||
def select_destinations(self, ctxt, request_spec, filter_properties):
|
||||
return self.call(ctxt, self.make_msg('select_destinations',
|
||||
request_spec=request_spec, filter_properties=filter_properties),
|
||||
version='2.7')
|
||||
cctxt = self.client.prepare(version='2.7')
|
||||
return cctxt.call(ctxt, 'select_destinations',
|
||||
request_spec=request_spec, filter_properties=filter_properties)
|
||||
|
||||
def run_instance(self, ctxt, request_spec, admin_password,
|
||||
injected_files, requested_networks, is_first_time,
|
||||
|
@ -110,11 +111,11 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
|||
'requested_networks': requested_networks,
|
||||
'is_first_time': is_first_time,
|
||||
'filter_properties': filter_properties}
|
||||
if self.can_send_version('2.9'):
|
||||
if self.client.can_send_version('2.9'):
|
||||
version = '2.9'
|
||||
msg_kwargs['legacy_bdm_in_spec'] = legacy_bdm_in_spec
|
||||
return self.cast(ctxt, self.make_msg('run_instance', **msg_kwargs),
|
||||
version=version)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
return cctxt.cast(ctxt, 'run_instance', **msg_kwargs)
|
||||
|
||||
def prep_resize(self, ctxt, instance, instance_type, image,
|
||||
request_spec, filter_properties, reservations):
|
||||
|
@ -122,24 +123,24 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
|||
instance_type_p = jsonutils.to_primitive(instance_type)
|
||||
reservations_p = jsonutils.to_primitive(reservations)
|
||||
image_p = jsonutils.to_primitive(image)
|
||||
self.cast(ctxt, self.make_msg('prep_resize',
|
||||
instance=instance_p, instance_type=instance_type_p,
|
||||
image=image_p, request_spec=request_spec,
|
||||
filter_properties=filter_properties,
|
||||
reservations=reservations_p))
|
||||
self.client.cast(ctxt, 'prep_resize',
|
||||
instance=instance_p, instance_type=instance_type_p,
|
||||
image=image_p, request_spec=request_spec,
|
||||
filter_properties=filter_properties,
|
||||
reservations=reservations_p)
|
||||
|
||||
def update_service_capabilities(self, ctxt, service_name, host,
|
||||
capabilities):
|
||||
#NOTE(jogo) This is deprecated, but is used by the deprecated
|
||||
# publish_service_capabilities call. So this can begin its removal
|
||||
# process once publish_service_capabilities is removed.
|
||||
self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
|
||||
service_name=service_name, host=host,
|
||||
capabilities=capabilities),
|
||||
version='2.4')
|
||||
cctxt = self.client.prepare(fanout=True, version='2.4')
|
||||
cctxt.cast(ctxt, 'update_service_capabilities',
|
||||
service_name=service_name, host=host,
|
||||
capabilities=capabilities)
|
||||
|
||||
def select_hosts(self, ctxt, request_spec, filter_properties):
|
||||
return self.call(ctxt, self.make_msg('select_hosts',
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties),
|
||||
version='2.6')
|
||||
cctxt = self.client.prepare(version='2.6')
|
||||
return cctxt.call(ctxt, 'select_hosts',
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties)
|
||||
|
|
Loading…
Reference in New Issue