[AIM] Bulk extension for ML2Plus

Improve performance of bulk read operations when using the
AIM driver by implementing a bulk extension mechanism.

Changing the extension driver framework, we can build extension
calls that have visibility over all the networks that need to be
extended, and thus can run bulk queries against the database to
reduce the number of round-trips to a constant.

Every query will still be slower as the number of objects grow,
but with this we cut down the round-trip overhead.

Change-Id: Ib884668228a5dc9be48477ca2245d762cd41bcc0
This commit is contained in:
Ivar Lazzaro 2018-07-20 16:17:40 -07:00 committed by Thomas Bachman
parent 427d299c89
commit ac49921cff
8 changed files with 495 additions and 120 deletions

View File

@ -18,6 +18,8 @@ import six
from neutron.plugins.ml2 import driver_api
BULK_EXTENDED = 'ml2plus:_bulk_extended'
@six.add_metaclass(abc.ABCMeta)
class SubnetPoolContext(object):

View File

@ -127,6 +127,10 @@ class DbMixin(object):
filter_by(network_id=network_id).
one_or_none())
def _get_network_mapping_bulk(self, session, network_ids):
return session.query(NetworkMapping).filter(
NetworkMapping.network_id.in_(network_ids)).all()
def _get_network_mappings_for_vrf(self, session, vrf):
return (session.query(NetworkMapping).
filter_by(vrf_tenant_name=vrf.tenant_name,

View File

@ -58,6 +58,10 @@ class NetworkExtensionCidrDb(model_base.BASEV2):
sa.String(36), sa.ForeignKey('networks.id', ondelete="CASCADE"),
primary_key=True)
cidr = sa.Column(sa.String(64), primary_key=True)
network = orm.relationship(models_v2.Network,
backref=orm.backref(
'aim_extension_cidr_mapping', lazy='joined',
uselist=True, cascade='delete'))
class NetworkExtNestedDomainAllowedVlansDb(model_base.BASEV2):
@ -68,6 +72,11 @@ class NetworkExtNestedDomainAllowedVlansDb(model_base.BASEV2):
vlan = sa.Column(sa.Integer(), primary_key=True)
network_id = sa.Column(
sa.String(36), sa.ForeignKey('networks.id', ondelete="CASCADE"))
network = orm.relationship(models_v2.Network,
backref=orm.backref(
'aim_extension_domain_mapping',
uselist=True,
lazy='joined', cascade='delete'))
class SubnetExtensionDb(model_base.BASEV2):
@ -98,41 +107,62 @@ class ExtensionDbMixin(object):
res_dict[res_attr] = db_attr
def get_network_extn_db(self, session, network_id):
db_obj = (session.query(NetworkExtensionDb).filter_by(
network_id=network_id).first())
db_cidrs = (session.query(NetworkExtensionCidrDb).filter_by(
network_id=network_id).all())
return self.get_network_extn_db_bulk(session, [network_id]).get(
network_id, {})
def get_network_extn_db_bulk(self, session, network_ids):
db_objs = (session.query(NetworkExtensionDb).filter(
NetworkExtensionDb.network_id.in_(network_ids)).all())
db_cidrs = (session.query(NetworkExtensionCidrDb).filter(
NetworkExtensionCidrDb.network_id.in_(network_ids)).all())
db_vlans = (session.query(
NetworkExtNestedDomainAllowedVlansDb).filter_by(
network_id=network_id).all())
NetworkExtNestedDomainAllowedVlansDb).filter(
NetworkExtNestedDomainAllowedVlansDb.network_id.in_(
network_ids)).all())
cidrs_by_net_id = {}
vlans_by_net_id = {}
for db_cidr in db_cidrs:
cidrs_by_net_id.setdefault(db_cidr.network_id, []).append(
db_cidr)
for db_vlan in db_vlans:
vlans_by_net_id.setdefault(db_vlan.network_id, []).append(
db_vlan)
result = {}
if db_obj:
self._set_if_not_none(result, cisco_apic.EXTERNAL_NETWORK,
db_obj['external_network_dn'])
self._set_if_not_none(result, cisco_apic.NAT_TYPE,
db_obj['nat_type'])
self._set_if_not_none(result, cisco_apic.SVI, db_obj['svi'])
result[cisco_apic.BGP] = db_obj['bgp_enable']
result[cisco_apic.BGP_TYPE] = db_obj['bgp_type']
result[cisco_apic.BGP_ASN] = db_obj['bgp_asn']
result[cisco_apic.NESTED_DOMAIN_NAME] = (
db_obj['nested_domain_name'])
result[cisco_apic.NESTED_DOMAIN_TYPE] = (
db_obj['nested_domain_type'])
result[cisco_apic.NESTED_DOMAIN_INFRA_VLAN] = (
db_obj['nested_domain_infra_vlan'])
result[cisco_apic.NESTED_DOMAIN_SERVICE_VLAN] = (
db_obj['nested_domain_service_vlan'])
result[cisco_apic.NESTED_DOMAIN_NODE_NETWORK_VLAN] = (
db_obj['nested_domain_node_network_vlan'])
result[cisco_apic.NESTED_DOMAIN_ALLOWED_VLANS] = (
[c['vlan'] for c in db_vlans])
if result.get(cisco_apic.EXTERNAL_NETWORK):
result[cisco_apic.EXTERNAL_CIDRS] = [c['cidr'] for c in db_cidrs]
for db_obj in db_objs:
net_id = db_obj.network_id
result.setdefault(net_id, self.make_network_extn_db_conf_dict(
db_obj, cidrs_by_net_id.get(net_id, []),
vlans_by_net_id.get(net_id, [])))
return result
def make_network_extn_db_conf_dict(self, ext_db, db_cidrs, db_vlans):
net_res = {}
db_obj = ext_db
if db_obj:
self._set_if_not_none(net_res, cisco_apic.EXTERNAL_NETWORK,
db_obj['external_network_dn'])
self._set_if_not_none(net_res, cisco_apic.NAT_TYPE,
db_obj['nat_type'])
self._set_if_not_none(net_res, cisco_apic.SVI, db_obj['svi'])
net_res[cisco_apic.BGP] = db_obj['bgp_enable']
net_res[cisco_apic.BGP_TYPE] = db_obj['bgp_type']
net_res[cisco_apic.BGP_ASN] = db_obj['bgp_asn']
net_res[cisco_apic.NESTED_DOMAIN_NAME] = (
db_obj['nested_domain_name'])
net_res[cisco_apic.NESTED_DOMAIN_TYPE] = (
db_obj['nested_domain_type'])
net_res[cisco_apic.NESTED_DOMAIN_INFRA_VLAN] = (
db_obj['nested_domain_infra_vlan'])
net_res[cisco_apic.NESTED_DOMAIN_SERVICE_VLAN] = (
db_obj['nested_domain_service_vlan'])
net_res[cisco_apic.NESTED_DOMAIN_NODE_NETWORK_VLAN] = (
db_obj['nested_domain_node_network_vlan'])
net_res[cisco_apic.NESTED_DOMAIN_ALLOWED_VLANS] = [
c.vlan for c in db_vlans]
if net_res.get(cisco_apic.EXTERNAL_NETWORK):
net_res[cisco_apic.EXTERNAL_CIDRS] = [c.cidr for c in db_cidrs]
return net_res
def set_network_extn_db(self, session, network_id, res_dict):
with session.begin(subtransactions=True):
db_obj = (session.query(NetworkExtensionDb).filter_by(

View File

@ -62,12 +62,17 @@ class ApicExtensionDriver(api_plus.ExtensionDriver,
def extend_network_dict(self, session, base_model, result):
try:
self._md.extend_network_dict(session, base_model, result)
res_dict = self.get_network_extn_db(session, result['id'])
if cisco_apic.EXTERNAL_NETWORK in res_dict:
result.setdefault(cisco_apic.DIST_NAMES, {})[
cisco_apic.EXTERNAL_NETWORK] = res_dict.pop(
cisco_apic.EXTERNAL_NETWORK)
result.update(res_dict)
except Exception as e:
with excutils.save_and_reraise_exception():
if db_api.is_retriable(e):
LOG.debug("APIC AIM extend_network_dict got retriable "
"exception: %s", type(e))
else:
LOG.exception("APIC AIM extend_network_dict failed")
def extend_network_dict_bulk(self, session, results):
try:
self._md.extend_network_dict_bulk(session, results)
except Exception as e:
with excutils.save_and_reraise_exception():
if db_api.is_retriable(e):

View File

@ -812,52 +812,81 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.aim.delete(aim_ctx, epg)
session.delete(mapping)
def extend_network_dict(self, session, network_db, result):
LOG.debug("APIC AIM MD extending dict for network: %s", result)
sync_state = cisco_apic.SYNC_NOT_APPLICABLE
dist_names = {}
def extend_network_dict_bulk(self, session, results, single=False):
# Gather db objects
aim_ctx = aim_context.AimContext(session)
extn_db = extension_db.ExtensionDbMixin()
aim_resources = []
res_dict_by_aim_res_dn = {}
# REVISIT: Check and revert the following to:
# mapping = network_db.aim_mapping
mapping = self._get_network_mapping(session, network_db['id'])
if mapping:
if mapping.epg_name:
bd = self._get_network_bd(mapping)
dist_names[cisco_apic.BD] = bd.dn
sync_state = self._merge_status(aim_ctx, sync_state, bd)
for res_dict, net_db in results:
res_dict[cisco_apic.SYNC_STATE] = cisco_apic.SYNC_NOT_APPLICABLE
res_dict[cisco_apic.DIST_NAMES] = {}
mapping = net_db.aim_mapping
dist_names = res_dict.setdefault(cisco_apic.DIST_NAMES, {})
if not mapping and single:
# Needed because of commit
# d8c1e153f88952b7670399715c2f88f1ecf0a94a in Neutron that
# put the extension call in Pike+ *before* the precommit
# calls happen in network creation. I believe this is a but
# and should be discussed with the Neutron team.
mapping = self._get_network_mapping(session, net_db.id)
if mapping:
if mapping.epg_name:
bd = self._get_network_bd(mapping)
dist_names[cisco_apic.BD] = bd.dn
epg = self._get_network_epg(mapping)
dist_names[cisco_apic.EPG] = epg.dn
aim_resources.extend([bd, epg])
res_dict_by_aim_res_dn[epg.dn] = res_dict
res_dict_by_aim_res_dn[bd.dn] = res_dict
elif mapping.l3out_name:
l3out_ext_net = self._get_network_l3out_ext_net(mapping)
dist_names[cisco_apic.EXTERNAL_NETWORK] = l3out_ext_net.dn
aim_resources.append(l3out_ext_net)
res_dict_by_aim_res_dn[l3out_ext_net.dn] = res_dict
epg = self._get_network_epg(mapping)
dist_names[cisco_apic.EPG] = epg.dn
sync_state = self._merge_status(aim_ctx, sync_state, epg)
# SVI network with auto l3out.
elif mapping.l3out_name:
l3out_ext_net = self._get_network_l3out_ext_net(mapping)
dist_names[cisco_apic.EXTERNAL_NETWORK] = l3out_ext_net.dn
sync_state = self._merge_status(aim_ctx, sync_state,
l3out_ext_net)
vrf = self._get_network_vrf(mapping)
dist_names[cisco_apic.VRF] = vrf.dn
aim_resources.append(vrf)
res_dict_by_aim_res_dn[vrf.dn] = res_dict
if not net_db.aim_extension_mapping and single:
# Needed because of commit
# d8c1e153f88952b7670399715c2f88f1ecf0a94a in Neutron that
# put the extension call in Pike+ *before* the precommit
# calls happen in network creation. I believe this is a but
# and should be discussed with the Neutron team.
ext_dict = extn_db.get_network_extn_db(session, net_db.id)
else:
ext_dict = extn_db.make_network_extn_db_conf_dict(
net_db.aim_extension_mapping,
net_db.aim_extension_cidr_mapping,
net_db.aim_extension_domain_mapping)
if cisco_apic.EXTERNAL_NETWORK in ext_dict:
dn = ext_dict.pop(cisco_apic.EXTERNAL_NETWORK)
a_ext_net = aim_resource.ExternalNetwork.from_dn(dn)
res_dict.setdefault(cisco_apic.DIST_NAMES, {})[
cisco_apic.EXTERNAL_NETWORK] = dn
aim_resources.append(a_ext_net)
res_dict_by_aim_res_dn[a_ext_net.dn] = res_dict
vrf = self._get_network_vrf(mapping)
dist_names[cisco_apic.VRF] = vrf.dn
sync_state = self._merge_status(aim_ctx, sync_state, vrf)
res_dict.update(ext_dict)
# SVI network with pre-existing l3out.
if self._is_preexisting_svi_db(network_db):
_, ext_net, _ = self._get_aim_external_objects_db(session,
network_db)
if ext_net:
sync_state = self._merge_status(aim_ctx, sync_state, ext_net)
# Merge statuses
for status in self.aim.get_statuses(aim_ctx, aim_resources):
res_dict = res_dict_by_aim_res_dn.get(status.resource_dn, {})
res_dict[cisco_apic.SYNC_STATE] = self._merge_status(
aim_ctx,
res_dict.get(cisco_apic.SYNC_STATE,
cisco_apic.SYNC_NOT_APPLICABLE),
None, status=status)
# REVISIT: Should the external network be persisted in the
# mapping along with the other resources?
if network_db.external is not None:
_, ext_net, _ = self._get_aim_nat_strategy_db(session, network_db)
if ext_net:
sync_state = self._merge_status(aim_ctx, sync_state, ext_net)
result[cisco_apic.DIST_NAMES] = dist_names
result[cisco_apic.SYNC_STATE] = sync_state
def extend_network_dict(self, session, network_db, result):
if result.get(api_plus.BULK_EXTENDED):
return
LOG.debug("APIC AIM MD extending dict for network: %s", result)
self.extend_network_dict_bulk(session, [(result, network_db)],
single=True)
def create_subnet_precommit(self, context):
current = context.current
@ -975,6 +1004,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
# they are removed from routers.
def extend_subnet_dict(self, session, subnet_db, result):
if result.get(api_plus.BULK_EXTENDED):
return
LOG.debug("APIC AIM MD extending dict for subnet: %s", result)
sync_state = cisco_apic.SYNC_NOT_APPLICABLE
@ -1108,6 +1139,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
session.delete(mapping)
def extend_address_scope_dict(self, session, scope, result):
if result.get(api_plus.BULK_EXTENDED):
return
LOG.debug("APIC AIM MD extending dict for address scope: %s", result)
# REVISIT: Consider moving to ApicExtensionDriver.
@ -1255,6 +1288,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.aim.delete(aim_ctx, contract)
def extend_router_dict(self, session, router_db, result):
if result.get(api_plus.BULK_EXTENDED):
return
LOG.debug("APIC AIM MD extending dict for router: %s", result)
# REVISIT(rkukura): Consider optimizing this method by
@ -1492,6 +1527,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
# interfaces.
if not net_intfs:
# First interface for network.
network_db.aim_mapping = (network_db.aim_mapping or
self._get_network_mapping(session,
network_db.id))
if network_db.aim_mapping.epg_name:
bd, epg = self._associate_network_with_vrf(
context, aim_ctx, network_db, vrf, nets_to_notify)
@ -2343,8 +2381,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
policy_drivers['aim_mapping'].obj)
return self._gbp_driver
def _merge_status(self, aim_ctx, sync_state, resource):
status = self.aim.get_status(aim_ctx, resource, create_if_absent=False)
def _merge_status(self, aim_ctx, sync_state, resource, status=None):
status = status or self.aim.get_status(aim_ctx, resource,
create_if_absent=False)
if not status:
# REVISIT(rkukura): This should only occur if the AIM
# resource has not yet been created when

View File

@ -229,13 +229,23 @@ class ExtensionManager(managers.ExtensionManager):
# exceptions, as well as to support calling only on extension
# drivers extended for ML2Plus.
def _call_on_dict_driver(self, method_name, session, base_model, result,
extended_only=False):
extended_only=False, has_base_model=True):
# Bulk operations might not be implemented by all drivers
def noop(*args, **kwargs):
pass
for driver in self.ordered_ext_drivers:
if not extended_only or isinstance(
driver.obj, driver_api.ExtensionDriver):
try:
getattr(driver.obj, method_name)(session, base_model,
result)
if not has_base_model:
getattr(driver.obj, method_name, noop)(session,
result)
else:
getattr(driver.obj, method_name, noop)(session,
base_model,
result)
except Exception as e:
if db_api.is_retriable(e):
with excutils.save_and_reraise_exception():
@ -272,3 +282,25 @@ class ExtensionManager(managers.ExtensionManager):
def extend_address_scope_dict(self, session, base_model, result):
self._call_on_dict_driver("extend_address_scope_dict",
session, base_model, result, True)
def extend_subnetpool_dict_bulk(self, session, result):
self._call_on_dict_driver("extend_subnetpool_dict_bulk",
session, None, result, True,
has_base_model=False)
def extend_address_scope_dict_bulk(self, session, result):
self._call_on_dict_driver("extend_address_scope_dict_bulk",
session, None, result, True,
has_base_model=False)
def extend_network_dict_bulk(self, session, result):
self._call_on_dict_driver("extend_network_dict_bulk", session,
None, result, has_base_model=False)
def extend_subnet_dict_bulk(self, session, result):
self._call_on_dict_driver("extend_subnet_dict_bulk", session,
None, result, has_base_model=False)
def extend_port_dict_bulk(self, session, result):
self._call_on_dict_driver("extend_port_dict_bulk", session, None,
result, has_base_model=False)

View File

@ -20,7 +20,9 @@ from gbpservice.neutron import extensions as gbp_extensions
from gbpservice.neutron.extensions import patch # noqa
from gbpservice.neutron.plugins.ml2plus import patch_neutron # noqa
from neutron.common import constants as n_const
from neutron.db import _resource_extend as resource_extend
from neutron.db import _utils as db_utils
from neutron.db import api as db_api
from neutron.db.models import securitygroup as securitygroups_db
from neutron.db import models_v2
@ -44,6 +46,7 @@ from oslo_log import log
from oslo_utils import excutils
from gbpservice.neutron.db import implicitsubnetpool_db
from gbpservice.neutron.plugins.ml2plus import driver_api as api_plus
from gbpservice.neutron.plugins.ml2plus import driver_context
from gbpservice.neutron.plugins.ml2plus import managers
@ -192,6 +195,14 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin.extension_manager.extend_network_dict(
session, netdb, result)
@staticmethod
@resource_extend.extends([net_def.COLLECTION_NAME + '_BULK'])
def _ml2_md_extend_network_dict_bulk(results, _):
plugin = directory.get_plugin()
session = patch_neutron.get_current_session()
with session.begin(subtransactions=True):
plugin.extension_manager.extend_network_dict_bulk(session, results)
@staticmethod
@resource_extend.extends([port_def.COLLECTION_NAME])
def _ml2_md_extend_port_dict(result, portdb):
@ -205,6 +216,14 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin.extension_manager.extend_port_dict(
session, portdb, result)
@staticmethod
@resource_extend.extends([port_def.COLLECTION_NAME + '_BULK'])
def _ml2_md_extend_port_dict_bulk(results, _):
plugin = directory.get_plugin()
session = patch_neutron.get_current_session()
with session.begin(subtransactions=True):
plugin.extension_manager.extend_port_dict_bulk(session, results)
@staticmethod
@resource_extend.extends([subnet_def.COLLECTION_NAME])
def _ml2_md_extend_subnet_dict(result, subnetdb):
@ -218,6 +237,14 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin.extension_manager.extend_subnet_dict(
session, subnetdb, result)
@staticmethod
@resource_extend.extends([subnet_def.COLLECTION_NAME + '_BULK'])
def _ml2_md_extend_subnet_dict_bulk(results, _):
plugin = directory.get_plugin()
session = patch_neutron.get_current_session()
with session.begin(subtransactions=True):
plugin.extension_manager.extend_subnet_dict_bulk(session, results)
@staticmethod
@resource_extend.extends([subnetpool_def.COLLECTION_NAME])
def _ml2_md_extend_subnetpool_dict(result, subnetpooldb):
@ -231,6 +258,15 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin.extension_manager.extend_subnetpool_dict(
session, subnetpooldb, result)
@staticmethod
@resource_extend.extends([subnetpool_def.COLLECTION_NAME + '_BULK'])
def _ml2_md_extend_subnetpool_dict_bulk(results, _):
plugin = directory.get_plugin()
session = patch_neutron.get_current_session()
with session.begin(subtransactions=True):
plugin.extension_manager.extend_subnetpool_dict_bulk(session,
results)
@staticmethod
@resource_extend.extends([as_def.COLLECTION_NAME])
def _ml2_md_extend_address_scope_dict(result, address_scope):
@ -244,6 +280,15 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin.extension_manager.extend_address_scope_dict(
session, address_scope, result)
@staticmethod
@resource_extend.extends([as_def.COLLECTION_NAME + '_BULK'])
def _ml2_md_extend_address_scope_dict_bulk(results, _):
plugin = directory.get_plugin()
session = patch_neutron.get_current_session()
with session.begin(subtransactions=True):
plugin.extension_manager.extend_address_scope_dict_bulk(session,
results)
# Base version does not call _apply_dict_extend_functions()
def _make_address_scope_dict(self, address_scope, fields=None):
res = {'id': address_scope['id'],
@ -510,3 +555,48 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
ip_version=ip_version) or
self.get_implicit_subnetpool_id(context, tenant=None,
ip_version=ip_version))
# REVISIT(ivar): patching bulk gets for extension performance
def _make_networks_dict(self, networks, context):
nets = []
for network in networks:
if network.mtu is None:
# TODO(ivar): also refactor this to run for bulk networks
network.mtu = self._get_network_mtu(network, validate=False)
res = {'id': network['id'],
'name': network['name'],
'tenant_id': network['tenant_id'],
'admin_state_up': network['admin_state_up'],
'mtu': network.get('mtu', n_const.DEFAULT_NETWORK_MTU),
'status': network['status'],
'subnets': [subnet['id']
for subnet in network['subnets']]}
res['shared'] = self._is_network_shared(context,
network.rbac_entries)
nets.append((res, network))
# Bulk extend first
resource_extend.apply_funcs(net_def.COLLECTION_NAME + '_BULK', nets,
None)
result = []
for res, network in nets:
res[api_plus.BULK_EXTENDED] = True
resource_extend.apply_funcs(net_def.COLLECTION_NAME, res, network)
res.pop(api_plus.BULK_EXTENDED, None)
result.append(db_utils.resource_fields(res, []))
return result
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
with db_api.context_manager.writer.using(context):
nets_db = super(Ml2PlusPlugin, self)._get_networks(
context, filters, None, sorts, limit, marker, page_reverse)
net_data = self._make_networks_dict(nets_db, context)
self.type_manager.extend_networks_dict_provider(context, net_data)
nets = self._filter_nets_provider(context, net_data, filters)
return [db_utils.resource_fields(net, fields) for net in nets]

View File

@ -3076,24 +3076,62 @@ class TestSyncState(ApicAimTestCase):
@staticmethod
def _get_synced_status(self, context, resource, **kwargs):
status = aim_status.AciStatus.SYNCED
return aim_status.AciStatus(resource_root=resource.root,
sync_status=status)
aim_ctx = context
aim_mgr = aim_manager.AimManager()
res_type, res_id = aim_mgr._get_status_params(aim_ctx, resource)
if not res_type:
return None
return aim_mgr.create(
aim_ctx, aim_status.AciStatus(
resource_root=resource.root, sync_status=status,
resource_type=res_type,
resource_dn=resource.dn,
resource_id=res_id), overwrite=True)
@staticmethod
def _get_pending_status_for_type(resource, type, **kwargs):
def _get_pending_status_for_type(context, resource, type, **kwargs):
status = (isinstance(resource, type) and
aim_status.AciStatus.SYNC_PENDING or
aim_status.AciStatus.SYNCED)
return aim_status.AciStatus(resource_root=resource.root,
sync_status=status)
aim_ctx = context
aim_mgr = aim_manager.AimManager()
res_type, res_id = aim_mgr._get_status_params(aim_ctx, resource)
if not res_type:
return None
return aim_mgr.create(
aim_ctx, aim_status.AciStatus(
resource_root=resource.root, sync_status=status,
resource_type=res_type,
resource_dn=resource.dn,
resource_id=res_id), overwrite=True)
@staticmethod
def _get_failed_status_for_type(resource, type, **kwargs):
def _get_failed_status_for_type(context, resource, type, **kwargs):
status = (isinstance(resource, type) and
aim_status.AciStatus.SYNC_FAILED or
aim_status.AciStatus.SYNC_PENDING)
return aim_status.AciStatus(resource_root=resource.root,
sync_status=status)
aim_ctx = context
aim_mgr = aim_manager.AimManager()
res_type, res_id = aim_mgr._get_status_params(aim_ctx, resource)
if not res_type:
return None
return aim_mgr.create(
aim_ctx, aim_status.AciStatus(
resource_root=resource.root, sync_status=status,
resource_type=res_type,
resource_dn=resource.dn,
resource_id=res_id), overwrite=True)
_aim_get_statuses = aim_manager.AimManager.get_statuses
@staticmethod
def _mocked_get_statuses(self, context, resources):
for res in resources:
self.get_status(context, res)
return TestSyncState._aim_get_statuses(self, context, resources)
def _test_network(self, expected_state):
net = self._make_network(self.fmt, 'net1', True)['network']
@ -3102,58 +3140,76 @@ class TestSyncState(ApicAimTestCase):
net = self._show('networks', net['id'])['network']
self.assertEqual(expected_state, net['apic:synchronization_state'])
net = self._list(
'networks', query_params=('id=%s' % net['id']))['networks'][0]
self.assertEqual(expected_state, net['apic:synchronization_state'])
def test_network_synced(self):
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
self._test_network('synced')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('synced')
def test_network_bd_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.BridgeDomain)
context, resource, aim_resource.BridgeDomain)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_network('build')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('build')
def test_network_bd_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.BridgeDomain)
context, resource, aim_resource.BridgeDomain)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_network('error')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('error')
def test_network_epg_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.EndpointGroup)
context, resource, aim_resource.EndpointGroup)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_network('build')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('build')
def test_network_epg_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.EndpointGroup)
context, resource, aim_resource.EndpointGroup)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_network('error')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('error')
def test_network_vrf_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.VRF)
context, resource, aim_resource.VRF)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_network('build')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('build')
def test_network_vrf_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.VRF)
context, resource, aim_resource.VRF)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_network('error')
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_network('error')
def _test_address_scope(self, expected_state):
scope = self._make_address_scope(self.fmt, 4, name='scope1')[
@ -3163,6 +3219,11 @@ class TestSyncState(ApicAimTestCase):
scope = self._show('address-scopes', scope['id'])['address_scope']
self.assertEqual(expected_state, scope['apic:synchronization_state'])
scope = self._list(
'address-scopes',
query_params=('id=%s' % scope['id']))['address_scopes'][0]
self.assertEqual(expected_state, scope['apic:synchronization_state'])
def test_address_scope_synced(self):
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
@ -3171,7 +3232,7 @@ class TestSyncState(ApicAimTestCase):
def test_address_scope_vrf_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.VRF)
context, resource, aim_resource.VRF)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_address_scope('build')
@ -3179,7 +3240,7 @@ class TestSyncState(ApicAimTestCase):
def test_address_scope_vrf_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.VRF)
context, resource, aim_resource.VRF)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_address_scope('error')
@ -3192,6 +3253,11 @@ class TestSyncState(ApicAimTestCase):
router = self._show('routers', router['id'])['router']
self.assertEqual(expected_state, router['apic:synchronization_state'])
router = self._list(
'routers',
query_params=('id=%s' % router['id']))['routers'][0]
self.assertEqual(expected_state, router['apic:synchronization_state'])
def test_router_synced(self):
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
@ -3200,7 +3266,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_contract_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.Contract)
context, resource, aim_resource.Contract)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router('build')
@ -3208,7 +3274,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_contract_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.Contract)
context, resource, aim_resource.Contract)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router('error')
@ -3216,7 +3282,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_subject_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.ContractSubject)
context, resource, aim_resource.ContractSubject)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router('build')
@ -3224,7 +3290,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_subject_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.ContractSubject)
context, resource, aim_resource.ContractSubject)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router('error')
@ -3242,6 +3308,11 @@ class TestSyncState(ApicAimTestCase):
router = self._show('routers', router['id'])['router']
self.assertEqual(expected_state, router['apic:synchronization_state'])
router = self._list(
'routers',
query_params=('id=%s' % router['id']))['routers'][0]
self.assertEqual(expected_state, router['apic:synchronization_state'])
def test_router_interface_vrf_synced(self):
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
@ -3250,7 +3321,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_interface_vrf_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.VRF)
context, resource, aim_resource.VRF)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router_interface_vrf('build')
@ -3258,7 +3329,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_interface_vrf_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.VRF)
context, resource, aim_resource.VRF)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router_interface_vrf('error')
@ -3277,9 +3348,18 @@ class TestSyncState(ApicAimTestCase):
self.assertEqual(expected_state,
router['apic:synchronization_state'])
router = self._list(
'routers',
query_params=('id=%s' % router['id']))['routers'][0]
self.assertEqual(expected_state, router['apic:synchronization_state'])
subnet = self._show('subnets', subnet['id'])['subnet']
self.assertEqual(expected_state, subnet['apic:synchronization_state'])
subnet = self._list(
'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0]
self.assertEqual(expected_state, subnet['apic:synchronization_state'])
def test_router_interface_subnet_synced(self):
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
@ -3288,7 +3368,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_interface_subnet_build(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_pending_status_for_type(
resource, aim_resource.Subnet)
context, resource, aim_resource.Subnet)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router_interface_subnet('build')
@ -3296,7 +3376,7 @@ class TestSyncState(ApicAimTestCase):
def test_router_interface_subnet_error(self):
def get_status(self, context, resource, create_if_absent=True):
return TestSyncState._get_failed_status_for_type(
resource, aim_resource.Subnet)
context, resource, aim_resource.Subnet)
with mock.patch('aim.aim_manager.AimManager.get_status', get_status):
self._test_router_interface_subnet('error')
@ -3309,11 +3389,22 @@ class TestSyncState(ApicAimTestCase):
self.assertEqual(expected_state, net['apic:synchronization_state'],
msg)
net = self._list(
'networks', query_params=('id=%s' % net['id']))['networks'][0]
self.assertEqual(expected_state, net['apic:synchronization_state'])
def test_external_network(self):
ext_net = aim_resource.ExternalNetwork.from_dn(self.dn_t1_l1_n1)
ext_net.monitored = True
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx, ext_net)
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
self._test_external_network('synced',
dn=self.dn_t1_l1_n1)
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_external_network('synced',
dn=self.dn_t1_l1_n1)
for expected_status, status_func in [
('build', TestSyncState._get_pending_status_for_type),
@ -3323,12 +3414,14 @@ class TestSyncState(ApicAimTestCase):
aim_resource.BridgeDomain,
aim_resource.VRF]:
def get_status(self, context, resource, create_if_absent=True):
return status_func(resource, a_res)
return status_func(context, resource, a_res)
with mock.patch('aim.aim_manager.AimManager.get_status',
get_status):
self._test_external_network(expected_status,
dn=self.dn_t1_l1_n1,
msg='%s' % a_res)
with mock.patch('aim.aim_manager.AimManager.get_statuses',
TestSyncState._mocked_get_statuses):
self._test_external_network(expected_status,
dn=self.dn_t1_l1_n1,
msg='%s' % a_res)
def test_unmanaged_external_network(self):
self._test_external_network('N/A')
@ -3340,9 +3433,19 @@ class TestSyncState(ApicAimTestCase):
subnet = self._show('subnets', subnet['id'])['subnet']
self.assertEqual(expected_state, subnet['apic:synchronization_state'])
subnet = self._list(
'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0]
self.assertEqual(expected_state, subnet['apic:synchronization_state'])
self._delete("subnets", subnet['id'])
def test_external_subnet(self):
ext_net = aim_resource.ExternalNetwork.from_dn(self.dn_t1_l1_n1)
ext_net.monitored = True
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx, ext_net)
with mock.patch('aim.aim_manager.AimManager.get_status',
TestSyncState._get_synced_status):
self._test_external_subnet('synced',
@ -3352,7 +3455,7 @@ class TestSyncState(ApicAimTestCase):
('build', TestSyncState._get_pending_status_for_type),
('error', TestSyncState._get_failed_status_for_type)]:
def get_status(self, context, resource, **kwargs):
return status_func(resource, aim_resource.Subnet)
return status_func(context, resource, aim_resource.Subnet)
with mock.patch('aim.aim_manager.AimManager.get_status',
get_status):
self._test_external_subnet(expected_status,
@ -4470,6 +4573,13 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertEqual('', net1['apic:nat_type'])
self.assertEqual(['0.0.0.0/0'], net1[CIDR])
net1 = self._list(
'networks', query_params=('id=%s' % net1['id']))['networks'][0]
self.assertEqual(self.dn_t1_l1_n1,
net1[DN]['ExternalNetwork'])
self.assertEqual('', net1['apic:nat_type'])
self.assertEqual(['0.0.0.0/0'], net1[CIDR])
# create with nat_type set to default, and CIDR specified
net2 = self._make_ext_network('net2',
dn=self.dn_t1_l2_n2,
@ -4478,22 +4588,43 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertEqual(['10.20.0.0/16', '5.5.5.0/24'],
sorted(net2[CIDR]))
net2 = self._list(
'networks', query_params=('id=%s' % net2['id']))['networks'][0]
self.assertEqual('distributed', net2['apic:nat_type'])
self.assertEqual(['10.20.0.0/16', '5.5.5.0/24'],
sorted(net2[CIDR]))
# update CIDR
net2 = self._update('networks', net2['id'],
{'network': {CIDR: ['20.20.30.0/24']}})['network']
self.assertEqual('distributed', net2['apic:nat_type'])
self.assertEqual(['20.20.30.0/24'], net2[CIDR])
net2 = self._list(
'networks', query_params=('id=%s' % net2['id']))['networks'][0]
self.assertEqual('distributed', net2['apic:nat_type'])
self.assertEqual(['20.20.30.0/24'], net2[CIDR])
net2 = self._update('networks', net2['id'],
{'network': {CIDR: []}})['network']
self.assertEqual([], net2[CIDR])
net2 = self._list(
'networks', query_params=('id=%s' % net2['id']))['networks'][0]
self.assertEqual([], net2[CIDR])
# create without APIC DN -> this is an unmanaged network
net3 = self._make_ext_network('net3')
self.assertTrue(DN not in net3 or 'ExternalNetwork' not in net3[DN])
self.assertFalse('apic:nat_type' in net3)
self.assertFalse(CIDR in net3)
net3 = self._list(
'networks', query_params=('id=%s' % net3['id']))['networks'][0]
self.assertTrue(DN not in net3 or 'ExternalNetwork' not in net3[DN])
self.assertFalse('apic:nat_type' in net3)
self.assertFalse(CIDR in net3)
# updating CIDR of unmanaged network is no-op
net3 = self._update('networks', net3['id'],
{'network': {CIDR: ['30.30.20.0/24']}})['network']
@ -4501,6 +4632,12 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertFalse('apic:nat_type' in net3)
self.assertFalse(CIDR in net3)
net3 = self._list(
'networks', query_params=('id=%s' % net3['id']))['networks'][0]
self.assertTrue(DN not in net3 or 'ExternalNetwork' not in net3[DN])
self.assertFalse('apic:nat_type' in net3)
self.assertFalse(CIDR in net3)
# delete the external networks
self._delete('networks', net2['id'])
self._delete('networks', net1['id'])
@ -4546,6 +4683,10 @@ class TestExtensionAttributes(ApicAimTestCase):
subnet = self._show('subnets', subnet['id'])['subnet']
self.assertFalse(subnet[SNAT_POOL])
subnet = self._list(
'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0]
self.assertFalse(subnet[SNAT_POOL])
# Update something other than snat_host_pool
subnet = self._update('subnets', subnet['id'],
{'subnet': {'name': 'foo'}})['subnet']
@ -4556,10 +4697,18 @@ class TestExtensionAttributes(ApicAimTestCase):
{'subnet': {SNAT_POOL: True}})['subnet']
self.assertTrue(subnet[SNAT_POOL])
subnet = self._list(
'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0]
self.assertTrue(subnet[SNAT_POOL])
subnet = self._update('subnets', subnet['id'],
{'subnet': {SNAT_POOL: False}})['subnet']
self.assertFalse(subnet[SNAT_POOL])
subnet = self._list(
'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0]
self.assertFalse(subnet[SNAT_POOL])
# delete subnet
self._delete('subnets', subnet['id'])
self.assertFalse(extn.get_subnet_extn_db(session, subnet['id']))
@ -4582,6 +4731,10 @@ class TestExtensionAttributes(ApicAimTestCase):
{'subnet': {SNAT_POOL: True}})['subnet']
self.assertTrue(subnet2[SNAT_POOL])
subnet2 = self._list(
'subnets', query_params=('id=%s' % subnet2['id']))['subnets'][0]
self.assertTrue(subnet2[SNAT_POOL])
def test_router_lifecycle(self):
session = db_api.get_reader_session()
extn = extn_db.ExtensionDbMixin()
@ -4607,12 +4760,22 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertEqual([], rtr1[PROV])
self.assertEqual(['k'], rtr1[CONS])
rtr1 = self._list(
'routers', query_params=('id=%s' % rtr1['id']))['routers'][0]
self.assertEqual([], rtr1[PROV])
self.assertEqual(['k'], rtr1[CONS])
self._update('routers', rtr1['id'],
{'router': {PROV: ['p1', 'p2']}})
rtr1 = self._show('routers', rtr1['id'])['router']
self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV]))
self.assertEqual(['k'], rtr1[CONS])
rtr1 = self._list(
'routers', query_params=('id=%s' % rtr1['id']))['routers'][0]
self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV]))
self.assertEqual(['k'], rtr1[CONS])
# delete
self._delete('routers', rtr1['id'])
self.assertEqual({PROV: [], CONS: []},
@ -4627,11 +4790,21 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertEqual([], rtr2[PROV])
self.assertEqual([], rtr2[CONS])
rtr2 = self._list(
'routers', query_params=('id=%s' % rtr2['id']))['routers'][0]
self.assertEqual([], rtr2[PROV])
self.assertEqual([], rtr2[CONS])
rtr2 = self._update('routers', rtr2['id'],
{'router': {PROV: ['p1', 'p2']}})['router']
self.assertEqual(['p1', 'p2'], sorted(rtr2[PROV]))
self.assertEqual([], rtr2[CONS])
rtr2 = self._list(
'routers', query_params=('id=%s' % rtr2['id']))['routers'][0]
self.assertEqual(['p1', 'p2'], sorted(rtr2[PROV]))
self.assertEqual([], rtr2[CONS])
def test_address_scope_lifecycle(self):
session = db_api.get_writer_session()
aim_ctx = aim_context.AimContext(db_session=session)