Use ICMPV6_TYPE_* constants from neutron-lib

Replace ICMPv6 type literal and Neutron constants with ICMPV6_TYPE_*
constants from neutron-lib.

Move ICMPV6_ALLOWED_TYPES from neutron constants, into agent/firewall.py
since it defines the policy of which ICMPv6 types to permit ingress
through security groups use by various firewall implementations.

In addition, this patch removes the ICMPV6_TYPE_NC constant represent
for ICMPv6 Neighbor Solicitation messages, it was introduced for the OVS
Firewall, and reference to it was removed in
I8f925afa50f36d073f52bd03954939ca14c505d7.

Change-Id: I6234d0c63ee5c93900c65df743535b1c0ce00256
This commit is contained in:
Dustin Lundquist 2016-04-27 10:43:45 -05:00
parent 53fc477703
commit 358f459b06
6 changed files with 22 additions and 34 deletions

View File

@ -18,6 +18,8 @@ import contextlib
import six
from neutron_lib import constants as n_const
import neutron.common.constants as const
from neutron.common import utils
from neutron.extensions import portsecurity as psec
@ -28,6 +30,14 @@ EGRESS_DIRECTION = const.EGRESS_DIRECTION
DIRECTION_IP_PREFIX = {INGRESS_DIRECTION: 'source_ip_prefix',
EGRESS_DIRECTION: 'dest_ip_prefix'}
# List of ICMPv6 types that should be permitted (ingress) by default. This list
# depends on iptables conntrack behavior of recognizing ICMP errors (types 1-4)
# as related traffic.
ICMPV6_ALLOWED_TYPES = [n_const.ICMPV6_TYPE_MLD_QUERY,
n_const.ICMPV6_TYPE_RA,
n_const.ICMPV6_TYPE_NS,
n_const.ICMPV6_TYPE_NA]
def port_sec_enabled(port):
return port.get(psec.PORTSECURITY, True)

View File

@ -41,7 +41,6 @@ SPOOF_FILTER = 'spoof-filter'
CHAIN_NAME_PREFIX = {firewall.INGRESS_DIRECTION: 'i',
firewall.EGRESS_DIRECTION: 'o',
SPOOF_FILTER: 's'}
ICMPV6_ALLOWED_UNSPEC_ADDR_TYPES = [131, 135, 143]
IPSET_DIRECTION = {firewall.INGRESS_DIRECTION: 'src',
firewall.EGRESS_DIRECTION: 'dst'}
# length of all device prefixes (e.g. qvo, tap, qvb)
@ -394,7 +393,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
'-j RETURN', comment=ic.DHCP_CLIENT)]
# Allow neighbor solicitation and multicast listener discovery
# from the unspecified address for duplicate address detection
for icmp6_type in ICMPV6_ALLOWED_UNSPEC_ADDR_TYPES:
for icmp6_type in constants.ICMPV6_ALLOWED_UNSPEC_ADDR_TYPES:
ipv6_rules += [comment_rule('-s ::/128 -d ff02::/16 '
'-p ipv6-icmp -m icmp6 '
'--icmpv6-type %s -j RETURN' %
@ -448,7 +447,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
# Allow multicast listener, neighbor solicitation and
# neighbor advertisement into the instance
icmpv6_rules = []
for icmp6_type in n_const.ICMPV6_ALLOWED_TYPES:
for icmp6_type in firewall.ICMPV6_ALLOWED_TYPES:
icmpv6_rules += ['-p ipv6-icmp -m icmp6 --icmpv6-type %s '
'-j RETURN' % icmp6_type]
return icmpv6_rules

View File

@ -368,7 +368,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
self._initialize_ingress(port)
def _initialize_egress_ipv6_icmp(self, port):
for icmp_type in constants.ICMPV6_ALLOWED_TYPES:
for icmp_type in firewall.ICMPV6_ALLOWED_TYPES:
self._add_flow(
table=ovs_consts.BASE_EGRESS_TABLE,
priority=95,
@ -547,7 +547,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
)
def _initialize_ingress_ipv6_icmp(self, port):
for icmp_type in constants.ICMPV6_ALLOWED_TYPES:
for icmp_type in firewall.ICMPV6_ALLOWED_TYPES:
self._add_flow(
table=ovs_consts.BASE_INGRESS_TABLE,
priority=100,

View File

@ -67,19 +67,6 @@ VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34,
IP_PROTOCOL_NUM_TO_NAME_MAP = {
str(v): k for k, v in lib_constants.IP_PROTOCOL_MAP.items()}
# Neighbor Solicitation (135),
ICMPV6_TYPE_NC = 135
# List of ICMPv6 types that should be permitted (ingress) by default. This list
# depends on iptables conntrack behavior of recognizing ICMP errors (types 1-4)
# as related traffic.
# Multicast Listener Query (130),
# Router Advertisement (134),
# Neighbor Solicitation (135),
# Neighbor Advertisement (136)
# TODO(dlundquist): use ICMPV6_TYPE_* constants from neutron-lib
ICMPV6_ALLOWED_TYPES = [130, 134, 135, 136]
DHCPV6_STATEFUL = 'dhcpv6-stateful'
DHCPV6_STATELESS = 'dhcpv6-stateless'
IPV6_SLAAC = 'slaac'

View File

@ -22,11 +22,11 @@ import six
import testtools
from neutron.agent.common import config as a_cfg
from neutron.agent import firewall
from neutron.agent.linux import ipset_manager
from neutron.agent.linux import iptables_comments as ic
from neutron.agent.linux import iptables_firewall
from neutron.agent import securitygroups_rpc as sg_cfg
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.tests import base
@ -954,21 +954,12 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
filter_inst = self.v6filter_inst
dhcp_rule = [mock.call.add_rule('ofake_dev',
'-s ::/128 -d ff02::/16 '
'-p ipv6-icmp -m icmp6 '
'--icmpv6-type 131 -j RETURN',
comment=None),
mock.call.add_rule('ofake_dev',
'-s ::/128 -d ff02::/16 '
'-p ipv6-icmp -m icmp6 '
'--icmpv6-type %s -j RETURN' %
n_const.ICMPV6_TYPE_NC,
comment=None),
mock.call.add_rule('ofake_dev',
'-s ::/128 -d ff02::/16 '
'-p ipv6-icmp -m icmp6 '
'--icmpv6-type 143 -j RETURN',
comment=None)]
icmp6_type,
comment=None) for icmp6_type
in constants.ICMPV6_ALLOWED_UNSPEC_ADDR_TYPES]
sg = [rule]
port['security_group_rules'] = sg
self.firewall.prepare_port_filter(port)
@ -991,7 +982,7 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
comment=ic.SG_TO_VM_SG)
]
if ethertype == 'IPv6':
for icmp6_type in n_const.ICMPV6_ALLOWED_TYPES:
for icmp6_type in firewall.ICMPV6_ALLOWED_TYPES:
calls.append(
mock.call.add_rule('ifake_dev',
'-p ipv6-icmp -m icmp6 --icmpv6-type '

View File

@ -232,8 +232,9 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase):
self.br.delete_arp_spoofing_protection(port)
expected = [
call.delete_flows(table_id=0, in_port=8888, proto='arp'),
call.delete_flows(table_id=0, in_port=8888, icmp_type=136,
nw_proto=58),
call.delete_flows(table_id=0, in_port=8888,
icmp_type=const.ICMPV6_TYPE_NA,
nw_proto=const.PROTO_NUM_IPV6_ICMP),
call.delete_flows(table_id=24, in_port=8888),
]
self.assertEqual(expected, self.mock.mock_calls)