[L3][QoS] Adding L3 rate limit TC lib

This is the TC lib utils for L3 IP QoS implementation.
For more detail please see [1]: L3 agent side TC rules.

[1] https://review.openstack.org/#/c/374506/

Partially-Implements blueprint: floating-ip-rate-limit
Related-Bug: #1596611
Change-Id: Icfec83ca6dc31d7283d9c6c6ef0997d5e60daae6
This commit is contained in:
LIU Yulong 2017-04-05 09:38:55 +08:00
parent dd06df8f04
commit f40128b437
5 changed files with 764 additions and 0 deletions

View File

@ -34,6 +34,15 @@ ip: IpFilter, ip, root
find: RegExpFilter, find, root, find, /sys/class/net, -maxdepth, 1, -type, l, -printf, %.*
ip_exec: IpNetnsExecFilter, ip, root
# l3_tc_lib
l3_tc_show_qdisc: RegExpFilter, tc, root, tc, qdisc, show, dev, .+
l3_tc_add_qdisc_ingress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, ingress
l3_tc_add_qdisc_egress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, root, handle, 1:, htb
l3_tc_show_filters: RegExpFilter, tc, root, tc, -p, -s, -d, filter, show, dev, .+, parent, .+, prio, 1
l3_tc_delete_filters: RegExpFilter, tc, root, tc, filter, del, dev, .+, parent, .+, prio, 1, handle, .+, u32
l3_tc_add_filter_ingress: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, ip, prio, 1, u32, match, ip, dst, .+, police, rate, .+, burst, .+, drop, flowid, :1
l3_tc_add_filter_egress: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, ip, prio, 1, u32, match, ip, src, .+, police, rate, .+, burst, .+, drop, flowid, :1
# For ip monitor
kill_ip_monitor: KillFilter, root, ip, -9

View File

@ -0,0 +1,194 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from neutron_lib import constants
from oslo_log import log as logging
from neutron.agent.linux import ip_lib
from neutron.agent.linux import tc_lib
from neutron.common import exceptions
LOG = logging.getLogger(__name__)
QDISC_IN_REGEX = re.compile(r"qdisc ingress (\w+:) *")
QDISC_OUT_REGEX = re.compile(r"qdisc htb (\w+:) *")
FILTER_ID_REGEX = re.compile(r"filter protocol ip u32 fh (\w+::\w+) *")
FILTER_STATS_REGEX = re.compile(r"Sent (\w+) bytes (\w+) pkts *")
class FloatingIPTcCommandBase(ip_lib.IPDevice):
def _execute_tc_cmd(self, cmd, **kwargs):
cmd = ['tc'] + cmd
ip_wrapper = ip_lib.IPWrapper(self.namespace)
return ip_wrapper.netns.execute(cmd, run_as_root=True, **kwargs)
def _get_qdiscs(self):
cmd = ['qdisc', 'show', 'dev', self.name]
return self._execute_tc_cmd(cmd)
def _get_qdisc_id_for_filter(self, direction):
qdisc_results = self._get_qdiscs().split('\n')
for qdisc in qdisc_results:
pattern = (QDISC_OUT_REGEX
if direction == constants.EGRESS_DIRECTION
else QDISC_IN_REGEX)
m = pattern.match(qdisc)
if m:
# No chance to get multiple qdiscs
return m.group(1)
def _add_qdisc(self, direction):
if direction == constants.EGRESS_DIRECTION:
args = ['root', 'handle', '1:', 'htb']
else:
args = ['ingress']
cmd = ['qdisc', 'add', 'dev', self.name] + args
self._execute_tc_cmd(cmd)
def _get_filters(self, qdisc_id):
cmd = ['-p', '-s', '-d', 'filter', 'show', 'dev', self.name,
'parent', qdisc_id, 'prio', 1]
return self._execute_tc_cmd(cmd)
def _get_filterid_for_ip(self, qdisc_id, ip):
filterids_for_ip = []
filters_output = self._get_filters(qdisc_id)
if not filters_output:
raise exceptions.FilterIDForIPNotFound(ip=ip)
filter_lines = filters_output.split('\n')
for line in filter_lines:
line = line.strip()
m = FILTER_ID_REGEX.match(line)
if m:
filter_id = m.group(1)
# It matched, so ip/32 is not here. continue
continue
elif not line.startswith('match'):
continue
parts = line.split(" ")
if ip + '/32' in parts:
filterids_for_ip.append(filter_id)
if len(filterids_for_ip) > 1:
raise exceptions.MultipleFilterIDForIPFound(ip=ip)
elif len(filterids_for_ip) == 0:
raise exceptions.FilterIDForIPNotFound(ip=ip)
return filterids_for_ip[0]
def _del_filter_by_id(self, qdisc_id, filter_id):
cmd = ['filter', 'del', 'dev', self.name,
'parent', qdisc_id,
'prio', 1, 'handle', filter_id, 'u32']
self._execute_tc_cmd(cmd)
def _get_qdisc_filters(self, qdisc_id):
filterids = []
filters_output = self._get_filters(qdisc_id)
if not filters_output:
return filterids
filter_lines = filters_output.split('\n')
for line in filter_lines:
line = line.strip()
m = FILTER_ID_REGEX.match(line)
if m:
filter_id = m.group(1)
filterids.append(filter_id)
return filterids
def _add_filter(self, qdisc_id, direction, ip, rate, burst):
rate_value = "%s%s" % (rate, tc_lib.BW_LIMIT_UNIT)
burst_value = "%s%s" % (
tc_lib.TcCommand.get_ingress_qdisc_burst_value(rate, burst),
tc_lib.BURST_UNIT
)
protocol = ['protocol', 'ip']
prio = ['prio', 1]
_match = 'src' if direction == constants.EGRESS_DIRECTION else 'dst'
match = ['u32', 'match', 'ip', _match, ip]
police = ['police', 'rate', rate_value, 'burst', burst_value,
'drop', 'flowid', ':1']
args = protocol + prio + match + police
cmd = ['filter', 'add', 'dev', self.name,
'parent', qdisc_id] + args
self._execute_tc_cmd(cmd)
def _get_or_create_qdisc(self, direction):
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
self._add_qdisc(direction)
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
raise exceptions.FailedToAddQdiscToDevice(direction=direction,
device=self.name)
return qdisc_id
class FloatingIPTcCommand(FloatingIPTcCommandBase):
def clear_all_filters(self, direction):
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
return
filterids = self._get_qdisc_filters(qdisc_id)
for filter_id in filterids:
self._del_filter_by_id(qdisc_id, filter_id)
def get_filter_id_for_ip(self, direction, ip):
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
return
return self._get_filterid_for_ip(qdisc_id, ip)
def get_existing_filter_ids(self, direction):
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
return
return self._get_qdisc_filters(qdisc_id)
def delete_filter_ids(self, direction, filterids):
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
return
for filter_id in filterids:
self._del_filter_by_id(qdisc_id, filter_id)
def set_ip_rate_limit(self, direction, ip, rate, burst):
qdisc_id = self._get_or_create_qdisc(direction)
try:
filter_id = self._get_filterid_for_ip(qdisc_id, ip)
LOG.debug("Filter %(filter)s for IP %(ip)s in %(direction)s "
"qdisc already existed, removing.",
{'filter': filter_id,
'ip': ip,
'direction': direction})
self._del_filter_by_id(qdisc_id, filter_id)
except exceptions.FilterIDForIPNotFound:
pass
LOG.debug("Adding filter for IP %(ip)s in %(direction)s.",
{'ip': ip,
'direction': direction})
self._add_filter(qdisc_id, direction, ip, rate, burst)
def clear_ip_rate_limit(self, direction, ip):
qdisc_id = self._get_qdisc_id_for_filter(direction)
if not qdisc_id:
return
try:
filter_id = self._get_filterid_for_ip(qdisc_id, ip)
self._del_filter_by_id(qdisc_id, filter_id)
except exceptions.FilterIDForIPNotFound:
LOG.debug("No filter found for IP %(ip)s in %(direction)s, "
"skipping deletion.",
{'ip': ip,
'direction': direction})

View File

@ -352,3 +352,16 @@ class TenantQuotaNotFound(e.NotFound):
class TenantIdProjectIdFilterConflict(e.BadRequest):
message = _("Both tenant_id and project_id passed as filters.")
class MultipleFilterIDForIPFound(e.Conflict):
message = _("Multiple filter IDs for IP %(ip)s found.")
class FilterIDForIPNotFound(e.NotFound):
message = _("Filter ID for IP %(ip)s could not be found.")
class FailedToAddQdiscToDevice(e.NeutronException):
message = _("Failed to add %(direction)s qdisc "
"to device %(device)s.")

View File

@ -0,0 +1,152 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants as common_constants
from oslo_utils import uuidutils
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import l3_tc_lib
from neutron.common import exceptions
from neutron.tests.functional import base as functional_base
RATE_LIMIT = 1024
BURST_LIMIT = 512
DEV_NAME = "test_device"
class TcLibTestCase(functional_base.BaseSudoTestCase):
def create_tc_wrapper_with_namespace_and_device(self):
ns_name = uuidutils.generate_uuid()
namespace = namespaces.Namespace(
ns_name, None,
mock.Mock(), False)
namespace.create()
self.addCleanup(namespace.delete)
ip_wrapper = ip_lib.IPWrapper(namespace=ns_name)
tc_device = ip_wrapper.add_tuntap(DEV_NAME)
tc_device.link.set_up()
return l3_tc_lib.FloatingIPTcCommand(
DEV_NAME,
namespace=ns_name)
def test_clear_all_filters(self):
ip_addr = "2.2.2.2"
l3_tc = self.create_tc_wrapper_with_namespace_and_device()
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
l3_tc.set_ip_rate_limit(common_constants.EGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
l3_tc.clear_all_filters(common_constants.INGRESS_DIRECTION)
self.assertRaises(exceptions.FilterIDForIPNotFound,
l3_tc.get_filter_id_for_ip,
common_constants.INGRESS_DIRECTION,
ip_addr)
l3_tc.clear_all_filters(common_constants.EGRESS_DIRECTION)
self.assertRaises(exceptions.FilterIDForIPNotFound,
l3_tc.get_filter_id_for_ip,
common_constants.EGRESS_DIRECTION,
ip_addr)
def test_get_filter_id_for_ip(self):
ip_addr = "3.3.3.3"
l3_tc = self.create_tc_wrapper_with_namespace_and_device()
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
l3_tc.set_ip_rate_limit(common_constants.EGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
self.assertIsNotNone(
l3_tc.get_filter_id_for_ip(common_constants.INGRESS_DIRECTION,
ip_addr))
self.assertIsNotNone(
l3_tc.get_filter_id_for_ip(common_constants.EGRESS_DIRECTION,
ip_addr))
# testing: IP filter does not exist
self.assertRaises(exceptions.FilterIDForIPNotFound,
l3_tc.get_filter_id_for_ip,
common_constants.EGRESS_DIRECTION,
'33.33.33.33')
def test_get_existing_filter_ids(self):
ip_addr = "4.4.4.4"
l3_tc = self.create_tc_wrapper_with_namespace_and_device()
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
l3_tc.set_ip_rate_limit(common_constants.EGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
filter_ids = l3_tc.get_existing_filter_ids(
common_constants.INGRESS_DIRECTION)
self.assertNotEqual(0, len(filter_ids))
filter_ids = l3_tc.get_existing_filter_ids(
common_constants.EGRESS_DIRECTION)
self.assertNotEqual(0, len(filter_ids))
def test_delete_filter_ids(self):
ip_addr1 = "5.5.5.5"
ip_addr2 = "6.6.6.6"
l3_tc = self.create_tc_wrapper_with_namespace_and_device()
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr1,
RATE_LIMIT, BURST_LIMIT)
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr2,
RATE_LIMIT, BURST_LIMIT)
filter_ids = l3_tc.get_existing_filter_ids(
common_constants.INGRESS_DIRECTION)
self.assertEqual(2, len(filter_ids))
l3_tc.delete_filter_ids(common_constants.INGRESS_DIRECTION,
filter_ids)
filter_ids = l3_tc.get_existing_filter_ids(
common_constants.INGRESS_DIRECTION)
self.assertEqual(0, len(filter_ids))
def test_set_ip_rate_limit(self):
ip_addr = "7.7.7.7"
l3_tc = self.create_tc_wrapper_with_namespace_and_device()
# Set it multiple times
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr,
RATE_LIMIT, BURST_LIMIT)
# Get only one and no exception
filter_id = l3_tc.get_filter_id_for_ip(
common_constants.INGRESS_DIRECTION,
ip_addr)
self.assertIsNotNone(filter_id)
def test_clear_ip_rate_limit(self):
ip_addr = "8.8.8.8"
l3_tc = self.create_tc_wrapper_with_namespace_and_device()
l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION,
ip_addr,
RATE_LIMIT, BURST_LIMIT)
filter_id = l3_tc.get_filter_id_for_ip(
common_constants.INGRESS_DIRECTION,
ip_addr)
self.assertIsNotNone(filter_id)
filter_id = l3_tc.clear_ip_rate_limit(
common_constants.INGRESS_DIRECTION,
ip_addr)
self.assertIsNone(filter_id)
# testing: IP filter does not exist
l3_tc.clear_ip_rate_limit(
common_constants.INGRESS_DIRECTION,
"88.88.88.88")

View File

@ -0,0 +1,396 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants
from neutron.agent.linux import l3_tc_lib as tc_lib
from neutron.common import exceptions
from neutron.tests import base
FLOATING_IP_DEVICE_NAME = "qg-device_rfp"
FLOATING_IP_ROUTER_NAMESPACE = "qrouter-namespace_snat-namespace"
FLOATING_IP_1 = "172.16.5.146"
FLOATING_IP_2 = "172.16.10.105"
FILETER_ID_1 = "800::800"
FILETER_ID_2 = "800::801"
TC_INGRESS_FILTERS = (
'filter protocol ip u32 \n'
'filter protocol ip u32 fh 800: ht divisor 1 \n'
'filter protocol ip u32 fh %(filter_id1)s order 2048 key '
'ht 800 bkt 0 '
'flowid :1 (rule hit 0 success 0)\n'
' match IP dst %(fip1)s/32 (success 0 ) \n'
' police 0x3 rate 3000Kbit burst 3Mb mtu 64Kb action drop overhead 0b \n'
'ref 1 bind 1\n'
'\n'
' Sent 111 bytes 222 pkts (dropped 0, overlimits 0) \n'
'filter protocol ip u32 fh %(filter_id2)s order 2049 key '
'ht 800 bkt 0 '
'flowid :1 (rule hit 0 success 0)\n'
' match IP dst %(fip2)s/32 (success 0 ) \n'
' police 0x1b rate 22000Kbit burst 22Mb mtu 64Kb action drop '
'overhead 0b \n'
'ref 1 bind 1\n'
'\n'
' Sent 111 bytes 222 pkts (dropped 0, overlimits 0)\n') % {
"filter_id1": FILETER_ID_1,
"fip1": FLOATING_IP_1,
"filter_id2": FILETER_ID_2,
"fip2": FLOATING_IP_2}
TC_INGRESS_FILTERS_DUP = TC_INGRESS_FILTERS + (
'filter protocol ip u32 fh %(filter_id2)s order 2049 key '
'ht 800 bkt 0 '
'flowid :1 (rule hit 0 success 0)\n'
' match IP dst %(fip2)s/32 (success 0 ) \n'
' police 0x1b rate 22000Kbit burst 22Mb mtu 64Kb action drop '
'overhead 0b \n'
'ref 1 bind 1\n'
'\n'
' Sent 111 bytes 222 pkts (dropped 0, overlimits 0)\n') % {
"filter_id2": FILETER_ID_2,
"fip2": FLOATING_IP_2}
TC_EGRESS_FILTERS = (
'filter protocol ip u32 \n'
'filter protocol ip u32 fh 800: ht divisor 1 \n'
'filter protocol ip u32 fh %(filter_id1)s order 2048 key '
'ht 800 bkt 0 '
'flowid :1 (rule hit 0 success 0)\n'
' match IP src %(fip1)s/32 (success 0 ) \n'
' police 0x4 rate 3000Kbit burst 3Mb mtu 64Kb action drop overhead 0b \n'
'ref 1 bind 1\n'
'\n'
' Sent 111 bytes 222 pkts (dropped 0, overlimits 0) \n'
'filter protocol ip u32 fh %(filter_id2)s order 2049 key '
'ht 800 bkt 0 '
'flowid :1 (rule hit 0 success 0)\n'
' match IP src %(fip2)s/32 (success 0 ) \n'
' police 0x1c rate 22000Kbit burst 22Mb mtu 64Kb action drop '
'overhead 0b \n'
'ref 1 bind 1\n'
'\n'
' Sent 111 bytes 222 pkts (dropped 0, overlimits 0)\n') % {
"filter_id1": FILETER_ID_1,
"fip1": FLOATING_IP_1,
"filter_id2": FILETER_ID_2,
"fip2": FLOATING_IP_2}
FILTERS_IDS = {constants.INGRESS_DIRECTION: TC_INGRESS_FILTERS,
constants.EGRESS_DIRECTION: TC_EGRESS_FILTERS}
INGRESS_QSIC_ID = "ffff:"
EGRESS_QDISC_ID = "1:"
QDISC_IDS = {constants.INGRESS_DIRECTION: INGRESS_QSIC_ID,
constants.EGRESS_DIRECTION: EGRESS_QDISC_ID}
TC_QDISCS = (
'qdisc htb %(egress)s root refcnt 2 r2q 10 default 0 '
'direct_packets_stat 6\n'
'qdisc ingress %(ingress)s parent ffff:fff1 ----------------\n') % {
"egress": EGRESS_QDISC_ID,
"ingress": INGRESS_QSIC_ID}
class TestFloatingIPTcCommandBase(base.BaseTestCase):
def setUp(self):
super(TestFloatingIPTcCommandBase, self).setUp()
self.tc = tc_lib.FloatingIPTcCommandBase(
FLOATING_IP_DEVICE_NAME,
namespace=FLOATING_IP_ROUTER_NAMESPACE)
self.execute = mock.patch('neutron.agent.common.utils.execute').start()
def test__get_qdiscs(self):
self.tc._get_qdiscs()
self.execute.assert_called_once_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', 'qdisc', 'show', 'dev', FLOATING_IP_DEVICE_NAME],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test__get_qdisc_id_for_filter(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdiscs') as get_qdiscs:
get_qdiscs.return_value = TC_QDISCS
q1 = self.tc._get_qdisc_id_for_filter(constants.INGRESS_DIRECTION)
self.assertEqual(INGRESS_QSIC_ID, q1)
q2 = self.tc._get_qdisc_id_for_filter(constants.EGRESS_DIRECTION)
self.assertEqual(EGRESS_QDISC_ID, q2)
def test__add_qdisc(self):
self.tc._add_qdisc(constants.INGRESS_DIRECTION)
self.execute.assert_called_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', 'qdisc', 'add', 'dev', FLOATING_IP_DEVICE_NAME, 'ingress'],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
self.tc._add_qdisc(constants.EGRESS_DIRECTION)
self.execute.assert_called_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', 'qdisc', 'add', 'dev',
FLOATING_IP_DEVICE_NAME] + ['root', 'handle', '1:', 'htb'],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test__get_filters(self):
self.tc._get_filters(INGRESS_QSIC_ID)
self.execute.assert_called_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', '-p', '-s', '-d', 'filter', 'show', 'dev',
FLOATING_IP_DEVICE_NAME,
'parent', INGRESS_QSIC_ID, 'prio', 1],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test__get_filterid_for_ip(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = TC_EGRESS_FILTERS
f_id = self.tc._get_filterid_for_ip(INGRESS_QSIC_ID, FLOATING_IP_1)
self.assertEqual(FILETER_ID_1, f_id)
def test__get_filterid_for_ip_no_output(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = ""
self.assertRaises(exceptions.FilterIDForIPNotFound,
self.tc._get_filterid_for_ip,
INGRESS_QSIC_ID, FLOATING_IP_1)
def test__get_filterid_for_ip_duplicated(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = TC_INGRESS_FILTERS_DUP
self.assertRaises(exceptions.MultipleFilterIDForIPFound,
self.tc._get_filterid_for_ip,
INGRESS_QSIC_ID, FLOATING_IP_2)
def test__get_filterid_for_ip_not_found(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = TC_EGRESS_FILTERS
self.assertRaises(exceptions.FilterIDForIPNotFound,
self.tc._get_filterid_for_ip,
INGRESS_QSIC_ID, "1.1.1.1")
def test__del_filter_by_id(self):
self.tc._del_filter_by_id(INGRESS_QSIC_ID, FLOATING_IP_1)
self.execute.assert_called_once_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', 'filter', 'del', 'dev', FLOATING_IP_DEVICE_NAME,
'parent', INGRESS_QSIC_ID,
'prio', 1, 'handle', FLOATING_IP_1, 'u32'],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test__get_qdisc_filters(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = TC_EGRESS_FILTERS
f_ids = self.tc._get_qdisc_filters(INGRESS_QSIC_ID)
self.assertEqual([FILETER_ID_1, FILETER_ID_2], f_ids)
def test__get_qdisc_filters_no_output(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = ""
f_ids = self.tc._get_qdisc_filters(INGRESS_QSIC_ID)
self.assertEqual(0, len(f_ids))
def test__add_filter(self):
protocol = ['protocol', 'ip']
prio = ['prio', 1]
match = ['u32', 'match', 'ip', 'dst', FLOATING_IP_1]
police = ['police', 'rate', '1kbit', 'burst', '1kbit',
'drop', 'flowid', ':1']
args = protocol + prio + match + police
cmd = ['tc', 'filter', 'add', 'dev', FLOATING_IP_DEVICE_NAME,
'parent', INGRESS_QSIC_ID] + args
self.tc._add_filter(INGRESS_QSIC_ID,
constants.INGRESS_DIRECTION,
FLOATING_IP_1, 1, 1)
self.execute.assert_called_once_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE] + cmd,
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test__get_or_create_qdisc(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc1:
get_disc1.return_value = None
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_add_qdisc'):
with mock.patch.object(
tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc2:
get_disc2.return_value = INGRESS_QSIC_ID
qdisc_id = self.tc._get_or_create_qdisc(
constants.INGRESS_DIRECTION)
self.assertEqual(INGRESS_QSIC_ID, qdisc_id)
def test__get_or_create_qdisc_failed(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc1:
get_disc1.return_value = None
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_add_qdisc'):
with mock.patch.object(
tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc2:
get_disc2.return_value = None
self.assertRaises(exceptions.FailedToAddQdiscToDevice,
self.tc._get_or_create_qdisc,
constants.INGRESS_DIRECTION)
class TestFloatingIPTcCommand(base.BaseTestCase):
def setUp(self):
super(TestFloatingIPTcCommand, self).setUp()
self.tc = tc_lib.FloatingIPTcCommand(
FLOATING_IP_DEVICE_NAME,
namespace=FLOATING_IP_ROUTER_NAMESPACE)
self.execute = mock.patch('neutron.agent.common.utils.execute').start()
def test_clear_all_filters(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = EGRESS_QDISC_ID
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = TC_EGRESS_FILTERS
self.tc.clear_all_filters(constants.EGRESS_DIRECTION)
self.assertEqual(2, self.execute.call_count)
def test_set_ip_rate_limit_filter_existed(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = EGRESS_QDISC_ID
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filterid_for_ip') as get_filter:
get_filter.return_value = FILETER_ID_1
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_del_filter_by_id') as del_filter:
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_add_filter') as add_filter:
ip = "111.111.111.111"
self.tc.set_ip_rate_limit(constants.EGRESS_DIRECTION,
ip, 1, 1)
del_filter.assert_called_once_with(
EGRESS_QDISC_ID, FILETER_ID_1)
add_filter.assert_called_once_with(
EGRESS_QDISC_ID, constants.EGRESS_DIRECTION,
ip, 1, 1)
def test_set_ip_rate_limit_no_qdisc(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = None
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_add_qdisc'):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filters') as get_filters:
get_filters.return_value = TC_INGRESS_FILTERS
get_disc.return_value = INGRESS_QSIC_ID
ip = "111.111.111.111"
self.tc.set_ip_rate_limit(constants.INGRESS_DIRECTION,
ip, 1, 1)
protocol = ['protocol', 'ip']
prio = ['prio', 1]
_match = 'dst'
match = ['u32', 'match', 'ip', _match, ip]
police = ['police', 'rate', '1kbit', 'burst', '1kbit',
'drop', 'flowid', ':1']
args = protocol + prio + match + police
self.execute.assert_called_once_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', 'filter', 'add', 'dev', FLOATING_IP_DEVICE_NAME,
'parent', INGRESS_QSIC_ID] + args,
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test_clear_ip_rate_limit(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = EGRESS_QDISC_ID
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filterid_for_ip') as get_filter_id:
get_filter_id.return_value = FILETER_ID_1
self.tc.clear_ip_rate_limit(constants.EGRESS_DIRECTION,
FLOATING_IP_1)
self.execute.assert_called_once_with(
['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE,
'tc', 'filter', 'del', 'dev', FLOATING_IP_DEVICE_NAME,
'parent', EGRESS_QDISC_ID,
'prio', 1, 'handle', FILETER_ID_1, 'u32'],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)
def test_get_filter_id_for_ip(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = EGRESS_QDISC_ID
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_filterid_for_ip') as get_filter_id:
self.tc.get_filter_id_for_ip(constants.EGRESS_DIRECTION,
'8.8.8.8')
get_filter_id.assert_called_once_with(EGRESS_QDISC_ID,
'8.8.8.8')
def test_get_existing_filter_ids(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = EGRESS_QDISC_ID
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_filters') as get_filter_ids:
self.tc.get_existing_filter_ids(constants.EGRESS_DIRECTION)
get_filter_ids.assert_called_once_with(EGRESS_QDISC_ID)
def test_delete_filter_ids(self):
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_get_qdisc_id_for_filter') as get_disc:
get_disc.return_value = EGRESS_QDISC_ID
with mock.patch.object(tc_lib.FloatingIPTcCommandBase,
'_del_filter_by_id') as del_filter_id:
self.tc.delete_filter_ids(constants.EGRESS_DIRECTION,
[FILETER_ID_1, FILETER_ID_2])
del_filter_id.assert_has_calls(
[mock.call(EGRESS_QDISC_ID, FILETER_ID_1),
mock.call(EGRESS_QDISC_ID, FILETER_ID_2)])