Merge "Validate ethertype for icmp protocols"

This commit is contained in:
Jenkins 2015-10-24 01:31:05 +00:00 committed by Gerrit Code Review
commit 554b5d96cd
3 changed files with 24 additions and 0 deletions

View File

@ -446,6 +446,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
raise ext_sg.SecurityGroupMissingIcmpType(
value=rule['port_range_max'])
def _validate_ethertype_and_protocol(self, rule):
"""Check if given ethertype and protocol are valid or not"""
if rule['protocol'] == constants.PROTO_NAME_ICMP_V6:
if rule['ethertype'] == constants.IPv4:
raise ext_sg.SecurityGroupEthertypeConflictWithProtocol(
ethertype=rule['ethertype'], protocol=rule['protocol'])
def _validate_single_tenant_and_group(self, security_group_rules):
"""Check that all rules belong to the same security group and tenant
"""
@ -466,6 +473,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
rule = security_group_rule['security_group_rule']
self._validate_port_range(rule)
self._validate_ip_prefix(rule)
self._validate_ethertype_and_protocol(rule)
if rule['remote_ip_prefix'] and rule['remote_group_id']:
raise ext_sg.SecurityGroupRemoteGroupAndRemoteIpPrefix()

View File

@ -44,6 +44,11 @@ class SecurityGroupInvalidIcmpValue(nexception.InvalidInput):
"%(value)s. It must be 0 to 255.")
class SecurityGroupEthertypeConflictWithProtocol(nexception.InvalidInput):
message = ("Invalid ethertype %(ethertype)s for protocol "
"%(protocol)s .")
class SecurityGroupMissingIcmpType(nexception.InvalidInput):
message = _("ICMP code (port-range-max) %(value)s is provided"
" but ICMP type (port-range-min) is missing.")

View File

@ -420,6 +420,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_ethertype_invalid_for_protocol(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
rule = self._build_security_group_rule(
security_group_id, 'ingress', const.PROTO_NAME_ICMP_V6)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_invalid_ip_prefix(self):
name = 'webservers'
description = 'my webservers'