Merge "Add rate limiter to icmp handler(DNAT)"

This commit is contained in:
Jenkins 2017-02-12 10:08:05 +00:00 committed by Gerrit Code Review
commit de74522e80
4 changed files with 177 additions and 25 deletions

View File

@ -27,6 +27,11 @@ df_dnat_app_opts = [
cfg.StrOpt('ex_peer_patch_port', default='patch-int',
help=_("Peer patch port in external bridge for integration "
"bridge.")),
cfg.IntOpt('dnat_ttl_invalid_max_rate', default=3,
help=_('Max rate to reply ICMP time exceeded message per '
'second.')),
cfg.IntOpt('dnat_icmp_error_max_rate', default=3,
help=_('Max rate to handle ICMP error message per second.')),
]

View File

@ -26,6 +26,7 @@ from ryu.lib.packet import packet
from ryu.ofproto import ether
from dragonflow._i18n import _LW
from dragonflow.common import utils as df_utils
from dragonflow import conf as cfg
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
@ -51,12 +52,25 @@ class DNATApp(df_base_app.DFlowApp):
cfg.CONF.df_dnat_app.external_network_bridge
self.external_bridge_mac = ""
self.integration_bridge = cfg.CONF.df.integration_bridge
self.int_peer_patch_port = cfg.CONF.df_dnat_app.int_peer_patch_port
self.ex_peer_patch_port = cfg.CONF.df_dnat_app.ex_peer_patch_port
self.conf = cfg.CONF.df_dnat_app
self.int_peer_patch_port = self.conf.int_peer_patch_port
self.ex_peer_patch_port = self.conf.ex_peer_patch_port
self.external_networks = collections.defaultdict(int)
self.local_floatingips = collections.defaultdict(str)
# Map between fixed ip mac to floating ip
self.floatingip_rarp_cache = {}
self.egress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_ttl_invalid_max_rate,
time_unit=1)
self.ingress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_ttl_invalid_max_rate,
time_unit=1)
self.egress_icmp_error_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_icmp_error_max_rate,
time_unit=1)
self.ingress_icmp_error_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_icmp_error_max_rate,
time_unit=1)
self.api.register_table_handler(const.INGRESS_NAT_TABLE,
self.ingress_packet_in_handler)
self.api.register_table_handler(const.EGRESS_NAT_TABLE,
@ -76,12 +90,26 @@ class DNATApp(df_base_app.DFlowApp):
if msg.reason == ofproto.OFPR_INVALID_TTL:
LOG.debug("Get an invalid TTL packet at table %s",
const.INGRESS_NAT_TABLE)
if self.ingress_ttl_invalid_handler_rate_limit():
LOG.warning(_LW("Get more than %(rate)s TTL invalid "
"packets per second at table %(table)s"),
{'rate': self.conf.dnat_ttl_invalid_max_rate,
'table': const.INGRESS_NAT_TABLE})
return
icmp_ttl_pkt = icmp_error_generator.generate(
icmp.ICMP_TIME_EXCEEDED, icmp.ICMP_TTL_EXPIRED_CODE, msg.data)
in_port = msg.match.get('in_port')
self.send_packet(in_port, icmp_ttl_pkt)
return
if self.ingress_icmp_error_rate_limit():
LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
"per second at table %(table)s"),
{'rate': self.conf.dnat_icmp_error_max_rate,
'table': const.INGRESS_NAT_TABLE})
return
pkt = packet.Packet(msg.data)
reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, INGRESS)
out_port = msg.match.get('reg7')
@ -90,12 +118,19 @@ class DNATApp(df_base_app.DFlowApp):
def egress_packet_in_handler(self, event):
msg = event.msg
ofproto = self.ofproto
pkt = packet.Packet(msg.data)
e_pkt = pkt.get_protocol(ethernet.ethernet)
if msg.reason == ofproto.OFPR_INVALID_TTL:
LOG.debug("Get an invalid TTL packet at table %s",
const.EGRESS_NAT_TABLE)
if self.egress_ttl_invalid_handler_rate_limit():
LOG.warning(_LW("Get more than %(rate)s TTL invalid "
"packets per second at table %(table)s"),
{'rate': self.conf.dnat_ttl_invalid_max_rate,
'table': const.EGRESS_NAT_TABLE})
return
pkt = packet.Packet(msg.data)
e_pkt = pkt.get_protocol(ethernet.ethernet)
floatingip = self.floatingip_rarp_cache.get(e_pkt.src)
if floatingip:
icmp_ttl_pkt = icmp_error_generator.generate(
@ -109,6 +144,14 @@ class DNATApp(df_base_app.DFlowApp):
return
if self.external_bridge_mac:
if self.ingress_icmp_error_rate_limit():
LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
"per second at table %(table)s"),
{'rate': self.conf.dnat_icmp_error_max_rate,
'table': const.INGRESS_NAT_TABLE})
return
pkt = packet.Packet(msg.data)
reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, EGRESS)
self.send_packet(self.external_ofport, reply_pkt)

View File

@ -442,6 +442,12 @@ class Router(object):
self.router.close()
class TimeoutException(Exception):
def __init__(self):
super(TimeoutException, self).__init__('Timeout')
class Policy(object):
"""Represent a policy, i.e. the expected packets on each port in the
topology, and the actions to take in each case.
@ -513,7 +519,7 @@ class Policy(object):
:param timeout: After this many seconds, throw an exception
:type timeout: Number
"""
exception = Exception('Timeout')
exception = TimeoutException()
if timeout is not None:
entry_time = time.time()
for thread in self.threads:

View File

@ -572,16 +572,15 @@ class TestDHCPApp(test_base.DFTestBase):
)
)
policy.start(self.topology)
try:
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
except Exception:
# Since there is no dhcp response, we are expecting timeout
# exception here.
pass
finally:
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
# Since there is no dhcp response, we are expecting timeout
# exception here.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def _test_enable_dhcp(self):
# Create policy
@ -852,16 +851,15 @@ class TestL3App(test_base.DFTestBase):
)
)
policy.start(self.topology)
try:
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
except Exception:
# Since there is no OpenFlow in vswitch, we are expecting timeout
# exception here.
pass
finally:
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
# Since there is no OpenFlow in vswitch, we are expecting timeout
# exception here.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
cmd[1] = "set-controller"
cmd.append(controller)
@ -1868,6 +1866,72 @@ class TestDNATApp(test_base.DFTestBase):
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def _create_rate_limit_port_policies(self, rate, icmp_filter):
ignore_action = app_testing_objects.IgnoreAction()
raise_action = app_testing_objects.RaiseAction("Unexpected packet")
# Disable port policy rule, so that any further packets will hit the
# default action, which is raise_action in this case.
count_action = app_testing_objects.CountAction(
rate, app_testing_objects.DisableRuleAction())
key = (self.subnet.subnet_id, self.port.port_id)
rules = [
app_testing_objects.PortPolicyRule(
# Detect ICMP, end simulation
icmp_filter(self._get_ip),
actions=[count_action]
),
app_testing_objects.PortPolicyRule(
# Ignore gratuitous ARP packets
app_testing_objects.RyuARPGratuitousFilter(),
actions=[ignore_action]
),
app_testing_objects.PortPolicyRule(
# Ignore IPv6 packets
app_testing_objects.RyuIPv6Filter(),
actions=[ignore_action]
),
]
policy = app_testing_objects.PortPolicy(
rules=rules,
default_action=raise_action
)
return {key: policy}
def test_ttl_packet_rate_limit(self):
initial_packet = self._create_packet(
self.topology.external_network.get_gw_ip(),
ryu.lib.packet.ipv4.inet.IPPROTO_ICMP,
ttl=1)
send_action = app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
str(initial_packet))
ignore_action = app_testing_objects.IgnoreAction()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
app_testing_objects.RyuICMPTimeExceedFilter),
unknown_port_action=ignore_action
)
)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def test_nat_embedded_packet(self):
ignore_action = app_testing_objects.IgnoreAction()
self.port.port.update({"security_groups": []})
@ -1893,3 +1957,37 @@ class TestDNATApp(test_base.DFTestBase):
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def test_nat_embedded_rate_limit(self):
self.port.port.update({"security_groups": []})
initial_packet = self._create_packet(
self.topology.external_network.get_gw_ip(),
ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
send_action = app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
str(initial_packet))
ignore_action = app_testing_objects.IgnoreAction()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
app_testing_objects.RyuICMPUnreachFilter),
unknown_port_action=ignore_action
)
)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
raise policy.exceptions[0]