[aim] Ensure existence of static AIM resources during driver initialization

Certain AIM resources that were previously created on demand but are
never deleted are now created during driver initialization if they do
not already exist. These include the common Tenant, the "any" Filter,
the unrouted VRF, and the default SecurityGroup.

Loading and clearing of the AIM schema is now done via a fixture
during GBP unit tests, allowing it to work properly during driver
initialization when running unit tests.

Under the common Tenant, the default SecurityGroup's name is now
scoped by the system_id so that multiple OpenStack installations can
peacefully co-exist within an ACI fabric. Unit tests are added for the
default SecurityGroup, its SecurityGroupSubject, and its ingress and
egress SecurityGroupRules. The display_name fields are also set for
these resources.

Change-Id: Ie1a94e6840a6fb64b22548eb0977f2a047b1a3bd
This commit is contained in:
Robert Kukura 2017-11-14 20:35:02 -05:00
parent 6bbbb6cb22
commit 1d27e00865
4 changed files with 186 additions and 95 deletions

View File

@ -66,16 +66,13 @@ from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import extension_db
LOG = log.getLogger(__name__)
DEVICE_OWNER_SNAT_PORT = 'apic:snat-pool'
# REVISIT(rkukura): Consider making these APIC name constants
# configurable, although changing them would break an existing
# deployment.
ANY_FILTER_NAME = 'AnyFilter'
ANY_FILTER_ENTRY_NAME = 'AnyFilterEntry'
DEFAULT_VRF_NAME = 'DefaultVRF'
UNROUTED_VRF_NAME = 'UnroutedVRF'
COMMON_TENANT_NAME = 'common'
ROUTER_SUBJECT_NAME = 'route'
DEFAULT_SG_NAME = 'DefaultSecurityGroup'
AGENT_TYPE_DVS = 'DVS agent'
VIF_TYPE_DVS = 'dvs'
@ -87,7 +84,6 @@ FABRIC_HOST_ID = 'fabric'
NO_ADDR_SCOPE = object()
DVS_AGENT_KLASS = 'networking_vsphere.common.dvs_agent_rpc_api.DVSClientAPI'
GBP_DEFAULT = 'gbp_default'
DEFAULT_HOST_DOMAIN = '*'
@ -201,43 +197,53 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.enable_iptables_firewall = (cfg.CONF.ml2_apic_aim.
enable_iptables_firewall)
local_api.QUEUE_OUT_OF_PROCESS_NOTIFICATIONS = True
self._setup_default_arp_security_group_rules()
self._ensure_static_resources()
def _setup_default_arp_security_group_rules(self):
def _ensure_static_resources(self):
session = db_api.get_session()
aim_ctx = aim_context.AimContext(session)
sg = aim_resource.SecurityGroup(
tenant_name=COMMON_TENANT_NAME, name=GBP_DEFAULT)
try:
self.aim.create(aim_ctx, sg, overwrite=True)
except db_exc.DBNonExistentTable as e:
# This is expected in the UT env. but will never
# happen in the real fab.
LOG.error(e)
return
self._ensure_common_tenant(aim_ctx)
self._ensure_unrouted_vrf(aim_ctx)
self._ensure_any_filter(aim_ctx)
self._setup_default_arp_security_group_rules(aim_ctx)
def _setup_default_arp_security_group_rules(self, aim_ctx):
sg_name = self._default_sg_name
dname = aim_utils.sanitize_display_name('DefaultSecurityGroup')
sg = aim_resource.SecurityGroup(
tenant_name=COMMON_TENANT_NAME, name=sg_name, display_name=dname)
self.aim.create(aim_ctx, sg, overwrite=True)
dname = aim_utils.sanitize_display_name('DefaultSecurityGroupSubject')
sg_subject = aim_resource.SecurityGroupSubject(
tenant_name=COMMON_TENANT_NAME,
security_group_name=GBP_DEFAULT, name='default')
security_group_name=sg_name, name='default', display_name=dname)
self.aim.create(aim_ctx, sg_subject, overwrite=True)
dname = aim_utils.sanitize_display_name(
'DefaultSecurityGroupEgressRule')
arp_egress_rule = aim_resource.SecurityGroupRule(
tenant_name=COMMON_TENANT_NAME,
security_group_name=GBP_DEFAULT,
security_group_name=sg_name,
security_group_subject_name='default',
name='arp_egress',
display_name=dname,
direction='egress',
ethertype='arp',
conn_track='normal')
self.aim.create(aim_ctx, arp_egress_rule, overwrite=True)
dname = aim_utils.sanitize_display_name(
'DefaultSecurityGroupIngressRule')
arp_ingress_rule = aim_resource.SecurityGroupRule(
tenant_name=COMMON_TENANT_NAME,
security_group_name=GBP_DEFAULT,
security_group_name=sg_name,
security_group_subject_name='default',
name='arp_ingress',
display_name=dname,
direction='ingress',
ethertype='arp',
conn_track='normal')
self.aim.create(aim_ctx, arp_egress_rule, overwrite=True)
self.aim.create(aim_ctx, arp_ingress_rule, overwrite=True)
def _setup_keystone_notification_listeners(self):
@ -316,7 +322,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
bd, epg = self._map_network(session, current)
dname = aim_utils.sanitize_display_name(current['name'])
vrf = self._ensure_unrouted_vrf(aim_ctx)
vrf = self._map_unrouted_vrf()
bd.display_name = dname
bd.vrf_name = vrf.name
@ -667,8 +673,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
session = context.session
aim_ctx = aim_context.AimContext(session)
filter = self._ensure_any_filter(aim_ctx)
contract, subject = self._map_router(session, current)
dname = aim_utils.sanitize_display_name(current['name'])
@ -677,7 +681,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.aim.create(aim_ctx, contract)
subject.display_name = dname
subject.bi_filters = [filter.name]
subject.bi_filters = [self._any_filter_name]
self.aim.create(aim_ctx, subject)
# External-gateway information about the router will be handled
@ -2197,7 +2201,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
return tenant
def _ensure_unrouted_vrf(self, aim_ctx):
self._ensure_common_tenant(aim_ctx)
attrs = self._map_unrouted_vrf()
vrf = self.aim.get(aim_ctx, attrs)
if not vrf:
@ -2208,9 +2211,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
return vrf
def _ensure_any_filter(self, aim_ctx):
self._ensure_common_tenant(aim_ctx)
filter_name = self.apic_system_id + '_' + ANY_FILTER_NAME
filter_name = self._any_filter_name
dname = aim_utils.sanitize_display_name("AnyFilter")
filter = aim_resource.Filter(tenant_name=COMMON_TENANT_NAME,
name=filter_name,
@ -2230,6 +2231,14 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
return filter
@property
def _any_filter_name(self):
    """Name of the shared "any" Filter, scoped by the APIC system id."""
    return '%s_%s' % (self.apic_system_id, ANY_FILTER_NAME)
@property
def _default_sg_name(self):
    """Name of the default SecurityGroup, scoped by the APIC system id.

    Scoping by system id lets multiple OpenStack installations share
    the common Tenant on one ACI fabric without name collisions.
    """
    return '%s_%s' % (self.apic_system_id, DEFAULT_SG_NAME)
def _ensure_default_vrf(self, aim_ctx, attrs):
vrf = self.aim.get(aim_ctx, attrs)
if not vrf:

View File

@ -219,8 +219,9 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
{'policy-space': details['ptg_tenant'],
'name': sg_id})
# Always include this SG which has the default arp & dhcp rules
details['security_group'].append({'policy-space': 'common',
'name': 'gbp_default'})
details['security_group'].append(
{'policy-space': 'common',
'name': self.aim_mech_driver._default_sg_name})
# Child class needs to support:
# - self._get_subnet_details(context, port, details)

View File

@ -14,6 +14,7 @@
# under the License.
import copy
import fixtures
import mock
import netaddr
import six
@ -27,6 +28,7 @@ from aim.api import status as aim_status
from aim import config as aim_cfg
from aim import context as aim_context
from aim.db import model_base as aim_model_base
from aim.db import models as aim_models # noqa
from aim import utils as aim_utils
from keystoneclient.v3 import client as ksc_client
@ -41,6 +43,7 @@ from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
from neutron.tests.unit.extensions import test_address_scope
from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit.extensions import test_securitygroup
from neutron.tests.unit import testlib_api
from neutron_lib import constants as n_constants
from neutron_lib.plugins import directory
from opflexagent import constants as ofcst
@ -125,7 +128,29 @@ class FakeKeystoneClient(object):
self.projects = FakeProjectManager()
# TODO(rkukura): Also run Neutron L3 tests on apic_aim L3 plugin.
class AimSqlFixture(fixtures.Fixture):
    """Fixture managing the AIM DB schema for unit tests.

    Creates the AIM tables once per test process (after Neutron's own
    SQL fixture has set up the engine) and registers a cleanup that
    empties every AIM table after each test, so state does not leak
    between test cases.
    """

    # Flag to indicate that the models have been loaded. Class-level so
    # create_all() runs only once per process, not once per test.
    _AIM_TABLES_ESTABLISHED = False

    def _setUp(self):
        # Ensure Neutron has done its setup first.
        self.useFixture(testlib_api.StaticSqlFixture())

        # Register all data models.
        engine = db_api.context_manager.writer.get_engine()
        if not AimSqlFixture._AIM_TABLES_ESTABLISHED:
            aim_model_base.Base.metadata.create_all(engine)
            AimSqlFixture._AIM_TABLES_ESTABLISHED = True

        def clear_tables():
            # Delete in reverse dependency order so FK constraints are
            # not violated while emptying the tables.
            with engine.begin() as conn:
                for table in reversed(
                        aim_model_base.Base.metadata.sorted_tables):
                    conn.execute(table.delete())

        self.addCleanup(clear_tables)
class ApicAimTestMixin(object):
@ -189,10 +214,9 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
'L3_ROUTER_NAT':
'gbpservice.neutron.services.apic_aim.l3_plugin.ApicL3Plugin'}
self.useFixture(AimSqlFixture())
super(ApicAimTestCase, self).setUp(PLUGIN_NAME,
service_plugins=service_plugins)
engine = db_api.context_manager.writer.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self.db_session = db_api.get_session()
self.initialize_db_config(self.db_session)
@ -230,11 +254,6 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
'dhcp_agent_scheduler')
def tearDown(self):
engine = db_api.context_manager.writer.get_engine()
with engine.begin() as conn:
for table in reversed(
aim_model_base.Base.metadata.sorted_tables):
conn.execute(table.delete())
ksc_client.Client = self.saved_keystone_client
# We need to do the following to avoid non-aim tests
# picking up the patched version of the method in patch_neutron
@ -328,37 +347,48 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
def _get_sg(self, sg_id, tenant_name):
def _get_sg(self, sg_name, tenant_name):
session = db_api.get_session()
aim_ctx = aim_context.AimContext(session)
sg = aim_resource.SecurityGroup(tenant_name=tenant_name,
name=sg_id)
name=sg_name)
sg = self.aim_mgr.get(aim_ctx, sg)
self.assertIsNotNone(sg)
return sg
def _sg_should_not_exist(self, sg_id):
def _sg_should_not_exist(self, sg_name):
session = db_api.get_session()
aim_ctx = aim_context.AimContext(session)
sgs = self.aim_mgr.find(
aim_ctx, aim_resource.SecurityGroup, name=sg_id)
aim_ctx, aim_resource.SecurityGroup, name=sg_name)
self.assertEqual([], sgs)
def _get_sg_rule(self, sg_id, sg_rule_id, tenant_name):
def _get_sg_subject(self, sg_subject_name, sg_name, tenant_name):
    """Fetch an AIM SecurityGroupSubject, asserting it exists.

    :param sg_subject_name: name of the SecurityGroupSubject
    :param sg_name: name of the owning SecurityGroup
    :param tenant_name: AIM tenant containing the SecurityGroup
    :returns: the SecurityGroupSubject read back from the AIM manager
    """
    session = db_api.get_session()
    aim_ctx = aim_context.AimContext(session)
    sg_subject = aim_resource.SecurityGroupSubject(
        tenant_name=tenant_name, security_group_name=sg_name,
        name=sg_subject_name)
    sg_subject = self.aim_mgr.get(aim_ctx, sg_subject)
    self.assertIsNotNone(sg_subject)
    return sg_subject
def _get_sg_rule(self, sg_rule_name, sg_subject_name, sg_name,
tenant_name):
session = db_api.get_session()
aim_ctx = aim_context.AimContext(session)
sg_rule = aim_resource.SecurityGroupRule(
tenant_name=tenant_name, security_group_name=sg_id,
security_group_subject_name='default', name=sg_rule_id)
tenant_name=tenant_name, security_group_name=sg_name,
security_group_subject_name=sg_subject_name, name=sg_rule_name)
sg_rule = self.aim_mgr.get(aim_ctx, sg_rule)
self.assertIsNotNone(sg_rule)
return sg_rule
def _sg_rule_should_not_exist(self, sg_rule_id):
def _sg_rule_should_not_exist(self, sg_rule_name):
session = db_api.get_session()
aim_ctx = aim_context.AimContext(session)
sg_rules = self.aim_mgr.find(
aim_ctx, aim_resource.SecurityGroupRule, name=sg_rule_id)
aim_ctx, aim_resource.SecurityGroupRule, name=sg_rule_name)
self.assertEqual([], sg_rules)
def port_notif_verifier(self):
@ -674,7 +704,8 @@ class TestAimMapping(ApicAimTestCase):
tenant_aname = self.name_mapper.project(None, sg_rule['tenant_id'])
self._get_tenant(tenant_aname)
aim_sg_rule = self._get_sg_rule(sg_id, sg_rule['id'], tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule['id'], 'default', sg_id, tenant_aname)
self.assertEqual(tenant_aname, aim_sg_rule.tenant_name)
self.assertEqual(sg_id, aim_sg_rule.security_group_name)
self.assertEqual('default',
@ -720,8 +751,6 @@ class TestAimMapping(ApicAimTestCase):
aim_subject.bi_filters)
self._check_dn_is_resource(dns, 'ContractSubject', aim_subject)
self._check_any_filter()
if expected_gw_ips:
if unscoped_project:
self._check_router_vrf(
@ -789,6 +818,71 @@ class TestAimMapping(ApicAimTestCase):
self.assertFalse(aim_entry.stateful)
self.assertFalse(aim_entry.fragment_only)
def test_static_resources(self):
    """Verify the static AIM resources created at driver initialization.

    Checks the common Tenant, the unrouted VRF, the "any" Filter, the
    default SecurityGroup (scoped by apic_system_id), its 'default'
    SecurityGroupSubject, and the ARP egress/ingress
    SecurityGroupRules, including their display_name fields.
    """
    # Check common Tenant.
    tenant = self._get_tenant('common')
    self.assertEqual('common', tenant.name)
    self.assertEqual('CommonTenant', tenant.display_name)
    self.assertEqual('', tenant.descr)

    # Check unrouted VRF.
    vrf_aname = self.driver.apic_system_id + '_UnroutedVRF'
    vrf = self._get_vrf(vrf_aname, 'common')
    self.assertEqual('common', vrf.tenant_name)
    self.assertEqual(vrf_aname, vrf.name)
    self.assertEqual('CommonUnroutedVRF', vrf.display_name)
    self.assertEqual('enforced', vrf.policy_enforcement_pref)

    # Check any Filter.
    self._check_any_filter()

    # Check default SecurityGroup.
    sg_aname = self.driver.apic_system_id + '_DefaultSecurityGroup'
    sg = self._get_sg(sg_aname, 'common')
    self.assertEqual('common', sg.tenant_name)
    self.assertEqual(sg_aname, sg.name)
    self.assertEqual('DefaultSecurityGroup', sg.display_name)

    # Check default SecurityGroupSubject.
    sg_subject = self._get_sg_subject('default', sg_aname, 'common')
    self.assertEqual('common', sg_subject.tenant_name)
    self.assertEqual(sg_aname, sg_subject.security_group_name)
    self.assertEqual('default', sg_subject.name)
    self.assertEqual(
        'DefaultSecurityGroupSubject', sg_subject.display_name)

    # Check ARP egress SecurityGroupRule.
    sg_rule = self._get_sg_rule(
        'arp_egress', 'default', sg_aname, 'common')
    self.assertEqual('common', sg_rule.tenant_name)
    self.assertEqual(sg_aname, sg_rule.security_group_name)
    self.assertEqual('default', sg_rule.security_group_subject_name)
    self.assertEqual('arp_egress', sg_rule.name)
    self.assertEqual(
        'DefaultSecurityGroupEgressRule', sg_rule.display_name)
    self.assertEqual('egress', sg_rule.direction)
    self.assertEqual('arp', sg_rule.ethertype)
    self.assertEqual([], sg_rule.remote_ips)
    self.assertEqual('unspecified', sg_rule.from_port)
    self.assertEqual('unspecified', sg_rule.to_port)
    self.assertEqual('normal', sg_rule.conn_track)

    # Check ARP ingress SecurityGroupRule.
    sg_rule = self._get_sg_rule(
        'arp_ingress', 'default', sg_aname, 'common')
    self.assertEqual('common', sg_rule.tenant_name)
    self.assertEqual(sg_aname, sg_rule.security_group_name)
    self.assertEqual('default', sg_rule.security_group_subject_name)
    self.assertEqual('arp_ingress', sg_rule.name)
    self.assertEqual(
        'DefaultSecurityGroupIngressRule', sg_rule.display_name)
    self.assertEqual('ingress', sg_rule.direction)
    self.assertEqual('arp', sg_rule.ethertype)
    self.assertEqual([], sg_rule.remote_ips)
    self.assertEqual('unspecified', sg_rule.from_port)
    self.assertEqual('unspecified', sg_rule.to_port)
    self.assertEqual('normal', sg_rule.conn_track)
def test_network_lifecycle(self):
# Test create.
net = self._make_network(self.fmt, 'net1', True)['network']
@ -2316,12 +2410,12 @@ class TestAimMapping(ApicAimTestCase):
def test_network_in_address_scope_pre_existing_vrf(self, common_vrf=False):
aim_ctx = aim_context.AimContext(self.db_session)
tenant = aim_resource.Tenant(
name='common' if common_vrf else self.t1_aname,
display_name=('CommonTenant' if common_vrf
else TEST_TENANT_NAMES['t1']),
monitored=True)
self.aim_mgr.create(aim_ctx, tenant)
if not common_vrf:
tenant = aim_resource.Tenant(
name=self.t1_aname,
display_name=TEST_TENANT_NAMES['t1'],
monitored=True)
self.aim_mgr.create(aim_ctx, tenant)
vrf = aim_resource.VRF(
tenant_name='common' if common_vrf else self.t1_aname,
name='ctx1', monitored=True)
@ -4719,8 +4813,6 @@ class TestExternalConnectivityBase(object):
aim_ctx = aim_context.AimContext(self.db_session)
# create pre-existing VRF
tenant = aim_resource.Tenant(name='common', monitored=True)
self.aim_mgr.create(aim_ctx, tenant)
vrf = aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True)
vrf = self.aim_mgr.create(aim_ctx, vrf)
@ -5569,8 +5661,8 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
if sg_rule['remote_group_id'] and sg_rule['ethertype'] == 'IPv4':
break
tenant_aname = self.name_mapper.project(None, default_sg['tenant_id'])
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, ['10.0.1.100'])
# add another rule with remote_group_id set
@ -5580,36 +5672,36 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
rules = {'security_group_rules': [rule1['security_group_rule']]}
sg_rule1 = self._make_security_group_rule(
self.fmt, rules)['security_group_rules'][0]
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule1['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule1['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, ['10.0.1.100'])
# delete SG from port
data = {'port': {'security_groups': []}}
port = self._update('ports', port['id'], data)['port']
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, [])
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule1['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule1['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, [])
# add SG to port
data = {'port': {'security_groups': [default_sg_id]}}
port = self._update('ports', port['id'], data)['port']
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, ['10.0.1.100'])
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule1['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule1['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, ['10.0.1.100'])
self._delete('ports', port['id'])
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, [])
aim_sg_rule = self._get_sg_rule(default_sg_id, sg_rule1['id'],
tenant_aname)
aim_sg_rule = self._get_sg_rule(
sg_rule1['id'], 'default', default_sg_id, tenant_aname)
self.assertEqual(aim_sg_rule.remote_ips, [])
def test_mixed_ports_on_network_with_specific_domains(self):

View File

@ -22,7 +22,6 @@ from aim.api import infra as aim_infra
from aim.api import resource as aim_resource
from aim.api import status as aim_status
from aim import context as aim_context
from aim.db import model_base as aim_model_base
from keystoneclient.v3 import client as ksc_client
from netaddr import IPSet
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
@ -41,7 +40,7 @@ from oslo_config import cfg
import webob.exc
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.db.grouppolicy import group_policy_mapping_db # noqa
from gbpservice.neutron.db.grouppolicy import group_policy_mapping_db
from gbpservice.neutron.extensions import cisco_apic
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as md)
@ -134,11 +133,11 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
'type_drivers': ['opflex', 'local', 'vlan'],
'tenant_network_types': ['opflex']}
self._default_es_name = 'default'
self.useFixture(test_aim_md.AimSqlFixture())
super(AIMBaseTestCase, self).setUp(
policy_drivers=policy_drivers, core_plugin=core_plugin,
ml2_options=ml2_opts, l3_plugin=l3_plugin,
sc_plugin=sc_plugin, qos_plugin=qos_plugin)
aim_model_base.Base.metadata.create_all(self.engine)
self.db_session = db_api.get_session()
self.initialize_db_config(self.db_session)
self.l3_plugin = directory.get_plugin(n_constants.L3)
@ -189,11 +188,6 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
self._dn_t1_l1_n1 = ('uni/tn-%s/out-l1/instP-n1' % self._t1_aname)
def tearDown(self):
engine = db_api.context_manager.writer.get_engine()
with engine.begin() as conn:
for table in reversed(
aim_model_base.Base.metadata.sorted_tables):
conn.execute(table.delete())
ksc_client.Client = self.saved_keystone_client
# We need to do the following to avoid non-aim tests
# picking up the patched version of the method in patch_neutron
@ -1527,8 +1521,6 @@ class TestLegacyL3Policy(TestL3Policy):
subnet_prefix_length=24, shared=False)
session.add(l3p_db)
session.flush()
aim_model_base.Base.metadata.create_all(
session.__dict__['bind'])
orig_create_per_l3p_implicit_contracts(self)
aimd.AIMMappingDriver._create_per_l3p_implicit_contracts = (
orig_create_per_l3p_implicit_contracts)
@ -1628,9 +1620,9 @@ class TestL2Policy(TestL2PolicyBase):
shared=False):
self.assertEqual(0, len(self.aim_mgr.find(
self._aim_context, aim_resource.Contract)))
self.assertEqual(0, len(self.aim_mgr.find(
self.assertEqual(1, len(self.aim_mgr.find(
self._aim_context, aim_resource.Filter)))
self.assertEqual(0, len(self.aim_mgr.find(
self.assertEqual(1, len(self.aim_mgr.find(
self._aim_context, aim_resource.FilterEntry)))
l2p0 = self.create_l2_policy(name="l2p0",
shared=shared)['l2_policy']
@ -2108,11 +2100,11 @@ class TestL2PolicyRollback(TestL2PolicyBase):
aim_filters = self.aim_mgr.find(
self._aim_context, aim_resource.Filter,
tenant_name=aim_tenant_name)
self.assertEqual(0, len(aim_filters))
self.assertEqual(1, len(aim_filters))
aim_filter_entries = self.aim_mgr.find(
self._aim_context, aim_resource.FilterEntry,
tenant_name=aim_tenant_name)
self.assertEqual(0, len(aim_filter_entries))
self.assertEqual(1, len(aim_filter_entries))
# restore mock
self.dummy.create_l2_policy_precommit = orig_func
@ -3093,7 +3085,8 @@ class TestPolicyTarget(AIMBaseTestCase):
{'policy-space': mapping['ptg_tenant'],
'name': sg_id})
sg_list.append({'policy-space': 'common',
'name': 'gbp_default'})
'name': self.driver.aim_mech_driver.apic_system_id +
'_DefaultSecurityGroup'})
self.assertEqual(sg_list, mapping['security_group'])
def _do_test_gbp_details_no_pt(self, use_as=True, routed=True,
@ -3209,8 +3202,6 @@ class TestPolicyTarget(AIMBaseTestCase):
def test_get_gbp_details_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(
aim_ctx, aim_resource.Tenant(name='common', monitored=True))
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
@ -3223,8 +3214,6 @@ class TestPolicyTarget(AIMBaseTestCase):
def test_get_gbp_details_no_pt_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(
aim_ctx, aim_resource.Tenant(name='common', monitored=True))
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
@ -3715,10 +3704,10 @@ class TestPolicyRuleRollback(TestPolicyRuleBase):
self._gbp_plugin.get_policy_rules(self._context))
aim_filters = self.aim_mgr.find(
self._aim_context, aim_resource.Filter)
self.assertEqual(0, len(aim_filters))
self.assertEqual(1, len(aim_filters))
aim_filter_entries = self.aim_mgr.find(
self._aim_context, aim_resource.FilterEntry)
self.assertEqual(0, len(aim_filter_entries))
self.assertEqual(1, len(aim_filter_entries))
# restore mock
self.dummy.create_policy_rule_precommit = orig_func