[AIM] Use baked queries

Use sqlalchemy's "baked query" feature to reduce the CPU time spent
building DB queries in the AIM mechanism, policy, extension, and SFC
drivers. The GBP plugin's driver-independent queries are not modified
in this patch.

Change-Id: I5aabb1a9662ec08d6600dc5ad7e173f8ce408df3
This commit is contained in:
Robert Kukura 2018-12-05 20:18:06 -05:00
parent e83a9636a6
commit 8342c6cdf7
14 changed files with 1187 additions and 550 deletions

View File

@ -11,7 +11,14 @@
# under the License.
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class ApicAllowedVMNameDB(model_base.BASEV2):
@ -26,15 +33,26 @@ class ApicAllowedVMNameDB(model_base.BASEV2):
class ApicAllowedVMNameDBMixin(object):
def get_l3_policy_allowed_vm_names(self, session, l3_policy_id):
    """Return all allowed-VM-name rows for the given L3 policy.

    Uses a baked query so the SELECT is compiled once and cached,
    reducing per-call CPU overhead.
    """
    query = BAKERY(lambda s: s.query(
        ApicAllowedVMNameDB))
    query += lambda q: q.filter_by(
        l3_policy_id=sa.bindparam('l3_policy_id'))
    rows = query(session).params(
        l3_policy_id=l3_policy_id).all()
    return rows
def get_l3_policy_allowed_vm_name(self, session, l3_policy_id,
                                  allowed_vm_name):
    """Return the single row matching (l3_policy_id, allowed_vm_name).

    Raises if no (or more than one) matching row exists, via
    Query.one().
    """
    query = BAKERY(lambda s: s.query(
        ApicAllowedVMNameDB))
    query += lambda q: q.filter_by(
        l3_policy_id=sa.bindparam('l3_policy_id'),
        allowed_vm_name=sa.bindparam('allowed_vm_name'))
    row = query(session).params(
        l3_policy_id=l3_policy_id,
        allowed_vm_name=allowed_vm_name).one()
    return row
def add_l3_policy_allowed_vm_name(self, session, l3_policy_id,

View File

@ -11,7 +11,14 @@
# under the License.
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class ApicAutoPtgDB(model_base.BASEV2):
@ -25,8 +32,13 @@ class ApicAutoPtgDB(model_base.BASEV2):
class ApicAutoPtgDBMixin(object):
def get_is_auto_ptg(self, session, policy_target_group_id):
    """Return the is_auto_ptg flag for the given PTG.

    Raises via Query.one() if the PTG row does not exist.
    """
    query = BAKERY(lambda s: s.query(
        ApicAutoPtgDB))
    query += lambda q: q.filter_by(
        policy_target_group_id=sa.bindparam('policy_target_group_id'))
    row = query(session).params(
        policy_target_group_id=policy_target_group_id).one()
    return row['is_auto_ptg']
def set_is_auto_ptg(self, session, policy_target_group_id,

View File

@ -11,7 +11,14 @@
# under the License.
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class ApicIntraPtgDB(model_base.BASEV2):
@ -25,15 +32,25 @@ class ApicIntraPtgDB(model_base.BASEV2):
class ApicIntraPtgDBMixin(object):
def get_intra_ptg_allow(self, session, policy_target_group_id):
    """Return the intra_ptg_allow flag for the given PTG.

    Raises via Query.one() if the PTG row does not exist.
    """
    query = BAKERY(lambda s: s.query(
        ApicIntraPtgDB))
    query += lambda q: q.filter_by(
        policy_target_group_id=sa.bindparam('policy_target_group_id'))
    row = query(session).params(
        policy_target_group_id=policy_target_group_id).one()
    return row['intra_ptg_allow']
def set_intra_ptg_allow(self, session, policy_target_group_id,
intra_ptg_allow=True):
with session.begin(subtransactions=True):
row = (session.query(ApicIntraPtgDB).filter_by(
policy_target_group_id=policy_target_group_id).first())
query = BAKERY(lambda s: s.query(
ApicIntraPtgDB))
query += lambda q: q.filter_by(
policy_target_group_id=sa.bindparam('policy_target_group_id'))
row = query(session).params(
policy_target_group_id=policy_target_group_id).first()
if not row:
row = ApicIntraPtgDB(
policy_target_group_id=policy_target_group_id,

View File

@ -11,7 +11,14 @@
# under the License.
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class ApicReuseBdDB(model_base.BASEV2):
@ -26,8 +33,15 @@ class ApicReuseBdDB(model_base.BASEV2):
class ApicReuseBdDBMixin(object):
def get_reuse_bd_l2policy(self, session, l2_policy_id):
    """Return the reuse-BD row for the given L2 policy, or None."""
    # REVISIT: This method is not executed in any unit test.
    query = BAKERY(lambda s: s.query(
        ApicReuseBdDB))
    query += lambda q: q.filter_by(
        l2_policy_id=sa.bindparam('l2_policy_id'))
    row = query(session).params(
        l2_policy_id=l2_policy_id).first()
    return row
def add_reuse_bd_l2policy(self, session, l2_policy_id,
@ -38,5 +52,11 @@ class ApicReuseBdDBMixin(object):
session.add(row)
def is_reuse_bd_target(self, session, l2_policy_id):
    """Return True if any L2 policy reuses this policy's BD."""
    # REVISIT: This method is not executed in any unit test.
    query = BAKERY(lambda s: s.query(
        ApicReuseBdDB))
    query += lambda q: q.filter_by(
        target_l2_policy_id=sa.bindparam('l2_policy_id'))
    return query(session).params(
        l2_policy_id=l2_policy_id).first() is not None

View File

@ -11,7 +11,14 @@
# under the License.
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class ApicSegmentationLabelDB(model_base.BASEV2):
@ -25,15 +32,26 @@ class ApicSegmentationLabelDB(model_base.BASEV2):
class ApicSegmentationLabelDBMixin(object):
def get_policy_target_segmentation_labels(self, session, policy_target_id):
    """Return all segmentation-label rows for the given policy target."""
    query = BAKERY(lambda s: s.query(
        ApicSegmentationLabelDB))
    query += lambda q: q.filter_by(
        policy_target_id=sa.bindparam('policy_target_id'))
    rows = query(session).params(
        policy_target_id=policy_target_id).all()
    return rows
def get_policy_target_segmentation_label(self, session, policy_target_id,
                                         segmentation_label):
    """Return the row matching (policy_target_id, segmentation_label).

    Raises via Query.one() if no such row exists.
    """
    query = BAKERY(lambda s: s.query(
        ApicSegmentationLabelDB))
    query += lambda q: q.filter_by(
        policy_target_id=sa.bindparam('policy_target_id'),
        segmentation_label=sa.bindparam('segmentation_label'))
    row = query(session).params(
        policy_target_id=policy_target_id,
        segmentation_label=segmentation_label).one()
    return row
def add_policy_target_segmentation_label(self, session, policy_target_id,

View File

@ -17,10 +17,18 @@ from aim.api import resource as aim_resource
from neutron.db.models import address_scope as as_db
from neutron.db import models_v2
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
from sqlalchemy import orm
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class AddressScopeMapping(model_base.BASEV2):
__tablename__ = 'apic_aim_address_scope_mappings'
@ -88,33 +96,49 @@ class DbMixin(object):
# within the same transaction tries to access its
# aim_mapping relationship after retrieving the
# AddressScope record from the session cache.
scope = (session.query(as_db.AddressScope).
filter_by(id=scope_id).
one_or_none())
query = BAKERY(lambda s: s.query(
as_db.AddressScope))
query += lambda q: q.filter_by(
id=sa.bindparam('scope_id'))
scope = query(session).params(
scope_id=scope_id).one_or_none()
scope.aim_mapping = mapping
return mapping
def _get_address_scope_mapping(self, session, scope_id):
    """Return the AddressScopeMapping for scope_id, or None."""
    query = BAKERY(lambda s: s.query(
        AddressScopeMapping))
    query += lambda q: q.filter_by(
        scope_id=sa.bindparam('scope_id'))
    return query(session).params(
        scope_id=scope_id).one_or_none()
def _get_address_scope_mappings_for_vrf(self, session, vrf):
    """Return all AddressScopeMappings referencing the given AIM VRF."""
    query = BAKERY(lambda s: s.query(
        AddressScopeMapping))
    query += lambda q: q.filter_by(
        vrf_tenant_name=sa.bindparam('tenant_name'),
        vrf_name=sa.bindparam('name'))
    return query(session).params(
        tenant_name=vrf.tenant_name,
        name=vrf.name).all()
def _get_address_scopes_owning_vrf(self, session, vrf):
    """Return AddressScopes whose mapping owns the given AIM VRF.

    Results are ordered by IP version so callers can rely on a
    deterministic v4-before-v6 ordering.
    """
    query = BAKERY(lambda s: s.query(
        as_db.AddressScope))
    query += lambda q: q.join(
        AddressScopeMapping,
        AddressScopeMapping.scope_id == as_db.AddressScope.id)
    query += lambda q: q.filter(
        AddressScopeMapping.vrf_tenant_name == sa.bindparam('tenant_name'),
        AddressScopeMapping.vrf_name == sa.bindparam('name'),
        AddressScopeMapping.vrf_owned)
    query += lambda q: q.order_by(
        as_db.AddressScope.ip_version)
    return query(session).params(
        tenant_name=vrf.tenant_name,
        name=vrf.name).all()
def _get_address_scope_vrf(self, mapping):
return aim_resource.VRF(
@ -149,38 +173,67 @@ class DbMixin(object):
# transaction tries to access its aim_mapping relationship
# after retrieving the Network record from the session
# cache.
net = (session.query(models_v2.Network).
filter_by(id=network_id).
one_or_none())
query = BAKERY(lambda s: s.query(
models_v2.Network))
query += lambda q: q.filter_by(
id=sa.bindparam('network_id'))
net = query(session).params(
network_id=network_id).one_or_none()
net.aim_mapping = mapping
return mapping
def _get_network_mapping(self, session, network_id):
    """Return the NetworkMapping for network_id, or None."""
    query = BAKERY(lambda s: s.query(
        NetworkMapping))
    query += lambda q: q.filter_by(
        network_id=sa.bindparam('network_id'))
    return query(session).params(
        network_id=network_id).one_or_none()
def _get_network_mapping_bulk(self, session, network_ids):
    """Return NetworkMappings for all of the given network IDs."""
    # REVISIT: This method is not called during any UT, and does
    # not appear to be referenced elsewhere in this repository.
    if not network_ids:
        # Avoid an IN () clause, which some backends reject.
        return []

    query = BAKERY(lambda s: s.query(
        NetworkMapping))
    query += lambda q: q.filter(
        NetworkMapping.network_id.in_(
            sa.bindparam('network_ids', expanding=True)))
    return query(session).params(
        network_ids=network_ids).all()
def _get_network_mappings_for_vrf(self, session, vrf):
    """Return all NetworkMappings referencing the given AIM VRF."""
    query = BAKERY(lambda s: s.query(
        NetworkMapping))
    query += lambda q: q.filter_by(
        vrf_tenant_name=sa.bindparam('vrf_tenant_name'),
        vrf_name=sa.bindparam('vrf_name'))
    return query(session).params(
        vrf_tenant_name=vrf.tenant_name,
        vrf_name=vrf.name).all()
def _get_network_mappings_for_bd(self, session, bd):
    """Return all NetworkMappings referencing the given AIM BridgeDomain."""
    query = BAKERY(lambda s: s.query(
        NetworkMapping))
    query += lambda q: q.filter_by(
        bd_tenant_name=sa.bindparam('bd_tenant_name'),
        bd_name=sa.bindparam('bd_name'))
    return query(session).params(
        bd_tenant_name=bd.tenant_name,
        bd_name=bd.name).all()
def _is_vrf_used_by_networks(self, session, vrf):
    """Return True if any network mapping references the given AIM VRF."""
    query = BAKERY(lambda s: s.query(
        NetworkMapping.network_id))
    query += lambda q: q.filter_by(
        vrf_tenant_name=sa.bindparam('vrf_tenant_name'),
        vrf_name=sa.bindparam('vrf_name'))
    return query(session).params(
        vrf_tenant_name=vrf.tenant_name,
        vrf_name=vrf.name).first() is not None
def _get_network_bd(self, mapping):
return aim_resource.BridgeDomain(

View File

@ -15,13 +15,20 @@
from neutron.db import models_v2
from neutron_lib.db import model_base
from oslo_log import log
import sqlalchemy as sa
from sqlalchemy.ext import baked
from sqlalchemy import orm
from sqlalchemy.sql.expression import true
from gbpservice.neutron.extensions import cisco_apic
from gbpservice.neutron.extensions import cisco_apic_l3
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class NetworkExtensionDb(model_base.BASEV2):
@ -115,14 +122,33 @@ class ExtensionDbMixin(object):
network_id, {})
def get_network_extn_db_bulk(self, session, network_ids):
db_objs = (session.query(NetworkExtensionDb).filter(
NetworkExtensionDb.network_id.in_(network_ids)).all())
db_cidrs = (session.query(NetworkExtensionCidrDb).filter(
NetworkExtensionCidrDb.network_id.in_(network_ids)).all())
db_vlans = (session.query(
NetworkExtNestedDomainAllowedVlansDb).filter(
if not network_ids:
return {}
query = BAKERY(lambda s: s.query(
NetworkExtensionDb))
query += lambda q: q.filter(
NetworkExtensionDb.network_id.in_(
sa.bindparam('network_ids', expanding=True)))
db_objs = query(session).params(
network_ids=network_ids).all()
query = BAKERY(lambda s: s.query(
NetworkExtensionCidrDb))
query += lambda q: q.filter(
NetworkExtensionCidrDb.network_id.in_(
sa.bindparam('network_ids', expanding=True)))
db_cidrs = query(session).params(
network_ids=network_ids).all()
query = BAKERY(lambda s: s.query(
NetworkExtNestedDomainAllowedVlansDb))
query += lambda q: q.filter(
NetworkExtNestedDomainAllowedVlansDb.network_id.in_(
network_ids)).all())
sa.bindparam('network_ids', expanding=True)))
db_vlans = query(session).params(
network_ids=network_ids).all()
cidrs_by_net_id = {}
vlans_by_net_id = {}
for db_cidr in db_cidrs:
@ -169,8 +195,13 @@ class ExtensionDbMixin(object):
def set_network_extn_db(self, session, network_id, res_dict):
with session.begin(subtransactions=True):
db_obj = (session.query(NetworkExtensionDb).filter_by(
network_id=network_id).first())
query = BAKERY(lambda s: s.query(
NetworkExtensionDb))
query += lambda q: q.filter_by(
network_id=sa.bindparam('network_id'))
db_obj = query(session).params(
network_id=network_id).first()
db_obj = db_obj or NetworkExtensionDb(network_id=network_id)
if cisco_apic.EXTERNAL_NETWORK in res_dict:
db_obj['external_network_dn'] = (
@ -214,40 +245,71 @@ class ExtensionDbMixin(object):
network_id=network_id)
def get_network_ids_by_ext_net_dn(self, session, dn, lock_update=False):
    """Return IDs of networks whose external-network DN equals dn.

    When lock_update is True, rows are selected FOR UPDATE.
    """
    query = BAKERY(lambda s: s.query(
        NetworkExtensionDb.network_id))
    query += lambda q: q.filter_by(
        external_network_dn=sa.bindparam('dn'))
    if lock_update:
        # REVISIT: Eliminate locking.
        query += lambda q: q.with_lockmode('update')
    ids = query(session).params(dn=dn)
    return [i[0] for i in ids]
def get_network_ids_by_l3out_dn(self, session, dn, lock_update=False):
    """Return IDs of networks whose external-network DN is under dn.

    Matches DNs prefixed by "<dn>/". When lock_update is True, rows
    are selected FOR UPDATE.
    """
    query = BAKERY(lambda s: s.query(
        NetworkExtensionDb.network_id))
    query += lambda q: q.filter(
        NetworkExtensionDb.external_network_dn.like(
            sa.bindparam('dn') + "/%"))
    if lock_update:
        # REVISIT: Eliminate locking.
        query += lambda q: q.with_lockmode('update')
    ids = query(session).params(dn=dn)
    return [i[0] for i in ids]
def get_svi_network_ids_by_l3out_dn(self, session, dn, lock_update=False):
    """Return IDs of SVI networks whose external-network DN is under dn.

    Same as get_network_ids_by_l3out_dn, but restricted to networks
    with the svi flag set. When lock_update is True, rows are
    selected FOR UPDATE.
    """
    query = BAKERY(lambda s: s.query(
        NetworkExtensionDb.network_id))
    query += lambda q: q.filter(
        NetworkExtensionDb.external_network_dn.like(
            sa.bindparam('dn') + "/%"),
        NetworkExtensionDb.svi == true())
    if lock_update:
        # REVISIT: Eliminate locking.
        query += lambda q: q.with_lockmode('update')
    ids = query(session).params(dn=dn)
    return [i[0] for i in ids]
def get_external_cidrs_by_ext_net_dn(self, session, dn, lock_update=False):
    """Return distinct external CIDRs of networks with the given DN.

    When lock_update is True, rows are selected FOR UPDATE.
    """
    ctab = NetworkExtensionCidrDb
    ntab = NetworkExtensionDb
    query = BAKERY(lambda s: s.query(
        ctab.cidr))
    query += lambda q: q.join(
        ntab,
        ntab.network_id == ctab.network_id)
    query += lambda q: q.filter(
        ntab.external_network_dn == sa.bindparam('dn'))
    query += lambda q: q.distinct()
    if lock_update:
        # REVISIT: Eliminate locking.
        query += lambda q: q.with_lockmode('update')
    cidrs = query(session).params(dn=dn)
    return [c[0] for c in cidrs]
def get_subnet_extn_db(self, session, subnet_id):
db_obj = (session.query(SubnetExtensionDb).filter_by(
subnet_id=subnet_id).first())
query = BAKERY(lambda s: s.query(
SubnetExtensionDb))
query += lambda q: q.filter_by(
subnet_id=sa.bindparam('subnet_id'))
db_obj = query(session).params(
subnet_id=subnet_id).first()
result = {}
if db_obj:
self._set_if_not_none(result, cisco_apic.SNAT_HOST_POOL,
@ -255,16 +317,26 @@ class ExtensionDbMixin(object):
return result
def set_subnet_extn_db(self, session, subnet_id, res_dict):
    """Create or update the extension row for the given subnet."""
    query = BAKERY(lambda s: s.query(
        SubnetExtensionDb))
    query += lambda q: q.filter_by(
        subnet_id=sa.bindparam('subnet_id'))
    db_obj = query(session).params(
        subnet_id=subnet_id).first()
    # Create the row on first use of the extension for this subnet.
    db_obj = db_obj or SubnetExtensionDb(subnet_id=subnet_id)
    if cisco_apic.SNAT_HOST_POOL in res_dict:
        db_obj['snat_host_pool'] = res_dict[cisco_apic.SNAT_HOST_POOL]
    session.add(db_obj)
def get_router_extn_db(self, session, router_id):
db_contracts = (session.query(RouterExtensionContractDb).filter_by(
router_id=router_id).all())
query = BAKERY(lambda s: s.query(
RouterExtensionContractDb))
query += lambda q: q.filter_by(
router_id=sa.bindparam('router_id'))
db_contracts = query(session).params(
router_id=router_id).all()
return {cisco_apic_l3.EXTERNAL_PROVIDED_CONTRACTS:
[c['contract_name'] for c in db_contracts if c['provides']],
cisco_apic_l3.EXTERNAL_CONSUMED_CONTRACTS:
@ -275,7 +347,10 @@ class ExtensionDbMixin(object):
new_values, **filters):
if new_values is None:
return
# REVISIT: Can this query be baked?
rows = session.query(db_model).filter_by(**filters).all()
new_values = set(new_values)
for r in rows:
if r[column] in new_values:

View File

@ -13,6 +13,8 @@
import hashlib
import re
import six
import sqlalchemy as sa
from sqlalchemy.ext import baked
from aim import aim_manager
from aim.api import resource as aim_resource
@ -60,6 +62,10 @@ from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
from gbpservice.neutron.services.grouppolicy import plugin as gbp_plugin
LOG = logging.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
FORWARD = 'Forward'
REVERSE = 'Reverse'
FILTER_DIRECTIONS = {FORWARD: False, REVERSE: True}
@ -1122,16 +1128,26 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
# the gbp_obj here should be a ptg
l2p_id = gbp_obj['l2_policy_id']
if l2p_id:
l2p_db = session.query(
gpmdb.L2PolicyMapping).filter_by(id=l2p_id).first()
query = BAKERY(lambda s: s.query(
gpmdb.L2PolicyMapping))
query += lambda q: q.filter_by(
id=sa.bindparam('l2p_id'))
l2p_db = query(session).params(
l2p_id=l2p_id).first()
l3p_id = l2p_db['l3_policy_id']
elif aim_resource_class.__name__ == (
aim_resource.BridgeDomain.__name__):
# the gbp_obj here should be a l2p
l3p_id = gbp_obj['l3_policy_id']
if l3p_id:
l3p_db = session.query(
gpmdb.L3PolicyMapping).filter_by(id=l3p_id).first()
query = BAKERY(lambda s: s.query(
gpmdb.L3PolicyMapping))
query += lambda q: q.filter_by(
id=sa.bindparam('l3p_id'))
l3p_db = query(session).params(
l3p_id=l3p_id).first()
tenant_id = l3p_db['tenant_id']
tenant_name = self.name_mapper.project(session, tenant_id)
LOG.debug("Mapped tenant_id %(id)s to %(apic_name)s",
@ -2337,11 +2353,21 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
context = gbp_utils.get_current_context()
# get_network can do a DB write, hence we use a writer
with db_api.context_manager.writer.using(context):
ptg_db = session.query(gpmdb.PolicyTargetGroupMapping).filter_by(
id=ptg_id).first()
query = BAKERY(lambda s: s.query(
gpmdb.PolicyTargetGroupMapping))
query += lambda q: q.filter_by(
id=sa.bindparam('ptg_id'))
ptg_db = query(session).params(
ptg_id=ptg_id).first()
if ptg_db and self._is_auto_ptg(ptg_db):
l2p_db = session.query(gpmdb.L2PolicyMapping).filter_by(
id=ptg_db['l2_policy_id']).first()
query = BAKERY(lambda s: s.query(
gpmdb.L2PolicyMapping))
query += lambda q: q.filter_by(
id=sa.bindparam('l2p_id'))
l2p_db = query(session).params(
l2p_id=ptg_db['l2_policy_id']).first()
network_id = l2p_db['network_id']
admin_context = n_context.get_admin_context()
net = self._get_network(admin_context, network_id)
@ -2392,17 +2418,24 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
self._handle_network_service_policy(context)
def _get_prss_for_policy_rules(self, context, pr_ids):
    """Return PolicyRuleSets containing any of the given policy rules."""
    if not pr_ids:
        # Avoid an IN () clause, which some backends reject.
        return []

    query = BAKERY(lambda s: s.query(
        gpdb.PolicyRuleSet))
    query += lambda q: q.join(
        gpdb.PRSToPRAssociation,
        gpdb.PRSToPRAssociation.policy_rule_set_id ==
        gpdb.PolicyRuleSet.id)
    query += lambda q: q.join(
        gpdb.PolicyRule,
        gpdb.PRSToPRAssociation.policy_rule_id == gpdb.PolicyRule.id)
    query += lambda q: q.filter(
        gpdb.PolicyRule.id.in_(sa.bindparam('pr_ids', expanding=True)))
    return [self._get_policy_rule_set(
        context._plugin_context, x['id']) for x in (
            query(context._plugin_context.session).params(
                pr_ids=pr_ids).all())]
def _get_port_mtu(self, context, port):
if self.advertise_mtu:
@ -2441,7 +2474,11 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
aim_ctx = aim_context.AimContext(session)
contract_name_prefix = alib.get_service_contract_filter_entries(
).keys()[0]
l3ps = session.query(gpmdb.L3PolicyMapping).all()
query = BAKERY(lambda s: s.query(
gpmdb.L3PolicyMapping))
l3ps = query(session).all()
name_mapper = apic_mapper.APICNameMapper()
aim_mgr = aim_manager.AimManager()
self._aim = aim_mgr
@ -2500,49 +2537,63 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
def _validate_l3_policies(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.L3Policy))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for L3P not yet implemented")
def _validate_l2_policies(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.L2Policy))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for L2P not yet implemented")
def _validate_policy_target_groups(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.PolicyTargetGroup))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for PTG not yet implemented")
def _validate_policy_targets(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.PolicyTarget))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for PT not yet implemented")
def _validate_application_policy_groups(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.ApplicationPolicyGroup))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for APG not yet implemented")
def _validate_policy_classifiers(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.PolicyClassifier))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for PC not yet implemented")
def _validate_policy_rule_sets(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.PolicyRuleSet))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for PRS not yet implemented")
@ -2552,13 +2603,17 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
# validate_neutron_mapping rather than validate_aim_mapping,
# since external_routes maps to the cisco_apic.EXTERNAL_CIDRS
# network extension.
if mgr.actual_session.query(gpdb.ExternalSegment).first():
query = BAKERY(lambda s: s.query(
gpdb.ExternalSegment))
if query(mgr.actual_session).first():
mgr.validation_failed(
"GBP->AIM validation for ES not yet implemented")
def _validate_external_policies(self, mgr):
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        gpdb.ExternalPolicy))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "GBP->AIM validation for EP not yet implemented")

View File

@ -10,6 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy.ext import baked
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
@ -31,6 +34,9 @@ from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
LOG = log.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
"""RPC mixin for AIM mapping.
@ -264,17 +270,23 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
return
details['security_group'] = []
port_sgs = (context.session.query(sg_models.SecurityGroup.id,
sg_models.SecurityGroup.tenant_id).
filter(sg_models.SecurityGroup.id.
in_(port['security_groups'])).
all())
for sg_id, tenant_id in port_sgs:
tenant_aname = self.aim_mech_driver.name_mapper.project(
context.session, tenant_id)
details['security_group'].append(
{'policy-space': tenant_aname,
'name': sg_id})
if port['security_groups']:
query = BAKERY(lambda s: s.query(
sg_models.SecurityGroup.id,
sg_models.SecurityGroup.tenant_id))
query += lambda q: q.filter(
sg_models.SecurityGroup.id.in_(
sa.bindparam('sg_ids', expanding=True)))
port_sgs = query(context.session).params(
sg_ids=port['security_groups']).all()
for sg_id, tenant_id in port_sgs:
tenant_aname = self.aim_mech_driver.name_mapper.project(
context.session, tenant_id)
details['security_group'].append(
{'policy-space': tenant_aname,
'name': sg_id})
# Always include this SG which has the default arp & dhcp rules
details['security_group'].append(
{'policy-space': 'common',

View File

@ -13,6 +13,7 @@
# under the License.
import sqlalchemy as sa
from sqlalchemy.ext import baked
from sqlalchemy import orm
from neutron.db import api as db_api
@ -26,7 +27,11 @@ from neutron_lib.plugins import directory
LOG = logging.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
# REVISIT: Fix the misspelling of 'association'.
class HAIPAddressToPortAssocation(model_base.BASEV2):
"""Port Owner for HA IP Address.
@ -49,23 +54,45 @@ class PortForHAIPAddress(object):
def _get_ha_ipaddress(self, port_id, ipaddress, session=None):
    """Return the HA-IP-to-port row for (port_id, ipaddress), or None."""
    session = session or db_api.get_reader_session()

    query = BAKERY(lambda s: s.query(
        HAIPAddressToPortAssocation))
    query += lambda q: q.filter_by(
        port_id=sa.bindparam('port_id'),
        ha_ip_address=sa.bindparam('ipaddress'))
    return query(session).params(
        port_id=port_id, ipaddress=ipaddress).first()
def get_port_for_ha_ipaddress(self, ipaddress, network_id):
    """Returns the Neutron Port ID for the HA IP Addresss."""
    session = db_api.get_reader_session()

    query = BAKERY(lambda s: s.query(
        HAIPAddressToPortAssocation))
    # Explicit join condition, since the association table has no
    # foreign-key relationship configured for implicit joining.
    query += lambda q: q.join(
        models_v2.Port,
        models_v2.Port.id == HAIPAddressToPortAssocation.port_id)
    query += lambda q: q.filter(
        HAIPAddressToPortAssocation.ha_ip_address ==
        sa.bindparam('ipaddress'))
    query += lambda q: q.filter(
        models_v2.Port.network_id == sa.bindparam('network_id'))
    port_ha_ip = query(session).params(
        ipaddress=ipaddress, network_id=network_id).first()
    return port_ha_ip
def get_ha_ipaddresses_for_port(self, port_id):
    """Return the sorted HA IP addresses associated with a port."""
    session = db_api.get_reader_session()

    query = BAKERY(lambda s: s.query(
        HAIPAddressToPortAssocation))
    query += lambda q: q.filter_by(
        port_id=sa.bindparam('port_id'))
    objs = query(session).params(
        port_id=port_id).all()
    return sorted([x['ha_ip_address'] for x in objs])
def set_port_id_for_ha_ipaddress(self, port_id, ipaddress, session=None):
@ -90,6 +117,11 @@ class PortForHAIPAddress(object):
session = session or db_api.get_writer_session()
with session.begin(subtransactions=True):
try:
# REVISIT: Can this query be baked? The
# sqlalchemy.ext.baked.Result class does not have a
# delete() method, and adding delete() to the baked
# query before executing it seems to result in the
# params() not being evaluated.
return session.query(
HAIPAddressToPortAssocation).filter_by(
port_id=port_id,
@ -99,7 +131,10 @@ class PortForHAIPAddress(object):
def get_ha_port_associations(self):
    """Return all HA-IP-address-to-port association rows."""
    session = db_api.get_reader_session()

    query = BAKERY(lambda s: s.query(
        HAIPAddressToPortAssocation))
    return query(session).all()
class HAIPOwnerDbMixin(object):

View File

@ -12,6 +12,8 @@
from neutron_lib.plugins import directory
from oslo_log import log as logging
import sqlalchemy as sa
from sqlalchemy.ext import baked
from gbpservice.neutron.db.grouppolicy.extensions import (
apic_auto_ptg_db as auto_ptg_db)
@ -25,6 +27,9 @@ from gbpservice.neutron.services.grouppolicy import (
LOG = logging.getLogger(__name__)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class AIMExtensionDriver(api.ExtensionDriver,
intra_ptg_db.ApicIntraPtgDBMixin,
@ -53,8 +58,14 @@ class AIMExtensionDriver(api.ExtensionDriver,
def _set_intra_ptg_allow(self, session, data, result):
ptg = data['policy_target_group']
ptg_db = (session.query(gp_db.PolicyTargetGroup)
.filter_by(id=result['id']).one())
query = BAKERY(lambda s: s.query(
gp_db.PolicyTargetGroup))
query += lambda q: q.filter_by(
id=sa.bindparam('id'))
ptg_db = query(session).params(
id=result['id']).one()
if not ptg_db:
raise gpolicy.PolicyTargetGroupNotFound(
policy_target_group_id=result['id'])

View File

@ -19,6 +19,8 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.plugins import directory
from oslo_log import log as logging
import sqlalchemy as sa
from sqlalchemy.ext import baked
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import constants
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
@ -28,6 +30,9 @@ from gbpservice.neutron.services.sfc.aim import exceptions as sfc_exc
LOG = logging.getLogger(__name__)
flowclassifier.SUPPORTED_L7_PARAMETERS.update(sfc_cts.AIM_FLC_L7_PARAMS)
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class FlowclassifierAIMDriverBase(base.FlowClassifierDriverBase):
def create_flow_classifier_precommit(self, context):
@ -118,10 +123,16 @@ class FlowclassifierAIMDriver(FlowclassifierAIMDriverBase):
with context.session.begin(subtransactions=True):
classifier_ids = []
for keyword in [sfc_cts.LOGICAL_SRC_NET, sfc_cts.LOGICAL_DST_NET]:
query = BAKERY(lambda s: s.query(
flc_db.L7Parameter))
query += lambda q: q.filter_by(
keyword=sa.bindparam('keyword'))
query += lambda q: q.filter_by(
value=sa.bindparam('network_id'))
classifier_ids.extend(
[x.classifier_id for x in context.session.query(
flc_db.L7Parameter).filter_by(
keyword=keyword).filter_by(value=network_id).all()])
[x.classifier_id for x in query(context.session).params(
keyword=keyword, network_id=network_id).all()])
return classifier_ids
def _handle_network_delete(self, rtype, event, trigger, context,

View File

@ -31,6 +31,8 @@ from neutron_lib.callbacks import registry
from neutron_lib import constants as n_constants
from neutron_lib.plugins import directory
from oslo_log import log as logging
import sqlalchemy as sa
from sqlalchemy.ext import baked
from sqlalchemy import or_
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import apic_mapper
@ -51,6 +53,9 @@ SUPPORTED_DOM_TYPES = [PHYSDOM_TYPE]
DEFAULT_SUBNETS = ['128.0.0.0/1', '0.0.0.0/1', '8000::/1', '::/1']
MAX_PPGS_PER_CHAIN = 3
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
"sqlalchemy baked query cache size exceeded in %s" % __name__))
class SfcAIMDriverBase(base.SfcDriverBase):
def delete_port_pair_group(self, context):
@ -672,18 +677,29 @@ class SfcAIMDriver(SfcAIMDriverBase):
def _get_chains_by_classifier_id(self, plugin_context, flowc_id):
    """Return the port chains that use the given flow classifier.

    :param plugin_context: Neutron plugin context.
    :param flowc_id: flow classifier ID to match.
    :returns: list of port chain dicts from the SFC plugin.
    """
    context = plugin_context
    with db_api.context_manager.writer.using(context):
        # Baked query keyed on the classifier ID bind parameter.
        query = BAKERY(lambda s: s.query(
            sfc_db.ChainClassifierAssoc))
        query += lambda q: q.filter_by(
            flowclassifier_id=sa.bindparam('flowc_id'))
        chain_ids = [x.portchain_id for x in query(context.session).params(
            flowc_id=flowc_id).all()]
        return self.sfc_plugin.get_port_chains(plugin_context,
                                               filters={'id': chain_ids})
def _get_chains_by_ppg_ids(self, plugin_context, ppg_ids):
    """Return the port chains referencing any of the given PPGs.

    :param plugin_context: Neutron plugin context.
    :param ppg_ids: iterable of port pair group IDs.
    :returns: list of port chain dicts, or [] when ppg_ids is empty.
    """
    if not ppg_ids:
        return []
    context = plugin_context
    with db_api.context_manager.writer.using(context):
        # expanding=True lets the baked query accept a variable-length
        # IN-list while still being cached.
        query = BAKERY(lambda s: s.query(
            sfc_db.ChainGroupAssoc))
        query += lambda q: q.filter(
            sfc_db.ChainGroupAssoc.portpairgroup_id.in_(
                sa.bindparam('ppg_ids', expanding=True)))
        chain_ids = [x.portchain_id for x in query(context.session).params(
            ppg_ids=ppg_ids).all()]
        return self.sfc_plugin.get_port_chains(plugin_context,
                                               filters={'id': chain_ids})
@ -698,14 +714,22 @@ class SfcAIMDriver(SfcAIMDriverBase):
return []
def _get_group_ids_by_network_ids(self, plugin_context, network_ids):
    """Return port pair group IDs whose port pairs touch the networks.

    Joins PortPair to Port on either the ingress or the egress port and
    filters by the ports' network IDs.

    :param plugin_context: Neutron plugin context.
    :param network_ids: iterable of network IDs.
    :returns: list of portpairgroup_id values, or [] when network_ids
        is empty.
    """
    if not network_ids:
        return []
    session = plugin_context.session
    query = BAKERY(lambda s: s.query(
        sfc_db.PortPair))
    query += lambda q: q.join(
        models_v2.Port,
        or_(models_v2.Port.id == sfc_db.PortPair.ingress,
            models_v2.Port.id == sfc_db.PortPair.egress))
    # expanding=True supports a variable-length IN-list in a baked query.
    query += lambda q: q.filter(
        models_v2.Port.network_id.in_(
            sa.bindparam('network_ids', expanding=True)))
    return [x.portpairgroup_id for x in
            query(session).params(
                network_ids=network_ids).all()]
def _should_regenerate_pp(self, context):
attrs = [INGRESS, EGRESS, 'name']
@ -1034,20 +1058,26 @@ class SfcAIMDriver(SfcAIMDriverBase):
def _validate_flow_classifiers(self, mgr):
    """Fail validation if any flow classifiers exist.

    :param mgr: validation manager providing actual_session and
        validation_failed().
    """
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        flowc_db.FlowClassifier))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "SFC->AIM validation for FC not yet implemented")
def _validate_port_pair_groups(self, mgr):
    """Fail validation if any port pair groups exist.

    :param mgr: validation manager providing actual_session and
        validation_failed().
    """
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        sfc_db.PortPairGroup))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "SFC->AIM validation for PPG not yet implemented")
def _validate_port_chains(self, mgr):
    """Fail validation if any port chains exist.

    :param mgr: validation manager providing actual_session and
        validation_failed().
    """
    # REVISIT: Implement validation of actual mapping to AIM
    # resources.
    query = BAKERY(lambda s: s.query(
        sfc_db.PortChain))
    if query(mgr.actual_session).first():
        mgr.validation_failed(
            "SFC->AIM validation for PC not yet implemented")