Remove APIC mapping policy driver

The APIC mapping policy driver was deprecated in mitaka and
replaced with the AIM mapping driver. This patch removes the
APIC mapping driver, starting with the ocata release.

Change-Id: I5f33cd2c0e06cf45e092e74e664809475904c047
This commit is contained in:
Thomas Bachman 2017-06-22 14:01:07 +00:00 committed by Sumit Naiksatam
parent 7db0f27231
commit 02a94fb722
16 changed files with 14 additions and 12610 deletions

View File

@ -1,302 +0,0 @@
# Copyright (c) 2014 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import re
from neutron import context as nctx
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
from neutron_lib import constants as n_constants
from opflexagent import constants as ofcst
from oslo_log import log
from oslo_utils import importutils
from gbpservice._i18n import _LW
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping as amap)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
LOG = log.getLogger(__name__)
# TODO(tbachman) Find a good home for these
AGENT_TYPE_DVS = 'DVS agent'
VIF_TYPE_DVS = 'dvs'
DVS_AGENT_KLASS = 'vmware_dvs.api.dvs_agent_rpc_api.DVSClientAPI'
class APICMechanismGBPDriver(api.MechanismDriver):
def __init__(self):
super(APICMechanismGBPDriver, self).__init__()
self._dvs_notifier = None
self._apic_allowed_vm_name_driver = None
def _agent_bind_port(self, context, agent_list, bind_strategy):
"""Attempt port binding per agent.
Perform the port binding for a given agent.
Returns True if bound successfully.
"""
for agent in agent_list:
LOG.debug("Checking agent: %s", agent)
if agent['alive']:
for segment in context.segments_to_bind:
if bind_strategy(context, segment, agent):
LOG.debug("Bound using segment: %s", segment)
return True
else:
LOG.warning(_LW("Refusing to bind port %(pid)s to dead agent: "
"%(agent)s"),
{'pid': context.current['id'], 'agent': agent})
return False
def bind_port(self, context):
"""Get port binding per host.
This is similar to the one defined in the
AgentMechanismDriverBase class, but is modified
to support multiple L2 agent types (DVS and OpFlex).
"""
port = context.current
LOG.debug("Attempting to bind port %(port)s on "
"network %(network)s",
{'port': port['id'],
'network': context.network.current['id']})
vnic_type = port.get(portbindings.VNIC_TYPE,
portbindings.VNIC_NORMAL)
if vnic_type not in [portbindings.VNIC_NORMAL]:
LOG.debug("Refusing to bind due to unsupported vnic_type: %s",
vnic_type)
return
if port['device_owner'].startswith('compute:'):
# enforce the allowed_vm_names rules if possible
if (port['device_id'] and self.apic_allowed_vm_name_driver):
ptg, pt = self.apic_gbp._port_id_to_ptg(
context._plugin_context, port['id'])
if ptg is None:
LOG.warning(_LW("PTG for port %s does not exist"),
port['id'])
return
l2p = self.apic_gbp._get_l2_policy(context._plugin_context,
ptg['l2_policy_id'])
l3p = self.apic_gbp.gbp_plugin.get_l3_policy(
context._plugin_context, l2p['l3_policy_id'])
ok_to_bind = True
if l3p.get('allowed_vm_names'):
ok_to_bind = False
vm = nclient.NovaClient().get_server(port['device_id'])
for allowed_vm_name in l3p['allowed_vm_names']:
match = re.search(allowed_vm_name, vm.name)
if match:
ok_to_bind = True
break
if not ok_to_bind:
LOG.warning(_LW("Failed to bind the port due to "
"allowed_vm_names rules %(rules)s "
"for VM: %(vm)s"),
{'rules': l3p['allowed_vm_names'],
'vm': vm.name})
return
# Attempt to bind ports for DVS agents for nova-compute daemons
# first. This allows having network agents (dhcp, metadata)
# that typically run on a network node using an OpFlex agent to
# co-exist with nova-compute daemons for ESX, which host DVS
# agents.
agent_list = context.host_agents(AGENT_TYPE_DVS)
if self._agent_bind_port(context, agent_list, self._bind_dvs_port):
return
# It either wasn't a DVS binding, or there wasn't a DVS
# agent on the binding host (could be the case in a hybrid
# environment supporting KVM and ESX compute). Go try for
# OpFlex agents.
agent_list = context.host_agents(ofcst.AGENT_TYPE_OPFLEX_OVS)
self._agent_bind_port(context, agent_list, self._bind_opflex_port)
def _bind_dvs_port(self, context, segment, agent):
"""Populate VIF type and details for DVS VIFs.
For DVS VIFs, provide the portgroup along
with the security groups setting
"""
if self._check_segment_for_agent(segment, agent):
port = context.current
# We only handle details for ports that are PTs in PTGs
ptg, pt = self.apic_gbp._port_id_to_ptg(context._plugin_context,
port['id'])
if ptg is None:
LOG.warning(_LW("PTG for port %s does not exist"), port['id'])
return False
mapper = self.apic_gbp.name_mapper
ptg_name = mapper.policy_target_group(context, ptg)
network_id = port.get('network_id')
network = self.apic_gbp._get_network(context._plugin_context,
network_id)
project_name = self.apic_gbp._tenant_by_sharing_policy(network)
apic_tenant_name = self.apic_gbp.apic_manager.apic.fvTenant.name(
project_name)
profile = self.apic_gbp.apic_manager.app_profile_name
# Use default security groups from MD
vif_details = {portbindings.CAP_PORT_FILTER: False}
vif_details['dvs_port_group_name'] = (apic_tenant_name +
'|' + str(profile) +
'|' + str(ptg_name))
currentcopy = copy.copy(context.current)
currentcopy['portgroup_name'] = (
vif_details['dvs_port_group_name'])
booked_port_key = None
if self.dvs_notifier:
booked_port_key = self.dvs_notifier.bind_port_call(
currentcopy,
context.network.network_segments,
context.network.current,
context.host
)
if booked_port_key:
vif_details['dvs_port_key'] = booked_port_key
context.set_binding(segment[api.ID],
VIF_TYPE_DVS, vif_details,
n_constants.PORT_STATUS_ACTIVE)
return True
else:
return False
def _bind_opflex_port(self, context, segment, agent):
"""Populate VIF type and details for OpFlex VIFs.
For OpFlex VIFs, we just report the OVS VIF type,
along with security groups setting, which were
set when this mechanism driver was instantiated.
"""
if self._check_segment_for_agent(segment, agent):
context.set_binding(segment[api.ID],
portbindings.VIF_TYPE_OVS,
{portbindings.CAP_PORT_FILTER: False,
portbindings.OVS_HYBRID_PLUG: False})
return True
else:
return False
def _check_segment_for_agent(self, segment, agent):
"""Check support for OpFlex type segments.
The agent has the ability to limit the segments in OpFlex
networks by specifying the mappings in their config. If no
mapping is specifified, then all OpFlex segments are
supported.
"""
network_type = segment[api.NETWORK_TYPE]
if network_type == ofcst.TYPE_OPFLEX:
opflex_mappings = agent['configurations'].get('opflex_networks')
LOG.debug("Checking segment: %(segment)s "
"for physical network: %(mappings)s ",
{'segment': segment, 'mappings': opflex_mappings})
return (opflex_mappings is None or
segment[api.PHYSICAL_NETWORK] in opflex_mappings)
elif network_type == 'local':
return True
else:
return False
def initialize(self):
super(APICMechanismGBPDriver, self).initialize()
self._apic_gbp = None
@property
def apic_gbp(self):
if not self._apic_gbp:
self._apic_gbp = manager.NeutronManager.get_service_plugins()[
'GROUP_POLICY'].policy_driver_manager.policy_drivers[
'apic'].obj
return self._apic_gbp
@property
def dvs_notifier(self):
if not self._dvs_notifier:
try:
self._dvs_notifier = importutils.import_object(
DVS_AGENT_KLASS,
nctx.get_admin_context_without_session()
)
except ImportError:
self._dvs_notifier = None
return self._dvs_notifier
@property
def apic_allowed_vm_name_driver(self):
if self._apic_allowed_vm_name_driver is False:
return False
if not self._apic_allowed_vm_name_driver:
ext_drivers = (self.apic_gbp.gbp_plugin.extension_manager.
ordered_ext_drivers)
for driver in ext_drivers:
if 'apic_allowed_vm_name' == driver.name:
self._apic_allowed_vm_name_driver = driver.obj
break
if not self._apic_allowed_vm_name_driver:
self._apic_allowed_vm_name_driver = False
return self._apic_allowed_vm_name_driver
def create_port_postcommit(self, context):
self.apic_gbp.process_port_added(context)
def update_port_postcommit(self, context):
self.apic_gbp.process_port_changed(context)
port = context.current
if (port.get('binding:vif_details') and
port['binding:vif_details'].get('dvs_port_group_name')) and (
self.dvs_notifier):
self.dvs_notifier.update_postcommit_port_call(
context.current,
context.original,
context.network.network_segments[0],
context.host
)
def delete_port_precommit(self, context):
self.apic_gbp.process_pre_port_deleted(context)
def delete_port_postcommit(self, context):
self.apic_gbp.process_port_deleted(context)
port = context.current
if (port.get('binding:vif_details') and
port['binding:vif_details'].get('dvs_port_group_name')) and (
self.dvs_notifier):
self.dvs_notifier.delete_port_call(
context.current,
context.original,
context.network.network_segments[0],
context.host
)
def update_subnet_postcommit(self, context):
self.apic_gbp.process_subnet_changed(context._plugin_context,
context.original, context.current)
def create_subnet_postcommit(self, context):
if not context.current['name'].startswith(amap.APIC_OWNED):
self.apic_gbp.process_subnet_added(context._plugin_context,
context.current)
def delete_subnet_postcommit(self, context):
self.apic_gbp.process_subnet_deleted(context._plugin_context,
context.current)

View File

@ -1,78 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from gbpservice.neutron.services.grouppolicy.common import exceptions as gpexc
class InvalidApicName(gpexc.GroupPolicyBadRequest):
message = _("Resource has no valid APIC name.")
APIC_REFERENCE_PREFIX = 'apic:'
class ApicNameManager(object):
gbp_to_apic = {'l3_policy': 'context',
'l2_policy': 'bridge_domain',
'policy_target_group': 'endpoint_group',
'policy_rule_set': 'contract'}
def __init__(self, apic_manager):
self.name_mapper = apic_manager.apic_mapper
self.dn_manager = apic_manager.apic.dn_manager
def __getattr__(self, item):
if self.name_mapper.is_valid_name_type(item):
def get_name_wrapper(context, obj_id, prefix=''):
return self._get_name(item, context, obj_id, prefix=prefix)
return get_name_wrapper
raise AttributeError
def tenant(self, obj):
if self._is_apic_reference(obj):
parts = self._try_all_types(obj)
if parts:
return parts[0]
return self.name_mapper.tenant(None, obj['tenant_id'])
def has_valid_name(self, obj):
if self._is_apic_reference(obj):
if not self._try_all_types(obj):
raise InvalidApicName()
def _try_all_types(self, obj):
for possible in self.dn_manager.nice_to_rn:
parts = getattr(self.dn_manager, 'decompose_%s' % possible)(
self._extract_apic_reference(obj))
if parts:
return parts
def _get_name(self, obj_type, context, obj, prefix=''):
if self._is_apic_reference(obj) and obj_type in self.gbp_to_apic:
map_type = self.gbp_to_apic[obj_type]
parts = getattr(self.dn_manager, 'decompose_%s' % map_type)(
self._extract_apic_reference(obj))
result = self.name_mapper.pre_existing(context, parts[-1])
else:
obj_id = (obj['reuse_bd']
if obj_type == 'l2_policy' and obj.get('reuse_bd')
else obj['id'])
result = getattr(self.name_mapper, obj_type)(context, obj_id,
prefix=prefix)
return result
def _is_apic_reference(self, obj):
return obj['name'].startswith(APIC_REFERENCE_PREFIX)
def _extract_apic_reference(self, obj):
return obj['name'][len(APIC_REFERENCE_PREFIX):]

View File

@ -12,6 +12,7 @@
from oslo_log import helpers as log
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.services.grouppolicy import (
group_policy_driver_api as api)
@ -20,7 +21,7 @@ class NoopDriver(api.PolicyDriver):
@log.log_method_call
def initialize(self):
pass
local_api.QUEUE_OUT_OF_PROCESS_NOTIFICATIONS = False
@log.log_method_call
def create_policy_target_precommit(self, context):

View File

@ -1,144 +0,0 @@
# Copyright (c) 2016 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron import context as n_ctx
from neutron.extensions import l3
from neutron import manager
from neutron_lib import constants as q_const
from apic_ml2.neutron.services.l3_router import apic_driver_api
class ApicGBPL3Driver(apic_driver_api.ApicL3DriverBase):
def __init__(self, plugin):
super(ApicGBPL3Driver, self).__init__()
self._plugin = plugin
self._apic_gbp = None
@property
def apic_gbp(self):
if not self._apic_gbp:
self._apic_gbp = manager.NeutronManager.get_service_plugins()[
'GROUP_POLICY'].policy_driver_manager.policy_drivers[
'apic'].obj
return self._apic_gbp
def _get_port_id_for_router_interface(self, context, router_id, subnet_id):
filters = {'device_id': [router_id],
'device_owner': [q_const.DEVICE_OWNER_ROUTER_INTF],
'fixed_ips': {'subnet_id': [subnet_id]}}
ports = self._plugin._core_plugin.get_ports(context.elevated(),
filters=filters)
return ports[0]['id']
def _update_router_gw_info(self, context, router_id, info, router=None):
super(ApicGBPL3Driver, self)._update_router_gw_info(
context, router_id, info, router)
if info and 'network_id' in info:
filters = {'device_id': [router_id],
'device_owner': [q_const.DEVICE_OWNER_ROUTER_GW],
'network_id': [info['network_id']]}
ports = self._plugin._core_plugin.get_ports(context.elevated(),
filters=filters)
self._plugin._core_plugin.update_port_status(
context, ports[0]['id'], q_const.PORT_STATUS_ACTIVE)
def add_router_interface_postcommit(self, context, router_id,
interface_info):
if 'subnet_id' in interface_info:
port_id = self._get_port_id_for_router_interface(
context, router_id, interface_info['subnet_id'])
else:
port_id = interface_info['port_id']
self._plugin._core_plugin.update_port_status(context,
port_id, q_const.PORT_STATUS_ACTIVE)
def remove_router_interface_precommit(self, context, router_id,
interface_info):
if 'subnet_id' in interface_info:
port_id = self._get_port_id_for_router_interface(
context, router_id, interface_info['subnet_id'])
else:
port_id = interface_info['port_id']
self._plugin._core_plugin.update_port_status(context,
port_id, q_const.PORT_STATUS_DOWN)
# Floating IP API
def create_floatingip_precommit(self, context, floatingip):
fip = floatingip['floatingip']
tenant_id = self._plugin._get_tenant_id_for_create(context, fip)
if self.apic_gbp:
context.nat_pool_list = []
for nat_pool in self.apic_gbp.nat_pool_iterator(context,
tenant_id, floatingip):
context.nat_pool_list.append(nat_pool)
def create_floatingip_postcommit(self, context, floatingip):
port_id = floatingip.get('floatingip', {}).get('port_id')
self._notify_port_update(port_id)
if getattr(context, 'result', None):
context.result['status'] = self._update_floatingip_status(
context, context.result['id'])
def update_floatingip_precommit(self, context, id, floatingip):
port_id = self._get_port_mapped_to_floatingip(context, id)
context.port_id_list = [port_id]
def update_floatingip_postcommit(self, context, id, floatingip):
port_id_list = getattr(context, 'port_id_list', [])
port_id_list.append(
floatingip.get('floatingip', {}).get('port_id'))
for p in port_id_list:
self._notify_port_update(p)
status = self._update_floatingip_status(context, id)
if getattr(context, 'result', None):
context.result['status'] = status
def delete_floatingip_precommit(self, context, id):
port_id_list = [self._get_port_mapped_to_floatingip(context, id)]
context.port_id_list = port_id_list
def delete_floatingip_postcommit(self, context, id):
self._notify_port_update(context.port_id_list[0])
def _notify_port_update(self, port_id):
context = n_ctx.get_admin_context()
if self.apic_gbp and port_id:
self.apic_gbp._notify_port_update(context, port_id)
ptg, _ = self.apic_gbp._port_id_to_ptg(context, port_id)
if ptg:
self.apic_gbp._notify_head_chain_ports(ptg['id'])
def _update_floatingip_status(self, context, fip_id):
status = q_const.FLOATINGIP_STATUS_DOWN
try:
fip = self._plugin.get_floatingip(context, fip_id)
if fip.get('port_id'):
status = q_const.FLOATINGIP_STATUS_ACTIVE
self._plugin.update_floatingip_status(context, fip_id, status)
except l3.FloatingIPNotFound:
pass
return status
def _get_port_mapped_to_floatingip(self, context, fip_id):
try:
fip = self._plugin.get_floatingip(context, fip_id)
return fip.get('port_id')
except l3.FloatingIPNotFound:
pass
return None

View File

@ -1,117 +0,0 @@
# Copyright (c) 2015 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import common_db_mixin
from neutron.db import dns_db
from neutron.db import extraroute_db
from neutron.db import l3_gwmode_db
from neutron.plugins.common import constants
from neutron_lib import constants as q_const
from neutron_lib import exceptions as n_exc
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
from gbpservice._i18n import _LW
from gbpservice.neutron.services.l3_router import apic_driver
class ApicGBPL3ServicePlugin(common_db_mixin.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
dns_db.DNSDbMixin):
supported_extension_aliases = ["router", "ext-gw-mode", "extraroute",
"dns-integration"]
def __init__(self):
super(ApicGBPL3ServicePlugin, self).__init__()
self._apic_driver = apic_driver.ApicGBPL3Driver(self)
def _update_router_gw_info(self, context, router_id, info, router=None):
super(ApicGBPL3ServicePlugin, self)._update_router_gw_info(
context, router_id, info, router)
if info and 'network_id' in info:
filters = {'device_id': [router_id],
'device_owner': [q_const.DEVICE_OWNER_ROUTER_GW],
'network_id': [info['network_id']]}
ports = self._core_plugin.get_ports(context.elevated(),
filters=filters)
self._core_plugin.update_port_status(
context, ports[0]['id'], q_const.PORT_STATUS_ACTIVE)
@staticmethod
def get_plugin_type():
return constants.L3_ROUTER_NAT
@staticmethod
def get_plugin_description():
"""Returns string description of the plugin."""
return _("L3 Router Service Plugin for basic L3 using the APIC")
def add_router_interface(self, context, router_id, interface_info):
port = super(ApicGBPL3ServicePlugin, self).add_router_interface(
context, router_id, interface_info)
self._apic_driver.add_router_interface_postcommit(
context, router_id, interface_info)
return port
def remove_router_interface(self, context, router_id, interface_info):
self._apic_driver.remove_router_interface_precommit(
context, router_id, interface_info)
super(ApicGBPL3ServicePlugin, self).remove_router_interface(
context, router_id, interface_info)
# Floating IP API
def create_floatingip(self, context, floatingip):
result = None
fip = floatingip['floatingip']
if not fip.get('subnet_id'):
self._apic_driver.create_floatingip_precommit(context, floatingip)
nat_pool_list = getattr(context, 'nat_pool_list', [])
for nat_pool in nat_pool_list:
if not nat_pool:
continue
fip['subnet_id'] = nat_pool['subnet_id']
try:
result = super(ApicGBPL3ServicePlugin,
self).create_floatingip(context, floatingip)
except n_exc.IpAddressGenerationFailure as ex:
LOG.warning(_LW("Floating allocation failed: %s"),
ex.message)
if result:
break
if not result:
result = super(ApicGBPL3ServicePlugin,
self).create_floatingip(context, floatingip)
context.result = result
self._apic_driver.create_floatingip_postcommit(context, floatingip)
return result
def update_floatingip(self, context, id, floatingip):
self._apic_driver.update_floatingip_precommit(context, id, floatingip)
result = super(ApicGBPL3ServicePlugin,
self).update_floatingip(context, id, floatingip)
context.result = result
self._apic_driver.update_floatingip_postcommit(context,
id, floatingip)
return result
def delete_floatingip(self, context, id):
self._apic_driver.delete_floatingip_precommit(context, id)
result = super(ApicGBPL3ServicePlugin,
self).delete_floatingip(context, id)
self._apic_driver.delete_floatingip_postcommit(context, id)
return result

View File

@ -51,8 +51,6 @@ from gbpservice.neutron.services.grouppolicy.common import utils
from gbpservice.neutron.services.grouppolicy import config
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
aim_mapping as aimd)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping as amap)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping_lib as alib)
from gbpservice.neutron.services.grouppolicy.drivers import nsp_manager
@ -128,7 +126,6 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
'port_security'],
'type_drivers': ['opflex', 'local', 'vlan'],
'tenant_network_types': ['opflex']}
amap.ApicMappingDriver.get_apic_manager = mock.Mock()
self._default_es_name = 'default'
super(AIMBaseTestCase, self).setUp(
policy_drivers=policy_drivers, core_plugin=core_plugin,
@ -1739,7 +1736,7 @@ class TestL2PolicyWithAutoPTG(TestL2PolicyBase):
ptg = self._gbp_plugin.get_policy_target_groups(
self._neutron_context)[0]
l2p_id = ptg['l2_policy_id']
auto_ptg_id = amap.AUTO_PTG_ID_PREFIX % hashlib.md5(l2p_id).hexdigest()
auto_ptg_id = aimd.AUTO_PTG_ID_PREFIX % hashlib.md5(l2p_id).hexdigest()
self.assertEqual(auto_ptg_id, ptg['id'])
self.assertEqual(aimd.AUTO_PTG_NAME_PREFIX % l2p_id, str(ptg['name']))
return ptg

View File

@ -19,6 +19,7 @@ import webob.exc
from gbpservice.neutron.db.grouppolicy import group_policy_mapping_db as gpmdb
from gbpservice.neutron.extensions import group_policy as gpolicy
from gbpservice.neutron.services.grouppolicy import config
from gbpservice.neutron.services.grouppolicy.drivers import dummy_driver
from gbpservice.neutron.services.grouppolicy import plugin as gplugin
from gbpservice.neutron.tests.unit.db.grouppolicy import (
@ -157,8 +158,16 @@ class GroupPolicyPluginTestBase(tgpmdb.GroupPolicyMappingDbTestCase):
class GroupPolicyPluginTestCase(GroupPolicyPluginTestBase):
# This is a place-holder for GBP plugin-specific common tests
pass
def tearDown(self):
# Always reset configuration to dummy driver. Any
# test which requires to configure a different
# policy driver would have done so in it's setup
# (and should have ideally reset it too).
config.cfg.CONF.set_override('policy_drivers',
['dummy'],
group='group_policy')
super(GroupPolicyPluginTestCase, self).tearDown()
class TestGroupPolicyPluginDrivers(GroupPolicyPluginTestBase):

View File

@ -1,166 +0,0 @@
# Copyright (c) 2016 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import context
from neutron_lib import constants as q_const
import apicapi.apic_mapper # noqa
from gbpservice.neutron.services.l3_router import l3_apic
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_apic_mapping)
TENANT = 'tenant1'
ROUTER = 'router1'
SUBNET = 'subnet1'
NETWORK = 'network1'
PORT = 'port1'
NETWORK_NAME = 'one_network'
FLOATINGIP = 'fip1'
# TODO(tbachman): create better test class hierarchy to inherit
class TestCiscoApicGBPL3Driver(test_apic_mapping.ApicMappingTestCase):
def setUp(self):
super(TestCiscoApicGBPL3Driver, self).setUp()
# Some actual dicts to return
self.subnet = {'network_id': NETWORK, 'tenant_id': TENANT}
self.port = {'tenant_id': TENANT,
'network_id': NETWORK,
'fixed_ips': [{'subnet_id': SUBNET}],
'id': PORT}
self.interface_info = {'subnet': {'subnet_id': SUBNET},
'port': {'port_id': self.port['id']}}
self.floatingip = {'id': FLOATINGIP,
'floating_network_id': NETWORK_NAME,
'port_id': PORT}
self.context = context.get_admin_context()
self.context.tenant_id = TENANT
# Create our plugin, but mock some superclass and
# core plugin methods
self.plugin = l3_apic.ApicGBPL3ServicePlugin()
self.plugin._apic_driver._notify_port_update = mock.Mock()
self.plugin._apic_driver._apic_gbp = mock.Mock()
self.plugin._core_plugin.get_ports = mock.Mock(
return_value=[self.port])
self.plugin._core_plugin.get_port = mock.Mock(return_value=self.port)
self.plugin._core_plugin.get_subnet = mock.Mock(
return_value = self.subnet)
self.plugin._core_plugin.update_port_status = mock.Mock()
# Floating IP updates to agents are mocked
self.plugin.update_floatingip_status = mock.Mock()
self.plugin.get_floatingip = mock.Mock(return_value=self.floatingip)
def _check_call_list(self, expected, observed):
for call in expected:
self.assertTrue(call in observed,
msg='Call not found, expected:\n%s\nobserved:'
'\n%s' % (str(call), str(observed)))
observed.remove(call)
self.assertFalse(
len(observed),
msg='There are more calls than expected: %s' % str(observed))
def _test_add_router_interface_postcommit(self, interface_info):
apic_driver = self.plugin._apic_driver
apic_driver.add_router_interface_postcommit(self.context,
ROUTER, interface_info)
test_assert = self.plugin._core_plugin.update_port_status
test_assert.assert_called_once_with(self.context,
self.port['id'], q_const.PORT_STATUS_ACTIVE)
def test_add_router_interface_postcommit_subnet(self):
self._test_add_router_interface_postcommit(
self.interface_info['subnet'])
def test_add_router_interface_postcommit_port(self):
self._test_add_router_interface_postcommit(self.interface_info['port'])
def _test_remove_router_interface_precommit(self, interface_info):
plugin = self.plugin._core_plugin
apic_driver = self.plugin._apic_driver
apic_driver.remove_router_interface_precommit(self.context, ROUTER,
interface_info)
plugin.update_port_status.assert_called_once_with(
self.context, mock.ANY, q_const.PORT_STATUS_DOWN)
def test_remove_router_interface_precommit_subnet(self):
self._test_remove_router_interface_precommit(
self.interface_info['subnet'])
def test_remove_router_interface_precommit_port(self):
self._test_remove_router_interface_precommit(
self.interface_info['port'])
def _dummy_generator(self, context, tenant_id, floatingip):
self._dummy_list = [0, 1, 2, 3]
for item in self._dummy_list:
yield item
def test_create_floatingip_precommit(self):
fip = {'floatingip': self.floatingip,
'id': FLOATINGIP, 'port_id': PORT}
apic_driver = self.plugin._apic_driver
apic_gbp = apic_driver.apic_gbp
apic_gbp.nat_pool_iterator = self._dummy_generator
apic_driver.create_floatingip_precommit(self.context, fip)
for nat_pool in self.context.nat_pool_list:
self.assertTrue(nat_pool in self._dummy_list)
def test_create_floatingip_postcommit(self):
fip = {'floatingip': self.floatingip,
'id': FLOATINGIP, 'port_id': PORT}
apic_driver = self.plugin._apic_driver
self.context.result = fip
apic_driver.create_floatingip_postcommit(self.context, fip)
apic_driver._notify_port_update.assert_called_once_with(PORT)
apic_driver._plugin.update_floatingip_status.assert_called_once_with(
mock.ANY, FLOATINGIP, q_const.FLOATINGIP_STATUS_ACTIVE)
self.assertEqual(q_const.FLOATINGIP_STATUS_ACTIVE, fip['status'])
def test_update_floatingip_precommit(self):
fip = {'floatingip': self.floatingip}
apic_driver = self.plugin._apic_driver
apic_driver.update_floatingip_precommit(self.context, FLOATINGIP, fip)
self.assertEqual(PORT, self.context.port_id_list[0])
def test_update_floatingip_postcommit(self):
fip = {'floatingip': self.floatingip,
'id': FLOATINGIP, 'port_id': PORT}
self.context.port_id_list = []
apic_driver = self.plugin._apic_driver
apic_driver.update_floatingip_postcommit(self.context, FLOATINGIP, fip)
self.assertEqual(self.port['id'], self.context.port_id_list[0])
apic_driver._notify_port_update.assert_called_once_with(PORT)
apic_driver._plugin.update_floatingip_status.assert_called_once_with(
mock.ANY, FLOATINGIP, q_const.FLOATINGIP_STATUS_ACTIVE)
def test_delete_floatingip_precommit(self):
apic_driver = self.plugin._apic_driver
apic_driver.delete_floatingip_precommit(self.context, FLOATINGIP)
self.assertEqual(PORT, self.context.port_id_list[0])
def test_delete_floatingip_postcommit(self):
self.context.port_id_list = [PORT]
apic_driver = self.plugin._apic_driver
apic_driver.delete_floatingip_postcommit(self.context, FLOATINGIP)
apic_driver._notify_port_update.assert_called_once_with(PORT)

View File

@ -1,208 +0,0 @@
# Copyright (c) 2016 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import context
from neutron_lib import constants as q_const
from neutron_lib import exceptions as n_exc
from gbpservice.neutron.services.l3_router import l3_apic
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_apic_mapping)
TENANT = 'tenant1'
ROUTER = 'router1'
SUBNET = 'subnet1'
NETWORK = 'network1'
PORT = 'port1'
NETWORK_NAME = 'one_network'
TEST_SEGMENT1 = 'test-segment1'
FLOATINGIP = 'fip1'
# TODO(tbachman): create better test class hierarchy to inherit
class TestCiscoApicL3Plugin(test_apic_mapping.ApicMappingTestCase):
'''Test class for the Cisco APIC L3 Plugin
This is a set of tests specific to the Cisco APIC
L3 plugin. It currently derives from the ApicMappingTestCase
class so that it can inherit test infrastructure from those classes.
'''
def setUp(self):
super(TestCiscoApicL3Plugin, self).setUp()
# Some actual dicts to return
self.subnet = {'network_id': NETWORK, 'tenant_id': TENANT}
self.port = {'tenant_id': TENANT,
'network_id': NETWORK,
'fixed_ips': [{'subnet_id': SUBNET}],
'id': 'port_id'}
self.interface_info = {'subnet': {'subnet_id': SUBNET},
'port': {'port_id': self.port['id']}}
self.floatingip = {'id': FLOATINGIP,
'floating_network_id': NETWORK_NAME,
'port_id': PORT}
self.context = context.get_admin_context()
self.context.tenant_id = TENANT
# Create our plugin, but mock some superclass and
# core plugin methods
self.plugin = l3_apic.ApicGBPL3ServicePlugin()
self.plugin._apic_driver._notify_port_update = mock.Mock()
self.plugin._core_plugin.get_ports = mock.Mock(
return_value=[self.port])
self.plugin._core_plugin.get_port = mock.Mock(return_value=self.port)
self.plugin._core_plugin.get_subnet = mock.Mock(
return_value = self.subnet)
self.plugin._core_plugin.update_port_status = mock.Mock()
# Floating IP updates to agents are mocked
self.plugin.update_floatingip_status = mock.Mock()
self.plugin.get_floatingip = mock.Mock(return_value=self.floatingip)
def _check_call_list(self, expected, observed):
for call in expected:
self.assertTrue(call in observed,
msg='Call not found, expected:\n%s\nobserved:'
'\n%s' % (str(call), str(observed)))
observed.remove(call)
self.assertFalse(
len(observed),
msg='There are more calls than expected: %s' % str(observed))
def _test_add_router_interface(self, interface_info):
with mock.patch('neutron.db.l3_db.'
'L3_NAT_db_mixin.add_router_interface') as if_mock:
if_mock.return_value = self.port
port = self.plugin.add_router_interface(self.context,
ROUTER, interface_info)
self.assertEqual(port, self.port)
test_assert = self.plugin._core_plugin.update_port_status
test_assert.assert_called_once_with(self.context,
self.port['id'], q_const.PORT_STATUS_ACTIVE)
def _test_remove_router_interface(self, interface_info):
with mock.patch('neutron.db.l3_db.'
'L3_NAT_db_mixin.remove_router_interface') as if_mock:
self.plugin.remove_router_interface(self.context, ROUTER,
interface_info)
self.assertEqual(1, if_mock.call_count)
def test_add_router_interface_subnet(self):
self._test_add_router_interface(self.interface_info['subnet'])
def test_add_router_interface_port(self):
self._test_add_router_interface(self.interface_info['port'])
def test_remove_router_interface_subnet(self):
self._test_remove_router_interface(self.interface_info['subnet'])
def test_remove_router_interface_port(self):
self._test_remove_router_interface(self.interface_info['port'])
def test_create_router_gateway_fails(self):
# Force _update_router_gw_info failure
with mock.patch('neutron.db.l3_db.L3_NAT_dbonly_mixin.'
'_update_router_gw_info',
side_effect=n_exc.NeutronException):
data = {'router': {'tenant_id': 'foo',
'name': 'router1', 'admin_state_up': True,
'external_gateway_info': {'network_id': 'some_uuid'}}}
# Verify router doesn't persist on failure
self.assertRaises(n_exc.NeutronException,
self.plugin.create_router, self.context, data)
routers = self.plugin.get_routers(self.context)
self.assertEqual(0, len(routers))
def test_floatingip_port_notify_on_create(self):
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.'
'create_floatingip',
new=mock.Mock(return_value=self.floatingip)):
# create floating-ip with mapped port
plugin = self.plugin
plugin.create_floatingip(self.context,
{'floatingip': self.floatingip})
plugin._apic_driver._notify_port_update.assert_called_once_with(
PORT)
def test_floatingip_port_notify_on_reassociate(self):
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.'
'update_floatingip',
new=mock.Mock(return_value=self.floatingip)):
# associate with different port
new_fip = {'port_id': 'port-another'}
self.plugin.update_floatingip(self.context, FLOATINGIP,
{'floatingip': new_fip})
self._check_call_list(
[mock.call(PORT),
mock.call('port-another')],
self.plugin._apic_driver._notify_port_update.call_args_list)
def test_floatingip_port_notify_on_disassociate(self):
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.'
'update_floatingip',
new=mock.Mock(return_value=self.floatingip)):
# dissociate mapped port
plugin = self.plugin
plugin.update_floatingip(self.context, FLOATINGIP,
{'floatingip': {}})
plugin._apic_driver._notify_port_update.assert_any_call(
PORT)
def test_floatingip_port_notify_on_delete(self):
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.delete_floatingip'):
# delete
plugin = self.plugin
plugin.delete_floatingip(self.context, FLOATINGIP)
plugin._apic_driver._notify_port_update.assert_called_once_with(
PORT)
def test_floatingip_status(self):
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.'
'create_floatingip',
new=mock.Mock(return_value=self.floatingip)):
# create floating-ip with mapped port
fip = self.plugin.create_floatingip(self.context,
{'floatingip': self.floatingip})
self.plugin.update_floatingip_status.assert_called_once_with(
mock.ANY, FLOATINGIP, q_const.FLOATINGIP_STATUS_ACTIVE)
self.assertEqual(q_const.FLOATINGIP_STATUS_ACTIVE, fip['status'])
# dissociate mapped-port
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.'
'update_floatingip',
new=mock.Mock(return_value=self.floatingip)):
self.plugin.update_floatingip_status.reset_mock()
self.floatingip.pop('port_id')
fip = self.plugin.update_floatingip(self.context, FLOATINGIP,
{'floatingip': self.floatingip})
self.plugin.update_floatingip_status.assert_called_once_with(
mock.ANY, FLOATINGIP, q_const.FLOATINGIP_STATUS_DOWN)
self.assertEqual(q_const.FLOATINGIP_STATUS_DOWN, fip['status'])
# re-associate mapped-port
with mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.'
'update_floatingip',
new=mock.Mock(return_value=self.floatingip)):
self.plugin.update_floatingip_status.reset_mock()
self.floatingip['port_id'] = PORT
fip = self.plugin.update_floatingip(self.context, FLOATINGIP,
{'floatingip': self.floatingip})
self.plugin.update_floatingip_status.assert_called_once_with(
mock.ANY, FLOATINGIP, q_const.FLOATINGIP_STATUS_ACTIVE)
self.assertEqual(q_const.FLOATINGIP_STATUS_ACTIVE, fip['status'])

View File

@ -1,71 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.common import config # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp.node_plumbers import(
admin_owned_resources_apic_tscp as admin_tscp)
from gbpservice.neutron.tests.unit.services.servicechain.ncp import (
test_tscp_apic_mapping as test_tscp_apic_mapping)
class AdminOwnedResourcesTscpTestCase(
test_tscp_apic_mapping.ApicMappingStitchingPlumberGBPTestCase):
def setUp(self):
user = 'user'
password = 'password'
tenant_name = 'tenant_name',
uri = 'http://127.0.0.1:35357/v2.0/'
try:
config.cfg.CONF.keystone_authtoken.username
except config.cfg.NoSuchOptError:
config.cfg.CONF.register_opt(
config.cfg.StrOpt('username', default=user),
'keystone_authtoken')
else:
config.cfg.CONF.set_override('username', user,
group='keystone_authtoken')
try:
config.cfg.CONF.keystone_authtoken.password
except config.cfg.NoSuchOptError:
config.cfg.CONF.register_opt(
config.cfg.StrOpt('password', default=password),
'keystone_authtoken')
else:
config.cfg.CONF.set_override('password', password,
group='keystone_authtoken')
try:
config.cfg.CONF.keystone_authtoken.project_name
except config.cfg.NoSuchOptError:
config.cfg.CONF.register_opt(
config.cfg.StrOpt('project_name', default=tenant_name),
'keystone_authtoken')
else:
config.cfg.CONF.set_override('project_name', tenant_name,
group='keystone_authtoken')
config.cfg.CONF.set_override('auth_uri', uri,
group='keystone_authtoken')
super(AdminOwnedResourcesTscpTestCase, self).setUp(
plumber='admin_owned_resources_apic_plumber')
admin_tscp.keyclient = mock.Mock()
res = mock.patch('gbpservice.neutron.services.servicechain.plugins.'
'ncp.node_plumbers.admin_owned_resources_apic_tscp.'
'AdminOwnedResourcesApicTSCP.'
'_get_resource_owner_tenant_id').start()
res.return_value = "1234"
class TestApicChains(AdminOwnedResourcesTscpTestCase,
test_tscp_apic_mapping.TestApicChains):
pass