Implement VPN connection storage

Change-Id: I3e73dca9ed4084a8d864b7d7495ac6321b623f10
This commit is contained in:
Feodor Tersin 2015-05-24 23:52:49 +03:00
parent d53b1425a0
commit 020e6c0ba1
11 changed files with 547 additions and 12 deletions

View File

@ -41,6 +41,7 @@ from ec2api.api import subnet
from ec2api.api import tag
from ec2api.api import volume
from ec2api.api import vpc
from ec2api.api import vpn_connection
from ec2api.api import vpn_gateway
from ec2api import exception
@ -1994,3 +1995,51 @@ class VpcCloudController(CloudController):
Returns:
Information about one or more customer gateways.
"""
@module_and_param_types(vpn_connection, 'cgw_id',
'vgw_id', 'vpn_connection_type', 'dummy')
def create_vpn_connection(self, context, customer_gateway_id,
vpn_gateway_id, type, options=None):
"""Creates a VPN connection.
Args:
context (RequestContext): The request context.
customer_gateway_id (str): The ID of the customer gateway.
vpn_gateway_id (str): The ID of the virtual private gateway.
type (str): The type of VPN connection (ipsec.1).
options (dict of options): Indicates whether the VPN connection
requires static routes.
Returns:
Information about the VPN connection.
Creates a VPN connection between an existing virtual private gateway
and a VPN customer gateway.
"""
@module_and_param_types(vpn_connection, 'vpn_id')
def delete_vpn_connection(self, context, vpn_connection_id):
"""Deletes the specified VPN connection.
Args:
context (RequestContext): The request context.
vpn_connection_id (str): The ID of the VPN connection.
Returns:
true if the request succeeds.
"""
@module_and_param_types(vpn_connection, 'vpn_ids',
'filter')
def describe_vpn_connections(self, context, vpn_connection_id=None,
filter=None):
"""Describes one or more of your VPN connections.
Args:
context (RequestContext): The request context.
vpn_connection_id (list of str): One or more VPN connection IDs.
filter (list of filter dict): One or more filters.
Returns:
Information about one or more VPN connections.
"""

View File

@ -259,6 +259,12 @@ class Validator(object):
def cgw_ids(self, ids):
self.multi(ids, self.cgw_id)
def vpn_id(self, id):
self.ec2_id(id, ['vpn'])
def vpn_ids(self, ids):
self.multi(ids, self.vpn_id)
def security_group_str(self, value):
validator.validate_security_group_str(value, self.param_name,
self.params.get('vpc_id'))
@ -271,7 +277,7 @@ class Validator(object):
VPC_KINDS = ['vpc', 'igw', 'subnet', 'eni', 'dopt', 'eipalloc', 'rtb',
'vgw', 'cgw']
'vgw', 'cgw', 'vpn']
class UniversalDescriber(object):

View File

@ -191,6 +191,7 @@ NOT_FOUND_EXCEPTION_MAP = {
'ari': exception.InvalidAMIIDNotFound,
'vgw': exception.InvalidVpnGatewayIDNotFound,
'cgw': exception.InvalidCustomerGatewayIDNotFound,
'vpn': exception.InvalidVpnConnectionIDNotFound,
}

View File

@ -0,0 +1,156 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import string
from neutronclient.common import exceptions as neutron_exception
from oslo_log import log as logging
from ec2api.api import clients
from ec2api.api import common
from ec2api.api import ec2utils
from ec2api.db import api as db_api
from ec2api import exception
from ec2api.i18n import _
LOG = logging.getLogger(__name__)
Validator = common.Validator
SHARED_KEY_CHARS = string.ascii_letters + '_.' + string.digits
def create_vpn_connection(context, customer_gateway_id, vpn_gateway_id,
type, options=None):
if not options or options.get('static_routes_only') is not True:
raise exception.Unsupported('BGP dynamic routing is unsupported')
customer_gateway = ec2utils.get_db_item(context, customer_gateway_id)
vpn_gateway = ec2utils.get_db_item(context, vpn_gateway_id)
vpn_connection = next(
(vpn for vpn in db_api.get_items(context, 'vpn')
if vpn['customer_gateway_id'] == customer_gateway_id),
None)
if vpn_connection:
if vpn_connection['vpn_gateway_id'] == vpn_gateway_id:
return {'vpnConnection': _format_vpn_connection(vpn_connection)}
else:
raise exception.InvalidCustomerGatewayDuplicateIpAddress()
neutron = clients.neutron(context)
with common.OnCrashCleaner() as cleaner:
os_ikepolicy = {'ike_version': 'v1',
'auth_algorithm': 'sha1',
'encryption_algorithm': 'aes-128',
'pfs': 'group2',
'phase1_negotiation_mode': 'main',
'lifetime': {'units': 'seconds',
'value': 28800}}
os_ikepolicy = neutron.create_ikepolicy(
{'ikepolicy': os_ikepolicy})['ikepolicy']
cleaner.addCleanup(neutron.delete_ikepolicy, os_ikepolicy['id'])
os_ipsecpolicy = {'transform_protocol': 'esp',
'auth_algorithm': 'sha1',
'encryption_algorithm': 'aes-128',
'pfs': 'group2',
'encapsulation_mode': 'tunnel',
'lifetime': {'units': 'seconds',
'value': 3600}}
os_ipsecpolicy = neutron.create_ipsecpolicy(
{'ipsecpolicy': os_ipsecpolicy})['ipsecpolicy']
cleaner.addCleanup(neutron.delete_ipsecpolicy, os_ipsecpolicy['id'])
psk = ''.join(random.choice(SHARED_KEY_CHARS) for _x in xrange(32))
vpn_connection = db_api.add_item(
context, 'vpn',
{'customer_gateway_id': customer_gateway['id'],
'vpn_gateway_id': vpn_gateway['id'],
'pre_shared_key': psk,
'os_ikepolicy_id': os_ikepolicy['id'],
'os_ipsecpolicy_id': os_ipsecpolicy['id'],
})
cleaner.addCleanup(db_api.delete_item, context, vpn_connection['id'])
neutron.update_ikepolicy(
os_ikepolicy['id'], {'ikepolicy': {'name': vpn_connection['id']}})
neutron.update_ipsecpolicy(
os_ipsecpolicy['id'],
{'ipsecpolicy': {'name': vpn_connection['id']}})
return {'vpnConnection': _format_vpn_connection(vpn_connection)}
def delete_vpn_connection(context, vpn_connection_id):
vpn_connection = ec2utils.get_db_item(context, vpn_connection_id)
with common.OnCrashCleaner() as cleaner:
db_api.delete_item(context, vpn_connection['id'])
cleaner.addCleanup(db_api.restore_item, context, 'vpn', vpn_connection)
neutron = clients.neutron(context)
try:
neutron.delete_ipsecpolicy(vpn_connection['os_ipsecpolicy_id'])
except neutron_exception.Conflict as ex:
LOG.warning(
_('Failed to delete ipsecoplicy %(os_id)s during deleting '
'VPN connection %(id)s. Reason: %(reason)s'),
{'id': vpn_connection['id'],
'os_id': vpn_connection['os_ipsecpolicy_id'],
'reason': ex.message})
except neutron_exception.NotFound:
pass
try:
neutron.delete_ikepolicy(vpn_connection['os_ikepolicy_id'])
except neutron_exception.Conflict as ex:
LOG.warning(
_('Failed to delete ikepolicy %(os_id)s during deleting '
'VPN connection %(id)s. Reason: %(reason)s'),
{'id': vpn_connection['id'],
'os_id': vpn_connection['os_ikepolicy_id'],
'reason': ex.message})
except neutron_exception.NotFound:
pass
return True
def describe_vpn_connections(context, vpn_connection_id=None, filter=None):
formatted_vpn_connections = VpnConnectionDescriber().describe(
context, ids=vpn_connection_id, filter=filter)
return {'vpnConnectionSet': formatted_vpn_connections}
class VpnConnectionDescriber(common.TaggableItemsDescriber,
common.NonOpenstackItemsDescriber):
KIND = 'vpn'
FILTER_MAP = {'customer-gateway-id': 'customerGatewayId',
'state': 'state',
'option.static-routes-only': ('options', 'staticRoutesOnly'),
'type': 'type',
'vpn-connection-id': 'vpnConnectionId',
'vpn-gateway-id': 'vpnGatewayId'}
def format(self, vpn_connection):
return _format_vpn_connection(vpn_connection)
def _format_vpn_connection(vpn_connection):
return {'vpnConnectionId': vpn_connection['id'],
'vpnGatewayId': vpn_connection['vpn_gateway_id'],
'customerGatewayId': vpn_connection['customer_gateway_id'],
'state': 'available',
'type': 'ipsec.1',
'vgwTelemetry': [],
'options': {'staticRoutesOnly': True}}

View File

@ -101,7 +101,8 @@ def detach_vpn_gateway(context, vpc_id, vpn_gateway_id):
def delete_vpn_gateway(context, vpn_gateway_id):
vpn_gateway = ec2utils.get_db_item(context, vpn_gateway_id)
vpn_connections = db_api.get_items(context, 'vpn')
if vpn_gateway['vpc_id']:
if vpn_gateway['vpc_id'] or any(vpn['vpn_gateway_id'] == vpn_gateway['id']
for vpn in vpn_connections):
raise exception.IncorrectState(reason=_('The VPN gateway is in use.'))
db_api.delete_item(context, vpn_gateway['id'])
return True

View File

@ -302,6 +302,11 @@ class RouteAlreadyExists(EC2DuplicateException):
'already exists.')
class InvalidCustomerGatewayDuplicateIpAddress(EC2DuplicateException):
ec2_code = 'InvalidCustomerGateway.DuplicateIpAddress'
msg_fmt = _('Conflict among chosen gateway IP addresses.')
class InvalidVpcIDNotFound(EC2NotFoundException):
ec2_code = 'InvalidVpcID.NotFound'
msg_fmt = _("The vpc ID '%(id)s' does not exist")
@ -420,6 +425,11 @@ class InvalidCustomerGatewayIDNotFound(EC2NotFoundException):
msg_fmt = _("The customerGateway ID '%(id)s' does not exist")
class InvalidVpnConnectionIDNotFound(EC2NotFoundException):
ec2_code = 'InvalidVpnConnectionID.NotFound'
msg_fmt = _("The vpnConnection ID '%(id)s' does not exist")
class InvalidVpnGatewayAttachmentNotFound(EC2NotFoundException):
ec2_code = 'InvalidVpnGatewayAttachment.NotFound'
msg_fmt = _("The attachment with vpn gateway ID '%(vgw_id)s' "

View File

@ -254,6 +254,16 @@ IP_CUSTOMER_GATEWAY_ADDRESS_2 = '172.31.2.22'
# vpn connection constants
ID_EC2_VPN_CONNECTION_1 = random_ec2_id('vpn')
ID_EC2_VPN_CONNECTION_2 = random_ec2_id('vpn')
ID_OS_IKEPOLICY_1 = random_os_id()
ID_OS_IKEPOLICY_2 = random_os_id()
ID_OS_IPSECPOLICY_1 = random_os_id()
ID_OS_IPSECPOLICY_2 = random_os_id()
PRE_SHARED_KEY_1 = 'Z54kLbANio5A1.XmkjwYvWuSfVx3_xuG'
PRE_SHARED_KEY_2 = 'FSbXpA.G9306W.BQ2n6W9JZJsyZcMN2G'
CIDR_VPN_1_STATIC = '192.168.101.0/24'
# Object constants section
@ -1605,6 +1615,85 @@ EC2_CUSTOMER_GATEWAY_2 = {
}
# VPN connection objects
DB_VPN_CONNECTION_1 = {
'id': ID_EC2_VPN_CONNECTION_1,
'customer_gateway_id': ID_EC2_CUSTOMER_GATEWAY_1,
'vpn_gateway_id': ID_EC2_VPN_GATEWAY_1,
'pre_shared_key': PRE_SHARED_KEY_1,
'os_ikepolicy_id': ID_OS_IKEPOLICY_1,
'os_ipsecpolicy_id': ID_OS_IPSECPOLICY_1,
}
DB_VPN_CONNECTION_2 = {
'id': ID_EC2_VPN_CONNECTION_2,
'customer_gateway_id': ID_EC2_CUSTOMER_GATEWAY_2,
'vpn_gateway_id': ID_EC2_VPN_GATEWAY_2,
'pre_shared_key': PRE_SHARED_KEY_2,
'os_ikepolicy_id': ID_OS_IKEPOLICY_2,
'os_ipsecpolicy_id': ID_OS_IPSECPOLICY_2,
}
EC2_VPN_CONNECTION_1 = {
'vpnConnectionId': ID_EC2_VPN_CONNECTION_1,
'vpnGatewayId': ID_EC2_VPN_GATEWAY_1,
'customerGatewayId': ID_EC2_CUSTOMER_GATEWAY_1,
'state': 'available',
'type': 'ipsec.1',
'vgwTelemetry': None,
'options': {'staticRoutesOnly': True},
}
EC2_VPN_CONNECTION_2 = {
'vpnConnectionId': ID_EC2_VPN_CONNECTION_2,
'vpnGatewayId': ID_EC2_VPN_GATEWAY_2,
'customerGatewayId': ID_EC2_CUSTOMER_GATEWAY_2,
'state': 'available',
'type': 'ipsec.1',
'vgwTelemetry': None,
'options': {'staticRoutesOnly': True},
}
OS_IKEPOLICY_1 = {
'id': ID_OS_IKEPOLICY_1,
'ike_version': 'v1',
'auth_algorithm': 'sha1',
'encryption_algorithm': 'aes-128',
'pfs': 'group2',
'phase1_negotiation_mode': 'main',
'lifetime': {'units': 'seconds',
'value': 28800}
}
OS_IKEPOLICY_2 = {
'id': ID_OS_IKEPOLICY_2,
'ike_version': 'v1',
'auth_algorithm': 'sha1',
'encryption_algorithm': 'aes-128',
'pfs': 'group2',
'phase1_negotiation_mode': 'main',
'lifetime': {'units': 'seconds',
'value': 28800}
}
OS_IPSECPOLICY_1 = {
'id': ID_OS_IPSECPOLICY_1,
'transform_protocol': 'esp',
'auth_algorithm': 'sha1',
'encryption_algorithm': 'aes-128',
'pfs': 'group2',
'encapsulation_mode': 'tunnel',
'lifetime': {'units': 'seconds',
'value': 3600}
}
OS_IPSECPOLICY_2 = {
'id': ID_OS_IPSECPOLICY_2,
'transform_protocol': 'esp',
'auth_algorithm': 'sha1',
'encryption_algorithm': 'aes-128',
'pfs': 'group2',
'encapsulation_mode': 'tunnel',
'lifetime': {'units': 'seconds',
'value': 3600}
}
# Object generator functions section
# internet gateway generator functions

View File

@ -89,6 +89,14 @@ class CustomerGatewayTestCase(base.ApiTestCase):
{'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_2})
self.assertFalse(self.db_api.delete_item.called)
self.set_mock_db_items(fakes.DB_CUSTOMER_GATEWAY_1,
fakes.DB_VPN_CONNECTION_1)
self.assert_execution_error(
'IncorrectState',
'DeleteCustomerGateway',
{'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1})
self.assertFalse(self.db_api.delete_item.called)
def test_describe_customer_gateways(self):
self.set_mock_db_items(fakes.DB_CUSTOMER_GATEWAY_1,
fakes.DB_CUSTOMER_GATEWAY_2)

View File

@ -69,6 +69,7 @@ class EC2UtilsTestCase(testtools.TestCase):
check_not_found('aki', exception.InvalidAMIIDNotFound)
check_not_found('vgw', exception.InvalidVpnGatewayIDNotFound)
check_not_found('cgw', exception.InvalidCustomerGatewayIDNotFound)
check_not_found('vpn', exception.InvalidVpnConnectionIDNotFound)
@mock.patch('ec2api.db.api.IMPL')
def test_get_db_items(self, db_api):
@ -125,6 +126,7 @@ class EC2UtilsTestCase(testtools.TestCase):
check_not_found('ari', exception.InvalidAMIIDNotFound)
check_not_found('vgw', exception.InvalidVpnGatewayIDNotFound)
check_not_found('cgw', exception.InvalidCustomerGatewayIDNotFound)
check_not_found('vpn', exception.InvalidVpnConnectionIDNotFound)
"""Unit test api xml conversion."""
def test_number_conversion(self):

View File

@ -0,0 +1,211 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from ec2api.tests.unit import base
from ec2api.tests.unit import fakes
from ec2api.tests.unit import matchers
from ec2api.tests.unit import tools
class VpnConnectionTestCase(base.ApiTestCase):
@mock.patch('random.choice')
def test_create_vpn_connection(self, random_choice):
self.set_mock_db_items(
fakes.DB_VPN_GATEWAY_1, fakes.DB_VPN_GATEWAY_2,
fakes.DB_CUSTOMER_GATEWAY_1, fakes.DB_CUSTOMER_GATEWAY_2)
self.neutron.create_ikepolicy.side_effect = (
tools.get_neutron_create('ikepolicy', fakes.ID_OS_IKEPOLICY_1))
self.neutron.create_ipsecpolicy.side_effect = (
tools.get_neutron_create('ipsecpolicy', fakes.ID_OS_IPSECPOLICY_1))
self.db_api.add_item.side_effect = (
tools.get_db_api_add_item(fakes.ID_EC2_VPN_CONNECTION_1))
random_choice.side_effect = iter(fakes.PRE_SHARED_KEY_1)
resp = self.execute(
'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'True'})
self.assertThat(
resp,
matchers.DictMatches(
{'vpnConnection': fakes.EC2_VPN_CONNECTION_1}))
self.neutron.create_ikepolicy.assert_called_once_with(
{'ikepolicy': tools.purge_dict(fakes.OS_IKEPOLICY_1, ('id',))})
self.neutron.create_ipsecpolicy.assert_called_once_with(
{'ipsecpolicy': tools.purge_dict(fakes.OS_IPSECPOLICY_1, ('id',))})
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'vpn',
tools.purge_dict(fakes.DB_VPN_CONNECTION_1, ('id',)),
project_id=None)
self.neutron.update_ikepolicy.assert_called_once_with(
fakes.ID_OS_IKEPOLICY_1,
{'ikepolicy': {'name': fakes.ID_EC2_VPN_CONNECTION_1}})
self.neutron.update_ipsecpolicy.assert_called_once_with(
fakes.ID_OS_IPSECPOLICY_1,
{'ipsecpolicy': {'name': fakes.ID_EC2_VPN_CONNECTION_1}})
def test_create_vpn_connection_idempotent(self):
self.set_mock_db_items(
fakes.DB_VPN_GATEWAY_1, fakes.DB_CUSTOMER_GATEWAY_1,
fakes.DB_VPN_CONNECTION_1)
resp = self.execute(
'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'True'})
self.assertThat({'vpnConnection': fakes.EC2_VPN_CONNECTION_1},
matchers.DictMatches(resp))
self.assertFalse(self.neutron.create_ikepolicy.called)
self.assertFalse(self.neutron.create_ipsecpolicy.called)
self.assertFalse(self.db_api.add_item.called)
def test_create_vpn_connection_invalid_parameters(self):
self.assert_execution_error(
'Unsupported', 'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'False'})
self.assert_execution_error(
'Unsupported', 'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1'})
self.set_mock_db_items(fakes.DB_CUSTOMER_GATEWAY_1)
self.assert_execution_error(
'InvalidVpnGatewayID.NotFound', 'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_2,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'True'})
self.set_mock_db_items(fakes.DB_VPN_GATEWAY_1)
self.assert_execution_error(
'InvalidCustomerGatewayID.NotFound', 'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_2,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'True'})
self.set_mock_db_items(
fakes.DB_VPN_GATEWAY_2, fakes.DB_CUSTOMER_GATEWAY_1,
fakes.DB_VPN_CONNECTION_1)
self.assert_execution_error(
'InvalidCustomerGateway.DuplicateIpAddress', 'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_2,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'True'})
@tools.screen_unexpected_exception_logs
def test_create_vpn_connection_rollback(self):
self.set_mock_db_items(fakes.DB_VPN_GATEWAY_1,
fakes.DB_CUSTOMER_GATEWAY_1)
self.neutron.create_ikepolicy.side_effect = (
tools.get_neutron_create('ikepolicy', fakes.ID_OS_IKEPOLICY_1))
self.neutron.create_ipsecpolicy.side_effect = (
tools.get_neutron_create('ipsecpolicy', fakes.ID_OS_IPSECPOLICY_1))
self.db_api.add_item.side_effect = (
tools.get_db_api_add_item(fakes.ID_EC2_VPN_CONNECTION_1))
self.neutron.update_ikepolicy.side_effect = Exception()
self.assert_execution_error(
self.ANY_EXECUTE_ERROR, 'CreateVpnConnection',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1,
'CustomerGatewayId': fakes.ID_EC2_CUSTOMER_GATEWAY_1,
'Type': 'ipsec.1',
'Options.StaticRoutesOnly': 'True'})
self.db_api.delete_item.assert_called_once_with(
mock.ANY, fakes.ID_EC2_VPN_CONNECTION_1)
self.neutron.delete_ipsecpolicy.assert_called_once_with(
fakes.ID_OS_IPSECPOLICY_1)
self.neutron.delete_ikepolicy.assert_called_once_with(
fakes.ID_OS_IKEPOLICY_1)
def test_delete_vpn_connection(self):
self.set_mock_db_items(fakes.DB_VPN_CONNECTION_1)
resp = self.execute('DeleteVpnConnection',
{'VpnConnectionId': fakes.ID_EC2_VPN_CONNECTION_1})
self.assertEqual({'return': True}, resp)
self.db_api.delete_item.assert_called_once_with(
mock.ANY, fakes.ID_EC2_VPN_CONNECTION_1)
self.neutron.delete_ipsecpolicy.assert_called_once_with(
fakes.ID_OS_IPSECPOLICY_1)
self.neutron.delete_ikepolicy.assert_called_once_with(
fakes.ID_OS_IKEPOLICY_1)
def test_delete_vpn_connection_invalid_parameters(self):
self.set_mock_db_items()
self.assert_execution_error(
'InvalidVpnConnectionID.NotFound', 'DeleteVpnConnection',
{'VpnConnectionId': fakes.ID_EC2_VPN_CONNECTION_1})
@tools.screen_unexpected_exception_logs
def test_delete_vpn_connection_rollback(self):
self.set_mock_db_items(fakes.DB_VPN_CONNECTION_1)
self.neutron.delete_ikepolicy.side_effect = Exception()
self.assert_execution_error(
self.ANY_EXECUTE_ERROR, 'DeleteVpnConnection',
{'VpnConnectionId': fakes.ID_EC2_VPN_CONNECTION_1})
self.db_api.restore_item.assert_called_once_with(
mock.ANY, 'vpn', fakes.DB_VPN_CONNECTION_1)
self.assertFalse(self.neutron.create_ipsecpolicy.called)
self.assertFalse(self.neutron.create_ikepolicy.called)
def test_describe_vpn_connections(self):
self.set_mock_db_items(fakes.DB_VPN_CONNECTION_1,
fakes.DB_VPN_CONNECTION_2)
resp = self.execute('DescribeVpnConnections', {})
self.assertThat(
resp,
matchers.DictMatches(
{'vpnConnectionSet': [fakes.EC2_VPN_CONNECTION_1,
fakes.EC2_VPN_CONNECTION_2]},
orderless_lists=True))
resp = self.execute(
'DescribeVpnConnections',
{'VpnConnectionId.1': fakes.ID_EC2_VPN_CONNECTION_1})
self.assertThat(
resp,
matchers.DictMatches(
{'vpnConnectionSet': [fakes.EC2_VPN_CONNECTION_1]},
orderless_lists=True))
self.check_filtering(
'DescribeVpnConnections', 'vpnConnectionSet',
[('customer-gateway-id', fakes.ID_EC2_CUSTOMER_GATEWAY_1),
('state', 'available'),
('option.static-routes-only', True),
('type', 'ipsec.1'),
('vpn-connection-id', fakes.ID_EC2_VPN_CONNECTION_1),
('vpn-gateway-id', fakes.ID_EC2_VPN_GATEWAY_1)])
self.check_tag_support(
'DescribeVpnConnections', 'vpnConnectionSet',
fakes.ID_EC2_VPN_CONNECTION_1, 'vpnConnectionId')

View File

@ -280,19 +280,21 @@ class VpnGatewayTestCase(base.ApiTestCase):
mock.ANY, fakes.ID_EC2_VPN_GATEWAY_2)
def test_delete_vpn_gateway_invalid_parameters(self):
def do_check(error_code):
self.assert_execution_error(
error_code, 'DeleteVpnGateway',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1})
self.assertFalse(self.db_api.delete_item.called)
self.db_api.reset_mock()
self.set_mock_db_items()
do_check('InvalidVpnGatewayID.NotFound')
self.assert_execution_error(
'InvalidVpnGatewayID.NotFound', 'DeleteVpnGateway',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1})
self.set_mock_db_items(fakes.DB_VPN_GATEWAY_1)
do_check('IncorrectState')
self.assert_execution_error(
'IncorrectState', 'DeleteVpnGateway',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_1})
self.set_mock_db_items(fakes.DB_VPN_GATEWAY_2,
fakes.DB_VPN_CONNECTION_2)
self.assert_execution_error(
'IncorrectState', 'DeleteVpnGateway',
{'VpnGatewayId': fakes.ID_EC2_VPN_GATEWAY_2})
def test_describe_vpn_gateways(self):
self.set_mock_db_items(fakes.DB_VPN_GATEWAY_1, fakes.DB_VPN_GATEWAY_2)