Validate peer_cidrs for ipsec_site_connections

When cidrs format of remote peer is incorrect, we should get
validate message from neutron-lib. Meanwhile this patch
add a validator for peer_cidrs in db.

Change-Id: Ia77208f9a6704b651929c35c85dbc227972014aa
Closes-Bug: #1633941
This commit is contained in:
Dongcan Ye 2016-10-17 20:32:36 +08:00
parent 5a1dba9e8d
commit c1adb319f1
4 changed files with 62 additions and 3 deletions

View File

@ -131,6 +131,14 @@ class VpnReferenceValidator(object):
raise vpnaas.MixedIPVersionsForIPSecEndpoints(group=group_id)
return ip_versions.pop()
def _check_peer_cidrs(self, peer_cidrs):
"""Ensure all CIDRs have the valid format."""
for peer_cidr in peer_cidrs:
msg = validators.validate_subnet(peer_cidr)
if msg:
raise vpnaas.IPsecSiteConnectionPeerCidrError(
peer_cidr=peer_cidr)
def _check_peer_cidrs_ip_versions(self, peer_cidrs):
"""Ensure all CIDRs have the same IP version."""
if len(peer_cidrs) == 1:
@ -231,8 +239,8 @@ class VpnReferenceValidator(object):
For legacy mode, we use the (sole) subnet IP version, and the peer
CIDR(s). All IP versions must be the same.
This method also checks MTU (based on the local IP version), and
DPD settings.
This method also checks peer_cidrs format(legacy mode),
MTU (based on the local IP version), and DPD settings.
"""
if not local_ip_version:
# Using endpoint groups
@ -246,6 +254,7 @@ class VpnReferenceValidator(object):
peer_ip_version = self._check_peer_endpoint_ip_versions(
ipsec_sitecon['peer_ep_group_id'], peer_cidrs)
else:
self._check_peer_cidrs(ipsec_sitecon['peer_cidrs'])
peer_ip_version = self._check_peer_cidrs_ip_versions(
ipsec_sitecon['peer_cidrs'])
self._validate_compatible_ip_versions(local_ip_version,

View File

@ -47,6 +47,11 @@ class IPsecSiteConnectionMtuError(nexception.InvalidInput):
"for ipv%(version)s")
class IPsecSiteConnectionPeerCidrError(nexception.InvalidInput):
message = _("ipsec_site_connection peer cidr %(peer_cidr)s is "
"invalid CIDR")
class IKEPolicyNotFound(nexception.NotFound):
message = _("IKEPolicy %(ikepolicy_id)s could not be found")
@ -166,7 +171,7 @@ class EndpointGroupInUse(nexception.BadRequest):
def _validate_subnet_list_or_none(data, key_specs=None):
if data is not None:
validators.validate_subnet_list(data, key_specs)
return validators.validate_subnet_list(data, key_specs)
validators.add_validator('type:subnet_list_or_none',
_validate_subnet_list_or_none)

View File

@ -484,6 +484,23 @@ class TestIPsecDriverValidation(base.BaseTestCase):
cidrs = self.validator._get_peer_cidrs(peer_epg)
self.assertEqual(expected_cidrs, cidrs)
def test_ipsec_conn_check_peer_cidrs(self):
peer_cidrs = ['10.10.10.0/24', '20.20.20.0/24']
try:
self.validator._check_peer_cidrs(peer_cidrs)
except Exception:
self.fail("All peer cidrs format should be valid")
def test_fail_ipsec_conn_peer_cidrs_with_invalid_format(self):
peer_cidrs = ['invalid_cidr']
self.assertRaises(vpnaas.IPsecSiteConnectionPeerCidrError,
self.validator._check_peer_cidrs,
peer_cidrs)
peer_cidrs = ['192/24']
self.assertRaises(vpnaas.IPsecSiteConnectionPeerCidrError,
self.validator._check_peer_cidrs,
peer_cidrs)
def test_fail_ipsec_conn_endpoint_group_types(self):
local_epg = {'id': 'should-be-subnets',
'type': v_constants.CIDR_ENDPOINT,

View File

@ -452,6 +452,34 @@ class VpnaasExtensionTestCase(base.ExtensionTestCase):
self._test_ipsec_site_connection_create(more_args=more_args,
defaulted_args=no_peer_cidrs)
def test_ipsec_site_connection_create_with_invalid_cidr_format(self):
peer_cidrs = ['192.168.2.0/24', '10/8']
data = {
'ipsec_site_connection': {'name': 'connection1',
'description': 'Remote-connection1',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': peer_cidrs,
'mtu': 1500,
'psk': 'abcd',
'initiator': 'bi-directional',
'dpd': {
'action': 'hold',
'interval': 30,
'timeout': 120},
'ikepolicy_id': _uuid(),
'ipsecpolicy_id': _uuid(),
'vpnservice_id': _uuid(),
'admin_state_up': True,
'tenant_id': _uuid()}
}
res = self.api.post(_get_path('vpn/ipsec-site-connections',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
expect_errors=True)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_ipsec_site_connection_list(self):
"""Test case to list all ipsec_site_connections."""
ipsecsite_con_id = _uuid()