[AIM] Use baked queries
Use sqlalchemy's "baked query" feature to reduce the CPU time spent building DB queries in the AIM mechanism, policy, extension, and SFC drivers. The GBP plugin's driver-independent queries are not modified in this patch. Change-Id: I5aabb1a9662ec08d6600dc5ad7e173f8ce408df3
This commit is contained in:
parent
e83a9636a6
commit
8342c6cdf7
|
@ -11,7 +11,14 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class ApicAllowedVMNameDB(model_base.BASEV2):
|
||||
|
@ -26,15 +33,26 @@ class ApicAllowedVMNameDB(model_base.BASEV2):
|
|||
class ApicAllowedVMNameDBMixin(object):
|
||||
|
||||
def get_l3_policy_allowed_vm_names(self, session, l3_policy_id):
|
||||
rows = (session.query(ApicAllowedVMNameDB).filter_by(
|
||||
l3_policy_id=l3_policy_id).all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicAllowedVMNameDB))
|
||||
query += lambda q: q.filter_by(
|
||||
l3_policy_id=sa.bindparam('l3_policy_id'))
|
||||
rows = query(session).params(
|
||||
l3_policy_id=l3_policy_id).all()
|
||||
|
||||
return rows
|
||||
|
||||
def get_l3_policy_allowed_vm_name(self, session, l3_policy_id,
|
||||
allowed_vm_name):
|
||||
row = (session.query(ApicAllowedVMNameDB).filter_by(
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicAllowedVMNameDB))
|
||||
query += lambda q: q.filter_by(
|
||||
l3_policy_id=sa.bindparam('l3_policy_id'),
|
||||
allowed_vm_name=sa.bindparam('allowed_vm_name'))
|
||||
row = query(session).params(
|
||||
l3_policy_id=l3_policy_id,
|
||||
allowed_vm_name=allowed_vm_name).one())
|
||||
allowed_vm_name=allowed_vm_name).one()
|
||||
|
||||
return row
|
||||
|
||||
def add_l3_policy_allowed_vm_name(self, session, l3_policy_id,
|
||||
|
|
|
@ -11,7 +11,14 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class ApicAutoPtgDB(model_base.BASEV2):
|
||||
|
@ -25,8 +32,13 @@ class ApicAutoPtgDB(model_base.BASEV2):
|
|||
class ApicAutoPtgDBMixin(object):
|
||||
|
||||
def get_is_auto_ptg(self, session, policy_target_group_id):
|
||||
row = (session.query(ApicAutoPtgDB).filter_by(
|
||||
policy_target_group_id=policy_target_group_id).one())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicAutoPtgDB))
|
||||
query += lambda q: q.filter_by(
|
||||
policy_target_group_id=sa.bindparam('policy_target_group_id'))
|
||||
row = query(session).params(
|
||||
policy_target_group_id=policy_target_group_id).one()
|
||||
|
||||
return row['is_auto_ptg']
|
||||
|
||||
def set_is_auto_ptg(self, session, policy_target_group_id,
|
||||
|
|
|
@ -11,7 +11,14 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class ApicIntraPtgDB(model_base.BASEV2):
|
||||
|
@ -25,15 +32,25 @@ class ApicIntraPtgDB(model_base.BASEV2):
|
|||
class ApicIntraPtgDBMixin(object):
|
||||
|
||||
def get_intra_ptg_allow(self, session, policy_target_group_id):
|
||||
row = (session.query(ApicIntraPtgDB).filter_by(
|
||||
policy_target_group_id=policy_target_group_id).one())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicIntraPtgDB))
|
||||
query += lambda q: q.filter_by(
|
||||
policy_target_group_id=sa.bindparam('policy_target_group_id'))
|
||||
row = query(session).params(
|
||||
policy_target_group_id=policy_target_group_id).one()
|
||||
|
||||
return row['intra_ptg_allow']
|
||||
|
||||
def set_intra_ptg_allow(self, session, policy_target_group_id,
|
||||
intra_ptg_allow=True):
|
||||
with session.begin(subtransactions=True):
|
||||
row = (session.query(ApicIntraPtgDB).filter_by(
|
||||
policy_target_group_id=policy_target_group_id).first())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicIntraPtgDB))
|
||||
query += lambda q: q.filter_by(
|
||||
policy_target_group_id=sa.bindparam('policy_target_group_id'))
|
||||
row = query(session).params(
|
||||
policy_target_group_id=policy_target_group_id).first()
|
||||
|
||||
if not row:
|
||||
row = ApicIntraPtgDB(
|
||||
policy_target_group_id=policy_target_group_id,
|
||||
|
|
|
@ -11,7 +11,14 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class ApicReuseBdDB(model_base.BASEV2):
|
||||
|
@ -26,8 +33,15 @@ class ApicReuseBdDB(model_base.BASEV2):
|
|||
class ApicReuseBdDBMixin(object):
|
||||
|
||||
def get_reuse_bd_l2policy(self, session, l2_policy_id):
|
||||
row = (session.query(ApicReuseBdDB).filter_by(
|
||||
l2_policy_id=l2_policy_id).first())
|
||||
# REVISIT: This method is not executed in any unit test.
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicReuseBdDB))
|
||||
query += lambda q: q.filter_by(
|
||||
l2_policy_id=sa.bindparam('l2_policy_id'))
|
||||
row = query(session).params(
|
||||
l2_policy_id=l2_policy_id).first()
|
||||
|
||||
return row
|
||||
|
||||
def add_reuse_bd_l2policy(self, session, l2_policy_id,
|
||||
|
@ -38,5 +52,11 @@ class ApicReuseBdDBMixin(object):
|
|||
session.add(row)
|
||||
|
||||
def is_reuse_bd_target(self, session, l2_policy_id):
|
||||
return (session.query(ApicReuseBdDB).filter_by(
|
||||
target_l2_policy_id=l2_policy_id).first() is not None)
|
||||
# REVISIT: This method is not executed in any unit test.
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicReuseBdDB))
|
||||
query += lambda q: q.filter_by(
|
||||
target_l2_policy_id=sa.bindparam('l2_policy_id'))
|
||||
return query(session).params(
|
||||
l2_policy_id=l2_policy_id).first() is not None
|
||||
|
|
|
@ -11,7 +11,14 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class ApicSegmentationLabelDB(model_base.BASEV2):
|
||||
|
@ -25,15 +32,26 @@ class ApicSegmentationLabelDB(model_base.BASEV2):
|
|||
class ApicSegmentationLabelDBMixin(object):
|
||||
|
||||
def get_policy_target_segmentation_labels(self, session, policy_target_id):
|
||||
rows = (session.query(ApicSegmentationLabelDB).filter_by(
|
||||
policy_target_id=policy_target_id).all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicSegmentationLabelDB))
|
||||
query += lambda q: q.filter_by(
|
||||
policy_target_id=sa.bindparam('policy_target_id'))
|
||||
rows = query(session).params(
|
||||
policy_target_id=policy_target_id).all()
|
||||
|
||||
return rows
|
||||
|
||||
def get_policy_target_segmentation_label(self, session, policy_target_id,
|
||||
segmentation_label):
|
||||
row = (session.query(ApicSegmentationLabelDB).filter_by(
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ApicSegmentationLabelDB))
|
||||
query += lambda q: q.filter_by(
|
||||
policy_target_id=sa.bindparam('policy_target_id'),
|
||||
segmentation_label=sa.bindparam('segmentation_label'))
|
||||
row = query(session).params(
|
||||
policy_target_id=policy_target_id,
|
||||
segmentation_label=segmentation_label).one())
|
||||
segmentation_label=segmentation_label).one()
|
||||
|
||||
return row
|
||||
|
||||
def add_policy_target_segmentation_label(self, session, policy_target_id,
|
||||
|
|
|
@ -17,10 +17,18 @@ from aim.api import resource as aim_resource
|
|||
from neutron.db.models import address_scope as as_db
|
||||
from neutron.db import models_v2
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
from sqlalchemy import orm
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class AddressScopeMapping(model_base.BASEV2):
|
||||
__tablename__ = 'apic_aim_address_scope_mappings'
|
||||
|
||||
|
@ -88,33 +96,49 @@ class DbMixin(object):
|
|||
# within the same transaction tries to access its
|
||||
# aim_mapping relationship after retrieving the
|
||||
# AddressScope record from the session cache.
|
||||
scope = (session.query(as_db.AddressScope).
|
||||
filter_by(id=scope_id).
|
||||
one_or_none())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
as_db.AddressScope))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('scope_id'))
|
||||
scope = query(session).params(
|
||||
scope_id=scope_id).one_or_none()
|
||||
|
||||
scope.aim_mapping = mapping
|
||||
return mapping
|
||||
|
||||
def _get_address_scope_mapping(self, session, scope_id):
|
||||
return (session.query(AddressScopeMapping).
|
||||
filter_by(scope_id=scope_id).
|
||||
one_or_none())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
AddressScopeMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
scope_id=sa.bindparam('scope_id'))
|
||||
return query(session).params(
|
||||
scope_id=scope_id).one_or_none()
|
||||
|
||||
def _get_address_scope_mappings_for_vrf(self, session, vrf):
|
||||
return (session.query(AddressScopeMapping).
|
||||
filter_by(vrf_tenant_name=vrf.tenant_name,
|
||||
vrf_name=vrf.name).
|
||||
all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
AddressScopeMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
vrf_tenant_name=sa.bindparam('tenant_name'),
|
||||
vrf_name=sa.bindparam('name'))
|
||||
return query(session).params(
|
||||
tenant_name=vrf.tenant_name,
|
||||
name=vrf.name).all()
|
||||
|
||||
def _get_address_scopes_owning_vrf(self, session, vrf):
|
||||
return (session.query(as_db.AddressScope).
|
||||
join(AddressScopeMapping,
|
||||
AddressScopeMapping.scope_id == as_db.AddressScope.id).
|
||||
filter(AddressScopeMapping.vrf_tenant_name ==
|
||||
vrf.tenant_name,
|
||||
AddressScopeMapping.vrf_name == vrf.name,
|
||||
AddressScopeMapping.vrf_owned).
|
||||
order_by(as_db.AddressScope.ip_version).
|
||||
all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
as_db.AddressScope))
|
||||
query += lambda q: q.join(
|
||||
AddressScopeMapping,
|
||||
AddressScopeMapping.scope_id == as_db.AddressScope.id)
|
||||
query += lambda q: q.filter(
|
||||
AddressScopeMapping.vrf_tenant_name == sa.bindparam('tenant_name'),
|
||||
AddressScopeMapping.vrf_name == sa.bindparam('name'),
|
||||
AddressScopeMapping.vrf_owned)
|
||||
query += lambda q: q.order_by(
|
||||
as_db.AddressScope.ip_version)
|
||||
return query(session).params(
|
||||
tenant_name=vrf.tenant_name,
|
||||
name=vrf.name).all()
|
||||
|
||||
def _get_address_scope_vrf(self, mapping):
|
||||
return aim_resource.VRF(
|
||||
|
@ -149,38 +173,67 @@ class DbMixin(object):
|
|||
# transaction tries to access its aim_mapping relationship
|
||||
# after retrieving the Network record from the session
|
||||
# cache.
|
||||
net = (session.query(models_v2.Network).
|
||||
filter_by(id=network_id).
|
||||
one_or_none())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
models_v2.Network))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('network_id'))
|
||||
net = query(session).params(
|
||||
network_id=network_id).one_or_none()
|
||||
|
||||
net.aim_mapping = mapping
|
||||
return mapping
|
||||
|
||||
def _get_network_mapping(self, session, network_id):
|
||||
return (session.query(NetworkMapping).
|
||||
filter_by(network_id=network_id).
|
||||
one_or_none())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
network_id=sa.bindparam('network_id'))
|
||||
return query(session).params(
|
||||
network_id=network_id).one_or_none()
|
||||
|
||||
def _get_network_mapping_bulk(self, session, network_ids):
|
||||
return session.query(NetworkMapping).filter(
|
||||
NetworkMapping.network_id.in_(network_ids)).all()
|
||||
# REVISIT: This method is not called during any UT, and does
|
||||
# not appear to be referenced elsewhere in this repository.
|
||||
if not network_ids:
|
||||
return []
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkMapping))
|
||||
query += lambda q: q.filter(
|
||||
NetworkMapping.network_id.in_(
|
||||
sa.bindparam('network_ids', expanding=True)))
|
||||
return query(session).params(
|
||||
network_ids=network_ids).all()
|
||||
|
||||
def _get_network_mappings_for_vrf(self, session, vrf):
|
||||
return (session.query(NetworkMapping).
|
||||
filter_by(vrf_tenant_name=vrf.tenant_name,
|
||||
vrf_name=vrf.name).
|
||||
all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
vrf_tenant_name=sa.bindparam('vrf_tenant_name'),
|
||||
vrf_name=sa.bindparam('vrf_name'))
|
||||
return query(session).params(
|
||||
vrf_tenant_name=vrf.tenant_name,
|
||||
vrf_name=vrf.name).all()
|
||||
|
||||
def _get_network_mappings_for_bd(self, session, bd):
|
||||
return (session.query(NetworkMapping).
|
||||
filter_by(bd_tenant_name=bd.tenant_name,
|
||||
bd_name=bd.name).
|
||||
all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
bd_tenant_name=sa.bindparam('bd_tenant_name'),
|
||||
bd_name=sa.bindparam('bd_name'))
|
||||
return query(session).params(
|
||||
bd_tenant_name=bd.tenant_name,
|
||||
bd_name=bd.name).all()
|
||||
|
||||
def _is_vrf_used_by_networks(self, session, vrf):
|
||||
return (session.query(NetworkMapping.network_id).
|
||||
filter_by(vrf_tenant_name=vrf.tenant_name,
|
||||
vrf_name=vrf.name).
|
||||
first() is not None)
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkMapping.network_id))
|
||||
query += lambda q: q.filter_by(
|
||||
vrf_tenant_name=sa.bindparam('vrf_tenant_name'),
|
||||
vrf_name=sa.bindparam('vrf_name'))
|
||||
return query(session).params(
|
||||
vrf_tenant_name=vrf.tenant_name,
|
||||
vrf_name=vrf.name).first() is not None
|
||||
|
||||
def _get_network_bd(self, mapping):
|
||||
return aim_resource.BridgeDomain(
|
||||
|
|
|
@ -15,13 +15,20 @@
|
|||
|
||||
from neutron.db import models_v2
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_log import log
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.sql.expression import true
|
||||
|
||||
from gbpservice.neutron.extensions import cisco_apic
|
||||
from gbpservice.neutron.extensions import cisco_apic_l3
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class NetworkExtensionDb(model_base.BASEV2):
|
||||
|
||||
|
@ -115,14 +122,33 @@ class ExtensionDbMixin(object):
|
|||
network_id, {})
|
||||
|
||||
def get_network_extn_db_bulk(self, session, network_ids):
|
||||
db_objs = (session.query(NetworkExtensionDb).filter(
|
||||
NetworkExtensionDb.network_id.in_(network_ids)).all())
|
||||
db_cidrs = (session.query(NetworkExtensionCidrDb).filter(
|
||||
NetworkExtensionCidrDb.network_id.in_(network_ids)).all())
|
||||
db_vlans = (session.query(
|
||||
NetworkExtNestedDomainAllowedVlansDb).filter(
|
||||
if not network_ids:
|
||||
return {}
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtensionDb))
|
||||
query += lambda q: q.filter(
|
||||
NetworkExtensionDb.network_id.in_(
|
||||
sa.bindparam('network_ids', expanding=True)))
|
||||
db_objs = query(session).params(
|
||||
network_ids=network_ids).all()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtensionCidrDb))
|
||||
query += lambda q: q.filter(
|
||||
NetworkExtensionCidrDb.network_id.in_(
|
||||
sa.bindparam('network_ids', expanding=True)))
|
||||
db_cidrs = query(session).params(
|
||||
network_ids=network_ids).all()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtNestedDomainAllowedVlansDb))
|
||||
query += lambda q: q.filter(
|
||||
NetworkExtNestedDomainAllowedVlansDb.network_id.in_(
|
||||
network_ids)).all())
|
||||
sa.bindparam('network_ids', expanding=True)))
|
||||
db_vlans = query(session).params(
|
||||
network_ids=network_ids).all()
|
||||
|
||||
cidrs_by_net_id = {}
|
||||
vlans_by_net_id = {}
|
||||
for db_cidr in db_cidrs:
|
||||
|
@ -169,8 +195,13 @@ class ExtensionDbMixin(object):
|
|||
|
||||
def set_network_extn_db(self, session, network_id, res_dict):
|
||||
with session.begin(subtransactions=True):
|
||||
db_obj = (session.query(NetworkExtensionDb).filter_by(
|
||||
network_id=network_id).first())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtensionDb))
|
||||
query += lambda q: q.filter_by(
|
||||
network_id=sa.bindparam('network_id'))
|
||||
db_obj = query(session).params(
|
||||
network_id=network_id).first()
|
||||
|
||||
db_obj = db_obj or NetworkExtensionDb(network_id=network_id)
|
||||
if cisco_apic.EXTERNAL_NETWORK in res_dict:
|
||||
db_obj['external_network_dn'] = (
|
||||
|
@ -214,40 +245,71 @@ class ExtensionDbMixin(object):
|
|||
network_id=network_id)
|
||||
|
||||
def get_network_ids_by_ext_net_dn(self, session, dn, lock_update=False):
|
||||
ids = session.query(NetworkExtensionDb.network_id).filter_by(
|
||||
external_network_dn=dn)
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtensionDb.network_id))
|
||||
query += lambda q: q.filter_by(
|
||||
external_network_dn=sa.bindparam('dn'))
|
||||
if lock_update:
|
||||
ids = ids.with_lockmode('update')
|
||||
# REVISIT: Eliminate locking.
|
||||
query += lambda q: q.with_lockmode('update')
|
||||
ids = query(session).params(dn=dn)
|
||||
|
||||
return [i[0] for i in ids]
|
||||
|
||||
def get_network_ids_by_l3out_dn(self, session, dn, lock_update=False):
|
||||
ids = session.query(NetworkExtensionDb.network_id).filter(
|
||||
NetworkExtensionDb.external_network_dn.like(dn + "/%"))
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtensionDb.network_id))
|
||||
query += lambda q: q.filter(
|
||||
NetworkExtensionDb.external_network_dn.like(
|
||||
sa.bindparam('dn') + "/%"))
|
||||
if lock_update:
|
||||
ids = ids.with_lockmode('update')
|
||||
# REVISIT: Eliminate locking.
|
||||
query += lambda q: q.with_lockmode('update')
|
||||
ids = query(session).params(dn=dn)
|
||||
|
||||
return [i[0] for i in ids]
|
||||
|
||||
def get_svi_network_ids_by_l3out_dn(self, session, dn, lock_update=False):
|
||||
ids = session.query(NetworkExtensionDb.network_id).filter(
|
||||
NetworkExtensionDb.external_network_dn.like(dn + "/%"),
|
||||
query = BAKERY(lambda s: s.query(
|
||||
NetworkExtensionDb.network_id))
|
||||
query += lambda q: q.filter(
|
||||
NetworkExtensionDb.external_network_dn.like(
|
||||
sa.bindparam('dn') + "/%"),
|
||||
NetworkExtensionDb.svi == true())
|
||||
if lock_update:
|
||||
ids = ids.with_lockmode('update')
|
||||
# REVISIT: Eliminate locking.
|
||||
query += lambda q: q.with_lockmode('update')
|
||||
ids = query(session).params(dn=dn)
|
||||
|
||||
return [i[0] for i in ids]
|
||||
|
||||
def get_external_cidrs_by_ext_net_dn(self, session, dn, lock_update=False):
|
||||
ctab = NetworkExtensionCidrDb
|
||||
ntab = NetworkExtensionDb
|
||||
cidrs = session.query(ctab.cidr).join(
|
||||
ntab, ntab.network_id == ctab.network_id).filter(
|
||||
ntab.external_network_dn == dn).distinct()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
ctab.cidr))
|
||||
query += lambda q: q.join(
|
||||
ntab,
|
||||
ntab.network_id == ctab.network_id)
|
||||
query += lambda q: q.filter(
|
||||
ntab.external_network_dn == sa.bindparam('dn'))
|
||||
query += lambda q: q.distinct()
|
||||
if lock_update:
|
||||
cidrs = cidrs.with_lockmode('update')
|
||||
# REVISIT: Eliminate locking.
|
||||
query += lambda q: q.with_lockmode('update')
|
||||
cidrs = query(session).params(dn=dn)
|
||||
|
||||
return [c[0] for c in cidrs]
|
||||
|
||||
def get_subnet_extn_db(self, session, subnet_id):
|
||||
db_obj = (session.query(SubnetExtensionDb).filter_by(
|
||||
subnet_id=subnet_id).first())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
SubnetExtensionDb))
|
||||
query += lambda q: q.filter_by(
|
||||
subnet_id=sa.bindparam('subnet_id'))
|
||||
db_obj = query(session).params(
|
||||
subnet_id=subnet_id).first()
|
||||
|
||||
result = {}
|
||||
if db_obj:
|
||||
self._set_if_not_none(result, cisco_apic.SNAT_HOST_POOL,
|
||||
|
@ -255,16 +317,26 @@ class ExtensionDbMixin(object):
|
|||
return result
|
||||
|
||||
def set_subnet_extn_db(self, session, subnet_id, res_dict):
|
||||
db_obj = (session.query(SubnetExtensionDb).filter_by(
|
||||
subnet_id=subnet_id).first())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
SubnetExtensionDb))
|
||||
query += lambda q: q.filter_by(
|
||||
subnet_id=sa.bindparam('subnet_id'))
|
||||
db_obj = query(session).params(
|
||||
subnet_id=subnet_id).first()
|
||||
|
||||
db_obj = db_obj or SubnetExtensionDb(subnet_id=subnet_id)
|
||||
if cisco_apic.SNAT_HOST_POOL in res_dict:
|
||||
db_obj['snat_host_pool'] = res_dict[cisco_apic.SNAT_HOST_POOL]
|
||||
session.add(db_obj)
|
||||
|
||||
def get_router_extn_db(self, session, router_id):
|
||||
db_contracts = (session.query(RouterExtensionContractDb).filter_by(
|
||||
router_id=router_id).all())
|
||||
query = BAKERY(lambda s: s.query(
|
||||
RouterExtensionContractDb))
|
||||
query += lambda q: q.filter_by(
|
||||
router_id=sa.bindparam('router_id'))
|
||||
db_contracts = query(session).params(
|
||||
router_id=router_id).all()
|
||||
|
||||
return {cisco_apic_l3.EXTERNAL_PROVIDED_CONTRACTS:
|
||||
[c['contract_name'] for c in db_contracts if c['provides']],
|
||||
cisco_apic_l3.EXTERNAL_CONSUMED_CONTRACTS:
|
||||
|
@ -275,7 +347,10 @@ class ExtensionDbMixin(object):
|
|||
new_values, **filters):
|
||||
if new_values is None:
|
||||
return
|
||||
|
||||
# REVISIT: Can this query be baked?
|
||||
rows = session.query(db_model).filter_by(**filters).all()
|
||||
|
||||
new_values = set(new_values)
|
||||
for r in rows:
|
||||
if r[column] in new_values:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -13,6 +13,8 @@
|
|||
import hashlib
|
||||
import re
|
||||
import six
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
from aim import aim_manager
|
||||
from aim.api import resource as aim_resource
|
||||
|
@ -60,6 +62,10 @@ from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
|
|||
from gbpservice.neutron.services.grouppolicy import plugin as gbp_plugin
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
FORWARD = 'Forward'
|
||||
REVERSE = 'Reverse'
|
||||
FILTER_DIRECTIONS = {FORWARD: False, REVERSE: True}
|
||||
|
@ -1122,16 +1128,26 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
|||
# the gbp_obj here should be a ptg
|
||||
l2p_id = gbp_obj['l2_policy_id']
|
||||
if l2p_id:
|
||||
l2p_db = session.query(
|
||||
gpmdb.L2PolicyMapping).filter_by(id=l2p_id).first()
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpmdb.L2PolicyMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('l2p_id'))
|
||||
l2p_db = query(session).params(
|
||||
l2p_id=l2p_id).first()
|
||||
|
||||
l3p_id = l2p_db['l3_policy_id']
|
||||
elif aim_resource_class.__name__ == (
|
||||
aim_resource.BridgeDomain.__name__):
|
||||
# the gbp_obj here should be a l2p
|
||||
l3p_id = gbp_obj['l3_policy_id']
|
||||
if l3p_id:
|
||||
l3p_db = session.query(
|
||||
gpmdb.L3PolicyMapping).filter_by(id=l3p_id).first()
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpmdb.L3PolicyMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('l3p_id'))
|
||||
l3p_db = query(session).params(
|
||||
l3p_id=l3p_id).first()
|
||||
|
||||
tenant_id = l3p_db['tenant_id']
|
||||
tenant_name = self.name_mapper.project(session, tenant_id)
|
||||
LOG.debug("Mapped tenant_id %(id)s to %(apic_name)s",
|
||||
|
@ -2337,11 +2353,21 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
|||
context = gbp_utils.get_current_context()
|
||||
# get_network can do a DB write, hence we use a writer
|
||||
with db_api.context_manager.writer.using(context):
|
||||
ptg_db = session.query(gpmdb.PolicyTargetGroupMapping).filter_by(
|
||||
id=ptg_id).first()
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpmdb.PolicyTargetGroupMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('ptg_id'))
|
||||
ptg_db = query(session).params(
|
||||
ptg_id=ptg_id).first()
|
||||
|
||||
if ptg_db and self._is_auto_ptg(ptg_db):
|
||||
l2p_db = session.query(gpmdb.L2PolicyMapping).filter_by(
|
||||
id=ptg_db['l2_policy_id']).first()
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpmdb.L2PolicyMapping))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('l2p_id'))
|
||||
l2p_db = query(session).params(
|
||||
l2p_id=ptg_db['l2_policy_id']).first()
|
||||
|
||||
network_id = l2p_db['network_id']
|
||||
admin_context = n_context.get_admin_context()
|
||||
net = self._get_network(admin_context, network_id)
|
||||
|
@ -2392,17 +2418,24 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
|||
self._handle_network_service_policy(context)
|
||||
|
||||
def _get_prss_for_policy_rules(self, context, pr_ids):
|
||||
if not pr_ids:
|
||||
return []
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.PolicyRuleSet))
|
||||
query += lambda q: q.join(
|
||||
gpdb.PRSToPRAssociation,
|
||||
gpdb.PRSToPRAssociation.policy_rule_set_id ==
|
||||
gpdb.PolicyRuleSet.id)
|
||||
query += lambda q: q.join(
|
||||
gpdb.PolicyRule,
|
||||
gpdb.PRSToPRAssociation.policy_rule_id == gpdb.PolicyRule.id)
|
||||
query += lambda q: q.filter(
|
||||
gpdb.PolicyRule.id.in_(sa.bindparam('pr_ids', expanding=True)))
|
||||
return [self._get_policy_rule_set(
|
||||
context._plugin_context, x['id']) for x in (
|
||||
context._plugin_context.session.query(
|
||||
gpdb.PolicyRuleSet).join(
|
||||
gpdb.PRSToPRAssociation,
|
||||
gpdb.PRSToPRAssociation.policy_rule_set_id ==
|
||||
gpdb.PolicyRuleSet.id).join(
|
||||
gpdb.PolicyRule,
|
||||
gpdb.PRSToPRAssociation.policy_rule_id ==
|
||||
gpdb.PolicyRule.id).filter(
|
||||
gpdb.PolicyRule.id.in_(pr_ids)).all())]
|
||||
query(context._plugin_context.session).params(
|
||||
pr_ids=pr_ids).all())]
|
||||
|
||||
def _get_port_mtu(self, context, port):
|
||||
if self.advertise_mtu:
|
||||
|
@ -2441,7 +2474,11 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
|||
aim_ctx = aim_context.AimContext(session)
|
||||
contract_name_prefix = alib.get_service_contract_filter_entries(
|
||||
).keys()[0]
|
||||
l3ps = session.query(gpmdb.L3PolicyMapping).all()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpmdb.L3PolicyMapping))
|
||||
l3ps = query(session).all()
|
||||
|
||||
name_mapper = apic_mapper.APICNameMapper()
|
||||
aim_mgr = aim_manager.AimManager()
|
||||
self._aim = aim_mgr
|
||||
|
@ -2500,49 +2537,63 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
|||
def _validate_l3_policies(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.L3Policy).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.L3Policy))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for L3P not yet implemented")
|
||||
|
||||
def _validate_l2_policies(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.L2Policy).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.L2Policy))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for L2P not yet implemented")
|
||||
|
||||
def _validate_policy_target_groups(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.PolicyTargetGroup).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.PolicyTargetGroup))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for PTG not yet implemented")
|
||||
|
||||
def _validate_policy_targets(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.PolicyTarget).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.PolicyTarget))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for PT not yet implemented")
|
||||
|
||||
def _validate_application_policy_groups(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.ApplicationPolicyGroup).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.ApplicationPolicyGroup))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for APG not yet implemented")
|
||||
|
||||
def _validate_policy_classifiers(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.PolicyClassifier).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.PolicyClassifier))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for PC not yet implemented")
|
||||
|
||||
def _validate_policy_rule_sets(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.PolicyRuleSet).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.PolicyRuleSet))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for PRS not yet implemented")
|
||||
|
||||
|
@ -2552,13 +2603,17 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
|||
# validate_neutron_mapping rather than validate_aim_mapping,
|
||||
# since external_routes maps to the cisco_apic.EXTERNAL_CIDRS
|
||||
# network extension.
|
||||
if mgr.actual_session.query(gpdb.ExternalSegment).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.ExternalSegment))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for ES not yet implemented")
|
||||
|
||||
def _validate_external_policies(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(gpdb.ExternalPolicy).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gpdb.ExternalPolicy))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"GBP->AIM validation for EP not yet implemented")
|
||||
|
|
|
@ -10,6 +10,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.db import api as db_api
|
||||
|
@ -31,6 +34,9 @@ from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
|
|||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
|
||||
"""RPC mixin for AIM mapping.
|
||||
|
@ -264,17 +270,23 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
|
|||
return
|
||||
details['security_group'] = []
|
||||
|
||||
port_sgs = (context.session.query(sg_models.SecurityGroup.id,
|
||||
sg_models.SecurityGroup.tenant_id).
|
||||
filter(sg_models.SecurityGroup.id.
|
||||
in_(port['security_groups'])).
|
||||
all())
|
||||
for sg_id, tenant_id in port_sgs:
|
||||
tenant_aname = self.aim_mech_driver.name_mapper.project(
|
||||
context.session, tenant_id)
|
||||
details['security_group'].append(
|
||||
{'policy-space': tenant_aname,
|
||||
'name': sg_id})
|
||||
if port['security_groups']:
|
||||
query = BAKERY(lambda s: s.query(
|
||||
sg_models.SecurityGroup.id,
|
||||
sg_models.SecurityGroup.tenant_id))
|
||||
query += lambda q: q.filter(
|
||||
sg_models.SecurityGroup.id.in_(
|
||||
sa.bindparam('sg_ids', expanding=True)))
|
||||
port_sgs = query(context.session).params(
|
||||
sg_ids=port['security_groups']).all()
|
||||
|
||||
for sg_id, tenant_id in port_sgs:
|
||||
tenant_aname = self.aim_mech_driver.name_mapper.project(
|
||||
context.session, tenant_id)
|
||||
details['security_group'].append(
|
||||
{'policy-space': tenant_aname,
|
||||
'name': sg_id})
|
||||
|
||||
# Always include this SG which has the default arp & dhcp rules
|
||||
details['security_group'].append(
|
||||
{'policy-space': 'common',
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
from sqlalchemy import orm
|
||||
|
||||
from neutron.db import api as db_api
|
||||
|
@ -26,7 +27,11 @@ from neutron_lib.plugins import directory
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
# REVISIT: Fix the misspelling of 'association'.
|
||||
class HAIPAddressToPortAssocation(model_base.BASEV2):
|
||||
|
||||
"""Port Owner for HA IP Address.
|
||||
|
@ -49,23 +54,45 @@ class PortForHAIPAddress(object):
|
|||
|
||||
def _get_ha_ipaddress(self, port_id, ipaddress, session=None):
|
||||
session = session or db_api.get_reader_session()
|
||||
return session.query(HAIPAddressToPortAssocation).filter_by(
|
||||
port_id=port_id, ha_ip_address=ipaddress).first()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
HAIPAddressToPortAssocation))
|
||||
query += lambda q: q.filter_by(
|
||||
port_id=sa.bindparam('port_id'),
|
||||
ha_ip_address=sa.bindparam('ipaddress'))
|
||||
return query(session).params(
|
||||
port_id=port_id, ipaddress=ipaddress).first()
|
||||
|
||||
def get_port_for_ha_ipaddress(self, ipaddress, network_id):
|
||||
"""Returns the Neutron Port ID for the HA IP Addresss."""
|
||||
session = db_api.get_reader_session()
|
||||
port_ha_ip = session.query(HAIPAddressToPortAssocation).join(
|
||||
models_v2.Port).filter(
|
||||
HAIPAddressToPortAssocation.ha_ip_address == ipaddress).filter(
|
||||
models_v2.Port.network_id == network_id).first()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
HAIPAddressToPortAssocation))
|
||||
query += lambda q: q.join(
|
||||
models_v2.Port,
|
||||
models_v2.Port.id == HAIPAddressToPortAssocation.port_id)
|
||||
query += lambda q: q.filter(
|
||||
HAIPAddressToPortAssocation.ha_ip_address ==
|
||||
sa.bindparam('ipaddress'))
|
||||
query += lambda q: q.filter(
|
||||
models_v2.Port.network_id == sa.bindparam('network_id'))
|
||||
port_ha_ip = query(session).params(
|
||||
ipaddress=ipaddress, network_id=network_id).first()
|
||||
|
||||
return port_ha_ip
|
||||
|
||||
def get_ha_ipaddresses_for_port(self, port_id):
|
||||
"""Returns the HA IP Addressses associated with a Port."""
|
||||
session = db_api.get_reader_session()
|
||||
objs = session.query(HAIPAddressToPortAssocation).filter_by(
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
HAIPAddressToPortAssocation))
|
||||
query += lambda q: q.filter_by(
|
||||
port_id=sa.bindparam('port_id'))
|
||||
objs = query(session).params(
|
||||
port_id=port_id).all()
|
||||
|
||||
return sorted([x['ha_ip_address'] for x in objs])
|
||||
|
||||
def set_port_id_for_ha_ipaddress(self, port_id, ipaddress, session=None):
|
||||
|
@ -90,6 +117,11 @@ class PortForHAIPAddress(object):
|
|||
session = session or db_api.get_writer_session()
|
||||
with session.begin(subtransactions=True):
|
||||
try:
|
||||
# REVISIT: Can this query be baked? The
|
||||
# sqlalchemy.ext.baked.Result class does not have a
|
||||
# delete() method, and adding delete() to the baked
|
||||
# query before executing it seems to result in the
|
||||
# params() not being evaluated.
|
||||
return session.query(
|
||||
HAIPAddressToPortAssocation).filter_by(
|
||||
port_id=port_id,
|
||||
|
@ -99,7 +131,10 @@ class PortForHAIPAddress(object):
|
|||
|
||||
def get_ha_port_associations(self):
|
||||
session = db_api.get_reader_session()
|
||||
return session.query(HAIPAddressToPortAssocation).all()
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
HAIPAddressToPortAssocation))
|
||||
return query(session).all()
|
||||
|
||||
|
||||
class HAIPOwnerDbMixin(object):
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
|
||||
from neutron_lib.plugins import directory
|
||||
from oslo_log import log as logging
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
from gbpservice.neutron.db.grouppolicy.extensions import (
|
||||
apic_auto_ptg_db as auto_ptg_db)
|
||||
|
@ -25,6 +27,9 @@ from gbpservice.neutron.services.grouppolicy import (
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class AIMExtensionDriver(api.ExtensionDriver,
|
||||
intra_ptg_db.ApicIntraPtgDBMixin,
|
||||
|
@ -53,8 +58,14 @@ class AIMExtensionDriver(api.ExtensionDriver,
|
|||
|
||||
def _set_intra_ptg_allow(self, session, data, result):
|
||||
ptg = data['policy_target_group']
|
||||
ptg_db = (session.query(gp_db.PolicyTargetGroup)
|
||||
.filter_by(id=result['id']).one())
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
gp_db.PolicyTargetGroup))
|
||||
query += lambda q: q.filter_by(
|
||||
id=sa.bindparam('id'))
|
||||
ptg_db = query(session).params(
|
||||
id=result['id']).one()
|
||||
|
||||
if not ptg_db:
|
||||
raise gpolicy.PolicyTargetGroupNotFound(
|
||||
policy_target_group_id=result['id'])
|
||||
|
|
|
@ -19,6 +19,8 @@ from neutron_lib.callbacks import registry
|
|||
from neutron_lib.callbacks import resources
|
||||
from neutron_lib.plugins import directory
|
||||
from oslo_log import log as logging
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import constants
|
||||
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
|
||||
|
@ -28,6 +30,9 @@ from gbpservice.neutron.services.sfc.aim import exceptions as sfc_exc
|
|||
LOG = logging.getLogger(__name__)
|
||||
flowclassifier.SUPPORTED_L7_PARAMETERS.update(sfc_cts.AIM_FLC_L7_PARAMS)
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class FlowclassifierAIMDriverBase(base.FlowClassifierDriverBase):
|
||||
def create_flow_classifier_precommit(self, context):
|
||||
|
@ -118,10 +123,16 @@ class FlowclassifierAIMDriver(FlowclassifierAIMDriverBase):
|
|||
with context.session.begin(subtransactions=True):
|
||||
classifier_ids = []
|
||||
for keyword in [sfc_cts.LOGICAL_SRC_NET, sfc_cts.LOGICAL_DST_NET]:
|
||||
query = BAKERY(lambda s: s.query(
|
||||
flc_db.L7Parameter))
|
||||
query += lambda q: q.filter_by(
|
||||
keyword=sa.bindparam('keyword'))
|
||||
query += lambda q: q.filter_by(
|
||||
value=sa.bindparam('network_id'))
|
||||
classifier_ids.extend(
|
||||
[x.classifier_id for x in context.session.query(
|
||||
flc_db.L7Parameter).filter_by(
|
||||
keyword=keyword).filter_by(value=network_id).all()])
|
||||
[x.classifier_id for x in query(context.session).params(
|
||||
keyword=keyword, network_id=network_id).all()])
|
||||
|
||||
return classifier_ids
|
||||
|
||||
def _handle_network_delete(self, rtype, event, trigger, context,
|
||||
|
|
|
@ -31,6 +31,8 @@ from neutron_lib.callbacks import registry
|
|||
from neutron_lib import constants as n_constants
|
||||
from neutron_lib.plugins import directory
|
||||
from oslo_log import log as logging
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext import baked
|
||||
from sqlalchemy import or_
|
||||
|
||||
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import apic_mapper
|
||||
|
@ -51,6 +53,9 @@ SUPPORTED_DOM_TYPES = [PHYSDOM_TYPE]
|
|||
DEFAULT_SUBNETS = ['128.0.0.0/1', '0.0.0.0/1', '8000::/1', '::/1']
|
||||
MAX_PPGS_PER_CHAIN = 3
|
||||
|
||||
BAKERY = baked.bakery(_size_alert=lambda c: LOG.warning(
|
||||
"sqlalchemy baked query cache size exceeded in %s" % __name__))
|
||||
|
||||
|
||||
class SfcAIMDriverBase(base.SfcDriverBase):
|
||||
def delete_port_pair_group(self, context):
|
||||
|
@ -672,18 +677,29 @@ class SfcAIMDriver(SfcAIMDriverBase):
|
|||
def _get_chains_by_classifier_id(self, plugin_context, flowc_id):
|
||||
context = plugin_context
|
||||
with db_api.context_manager.writer.using(context):
|
||||
chain_ids = [x.portchain_id for x in context.session.query(
|
||||
sfc_db.ChainClassifierAssoc).filter_by(
|
||||
flowclassifier_id=flowc_id).all()]
|
||||
query = BAKERY(lambda s: s.query(
|
||||
sfc_db.ChainClassifierAssoc))
|
||||
query += lambda q: q.filter_by(
|
||||
flowclassifier_id=sa.bindparam('flowc_id'))
|
||||
chain_ids = [x.portchain_id for x in query(context.session).params(
|
||||
flowc_id=flowc_id).all()]
|
||||
|
||||
return self.sfc_plugin.get_port_chains(plugin_context,
|
||||
filters={'id': chain_ids})
|
||||
|
||||
def _get_chains_by_ppg_ids(self, plugin_context, ppg_ids):
|
||||
if not ppg_ids:
|
||||
return []
|
||||
context = plugin_context
|
||||
with db_api.context_manager.writer.using(context):
|
||||
chain_ids = [x.portchain_id for x in context.session.query(
|
||||
sfc_db.ChainGroupAssoc).filter(
|
||||
sfc_db.ChainGroupAssoc.portpairgroup_id.in_(ppg_ids)).all()]
|
||||
query = BAKERY(lambda s: s.query(
|
||||
sfc_db.ChainGroupAssoc))
|
||||
query += lambda q: q.filter(
|
||||
sfc_db.ChainGroupAssoc.portpairgroup_id.in_(
|
||||
sa.bindparam('ppg_ids', expanding=True)))
|
||||
chain_ids = [x.portchain_id for x in query(context.session).params(
|
||||
ppg_ids=ppg_ids).all()]
|
||||
|
||||
return self.sfc_plugin.get_port_chains(plugin_context,
|
||||
filters={'id': chain_ids})
|
||||
|
||||
|
@ -698,14 +714,22 @@ class SfcAIMDriver(SfcAIMDriverBase):
|
|||
return []
|
||||
|
||||
def _get_group_ids_by_network_ids(self, plugin_context, network_ids):
|
||||
if not network_ids:
|
||||
return []
|
||||
session = plugin_context.session
|
||||
return [
|
||||
x.portpairgroup_id for x in
|
||||
session.query(sfc_db.PortPair).join(
|
||||
models_v2.Port,
|
||||
or_(models_v2.Port.id == sfc_db.PortPair.ingress,
|
||||
models_v2.Port.id == sfc_db.PortPair.egress)).filter(
|
||||
models_v2.Port.network_id.in_(network_ids)).all()]
|
||||
|
||||
query = BAKERY(lambda s: s.query(
|
||||
sfc_db.PortPair))
|
||||
query += lambda q: q.join(
|
||||
models_v2.Port,
|
||||
or_(models_v2.Port.id == sfc_db.PortPair.ingress,
|
||||
models_v2.Port.id == sfc_db.PortPair.egress))
|
||||
query += lambda q: q.filter(
|
||||
models_v2.Port.network_id.in_(
|
||||
sa.bindparam('network_ids', expanding=True)))
|
||||
return [x.portpairgroup_id for x in
|
||||
query(session).params(
|
||||
network_ids=network_ids).all()]
|
||||
|
||||
def _should_regenerate_pp(self, context):
|
||||
attrs = [INGRESS, EGRESS, 'name']
|
||||
|
@ -1034,20 +1058,26 @@ class SfcAIMDriver(SfcAIMDriverBase):
|
|||
def _validate_flow_classifiers(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(flowc_db.FlowClassifier).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
flowc_db.FlowClassifier))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"SFC->AIM validation for FC not yet implemented")
|
||||
|
||||
def _validate_port_pair_groups(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(sfc_db.PortPairGroup).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
sfc_db.PortPairGroup))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"SFC->AIM validation for PPG not yet implemented")
|
||||
|
||||
def _validate_port_chains(self, mgr):
|
||||
# REVISIT: Implement validation of actual mapping to AIM
|
||||
# resources.
|
||||
if mgr.actual_session.query(sfc_db.PortChain).first():
|
||||
query = BAKERY(lambda s: s.query(
|
||||
sfc_db.PortChain))
|
||||
if query(mgr.actual_session).first():
|
||||
mgr.validation_failed(
|
||||
"SFC->AIM validation for PC not yet implemented")
|
||||
|
|
Loading…
Reference in New Issue