From 8a4aecb155478e9493f4d36b080ccdf6be406eba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Thu, 15 Sep 2016 10:06:45 +0200 Subject: [PATCH] Add scheduler RPC API v3.0 This patch creates scheduler RPC API version 3.0, while retaining compatibility in rpcapi and manager for 2.x, allowing for continuous deployment scenarios. This should be merged just before the Newton release. Change-Id: I8eb9c1ae93d84c63a061294fc570be1d9eed69ba --- cinder/rpc.py | 7 ++ cinder/scheduler/manager.py | 88 ++++++++++++++++- cinder/scheduler/rpcapi.py | 94 +++++++++++-------- cinder/tests/unit/scheduler/test_rpcapi.py | 51 ++++++---- .../notes/rpc-apis-3.0-b745f429c11d8198.yaml | 8 ++ 5 files changed, 185 insertions(+), 63 deletions(-) create mode 100644 releasenotes/notes/rpc-apis-3.0-b745f429c11d8198.yaml diff --git a/cinder/rpc.py b/cinder/rpc.py index 4b1950e43..4308a8819 100644 --- a/cinder/rpc.py +++ b/cinder/rpc.py @@ -198,6 +198,13 @@ class RPCAPI(object): self.client = get_client(target, version_cap=rpc_version_cap, serializer=serializer) + def _compat_ver(self, current, *legacy): + versions = (current,) + legacy + for version in versions[:-1]: + if self.client.can_send_version(version): + return version + return versions[-1] + @classmethod def determine_rpc_version_cap(cls): global LAST_RPC_VERSIONS diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index 70118e91c..ecb639cfc 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -57,13 +57,9 @@ LOG = logging.getLogger(__name__) class SchedulerManager(manager.Manager): """Chooses a host to create volumes.""" - # FIXME(caosf): Remove unused argument 'topic' from functions - # create_consistencygroup(), create_volume(), migrate_volume_to_host(), - # retype() and manage_existing() in v3.0 of RPC API. - RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION - target = messaging.Target(version=RPC_API_VERSION) + target = messaging.Target(version='2.3') def __init__(self, scheduler_driver=None, service_name=None, *args, **kwargs): @@ -71,6 +67,7 @@ class SchedulerManager(manager.Manager): scheduler_driver = CONF.scheduler_driver self.driver = importutils.import_object(scheduler_driver) super(SchedulerManager, self).__init__(*args, **kwargs) + self.additional_endpoints.append(_SchedulerV3Proxy(self)) self._startup_delay = True def init_host_with_rpc(self): @@ -356,3 +353,84 @@ class SchedulerManager(manager.Manager): rpc.get_notifier("scheduler").error(context, 'scheduler.' + method, payload) + + +# TODO(dulek): This goes away immediately in Ocata and is just present in +# Newton so that we can receive v2.x and v3.0 messages. +class _SchedulerV3Proxy(object): + target = messaging.Target(version='3.0') + + def __init__(self, manager): + self.manager = manager + + def update_service_capabilities(self, context, service_name=None, + host=None, capabilities=None, **kwargs): + return self.manager.update_service_capabilities( + context, service_name=service_name, host=host, + capabilities=capabilities, **kwargs) + + def create_consistencygroup(self, context, group, request_spec_list=None, + filter_properties_list=None): + # NOTE(dulek): Second argument here is `topic` which is unused. We're + # getting rid of it in 3.0, hence it's missing from method signature. + return self.manager.create_consistencygroup( + context, None, group, request_spec_list=request_spec_list, + filter_properties_list=filter_properties_list) + + def create_group(self, context, group, group_spec=None, + group_filter_properties=None, request_spec_list=None, + filter_properties_list=None): + # NOTE(dulek): Second argument here is `topic` which is unused. We're + # getting rid of it in 3.0, hence it's missing from method signature. + return self.manager.create_group( + context, None, group, group_spec=group_spec, + group_filter_properties=group_filter_properties, + request_spec_list=request_spec_list, + filter_properties_list=filter_properties_list) + + def create_volume(self, context, volume, snapshot_id=None, image_id=None, + request_spec=None, filter_properties=None): + # NOTE(dulek): Second argument here is `topic`, which is unused. We're + # getting rid of it in 3.0, hence it's missing from method signature. + # We're also replacing volume_id with volume object (switched from + # optional keyword argument to positional argument). + return self.manager.create_volume( + context, None, volume.id, snapshot_id=snapshot_id, + image_id=image_id, request_spec=request_spec, + filter_properties=filter_properties, volume=volume) + + def request_service_capabilities(self, context): + return self.manager.request_service_capabilities(context) + + def migrate_volume_to_host(self, context, volume, host, + force_host_copy, request_spec, + filter_properties=None): + # NOTE(dulek): Second argument here is `topic` which is unused. We're + # getting rid of it in 3.0, hence it's missing from method signature. + # We're also replacing volume_id with volume object (switched from + # optional keyword argument to positional argument). + return self.manager.migrate_volume_to_host( + context, None, volume.id, host, force_host_copy, request_spec, + filter_propterties=filter_properties, volume=volume) + + def retype(self, context, volume, request_spec, filter_properties=None): + # NOTE(dulek): Second argument here is `topic` which is unused. We're + # getting rid of it in 3.0, hence it's missing from method signature. + # We're also replacing volume_id with volume object (switched from + # optional keyword argument to positional argument). + return self.manager.retype( + context, None, volume.id, request_spec, + filter_properties=filter_properties, volume=volume) + + def manage_existing(self, context, volume, request_spec, + filter_properties=None): + # NOTE(dulek): Second argument here is `topic` which is unused. We're + # getting rid of it in 3.0, hence it's missing from method signature. + # We're also replacing volume_id with volume object (switched from + # optional keyword argument to positional argument). + return self.manager.manage_existing( + context, None, volume.id, request_spec, + filter_properties=filter_properties, volume=volume) + + def get_pools(self, context, filters=None): + return self.manager.get_pools(context, filters=filters) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 2b128bdc9..772a413dc 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -52,38 +52,44 @@ class SchedulerAPI(rpc.RPCAPI): 2.1 - Adds support for sending objects over RPC in manage_existing() 2.2 - Sends request_spec as object in create_volume() 2.3 - Add create_group method + + ... Newton supports messaging 2.3. Any changes to existing methods in + 2.x after this point should be done so that they can handle version cap + set to 2.3. + + 3.0 - Remove 2.x compatibility """ - RPC_API_VERSION = '2.3' + RPC_API_VERSION = '3.0' TOPIC = constants.SCHEDULER_TOPIC BINARY = 'cinder-scheduler' - # FIXME(caosf): Remove unused argument 'topic' from functions - # create_consistencygroup(), create_volume(), migrate_volume_to_host(), - # retype() and manage_existing() in v3.0 of RPC API. - def create_consistencygroup(self, ctxt, topic, group, request_spec_list=None, filter_properties_list=None): - version = '2.1' + version = self._compat_ver('3.0', '2.0') cctxt = self.client.prepare(version=version) request_spec_p_list = [] for request_spec in request_spec_list: request_spec_p = jsonutils.to_primitive(request_spec) request_spec_p_list.append(request_spec_p) - return cctxt.cast(ctxt, 'create_consistencygroup', - topic=topic, - group=group, - request_spec_list=request_spec_p_list, - filter_properties_list=filter_properties_list) + msg_args = { + 'group': group, 'request_spec_list': request_spec_p_list, + 'filter_properties_list': filter_properties_list, + } + + if version == '2.0': + msg_args['topic'] = topic + + return cctxt.cast(ctxt, 'create_consistencygroup', **msg_args) def create_group(self, ctxt, topic, group, group_spec=None, request_spec_list=None, group_filter_properties=None, filter_properties_list=None): - version = '2.3' + version = self._compat_ver('3.0', '2.3') cctxt = self.client.prepare(version=version) request_spec_p_list = [] for request_spec in request_spec_list: @@ -91,26 +97,31 @@ class SchedulerAPI(rpc.RPCAPI): request_spec_p_list.append(request_spec_p) group_spec_p = jsonutils.to_primitive(group_spec) - return cctxt.cast(ctxt, 'create_group', - topic=topic, - group=group, - group_spec=group_spec_p, - request_spec_list=request_spec_p_list, - group_filter_properties=group_filter_properties, - filter_properties_list=filter_properties_list) + msg_args = { + 'group': group, 'group_spec': group_spec_p, + 'request_spec_list': request_spec_p_list, + 'group_filter_properties': group_filter_properties, + 'filter_properties_list': filter_properties_list, + } + + if version == '2.3': + msg_args['topic'] = topic + + return cctxt.cast(ctxt, 'create_group', **msg_args) def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None, volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - msg_args = {'topic': topic, 'volume_id': volume_id, - 'snapshot_id': snapshot_id, 'image_id': image_id, + msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, 'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} - version = '2.2' - if not self.client.can_send_version('2.2'): + version = self._compat_ver('3.0', '2.2', '2.0') + if version in ('2.2', '2.0'): + msg_args['volume_id'] = volume.id + msg_args['topic'] = topic + if version == '2.0': # Send request_spec as dict - version = '2.0' msg_args['request_spec'] = jsonutils.to_primitive(request_spec) cctxt = self.client.prepare(version=version) @@ -120,23 +131,29 @@ class SchedulerAPI(rpc.RPCAPI): force_host_copy=False, request_spec=None, filter_properties=None, volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - msg_args = {'topic': topic, 'volume_id': volume_id, - 'host': host, 'force_host_copy': force_host_copy, + msg_args = {'host': host, 'force_host_copy': force_host_copy, 'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} - version = '2.0' + version = self._compat_ver('3.0', '2.0') + + if version == '2.0': + msg_args['volume_id'] = volume.id + msg_args['topic'] = topic cctxt = self.client.prepare(version=version) return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) - def retype(self, ctxt, topic, volume_id, - request_spec=None, filter_properties=None, volume=None): + def retype(self, ctxt, topic, volume_id, request_spec=None, + filter_properties=None, volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - msg_args = {'topic': topic, 'volume_id': volume_id, - 'request_spec': request_spec_p, + msg_args = {'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} - version = '2.0' + version = self._compat_ver('3.0', '2.0') + + if version == '2.0': + msg_args['volume_id'] = volume.id + msg_args['topic'] = topic cctxt = self.client.prepare(version=version) return cctxt.cast(ctxt, 'retype', **msg_args) @@ -146,19 +163,20 @@ class SchedulerAPI(rpc.RPCAPI): volume=None): request_spec_p = jsonutils.to_primitive(request_spec) msg_args = { - 'topic': topic, 'volume_id': volume_id, 'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume, } - version = '2.1' - if not self.client.can_send_version('2.1'): - version = '2.0' + version = self._compat_ver('3.0', '2.1', '2.0') + if version in ('2.1', '2.0'): + msg_args['volume_id'] = volume.id + msg_args['topic'] = topic + if version == '2.0': msg_args.pop('volume') cctxt = self.client.prepare(version=version) return cctxt.cast(ctxt, 'manage_existing', **msg_args) def get_pools(self, ctxt, filters=None): - version = '2.0' + version = self._compat_ver('3.0', '2.0') cctxt = self.client.prepare(version=version) return cctxt.call(ctxt, 'get_pools', filters=filters) @@ -166,7 +184,7 @@ class SchedulerAPI(rpc.RPCAPI): def update_service_capabilities(self, ctxt, service_name, host, capabilities): - version = '2.0' + version = self._compat_ver('3.0', '2.0') cctxt = self.client.prepare(fanout=True, version=version) cctxt.cast(ctxt, 'update_service_capabilities', service_name=service_name, host=host, diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 040d70cf4..d61b2c142 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -25,6 +25,8 @@ import mock from cinder import context from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import test +from cinder.tests.unit import fake_constants +from cinder.tests.unit import fake_volume @ddt.ddt @@ -32,13 +34,17 @@ class SchedulerRpcAPITestCase(test.TestCase): def setUp(self): super(SchedulerRpcAPITestCase, self).setUp() + self.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + self.context = context.RequestContext('fake_user', 'fake_project') + self.volume_id = fake_constants.VOLUME_ID def tearDown(self): super(SchedulerRpcAPITestCase, self).tearDown() def _test_scheduler_api(self, method, rpc_method, fanout=False, **kwargs): - ctxt = context.RequestContext('fake_user', 'fake_project') + ctxt = self.context rpcapi = scheduler_rpcapi.SchedulerAPI() expected_retval = 'foo' if rpc_method == 'call' else None @@ -84,21 +90,22 @@ class SchedulerRpcAPITestCase(test.TestCase): host='fake_host', capabilities='fake_capabilities', fanout=True, - version='2.0') + version='3.0') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) def test_create_volume(self, can_send_version): self._test_scheduler_api('create_volume', rpc_method='cast', topic='topic', - volume_id='volume_id', + volume_id=self.volume_id, snapshot_id='snapshot_id', image_id='image_id', request_spec='fake_request_spec', filter_properties='filter_properties', - volume='volume', - version='2.2') - can_send_version.assert_has_calls([mock.call('2.2')]) + volume=fake_volume.fake_volume_obj( + self.context), + version='3.0') + can_send_version.assert_has_calls([mock.call('3.0')]) @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=False) @@ -106,36 +113,39 @@ class SchedulerRpcAPITestCase(test.TestCase): self._test_scheduler_api('create_volume', rpc_method='cast', topic='topic', - volume_id='volume_id', + volume_id=self.volume_id, snapshot_id='snapshot_id', image_id='image_id', request_spec='fake_request_spec', filter_properties='filter_properties', - volume='volume', + volume=fake_volume.fake_volume_obj( + self.context), version='2.0') - can_send_version.assert_has_calls([mock.call('2.2')]) + can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.2')]) def test_migrate_volume_to_host(self): self._test_scheduler_api('migrate_volume_to_host', rpc_method='cast', topic='topic', - volume_id='volume_id', + volume_id=self.volume_id, host='host', force_host_copy=True, request_spec='fake_request_spec', filter_properties='filter_properties', - volume='volume', - version='2.0') + volume=fake_volume.fake_volume_obj( + self.context), + version='3.0') def test_retype(self): self._test_scheduler_api('retype', rpc_method='cast', topic='topic', - volume_id='volume_id', + volume_id=self.volume_id, request_spec='fake_request_spec', filter_properties='filter_properties', - volume='volume', - version='2.0') + volume=fake_volume.fake_volume_obj( + self.context), + version='3.0') @ddt.data('2.0', '2.1') @mock.patch('oslo_messaging.RPCClient.can_send_version') @@ -144,18 +154,19 @@ class SchedulerRpcAPITestCase(test.TestCase): self._test_scheduler_api('manage_existing', rpc_method='cast', topic='topic', - volume_id='volume_id', + volume_id=self.volume_id, request_spec='fake_request_spec', filter_properties='filter_properties', - volume='volume', + volume=fake_volume.fake_volume_obj( + self.context), version=version) - can_send_version.assert_called_with('2.1') + can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.1')]) def test_get_pools(self): self._test_scheduler_api('get_pools', rpc_method='call', filters=None, - version='2.0') + version='3.0') def test_create_group(self): self._test_scheduler_api('create_group', @@ -168,4 +179,4 @@ class SchedulerRpcAPITestCase(test.TestCase): 'fake_group_filter_properties', filter_properties_list= ['fake_filter_properties_list'], - version='2.3') + version='3.0') diff --git a/releasenotes/notes/rpc-apis-3.0-b745f429c11d8198.yaml b/releasenotes/notes/rpc-apis-3.0-b745f429c11d8198.yaml new file mode 100644 index 000000000..84f9daf17 --- /dev/null +++ b/releasenotes/notes/rpc-apis-3.0-b745f429c11d8198.yaml @@ -0,0 +1,8 @@ +--- +upgrade: + - Deployments doing continuous live upgrades from master branch should not + upgrade into Ocata before doing an upgrade which includes all the Newton's + RPC API version bump commits (scheduler, volume). If you're upgrading + deployment in a release-to-release manner, then you can safely ignore this + note. +