From 159062882ec1a004cc62fffc9c426c5a5e533e15 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Fri, 24 Feb 2017 06:19:13 -0800 Subject: [PATCH] Change MQ targeting to honor only what is in the context Previously we had aimed to make things like compute RPC automatically look up the InstanceMapping or HostMapping for the call being performed to target appropriately. However, we cannot do that within the cell, and even trying incurs some overhead. For now, just deprecate the by_instance() and by_host() methods and honor what is in the context (if set) and otherwise fall back to the default client. Make the context target routines create and store the RPC transport and remove the caching logic from the ClientRouter since we're removing its ability to do that. Related to blueprint cells-aware-api Change-Id: I10f374adca672576058c4dbab708c040d166df47 --- nova/context.py | 6 + nova/rpc.py | 86 ++++---------- nova/tests/unit/compute/test_rpcapi.py | 2 +- nova/tests/unit/test_context.py | 14 ++- nova/tests/unit/test_rpc.py | 158 ++++--------------------- 5 files changed, 61 insertions(+), 205 deletions(-) diff --git a/nova/context.py b/nova/context.py index bc5a21a92e54..4dfd0257228a 100644 --- a/nova/context.py +++ b/nova/context.py @@ -370,8 +370,12 @@ def set_target_cell(context, cell_mapping): """ # avoid circular import from nova import db + from nova import rpc db_connection_string = cell_mapping.database_connection context.db_connection = db.create_context_manager(db_connection_string) + if not cell_mapping.transport_url.startswith('none'): + context.mq_connection = rpc.create_transport( + cell_mapping.transport_url) @contextmanager @@ -386,8 +390,10 @@ def target_cell(context, cell_mapping): :param cell_mapping: A objects.CellMapping object """ original_db_connection = context.db_connection + original_mq_connection = context.mq_connection set_target_cell(context, cell_mapping) try: yield context finally: context.db_connection = original_db_connection + context.mq_connection = original_mq_connection diff --git a/nova/rpc.py b/nova/rpc.py index dacb6ccb9288..f374a7f4e3f1 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -34,13 +34,11 @@ from oslo_messaging.rpc import dispatcher from oslo_serialization import jsonutils from oslo_service import periodic_task from oslo_utils import importutils -from oslo_utils import timeutils import nova.conf import nova.context import nova.exception from nova.i18n import _ -from nova import objects profiler = importutils.try_import("osprofiler.profiler") @@ -395,27 +393,14 @@ class LegacyValidatingNotifier(object): getattr(self.notifier, priority)(ctxt, event_type, payload) -class ClientWrapper(object): - def __init__(self, client): - self._client = client - self.last_access_time = timeutils.utcnow() - - @property - def client(self): - self.last_access_time = timeutils.utcnow() - return self._client - - class ClientRouter(periodic_task.PeriodicTasks): - """Creates and caches RPC clients that route to cells or the default. - - The default client connects to the API cell message queue. The rest of the - clients connect to compute cell message queues. + """Creates RPC clients that honor the context's RPC transport + or provides a default. """ + def __init__(self, default_client): super(ClientRouter, self).__init__(CONF) - self.clients = {} - self.clients['default'] = ClientWrapper(default_client) + self.default_client = default_client self.target = default_client.target self.version_cap = default_client.version_cap # NOTE(melwitt): Cells v1 does its own serialization and won't @@ -424,55 +409,24 @@ class ClientRouter(periodic_task.PeriodicTasks): # Prevent this empty context from overwriting the thread local copy self.run_periodic_tasks(nova.context.RequestContext(overwrite=False)) - def _client(self, context, cell_mapping=None): - if cell_mapping: - client_id = cell_mapping.uuid + def _client(self, context, transport=None): + if transport: + return messaging.RPCClient(transport, self.target, + version_cap=self.version_cap, + serializer=self.serializer) else: - client_id = 'default' - - try: - client = self.clients[client_id].client - except KeyError: - transport = create_transport(cell_mapping.transport_url) - client = messaging.RPCClient(transport, self.target, - version_cap=self.version_cap, - serializer=self.serializer) - self.clients[client_id] = ClientWrapper(client) - - return client - - @periodic_task.periodic_task - def _remove_stale_clients(self, context): - timeout = 60 - - def stale(client_id, last_access_time): - if timeutils.is_older_than(last_access_time, timeout): - LOG.debug('Removing stale RPC client: %s as it was last ' - 'accessed at %s', client_id, last_access_time) - return True - return False - - # Never expire the default client - items_copy = list(self.clients.items()) - for client_id, client_wrapper in items_copy: - if (client_id != 'default' and - stale(client_id, client_wrapper.last_access_time)): - del self.clients[client_id] + return self.default_client def by_instance(self, context, instance): - try: - cell_mapping = objects.InstanceMapping.get_by_instance_uuid( - context, instance.uuid).cell_mapping - except nova.exception.InstanceMappingNotFound: - # Not a cells v2 deployment - cell_mapping = None - return self._client(context, cell_mapping=cell_mapping) + """Deprecated.""" + if context.mq_connection: + return self._client(context, transport=context.mq_connection) + else: + return self.default_client def by_host(self, context, host): - try: - cell_mapping = objects.HostMapping.get_by_host( - context, host).cell_mapping - except nova.exception.HostMappingNotFound: - # Not a cells v2 deployment - cell_mapping = None - return self._client(context, cell_mapping=cell_mapping) + """Deprecated.""" + if context.mq_connection: + return self._client(context, transport=context.mq_connection) + else: + return self.default_client diff --git a/nova/tests/unit/compute/test_rpcapi.py b/nova/tests/unit/compute/test_rpcapi.py index 4d0eb75721af..46f7558d5a09 100644 --- a/nova/tests/unit/compute/test_rpcapi.py +++ b/nova/tests/unit/compute/test_rpcapi.py @@ -115,7 +115,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase): # This test wants to run the real prepare function, so must use # a real client object - default_client = rpcapi.router.clients['default'].client + default_client = rpcapi.router.default_client orig_prepare = default_client.prepare base_version = rpcapi.router.target.version diff --git a/nova/tests/unit/test_context.py b/nova/tests/unit/test_context.py index 33d72f7d7ce3..f1f254419430 100644 --- a/nova/tests/unit/test_context.py +++ b/nova/tests/unit/test_context.py @@ -290,18 +290,24 @@ class ContextTestCase(test.NoDBTestCase): mock_authorize.assert_called_once_with(ctxt, mock.sentinel.rule, mock.sentinel.target) + @mock.patch('nova.rpc.create_transport') @mock.patch('nova.db.create_context_manager') - def test_target_cell(self, mock_create_ctxt_mgr): - mock_create_ctxt_mgr.return_value = mock.sentinel.cm + def test_target_cell(self, mock_create_ctxt_mgr, mock_rpc): + mock_create_ctxt_mgr.return_value = mock.sentinel.cdb + mock_rpc.return_value = mock.sentinel.cmq ctxt = context.RequestContext('111', '222', roles=['admin', 'weasel']) # Verify the existing db_connection, if any, is restored ctxt.db_connection = mock.sentinel.db_conn - mapping = objects.CellMapping(database_connection='fake://') + ctxt.mq_connection = mock.sentinel.mq_conn + mapping = objects.CellMapping(database_connection='fake://', + transport_url='fake://') with context.target_cell(ctxt, mapping): - self.assertEqual(ctxt.db_connection, mock.sentinel.cm) + self.assertEqual(ctxt.db_connection, mock.sentinel.cdb) + self.assertEqual(ctxt.mq_connection, mock.sentinel.cmq) self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection) + self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection) def test_get_context(self): ctxt = context.get_context() diff --git a/nova/tests/unit/test_rpc.py b/nova/tests/unit/test_rpc.py index a265d45cbd1a..0c49762c9e1a 100644 --- a/nova/tests/unit/test_rpc.py +++ b/nova/tests/unit/test_rpc.py @@ -12,18 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. import copy -import datetime import fixtures import mock import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher from oslo_serialization import jsonutils -from oslo_utils import fixture as utils_fixture import testtools from nova import context -from nova import exception from nova import objects from nova import rpc from nova import test @@ -445,179 +442,72 @@ class TestProfilerRequestContextSerializer(test.NoDBTestCase): class TestClientRouter(test.NoDBTestCase): - @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid') - @mock.patch('nova.rpc.create_transport') @mock.patch('oslo_messaging.RPCClient') - def test_by_instance(self, mock_rpcclient, mock_create, mock_get): + def test_by_instance(self, mock_rpcclient): default_client = mock.Mock() cell_client = mock.Mock() mock_rpcclient.return_value = cell_client ctxt = mock.Mock() - cm = objects.CellMapping(uuid=uuids.cell_mapping, - transport_url='fake:///') - mock_get.return_value = objects.InstanceMapping(cell_mapping=cm) + ctxt.mq_connection = mock.sentinel.transport instance = objects.Instance(uuid=uuids.instance) router = rpc.ClientRouter(default_client) client = router.by_instance(ctxt, instance) - mock_get.assert_called_once_with(ctxt, instance.uuid) # verify a client was created by ClientRouter mock_rpcclient.assert_called_once_with( - mock_create.return_value, default_client.target, + mock.sentinel.transport, default_client.target, version_cap=default_client.version_cap, serializer=default_client.serializer) # verify cell client was returned self.assertEqual(cell_client, client) - # reset and check that cached client is returned the second time - mock_rpcclient.reset_mock() - mock_create.reset_mock() - mock_get.reset_mock() - - client = router.by_instance(ctxt, instance) - mock_get.assert_called_once_with(ctxt, instance.uuid) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - self.assertEqual(cell_client, client) - - @mock.patch('nova.objects.HostMapping.get_by_host') - @mock.patch('nova.rpc.create_transport') @mock.patch('oslo_messaging.RPCClient') - def test_by_host(self, mock_rpcclient, mock_create, mock_get): - default_client = mock.Mock() - cell_client = mock.Mock() - mock_rpcclient.return_value = cell_client - ctxt = mock.Mock() - cm = objects.CellMapping(uuid=uuids.cell_mapping, - transport_url='fake:///') - mock_get.return_value = objects.HostMapping(cell_mapping=cm) - host = 'fake-host' - - router = rpc.ClientRouter(default_client) - client = router.by_host(ctxt, host) - - mock_get.assert_called_once_with(ctxt, host) - # verify a client was created by ClientRouter - mock_rpcclient.assert_called_once_with( - mock_create.return_value, default_client.target, - version_cap=default_client.version_cap, - serializer=default_client.serializer) - # verify cell client was returned - self.assertEqual(cell_client, client) - - # reset and check that cached client is returned the second time - mock_rpcclient.reset_mock() - mock_create.reset_mock() - mock_get.reset_mock() - - client = router.by_host(ctxt, host) - mock_get.assert_called_once_with(ctxt, host) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - self.assertEqual(cell_client, client) - - @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid', - side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance)) - @mock.patch('nova.rpc.create_transport') - @mock.patch('oslo_messaging.RPCClient') - def test_by_instance_not_found(self, mock_rpcclient, mock_create, - mock_get): + def test_by_instance_untargeted(self, mock_rpcclient): default_client = mock.Mock() cell_client = mock.Mock() mock_rpcclient.return_value = cell_client ctxt = mock.Mock() + ctxt.mq_connection = None instance = objects.Instance(uuid=uuids.instance) router = rpc.ClientRouter(default_client) client = router.by_instance(ctxt, instance) - mock_get.assert_called_once_with(ctxt, instance.uuid) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - # verify default client was returned - self.assertEqual(default_client, client) + self.assertEqual(router.default_client, client) + self.assertFalse(mock_rpcclient.called) - @mock.patch('nova.objects.HostMapping.get_by_host', - side_effect=exception.HostMappingNotFound(name='fake-host')) - @mock.patch('nova.rpc.create_transport') @mock.patch('oslo_messaging.RPCClient') - def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get): + def test_by_host(self, mock_rpcclient): default_client = mock.Mock() cell_client = mock.Mock() mock_rpcclient.return_value = cell_client ctxt = mock.Mock() + ctxt.mq_connection = mock.sentinel.transport host = 'fake-host' router = rpc.ClientRouter(default_client) client = router.by_host(ctxt, host) - mock_get.assert_called_once_with(ctxt, host) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - # verify default client was returned - self.assertEqual(default_client, client) + # verify a client was created by ClientRouter + mock_rpcclient.assert_called_once_with( + mock.sentinel.transport, default_client.target, + version_cap=default_client.version_cap, + serializer=default_client.serializer) + # verify cell client was returned + self.assertEqual(cell_client, client) - @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid') - @mock.patch('nova.rpc.create_transport') @mock.patch('oslo_messaging.RPCClient') - def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get): - t0 = datetime.datetime(2016, 8, 9, 0, 0, 0) - time_fixture = self.useFixture(utils_fixture.TimeFixture(t0)) - + def test_by_host_untargeted(self, mock_rpcclient): default_client = mock.Mock() + cell_client = mock.Mock() + mock_rpcclient.return_value = cell_client ctxt = mock.Mock() - - cm1 = objects.CellMapping(uuid=uuids.cell_mapping1, - transport_url='fake:///') - cm2 = objects.CellMapping(uuid=uuids.cell_mapping2, - transport_url='fake:///') - cm3 = objects.CellMapping(uuid=uuids.cell_mapping3, - transport_url='fake:///') - mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1), - objects.InstanceMapping(cell_mapping=cm2), - objects.InstanceMapping(cell_mapping=cm3), - objects.InstanceMapping(cell_mapping=cm3)] - instance1 = objects.Instance(uuid=uuids.instance1) - instance2 = objects.Instance(uuid=uuids.instance2) - instance3 = objects.Instance(uuid=uuids.instance3) + ctxt.mq_connection = None + host = 'fake-host' router = rpc.ClientRouter(default_client) - cell1_client = router.by_instance(ctxt, instance1) - cell2_client = router.by_instance(ctxt, instance2) + client = router.by_host(ctxt, host) - # default client, cell1 client, cell2 client - self.assertEqual(3, len(router.clients)) - expected = {'default': default_client, - uuids.cell_mapping1: cell1_client, - uuids.cell_mapping2: cell2_client} - for client_id, client in expected.items(): - self.assertEqual(client, router.clients[client_id].client) - - # expire cell1 client and cell2 client - time_fixture.advance_time_seconds(80) - - # add cell3 client - cell3_client = router.by_instance(ctxt, instance3) - - router._remove_stale_clients(ctxt) - - # default client, cell3 client - expected = {'default': default_client, - uuids.cell_mapping3: cell3_client} - self.assertEqual(2, len(router.clients)) - for client_id, client in expected.items(): - self.assertEqual(client, router.clients[client_id].client) - - # expire cell3 client - time_fixture.advance_time_seconds(80) - - # access cell3 client to refresh it - cell3_client = router.by_instance(ctxt, instance3) - - router._remove_stale_clients(ctxt) - - # default client and cell3 client should be there - self.assertEqual(2, len(router.clients)) - for client_id, client in expected.items(): - self.assertEqual(client, router.clients[client_id].client) + self.assertEqual(router.default_client, client) + self.assertFalse(mock_rpcclient.called)