From cd3cc7e9087dc7ee0e6833d806da9a1896ede54c Mon Sep 17 00:00:00 2001 From: LIU Yulong Date: Mon, 14 May 2018 15:00:29 +0800 Subject: [PATCH] [L3][QoS] Agent side router gateway IP rate limit This patch implements the L3 agent side router gateway IP rate limit. For routers in centralized snat node (network node), the tc rules will be set on the corresponding device in router namespace: 1. Legacy and HA router, qrouter-namespace and qg-device 2. Dvr (edge) router, snat namespace and qg-device If gateway IP rate limit was set, then under the same router, all the VMs without floating IP will share the bandwidth. Partially-Implements blueprint: router-gateway-ip-qos Closes-Bug: #1757044 Change-Id: Ie92ff0d4df0e85ce71c7d50f34ea6ff973812af8 --- devstack/lib/qos | 4 + devstack/plugin.sh | 1 + neutron/agent/l3/extensions/qos/gateway_ip.py | 191 +++++++++++++++ neutron/agent/l3/router_info.py | 1 + .../test_gateway_ip_qos_extension.py | 197 ++++++++++++++++ .../l3/extensions/qos/test_gateway_ip.py | 221 ++++++++++++++++++ setup.cfg | 1 + 7 files changed, 616 insertions(+) create mode 100644 neutron/agent/l3/extensions/qos/gateway_ip.py create mode 100644 neutron/tests/functional/agent/l3/extensions/test_gateway_ip_qos_extension.py create mode 100644 neutron/tests/unit/agent/l3/extensions/qos/test_gateway_ip.py diff --git a/devstack/lib/qos b/devstack/lib/qos index 9fb9cadefa9..9570fc74802 100644 --- a/devstack/lib/qos +++ b/devstack/lib/qos @@ -22,3 +22,7 @@ function configure_qos { function configure_l3_agent_extension_fip_qos { plugin_agent_add_l3_agent_extension "fip_qos" } + +function configure_l3_agent_extension_gateway_ip_qos { + plugin_agent_add_l3_agent_extension "gateway_ip_qos" +} diff --git a/devstack/plugin.sh b/devstack/plugin.sh index ef19c51fef6..d87a788f38a 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -72,6 +72,7 @@ if [[ "$1" == "stack" ]]; then if is_service_enabled q-l3 neutron-l3; then if is_service_enabled q-qos neutron-qos; then configure_l3_agent_extension_fip_qos + configure_l3_agent_extension_gateway_ip_qos fi if is_service_enabled q-port-forwarding neutron-port-forwarding; then configure_port_forwarding diff --git a/neutron/agent/l3/extensions/qos/gateway_ip.py b/neutron/agent/l3/extensions/qos/gateway_ip.py new file mode 100644 index 00000000000..d8abbc57ece --- /dev/null +++ b/neutron/agent/l3/extensions/qos/gateway_ip.py @@ -0,0 +1,191 @@ +# Copyright 2018 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr + +from neutron_lib.agent import l3_extension +from neutron_lib import constants +from oslo_concurrency import lockutils +from oslo_log import log as logging + + +from neutron.agent.l3.extensions.qos import base as qos_base +from neutron.agent.linux import ip_lib +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc + +LOG = logging.getLogger(__name__) + + +class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, + l3_extension.L3AgentExtension): + + def initialize(self, connection, driver_type): + """Initialize agent extension.""" + self.resource_rpc = resources_rpc.ResourcesPullRpcApi() + self._register_rpc_consumers() + self.gateway_ip_qos_map = qos_base.RateLimitMaps() + + @lockutils.synchronized('qos-gateway-ip') + def _handle_notification(self, context, resource_type, + qos_policies, event_type): + if event_type == events.UPDATED: + for qos_policy in qos_policies: + self._process_update_policy(qos_policy) + + def _process_router_gateway_after_policy_update( + self, router_id, qos_policy): + router_info = self._get_router_info(router_id) + if not router_info: + return + ex_gw_port = router_info.get_ex_gw_port() + if not ex_gw_port: + return + interface_name = router_info.get_external_device_name( + ex_gw_port['id']) + device = self._get_gateway_tc_rule_device( + router_info, interface_name) + if not device.exists(): + return + tc_wrapper = self._get_tc_wrapper(device) + # Clear all old gateway IP tc rules first. + self._empty_router_gateway_rate_limits(router_info, tc_wrapper) + rates = self.get_policy_rates(qos_policy) + self.gateway_ip_qos_map.set_resource_policy( + router_info.router_id, qos_policy) + self._set_gateway_tc_rules( + router_info, tc_wrapper, + ex_gw_port, rates) + + def _process_update_policy(self, qos_policy): + old_qos_policy = self.gateway_ip_qos_map.get_policy(qos_policy.id) + if old_qos_policy: + if self._policy_rules_modified(old_qos_policy, qos_policy): + router_ids = self.gateway_ip_qos_map.get_resources( + qos_policy) + for router_id in list(router_ids): + self._process_router_gateway_after_policy_update( + router_id, qos_policy) + self.gateway_ip_qos_map.update_policy(qos_policy) + + @lockutils.synchronized('qos-gateway-ip') + def add_router(self, context, data): + router_info = self._get_router_info(data['id']) + if router_info: + self.process_gateway_rate_limit(context, router_info) + + @lockutils.synchronized('qos-gateway-ip') + def update_router(self, context, data): + router_info = self._get_router_info(data['id']) + if router_info: + self.process_gateway_rate_limit(context, router_info) + + def delete_router(self, context, data): + # Remove the router and policy map in case the router deletion with + # gateway. + self.gateway_ip_qos_map.clean_by_resource(data['id']) + + def ha_state_change(self, context, data): + pass + + def process_gateway_rate_limit(self, context, router_info): + is_distributed_router = router_info.router.get('distributed') + agent_mode = router_info.agent_conf.agent_mode + LOG.debug("Start processing gateway IP QoS for " + "router %(router_id)s, router " + "distributed: %(distributed)s, " + "agent mode: %(agent_mode)s", + {"router_id": router_info.router_id, + "distributed": is_distributed_router, + "agent_mode": agent_mode}) + if is_distributed_router and agent_mode in ( + constants.L3_AGENT_MODE_DVR, + constants.L3_AGENT_MODE_DVR_NO_EXTERNAL): + # Dvr local router and dvr_no_external agent do not process + # gateway IPs. + return + + self._handle_router_gateway_rate_limit(context, router_info) + + def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper): + self.gateway_ip_qos_map.clean_by_resource(router_info.router_id) + for ip in router_info.qos_gateway_ips: + for direction in constants.VALID_DIRECTIONS: + tc_wrapper.clear_ip_rate_limit(direction, ip) + router_info.qos_gateway_ips.clear() + + def _handle_router_gateway_rate_limit(self, context, router_info): + ex_gw_port = router_info.get_ex_gw_port() + if not ex_gw_port: + return + + interface_name = router_info.get_external_device_name( + ex_gw_port['id']) + device = self._get_gateway_tc_rule_device(router_info, interface_name) + if not device.exists(): + return + + tc_wrapper = self._get_tc_wrapper(device) + # Clear all old gateway IP tc rules first. + self._empty_router_gateway_rate_limits(router_info, tc_wrapper) + + rates = self._get_rates_by_policy(context, router_info) + if not rates: + return + + self._set_gateway_tc_rules(router_info, tc_wrapper, ex_gw_port, rates) + + def _get_gateway_tc_rule_device(self, router_info, interface_name): + is_distributed_router = router_info.router.get('distributed') + agent_mode = router_info.agent_conf.agent_mode + namespace = router_info.ns_name + if (is_distributed_router and + agent_mode == constants.L3_AGENT_MODE_DVR_SNAT): + namespace = router_info.snat_namespace.name + return ip_lib.IPDevice(interface_name, namespace=namespace) + + def _get_rates_by_policy(self, context, router_info): + gateway_info = router_info.router.get('external_gateway_info') + if not gateway_info: + return + + policy_id = gateway_info.get('qos_policy_id') + if not policy_id: + return + + policy = self.resource_rpc.pull( + context, resources.QOS_POLICY, policy_id) + self.gateway_ip_qos_map.set_resource_policy( + router_info.router_id, policy) + return self.get_policy_rates(policy) + + def _set_gateway_tc_rules(self, router_info, tc_wrapper, + ex_gw_port, rates): + for ip_addr in ex_gw_port['fixed_ips']: + ex_gw_ip = ip_addr['ip_address'] + ip_ver = netaddr.IPAddress(ex_gw_ip).version + if ip_ver == constants.IP_VERSION_4: + self._set_gateway_ip_rate_limit(tc_wrapper, ex_gw_ip, rates) + router_info.qos_gateway_ips.add(ex_gw_ip) + + def _set_gateway_ip_rate_limit(self, tc_wrapper, ex_gw_ip, rates): + for direction in constants.VALID_DIRECTIONS: + rate = rates.get(direction) + if (rate['rate'] == qos_base.IP_DEFAULT_RATE and + rate['burst'] == qos_base.IP_DEFAULT_BURST): + continue + tc_wrapper.set_ip_rate_limit(direction, ex_gw_ip, + rate['rate'], rate['burst']) diff --git a/neutron/agent/l3/router_info.py b/neutron/agent/l3/router_info.py index 8dfd487822c..226bd00aff9 100644 --- a/neutron/agent/l3/router_info.py +++ b/neutron/agent/l3/router_info.py @@ -82,6 +82,7 @@ class RouterInfo(object): self.radvd = None self.centralized_port_forwarding_fip_set = set() self.fip_managed_by_port_forwardings = None + self.qos_gateway_ips = set() def initialize(self, process_monitor): """Initialize the router on the system. diff --git a/neutron/tests/functional/agent/l3/extensions/test_gateway_ip_qos_extension.py b/neutron/tests/functional/agent/l3/extensions/test_gateway_ip_qos_extension.py new file mode 100644 index 00000000000..39b11474932 --- /dev/null +++ b/neutron/tests/functional/agent/l3/extensions/test_gateway_ip_qos_extension.py @@ -0,0 +1,197 @@ +# Copyright 2018 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from neutron_lib import constants +from oslo_utils import uuidutils + +from neutron.agent.l3 import agent as neutron_l3_agent +from neutron.agent.l3.extensions.qos import gateway_ip as gateway_ip_qos +from neutron.common import exceptions +from neutron.common import utils as common_utils +from neutron.objects.qos import policy +from neutron.objects.qos import rule +from neutron.tests.functional.agent.l3 import framework +from neutron.tests.functional.agent.l3 import test_dvr_router + +_uuid = uuidutils.generate_uuid +INGRESS_EGRESS_POLICY_ID = _uuid() +INGRESS_POLICY_ID = _uuid() +EGRESS_POLICY_ID = _uuid() + + +class RouterGatewayIPQosAgentExtensionTestFramework( + framework.L3AgentTestFramework): + + test_bw_limit_rule_1 = rule.QosBandwidthLimitRule( + context=None, + qos_policy_id=INGRESS_EGRESS_POLICY_ID, + id=_uuid(), + max_kbps=111, + max_burst_kbps=222, + direction=constants.INGRESS_DIRECTION) + test_bw_limit_rule_2 = rule.QosBandwidthLimitRule( + context=None, + qos_policy_id=INGRESS_EGRESS_POLICY_ID, + id=_uuid(), + max_kbps=333, + max_burst_kbps=444, + direction=constants.EGRESS_DIRECTION) + test_bw_limit_rule_3 = rule.QosBandwidthLimitRule( + context=None, + qos_policy_id=INGRESS_POLICY_ID, + id=_uuid(), + max_kbps=555, + max_burst_kbps=666, + direction=constants.INGRESS_DIRECTION) + test_bw_limit_rule_4 = rule.QosBandwidthLimitRule( + context=None, + qos_policy_id=EGRESS_POLICY_ID, + id=_uuid(), + max_kbps=777, + max_burst_kbps=888, + direction=constants.EGRESS_DIRECTION) + + def setUp(self): + super(RouterGatewayIPQosAgentExtensionTestFramework, self).setUp() + self.conf.set_override('extensions', ['gateway_ip_qos'], 'agent') + self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1', + self.conf) + self._set_pull_mock() + self.set_test_qos_rules(INGRESS_EGRESS_POLICY_ID, + [self.test_bw_limit_rule_1, + self.test_bw_limit_rule_2]) + self.set_test_qos_rules(INGRESS_POLICY_ID, + [self.test_bw_limit_rule_3]) + self.set_test_qos_rules(EGRESS_POLICY_ID, + [self.test_bw_limit_rule_4]) + self.gateway_ip_qos_ext = ( + gateway_ip_qos.RouterGatewayIPQosAgentExtension()) + + def _set_pull_mock(self): + + self.qos_policies = {} + + def _pull_mock(context, resource_type, resource_id): + return self.qos_policies[resource_id] + + self.pull = mock.patch( + 'neutron.api.rpc.handlers.resources_rpc.' + 'ResourcesPullRpcApi.pull').start() + self.pull.side_effect = _pull_mock + + def set_test_qos_rules(self, policy_id, policy_rules): + """This function sets the policy test rules to be exposed.""" + + qos_policy = policy.QosPolicy( + context=None, + project_id=_uuid(), + id=policy_id, + name="Test Policy Name", + description="This is a policy for testing purposes", + shared=False, + rules=policy_rules) + + qos_policy.obj_reset_changes() + self.qos_policies[policy_id] = qos_policy + + def _assert_bandwidth_limit_rule_is_set(self, router, ip, rule): + ex_gw_port = router.get_ex_gw_port() + interface_name = router.get_external_device_name(ex_gw_port['id']) + device = self.gateway_ip_qos_ext._get_gateway_tc_rule_device( + router, interface_name) + tc_wrapper = self.gateway_ip_qos_ext._get_tc_wrapper(device) + + def get_filter_id(): + try: + return tc_wrapper.get_filter_id_for_ip(rule.direction, ip) + except exceptions.FilterIDForIPNotFound: + pass + + common_utils.wait_until_true(get_filter_id) + + +class TestRouterGatewayIPQosAgentExtension( + RouterGatewayIPQosAgentExtensionTestFramework): + + def _test_centralized_routers(self, enable_ha=False, + ingress=True, egress=True): + qos_policy_id = INGRESS_EGRESS_POLICY_ID + if ingress and not egress: + qos_policy_id = INGRESS_POLICY_ID + elif egress and not ingress: + qos_policy_id = EGRESS_POLICY_ID + router_info = self.generate_router_info( + enable_ha=enable_ha, + qos_policy_id=qos_policy_id) + ri = self.manage_router(self.agent, router_info) + if qos_policy_id == INGRESS_EGRESS_POLICY_ID: + self._assert_bandwidth_limit_rule_is_set( + ri, '19.4.4.4', self.test_bw_limit_rule_1) + self._assert_bandwidth_limit_rule_is_set( + ri, '19.4.4.4', self.test_bw_limit_rule_2) + elif qos_policy_id == INGRESS_POLICY_ID: + self._assert_bandwidth_limit_rule_is_set( + ri, '19.4.4.4', self.test_bw_limit_rule_3) + elif qos_policy_id == EGRESS_POLICY_ID: + self._assert_bandwidth_limit_rule_is_set( + ri, '19.4.4.4', self.test_bw_limit_rule_4) + + def test_legacy_router_gateway_ip_qos(self): + self._test_centralized_routers() + + def test_legacy_router_gateway_ip_qos_ingress(self): + self._test_centralized_routers(ingress=True, egress=False) + + def test_legacy_router_gateway_ip_qos_egress(self): + self._test_centralized_routers(ingress=False, egress=True) + + def test_ha_router_gateway_ip_qos(self): + self._test_centralized_routers(enable_ha=True) + + def test_ha_router_gateway_ip_qos_ingress(self): + self._test_centralized_routers(enable_ha=True, + ingress=True, egress=False) + + def test_ha_router_gateway_ip_qos_egress(self): + self._test_centralized_routers(enable_ha=True, + ingress=False, egress=True) + + +class TestRouterGatewayIPQosAgentExtensionDVR( + test_dvr_router.TestDvrRouter, + RouterGatewayIPQosAgentExtensionTestFramework): + + def _test_dvr_gateway_ip_qos(self, enable_ha=False): + self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT + router_info = self.generate_dvr_router_info( + enable_ha=enable_ha, enable_snat=True, + enable_gw=True, qos_policy_id=INGRESS_EGRESS_POLICY_ID) + ri = self.manage_router(self.agent, router_info) + self._assert_bandwidth_limit_rule_is_set( + ri, '19.4.4.4', self.test_bw_limit_rule_1) + self._assert_bandwidth_limit_rule_is_set( + ri, '19.4.4.4', self.test_bw_limit_rule_2) + + def test_dvr_edge_router_gateway_ip_qos(self): + self._test_dvr_gateway_ip_qos() + + def test_ha_dvr_edge_router_gateway_ip_qos(self): + self._test_dvr_gateway_ip_qos(enable_ha=True) + + +class LinuxBridgeRouterGatewayIPQosAgentExtensionTestCase( + TestRouterGatewayIPQosAgentExtension): + INTERFACE_DRIVER = 'neutron.agent.linux.interface.BridgeInterfaceDriver' diff --git a/neutron/tests/unit/agent/l3/extensions/qos/test_gateway_ip.py b/neutron/tests/unit/agent/l3/extensions/qos/test_gateway_ip.py new file mode 100644 index 00000000000..3d68c2a11c8 --- /dev/null +++ b/neutron/tests/unit/agent/l3/extensions/qos/test_gateway_ip.py @@ -0,0 +1,221 @@ +# Copyright 2017 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +import mock +from neutron_lib import constants as lib_const +from neutron_lib import context +from oslo_utils import uuidutils + +from neutron.agent.l3 import agent as l3_agent +from neutron.agent.l3.extensions.qos import gateway_ip as gateway_ip_qos +from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api +from neutron.agent.l3 import router_info as l3router +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.objects.qos import policy +from neutron.objects.qos import rule +from neutron.tests.unit.agent.l3 import test_agent + +_uuid = uuidutils.generate_uuid + +TEST_QOS_GW_IP = "172.24.4.1" + +HOSTNAME = 'myhost' + + +class QosExtensionBaseTestCase(test_agent.BasicRouterOperationsFramework): + + def setUp(self): + super(QosExtensionBaseTestCase, self).setUp() + + self.gw_ip_qos_ext = gateway_ip_qos.RouterGatewayIPQosAgentExtension() + self.context = context.get_admin_context() + self.connection = mock.Mock() + + self.policy = policy.QosPolicy(context=None, + name='test1', id=_uuid()) + self.ingress_rule = ( + rule.QosBandwidthLimitRule(context=None, id=_uuid(), + qos_policy_id=self.policy.id, + max_kbps=1111, + max_burst_kbps=2222, + direction=lib_const.INGRESS_DIRECTION)) + self.egress_rule = ( + rule.QosBandwidthLimitRule(context=None, id=_uuid(), + qos_policy_id=self.policy.id, + max_kbps=3333, + max_burst_kbps=4444, + direction=lib_const.EGRESS_DIRECTION)) + self.policy.rules = [self.ingress_rule, self.egress_rule] + + self.new_ingress_rule = ( + rule.QosBandwidthLimitRule(context=None, id=_uuid(), + qos_policy_id=self.policy.id, + max_kbps=5555, + max_burst_kbps=6666, + direction=lib_const.INGRESS_DIRECTION)) + self.ingress_rule_only_has_max_kbps = ( + rule.QosBandwidthLimitRule(context=None, id=_uuid(), + qos_policy_id=self.policy.id, + max_kbps=5555, + max_burst_kbps=0, + direction=lib_const.INGRESS_DIRECTION)) + + self.policy2 = policy.QosPolicy(context=None, + name='test2', id=_uuid()) + self.policy2.rules = [self.ingress_rule] + + self.policy3 = policy.QosPolicy(context=None, + name='test3', id=_uuid()) + self.policy3.rules = [self.egress_rule] + + self.policy4 = policy.QosPolicy(context=None, + name='test4', id=_uuid()) + self.dscp = rule.QosDscpMarkingRule(context=None, id=_uuid(), + qos_policy_id=self.policy4.id, + dscp_mark=32) + self.dscp.obj_reset_changes() + self.policy4.rules = [self.dscp] + + self.qos_policies = {self.policy.id: self.policy, + self.policy2.id: self.policy2, + self.policy3.id: self.policy3, + self.policy4.id: self.policy4} + + self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + self.ex_gw_port = {'id': _uuid(), + 'fixed_ips': [ + {'ip_address': TEST_QOS_GW_IP}], + 'qos_policy_id': self.policy.id, + 'enable_snat': True} + self.fip = {'id': _uuid(), + 'floating_ip_address': '172.24.4.9', + 'fixed_ip_address': '192.168.0.1', + 'floating_network_id': _uuid(), + 'port_id': _uuid(), + 'host': HOSTNAME, + 'qos_policy_id': self.policy.id} + self.router = {'id': _uuid(), + 'gw_port': self.ex_gw_port, + 'ha': False, + 'distributed': False, + lib_const.FLOATINGIP_KEY: [self.fip], + 'external_gateway_info': self.ex_gw_port} + self.router_info = l3router.RouterInfo(self.agent, self.router['id'], + self.router, **self.ri_kwargs) + self.router_info.ex_gw_port = self.ex_gw_port + self.agent.router_info[self.router['id']] = self.router_info + + def _mock_get_router_info(router_id): + return self.router_info + + self.get_router_info = mock.patch( + 'neutron.agent.l3.l3_agent_extension_api.' + 'L3AgentExtensionAPI.get_router_info').start() + self.get_router_info.side_effect = _mock_get_router_info + + self.agent_api = l3_ext_api.L3AgentExtensionAPI(None) + self.gw_ip_qos_ext.consume_api(self.agent_api) + + +class RouterGatewayIPQosAgentExtensionInitializeTestCase( + QosExtensionBaseTestCase): + + @mock.patch.object(registry, 'register') + @mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback') + def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock): + call_to_patch = 'neutron.common.rpc.Connection' + with mock.patch(call_to_patch, + return_value=self.connection) as create_connection: + self.gw_ip_qos_ext.initialize( + self.connection, lib_const.L3_AGENT_MODE) + create_connection.assert_has_calls([mock.call()]) + self.connection.create_consumer.assert_has_calls( + [mock.call( + resources_rpc.resource_type_versioned_topic( + resources.QOS_POLICY), + [rpc_mock()], + fanout=True)] + ) + subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY) + + +class RouterGatewayIPQosAgentExtensionTestCase( + QosExtensionBaseTestCase): + + def setUp(self): + super(RouterGatewayIPQosAgentExtensionTestCase, self).setUp() + self.gw_ip_qos_ext.initialize( + self.connection, lib_const.L3_AGENT_MODE) + self._set_pull_mock() + + def _set_pull_mock(self): + + def _pull_mock(context, resource_type, resource_id): + return self.qos_policies[resource_id] + + self.pull = mock.patch( + 'neutron.api.rpc.handlers.resources_rpc.' + 'ResourcesPullRpcApi.pull').start() + self.pull.side_effect = _pull_mock + + def _test_gateway_ip_add(self, func): + tc_wrapper = mock.Mock() + with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper', + return_value=tc_wrapper): + func(self.context, self.router) + tc_wrapper.set_ip_rate_limit.assert_has_calls( + [mock.call(lib_const.INGRESS_DIRECTION, + TEST_QOS_GW_IP, 1111, 2222), + mock.call(lib_const.EGRESS_DIRECTION, + TEST_QOS_GW_IP, 3333, 4444)], + any_order=True) + + self.assertEqual( + {self.router_info.router_id: self.policy.id}, + self.gw_ip_qos_ext.gateway_ip_qos_map.resource_policies) + + def test_add_router(self): + self._test_gateway_ip_add(self.gw_ip_qos_ext.add_router) + + def test_update_router(self): + self._test_gateway_ip_add(self.gw_ip_qos_ext.update_router) + + def test__process_update_policy(self): + tc_wrapper = mock.Mock() + with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper', + return_value=tc_wrapper): + self.gw_ip_qos_ext.add_router(self.context, self.router) + tc_wrapper.set_ip_rate_limit.assert_has_calls( + [mock.call(lib_const.INGRESS_DIRECTION, + TEST_QOS_GW_IP, 1111, 2222), + mock.call(lib_const.EGRESS_DIRECTION, + TEST_QOS_GW_IP, 3333, 4444)], + any_order=True) + new_policy = copy.deepcopy(self.policy) + new_policy.rules = [self.new_ingress_rule, self.egress_rule] + self.gw_ip_qos_ext._process_update_policy(new_policy) + self.assertEqual( + {self.router_info.router_id: self.policy.id}, + self.gw_ip_qos_ext.gateway_ip_qos_map.resource_policies) + tc_wrapper.set_ip_rate_limit.assert_has_calls( + [mock.call(lib_const.INGRESS_DIRECTION, + TEST_QOS_GW_IP, 5555, 6666), + mock.call(lib_const.EGRESS_DIRECTION, + TEST_QOS_GW_IP, 3333, 4444)], + any_order=True) diff --git a/setup.cfg b/setup.cfg index 0b5e17c0b7e..de5c0e38a8e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -110,6 +110,7 @@ neutron.agent.l2.extensions = log = neutron.services.logapi.agent.log_extension:LoggingExtension neutron.agent.l3.extensions = fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension + gateway_ip_qos = neutron.agent.l3.extensions.qos.gateway_ip:RouterGatewayIPQosAgentExtension port_forwarding = neutron.agent.l3.extensions.port_forwarding:PortForwardingAgentExtension snat_log = neutron.agent.l3.extensions.snat_log:SNATLoggingExtension neutron.services.logapi.drivers =