From ac49921cffba1af9a2f28c77e8a5f78896afd48e Mon Sep 17 00:00:00 2001 From: Ivar Lazzaro Date: Fri, 20 Jul 2018 16:17:40 -0700 Subject: [PATCH] [AIM] Bulk extension for ML2Plus Improve performance of bulk read operations when using the AIM driver by implementing a bulk extension mechanism. By changing the extension driver framework, we can build extension calls that have visibility over all the networks that need to be extended, and can therefore run bulk queries against the database, reducing the number of round-trips to a constant. Every query will still be slower as the number of objects grows, but this cuts down the round-trip overhead. Change-Id: Ib884668228a5dc9be48477ca2245d762cd41bcc0 --- .../neutron/plugins/ml2plus/driver_api.py | 2 + .../plugins/ml2plus/drivers/apic_aim/db.py | 4 + .../ml2plus/drivers/apic_aim/extension_db.py | 92 ++++--- .../drivers/apic_aim/extension_driver.py | 17 +- .../drivers/apic_aim/mechanism_driver.py | 123 ++++++--- .../neutron/plugins/ml2plus/managers.py | 38 ++- gbpservice/neutron/plugins/ml2plus/plugin.py | 90 +++++++ .../unit/plugins/ml2plus/test_apic_aim.py | 249 +++++++++++++++--- 8 files changed, 495 insertions(+), 120 deletions(-) diff --git a/gbpservice/neutron/plugins/ml2plus/driver_api.py b/gbpservice/neutron/plugins/ml2plus/driver_api.py index 8fc64cd83..c72f1b428 100644 --- a/gbpservice/neutron/plugins/ml2plus/driver_api.py +++ b/gbpservice/neutron/plugins/ml2plus/driver_api.py @@ -18,6 +18,8 @@ import six from neutron.plugins.ml2 import driver_api +BULK_EXTENDED = 'ml2plus:_bulk_extended' + @six.add_metaclass(abc.ABCMeta) class SubnetPoolContext(object): diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/db.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/db.py index a111baa57..41076d285 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/db.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/db.py @@ -127,6 +127,10 @@ class DbMixin(object): filter_by(network_id=network_id). one_or_none()) + def _get_network_mapping_bulk(self, session, network_ids): + return session.query(NetworkMapping).filter( + NetworkMapping.network_id.in_(network_ids)).all() + def _get_network_mappings_for_vrf(self, session, vrf): return (session.query(NetworkMapping). 
filter_by(vrf_tenant_name=vrf.tenant_name, diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py index fd4904562..e8d0b2039 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py @@ -58,6 +58,10 @@ class NetworkExtensionCidrDb(model_base.BASEV2): sa.String(36), sa.ForeignKey('networks.id', ondelete="CASCADE"), primary_key=True) cidr = sa.Column(sa.String(64), primary_key=True) + network = orm.relationship(models_v2.Network, + backref=orm.backref( + 'aim_extension_cidr_mapping', lazy='joined', + uselist=True, cascade='delete')) class NetworkExtNestedDomainAllowedVlansDb(model_base.BASEV2): @@ -68,6 +72,11 @@ class NetworkExtNestedDomainAllowedVlansDb(model_base.BASEV2): vlan = sa.Column(sa.Integer(), primary_key=True) network_id = sa.Column( sa.String(36), sa.ForeignKey('networks.id', ondelete="CASCADE")) + network = orm.relationship(models_v2.Network, + backref=orm.backref( + 'aim_extension_domain_mapping', + uselist=True, + lazy='joined', cascade='delete')) class SubnetExtensionDb(model_base.BASEV2): @@ -98,41 +107,62 @@ class ExtensionDbMixin(object): res_dict[res_attr] = db_attr def get_network_extn_db(self, session, network_id): - db_obj = (session.query(NetworkExtensionDb).filter_by( - network_id=network_id).first()) - db_cidrs = (session.query(NetworkExtensionCidrDb).filter_by( - network_id=network_id).all()) + return self.get_network_extn_db_bulk(session, [network_id]).get( + network_id, {}) + + def get_network_extn_db_bulk(self, session, network_ids): + db_objs = (session.query(NetworkExtensionDb).filter( + NetworkExtensionDb.network_id.in_(network_ids)).all()) + db_cidrs = (session.query(NetworkExtensionCidrDb).filter( + NetworkExtensionCidrDb.network_id.in_(network_ids)).all()) db_vlans = (session.query( - NetworkExtNestedDomainAllowedVlansDb).filter_by( - network_id=network_id).all()) + NetworkExtNestedDomainAllowedVlansDb).filter( + NetworkExtNestedDomainAllowedVlansDb.network_id.in_( + network_ids)).all()) + cidrs_by_net_id = {} + vlans_by_net_id = {} + for db_cidr in db_cidrs: + cidrs_by_net_id.setdefault(db_cidr.network_id, []).append( + db_cidr) + for db_vlan in db_vlans: + vlans_by_net_id.setdefault(db_vlan.network_id, []).append( + db_vlan) result = {} - if db_obj: - self._set_if_not_none(result, cisco_apic.EXTERNAL_NETWORK, - db_obj['external_network_dn']) - self._set_if_not_none(result, cisco_apic.NAT_TYPE, - db_obj['nat_type']) - self._set_if_not_none(result, cisco_apic.SVI, db_obj['svi']) - result[cisco_apic.BGP] = db_obj['bgp_enable'] - result[cisco_apic.BGP_TYPE] = db_obj['bgp_type'] - result[cisco_apic.BGP_ASN] = db_obj['bgp_asn'] - result[cisco_apic.NESTED_DOMAIN_NAME] = ( - db_obj['nested_domain_name']) - result[cisco_apic.NESTED_DOMAIN_TYPE] = ( - db_obj['nested_domain_type']) - result[cisco_apic.NESTED_DOMAIN_INFRA_VLAN] = ( - db_obj['nested_domain_infra_vlan']) - result[cisco_apic.NESTED_DOMAIN_SERVICE_VLAN] = ( - db_obj['nested_domain_service_vlan']) - result[cisco_apic.NESTED_DOMAIN_NODE_NETWORK_VLAN] = ( - db_obj['nested_domain_node_network_vlan']) - result[cisco_apic.NESTED_DOMAIN_ALLOWED_VLANS] = ( - [c['vlan'] for c in db_vlans]) - - if result.get(cisco_apic.EXTERNAL_NETWORK): - result[cisco_apic.EXTERNAL_CIDRS] = [c['cidr'] for c in db_cidrs] - + for db_obj in db_objs: + net_id = db_obj.network_id + result.setdefault(net_id, 
self.make_network_extn_db_conf_dict( + db_obj, cidrs_by_net_id.get(net_id, []), + vlans_by_net_id.get(net_id, []))) return result + def make_network_extn_db_conf_dict(self, ext_db, db_cidrs, db_vlans): + net_res = {} + db_obj = ext_db + if db_obj: + self._set_if_not_none(net_res, cisco_apic.EXTERNAL_NETWORK, + db_obj['external_network_dn']) + self._set_if_not_none(net_res, cisco_apic.NAT_TYPE, + db_obj['nat_type']) + self._set_if_not_none(net_res, cisco_apic.SVI, db_obj['svi']) + net_res[cisco_apic.BGP] = db_obj['bgp_enable'] + net_res[cisco_apic.BGP_TYPE] = db_obj['bgp_type'] + net_res[cisco_apic.BGP_ASN] = db_obj['bgp_asn'] + net_res[cisco_apic.NESTED_DOMAIN_NAME] = ( + db_obj['nested_domain_name']) + net_res[cisco_apic.NESTED_DOMAIN_TYPE] = ( + db_obj['nested_domain_type']) + net_res[cisco_apic.NESTED_DOMAIN_INFRA_VLAN] = ( + db_obj['nested_domain_infra_vlan']) + net_res[cisco_apic.NESTED_DOMAIN_SERVICE_VLAN] = ( + db_obj['nested_domain_service_vlan']) + net_res[cisco_apic.NESTED_DOMAIN_NODE_NETWORK_VLAN] = ( + db_obj['nested_domain_node_network_vlan']) + net_res[cisco_apic.NESTED_DOMAIN_ALLOWED_VLANS] = [ + c.vlan for c in db_vlans] + if net_res.get(cisco_apic.EXTERNAL_NETWORK): + net_res[cisco_apic.EXTERNAL_CIDRS] = [c.cidr for c in db_cidrs] + return net_res + def set_network_extn_db(self, session, network_id, res_dict): with session.begin(subtransactions=True): db_obj = (session.query(NetworkExtensionDb).filter_by( diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_driver.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_driver.py index dff0af92b..1111b6a43 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_driver.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_driver.py @@ -62,12 +62,17 @@ class ApicExtensionDriver(api_plus.ExtensionDriver, def extend_network_dict(self, session, base_model, result): try: self._md.extend_network_dict(session, base_model, result) - res_dict = self.get_network_extn_db(session, result['id']) - if cisco_apic.EXTERNAL_NETWORK in res_dict: - result.setdefault(cisco_apic.DIST_NAMES, {})[ - cisco_apic.EXTERNAL_NETWORK] = res_dict.pop( - cisco_apic.EXTERNAL_NETWORK) - result.update(res_dict) + except Exception as e: + with excutils.save_and_reraise_exception(): + if db_api.is_retriable(e): + LOG.debug("APIC AIM extend_network_dict got retriable " + "exception: %s", type(e)) + else: + LOG.exception("APIC AIM extend_network_dict failed") + + def extend_network_dict_bulk(self, session, results): + try: + self._md.extend_network_dict_bulk(session, results) except Exception as e: with excutils.save_and_reraise_exception(): if db_api.is_retriable(e): diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py index 731214809..808d0ffa5 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py @@ -812,52 +812,81 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.aim.delete(aim_ctx, epg) session.delete(mapping) - def extend_network_dict(self, session, network_db, result): - LOG.debug("APIC AIM MD extending dict for network: %s", result) - - sync_state = cisco_apic.SYNC_NOT_APPLICABLE - dist_names = {} + def extend_network_dict_bulk(self, session, results, single=False): + # Gather db objects aim_ctx = aim_context.AimContext(session) + extn_db = 
extension_db.ExtensionDbMixin() + aim_resources = [] + res_dict_by_aim_res_dn = {} - # REVISIT: Check and revert the following to: - # mapping = network_db.aim_mapping - mapping = self._get_network_mapping(session, network_db['id']) - if mapping: - if mapping.epg_name: - bd = self._get_network_bd(mapping) - dist_names[cisco_apic.BD] = bd.dn - sync_state = self._merge_status(aim_ctx, sync_state, bd) + for res_dict, net_db in results: + res_dict[cisco_apic.SYNC_STATE] = cisco_apic.SYNC_NOT_APPLICABLE + res_dict[cisco_apic.DIST_NAMES] = {} + mapping = net_db.aim_mapping + dist_names = res_dict.setdefault(cisco_apic.DIST_NAMES, {}) + if not mapping and single: + # Needed because of commit + # d8c1e153f88952b7670399715c2f88f1ecf0a94a in Neutron that + # put the extension call in Pike+ *before* the precommit + # calls happen in network creation. I believe this is a bug + # and should be discussed with the Neutron team. + mapping = self._get_network_mapping(session, net_db.id) + if mapping: + if mapping.epg_name: + bd = self._get_network_bd(mapping) + dist_names[cisco_apic.BD] = bd.dn + epg = self._get_network_epg(mapping) + dist_names[cisco_apic.EPG] = epg.dn + aim_resources.extend([bd, epg]) + res_dict_by_aim_res_dn[epg.dn] = res_dict + res_dict_by_aim_res_dn[bd.dn] = res_dict + elif mapping.l3out_name: + l3out_ext_net = self._get_network_l3out_ext_net(mapping) + dist_names[cisco_apic.EXTERNAL_NETWORK] = l3out_ext_net.dn + aim_resources.append(l3out_ext_net) + res_dict_by_aim_res_dn[l3out_ext_net.dn] = res_dict - epg = self._get_network_epg(mapping) - dist_names[cisco_apic.EPG] = epg.dn - sync_state = self._merge_status(aim_ctx, sync_state, epg) - # SVI network with auto l3out. - elif mapping.l3out_name: - l3out_ext_net = self._get_network_l3out_ext_net(mapping) - dist_names[cisco_apic.EXTERNAL_NETWORK] = l3out_ext_net.dn - sync_state = self._merge_status(aim_ctx, sync_state, - l3out_ext_net) + vrf = self._get_network_vrf(mapping) + dist_names[cisco_apic.VRF] = vrf.dn + aim_resources.append(vrf) + res_dict_by_aim_res_dn[vrf.dn] = res_dict + if not net_db.aim_extension_mapping and single: + # Needed because of commit + # d8c1e153f88952b7670399715c2f88f1ecf0a94a in Neutron that + # put the extension call in Pike+ *before* the precommit + # calls happen in network creation. I believe this is a bug + # and should be discussed with the Neutron team. + ext_dict = extn_db.get_network_extn_db(session, net_db.id) + else: + ext_dict = extn_db.make_network_extn_db_conf_dict( + net_db.aim_extension_mapping, + net_db.aim_extension_cidr_mapping, + net_db.aim_extension_domain_mapping) + if cisco_apic.EXTERNAL_NETWORK in ext_dict: + dn = ext_dict.pop(cisco_apic.EXTERNAL_NETWORK) + a_ext_net = aim_resource.ExternalNetwork.from_dn(dn) + res_dict.setdefault(cisco_apic.DIST_NAMES, {})[ + cisco_apic.EXTERNAL_NETWORK] = dn + aim_resources.append(a_ext_net) + res_dict_by_aim_res_dn[a_ext_net.dn] = res_dict - vrf = self._get_network_vrf(mapping) - dist_names[cisco_apic.VRF] = vrf.dn - sync_state = self._merge_status(aim_ctx, sync_state, vrf) + res_dict.update(ext_dict) - # SVI network with pre-existing l3out. 
- if self._is_preexisting_svi_db(network_db): - _, ext_net, _ = self._get_aim_external_objects_db(session, - network_db) - if ext_net: - sync_state = self._merge_status(aim_ctx, sync_state, ext_net) + # Merge statuses + for status in self.aim.get_statuses(aim_ctx, aim_resources): + res_dict = res_dict_by_aim_res_dn.get(status.resource_dn, {}) + res_dict[cisco_apic.SYNC_STATE] = self._merge_status( + aim_ctx, + res_dict.get(cisco_apic.SYNC_STATE, + cisco_apic.SYNC_NOT_APPLICABLE), + None, status=status) - # REVISIT: Should the external network be persisted in the - # mapping along with the other resources? - if network_db.external is not None: - _, ext_net, _ = self._get_aim_nat_strategy_db(session, network_db) - if ext_net: - sync_state = self._merge_status(aim_ctx, sync_state, ext_net) - - result[cisco_apic.DIST_NAMES] = dist_names - result[cisco_apic.SYNC_STATE] = sync_state + def extend_network_dict(self, session, network_db, result): + if result.get(api_plus.BULK_EXTENDED): + return + LOG.debug("APIC AIM MD extending dict for network: %s", result) + self.extend_network_dict_bulk(session, [(result, network_db)], + single=True) def create_subnet_precommit(self, context): current = context.current @@ -975,6 +1004,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver, # they are removed from routers. def extend_subnet_dict(self, session, subnet_db, result): + if result.get(api_plus.BULK_EXTENDED): + return LOG.debug("APIC AIM MD extending dict for subnet: %s", result) sync_state = cisco_apic.SYNC_NOT_APPLICABLE @@ -1108,6 +1139,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver, session.delete(mapping) def extend_address_scope_dict(self, session, scope, result): + if result.get(api_plus.BULK_EXTENDED): + return LOG.debug("APIC AIM MD extending dict for address scope: %s", result) # REVISIT: Consider moving to ApicExtensionDriver. @@ -1255,6 +1288,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.aim.delete(aim_ctx, contract) def extend_router_dict(self, session, router_db, result): + if result.get(api_plus.BULK_EXTENDED): + return LOG.debug("APIC AIM MD extending dict for router: %s", result) # REVISIT(rkukura): Consider optimizing this method by @@ -1492,6 +1527,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver, # interfaces. if not net_intfs: # First interface for network. + network_db.aim_mapping = (network_db.aim_mapping or + self._get_network_mapping(session, + network_db.id)) if network_db.aim_mapping.epg_name: bd, epg = self._associate_network_with_vrf( context, aim_ctx, network_db, vrf, nets_to_notify) @@ -2343,8 +2381,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver, policy_drivers['aim_mapping'].obj) return self._gbp_driver - def _merge_status(self, aim_ctx, sync_state, resource): - status = self.aim.get_status(aim_ctx, resource, create_if_absent=False) + def _merge_status(self, aim_ctx, sync_state, resource, status=None): + status = status or self.aim.get_status(aim_ctx, resource, + create_if_absent=False) if not status: # REVISIT(rkukura): This should only occur if the AIM # resource has not yet been created when diff --git a/gbpservice/neutron/plugins/ml2plus/managers.py b/gbpservice/neutron/plugins/ml2plus/managers.py index be458b91b..ef05a990d 100644 --- a/gbpservice/neutron/plugins/ml2plus/managers.py +++ b/gbpservice/neutron/plugins/ml2plus/managers.py @@ -229,13 +229,23 @@ class ExtensionManager(managers.ExtensionManager): # exceptions, as well as to support calling only on extension # drivers extended for ML2Plus. 
def _call_on_dict_driver(self, method_name, session, base_model, result, - extended_only=False): + extended_only=False, has_base_model=True): + + # Bulk operations might not be implemented by all drivers + def noop(*args, **kwargs): + pass + for driver in self.ordered_ext_drivers: if not extended_only or isinstance( driver.obj, driver_api.ExtensionDriver): try: - getattr(driver.obj, method_name)(session, base_model, - result) + if not has_base_model: + getattr(driver.obj, method_name, noop)(session, + result) + else: + getattr(driver.obj, method_name, noop)(session, + base_model, + result) except Exception as e: if db_api.is_retriable(e): with excutils.save_and_reraise_exception(): @@ -272,3 +282,25 @@ class ExtensionManager(managers.ExtensionManager): def extend_address_scope_dict(self, session, base_model, result): self._call_on_dict_driver("extend_address_scope_dict", session, base_model, result, True) + + def extend_subnetpool_dict_bulk(self, session, result): + self._call_on_dict_driver("extend_subnetpool_dict_bulk", + session, None, result, True, + has_base_model=False) + + def extend_address_scope_dict_bulk(self, session, result): + self._call_on_dict_driver("extend_address_scope_dict_bulk", + session, None, result, True, + has_base_model=False) + + def extend_network_dict_bulk(self, session, result): + self._call_on_dict_driver("extend_network_dict_bulk", session, + None, result, has_base_model=False) + + def extend_subnet_dict_bulk(self, session, result): + self._call_on_dict_driver("extend_subnet_dict_bulk", session, + None, result, has_base_model=False) + + def extend_port_dict_bulk(self, session, result): + self._call_on_dict_driver("extend_port_dict_bulk", session, None, + result, has_base_model=False) diff --git a/gbpservice/neutron/plugins/ml2plus/plugin.py b/gbpservice/neutron/plugins/ml2plus/plugin.py index 3e8befa6c..80f3c5f18 100644 --- a/gbpservice/neutron/plugins/ml2plus/plugin.py +++ b/gbpservice/neutron/plugins/ml2plus/plugin.py @@ -20,7 +20,9 @@ from gbpservice.neutron import extensions as gbp_extensions from gbpservice.neutron.extensions import patch # noqa from gbpservice.neutron.plugins.ml2plus import patch_neutron # noqa +from neutron.common import constants as n_const from neutron.db import _resource_extend as resource_extend +from neutron.db import _utils as db_utils from neutron.db import api as db_api from neutron.db.models import securitygroup as securitygroups_db from neutron.db import models_v2 @@ -44,6 +46,7 @@ from oslo_log import log from oslo_utils import excutils from gbpservice.neutron.db import implicitsubnetpool_db +from gbpservice.neutron.plugins.ml2plus import driver_api as api_plus from gbpservice.neutron.plugins.ml2plus import driver_context from gbpservice.neutron.plugins.ml2plus import managers @@ -192,6 +195,14 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, plugin.extension_manager.extend_network_dict( session, netdb, result) + @staticmethod + @resource_extend.extends([net_def.COLLECTION_NAME + '_BULK']) + def _ml2_md_extend_network_dict_bulk(results, _): + plugin = directory.get_plugin() + session = patch_neutron.get_current_session() + with session.begin(subtransactions=True): + plugin.extension_manager.extend_network_dict_bulk(session, results) + @staticmethod @resource_extend.extends([port_def.COLLECTION_NAME]) def _ml2_md_extend_port_dict(result, portdb): @@ -205,6 +216,14 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, plugin.extension_manager.extend_port_dict( session, portdb, result) + @staticmethod + 
@resource_extend.extends([port_def.COLLECTION_NAME + '_BULK']) + def _ml2_md_extend_port_dict_bulk(results, _): + plugin = directory.get_plugin() + session = patch_neutron.get_current_session() + with session.begin(subtransactions=True): + plugin.extension_manager.extend_port_dict_bulk(session, results) + @staticmethod @resource_extend.extends([subnet_def.COLLECTION_NAME]) def _ml2_md_extend_subnet_dict(result, subnetdb): @@ -218,6 +237,14 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, plugin.extension_manager.extend_subnet_dict( session, subnetdb, result) + @staticmethod + @resource_extend.extends([subnet_def.COLLECTION_NAME + '_BULK']) + def _ml2_md_extend_subnet_dict_bulk(results, _): + plugin = directory.get_plugin() + session = patch_neutron.get_current_session() + with session.begin(subtransactions=True): + plugin.extension_manager.extend_subnet_dict_bulk(session, results) + @staticmethod @resource_extend.extends([subnetpool_def.COLLECTION_NAME]) def _ml2_md_extend_subnetpool_dict(result, subnetpooldb): @@ -231,6 +258,15 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, plugin.extension_manager.extend_subnetpool_dict( session, subnetpooldb, result) + @staticmethod + @resource_extend.extends([subnetpool_def.COLLECTION_NAME + '_BULK']) + def _ml2_md_extend_subnetpool_dict_bulk(results, _): + plugin = directory.get_plugin() + session = patch_neutron.get_current_session() + with session.begin(subtransactions=True): + plugin.extension_manager.extend_subnetpool_dict_bulk(session, + results) + @staticmethod @resource_extend.extends([as_def.COLLECTION_NAME]) def _ml2_md_extend_address_scope_dict(result, address_scope): @@ -244,6 +280,15 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, plugin.extension_manager.extend_address_scope_dict( session, address_scope, result) + @staticmethod + @resource_extend.extends([as_def.COLLECTION_NAME + '_BULK']) + def _ml2_md_extend_address_scope_dict_bulk(results, _): + plugin = directory.get_plugin() + session = patch_neutron.get_current_session() + with session.begin(subtransactions=True): + plugin.extension_manager.extend_address_scope_dict_bulk(session, + results) + # Base version does not call _apply_dict_extend_functions() def _make_address_scope_dict(self, address_scope, fields=None): res = {'id': address_scope['id'], @@ -510,3 +555,48 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, ip_version=ip_version) or self.get_implicit_subnetpool_id(context, tenant=None, ip_version=ip_version)) + + # REVISIT(ivar): patching bulk gets for extension performance + + def _make_networks_dict(self, networks, context): + nets = [] + for network in networks: + if network.mtu is None: + # TODO(ivar): also refactor this to run for bulk networks + network.mtu = self._get_network_mtu(network, validate=False) + res = {'id': network['id'], + 'name': network['name'], + 'tenant_id': network['tenant_id'], + 'admin_state_up': network['admin_state_up'], + 'mtu': network.get('mtu', n_const.DEFAULT_NETWORK_MTU), + 'status': network['status'], + 'subnets': [subnet['id'] + for subnet in network['subnets']]} + res['shared'] = self._is_network_shared(context, + network.rbac_entries) + nets.append((res, network)) + + # Bulk extend first + resource_extend.apply_funcs(net_def.COLLECTION_NAME + '_BULK', nets, + None) + + result = [] + for res, network in nets: + res[api_plus.BULK_EXTENDED] = True + resource_extend.apply_funcs(net_def.COLLECTION_NAME, res, network) + res.pop(api_plus.BULK_EXTENDED, None) + result.append(db_utils.resource_fields(res, [])) + return result + + 
@db_api.retry_if_session_inactive() + def get_networks(self, context, filters=None, fields=None, + sorts=None, limit=None, marker=None, page_reverse=False): + with db_api.context_manager.writer.using(context): + nets_db = super(Ml2PlusPlugin, self)._get_networks( + context, filters, None, sorts, limit, marker, page_reverse) + + net_data = self._make_networks_dict(nets_db, context) + + self.type_manager.extend_networks_dict_provider(context, net_data) + nets = self._filter_nets_provider(context, net_data, filters) + return [db_utils.resource_fields(net, fields) for net in nets] diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py index 4308509ab..ffb55c4c7 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py @@ -3076,24 +3076,62 @@ class TestSyncState(ApicAimTestCase): @staticmethod def _get_synced_status(self, context, resource, **kwargs): status = aim_status.AciStatus.SYNCED - return aim_status.AciStatus(resource_root=resource.root, - sync_status=status) + + aim_ctx = context + aim_mgr = aim_manager.AimManager() + res_type, res_id = aim_mgr._get_status_params(aim_ctx, resource) + if not res_type: + return None + return aim_mgr.create( + aim_ctx, aim_status.AciStatus( + resource_root=resource.root, sync_status=status, + resource_type=res_type, + resource_dn=resource.dn, + resource_id=res_id), overwrite=True) @staticmethod - def _get_pending_status_for_type(resource, type, **kwargs): + def _get_pending_status_for_type(context, resource, type, **kwargs): status = (isinstance(resource, type) and aim_status.AciStatus.SYNC_PENDING or aim_status.AciStatus.SYNCED) - return aim_status.AciStatus(resource_root=resource.root, - sync_status=status) + + aim_ctx = context + aim_mgr = aim_manager.AimManager() + res_type, res_id = aim_mgr._get_status_params(aim_ctx, resource) + if not res_type: + return None + return aim_mgr.create( + aim_ctx, aim_status.AciStatus( + resource_root=resource.root, sync_status=status, + resource_type=res_type, + resource_dn=resource.dn, + resource_id=res_id), overwrite=True) @staticmethod - def _get_failed_status_for_type(resource, type, **kwargs): + def _get_failed_status_for_type(context, resource, type, **kwargs): status = (isinstance(resource, type) and aim_status.AciStatus.SYNC_FAILED or aim_status.AciStatus.SYNC_PENDING) - return aim_status.AciStatus(resource_root=resource.root, - sync_status=status) + + aim_ctx = context + aim_mgr = aim_manager.AimManager() + res_type, res_id = aim_mgr._get_status_params(aim_ctx, resource) + if not res_type: + return None + return aim_mgr.create( + aim_ctx, aim_status.AciStatus( + resource_root=resource.root, sync_status=status, + resource_type=res_type, + resource_dn=resource.dn, + resource_id=res_id), overwrite=True) + + _aim_get_statuses = aim_manager.AimManager.get_statuses + + @staticmethod + def _mocked_get_statuses(self, context, resources): + for res in resources: + self.get_status(context, res) + return TestSyncState._aim_get_statuses(self, context, resources) def _test_network(self, expected_state): net = self._make_network(self.fmt, 'net1', True)['network'] @@ -3102,58 +3140,76 @@ class TestSyncState(ApicAimTestCase): net = self._show('networks', net['id'])['network'] self.assertEqual(expected_state, net['apic:synchronization_state']) + net = self._list( + 'networks', query_params=('id=%s' % net['id']))['networks'][0] + 
self.assertEqual(expected_state, net['apic:synchronization_state']) + def test_network_synced(self): with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): - self._test_network('synced') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('synced') def test_network_bd_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.BridgeDomain) + context, resource, aim_resource.BridgeDomain) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): - self._test_network('build') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('build') def test_network_bd_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.BridgeDomain) + context, resource, aim_resource.BridgeDomain) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): - self._test_network('error') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('error') def test_network_epg_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.EndpointGroup) + context, resource, aim_resource.EndpointGroup) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): - self._test_network('build') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('build') def test_network_epg_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.EndpointGroup) + context, resource, aim_resource.EndpointGroup) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): - self._test_network('error') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('error') def test_network_vrf_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.VRF) + context, resource, aim_resource.VRF) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): - self._test_network('build') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('build') def test_network_vrf_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.VRF) + context, resource, aim_resource.VRF) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): - self._test_network('error') + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_network('error') def _test_address_scope(self, expected_state): scope = self._make_address_scope(self.fmt, 4, name='scope1')[ @@ -3163,6 +3219,11 @@ class TestSyncState(ApicAimTestCase): scope = self._show('address-scopes', scope['id'])['address_scope'] self.assertEqual(expected_state, scope['apic:synchronization_state']) + scope = self._list( + 'address-scopes', + query_params=('id=%s' % scope['id']))['address_scopes'][0] + 
self.assertEqual(expected_state, scope['apic:synchronization_state']) + def test_address_scope_synced(self): with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): @@ -3171,7 +3232,7 @@ class TestSyncState(ApicAimTestCase): def test_address_scope_vrf_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.VRF) + context, resource, aim_resource.VRF) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_address_scope('build') @@ -3179,7 +3240,7 @@ class TestSyncState(ApicAimTestCase): def test_address_scope_vrf_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.VRF) + context, resource, aim_resource.VRF) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_address_scope('error') @@ -3192,6 +3253,11 @@ class TestSyncState(ApicAimTestCase): router = self._show('routers', router['id'])['router'] self.assertEqual(expected_state, router['apic:synchronization_state']) + router = self._list( + 'routers', + query_params=('id=%s' % router['id']))['routers'][0] + self.assertEqual(expected_state, router['apic:synchronization_state']) + def test_router_synced(self): with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): @@ -3200,7 +3266,7 @@ class TestSyncState(ApicAimTestCase): def test_router_contract_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.Contract) + context, resource, aim_resource.Contract) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router('build') @@ -3208,7 +3274,7 @@ class TestSyncState(ApicAimTestCase): def test_router_contract_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.Contract) + context, resource, aim_resource.Contract) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router('error') @@ -3216,7 +3282,7 @@ class TestSyncState(ApicAimTestCase): def test_router_subject_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.ContractSubject) + context, resource, aim_resource.ContractSubject) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router('build') @@ -3224,7 +3290,7 @@ class TestSyncState(ApicAimTestCase): def test_router_subject_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.ContractSubject) + context, resource, aim_resource.ContractSubject) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router('error') @@ -3242,6 +3308,11 @@ class TestSyncState(ApicAimTestCase): router = self._show('routers', router['id'])['router'] self.assertEqual(expected_state, router['apic:synchronization_state']) + router = self._list( + 'routers', + query_params=('id=%s' % router['id']))['routers'][0] + self.assertEqual(expected_state, router['apic:synchronization_state']) + def test_router_interface_vrf_synced(self): with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): @@ -3250,7 +3321,7 @@ class 
TestSyncState(ApicAimTestCase): def test_router_interface_vrf_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.VRF) + context, resource, aim_resource.VRF) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router_interface_vrf('build') @@ -3258,7 +3329,7 @@ class TestSyncState(ApicAimTestCase): def test_router_interface_vrf_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.VRF) + context, resource, aim_resource.VRF) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router_interface_vrf('error') @@ -3277,9 +3348,18 @@ class TestSyncState(ApicAimTestCase): self.assertEqual(expected_state, router['apic:synchronization_state']) + router = self._list( + 'routers', + query_params=('id=%s' % router['id']))['routers'][0] + self.assertEqual(expected_state, router['apic:synchronization_state']) + subnet = self._show('subnets', subnet['id'])['subnet'] self.assertEqual(expected_state, subnet['apic:synchronization_state']) + subnet = self._list( + 'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0] + self.assertEqual(expected_state, subnet['apic:synchronization_state']) + def test_router_interface_subnet_synced(self): with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): @@ -3288,7 +3368,7 @@ class TestSyncState(ApicAimTestCase): def test_router_interface_subnet_build(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_pending_status_for_type( - resource, aim_resource.Subnet) + context, resource, aim_resource.Subnet) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router_interface_subnet('build') @@ -3296,7 +3376,7 @@ class TestSyncState(ApicAimTestCase): def test_router_interface_subnet_error(self): def get_status(self, context, resource, create_if_absent=True): return TestSyncState._get_failed_status_for_type( - resource, aim_resource.Subnet) + context, resource, aim_resource.Subnet) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_router_interface_subnet('error') @@ -3309,11 +3389,22 @@ class TestSyncState(ApicAimTestCase): self.assertEqual(expected_state, net['apic:synchronization_state'], msg) + net = self._list( + 'networks', query_params=('id=%s' % net['id']))['networks'][0] + self.assertEqual(expected_state, net['apic:synchronization_state']) + def test_external_network(self): + ext_net = aim_resource.ExternalNetwork.from_dn(self.dn_t1_l1_n1) + ext_net.monitored = True + aim_ctx = aim_context.AimContext(self.db_session) + self.aim_mgr.create(aim_ctx, ext_net) + with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): - self._test_external_network('synced', - dn=self.dn_t1_l1_n1) + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_external_network('synced', + dn=self.dn_t1_l1_n1) for expected_status, status_func in [ ('build', TestSyncState._get_pending_status_for_type), @@ -3323,12 +3414,14 @@ class TestSyncState(ApicAimTestCase): aim_resource.BridgeDomain, aim_resource.VRF]: def get_status(self, context, resource, create_if_absent=True): - return status_func(resource, a_res) + return status_func(context, resource, a_res) with mock.patch('aim.aim_manager.AimManager.get_status', 
get_status): - self._test_external_network(expected_status, - dn=self.dn_t1_l1_n1, - msg='%s' % a_res) + with mock.patch('aim.aim_manager.AimManager.get_statuses', + TestSyncState._mocked_get_statuses): + self._test_external_network(expected_status, + dn=self.dn_t1_l1_n1, + msg='%s' % a_res) def test_unmanaged_external_network(self): self._test_external_network('N/A') @@ -3340,9 +3433,19 @@ class TestSyncState(ApicAimTestCase): subnet = self._show('subnets', subnet['id'])['subnet'] self.assertEqual(expected_state, subnet['apic:synchronization_state']) + + subnet = self._list( + 'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0] + self.assertEqual(expected_state, subnet['apic:synchronization_state']) + self._delete("subnets", subnet['id']) def test_external_subnet(self): + ext_net = aim_resource.ExternalNetwork.from_dn(self.dn_t1_l1_n1) + ext_net.monitored = True + aim_ctx = aim_context.AimContext(self.db_session) + self.aim_mgr.create(aim_ctx, ext_net) + with mock.patch('aim.aim_manager.AimManager.get_status', TestSyncState._get_synced_status): self._test_external_subnet('synced', @@ -3352,7 +3455,7 @@ class TestSyncState(ApicAimTestCase): ('build', TestSyncState._get_pending_status_for_type), ('error', TestSyncState._get_failed_status_for_type)]: def get_status(self, context, resource, **kwargs): - return status_func(resource, aim_resource.Subnet) + return status_func(context, resource, aim_resource.Subnet) with mock.patch('aim.aim_manager.AimManager.get_status', get_status): self._test_external_subnet(expected_status, @@ -4470,6 +4573,13 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertEqual('', net1['apic:nat_type']) self.assertEqual(['0.0.0.0/0'], net1[CIDR]) + net1 = self._list( + 'networks', query_params=('id=%s' % net1['id']))['networks'][0] + self.assertEqual(self.dn_t1_l1_n1, + net1[DN]['ExternalNetwork']) + self.assertEqual('', net1['apic:nat_type']) + self.assertEqual(['0.0.0.0/0'], net1[CIDR]) + # create with nat_type set to default, and CIDR specified net2 = self._make_ext_network('net2', dn=self.dn_t1_l2_n2, @@ -4478,22 +4588,43 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertEqual(['10.20.0.0/16', '5.5.5.0/24'], sorted(net2[CIDR])) + net2 = self._list( + 'networks', query_params=('id=%s' % net2['id']))['networks'][0] + self.assertEqual('distributed', net2['apic:nat_type']) + self.assertEqual(['10.20.0.0/16', '5.5.5.0/24'], + sorted(net2[CIDR])) + # update CIDR net2 = self._update('networks', net2['id'], {'network': {CIDR: ['20.20.30.0/24']}})['network'] self.assertEqual('distributed', net2['apic:nat_type']) self.assertEqual(['20.20.30.0/24'], net2[CIDR]) + net2 = self._list( + 'networks', query_params=('id=%s' % net2['id']))['networks'][0] + self.assertEqual('distributed', net2['apic:nat_type']) + self.assertEqual(['20.20.30.0/24'], net2[CIDR]) + net2 = self._update('networks', net2['id'], {'network': {CIDR: []}})['network'] self.assertEqual([], net2[CIDR]) + net2 = self._list( + 'networks', query_params=('id=%s' % net2['id']))['networks'][0] + self.assertEqual([], net2[CIDR]) + # create without APIC DN -> this is an unmanaged network net3 = self._make_ext_network('net3') self.assertTrue(DN not in net3 or 'ExternalNetwork' not in net3[DN]) self.assertFalse('apic:nat_type' in net3) self.assertFalse(CIDR in net3) + net3 = self._list( + 'networks', query_params=('id=%s' % net3['id']))['networks'][0] + self.assertTrue(DN not in net3 or 'ExternalNetwork' not in net3[DN]) + self.assertFalse('apic:nat_type' in net3) + self.assertFalse(CIDR in 
net3) + # updating CIDR of unmanaged network is no-op net3 = self._update('networks', net3['id'], {'network': {CIDR: ['30.30.20.0/24']}})['network'] @@ -4501,6 +4632,12 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertFalse('apic:nat_type' in net3) self.assertFalse(CIDR in net3) + net3 = self._list( + 'networks', query_params=('id=%s' % net3['id']))['networks'][0] + self.assertTrue(DN not in net3 or 'ExternalNetwork' not in net3[DN]) + self.assertFalse('apic:nat_type' in net3) + self.assertFalse(CIDR in net3) + # delete the external networks self._delete('networks', net2['id']) self._delete('networks', net1['id']) @@ -4546,6 +4683,10 @@ class TestExtensionAttributes(ApicAimTestCase): subnet = self._show('subnets', subnet['id'])['subnet'] self.assertFalse(subnet[SNAT_POOL]) + subnet = self._list( + 'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0] + self.assertFalse(subnet[SNAT_POOL]) + # Update something other than snat_host_pool subnet = self._update('subnets', subnet['id'], {'subnet': {'name': 'foo'}})['subnet'] @@ -4556,10 +4697,18 @@ class TestExtensionAttributes(ApicAimTestCase): {'subnet': {SNAT_POOL: True}})['subnet'] self.assertTrue(subnet[SNAT_POOL]) + subnet = self._list( + 'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0] + self.assertTrue(subnet[SNAT_POOL]) + subnet = self._update('subnets', subnet['id'], {'subnet': {SNAT_POOL: False}})['subnet'] self.assertFalse(subnet[SNAT_POOL]) + subnet = self._list( + 'subnets', query_params=('id=%s' % subnet['id']))['subnets'][0] + self.assertFalse(subnet[SNAT_POOL]) + # delete subnet self._delete('subnets', subnet['id']) self.assertFalse(extn.get_subnet_extn_db(session, subnet['id'])) @@ -4582,6 +4731,10 @@ class TestExtensionAttributes(ApicAimTestCase): {'subnet': {SNAT_POOL: True}})['subnet'] self.assertTrue(subnet2[SNAT_POOL]) + subnet2 = self._list( + 'subnets', query_params=('id=%s' % subnet2['id']))['subnets'][0] + self.assertTrue(subnet2[SNAT_POOL]) + def test_router_lifecycle(self): session = db_api.get_reader_session() extn = extn_db.ExtensionDbMixin() @@ -4607,12 +4760,22 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertEqual([], rtr1[PROV]) self.assertEqual(['k'], rtr1[CONS]) + rtr1 = self._list( + 'routers', query_params=('id=%s' % rtr1['id']))['routers'][0] + self.assertEqual([], rtr1[PROV]) + self.assertEqual(['k'], rtr1[CONS]) + self._update('routers', rtr1['id'], {'router': {PROV: ['p1', 'p2']}}) rtr1 = self._show('routers', rtr1['id'])['router'] self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV])) self.assertEqual(['k'], rtr1[CONS]) + rtr1 = self._list( + 'routers', query_params=('id=%s' % rtr1['id']))['routers'][0] + self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV])) + self.assertEqual(['k'], rtr1[CONS]) + # delete self._delete('routers', rtr1['id']) self.assertEqual({PROV: [], CONS: []}, @@ -4627,11 +4790,21 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertEqual([], rtr2[PROV]) self.assertEqual([], rtr2[CONS]) + rtr2 = self._list( + 'routers', query_params=('id=%s' % rtr2['id']))['routers'][0] + self.assertEqual([], rtr2[PROV]) + self.assertEqual([], rtr2[CONS]) + rtr2 = self._update('routers', rtr2['id'], {'router': {PROV: ['p1', 'p2']}})['router'] self.assertEqual(['p1', 'p2'], sorted(rtr2[PROV])) self.assertEqual([], rtr2[CONS]) + rtr2 = self._list( + 'routers', query_params=('id=%s' % rtr2['id']))['routers'][0] + self.assertEqual(['p1', 'p2'], sorted(rtr2[PROV])) + self.assertEqual([], rtr2[CONS]) + def test_address_scope_lifecycle(self): session = 
db_api.get_writer_session() aim_ctx = aim_context.AimContext(db_session=session)
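
A minimal, self-contained sketch (outside the patch itself) of the round-trip reduction the commit message describes: rather than issuing one extension query per network, the bulk path issues a single IN() query for all requested networks and then groups the rows by network_id in Python, mirroring what get_network_extn_db_bulk() and _get_network_mapping_bulk() do above. The model and helper names here (NetCidr, bulk_cidrs_by_network) are illustrative stand-ins, not names from the GBP tree, and the example assumes SQLAlchemy 1.x.

    import sqlalchemy as sa
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()


    class NetCidr(Base):
        # Stand-in for NetworkExtensionCidrDb: one row per (network, cidr).
        __tablename__ = 'net_cidrs'
        network_id = sa.Column(sa.String(36), primary_key=True)
        cidr = sa.Column(sa.String(64), primary_key=True)


    def bulk_cidrs_by_network(session, network_ids):
        # One round-trip for all requested networks, grouped afterwards
        # instead of one query per network id.
        rows = (session.query(NetCidr)
                .filter(NetCidr.network_id.in_(network_ids)).all())
        result = {}
        for row in rows:
            result.setdefault(row.network_id, []).append(row.cidr)
        return result


    if __name__ == '__main__':
        engine = sa.create_engine('sqlite://')
        Base.metadata.create_all(engine)
        session = sessionmaker(bind=engine)()
        session.add_all([NetCidr(network_id='net-1', cidr='10.0.0.0/24'),
                         NetCidr(network_id='net-1', cidr='10.0.1.0/24'),
                         NetCidr(network_id='net-2', cidr='0.0.0.0/0')])
        session.commit()
        # -> {'net-1': ['10.0.0.0/24', '10.0.1.0/24'], 'net-2': ['0.0.0.0/0']}
        print(bulk_cidrs_by_network(session, ['net-1', 'net-2', 'net-3']))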