Validation for classification attributes

This patch validate classification attributes for
security-group and firewall rules create.

Co-Authored-By: Mohankumar <mohankumar.n@huawei.com>
Change-Id: Iad3bb9792dbb08d21b5072f6740c2f622f5b706d
This commit is contained in:
Ramanjaneya 2015-12-11 16:45:00 +05:30 committed by vikram.choudhary
parent fb482eb6d5
commit 10b2eb3127
5 changed files with 340 additions and 39 deletions

View File

@ -13,8 +13,20 @@
# License for the specific language governing permissions and limitations
# under the License.
CLASSIFIER_TYPES = ['ip_classifier', 'ipv4_classifier', 'ipv6_classifier',
'transport_classifier', 'ethernet_classifier',
'encapsulation_classifier', 'neutron_port_classifier']
# Protocol names and numbers
PROTO_NAME_ICMP = 'icmp'
PROTO_NAME_ICMP_V6 = 'icmpv6'
PROTO_NAME_TCP = 'tcp'
PROTO_NAME_UDP = 'udp'
# TODO(sc68cal) add more protocols`
PROTOCOLS = ['tcp', 'udp', 'icmp', 'icmpv6']
PROTOCOLS = [PROTO_NAME_ICMP, PROTO_NAME_ICMP_V6,
PROTO_NAME_TCP, PROTO_NAME_UDP]
ENCAPSULATION_TYPES = ['vxlan', 'gre']

View File

@ -0,0 +1,39 @@
# Copyright (c) 2016 Huawei Technologies India Pvt Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
neutron-classifier exception handling.
"""
from neutron_lib import exceptions as nexceptions
class InvalidEthernetClassifier(nexceptions.NeutronException):
message = ('Invalid ethernet classifier value for %(eth_type)s.')
class EthertypeConflictWithProtocol(nexceptions.NeutronException):
message = ("Invalid ethertype %(ethertype)s for protocol %(protocol)s.")
class IpAddressConflict(nexceptions.NeutronException):
message = ("IP address do not agree with the given IP Version.")
class InvalidICMPParameter(nexceptions.NeutronException):
message = ("%(param)s are not allowed when protocol is set to ICMP.")
class InvalidPortRange(nexceptions.NeutronException):
message = ("Invalid port range %(port_range).")

View File

@ -15,6 +15,7 @@
from neutron_classifier.common import constants
from neutron_classifier.db import models
from neutron_classifier.db import validators
def security_group_ethertype_to_ethertype_value(ethertype):
@ -60,30 +61,37 @@ def convert_security_group_to_classifier(context, security_group):
def convert_security_group_rule_to_classifier(context, sgr, group):
# Pull the source from the SG rule
cl1 = models.IpClassifier()
cl1.source_ip_prefix = sgr['remote_ip_prefix']
# Ports
cl2 = models.TransportClassifier(
destination_port_range_min=sgr['port_range_min'],
destination_port_range_max=sgr['port_range_max'])
# Direction
cl3 = models.DirectionClassifier(
direction=sgr['direction'])
cl1 = cl2 = cl3 = cl4 = cl5 = None
# Ethertype
cl4 = models.EthernetClassifier()
cl4.ethertype = security_group_ethertype_to_ethertype_value(
sgr['ethertype'])
if validators.is_ethernetclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl1 = models.EthernetClassifier()
cl1.ethertype = security_group_ethertype_to_ethertype_value(
sgr['ethertype'])
if cl4.ethertype == constants.ETHERTYPE_IPV6:
cl5 = models.Ipv6Classifier()
cl5.next_header = sgr['protocol']
else:
cl5 = models.Ipv4Classifier()
cl5.protocol = sgr['protocol']
# protocol
if validators.is_protocolclassifier_valid(sgr, validators.SG_RULE_TYPE):
if cl1 and cl1.ethertype == constants.ETHERTYPE_IPV6:
cl2 = models.Ipv6Classifier()
cl2.next_header = sgr['protocol']
else:
cl2 = models.Ipv4Classifier()
cl2.protocol = sgr['protocol']
# remote ip
if validators.is_ipclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl3 = models.IpClassifier()
cl3.source_ip_prefix = sgr['remote_ip_prefix']
# Ports
if validators.is_transportclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl4 = models.TransportClassifier(
destination_port_range_min=sgr['port_range_min'],
destination_port_range_max=sgr['port_range_max'])
# Direction
if validators.is_directionclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl5 = models.DirectionClassifier(direction=sgr['direction'])
classifiers = [cl1, cl2, cl3, cl4, cl5]
create_classifier_chain(group, classifiers)
@ -128,30 +136,36 @@ def convert_firewall_policy_to_classifier(context, firewall):
return cgroup
def convert_firewall_rule_to_classifier(context, fw_rule, group):
def convert_firewall_rule_to_classifier(context, fwr, group):
cl1 = cl2 = cl3 = cl4 = None
# ip_version
cl1 = models.EthernetClassifier()
cl1.ethertype = fw_rule['ip_version']
if validators.is_ethernetclassifier_valid(fwr, validators.FW_RULE_TYPE):
cl1 = models.EthernetClassifier()
cl1.ethertype = fwr['ip_version']
# protocol
if cl1.ethertype == constants.IP_VERSION_6:
cl2 = models.Ipv6Classifier()
cl2.next_header = fw_rule['protocol']
else:
cl2 = models.Ipv4Classifier()
cl2.protocol = fw_rule['protocol']
if validators.is_protocolclassifier_valid(fwr, validators.FW_RULE_TYPE):
if cl1.ethertype == constants.IP_VERSION_6:
cl2 = models.Ipv6Classifier()
cl2.next_header = fwr['protocol']
else:
cl2 = models.Ipv4Classifier()
cl2.protocol = fwr['protocol']
# Source and destination ip
cl3 = models.IpClassifier()
cl3.source_ip_prefix = fw_rule['source_ip_address']
cl3.destination_ip_prefix = fw_rule['destination_ip_address']
if validators.is_ipclassifier_valid(fwr, validators.FW_RULE_TYPE):
cl3 = models.IpClassifier()
cl3.source_ip_prefix = fwr['source_ip_address']
cl3.destination_ip_prefix = fwr['destination_ip_address']
# Ports
cl4 = models.TransportClassifier(
source_port_range_min=fw_rule['source_port_range_min'],
source_port_range_max=fw_rule['source_port_range_max'],
destination_port_range_min=fw_rule['destination_port_range_min'],
destination_port_range_max=fw_rule['destination_port_range_max'])
if validators.is_transportclassifier_valid(fwr, validators.FW_RULE_TYPE):
cl4 = models.TransportClassifier(
source_port_range_min=fwr['source_port_range_min'],
source_port_range_max=fwr['source_port_range_max'],
destination_port_range_min=fwr['destination_port_range_min'],
destination_port_range_max=fwr['destination_port_range_max'])
classifiers = [cl1, cl2, cl3, cl4]
create_classifier_chain(group, classifiers)

View File

@ -0,0 +1,141 @@
# Copyright (c) 2016 Huawei Technologies India Pvt Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_classifier.common import constants as const
from neutron_classifier.common import exceptions as exc
import netaddr
SG_RULE_TYPE = 1
FW_RULE_TYPE = 2
def get_attr_value(dict, key):
return dict.get(key, None)
def _validate_fwr_protocol_parameters(fwr, protocol):
"""Check if given port values and protocol is valid."""
if protocol in (const.PROTO_NAME_ICMP, const.PROTO_NAME_ICMP_V6):
source_port_range_min = get_attr_value(fwr, 'source_port_range_min')
source_port_range_max = get_attr_value(fwr, 'source_port_range_max')
destination_port_range_min = get_attr_value(
fwr, 'destination_port_range_min')
destination_port_range_max = get_attr_value(
fwr, 'destination_port_range_max')
if (source_port_range_min or source_port_range_max or
destination_port_range_min or destination_port_range_max):
raise exc.InvalidICMPParameter(param="Source, destination port")
def _validate_sg_ethertype_and_protocol(rule, protocol):
"""Check if given ethertype and protocol is valid."""
eth_value = get_attr_value(rule, 'ethertype')
if protocol == const.PROTO_NAME_ICMP_V6:
if eth_value == const.SECURITYGROUP_ETHERTYPE_IPV4:
raise exc.EthertypeConflictWithProtocol(ethertype=eth_value,
protocol=protocol)
def validate_port_range(min_port, max_port):
"""Check that port_range is valid."""
port_range = '%s:%s' % (min_port, max_port)
if(min_port is None and
max_port is None):
return
if (int(min_port) <= 0 or int(max_port) <= 0):
raise exc.InvalidPortRange(port_range=port_range)
if int(min_port) > int(max_port):
raise exc.InvalidPortRange(port_range=port_range)
def is_ethernetclassifier_valid(rule, type):
"""Check ethertype or ip_version in rule dict."""
if type == SG_RULE_TYPE:
attr_type = 'ethertype'
attr_list = [const.SECURITYGROUP_ETHERTYPE_IPV4,
const.SECURITYGROUP_ETHERTYPE_IPV6]
else:
attr_type = 'ip_version'
attr_list = [const.IP_VERSION_4, const.IP_VERSION_6]
eth_value = get_attr_value(rule, attr_type)
if not eth_value:
return False
elif eth_value not in attr_list:
raise exc.InvalidEthernetClassifier(eth_type=attr_type)
return True
def is_protocolclassifier_valid(rule, type):
"""Check protocol in rule dict and validate dependent params"""
protocol = get_attr_value(rule, 'protocol')
if not protocol:
return False
if type == SG_RULE_TYPE:
_validate_sg_ethertype_and_protocol(rule, protocol)
else:
_validate_fwr_protocol_parameters(rule, protocol)
return True
def is_ipclassifier_valid(rule, type):
"""validate the ip address received in rule dict"""
src_ip_version = dst_ip_version = None
src_ip_address = dst_ip_address = None
if type == SG_RULE_TYPE:
dst_ip_address = get_attr_value(rule, 'remote_ip_prefix')
attr_type = 'ethertype'
else:
src_ip_address = get_attr_value(rule, 'source_ip_address')
dst_ip_address = get_attr_value(rule, 'destination_ip_address')
attr_type = 'ip_version'
if src_ip_address:
src_ip_version = netaddr.IPNetwork(src_ip_address).version
if dst_ip_address:
dst_ip_version = netaddr.IPNetwork(dst_ip_address).version
rule_ip_version = get_attr_value(rule, attr_type)
if type == SG_RULE_TYPE:
if rule_ip_version != "IPv%d" % dst_ip_version:
raise exc.IpAddressConflict()
elif ((src_ip_version and src_ip_version != rule_ip_version) or
(dst_ip_version and dst_ip_version != rule_ip_version)):
raise exc.IpAddressConflict()
return True
def is_directionclassifier_valid(rule, type):
"""Check direction param in rule dict"""
direction = get_attr_value(rule, 'direction')
if not direction:
return False
return True
def is_transportclassifier_valid(rule, type):
"""Verify port range values"""
if type == SG_RULE_TYPE:
port_range_min = get_attr_value(rule, 'port_range_min')
port_range_max = get_attr_value(rule, 'port_range_max')
validate_port_range(port_range_min, port_range_max)
else:
source_port_range_min = get_attr_value(rule, 'source_port_range_min')
source_port_range_max = get_attr_value(rule, 'source_port_range_max')
destination_port_range_min = get_attr_value(
rule, 'destination_port_range_min')
destination_port_range_max = get_attr_value(
rule, 'destination_port_range_max')
validate_port_range(source_port_range_min, source_port_range_max)
validate_port_range(destination_port_range_min,
destination_port_range_max)
return True

View File

@ -12,6 +12,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy as cp
from neutron_classifier.db import api
from neutron_classifier.db import models
import sqlalchemy as sa
@ -129,6 +131,54 @@ class DbApiTestCase(base.BaseTestCase):
result['tenant_id'] = FAKE_SG_RULE_V6['tenant_id']
self.assertEqual(FAKE_SG_RULE_V6, result)
def _test_convert_sg_rule_to_classifier_exception(self, sg_rule):
try:
self._test_convert_security_group_rule_to_classifier(sg_rule)
except Exception:
pass
def test_convert_sg_rule_to_classifier_with_no_ethertype(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
del FAKE_SG_RULE['ethertype']
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# test case for invalid ip-version
def test_convert_sg_rule_to_classifier_with_invalid_ethertype(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['ethertype'] = 'IPvx'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# test case for protocol none
def test_convert_sg_rule_to_classifier_with_None_protocol(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
del FAKE_SG_RULE['protocol']
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# can not allow icmpv6 protocol with IPv4 version
def test_convert_sg_rule_to_classifier_with_icmpv6_protocol(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['protocol'] = 'icmpv6'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# ip-version is 4 and remote ip as v6 address
def test_convert_sg_rule_to_classifier_with_invalid_remote_ipv6(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['remote_ip_prefix'] = 'fddf:cb3b:bc4::/48'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# ip-version is 6 and remote ip as v4 address
def test_convert_sg_rule_to_classifier_with_invalid_dest_ipv4(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V6)
FAKE_SG_RULE['remote_ip_prefix'] = '1.2.3.4/24'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# invalid port-range
def test_convert_sg_rule_to_classifier_with_invalid_port_range(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['port_range_min'] = 200
FAKE_SG_RULE['port_range_max'] = 10
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# Firewall testcases
def _test_convert_firewall_rule_to_classifier(self, fw_rule):
cg = self._create_classifier_group('neutron-fwaas')
@ -168,3 +218,48 @@ class DbApiTestCase(base.BaseTestCase):
result['action'] = FAKE_FW_RULE_V6['action']
result['enabled'] = FAKE_FW_RULE_V6['enabled']
self.assertEqual(FAKE_FW_RULE_V6, result)
def _test_convert_firewall_rule_to_classifier_exception(self, fw_rule):
try:
self._test_convert_firewall_rule_to_classifier(fw_rule)
except Exception:
pass
# test case for invalid ip-version
def test_convert_firewall_rule_to_classifier_with_invalid_ip_version(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['ip_version'] = 5
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# test case for protocol none
def test_convert_firewall_rule_to_classifier_with_None_protocol(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
del FAKE_FW_RULE['protocol']
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# icmp protocol with valid port range
def test_convert_firewall_rule_to_classifier_with_icmp_protocol(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['protocol'] = 'icmp'
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# ip-version is 4 and source ip as v6 address
def test_convert_firewall_rule_to_classifier_with_invalid_source_ip(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['source_ip_address'] = 'fddf:cb3b:bc4::/48'
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# ip-version is 6 and dest ip as v4 address
def test_convert_firewall_rule_to_classifier_with_invalid_dest_ip(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V6)
FAKE_FW_RULE['destination_ip_address'] = '1.2.3.4/24'
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# invalid port-range
def test_convert_firewall_rule_to_classifier_with_invalid_port_range(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['source_port_range_min'] = 200
FAKE_FW_RULE['source_port_range_max'] = 10
FAKE_FW_RULE['destination_port_range_min'] = 100
FAKE_FW_RULE['destination_port_range_max'] = 10
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)