Change MQ targeting to honor only what is in the context
Previously we had aimed to make things like compute RPC automatically look up the InstanceMapping or HostMapping for the call being performed to target appropriately. However, we cannot do that within the cell, and even trying incurs some overhead. For now, just deprecate the by_instance() and by_host() methods and honor what is in the context (if set) and otherwise fall back to the default client. Make the context target routines create and store the RPC transport and remove the caching logic from the ClientRouter since we're removing its ability to do that. Related to blueprint cells-aware-api Change-Id: I10f374adca672576058c4dbab708c040d166df47
This commit is contained in:
parent
4cd8ab5bdc
commit
159062882e
|
@ -370,8 +370,12 @@ def set_target_cell(context, cell_mapping):
|
|||
"""
|
||||
# avoid circular import
|
||||
from nova import db
|
||||
from nova import rpc
|
||||
db_connection_string = cell_mapping.database_connection
|
||||
context.db_connection = db.create_context_manager(db_connection_string)
|
||||
if not cell_mapping.transport_url.startswith('none'):
|
||||
context.mq_connection = rpc.create_transport(
|
||||
cell_mapping.transport_url)
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
@ -386,8 +390,10 @@ def target_cell(context, cell_mapping):
|
|||
:param cell_mapping: A objects.CellMapping object
|
||||
"""
|
||||
original_db_connection = context.db_connection
|
||||
original_mq_connection = context.mq_connection
|
||||
set_target_cell(context, cell_mapping)
|
||||
try:
|
||||
yield context
|
||||
finally:
|
||||
context.db_connection = original_db_connection
|
||||
context.mq_connection = original_mq_connection
|
||||
|
|
86
nova/rpc.py
86
nova/rpc.py
|
@ -34,13 +34,11 @@ from oslo_messaging.rpc import dispatcher
|
|||
from oslo_serialization import jsonutils
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
import nova.conf
|
||||
import nova.context
|
||||
import nova.exception
|
||||
from nova.i18n import _
|
||||
from nova import objects
|
||||
|
||||
profiler = importutils.try_import("osprofiler.profiler")
|
||||
|
||||
|
@ -395,27 +393,14 @@ class LegacyValidatingNotifier(object):
|
|||
getattr(self.notifier, priority)(ctxt, event_type, payload)
|
||||
|
||||
|
||||
class ClientWrapper(object):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.last_access_time = timeutils.utcnow()
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
self.last_access_time = timeutils.utcnow()
|
||||
return self._client
|
||||
|
||||
|
||||
class ClientRouter(periodic_task.PeriodicTasks):
|
||||
"""Creates and caches RPC clients that route to cells or the default.
|
||||
|
||||
The default client connects to the API cell message queue. The rest of the
|
||||
clients connect to compute cell message queues.
|
||||
"""Creates RPC clients that honor the context's RPC transport
|
||||
or provides a default.
|
||||
"""
|
||||
|
||||
def __init__(self, default_client):
|
||||
super(ClientRouter, self).__init__(CONF)
|
||||
self.clients = {}
|
||||
self.clients['default'] = ClientWrapper(default_client)
|
||||
self.default_client = default_client
|
||||
self.target = default_client.target
|
||||
self.version_cap = default_client.version_cap
|
||||
# NOTE(melwitt): Cells v1 does its own serialization and won't
|
||||
|
@ -424,55 +409,24 @@ class ClientRouter(periodic_task.PeriodicTasks):
|
|||
# Prevent this empty context from overwriting the thread local copy
|
||||
self.run_periodic_tasks(nova.context.RequestContext(overwrite=False))
|
||||
|
||||
def _client(self, context, cell_mapping=None):
|
||||
if cell_mapping:
|
||||
client_id = cell_mapping.uuid
|
||||
def _client(self, context, transport=None):
|
||||
if transport:
|
||||
return messaging.RPCClient(transport, self.target,
|
||||
version_cap=self.version_cap,
|
||||
serializer=self.serializer)
|
||||
else:
|
||||
client_id = 'default'
|
||||
|
||||
try:
|
||||
client = self.clients[client_id].client
|
||||
except KeyError:
|
||||
transport = create_transport(cell_mapping.transport_url)
|
||||
client = messaging.RPCClient(transport, self.target,
|
||||
version_cap=self.version_cap,
|
||||
serializer=self.serializer)
|
||||
self.clients[client_id] = ClientWrapper(client)
|
||||
|
||||
return client
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _remove_stale_clients(self, context):
|
||||
timeout = 60
|
||||
|
||||
def stale(client_id, last_access_time):
|
||||
if timeutils.is_older_than(last_access_time, timeout):
|
||||
LOG.debug('Removing stale RPC client: %s as it was last '
|
||||
'accessed at %s', client_id, last_access_time)
|
||||
return True
|
||||
return False
|
||||
|
||||
# Never expire the default client
|
||||
items_copy = list(self.clients.items())
|
||||
for client_id, client_wrapper in items_copy:
|
||||
if (client_id != 'default' and
|
||||
stale(client_id, client_wrapper.last_access_time)):
|
||||
del self.clients[client_id]
|
||||
return self.default_client
|
||||
|
||||
def by_instance(self, context, instance):
|
||||
try:
|
||||
cell_mapping = objects.InstanceMapping.get_by_instance_uuid(
|
||||
context, instance.uuid).cell_mapping
|
||||
except nova.exception.InstanceMappingNotFound:
|
||||
# Not a cells v2 deployment
|
||||
cell_mapping = None
|
||||
return self._client(context, cell_mapping=cell_mapping)
|
||||
"""Deprecated."""
|
||||
if context.mq_connection:
|
||||
return self._client(context, transport=context.mq_connection)
|
||||
else:
|
||||
return self.default_client
|
||||
|
||||
def by_host(self, context, host):
|
||||
try:
|
||||
cell_mapping = objects.HostMapping.get_by_host(
|
||||
context, host).cell_mapping
|
||||
except nova.exception.HostMappingNotFound:
|
||||
# Not a cells v2 deployment
|
||||
cell_mapping = None
|
||||
return self._client(context, cell_mapping=cell_mapping)
|
||||
"""Deprecated."""
|
||||
if context.mq_connection:
|
||||
return self._client(context, transport=context.mq_connection)
|
||||
else:
|
||||
return self.default_client
|
||||
|
|
|
@ -115,7 +115,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
|||
|
||||
# This test wants to run the real prepare function, so must use
|
||||
# a real client object
|
||||
default_client = rpcapi.router.clients['default'].client
|
||||
default_client = rpcapi.router.default_client
|
||||
|
||||
orig_prepare = default_client.prepare
|
||||
base_version = rpcapi.router.target.version
|
||||
|
|
|
@ -290,18 +290,24 @@ class ContextTestCase(test.NoDBTestCase):
|
|||
mock_authorize.assert_called_once_with(ctxt, mock.sentinel.rule,
|
||||
mock.sentinel.target)
|
||||
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('nova.db.create_context_manager')
|
||||
def test_target_cell(self, mock_create_ctxt_mgr):
|
||||
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
|
||||
def test_target_cell(self, mock_create_ctxt_mgr, mock_rpc):
|
||||
mock_create_ctxt_mgr.return_value = mock.sentinel.cdb
|
||||
mock_rpc.return_value = mock.sentinel.cmq
|
||||
ctxt = context.RequestContext('111',
|
||||
'222',
|
||||
roles=['admin', 'weasel'])
|
||||
# Verify the existing db_connection, if any, is restored
|
||||
ctxt.db_connection = mock.sentinel.db_conn
|
||||
mapping = objects.CellMapping(database_connection='fake://')
|
||||
ctxt.mq_connection = mock.sentinel.mq_conn
|
||||
mapping = objects.CellMapping(database_connection='fake://',
|
||||
transport_url='fake://')
|
||||
with context.target_cell(ctxt, mapping):
|
||||
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
|
||||
self.assertEqual(ctxt.db_connection, mock.sentinel.cdb)
|
||||
self.assertEqual(ctxt.mq_connection, mock.sentinel.cmq)
|
||||
self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection)
|
||||
self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection)
|
||||
|
||||
def test_get_context(self):
|
||||
ctxt = context.get_context()
|
||||
|
|
|
@ -12,18 +12,15 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import fixture as utils_fixture
|
||||
import testtools
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova import rpc
|
||||
from nova import test
|
||||
|
@ -445,179 +442,72 @@ class TestProfilerRequestContextSerializer(test.NoDBTestCase):
|
|||
|
||||
|
||||
class TestClientRouter(test.NoDBTestCase):
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_instance(self, mock_rpcclient, mock_create, mock_get):
|
||||
def test_by_instance(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
cm = objects.CellMapping(uuid=uuids.cell_mapping,
|
||||
transport_url='fake:///')
|
||||
mock_get.return_value = objects.InstanceMapping(cell_mapping=cm)
|
||||
ctxt.mq_connection = mock.sentinel.transport
|
||||
instance = objects.Instance(uuid=uuids.instance)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_instance(ctxt, instance)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock_create.return_value, default_client.target,
|
||||
mock.sentinel.transport, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
# reset and check that cached client is returned the second time
|
||||
mock_rpcclient.reset_mock()
|
||||
mock_create.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
client = router.by_instance(ctxt, instance)
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.HostMapping.get_by_host')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_host(self, mock_rpcclient, mock_create, mock_get):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
cm = objects.CellMapping(uuid=uuids.cell_mapping,
|
||||
transport_url='fake:///')
|
||||
mock_get.return_value = objects.HostMapping(cell_mapping=cm)
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock_create.return_value, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
# reset and check that cached client is returned the second time
|
||||
mock_rpcclient.reset_mock()
|
||||
mock_create.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
client = router.by_host(ctxt, host)
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid',
|
||||
side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance))
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_instance_not_found(self, mock_rpcclient, mock_create,
|
||||
mock_get):
|
||||
def test_by_instance_untargeted(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
ctxt.mq_connection = None
|
||||
instance = objects.Instance(uuid=uuids.instance)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_instance(ctxt, instance)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
# verify default client was returned
|
||||
self.assertEqual(default_client, client)
|
||||
self.assertEqual(router.default_client, client)
|
||||
self.assertFalse(mock_rpcclient.called)
|
||||
|
||||
@mock.patch('nova.objects.HostMapping.get_by_host',
|
||||
side_effect=exception.HostMappingNotFound(name='fake-host'))
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get):
|
||||
def test_by_host(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
ctxt.mq_connection = mock.sentinel.transport
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
# verify default client was returned
|
||||
self.assertEqual(default_client, client)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock.sentinel.transport, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get):
|
||||
t0 = datetime.datetime(2016, 8, 9, 0, 0, 0)
|
||||
time_fixture = self.useFixture(utils_fixture.TimeFixture(t0))
|
||||
|
||||
def test_by_host_untargeted(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
|
||||
cm1 = objects.CellMapping(uuid=uuids.cell_mapping1,
|
||||
transport_url='fake:///')
|
||||
cm2 = objects.CellMapping(uuid=uuids.cell_mapping2,
|
||||
transport_url='fake:///')
|
||||
cm3 = objects.CellMapping(uuid=uuids.cell_mapping3,
|
||||
transport_url='fake:///')
|
||||
mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1),
|
||||
objects.InstanceMapping(cell_mapping=cm2),
|
||||
objects.InstanceMapping(cell_mapping=cm3),
|
||||
objects.InstanceMapping(cell_mapping=cm3)]
|
||||
instance1 = objects.Instance(uuid=uuids.instance1)
|
||||
instance2 = objects.Instance(uuid=uuids.instance2)
|
||||
instance3 = objects.Instance(uuid=uuids.instance3)
|
||||
ctxt.mq_connection = None
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
cell1_client = router.by_instance(ctxt, instance1)
|
||||
cell2_client = router.by_instance(ctxt, instance2)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
# default client, cell1 client, cell2 client
|
||||
self.assertEqual(3, len(router.clients))
|
||||
expected = {'default': default_client,
|
||||
uuids.cell_mapping1: cell1_client,
|
||||
uuids.cell_mapping2: cell2_client}
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
||||
# expire cell1 client and cell2 client
|
||||
time_fixture.advance_time_seconds(80)
|
||||
|
||||
# add cell3 client
|
||||
cell3_client = router.by_instance(ctxt, instance3)
|
||||
|
||||
router._remove_stale_clients(ctxt)
|
||||
|
||||
# default client, cell3 client
|
||||
expected = {'default': default_client,
|
||||
uuids.cell_mapping3: cell3_client}
|
||||
self.assertEqual(2, len(router.clients))
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
||||
# expire cell3 client
|
||||
time_fixture.advance_time_seconds(80)
|
||||
|
||||
# access cell3 client to refresh it
|
||||
cell3_client = router.by_instance(ctxt, instance3)
|
||||
|
||||
router._remove_stale_clients(ctxt)
|
||||
|
||||
# default client and cell3 client should be there
|
||||
self.assertEqual(2, len(router.clients))
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
self.assertEqual(router.default_client, client)
|
||||
self.assertFalse(mock_rpcclient.called)
|
||||
|
|
Loading…
Reference in New Issue