Merge "Bulk up port context retrieval" into stable/ocata

This commit is contained in:
Jenkins 2017-05-31 21:02:30 +00:00 committed by Gerrit Code Review
commit ba0d79887b
4 changed files with 79 additions and 19 deletions

View File

@ -1624,6 +1624,52 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return self._bind_port_if_needed(port_context)
@utils.transaction_guard
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
def get_bound_ports_contexts(self, plugin_context, dev_ids, host=None):
result = {}
with db_api.context_manager.reader.using(plugin_context):
dev_to_full_pids = db.partial_port_ids_to_full_ids(
plugin_context, dev_ids)
# get all port objects for IDs
port_dbs_by_id = db.get_port_db_objects(
plugin_context, dev_to_full_pids.values())
# get all networks for PortContext construction
netctxs_by_netid = self.get_network_contexts(
plugin_context,
{p.network_id for p in port_dbs_by_id.values()})
for dev_id in dev_ids:
port_id = dev_to_full_pids.get(dev_id)
port_db = port_dbs_by_id.get(port_id)
if (not port_id or not port_db or
port_db.network_id not in netctxs_by_netid):
result[dev_id] = None
continue
port = self._make_port_dict(port_db)
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
binding = db.get_distributed_port_binding_by_host(
plugin_context, port['id'], host)
bindlevelhost_match = host
else:
binding = port_db.port_binding
bindlevelhost_match = binding.host if binding else None
if not binding:
LOG.info(_LI("Binding info for port %s was not found, "
"it might have been deleted already."),
port_id)
result[dev_id] = None
continue
levels = [l for l in port_db.binding_levels
if l.host == bindlevelhost_match]
levels = sorted(levels, key=lambda l: l.level)
network_ctx = netctxs_by_netid.get(port_db.network_id)
port_context = driver_context.PortContext(
self, plugin_context, port, network_ctx, binding, levels)
result[dev_id] = port_context
return {d: self._bind_port_if_needed(pctx) if pctx else None
for d, pctx in result.items()}
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def update_port_status(self, context, port_id, status, host=None,

View File

@ -82,13 +82,22 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
{'device': device, 'agent_id': agent_id})
return {'device': device}
segment = port_context.bottom_bound_segment
port = port_context.current
# caching information about networks for future use
if cached_networks is not None:
if port['network_id'] not in cached_networks:
cached_networks[port['network_id']] = (
port_context.network.current)
return self._get_device_details(rpc_context, agent_id=agent_id,
host=host, device=device,
port_context=port_context)
def _get_device_details(self, rpc_context, agent_id, host, device,
port_context):
segment = port_context.bottom_bound_segment
port = port_context.current
plugin = directory.get_plugin()
port_id = port_context.current['id']
if not segment:
LOG.warning(_LW("Device %(device)s requested by agent "
@ -149,17 +158,31 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
**kwargs):
devices = []
failed_devices = []
cached_networks = {}
for device in kwargs.pop('devices', []):
devices_to_fetch = kwargs.pop('devices', [])
plugin = directory.get_plugin()
host = kwargs.get('host')
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
devices_to_fetch,
host)
for device in devices_to_fetch:
if not bound_contexts.get(device):
# unbound bound
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device,
'agent_id': kwargs.get('agent_id')})
devices.append({'device': device})
continue
try:
devices.append(self.get_device_details(
devices.append(self._get_device_details(
rpc_context,
agent_id=kwargs.get('agent_id'),
host=host,
device=device,
cached_networks=cached_networks,
**kwargs))
port_context=bound_contexts[device]))
except Exception:
LOG.error(_LE("Failed to get details for device %s"),
device)
LOG.exception(_LE("Failed to get details for device %s"),
device)
failed_devices.append(device)
return {'devices': devices,

View File

@ -131,15 +131,6 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
cached_networks=cached_networks)
self.assertFalse(self.plugin.get_network.called)
def test_get_bound_port_context_cache_miss(self):
ctx = context.get_admin_context()
with self.port(name='name') as port:
some_network = {'id': u'2ac23560-7638-44e2-9875-c1888b02af72'}
self.plugin.get_network = mock.Mock(return_value=some_network)
self.plugin.get_bound_port_context(ctx, port['port']['id'],
cached_networks={})
self.assertEqual(1, self.plugin.get_network.call_count)
def _test_update_port_binding(self, host, new_host=None):
with mock.patch.object(self.plugin,
'_notify_port_updated') as notify_mock:

View File

@ -166,13 +166,13 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def _test_get_devices_list(self, callback, side_effect, expected):
devices = [1, 2, 3, 4, 5]
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
with mock.patch.object(self.callbacks, 'get_device_details',
with mock.patch.object(self.callbacks, '_get_device_details',
side_effect=side_effect) as f:
res = callback('fake_context', devices=devices, **kwargs)
self.assertEqual(expected, res)
self.assertEqual(len(devices), f.call_count)
calls = [mock.call('fake_context', device=i,
cached_networks={}, **kwargs)
port_context=mock.ANY, **kwargs)
for i in devices]
f.assert_has_calls(calls)