Optimize the Nova VM name query inside get_gbp_details()

1. Each controller will have a polling thread which will query
the Nova API every 1 min to get the list of Nova VMs and use
that to build up/update the DB cache.

2. There are 2 types of updates: the full update, which will
query all the Nova VMs, and the incremental update, which will
only query the VMs added/updated in the past 10 mins. We will
do an incremental update every 1 min and a full update every
10 mins.

3. Inside get_gbp_details(), it will then query this DB cache
for the VM name instead of calling the Nova API. If it's not
there, it will just use the device_id for the time being to
keep the lookup fast.

4. Then, when there are added/updated VMs, the polling thread will
call notify_port_update() to trigger an update to the
corresponding EP file.

5. This patch can also take care of the VM name change case.

6. The VMNameUpdate DB table is used to coordinate which
controller's polling thread should be in charge of the periodic
updates, so that at any given time there will be only one active
polling thread doing the updates (see the cadence sketch after
the change summary below).

Change-Id: I820d8078825dbc1e801b7e87752138e9509369d8
Author: Kent Wu, 2019-01-25 11:43:12 -08:00
Commit: 357fbd4d83 (parent: 2210d27f48)
10 changed files with 288 additions and 13 deletions
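
The coordination in items 2 and 6 above amounts to one small decision per
polling tick. Here is a minimal standalone sketch of that cadence
(illustrative only, not the patch code): a standby controller yields unless
the active one has missed two intervals, and the active controller forces a
full update every ten intervals.

from datetime import datetime

INTERVAL = 60  # seconds; mirrors apic_nova_vm_name_cache_update_interval

def decide_update(update_row, my_host_id, now=None):
    # Returns 'full', 'incremental', or None (meaning: stand by).
    now = now or datetime.now()
    if update_row is None:
        return 'full'  # first tick anywhere: seed the cache
    if update_row.host_id != my_host_id:
        age = (now - update_row.last_incremental_update_time).total_seconds()
        if age < INTERVAL * 2:
            return None  # the other controller is still updating actively
        return 'full'  # take over, starting with a full pass
    age = (now - update_row.last_full_update_time).total_seconds()
    return 'full' if age >= INTERVAL * 10 else 'incremental'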


@@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""VM names acquired from Nova API
Revision ID: 4f70135dc089
Revises: 4967af35820f
Create Date: 2019-01-08 14:18:11.909757
"""
# revision identifiers, used by Alembic.
revision = '4f70135dc089'
down_revision = '4967af35820f'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'apic_aim_vm_names',
sa.Column('device_id', sa.String(36), nullable=False),
sa.PrimaryKeyConstraint('device_id'),
sa.Column('vm_name', sa.String(64), nullable=False),
)
op.create_table(
'apic_aim_vm_name_updates',
sa.Column('host_id', sa.String(36), nullable=False),
sa.PrimaryKeyConstraint('host_id'),
sa.Column('last_incremental_update_time', sa.DateTime()),
sa.Column('last_full_update_time', sa.DateTime()),
)
def downgrade():
pass
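
The no-op downgrade follows the convention of this tree's other migrations.
Applying the new revision goes through the normal alembic chain; assuming the
gbp-db-manage wrapper (an assumption, the entry point is not shown in this
commit), that would be:

gbp-db-manage upgrade head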


@@ -1 +1 @@
4967af35820f
4f70135dc089


@@ -57,6 +57,10 @@ apic_opts = [
default=False,
help=("This will use those raw SQL statements to speed "
"up the calculation of the EP file.")),
cfg.IntOpt('apic_nova_vm_name_cache_update_interval', default=60,
help=("How many seconds for the polling thread on each "
"controller should wait before it updates the nova vm "
"name cache again.")),
]
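
As a hedged usage example (the [ml2_apic_aim] section name follows the
cfg.CONF.ml2_apic_aim group used below; the value shown is illustrative),
the cadence could be tuned in the ML2 configuration file:

[ml2_apic_aim]
# poll Nova every 30 seconds instead of the 60-second default
apic_nova_vm_name_cache_update_interval = 30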


@@ -80,6 +80,24 @@ class NetworkMapping(model_base.BASEV2):
vrf_tenant_name = sa.Column(sa.String(64))
class VMName(model_base.BASEV2):
__tablename__ = 'apic_aim_vm_names'
device_id = sa.Column(sa.String(36), primary_key=True)
vm_name = sa.Column(sa.String(64))
# At any point in time, there should be only one entry in this table.
# That entry is used to make sure only one controller is actively updating
# the VMName table.
class VMNameUpdate(model_base.BASEV2):
__tablename__ = 'apic_aim_vm_name_updates'
host_id = sa.Column(sa.String(36), primary_key=True)
last_incremental_update_time = sa.Column(sa.DateTime)
last_full_update_time = sa.Column(sa.DateTime)
class DbMixin(object):
def _add_address_scope_mapping(self, session, scope_id, vrf,
vrf_owned=True, update_scope=True):
@@ -299,3 +317,61 @@ class DbMixin(object):
def _set_network_vrf(self, mapping, vrf):
mapping.vrf_tenant_name = vrf.tenant_name
mapping.vrf_name = vrf.name
def _get_vm_name(self, session, device_id, is_detailed=False):
if is_detailed:
query = BAKERY(lambda s: s.query(VMName))
else:
query = BAKERY(lambda s: s.query(VMName.vm_name))
query += lambda q: q.filter_by(
device_id=sa.bindparam('device_id'))
return query(session).params(
device_id=device_id).one_or_none()
def _get_vm_names(self, session):
query = BAKERY(lambda s: s.query(VMName.device_id,
VMName.vm_name))
return query(session).all()
def _set_vm_name(self, session, device_id, vm_name):
with session.begin(subtransactions=True):
db_obj = self._get_vm_name(session, device_id,
is_detailed=True)
if db_obj:
db_obj.vm_name = vm_name
else:
db_obj = VMName(device_id=device_id, vm_name=vm_name)
session.add(db_obj)
def _delete_vm_name(self, session, device_id):
with session.begin(subtransactions=True):
db_obj = self._get_vm_name(session, device_id,
is_detailed=True)
if db_obj:
session.delete(db_obj)
def _get_vm_name_update(self, session):
query = BAKERY(lambda s: s.query(VMNameUpdate))
return query(session).one_or_none()
def _set_vm_name_update(self, session, db_obj, host_id,
last_incremental_update_time,
last_full_update_time=None):
with session.begin(subtransactions=True):
if db_obj:
db_obj.host_id = host_id
db_obj.last_incremental_update_time = (
last_incremental_update_time)
if last_full_update_time:
db_obj.last_full_update_time = last_full_update_time
else:
db_obj = VMNameUpdate(host_id=host_id,
last_incremental_update_time=last_incremental_update_time,
last_full_update_time=last_full_update_time)
session.add(db_obj)
def _delete_vm_name_update(self, session):
with session.begin(subtransactions=True):
db_obj = self._get_vm_name_update(session)
if db_obj:
session.delete(db_obj)
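
One subtlety worth calling out: a single-column query such as _get_vm_name()
returns a one-element row object rather than a bare string, which is why the
RPC code further down unpacks it with "vm_name, = vm". A minimal standalone
illustration in plain SQLAlchemy (hypothetical in-memory setup, not patch
code):

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class VMName(Base):
    __tablename__ = 'apic_aim_vm_names'
    device_id = sa.Column(sa.String(36), primary_key=True)
    vm_name = sa.Column(sa.String(64))

engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(VMName(device_id='uuid-1', vm_name='web-1'))
session.commit()

row = session.query(VMName.vm_name).filter_by(
    device_id='uuid-1').one_or_none()
vm_name, = row  # row is a one-element tuple-like object, not a string
assert vm_name == 'web-1'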


@@ -16,6 +16,7 @@
from collections import defaultdict
from collections import namedtuple
import copy
from datetime import datetime
import netaddr
import os
import re
@@ -60,6 +61,7 @@ from neutron_lib import context as nctx
from neutron_lib import exceptions as n_exceptions
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.utils import net
from opflexagent import constants as ofcst
from opflexagent import host_agent_rpc as arpc
from opflexagent import rpc as ofrpc
@@ -67,6 +69,7 @@ from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
import oslo_messaging
from oslo_service import loopingcall
from oslo_utils import importutils
from gbpservice.network.neutronv2 import local_api
@@ -82,6 +85,8 @@ from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import db
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import exceptions
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import extension_db
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import trunk_driver
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
LOG = log.getLogger(__name__)
@@ -258,6 +263,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.l3_domain_dn = cfg.CONF.ml2_apic_aim.l3_domain_dn
self.enable_raw_sql_for_device_rpc = (cfg.CONF.ml2_apic_aim.
enable_raw_sql_for_device_rpc)
self.apic_nova_vm_name_cache_update_interval = (cfg.CONF.ml2_apic_aim.
apic_nova_vm_name_cache_update_interval)
self._setup_nova_vm_update()
local_api.QUEUE_OUT_OF_PROCESS_NOTIFICATIONS = True
self._ensure_static_resources()
trunk_driver.register()
@@ -266,6 +274,76 @@
self.apic_router_id_pool = cfg.CONF.ml2_apic_aim.apic_router_id_pool
self.apic_router_id_subnet = netaddr.IPSet([self.apic_router_id_pool])
def _setup_nova_vm_update(self):
self.admin_context = nctx.get_admin_context()
self.host_id = 'id-%s' % net.get_hostname()
vm_update = loopingcall.FixedIntervalLoopingCall(
self._update_nova_vm_name_cache)
vm_update.start(
interval=self.apic_nova_vm_name_cache_update_interval)
def _update_nova_vm_name_cache(self):
current_time = datetime.now()
session = self.admin_context.session
vm_name_update = self._get_vm_name_update(session)
is_full_update = True
if vm_name_update:
# The other controller is still doing the update actively
if vm_name_update.host_id != self.host_id:
delta_time = (current_time -
vm_name_update.last_incremental_update_time)
if (delta_time.total_seconds() <
self.apic_nova_vm_name_cache_update_interval * 2):
return
else:
delta_time = (current_time -
vm_name_update.last_full_update_time)
if (delta_time.total_seconds() <
self.apic_nova_vm_name_cache_update_interval * 10):
is_full_update = False
self._set_vm_name_update(session, vm_name_update, self.host_id,
current_time,
current_time if is_full_update else None)
nova_vms = nclient.NovaClient().get_servers(
is_full_update, self.apic_nova_vm_name_cache_update_interval * 10)
vm_list = []
for vm in nova_vms:
vm_list.append((vm.id, vm.name))
nova_vms = set(vm_list)
with db_api.context_manager.writer.using(self.admin_context):
cached_vms = self._get_vm_names(session)
cached_vms = set(cached_vms)
# Only handle deletions during a full update, because otherwise
# we can't tell whether a missing VM was deleted or is merely
# older than 10 minutes, as an incremental update only queries
# Nova for the past 10 mins.
if is_full_update:
removed_vms = cached_vms - nova_vms
for device_id, _ in removed_vms:
self._delete_vm_name(session, device_id)
added_vms = nova_vms - cached_vms
update_ports = []
for device_id, name in added_vms:
self._set_vm_name(session, device_id, name)
# Get the port_id for this device_id
query = BAKERY(lambda s: s.query(
models_v2.Port.id))
query += lambda q: q.filter(
models_v2.Port.device_id == sa.bindparam('device_id'))
port = query(session).params(
device_id=device_id).one_or_none()
if port:
port_id, = port
update_ports.append(port_id)
if update_ports:
self._notify_port_update_bulk(self.admin_context, update_ports)
def _query_used_apic_router_ids(self, aim_ctx):
used_ids = netaddr.IPSet()
# Find the l3out_nodes created by us

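Because the reconciliation above compares sets of (device_id, vm_name)
tuples, a renamed VM lands in added_vms and simply overwrites its cache row
via _set_vm_name(), which is how the name-change case from the commit message
is covered. An illustrative snippet with made-up IDs:

cached = {('uuid-1', 'old-name'), ('uuid-2', 'db-vm')}
fresh = {('uuid-1', 'new-name'), ('uuid-2', 'db-vm')}

removed = cached - fresh  # {('uuid-1', 'old-name')}; deleted only on a full update
added = fresh - cached    # {('uuid-1', 'new-name')}; upserted, then the port notified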

@@ -1927,6 +1927,10 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
subnet['dhcp_server_ports'] = dhcp_ports
return subnets
def _get_nova_vm_name(self, context, port):
return self.aim_mech_driver._get_vm_name(context.session,
port['device_id'])
def _send_port_update_notification(self, plugin_context, port):
self.aim_mech_driver._notify_port_update(plugin_context, port)


@@ -27,8 +27,6 @@ from oslo_log import log
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as md)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
port_ha_ipaddress_binding as ha_ip_db)
@@ -234,11 +232,7 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
# Put per mac-address extra info
'extra_details': {}}
# Set VM name if needed.
if port['device_owner'].startswith(
'compute:') and port['device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
self._set_nova_vm_name(context, port, details)
details['_cache'] = {}
mtu = self._get_port_mtu(context, port, details)
@@ -612,11 +606,7 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
# Put per mac-address extra info
'extra_details': {}}
# Set VM name if needed.
if port['device_owner'].startswith(
'compute:') and port['device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
self._set_nova_vm_name(context, port, details)
details['_cache'] = {}
self._build_up_details_cache(
@@ -698,6 +688,17 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
LOG.debug("Details for port %s : %s", port['id'], details)
return details
def _set_nova_vm_name(self, context, port, details):
# Set VM name if needed.
if port['device_owner'].startswith(
'compute:') and port['device_id']:
vm = self._get_nova_vm_name(context, port)
if vm:
vm_name, = vm
else:
vm_name = port['device_id']
details['vm-name'] = vm_name
def _get_owned_addresses(self, plugin_context, port_id):
return set(self.ha_ip_handler.get_ha_ipaddresses_for_port(port_id))


@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from neutron.notifiers import nova as n_nova
from novaclient import exceptions as nova_exceptions
from oslo_log import log as logging
@@ -41,3 +42,15 @@ class NovaClient(object):
server_id)
except Exception as e:
LOG.exception(e)
def get_servers(self, is_full_update, changes_since_in_sec):
if is_full_update:
search_opts = {'all_tenants': 1}
else:
seconds_ago = (datetime.datetime.now() -
datetime.timedelta(seconds=changes_since_in_sec))
search_opts = {'all_tenants': 1,
'changes-since': str(seconds_ago),
'deleted': 'false'}
return self.client.servers.list(detailed=False,
search_opts=search_opts)
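
A hedged usage sketch of the new method, matching how the mechanism driver
calls it (600 seconds corresponds to the default interval times ten):

# incremental poll: only VMs created/updated in the last 10 minutes
vms = NovaClient().get_servers(is_full_update=False,
                               changes_since_in_sec=600)
for vm in vms:
    print(vm.id, vm.name)

# full poll across all tenants (the window argument is ignored)
all_vms = NovaClient().get_servers(is_full_update=True,
                                   changes_since_in_sec=0)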


@@ -14,6 +14,7 @@
# under the License.
import copy
import datetime
import fixtures
import mock
import netaddr
@@ -241,6 +242,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
def setUp(self, mechanism_drivers=None, tenant_network_types=None,
plugin=None, ext_mgr=None):
self.nova_client = mock.patch(
'gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.nova_client.NovaClient.get_servers').start()
self.nova_client.return_value = []
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
# driver apis.
@@ -1731,6 +1736,50 @@ class TestAimMapping(ApicAimTestCase):
mock.call(mock.ANY, tenant)]
self._check_call_list(exp_calls, self.driver.aim.delete.call_args_list)
def test_update_nova_vm_name_cache(self):
# VM cache is empty to begin with
self.assertEqual(self.driver._get_vm_names(self.db_session),
[])
# Add one vm
vm = mock.Mock()
vm.id = 'some_id'
vm.name = 'some_name'
self.nova_client.return_value = [vm]
self.driver._update_nova_vm_name_cache()
self.assertEqual(self.driver._get_vm_names(self.db_session),
[('some_id', 'some_name')])
# Update vm name
vm.name = 'new_name'
self.nova_client.return_value = [vm]
self.driver._update_nova_vm_name_cache()
self.assertEqual(self.driver._get_vm_names(self.db_session),
[('some_id', 'new_name')])
# Simulate that the polling thread from the other controller
# will just stand by
old_id = self.driver.host_id
self.driver.host_id = 'new_id'
vm.name = 'old_name'
self.nova_client.return_value = [vm]
self.driver._update_nova_vm_name_cache()
self.assertEqual(self.driver._get_vm_names(self.db_session),
[('some_id', 'new_name')])
# VM removal won't be triggered through an incremental update
self.driver.host_id = old_id
self.nova_client.return_value = []
self.driver._update_nova_vm_name_cache()
self.assertEqual(self.driver._get_vm_names(self.db_session),
[('some_id', 'new_name')])
# Force a full update which will take care of the VM removal
vm_update_obj = self.driver._get_vm_name_update(self.db_session)
new_full_update_time = (vm_update_obj.last_full_update_time -
datetime.timedelta(minutes=11))
self.driver._set_vm_name_update(self.db_session, vm_update_obj,
old_id, new_full_update_time, new_full_update_time)
self.nova_client.return_value = []
self.driver._update_nova_vm_name_cache()
self.assertEqual(self.driver._get_vm_names(self.db_session),
[])
def test_multi_scope_routing_with_unscoped_pools(self):
self._test_multi_scope_routing(True)


@@ -120,6 +120,10 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
def setUp(self, policy_drivers=None, core_plugin=None, ml2_options=None,
l3_plugin=None, sc_plugin=None, trunk_plugin=None,
qos_plugin=None, **kwargs):
self.nova_client1 = mock.patch(
'gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.nova_client.NovaClient.get_servers').start()
self.nova_client1.return_value = []
core_plugin = core_plugin or ML2PLUS_PLUGIN
if not l3_plugin:
l3_plugin = "apic_aim_l3"