Validate peer_cidrs for ipsec_site_connections
When cidrs format of remote peer is incorrect, we should get validate message from neutron-lib. Meanwhile this patch add a validator for peer_cidrs in db. Change-Id: Ia77208f9a6704b651929c35c85dbc227972014aa Closes-Bug: #1633941
This commit is contained in:
parent
5a1dba9e8d
commit
c1adb319f1
|
@ -131,6 +131,14 @@ class VpnReferenceValidator(object):
|
|||
raise vpnaas.MixedIPVersionsForIPSecEndpoints(group=group_id)
|
||||
return ip_versions.pop()
|
||||
|
||||
def _check_peer_cidrs(self, peer_cidrs):
|
||||
"""Ensure all CIDRs have the valid format."""
|
||||
for peer_cidr in peer_cidrs:
|
||||
msg = validators.validate_subnet(peer_cidr)
|
||||
if msg:
|
||||
raise vpnaas.IPsecSiteConnectionPeerCidrError(
|
||||
peer_cidr=peer_cidr)
|
||||
|
||||
def _check_peer_cidrs_ip_versions(self, peer_cidrs):
|
||||
"""Ensure all CIDRs have the same IP version."""
|
||||
if len(peer_cidrs) == 1:
|
||||
|
@ -231,8 +239,8 @@ class VpnReferenceValidator(object):
|
|||
For legacy mode, we use the (sole) subnet IP version, and the peer
|
||||
CIDR(s). All IP versions must be the same.
|
||||
|
||||
This method also checks MTU (based on the local IP version), and
|
||||
DPD settings.
|
||||
This method also checks peer_cidrs format(legacy mode),
|
||||
MTU (based on the local IP version), and DPD settings.
|
||||
"""
|
||||
if not local_ip_version:
|
||||
# Using endpoint groups
|
||||
|
@ -246,6 +254,7 @@ class VpnReferenceValidator(object):
|
|||
peer_ip_version = self._check_peer_endpoint_ip_versions(
|
||||
ipsec_sitecon['peer_ep_group_id'], peer_cidrs)
|
||||
else:
|
||||
self._check_peer_cidrs(ipsec_sitecon['peer_cidrs'])
|
||||
peer_ip_version = self._check_peer_cidrs_ip_versions(
|
||||
ipsec_sitecon['peer_cidrs'])
|
||||
self._validate_compatible_ip_versions(local_ip_version,
|
||||
|
|
|
@ -47,6 +47,11 @@ class IPsecSiteConnectionMtuError(nexception.InvalidInput):
|
|||
"for ipv%(version)s")
|
||||
|
||||
|
||||
class IPsecSiteConnectionPeerCidrError(nexception.InvalidInput):
|
||||
message = _("ipsec_site_connection peer cidr %(peer_cidr)s is "
|
||||
"invalid CIDR")
|
||||
|
||||
|
||||
class IKEPolicyNotFound(nexception.NotFound):
|
||||
message = _("IKEPolicy %(ikepolicy_id)s could not be found")
|
||||
|
||||
|
@ -166,7 +171,7 @@ class EndpointGroupInUse(nexception.BadRequest):
|
|||
|
||||
def _validate_subnet_list_or_none(data, key_specs=None):
|
||||
if data is not None:
|
||||
validators.validate_subnet_list(data, key_specs)
|
||||
return validators.validate_subnet_list(data, key_specs)
|
||||
|
||||
validators.add_validator('type:subnet_list_or_none',
|
||||
_validate_subnet_list_or_none)
|
||||
|
|
|
@ -484,6 +484,23 @@ class TestIPsecDriverValidation(base.BaseTestCase):
|
|||
cidrs = self.validator._get_peer_cidrs(peer_epg)
|
||||
self.assertEqual(expected_cidrs, cidrs)
|
||||
|
||||
def test_ipsec_conn_check_peer_cidrs(self):
|
||||
peer_cidrs = ['10.10.10.0/24', '20.20.20.0/24']
|
||||
try:
|
||||
self.validator._check_peer_cidrs(peer_cidrs)
|
||||
except Exception:
|
||||
self.fail("All peer cidrs format should be valid")
|
||||
|
||||
def test_fail_ipsec_conn_peer_cidrs_with_invalid_format(self):
|
||||
peer_cidrs = ['invalid_cidr']
|
||||
self.assertRaises(vpnaas.IPsecSiteConnectionPeerCidrError,
|
||||
self.validator._check_peer_cidrs,
|
||||
peer_cidrs)
|
||||
peer_cidrs = ['192/24']
|
||||
self.assertRaises(vpnaas.IPsecSiteConnectionPeerCidrError,
|
||||
self.validator._check_peer_cidrs,
|
||||
peer_cidrs)
|
||||
|
||||
def test_fail_ipsec_conn_endpoint_group_types(self):
|
||||
local_epg = {'id': 'should-be-subnets',
|
||||
'type': v_constants.CIDR_ENDPOINT,
|
||||
|
|
|
@ -452,6 +452,34 @@ class VpnaasExtensionTestCase(base.ExtensionTestCase):
|
|||
self._test_ipsec_site_connection_create(more_args=more_args,
|
||||
defaulted_args=no_peer_cidrs)
|
||||
|
||||
def test_ipsec_site_connection_create_with_invalid_cidr_format(self):
|
||||
peer_cidrs = ['192.168.2.0/24', '10/8']
|
||||
data = {
|
||||
'ipsec_site_connection': {'name': 'connection1',
|
||||
'description': 'Remote-connection1',
|
||||
'peer_address': '192.168.1.10',
|
||||
'peer_id': '192.168.1.10',
|
||||
'peer_cidrs': peer_cidrs,
|
||||
'mtu': 1500,
|
||||
'psk': 'abcd',
|
||||
'initiator': 'bi-directional',
|
||||
'dpd': {
|
||||
'action': 'hold',
|
||||
'interval': 30,
|
||||
'timeout': 120},
|
||||
'ikepolicy_id': _uuid(),
|
||||
'ipsecpolicy_id': _uuid(),
|
||||
'vpnservice_id': _uuid(),
|
||||
'admin_state_up': True,
|
||||
'tenant_id': _uuid()}
|
||||
}
|
||||
res = self.api.post(_get_path('vpn/ipsec-site-connections',
|
||||
fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
expect_errors=True)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_ipsec_site_connection_list(self):
|
||||
"""Test case to list all ipsec_site_connections."""
|
||||
ipsecsite_con_id = _uuid()
|
||||
|
|
Loading…
Reference in New Issue