VPNaaS integration with services flavor framework

* adds new attribute 'flavor_id' for vpnservice
* adds support for multiple drivers for VPNaaS

This solution uses a flavor of service type VPN associated with a flavor
profile containing a driver. During vpnservice creation, if a flavor_id
is passed, it is used to find the provider for the driver which would
handle the newly created vpnservice.  The flavor_id and the
provider-vpnservice association are pesisted in the DB.

ApiImpact and DocImpact tags are added as the new optional flavor_id
parameter should be described, as well as the support of multiple VPNaaS
drivers.

The original work and discussion about integrating VPNaaS and service
type framework can be found in the following change:
I9e195dfaee21b1cf204cb9b9fc773bc9e5af5936

ApiImpact
DocImpact
Closes-Bug: #1672920

Signed-off-by: Hunt Xu <mhuntxu@gmail.com>

Change-Id: I0095e160481f1d4572e38ad1d3bbc8e183039b84
This commit is contained in:
Hunt Xu 2017-04-12 19:20:43 +08:00
parent 9ac61004ac
commit 058469e1b9
20 changed files with 670 additions and 184 deletions

View File

@ -1 +1 @@
38893903cbde
95601446dbcc

View File

@ -0,0 +1,37 @@
# Copyright 2017 Eayun, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""add flavor id to vpnservices
Revision ID: 95601446dbcc
Revises: 38893903cbde
Create Date: 2017-04-10 10:14:41.724811
"""
# revision identifiers, used by Alembic.
revision = '95601446dbcc'
down_revision = '38893903cbde'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('vpnservices',
sa.Column('flavor_id', sa.String(length=36), nullable=True))
op.create_foreign_key('fk_vpnservices_flavors_id',
'vpnservices', 'flavors',
['flavor_id'], ['id'])

View File

@ -15,7 +15,6 @@
# under the License.
from neutron.db import common_db_mixin as base_db
from neutron.db.models import l3agent
from neutron.db import models_v2
from neutron.plugins.common import utils
from neutron_lib.callbacks import events
@ -464,6 +463,7 @@ class VPNPluginDb(vpnaas.VPNPluginBase,
'tenant_id': vpnservice['tenant_id'],
'subnet_id': vpnservice['subnet_id'],
'router_id': vpnservice['router_id'],
'flavor_id': vpnservice['flavor_id'],
'admin_state_up': vpnservice['admin_state_up'],
'external_v4_ip': vpnservice['external_v4_ip'],
'external_v6_ip': vpnservice['external_v6_ip'],
@ -472,6 +472,7 @@ class VPNPluginDb(vpnaas.VPNPluginBase,
def create_vpnservice(self, context, vpnservice):
vpns = vpnservice['vpnservice']
flavor_id = vpns.get('flavor_id', None)
validator = self._get_validator()
with context.session.begin(subtransactions=True):
validator.validate_vpnservice(context, vpns)
@ -482,6 +483,7 @@ class VPNPluginDb(vpnaas.VPNPluginBase,
description=vpns['description'],
subnet_id=vpns['subnet_id'],
router_id=vpns['router_id'],
flavor_id=flavor_id,
admin_state_up=vpns['admin_state_up'],
status=lib_constants.PENDING_CREATE)
context.session.add(vpnservice_db)
@ -638,28 +640,6 @@ class VPNPluginDb(vpnaas.VPNPluginBase,
class VPNPluginRpcDbMixin(object):
def _get_agent_hosting_vpn_services(self, context, host):
plugin = directory.get_plugin()
agent = plugin._get_agent_by_type_and_host(
context, lib_constants.AGENT_TYPE_L3, host)
agent_conf = plugin.get_configuration_dict(agent)
# Retrieve the agent_mode to check if this is the
# right agent to deploy the vpn service. In the
# case of distributed the vpn service should reside
# only on a dvr_snat node.
agent_mode = agent_conf.get('agent_mode', 'legacy')
if not agent.admin_state_up or agent_mode == 'dvr':
return []
query = context.session.query(vpn_models.VPNService)
query = query.join(vpn_models.IPsecSiteConnection)
query = query.join(l3agent.RouterL3AgentBinding,
l3agent.RouterL3AgentBinding.router_id ==
vpn_models.VPNService.router_id)
query = query.filter(
l3agent.RouterL3AgentBinding.l3_agent_id == agent.id)
return query
def _build_local_subnet_cidr_map(self, context):
"""Build a dict of all local endpoint subnets, with list of CIDRs."""
query = context.session.query(models_v2.Subnet.id,

View File

@ -154,6 +154,8 @@ class VPNService(model_base.BASEV2, model_base.HasId, model_base.HasProject):
IPsecSiteConnection,
backref='vpnservice',
cascade="all, delete-orphan")
flavor_id = sa.Column(sa.String(36), sa.ForeignKey(
'flavors.id', name='fk_vpnservices_flavors_id'))
class VPNEndpoint(model_base.BASEV2):

View File

@ -29,7 +29,11 @@ from neutron_vpnaas.services.vpn.common import constants
class VpnReferenceValidator(object):
"""Baseline validation routines for VPN resources."""
"""
Baseline validation routines for VPN resources.
The validations here should be common to all VPN service providers and
only raise exceptions from neutron_vpnaas.extensions.vpnaas.
"""
IP_MIN_MTU = {4: 68, 6: 1280}

View File

@ -0,0 +1,68 @@
# Copyright 2017 Eayun, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib.api import extensions
from neutron_lib import exceptions as nexception
from neutron_vpnaas._i18n import _
class FlavorsPluginNotLoaded(nexception.NotFound):
message = _("Flavors plugin not found")
class NoProviderFoundForFlavor(nexception.NotFound):
message = _("No service provider found for flavor %(flavor_id)s")
EXTENDED_ATTRIBUTES_2_0 = {
'vpnservices': {
'flavor_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:uuid_or_none': None},
'is_visible': True, 'default': None}
}
}
class Vpn_flavors(extensions.ExtensionDescriptor):
"""Extension class supporting flavors for vpnservices."""
@classmethod
def get_name(cls):
return "VPN Service Flavor Extension"
@classmethod
def get_alias(cls):
return 'vpn-flavors'
@classmethod
def get_description(cls):
return "Flavor support for vpnservices."
@classmethod
def get_updated(cls):
return "2017-04-19T00:00:00-00:00"
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}
def get_required_extensions(self):
return ["vpnaas"]
def get_optional_extensions(self):
return ["flavors"]

View File

@ -15,13 +15,17 @@
# under the License.
from neutron.db import servicetype_db as st_db
from neutron.extensions import flavors
from neutron.services.flavors import flavors_plugin
from neutron.services import provider_configuration as pconf
from neutron.services import service_base
from neutron_lib import context as ncontext
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_log import log as logging
from neutron_vpnaas.db.vpn import vpn_db
from neutron_vpnaas.extensions import vpn_flavors
LOG = logging.getLogger(__name__)
@ -42,7 +46,8 @@ class VPNPlugin(vpn_db.VPNPluginDb):
"""
supported_extension_aliases = ["vpnaas",
"vpn-endpoint-groups",
"service-type"]
"service-type",
"vpn-flavors"]
path_prefix = "/vpn"
@ -54,30 +59,110 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
self.service_type_manager = st_db.ServiceTypeManager.get_instance()
add_provider_configuration(self.service_type_manager, constants.VPN)
# Load the service driver from neutron.conf.
drivers, default_provider = service_base.load_drivers(
self.drivers, self.default_provider = service_base.load_drivers(
constants.VPN, self)
LOG.info("VPN plugin using service driver: %s", default_provider)
self.ipsec_driver = drivers[default_provider]
self._check_orphan_vpnservice_associations()
# Associate driver names to driver objects
for driver_name, driver in self.drivers.items():
driver.name = driver_name
LOG.info(("VPN plugin using service drivers: %(service_drivers)s, "
"default: %(default_driver)s"),
{'service_drivers': self.drivers.keys(),
'default_driver': self.default_provider})
# Try to find the flavor plugin only once
self._flavors_plugin = directory.get_plugin(constants.FLAVORS)
vpn_db.subscribe()
def _get_driver_for_vpnservice(self, vpnservice):
return self.ipsec_driver
def _check_orphan_vpnservice_associations(self):
context = ncontext.get_admin_context()
vpnservices = self.get_vpnservices(context)
vpnservice_ids = [vpnservice['id'] for vpnservice in vpnservices]
stm = self.service_type_manager
provider_names = stm.get_provider_names_by_resource_ids(
context, vpnservice_ids)
lost_providers = set()
lost_vpnservices = []
for vpnservice_id, provider in provider_names.items():
if provider not in self.drivers:
lost_providers.add(provider)
lost_vpnservices.append(vpnservice_id)
if lost_providers or lost_vpnservices:
# Provider are kept internally, we need to inform users about
# the related VPN services.
msg = (
"Delete associated vpnservices %(vpnservices)s before "
"removing providers %(providers)s."
) % {'vpnservices': lost_vpnservices,
'providers': list(lost_providers)}
LOG.exception(msg)
raise SystemExit(msg)
# Deal with upgrade. Associate existing VPN services to default
# provider.
unasso_vpnservices = [
vpnservice_id for vpnservice_id in vpnservice_ids
if vpnservice_id not in provider_names]
if unasso_vpnservices:
LOG.info(
("Associating VPN services %(unasso_vpnservices)s to "
"default provider %(default_provider)s."),
{'unasso_vpnservices': unasso_vpnservices,
'default_provider': self.default_provider})
for vpnservice_id in unasso_vpnservices:
stm.add_resource_association(
context, constants.VPN,
self.default_provider, vpnservice_id)
def _get_provider_for_flavor(self, context, flavor_id):
if flavor_id:
if not self._flavors_plugin:
raise vpn_flavors.FlavorsPluginNotLoaded()
fl_db = flavors_plugin.FlavorsPlugin.get_flavor(
self._flavors_plugin, context, flavor_id)
if fl_db['service_type'] != constants.VPN:
raise flavors.InvalidFlavorServiceType(
service_type=fl_db['service_type'])
if not fl_db['enabled']:
raise flavors.FlavorDisabled()
providers = flavors_plugin.FlavorsPlugin.get_flavor_next_provider(
self._flavors_plugin, context, fl_db['id'])
provider = providers[0].get('provider')
if provider not in self.drivers:
raise vpn_flavors.NoProviderFoundForFlavor(flavor_id=flavor_id)
else:
# Use default provider
provider = self.default_provider
LOG.debug("Selected provider %s" % provider)
return provider
def _get_driver_for_vpnservice(self, context, vpnservice):
stm = self.service_type_manager
provider_names = stm.get_provider_names_by_resource_ids(
context, [vpnservice['id']])
provider = provider_names.get(vpnservice['id'])
return self.drivers[provider]
def _get_driver_for_ipsec_site_connection(self, context,
ipsec_site_connection):
#TODO(nati) get vpnservice when we support service type framework
vpnservice = None
return self._get_driver_for_vpnservice(vpnservice)
def _get_validator(self):
return self.ipsec_driver.validator
# Only vpnservice_id is required as the vpnservice should be already
# associated with a provider after its creation.
vpnservice = {'id': ipsec_site_connection['vpnservice_id']}
return self._get_driver_for_vpnservice(context, vpnservice)
def create_ipsec_site_connection(self, context, ipsec_site_connection):
driver = self._get_driver_for_ipsec_site_connection(
context, ipsec_site_connection['ipsec_site_connection'])
driver.validator.validate_ipsec_site_connection(
context,
ipsec_site_connection['ipsec_site_connection'])
ipsec_site_connection = super(
VPNDriverPlugin, self).create_ipsec_site_connection(
context, ipsec_site_connection)
driver = self._get_driver_for_ipsec_site_connection(
context, ipsec_site_connection)
driver.create_ipsec_site_connection(context, ipsec_site_connection)
return ipsec_site_connection
@ -95,21 +180,28 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
ipsec_conn_id, ipsec_site_connection):
old_ipsec_site_connection = self.get_ipsec_site_connection(
context, ipsec_conn_id)
driver = self._get_driver_for_ipsec_site_connection(
context, old_ipsec_site_connection)
driver.validator.validate_ipsec_site_connection(
context,
ipsec_site_connection['ipsec_site_connection'])
ipsec_site_connection = super(
VPNDriverPlugin, self).update_ipsec_site_connection(
context,
ipsec_conn_id,
ipsec_site_connection)
driver = self._get_driver_for_ipsec_site_connection(
context, ipsec_site_connection)
driver.update_ipsec_site_connection(
context, old_ipsec_site_connection, ipsec_site_connection)
return ipsec_site_connection
def create_vpnservice(self, context, vpnservice):
provider = self._get_provider_for_flavor(
context, vpnservice['vpnservice'].get('flavor_id'))
vpnservice = super(
VPNDriverPlugin, self).create_vpnservice(context, vpnservice)
driver = self._get_driver_for_vpnservice(vpnservice)
self.service_type_manager.add_resource_association(
context, constants.VPN, provider, vpnservice['id'])
driver = self.drivers[provider]
driver.create_vpnservice(context, vpnservice)
return vpnservice
@ -118,12 +210,14 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
new_vpn_service = super(
VPNDriverPlugin, self).update_vpnservice(context, vpnservice_id,
vpnservice)
driver = self._get_driver_for_vpnservice(old_vpn_service)
driver = self._get_driver_for_vpnservice(context, old_vpn_service)
driver.update_vpnservice(context, old_vpn_service, new_vpn_service)
return new_vpn_service
def delete_vpnservice(self, context, vpnservice_id):
vpnservice = self._get_vpnservice(context, vpnservice_id)
super(VPNDriverPlugin, self).delete_vpnservice(context, vpnservice_id)
driver = self._get_driver_for_vpnservice(vpnservice)
driver = self._get_driver_for_vpnservice(context, vpnservice)
self.service_type_manager.del_resource_associations(
context, [vpnservice_id])
driver.delete_vpnservice(context, vpnservice)

View File

@ -22,7 +22,7 @@ from oslo_log import log as logging
import oslo_messaging
import six
from neutron_vpnaas.db.vpn import vpn_validator
from neutron_vpnaas.services.vpn.service_drivers import driver_validator
LOG = logging.getLogger(__name__)
@ -33,8 +33,9 @@ class VpnDriver(object):
def __init__(self, service_plugin, validator=None):
self.service_plugin = service_plugin
if validator is None:
validator = vpn_validator.VpnReferenceValidator()
validator = driver_validator.VpnDriverValidator(self)
self.validator = validator
self.name = ''
@property
def l3_plugin(self):

View File

@ -18,6 +18,12 @@ import netaddr
import oslo_messaging
import six
from neutron.db.models import l3agent
from neutron.db.models import servicetype
from neutron_lib import constants as lib_constants
from neutron_lib.plugins import directory
from neutron_vpnaas.db.vpn import vpn_models
from neutron_vpnaas.services.vpn import service_drivers
@ -37,11 +43,39 @@ class IPsecVpnDriverCallBack(object):
super(IPsecVpnDriverCallBack, self).__init__()
self.driver = driver
def _get_agent_hosting_vpn_services(self, context, host):
plugin = directory.get_plugin()
agent = plugin._get_agent_by_type_and_host(
context, lib_constants.AGENT_TYPE_L3, host)
agent_conf = plugin.get_configuration_dict(agent)
# Retrieve the agent_mode to check if this is the
# right agent to deploy the vpn service. In the
# case of distributed the vpn service should reside
# only on a dvr_snat node.
agent_mode = agent_conf.get('agent_mode', 'legacy')
if not agent.admin_state_up or agent_mode == 'dvr':
return []
query = context.session.query(vpn_models.VPNService)
query = query.join(vpn_models.IPsecSiteConnection)
query = query.join(l3agent.RouterL3AgentBinding,
l3agent.RouterL3AgentBinding.router_id ==
vpn_models.VPNService.router_id)
query = query.join(
servicetype.ProviderResourceAssociation,
servicetype.ProviderResourceAssociation.resource_id ==
vpn_models.VPNService.id)
query = query.filter(
l3agent.RouterL3AgentBinding.l3_agent_id == agent.id)
query = query.filter(
servicetype.ProviderResourceAssociation.provider_name ==
self.driver.name)
return query
def get_vpn_services_on_host(self, context, host=None):
"""Returns the vpnservices on the host."""
plugin = self.driver.service_plugin
vpnservices = plugin._get_agent_hosting_vpn_services(
vpnservices = self._get_agent_hosting_vpn_services(
context, host)
plugin = self.driver.service_plugin
local_cidr_map = plugin._build_local_subnet_cidr_map(context)
return [self.driver.make_vpnservice_dict(vpnservice, local_cidr_map)
for vpnservice in vpnservices]

View File

@ -16,6 +16,8 @@ from neutron.common import rpc as n_rpc
from oslo_log import log as logging
import oslo_messaging
from neutron.db.models import servicetype
from neutron_vpnaas.db.vpn import vpn_models
from neutron_vpnaas.services.vpn.common import topics
from neutron_vpnaas.services.vpn import service_drivers
@ -56,11 +58,18 @@ class CiscoCsrIPsecVpnDriverCallBack(object):
def get_vpn_services_using(self, context, router_id):
query = context.session.query(vpn_models.VPNService)
query = query.join(
servicetype.ProviderResourceAssociation,
servicetype.ProviderResourceAssociation.resource_id ==
vpn_models.VPNService.id)
query = query.join(vpn_models.IPsecSiteConnection)
query = query.join(vpn_models.IKEPolicy)
query = query.join(vpn_models.IPsecPolicy)
query = query.join(vpn_models.IPsecPeerCidr)
query = query.filter(vpn_models.VPNService.router_id == router_id)
query = query.filter(
servicetype.ProviderResourceAssociation.provider_name ==
self.driver.name)
return query.all()
def get_vpn_services_on_host(self, context, host=None):
@ -122,7 +131,7 @@ class CiscoCsrIPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
def __init__(self, service_plugin):
super(CiscoCsrIPsecVPNDriver, self).__init__(
service_plugin,
cisco_validator.CiscoCsrVpnValidator(service_plugin))
cisco_validator.CiscoCsrVpnValidator(self))
def create_rpc_conn(self):
self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]

View File

@ -18,7 +18,7 @@ from neutron_lib import exceptions as nexception
from oslo_log import log as logging
from neutron_vpnaas._i18n import _
from neutron_vpnaas.db.vpn import vpn_validator
from neutron_vpnaas.services.vpn.service_drivers import driver_validator
LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
@ -34,13 +34,9 @@ class CsrValidationFailure(nexception.BadRequest):
"with value '%(value)s'")
class CiscoCsrVpnValidator(vpn_validator.VpnReferenceValidator):
class CiscoCsrVpnValidator(driver_validator.VpnDriverValidator):
"""Validator methods for the Cisco CSR."""
def __init__(self, service_plugin):
self.service_plugin = service_plugin
super(CiscoCsrVpnValidator, self).__init__()
"""Driver-specific validator methods for the Cisco CSR."""
def validate_lifetime(self, for_policy, policy_info):
"""Ensure lifetime in secs and value is supported, based on policy."""
@ -114,29 +110,38 @@ class CiscoCsrVpnValidator(vpn_validator.VpnReferenceValidator):
key='auth_algorithm',
value=auth_algorithm)
def validate_ipsec_site_connection(self, context, ipsec_sitecon,
ip_version):
def validate_ipsec_site_connection(self, context, ipsec_sitecon):
"""Validate IPSec site connection for Cisco CSR.
After doing reference validation, do additional checks that relate
to the Cisco CSR.
Do additional checks that relate to the Cisco CSR.
"""
super(CiscoCsrVpnValidator, self)._check_dpd(ipsec_sitecon)
service_plugin = self.driver.service_plugin
if 'ikepolicy_id' in ipsec_sitecon:
ike_policy = service_plugin.get_ikepolicy(
context, ipsec_sitecon['ikepolicy_id'])
self.validate_lifetime('IKE Policy', ike_policy)
self.validate_ike_version(ike_policy)
self.validate_ike_auth_algorithm(ike_policy)
if 'ipsecpolicy_id' in ipsec_sitecon:
ipsec_policy = service_plugin.get_ipsecpolicy(
context, ipsec_sitecon['ipsecpolicy_id'])
self.validate_lifetime('IPSec Policy', ipsec_policy)
self.validate_ipsec_auth_algorithm(ipsec_policy)
self.validate_ipsec_encap_mode(ipsec_policy)
if 'vpnservice_id' in ipsec_sitecon:
vpn_service = service_plugin.get_vpnservice(
context, ipsec_sitecon['vpnservice_id'])
router = self.l3_plugin._get_router(
context, vpn_service['router_id'])
self.validate_public_ip_present(router)
if 'mtu' in ipsec_sitecon:
self.validate_mtu(ipsec_sitecon)
if 'peer_id' in ipsec_sitecon:
self.validate_peer_id(ipsec_sitecon)
ike_policy = self.service_plugin.get_ikepolicy(
context, ipsec_sitecon['ikepolicy_id'])
ipsec_policy = self.service_plugin.get_ipsecpolicy(
context, ipsec_sitecon['ipsecpolicy_id'])
vpn_service = self.service_plugin.get_vpnservice(
context, ipsec_sitecon['vpnservice_id'])
router = self.l3_plugin._get_router(context, vpn_service['router_id'])
self.validate_lifetime('IKE Policy', ike_policy)
self.validate_lifetime('IPSec Policy', ipsec_policy)
self.validate_ike_version(ike_policy)
self.validate_ike_auth_algorithm(ike_policy)
self.validate_ipsec_auth_algorithm(ipsec_policy)
self.validate_mtu(ipsec_sitecon)
self.validate_public_ip_present(router)
self.validate_peer_id(ipsec_sitecon)
self.validate_ipsec_encap_mode(ipsec_policy)
LOG.debug("IPSec connection validated for Cisco CSR")

View File

@ -0,0 +1,29 @@
# Copyright 2017 Eayun, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class VpnDriverValidator(object):
"""Driver-specific validation routines for VPN resources."""
def __init__(self, driver):
self.driver = driver
@property
def l3_plugin(self):
return self.driver.l3_plugin
def validate_ipsec_site_connection(self, context, ipsec_sitecon):
"""Driver can override this for its additional validations."""
pass

View File

@ -30,7 +30,7 @@ class IPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
def __init__(self, service_plugin):
super(IPsecVPNDriver, self).__init__(
service_plugin,
ipsec_validator.IpsecVpnValidator(service_plugin))
ipsec_validator.IpsecVpnValidator(self))
def create_rpc_conn(self):
self.endpoints = [base_ipsec.IPsecVpnDriverCallBack(self)]

View File

@ -15,7 +15,7 @@
from neutron_lib import exceptions as nexception
from neutron_vpnaas._i18n import _
from neutron_vpnaas.db.vpn import vpn_validator
from neutron_vpnaas.services.vpn.service_drivers import driver_validator
class IpsecValidationFailure(nexception.BadRequest):
@ -28,13 +28,11 @@ class IkeValidationFailure(nexception.BadRequest):
"with value '%(value)s'")
class IpsecVpnValidator(vpn_validator.VpnReferenceValidator):
class IpsecVpnValidator(driver_validator.VpnDriverValidator):
"""Validator methods for the Openswan, Strongswan and Libreswan."""
def __init__(self, service_plugin):
self.service_plugin = service_plugin
super(IpsecVpnValidator, self).__init__()
"""Driver-specific validator methods for the Openswan, Strongswan
and Libreswan.
"""
def _check_transform_protocol(self, context, transform_protocol):
"""Restrict selecting ah-esp as IPSec Policy transform protocol.
@ -78,3 +76,14 @@ class IpsecVpnValidator(vpn_validator.VpnReferenceValidator):
resource='IKE Policy',
key='auth_algorithm',
value=auth_algorithm)
def validate_ipsec_site_connection(self, context, ipsec_sitecon):
if 'ikepolicy_id' in ipsec_sitecon:
ike_policy = self.driver.service_plugin.get_ikepolicy(
context, ipsec_sitecon['ikepolicy_id'])
self.validate_ike_policy(context, ike_policy)
if 'ipsecpolicy_id' in ipsec_sitecon:
ipsec_policy = self.driver.service_plugin.get_ipsecpolicy(
context, ipsec_sitecon['ipsecpolicy_id'])
self.validate_ipsec_policy(context, ipsec_policy)

View File

@ -48,6 +48,7 @@ from neutron_vpnaas.extensions import vpnaas
DB_CORE_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
DB_VPN_PLUGIN_KLASS = "neutron_vpnaas.services.vpn.plugin.VPNPlugin"
FLAVOR_PLUGIN_KLASS = "neutron.services.flavors.flavors_plugin.FlavorsPlugin"
ROOTDIR = os.path.normpath(os.path.join(
os.path.dirname(__file__),
'..', '..', '..', '..'))
@ -208,6 +209,8 @@ class VPNTestMixin(object):
'tenant_id': tenant_id}}
if kwargs.get('description') is not None:
data['vpnservice']['description'] = kwargs['description']
if kwargs.get('flavor_id') is not None:
data['vpnservice']['flavor_id'] = kwargs['flavor_id']
vpnservice_req = self.new_create_request('vpnservices', data, fmt)
if (kwargs.get('set_context') and
'tenant_id' in kwargs):
@ -451,7 +454,9 @@ class VPNPluginDbTestCase(VPNTestMixin,
self.service_providers.return_value = [vpnaas_provider]
# force service type manager to reload configuration:
sdb.ServiceTypeManager._instance = None
service_plugins = {'vpnaas_plugin': vpnaas_plugin}
service_plugins = {
'vpnaas_plugin': vpnaas_plugin,
'flavors_plugin': FLAVOR_PLUGIN_KLASS}
plugin_str = ('neutron_vpnaas.tests.unit.db.vpn.'
'test_vpn_db.TestVpnCorePlugin')
@ -1777,6 +1782,7 @@ class TestVpnDatabase(base.NeutronDbPluginV2TestCase, NeutronResourcesMixin):
'description': 'new service',
'subnet_id': subnet_id,
'router_id': router['id'],
'flavor_id': None,
'admin_state_up': True}}
def test_create_vpnservice(self):

View File

@ -17,7 +17,6 @@ import mock
import socket
from neutron.db import l3_db
from neutron.db import servicetype_db as st_db
from neutron_lib import context as n_ctx
from neutron_lib import exceptions as nexception
from neutron_lib.plugins import constants as nconstants
@ -25,11 +24,9 @@ from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from sqlalchemy.orm import query
from neutron_vpnaas.db.vpn import vpn_validator
from neutron_vpnaas.extensions import vpnaas
from neutron_vpnaas.services.vpn.common import constants as v_constants
from neutron_vpnaas.services.vpn import plugin as vpn_plugin
from neutron_vpnaas.services.vpn.service_drivers \
import ipsec_validator as vpn_validator
from neutron_vpnaas.tests import base
_uuid = uuidutils.generate_uuid
@ -40,47 +37,17 @@ FAKE_SUBNET_ID = _uuid()
IPV4 = 4
IPV6 = 6
IPSEC_SERVICE_DRIVER = ('neutron_vpnaas.services.vpn.service_drivers.'
'ipsec.IPsecVPNDriver')
class TestValidatorSelection(base.BaseTestCase):
class TestVpnValidation(base.BaseTestCase):
def setUp(self):
super(TestValidatorSelection, self).setUp()
vpnaas_provider = [{
'service_type': nconstants.VPN,
'name': 'vpnaas',
'driver': IPSEC_SERVICE_DRIVER,
'default': True
}]
# override the default service provider
self.service_providers = (
mock.patch.object(st_db.ServiceTypeManager,
'get_service_providers').start())
self.service_providers.return_value = vpnaas_provider
mock.patch('neutron.common.rpc.create_connection').start()
stm = st_db.ServiceTypeManager()
mock.patch('neutron.db.servicetype_db.ServiceTypeManager.get_instance',
return_value=stm).start()
self.vpn_plugin = vpn_plugin.VPNDriverPlugin()
def test_reference_driver_used(self):
self.assertIsInstance(self.vpn_plugin._get_validator(),
vpn_validator.IpsecVpnValidator)
class TestIPsecDriverValidation(base.BaseTestCase):
def setUp(self):
super(TestIPsecDriverValidation, self).setUp()
super(TestVpnValidation, self).setUp()
self.l3_plugin = mock.Mock()
self.core_plugin = mock.Mock()
directory.add_plugin(nconstants.CORE, self.core_plugin)
directory.add_plugin(nconstants.L3, self.l3_plugin)
self.context = n_ctx.Context('some_user', 'some_tenant')
self.service_plugin = mock.Mock()
self.validator = vpn_validator.IpsecVpnValidator(self.service_plugin)
self.validator = vpn_validator.VpnReferenceValidator()
self.router = mock.Mock()
self.router.gw_port = {'fixed_ips': [{'ip_address': '10.0.0.99'}]}
@ -198,36 +165,6 @@ class TestIPsecDriverValidation(base.BaseTestCase):
self.helper_validate_peer_address(fixed_ips, IPV6,
expected_exception=True)
def test_validate_ipsec_policy(self):
# Validate IPsec Policy transform_protocol and auth_algorithm
ipsec_policy = {'transform_protocol': 'ah-esp'}
self.assertRaises(vpn_validator.IpsecValidationFailure,
self.validator.validate_ipsec_policy,
self.context, ipsec_policy)
auth_algorithm = {'auth_algorithm': 'sha384'}
self.assertRaises(vpn_validator.IpsecValidationFailure,
self.validator.validate_ipsec_policy,
self.context, auth_algorithm)
auth_algorithm = {'auth_algorithm': 'sha512'}
self.assertRaises(vpn_validator.IpsecValidationFailure,
self.validator.validate_ipsec_policy,
self.context, auth_algorithm)
def test_validate_ike_policy(self):
# Validate IKE Policy auth_algorithm
auth_algorithm = {'auth_algorithm': 'sha384'}
self.assertRaises(vpn_validator.IkeValidationFailure,
self.validator.validate_ike_policy,
self.context, auth_algorithm)
auth_algorithm = {'auth_algorithm': 'sha512'}
self.assertRaises(vpn_validator.IkeValidationFailure,
self.validator.validate_ike_policy,
self.context, auth_algorithm)
def test_defaults_for_ipsec_site_connections_on_update(self):
"""Check that defaults are used for any values not specified."""
ipsec_sitecon = {}
@ -285,7 +222,7 @@ class TestIPsecDriverValidation(base.BaseTestCase):
def test_bad_mtu_for_ipsec_connection(self):
"""Failure test of invalid MTU values for IPSec conn create/update."""
ip_version_limits = vpn_validator.IpsecVpnValidator.IP_MIN_MTU
ip_version_limits = self.validator.IP_MIN_MTU
for version, limit in ip_version_limits.items():
ipsec_sitecon = {'mtu': limit - 1}
self.assertRaises(

View File

@ -24,6 +24,7 @@ from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron_vpnaas.db.vpn import vpn_validator
from neutron_vpnaas.services.vpn import plugin as vpn_plugin
from neutron_vpnaas.services.vpn.service_drivers import cisco_csr_db as csr_db
from neutron_vpnaas.services.vpn.service_drivers \
@ -80,12 +81,20 @@ class TestCiscoValidatorSelection(base.BaseTestCase):
mock.patch.object(st_db.ServiceTypeManager,
'get_service_providers').start())
self.service_providers.return_value = vpnaas_provider
st_db.ServiceTypeManager._instance = None
mock.patch('neutron.common.rpc.create_connection').start()
stm = st_db.ServiceTypeManager()
stm.get_provider_names_by_resource_ids = mock.Mock(
return_value={})
mock.patch('neutron.db.servicetype_db.ServiceTypeManager.get_instance',
return_value=stm).start()
mock.patch('neutron_vpnaas.db.vpn.vpn_db.VPNPluginDb.get_vpnservices',
return_value=[]).start()
self.vpn_plugin = vpn_plugin.VPNDriverPlugin()
def test_reference_driver_used(self):
self.assertIsInstance(self.vpn_plugin._get_validator(),
default_provider = self.vpn_plugin.default_provider
default_driver = self.vpn_plugin.drivers[default_provider]
self.assertIsInstance(default_driver.validator,
validator.CiscoCsrVpnValidator)
@ -93,13 +102,13 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
def setUp(self):
super(TestCiscoIPsecDriverValidation, self).setUp()
self.l3_plugin = mock.Mock()
directory.add_plugin(lib_const.L3, self.l3_plugin)
self.context = n_ctx.Context('some_user', 'some_tenant')
self.vpn_service = {'router_id': '123'}
self.router = mock.Mock()
driver = mock.Mock()
self.service_plugin = mock.Mock()
self.validator = validator.CiscoCsrVpnValidator(self.service_plugin)
driver.service_plugin = self.service_plugin
self.validator = validator.CiscoCsrVpnValidator(driver)
def test_ike_version_unsupported(self):
"""Failure test that Cisco CSR REST API does not support IKE v2."""
@ -223,7 +232,8 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
'encapsulation_mode': 'tunnel'})
self.service_plugin.get_vpnservice = mock.Mock(
return_value=self.vpn_service)
self.l3_plugin._get_router = mock.Mock(return_value=self.router)
self.validator.driver.l3_plugin._get_router = mock.Mock(
return_value=self.router)
# Provide the minimum needed items to validate
ipsec_sitecon = {'id': '1',
'vpnservice_id': FAKE_SERVICE_ID,
@ -236,9 +246,10 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
'dpd_interval': 30,
'dpd_timeout': 120}
expected.update(ipsec_sitecon)
self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
plugin_validator = vpn_validator.VpnReferenceValidator()
plugin_validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
self.validator.validate_ipsec_site_connection(self.context,
ipsec_sitecon, IPV4)
ipsec_sitecon)
self.assertEqual(expected, ipsec_sitecon)
def test_ipsec_encap_mode_unsupported(self):

View File

@ -15,12 +15,15 @@
import mock
from neutron.db import servicetype_db as st_db
from neutron_lib import context as n_ctx
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from neutron_vpnaas.services.vpn import plugin as vpn_plugin
from neutron_vpnaas.services.vpn.service_drivers import ipsec as ipsec_driver
from neutron_vpnaas.services.vpn.service_drivers import ipsec_validator
from neutron_vpnaas.tests import base
@ -49,6 +52,38 @@ class FakeSqlQueryObject(dict):
super(FakeSqlQueryObject, self).__init__(**entries)
class TestValidatorSelection(base.BaseTestCase):
def setUp(self):
super(TestValidatorSelection, self).setUp()
vpnaas_provider = [{
'service_type': constants.VPN,
'name': 'vpnaas',
'driver': IPSEC_SERVICE_DRIVER,
'default': True
}]
# override the default service provider
self.service_providers = (
mock.patch.object(st_db.ServiceTypeManager,
'get_service_providers').start())
self.service_providers.return_value = vpnaas_provider
mock.patch('neutron.common.rpc.create_connection').start()
stm = st_db.ServiceTypeManager()
stm.get_provider_names_by_resource_ids = mock.Mock(
return_value={})
mock.patch('neutron.db.servicetype_db.ServiceTypeManager.get_instance',
return_value=stm).start()
mock.patch('neutron_vpnaas.db.vpn.vpn_db.VPNPluginDb.get_vpnservices',
return_value=[]).start()
self.vpn_plugin = vpn_plugin.VPNDriverPlugin()
def test_reference_driver_used(self):
default_provider = self.vpn_plugin.default_provider
default_driver = self.vpn_plugin.drivers[default_provider]
self.assertIsInstance(default_driver.validator,
ipsec_validator.IpsecVpnValidator)
class TestIPsecDriver(base.BaseTestCase):
def setUp(self):
super(TestIPsecDriver, self).setUp()
@ -67,6 +102,8 @@ class TestIPsecDriver(base.BaseTestCase):
'router_id': self._fake_vpn_router_id
}
self.driver = ipsec_driver.IPsecVPNDriver(self.svc_plugin)
self.validator = ipsec_validator.IpsecVpnValidator(self.driver)
self.context = n_ctx.get_admin_context()
def _test_update(self, func, args, additional_info=None):
ctxt = n_ctx.Context('', 'somebody')
@ -420,3 +457,33 @@ class TestIPsecDriver(base.BaseTestCase):
self.driver.create_vpnservice(ctxt, vpnservice_dict)
self.svc_plugin.set_external_tunnel_ips.assert_called_once_with(
ctxt, FAKE_SERVICE_ID, v4_ip='10.0.0.99', v6_ip='2001::1')
def test_validate_ipsec_policy(self):
# Validate IPsec Policy transform_protocol and auth_algorithm
ipsec_policy = {'transform_protocol': 'ah-esp'}
self.assertRaises(ipsec_validator.IpsecValidationFailure,
self.validator.validate_ipsec_policy,
self.context, ipsec_policy)
auth_algorithm = {'auth_algorithm': 'sha384'}
self.assertRaises(ipsec_validator.IpsecValidationFailure,
self.validator.validate_ipsec_policy,
self.context, auth_algorithm)
auth_algorithm = {'auth_algorithm': 'sha512'}
self.assertRaises(ipsec_validator.IpsecValidationFailure,
self.validator.validate_ipsec_policy,
self.context, auth_algorithm)
def test_validate_ike_policy(self):
# Validate IKE Policy auth_algorithm
auth_algorithm = {'auth_algorithm': 'sha384'}
self.assertRaises(ipsec_validator.IkeValidationFailure,
self.validator.validate_ike_policy,
self.context, auth_algorithm)
auth_algorithm = {'auth_algorithm': 'sha512'}
self.assertRaises(ipsec_validator.IkeValidationFailure,
self.validator.validate_ike_policy,
self.context, auth_algorithm)

View File

@ -15,6 +15,9 @@
import contextlib
import mock
from neutron.db import servicetype_db as st_db
from neutron.extensions import flavors
from neutron.services.flavors.flavors_plugin import FlavorsPlugin
from neutron.tests.unit.db import test_agentschedulers_db
from neutron.tests.unit.extensions import test_agent as test_agent_ext_plugin
@ -22,14 +25,25 @@ from neutron_lib import constants as lib_constants
from neutron_lib import context
from neutron_lib.plugins import constants as p_constants
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from neutron_vpnaas.db.vpn import vpn_validator
from neutron_vpnaas.extensions import vpn_flavors
from neutron_vpnaas.services.vpn import plugin as vpn_plugin
from neutron_vpnaas.services.vpn.service_drivers import driver_validator
from neutron_vpnaas.services.vpn.service_drivers import ipsec as ipsec_driver
from neutron_vpnaas.tests import base
from neutron_vpnaas.tests.unit.db.vpn import test_vpn_db as test_db_vpnaas
FAKE_HOST = test_agent_ext_plugin.L3_HOSTA
VPN_DRIVER_CLASS = 'neutron_vpnaas.services.vpn.plugin.VPNDriverPlugin'
IPSEC_SERVICE_DRIVER = ('neutron_vpnaas.services.vpn.service_drivers.'
'ipsec.IPsecVPNDriver')
CISCO_IPSEC_SERVICE_DRIVER = ('neutron_vpnaas.services.vpn.service_drivers.'
'cisco_ipsec.CiscoCsrIPsecVPNDriver')
_uuid = uuidutils.generate_uuid
class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
test_agentschedulers_db.AgentSchedulerTestMixIn,
@ -42,7 +56,8 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
driver_cls = driver_cls_p.start()
self.driver = mock.Mock()
self.driver.service_type = ipsec_driver.IPSEC
self.driver.validator = vpn_validator.VpnReferenceValidator()
self.driver.validator = driver_validator.VpnDriverValidator(
self.driver)
driver_cls.return_value = self.driver
super(TestVPNDriverPlugin, self).setUp(
vpnaas_plugin=VPN_DRIVER_CLASS)
@ -58,14 +73,25 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
mock.ANY, mock.ANY)
def test_create_vpnservice(self):
mock.patch('neutron_vpnaas.services.vpn.plugin.'
'VPNDriverPlugin._get_driver_for_vpnservice',
return_value=self.driver).start()
stm = directory.get_plugin(p_constants.VPN).service_type_manager
stm.add_resource_association = mock.Mock()
super(TestVPNDriverPlugin, self).test_create_vpnservice()
self.driver.create_vpnservice.assert_called_once_with(
mock.ANY, mock.ANY)
stm.add_resource_association.assert_called_once_with(
mock.ANY, p_constants.VPN, 'vpnaas', mock.ANY)
def test_delete_vpnservice(self, **extras):
stm = directory.get_plugin(p_constants.VPN).service_type_manager
stm.del_resource_associations = mock.Mock()
super(TestVPNDriverPlugin, self).test_delete_vpnservice()
self.driver.delete_vpnservice.assert_called_once_with(
mock.ANY, mock.ANY)
stm.del_resource_associations.assert_called_once_with(
mock.ANY, [mock.ANY])
def test_update_vpnservice(self, **extras):
super(TestVPNDriverPlugin, self).test_update_vpnservice()
@ -136,21 +162,6 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
):
yield vpnservice1['vpnservice']
def test_get_agent_hosting_vpn_services(self):
with self.vpnservice_set():
service_plugin = directory.get_plugin(p_constants.VPN)
vpnservices = service_plugin._get_agent_hosting_vpn_services(
self.adminContext, FAKE_HOST)
vpnservices = vpnservices.all()
self.assertEqual(1, len(vpnservices))
vpnservice_db = vpnservices[0]
self.assertEqual(1, len(vpnservice_db.ipsec_site_connections))
ipsec_site_connection = vpnservice_db.ipsec_site_connections[0]
self.assertIsNotNone(
ipsec_site_connection['ikepolicy'])
self.assertIsNotNone(
ipsec_site_connection['ipsecpolicy'])
def test_update_status(self):
with self.vpnservice_set() as vpnservice:
self._register_agent_states()
@ -161,7 +172,181 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
'ipsec_site_connections': {},
'updated_pending_status': True,
'id': vpnservice['id']}])
vpnservices = service_plugin._get_agent_hosting_vpn_services(
self.adminContext, FAKE_HOST)
vpnservice_db = vpnservices[0]
self.assertEqual(lib_constants.ACTIVE, vpnservice_db['status'])
vpnservice = service_plugin.get_vpnservice(
self.adminContext, vpnservice['id'])
self.assertEqual(lib_constants.ACTIVE, vpnservice['status'])
class TestVPNDriverPluginMultipleDrivers(base.BaseTestCase):
def setUp(self):
super(TestVPNDriverPluginMultipleDrivers, self).setUp()
vpnaas_providers = [
{'service_type': p_constants.VPN,
'name': 'ipsec',
'driver': IPSEC_SERVICE_DRIVER,
'default': True},
{'service_type': p_constants.VPN,
'name': 'cisco',
'driver': CISCO_IPSEC_SERVICE_DRIVER,
'default': False}]
self.service_providers = (
mock.patch.object(st_db.ServiceTypeManager,
'get_service_providers').start())
self.service_providers.return_value = vpnaas_providers
self.adminContext = context.get_admin_context()
@contextlib.contextmanager
def vpnservices_providers_set(self, vpnservices=None, provider_names=None):
if not vpnservices:
vpnservices = []
if not provider_names:
provider_names = {}
stm = st_db.ServiceTypeManager()
stm.get_provider_names_by_resource_ids = mock.Mock(
return_value=provider_names)
mock.patch('neutron.db.servicetype_db.ServiceTypeManager.get_instance',
return_value=stm).start()
mock.patch('neutron_vpnaas.db.vpn.vpn_db.VPNPluginDb.get_vpnservices',
return_value=vpnservices).start()
yield stm
def test_multiple_drivers_loaded(self):
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertEqual(2, len(driver_plugin.drivers))
self.assertEqual('ipsec', driver_plugin.default_provider)
self.assertIn('ipsec', driver_plugin.drivers)
self.assertEqual('ipsec', driver_plugin.drivers['ipsec'].name)
self.assertIn('cisco', driver_plugin.drivers)
self.assertEqual('cisco', driver_plugin.drivers['cisco'].name)
def test_provider_lost(self):
LOST_SERVICE_ID = _uuid()
LOST_PROVIDER_SERVICE = {'id': LOST_SERVICE_ID}
with self.vpnservices_providers_set(
vpnservices=[LOST_PROVIDER_SERVICE],
provider_names={LOST_SERVICE_ID: 'LOST_PROVIDER'}
):
self.assertRaises(SystemExit, vpn_plugin.VPNDriverPlugin)
def test_unasso_vpnservices(self):
UNASSO_SERVICE_ID = _uuid()
with self.vpnservices_providers_set(
vpnservices=[{'id': UNASSO_SERVICE_ID}]
) as stm:
stm.add_resource_association = mock.Mock()
vpn_plugin.VPNDriverPlugin()
stm.add_resource_association.assert_called_once_with(
mock.ANY, p_constants.VPN, 'ipsec', UNASSO_SERVICE_ID)
def test_get_driver_for_vpnservice(self):
CISCO_VPNSERVICE_ID = _uuid()
CISCO_VPNSERVICE = {'id': CISCO_VPNSERVICE_ID}
provider_names = {CISCO_VPNSERVICE_ID: 'cisco'}
with self.vpnservices_providers_set(provider_names=provider_names):
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertEqual(
driver_plugin.drivers['cisco'],
driver_plugin._get_driver_for_vpnservice(
self.adminContext, CISCO_VPNSERVICE))
def test_get_driver_for_ipsec_site_connection(self):
IPSEC_VPNSERVICE_ID = _uuid()
IPSEC_SITE_CONNECTION = {'vpnservice_id': IPSEC_VPNSERVICE_ID}
provider_names = {IPSEC_VPNSERVICE_ID: 'ipsec'}
with self.vpnservices_providers_set(provider_names=provider_names):
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertEqual(
driver_plugin.drivers['ipsec'],
driver_plugin._get_driver_for_ipsec_site_connection(
self.adminContext, IPSEC_SITE_CONNECTION))
def test_get_provider_for_none_flavor_id(self):
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
provider = driver_plugin._get_provider_for_flavor(
self.adminContext, None)
self.assertEqual(
driver_plugin.default_provider, provider)
def test_get_provider_for_flavor_id_plugin_not_loaded(self):
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertRaises(
vpn_flavors.FlavorsPluginNotLoaded,
driver_plugin._get_provider_for_flavor,
self.adminContext,
_uuid())
def test_get_provider_for_flavor_id_invalid_type(self):
FAKE_FLAVOR = {'service_type': 'NOT_VPN'}
directory.add_plugin(p_constants.FLAVORS, FlavorsPlugin())
mock.patch(
'neutron.services.flavors.flavors_plugin.FlavorsPlugin.get_flavor',
return_value=FAKE_FLAVOR).start()
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertRaises(
flavors.InvalidFlavorServiceType,
driver_plugin._get_provider_for_flavor,
self.adminContext,
_uuid())
def test_get_provider_for_flavor_id_flavor_disabled(self):
FAKE_FLAVOR = {'service_type': p_constants.VPN,
'enabled': False}
directory.add_plugin(p_constants.FLAVORS, FlavorsPlugin())
mock.patch(
'neutron.services.flavors.flavors_plugin.FlavorsPlugin.get_flavor',
return_value=FAKE_FLAVOR).start()
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertRaises(
flavors.FlavorDisabled,
driver_plugin._get_provider_for_flavor,
self.adminContext,
_uuid())
def test_get_provider_for_flavor_id_provider_not_found(self):
FLAVOR_ID = _uuid()
FAKE_FLAVOR = {'id': FLAVOR_ID,
'service_type': p_constants.VPN,
'enabled': True}
PROVIDERS = [{'provider': 'SOME_PROVIDER'}]
directory.add_plugin(p_constants.FLAVORS, FlavorsPlugin())
mock.patch(
'neutron.services.flavors.flavors_plugin.FlavorsPlugin.get_flavor',
return_value=FAKE_FLAVOR).start()
mock.patch(
'neutron.services.flavors.flavors_plugin.'
'FlavorsPlugin.get_flavor_next_provider',
return_value=PROVIDERS).start()
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertRaises(
vpn_flavors.NoProviderFoundForFlavor,
driver_plugin._get_provider_for_flavor,
self.adminContext,
FLAVOR_ID)
def test_get_provider_for_flavor_id(self):
FLAVOR_ID = _uuid()
FAKE_FLAVOR = {'id': FLAVOR_ID,
'service_type': p_constants.VPN,
'enabled': True}
PROVIDERS = [{'provider': 'cisco'}]
directory.add_plugin(p_constants.FLAVORS, FlavorsPlugin())
mock.patch(
'neutron.services.flavors.flavors_plugin.FlavorsPlugin.get_flavor',
return_value=FAKE_FLAVOR).start()
mock.patch(
'neutron.services.flavors.flavors_plugin.'
'FlavorsPlugin.get_flavor_next_provider',
return_value=PROVIDERS).start()
with self.vpnservices_providers_set():
driver_plugin = vpn_plugin.VPNDriverPlugin()
self.assertEqual(
'cisco',
driver_plugin._get_provider_for_flavor(
self.adminContext, FLAVOR_ID))

View File

@ -0,0 +1,8 @@
---
prelude: >
Flavor framework integration.
features:
- Neutron VPNaaS is now integrated with Neutron flavor framework. Multiple
VPN service providers might be configured at the same time. A flavor of
service type VPN associated with a profile containing a driver is used
to find the provider for a newly created VPN service.