Bulk extension support for routers
Change-Id: I64f1853f1e82c301a2d967ef30f7493e7695ab82 (cherry picked from commitd5ae8404e5
) (cherry picked from commit 7887af548b0e44648540e8a5999f45fbf6ad596c) (cherry picked from commit aa320132fa2ed2f1a4e9408513c87bb7c8fc3e11) (cherry picked from commitd3815425ac
)
This commit is contained in:
parent
f95516679d
commit
5925024b09
|
@ -13,6 +13,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from neutron.db import models_v2
|
||||
from neutron_lib.db import model_base
|
||||
import sqlalchemy as sa
|
||||
|
@ -263,6 +265,24 @@ class ExtensionDbMixin(object):
|
|||
[c['contract_name'] for c in db_contracts
|
||||
if not c['provides']]}
|
||||
|
||||
def get_router_extn_db_bulk(self, session, router_ids):
|
||||
# Baked queries using in_ require sqlalchemy >=1.2.
|
||||
db_contracts = (session.query(RouterExtensionContractDb).filter(
|
||||
RouterExtensionContractDb.router_id.in_(router_ids)).all())
|
||||
|
||||
attr_dict = defaultdict(dict)
|
||||
for db_contract in db_contracts:
|
||||
router_id = db_contract['router_id']
|
||||
p_contracts = attr_dict[router_id].setdefault(
|
||||
cisco_apic_l3.EXTERNAL_PROVIDED_CONTRACTS, [])
|
||||
c_contracts = attr_dict[router_id].setdefault(
|
||||
cisco_apic_l3.EXTERNAL_CONSUMED_CONTRACTS, [])
|
||||
if db_contract['provides']:
|
||||
p_contracts.append(db_contract['contract_name'])
|
||||
else:
|
||||
c_contracts.append(db_contract['contract_name'])
|
||||
return attr_dict
|
||||
|
||||
def _update_list_attr(self, session, db_model, column,
|
||||
new_values, **filters):
|
||||
if new_values is None:
|
||||
|
|
|
@ -1571,25 +1571,79 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
|||
self.aim.delete(aim_ctx, subject)
|
||||
self.aim.delete(aim_ctx, contract)
|
||||
|
||||
def extend_router_dict(self, session, router_db, result):
|
||||
if result.get(api_plus.BULK_EXTENDED):
|
||||
return
|
||||
LOG.debug("APIC AIM MD extending dict for router: %s", result)
|
||||
def extend_router_dict_bulk(self, session, results):
|
||||
LOG.debug("APIC AIM MD extending dict bulk for router: %s",
|
||||
results)
|
||||
|
||||
sync_state = cisco_apic.SYNC_SYNCED
|
||||
dist_names = {}
|
||||
# Gather db objects
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
aim_resources_aggregate = []
|
||||
res_dict_by_aim_res_dn = {}
|
||||
# template to track the status related info
|
||||
# for each resource.
|
||||
aim_status_track_template = {
|
||||
SYNC_STATE_TMP: cisco_apic.SYNC_NOT_APPLICABLE,
|
||||
AIM_RESOURCES_CNT: 0}
|
||||
|
||||
contract, subject = self._map_router(session, router_db)
|
||||
for res_dict in results:
|
||||
aim_resources = []
|
||||
res_dict[cisco_apic.SYNC_STATE] = cisco_apic.SYNC_NOT_APPLICABLE
|
||||
# Use a tmp field to aggregate the status across mapped
|
||||
# AIM objects, we set the actual sync_state only if we
|
||||
# are able to process all the status objects for these
|
||||
# corresponding AIM resources. If any status object is not
|
||||
# available then sync_state will be 'build'. On create,
|
||||
# subnets start in 'N/A'. The tracking object is added
|
||||
# along with the res_dict on the DN based res_dict_by_aim_res_dn
|
||||
# dict which maintains the mapping from status objs to res_dict.
|
||||
aim_status_track = copy.deepcopy(aim_status_track_template)
|
||||
|
||||
dist_names[a_l3.CONTRACT] = contract.dn
|
||||
sync_state = self._merge_status(aim_ctx, sync_state, contract)
|
||||
res_dict[cisco_apic.DIST_NAMES] = {}
|
||||
res_dict_and_aim_status_track = (res_dict, aim_status_track)
|
||||
dist_names = res_dict[cisco_apic.DIST_NAMES]
|
||||
|
||||
dist_names[a_l3.CONTRACT_SUBJECT] = subject.dn
|
||||
sync_state = self._merge_status(aim_ctx, sync_state, subject)
|
||||
contract, subject = self._map_router(session, res_dict)
|
||||
dist_names[a_l3.CONTRACT] = contract.dn
|
||||
aim_resources.append(contract)
|
||||
res_dict_by_aim_res_dn[contract.dn] = res_dict_and_aim_status_track
|
||||
dist_names[a_l3.CONTRACT_SUBJECT] = subject.dn
|
||||
aim_resources.append(subject)
|
||||
res_dict_by_aim_res_dn[subject.dn] = res_dict_and_aim_status_track
|
||||
|
||||
result[cisco_apic.DIST_NAMES] = dist_names
|
||||
result[cisco_apic.SYNC_STATE] = sync_state
|
||||
# Track the number of AIM resources in aim_status_track,
|
||||
# decrement count each time we process a status obj related to
|
||||
# the resource. If the count hits zero then we have processed
|
||||
# the status objs for all of the associated AIM resources. Until
|
||||
# this happens, the sync_state is held as 'build' (unless it has
|
||||
# to be set to 'error').
|
||||
aim_status_track[AIM_RESOURCES_CNT] = len(aim_resources)
|
||||
aim_resources_aggregate.extend(aim_resources)
|
||||
|
||||
# Merge statuses
|
||||
for status in self.aim.get_statuses(aim_ctx, aim_resources_aggregate):
|
||||
res_dict, aim_status_track = res_dict_by_aim_res_dn.get(
|
||||
status.resource_dn, ({}, {}))
|
||||
if res_dict and aim_status_track:
|
||||
aim_status_track[SYNC_STATE_TMP] = self._merge_status(
|
||||
aim_ctx,
|
||||
aim_status_track.get(SYNC_STATE_TMP,
|
||||
cisco_apic.SYNC_NOT_APPLICABLE),
|
||||
None, status=status)
|
||||
aim_status_track[AIM_RESOURCES_CNT] -= 1
|
||||
if (aim_status_track[AIM_RESOURCES_CNT] == 0 or
|
||||
(aim_status_track[SYNC_STATE_TMP] is
|
||||
cisco_apic.SYNC_ERROR)):
|
||||
# if this is zero then all the AIM resources corresponding,
|
||||
# to this neutron resource are processed and we can
|
||||
# accurately reflect the actual sync_state. Anytime we
|
||||
# encounter an error - we reflect that immediately even
|
||||
# if we are not done with the AIM resources processing.
|
||||
res_dict[cisco_apic.SYNC_STATE] = (
|
||||
aim_status_track[SYNC_STATE_TMP])
|
||||
|
||||
def extend_router_dict(self, session, router_db, result):
|
||||
LOG.debug("APIC AIM MD extending dict for router: %s", result)
|
||||
self.extend_router_dict_bulk(session, [result])
|
||||
|
||||
def add_router_interface(self, context, router, port, subnets):
|
||||
LOG.debug("APIC AIM MD adding subnets %(subnets)s to router "
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
from neutron.api import extensions
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import common_db_mixin
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import dns_db
|
||||
|
@ -34,10 +35,13 @@ from gbpservice._i18n import _LE
|
|||
from gbpservice._i18n import _LI
|
||||
from gbpservice.neutron import extensions as extensions_pkg
|
||||
from gbpservice.neutron.extensions import cisco_apic_l3 as l3_ext
|
||||
from gbpservice.neutron.plugins.ml2plus import driver_api as api_plus
|
||||
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
|
||||
extension_db as extn_db)
|
||||
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
|
||||
mechanism_driver as md)
|
||||
from gbpservice.neutron.plugins.ml2plus import patch_neutron # noqa
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -78,6 +82,9 @@ class ApicL3Plugin(common_db_mixin.CommonDbMixin,
|
|||
return self._mechanism_driver
|
||||
|
||||
def _extend_router_dict_apic(self, router_res, router_db):
|
||||
if router_res.get(api_plus.BULK_EXTENDED):
|
||||
return
|
||||
|
||||
LOG.debug("APIC AIM L3 Plugin extending router dict: %s", router_res)
|
||||
session = inspect(router_db).session
|
||||
try:
|
||||
|
@ -90,6 +97,56 @@ class ApicL3Plugin(common_db_mixin.CommonDbMixin,
|
|||
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
|
||||
l3.ROUTERS, ['_extend_router_dict_apic'])
|
||||
|
||||
def _extend_router_dict_bulk_apic(self, routers, _):
|
||||
LOG.debug("APIC AIM L3 Plugin bulk extending router dict: %s",
|
||||
routers)
|
||||
if not routers:
|
||||
return
|
||||
session = patch_neutron.get_current_session()
|
||||
try:
|
||||
self._md.extend_router_dict_bulk(session, routers)
|
||||
self._include_router_extn_attr_bulk(session, routers)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception("APIC AIM _extend_router_dict_bulk failed")
|
||||
|
||||
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
|
||||
l3.ROUTERS + '_BULK', ['_extend_router_dict_bulk_apic'])
|
||||
|
||||
def _make_router_dict_nop(self, router, fields=None,
|
||||
process_extensions=False):
|
||||
return router
|
||||
|
||||
def _make_routers_dict(self, routers, fields=None):
|
||||
results = []
|
||||
for router in routers:
|
||||
res = self._make_router_dict(router, fields,
|
||||
process_extensions=False)
|
||||
res[api_plus.BULK_EXTENDED] = True
|
||||
self._apply_dict_extend_functions(l3.ROUTERS, res, router)
|
||||
res.pop(api_plus.BULK_EXTENDED, None)
|
||||
results.append(res)
|
||||
|
||||
self._apply_dict_extend_functions(l3.ROUTERS + '_BULK',
|
||||
results, None)
|
||||
return results
|
||||
|
||||
# Overwrite the upstream implementation to take advantage
|
||||
# of the bulk extension support.
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_routers(self, context, filters=None, fields=None,
|
||||
sorts=None, limit=None, marker=None,
|
||||
page_reverse=False):
|
||||
marker_obj = self._get_marker_obj(context, 'router', limit, marker)
|
||||
routers_db = self._get_collection(context, l3_db.Router,
|
||||
self._make_router_dict_nop,
|
||||
filters=filters, fields=fields,
|
||||
sorts=sorts,
|
||||
limit=limit,
|
||||
marker_obj=marker_obj,
|
||||
page_reverse=page_reverse)
|
||||
return self._make_routers_dict(routers_db, fields)
|
||||
|
||||
def create_router(self, context, router):
|
||||
LOG.debug("APIC AIM L3 Plugin creating router: %s", router)
|
||||
self._md.ensure_tenant(context, router['router']['tenant_id'])
|
||||
|
@ -146,6 +203,14 @@ class ApicL3Plugin(common_db_mixin.CommonDbMixin,
|
|||
attr = self.get_router_extn_db(session, router['id'])
|
||||
router.update(attr)
|
||||
|
||||
def _include_router_extn_attr_bulk(self, session, routers):
|
||||
router_ids = [router['id'] for router in routers]
|
||||
attr_dict = self.get_router_extn_db_bulk(session, router_ids)
|
||||
for router in routers:
|
||||
router.update(attr_dict[router['id']] if router['id'] in attr_dict
|
||||
else {l3_ext.EXTERNAL_PROVIDED_CONTRACTS: [],
|
||||
l3_ext.EXTERNAL_CONSUMED_CONTRACTS: []})
|
||||
|
||||
def add_router_interface(self, context, router_id, interface_info):
|
||||
LOG.debug("APIC AIM L3 Plugin adding interface %(interface)s "
|
||||
"to router %(router)s",
|
||||
|
|
|
@ -445,6 +445,49 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
|
|||
self.assertIsNotNone(sg_rule)
|
||||
return sg_rule
|
||||
|
||||
def _get_contract(self, contract_name, tenant_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
contract = aim_resource.Contract(tenant_name=tenant_name,
|
||||
name=contract_name)
|
||||
contract = self.aim_mgr.get(aim_ctx, contract)
|
||||
self.assertIsNotNone(contract)
|
||||
return contract
|
||||
|
||||
def _get_subject(self, subject_name, contract_name, tenant_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
subject = aim_resource.ContractSubject(tenant_name=tenant_name,
|
||||
contract_name=contract_name,
|
||||
name=subject_name)
|
||||
subject = self.aim_mgr.get(aim_ctx, subject)
|
||||
self.assertIsNotNone(subject)
|
||||
return subject
|
||||
|
||||
def _check_router(self, router):
|
||||
dns = copy.copy(router.get(DN))
|
||||
aname = self.name_mapper.router(None, router['id'])
|
||||
|
||||
aim_contract = self._get_contract(aname, 'common')
|
||||
self.assertEqual('common', aim_contract.tenant_name)
|
||||
self.assertEqual(aname, aim_contract.name)
|
||||
self.assertEqual(router['name'], aim_contract.display_name)
|
||||
self.assertEqual('context', aim_contract.scope) # REVISIT(rkukura)
|
||||
self._check_dn_is_resource(dns, 'Contract', aim_contract)
|
||||
|
||||
aim_subject = self._get_subject('route', aname, 'common')
|
||||
self.assertEqual('common', aim_subject.tenant_name)
|
||||
self.assertEqual(aname, aim_subject.contract_name)
|
||||
self.assertEqual('route', aim_subject.name)
|
||||
self.assertEqual(router['name'], aim_subject.display_name)
|
||||
self.assertEqual([], aim_subject.in_filters)
|
||||
self.assertEqual([], aim_subject.out_filters)
|
||||
self.assertEqual([self.driver.apic_system_id + '_AnyFilter'],
|
||||
aim_subject.bi_filters)
|
||||
self._check_dn_is_resource(dns, 'ContractSubject', aim_subject)
|
||||
|
||||
self.assertFalse(dns)
|
||||
|
||||
def _sg_rule_should_not_exist(self, sg_rule_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
|
@ -717,15 +760,6 @@ class TestAimMapping(ApicAimTestCase):
|
|||
name=epg_name)
|
||||
self.assertEqual([], epgs)
|
||||
|
||||
def _get_contract(self, contract_name, tenant_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
contract = aim_resource.Contract(tenant_name=tenant_name,
|
||||
name=contract_name)
|
||||
contract = self.aim_mgr.get(aim_ctx, contract)
|
||||
self.assertIsNotNone(contract)
|
||||
return contract
|
||||
|
||||
def _contract_should_not_exist(self, contract_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
|
@ -733,16 +767,6 @@ class TestAimMapping(ApicAimTestCase):
|
|||
name=contract_name)
|
||||
self.assertEqual([], contracts)
|
||||
|
||||
def _get_subject(self, subject_name, contract_name, tenant_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
subject = aim_resource.ContractSubject(tenant_name=tenant_name,
|
||||
contract_name=contract_name,
|
||||
name=subject_name)
|
||||
subject = self.aim_mgr.get(aim_ctx, subject)
|
||||
self.assertIsNotNone(subject)
|
||||
return subject
|
||||
|
||||
def _subject_should_not_exist(self, subject_name, contract_name):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(session)
|
||||
|
@ -1016,30 +1040,6 @@ class TestAimMapping(ApicAimTestCase):
|
|||
else:
|
||||
self.assertEqual(aim_sg_rule.icmp_type, 'unspecified')
|
||||
|
||||
def _check_router(self, router):
|
||||
dns = copy.copy(router.get(DN))
|
||||
aname = self.name_mapper.router(None, router['id'])
|
||||
|
||||
aim_contract = self._get_contract(aname, 'common')
|
||||
self.assertEqual('common', aim_contract.tenant_name)
|
||||
self.assertEqual(aname, aim_contract.name)
|
||||
self.assertEqual(router['name'], aim_contract.display_name)
|
||||
self.assertEqual('context', aim_contract.scope) # REVISIT(rkukura)
|
||||
self._check_dn_is_resource(dns, 'Contract', aim_contract)
|
||||
|
||||
aim_subject = self._get_subject('route', aname, 'common')
|
||||
self.assertEqual('common', aim_subject.tenant_name)
|
||||
self.assertEqual(aname, aim_subject.contract_name)
|
||||
self.assertEqual('route', aim_subject.name)
|
||||
self.assertEqual(router['name'], aim_subject.display_name)
|
||||
self.assertEqual([], aim_subject.in_filters)
|
||||
self.assertEqual([], aim_subject.out_filters)
|
||||
self.assertEqual([self.driver.apic_system_id + '_AnyFilter'],
|
||||
aim_subject.bi_filters)
|
||||
self._check_dn_is_resource(dns, 'ContractSubject', aim_subject)
|
||||
|
||||
self.assertFalse(dns)
|
||||
|
||||
def _check_router_deleted(self, router):
|
||||
aname = self.name_mapper.router(None, router['id'])
|
||||
self._subject_should_not_exist('route', aname)
|
||||
|
@ -5127,11 +5127,6 @@ class TestExtensionAttributes(ApicAimTestCase):
|
|||
self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV]))
|
||||
self.assertEqual(['k'], rtr1[CONS])
|
||||
|
||||
# delete
|
||||
self._delete('routers', rtr1['id'])
|
||||
self.assertEqual({PROV: [], CONS: []},
|
||||
extn.get_router_extn_db(session, rtr1['id']))
|
||||
|
||||
# Simulate a prior existing router (i.e. no extension attrs exist)
|
||||
rtr2 = self._make_router(self.fmt, 'test-tenant', 'router2',
|
||||
arg_list=self.extension_attributes,
|
||||
|
@ -5156,6 +5151,26 @@ class TestExtensionAttributes(ApicAimTestCase):
|
|||
self.assertEqual(['p1', 'p2'], sorted(rtr2[PROV]))
|
||||
self.assertEqual([], rtr2[CONS])
|
||||
|
||||
# Test the full list which will invoke the bulk extension
|
||||
rtrs = self._list('routers')['routers']
|
||||
self.assertEqual(3, len(rtrs))
|
||||
for rtr in rtrs:
|
||||
self._check_router(rtr)
|
||||
if rtr['id'] == rtr0['id']:
|
||||
self.assertEqual([], sorted(rtr[PROV]))
|
||||
self.assertEqual([], rtr[CONS])
|
||||
elif rtr['id'] == rtr1['id']:
|
||||
self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV]))
|
||||
self.assertEqual(['k'], rtr1[CONS])
|
||||
elif rtr['id'] == rtr2['id']:
|
||||
self.assertEqual(['p1', 'p2'], sorted(rtr[PROV]))
|
||||
self.assertEqual([], rtr[CONS])
|
||||
|
||||
# delete
|
||||
self._delete('routers', rtr1['id'])
|
||||
self.assertEqual({PROV: [], CONS: []},
|
||||
extn.get_router_extn_db(session, rtr1['id']))
|
||||
|
||||
def test_address_scope_lifecycle(self):
|
||||
session = db_api.get_session()
|
||||
aim_ctx = aim_context.AimContext(db_session=session)
|
||||
|
|
Loading…
Reference in New Issue