Bulk up port context retrieval
With the switch to subquery relationships, individual get_port calls can get expensive with large numbers of ports (100ms per port in my dev environment). This patch bulks up the retrieval of the port contexts so one set of queries covers all of the devices in an RPC call. Partial-Bug: #1665215 Change-Id: I63757e143b23c24c349be98dc5a09115b8709a25
This commit is contained in:
parent
55be865d9a
commit
529da4e583
|
@ -893,7 +893,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
filters=net_filters)
|
||||
}
|
||||
segments_by_netid = segments_db.get_networks_segments(
|
||||
context, nets_by_netid.keys())
|
||||
context, list(nets_by_netid.keys()))
|
||||
netctxs_by_netid = {
|
||||
net_id: driver_context.NetworkContext(
|
||||
self, context, nets_by_netid[net_id],
|
||||
|
@ -1539,6 +1539,52 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
|
||||
return self._bind_port_if_needed(port_context)
|
||||
|
||||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
|
||||
def get_bound_ports_contexts(self, plugin_context, dev_ids, host=None):
|
||||
result = {}
|
||||
with db_api.context_manager.reader.using(plugin_context):
|
||||
dev_to_full_pids = db.partial_port_ids_to_full_ids(
|
||||
plugin_context, dev_ids)
|
||||
# get all port objects for IDs
|
||||
port_dbs_by_id = db.get_port_db_objects(
|
||||
plugin_context, dev_to_full_pids.values())
|
||||
# get all networks for PortContext construction
|
||||
netctxs_by_netid = self.get_network_contexts(
|
||||
plugin_context,
|
||||
{p.network_id for p in port_dbs_by_id.values()})
|
||||
for dev_id in dev_ids:
|
||||
port_id = dev_to_full_pids.get(dev_id)
|
||||
port_db = port_dbs_by_id.get(port_id)
|
||||
if (not port_id or not port_db or
|
||||
port_db.network_id not in netctxs_by_netid):
|
||||
result[dev_id] = None
|
||||
continue
|
||||
port = self._make_port_dict(port_db)
|
||||
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
|
||||
binding = db.get_distributed_port_binding_by_host(
|
||||
plugin_context, port['id'], host)
|
||||
bindlevelhost_match = host
|
||||
else:
|
||||
binding = port_db.port_binding
|
||||
bindlevelhost_match = binding.host if binding else None
|
||||
if not binding:
|
||||
LOG.info(_LI("Binding info for port %s was not found, "
|
||||
"it might have been deleted already."),
|
||||
port_id)
|
||||
result[dev_id] = None
|
||||
continue
|
||||
levels = [l for l in port_db.binding_levels
|
||||
if l.host == bindlevelhost_match]
|
||||
levels = sorted(levels, key=lambda l: l.level)
|
||||
network_ctx = netctxs_by_netid.get(port_db.network_id)
|
||||
port_context = driver_context.PortContext(
|
||||
self, plugin_context, port, network_ctx, binding, levels)
|
||||
result[dev_id] = port_context
|
||||
|
||||
return {d: self._bind_port_if_needed(pctx) if pctx else None
|
||||
for d, pctx in result.items()}
|
||||
|
||||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive()
|
||||
def update_port_status(self, context, port_id, status, host=None,
|
||||
|
|
|
@ -82,13 +82,22 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
{'device': device, 'agent_id': agent_id})
|
||||
return {'device': device}
|
||||
|
||||
segment = port_context.bottom_bound_segment
|
||||
port = port_context.current
|
||||
# caching information about networks for future use
|
||||
if cached_networks is not None:
|
||||
if port['network_id'] not in cached_networks:
|
||||
cached_networks[port['network_id']] = (
|
||||
port_context.network.current)
|
||||
return self._get_device_details(rpc_context, agent_id=agent_id,
|
||||
host=host, device=device,
|
||||
port_context=port_context)
|
||||
|
||||
def _get_device_details(self, rpc_context, agent_id, host, device,
|
||||
port_context):
|
||||
segment = port_context.bottom_bound_segment
|
||||
port = port_context.current
|
||||
plugin = directory.get_plugin()
|
||||
port_id = port_context.current['id']
|
||||
|
||||
if not segment:
|
||||
LOG.warning(_LW("Device %(device)s requested by agent "
|
||||
|
@ -148,17 +157,31 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
**kwargs):
|
||||
devices = []
|
||||
failed_devices = []
|
||||
cached_networks = {}
|
||||
for device in kwargs.pop('devices', []):
|
||||
devices_to_fetch = kwargs.pop('devices', [])
|
||||
plugin = directory.get_plugin()
|
||||
host = kwargs.get('host')
|
||||
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
|
||||
devices_to_fetch,
|
||||
host)
|
||||
for device in devices_to_fetch:
|
||||
if not bound_contexts.get(device):
|
||||
# unbound bound
|
||||
LOG.debug("Device %(device)s requested by agent "
|
||||
"%(agent_id)s not found in database",
|
||||
{'device': device,
|
||||
'agent_id': kwargs.get('agent_id')})
|
||||
devices.append({'device': device})
|
||||
continue
|
||||
try:
|
||||
devices.append(self.get_device_details(
|
||||
devices.append(self._get_device_details(
|
||||
rpc_context,
|
||||
agent_id=kwargs.get('agent_id'),
|
||||
host=host,
|
||||
device=device,
|
||||
cached_networks=cached_networks,
|
||||
**kwargs))
|
||||
port_context=bound_contexts[device]))
|
||||
except Exception:
|
||||
LOG.error(_LE("Failed to get details for device %s"),
|
||||
device)
|
||||
LOG.exception(_LE("Failed to get details for device %s"),
|
||||
device)
|
||||
failed_devices.append(device)
|
||||
|
||||
return {'devices': devices,
|
||||
|
|
|
@ -132,15 +132,6 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|||
cached_networks=cached_networks)
|
||||
self.assertFalse(self.plugin.get_network.called)
|
||||
|
||||
def test_get_bound_port_context_cache_miss(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self.port(name='name') as port:
|
||||
some_network = {'id': u'2ac23560-7638-44e2-9875-c1888b02af72'}
|
||||
self.plugin.get_network = mock.Mock(return_value=some_network)
|
||||
self.plugin.get_bound_port_context(ctx, port['port']['id'],
|
||||
cached_networks={})
|
||||
self.assertEqual(1, self.plugin.get_network.call_count)
|
||||
|
||||
def _test_update_port_binding(self, host, new_host=None):
|
||||
with mock.patch.object(self.plugin,
|
||||
'_notify_port_updated') as notify_mock:
|
||||
|
|
|
@ -166,13 +166,13 @@ class RpcCallbacksTestCase(base.BaseTestCase):
|
|||
def _test_get_devices_list(self, callback, side_effect, expected):
|
||||
devices = [1, 2, 3, 4, 5]
|
||||
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
|
||||
with mock.patch.object(self.callbacks, 'get_device_details',
|
||||
with mock.patch.object(self.callbacks, '_get_device_details',
|
||||
side_effect=side_effect) as f:
|
||||
res = callback('fake_context', devices=devices, **kwargs)
|
||||
self.assertEqual(expected, res)
|
||||
self.assertEqual(len(devices), f.call_count)
|
||||
calls = [mock.call('fake_context', device=i,
|
||||
cached_networks={}, **kwargs)
|
||||
port_context=mock.ANY, **kwargs)
|
||||
for i in devices]
|
||||
f.assert_has_calls(calls)
|
||||
|
||||
|
|
Loading…
Reference in New Issue