Merge "ICMPv6 is not an available protocol when creating firewall rule"

This commit is contained in:
Zuul 2018-11-29 22:19:39 +00:00 committed by Gerrit Code Review
commit 276b9fb6c4
2 changed files with 110 additions and 13 deletions

View File

@ -244,7 +244,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if not rule['enabled']:
continue
iptbl_rule = self._convert_fwaas_to_iptables_rule(rule)
if rule['ip_version'] == 4:
if rule['ip_version'] == constants.IP_VERSION_4:
ver = IPV4
table = ipt_mgr.ipv4['filter']
else:
@ -258,7 +258,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if not rule['enabled']:
continue
iptbl_rule = self._convert_fwaas_to_iptables_rule(rule)
if rule['ip_version'] == 4:
if rule['ip_version'] == constants.IP_VERSION_4:
ver = IPV4
table = ipt_mgr.ipv4['filter']
else:
@ -350,39 +350,45 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
def _add_accepted_chain_v4v6(self, ipt_mgr):
v4rules_in_chain = \
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN, ip_version=4)
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN,
ip_version=constants.IP_VERSION_4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(ACCEPTED_CHAIN)
ipt_mgr.ipv4['filter'].add_rule(ACCEPTED_CHAIN, '-j ACCEPT')
v6rules_in_chain = \
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN, ip_version=6)
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN,
ip_version=constants.IP_VERSION_6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(ACCEPTED_CHAIN)
ipt_mgr.ipv6['filter'].add_rule(ACCEPTED_CHAIN, '-j ACCEPT')
def _add_dropped_chain_v4v6(self, ipt_mgr):
v4rules_in_chain = \
ipt_mgr.get_chain("filter", DROPPED_CHAIN, ip_version=4)
ipt_mgr.get_chain("filter", DROPPED_CHAIN,
ip_version=constants.IP_VERSION_4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(DROPPED_CHAIN)
ipt_mgr.ipv4['filter'].add_rule(DROPPED_CHAIN, '-j DROP')
v6rules_in_chain = \
ipt_mgr.get_chain("filter", DROPPED_CHAIN, ip_version=6)
ipt_mgr.get_chain("filter", DROPPED_CHAIN,
ip_version=constants.IP_VERSION_6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(DROPPED_CHAIN)
ipt_mgr.ipv6['filter'].add_rule(DROPPED_CHAIN, '-j DROP')
def _add_rejected_chain_v4v6(self, ipt_mgr):
v4rules_in_chain = \
ipt_mgr.get_chain("filter", REJECTED_CHAIN, ip_version=4)
ipt_mgr.get_chain("filter", REJECTED_CHAIN,
ip_version=constants.IP_VERSION_4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(REJECTED_CHAIN)
ipt_mgr.ipv4['filter'].add_rule(REJECTED_CHAIN, '-j REJECT')
v6rules_in_chain = \
ipt_mgr.get_chain("filter", REJECTED_CHAIN, ip_version=6)
ipt_mgr.get_chain("filter", REJECTED_CHAIN,
ip_version=constants.IP_VERSION_6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(REJECTED_CHAIN)
ipt_mgr.ipv6['filter'].add_rule(REJECTED_CHAIN, '-j REJECT')
@ -457,7 +463,8 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
# and readding rules.
args = []
args += self._protocol_arg(rule.get('protocol'))
args += self._protocol_arg(rule.get('protocol'),
rule.get('ip_version'))
args += self._ip_prefix_arg('s', rule.get('source_ip_address'))
args += self._ip_prefix_arg('d', rule.get('destination_ip_address'))
@ -496,10 +503,14 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
return args
def _protocol_arg(self, protocol):
def _protocol_arg(self, protocol, ip_version):
if not protocol:
return []
if (protocol == constants.PROTO_NAME_ICMP and
ip_version == constants.IP_VERSION_6):
protocol = constants.PROTO_NAME_IPV6_ICMP
args = ['-p', protocol]
return args
@ -508,15 +519,18 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if not protocol:
return []
protocol_modules = {'udp': 'udp', 'tcp': 'tcp',
'icmp': 'icmp', 'ipv6-icmp': 'icmp6'}
protocol_modules = {constants.PROTO_NAME_UDP: 'udp',
constants.PROTO_NAME_TCP: 'tcp',
constants.PROTO_NAME_ICMP: 'icmp',
constants.PROTO_NAME_IPV6_ICMP: 'icmp6'}
# iptables adds '-m protocol' when the port number is specified
args = ['-m', protocol_modules[protocol]]
return args
def _port_arg(self, direction, protocol, port):
if protocol not in ['udp', 'tcp'] or port is None:
if protocol not in [constants.PROTO_NAME_UDP,
constants.PROTO_NAME_TCP] or port is None:
return []
args = ['--%s' % direction, '%s' % port]

View File

@ -77,6 +77,23 @@ class IptablesFwaasTestCase(base.BaseTestCase):
rule_list.append(rule3)
return rule_list
def _fake_rules_v6(self, fwid, apply_list):
rule_list = []
rule1 = {'enabled': True,
'action': 'allow',
'ip_version': 6,
'protocol': 'icmp',
'destination_ip_address': '2001:db8::2',
'id': 'fake-fw-rule1'}
ingress_chain = ('iv6%s' % fwid)[:11]
egress_chain = ('ov6%s' % fwid)[:11]
for router_info_inst, port_ids in apply_list:
v6filter_inst = router_info_inst.iptables_manager.ipv6['filter']
v6filter_inst.chains.append(ingress_chain)
v6filter_inst.chains.append(egress_chain)
rule_list.append(rule1)
return rule_list
def _fake_firewall_no_rule(self):
rule_list = []
fw_inst = {'id': FAKE_FW_ID,
@ -201,6 +218,66 @@ class IptablesFwaasTestCase(base.BaseTestCase):
intf_name, binary_name)))
v4filter_inst.assert_has_calls(calls)
def _setup_firewall_with_rules_v6(self, func, router_count=1,
distributed=False, distributed_mode=None):
apply_list = self._fake_apply_list(router_count=router_count,
distributed=distributed, distributed_mode=distributed_mode)
rule_list = self._fake_rules_v6(FAKE_FW_ID, apply_list)
firewall = self._fake_firewall(rule_list)
if distributed:
if distributed_mode == 'dvr_snat':
if_prefix = 'sg-'
if distributed_mode == 'dvr':
if_prefix = 'rfp-'
else:
if_prefix = 'qr-'
distributed_mode = 'legacy'
func(distributed_mode, apply_list, firewall)
binary_name = fwaas.iptables_manager.binary_name
dropped = '%s-dropped' % binary_name
accepted = '%s-accepted' % binary_name
invalid_rule = '-m state --state INVALID -j %s' % dropped
est_rule = '-m state --state RELATED,ESTABLISHED -j ACCEPT'
rule1 = '-p ipv6-icmp -d 2001:db8::2/128 -j %s' % accepted
ingress_chain = 'iv6%s' % firewall['id']
egress_chain = 'ov6%s' % firewall['id']
ipt_mgr_ichain = '%s-%s' % (binary_name, ingress_chain[:11])
ipt_mgr_echain = '%s-%s' % (binary_name, egress_chain[:11])
for router_info_inst, port_ids in apply_list:
v6filter_inst = router_info_inst.iptables_manager.ipv6['filter']
calls = [mock.call.remove_chain('iv6fake-fw-uuid'),
mock.call.remove_chain('ov6fake-fw-uuid'),
mock.call.remove_chain('fwaas-default-policy'),
mock.call.add_chain('fwaas-default-policy'),
mock.call.add_rule(
'fwaas-default-policy', '-j %s' % dropped),
mock.call.add_chain(ingress_chain),
mock.call.add_rule(ingress_chain, invalid_rule),
mock.call.add_rule(ingress_chain, est_rule),
mock.call.add_chain(egress_chain),
mock.call.add_rule(egress_chain, invalid_rule),
mock.call.add_rule(egress_chain, est_rule),
mock.call.add_rule(ingress_chain, rule1),
mock.call.add_rule(egress_chain, rule1)
]
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-o %s -j %s' % (intf_name, ipt_mgr_ichain)))
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-i %s -j %s' % (intf_name, ipt_mgr_echain)))
for direction in ['o', 'i']:
for port_id in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port_id)
calls.append(mock.call.add_rule('FORWARD',
'-%s %s -j %s-fwaas-defau' % (direction,
intf_name, binary_name)))
v6filter_inst.assert_has_calls(calls)
def test_create_firewall_group_no_rules(self):
apply_list = self._fake_apply_list()
first_ri = apply_list[0][0]
@ -244,6 +321,9 @@ class IptablesFwaasTestCase(base.BaseTestCase):
def test_create_firewall_group_with_rules(self):
self._setup_firewall_with_rules(self.firewall.create_firewall_group)
def test_create_firewall_group_with_rules_v6(self):
self._setup_firewall_with_rules_v6(self.firewall.create_firewall_group)
def test_create_firewall_group_with_rules_without_distributed_attr(self):
self._setup_firewall_with_rules(self.firewall.create_firewall_group,
distributed=None)
@ -255,6 +335,9 @@ class IptablesFwaasTestCase(base.BaseTestCase):
def test_update_firewall_group_with_rules(self):
self._setup_firewall_with_rules(self.firewall.update_firewall_group)
def test_update_firewall_group_with_rules_v6(self):
self._setup_firewall_with_rules_v6(self.firewall.update_firewall_group)
def test_update_firewall_group_with_rules_without_distributed_attr(self):
self._setup_firewall_with_rules(self.firewall.update_firewall_group,
distributed=None)