Merge "Refactor for L3 router QoS extensions"

This commit is contained in:
Zuul 2018-05-17 12:32:43 +00:00 committed by Gerrit Code Review
commit 50ca126676
9 changed files with 269 additions and 186 deletions

View File

@ -0,0 +1,166 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib import constants
from neutron_lib.db import constants as db_consts
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
from neutron.agent.linux import l3_tc_lib as tc_lib
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import rpc as n_rpc
LOG = logging.getLogger(__name__)
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.DIRECTION: {
'type:values': constants.VALID_DIRECTIONS}
}
}
# We use the default values to illustrate:
# 1. QoS policy does not have some direction `bandwidth_limit`, then we use
# the default value.
# 2. default value 0 will be treated as no limit.
# 3. if one IP's rate was changed from x to 0, the extension will do
# a tc filter clean procedure.
IP_DEFAULT_RATE = 0
IP_DEFAULT_BURST = 0
class RateLimitMaps(object):
def __init__(self):
self.qos_policy_resources = collections.defaultdict(dict)
self.known_policies = {}
self.resource_policies = {}
def update_policy(self, policy):
self.known_policies[policy.id] = policy
def get_policy(self, policy_id):
return self.known_policies.get(policy_id)
def get_resources(self, policy):
return self.qos_policy_resources[policy.id].values()
def get_resource_policy(self, resource):
policy_id = self.resource_policies.get(resource)
return self.get_policy(policy_id)
def set_resource_policy(self, resource, policy):
"""Attach a resource to policy
and return any previous policy on resource.
"""
old_policy = self.get_resource_policy(resource)
self.update_policy(policy)
self.resource_policies[resource] = policy.id
self.qos_policy_resources[policy.id][resource] = resource
if old_policy and old_policy.id != policy.id:
del self.qos_policy_resources[old_policy.id][resource]
def clean_by_resource(self, resource):
"""Detach resource from policy
and cleanup data we don't need anymore.
"""
if resource in self.resource_policies:
del self.resource_policies[resource]
for qos_policy_id, res_dict in self.qos_policy_resources.items():
if resource in res_dict:
del res_dict[resource]
if not res_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("L3 QoS extension did not have "
"information on floating IP %s", resource)
def _clean_policy_info(self, qos_policy_id):
del self.qos_policy_resources[qos_policy_id]
del self.known_policies[qos_policy_id]
class L3QosAgentExtensionBase(object):
SUPPORTED_RESOURCE_TYPES = [resources.QOS_POLICY]
def consume_api(self, agent_api):
self.agent_api = agent_api
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
pass
def _process_update_policy(self, qos_policy):
pass
def _policy_rules_modified(self, old_policy, policy):
return not (len(old_policy.rules) == len(policy.rules) and
all(i in old_policy.rules for i in policy.rules))
def _register_rpc_consumers(self):
registry.register(self._handle_notification, resources.QOS_POLICY)
self._connection = n_rpc.Connection()
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
topic = resources_rpc.resource_type_versioned_topic(
resources.QOS_POLICY)
self._connection.create_consumer(topic, endpoints, fanout=True)
self._connection.consume_in_threads()
def _get_tc_wrapper(self, device):
return tc_lib.FloatingIPTcCommand(device.name,
namespace=device.namespace)
def get_policy_rates(self, policy):
rates = {}
for rule in policy.rules:
# NOTE(liuyulong): for now, the L3 agent QoS extensions only
# use ``bandwidth_limit`` rules.
if rule.rule_type in SUPPORTED_RULES:
if rule.direction not in rates:
rates[rule.direction] = {"rate": rule.max_kbps,
"burst": rule.max_burst_kbps}
# The return rates dict must contain all directions. If there is no
# one specific direction QoS rule, use the default values.
for direction in constants.VALID_DIRECTIONS:
if direction not in rates:
LOG.debug("Policy %(id)s does not have '%(direction)s' "
"bandwidth_limit rule, use default value instead.",
{"id": policy.id,
"direction": direction})
rates[direction] = {"rate": IP_DEFAULT_RATE,
"burst": IP_DEFAULT_BURST}
return rates
def _get_router_info(self, router_id):
router_info = self.agent_api.get_router_info(router_id)
if router_info:
return router_info
LOG.debug("Router %s is not managed by this agent. "
"It was possibly deleted concurrently.",
router_id)

View File

@ -13,52 +13,23 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from neutron_lib.db import constants as db_consts
from neutron_lib.services.qos import constants as qos_consts
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.l3.extensions.qos import base as qos_base
from neutron.agent.linux import ip_lib
from neutron.agent.linux import l3_tc_lib as tc_lib
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import rpc as n_rpc
LOG = logging.getLogger(__name__)
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.DIRECTION: {
'type:values': constants.VALID_DIRECTIONS}
}
}
# We use the default values to illustrate:
# 1. QoS policy does not have some direction `bandwidth_limit`, then we use
# the default value.
# 2. default value 0 will be treated as no limit.
# 3. if one floating IP's rate was changed from x to 0, the extension will do
# a tc filter clean procedure.
FIP_DEFAULT_RATE = 0
FIP_DEFAULT_BURST = 0
class RouterFipRateLimitMaps(object):
class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
def __init__(self):
self.qos_policy_fips = collections.defaultdict(dict)
self.known_policies = {}
self.fip_policies = {}
"""
The router_floating_ips will be:
router_floating_ips = {
@ -72,52 +43,14 @@ class RouterFipRateLimitMaps(object):
The rate limits dict will be:
xxx_ratelimits = {
fip_1: (rate, burst),
fip_2: (FIP_DEFAULT_RATE, FIP_DEFAULT_BURST), # default
fip_2: (IP_DEFAULT_RATE, IP_DEFAULT_BURST), # default
fip_3: (1, 2),
fip_4: (3, 4),
}
"""
self.ingress_ratelimits = {}
self.egress_ratelimits = {}
def update_policy(self, policy):
self.known_policies[policy.id] = policy
def get_policy(self, policy_id):
return self.known_policies.get(policy_id)
def get_fips(self, policy):
return self.qos_policy_fips[policy.id].values()
def get_fip_policy(self, fip):
policy_id = self.fip_policies.get(fip)
return self.get_policy(policy_id)
def set_fip_policy(self, fip, policy):
"""Attach a fip to policy and return any previous policy on fip."""
old_policy = self.get_fip_policy(fip)
self.update_policy(policy)
self.fip_policies[fip] = policy.id
self.qos_policy_fips[policy.id][fip] = fip
if old_policy and old_policy.id != policy.id:
del self.qos_policy_fips[old_policy.id][fip]
def clean_by_fip(self, fip):
"""Detach fip from policy and cleanup data we don't need anymore."""
if fip in self.fip_policies:
del self.fip_policies[fip]
for qos_policy_id, fip_dict in self.qos_policy_fips.items():
if fip in fip_dict:
del fip_dict[fip]
if not fip_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("Floating IP QoS extension did not have "
"information on floating IP %s", fip)
def _clean_policy_info(self, qos_policy_id):
del self.qos_policy_fips[qos_policy_id]
del self.known_policies[qos_policy_id]
super(RouterFipRateLimitMaps, self).__init__()
def find_fip_router_id(self, fip):
for router_id, ips in self.router_floating_ips.items():
@ -125,8 +58,8 @@ class RouterFipRateLimitMaps(object):
return router_id
class FipQosAgentExtension(l3_extension.L3AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.QOS_POLICY]
class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
l3_extension.L3AgentExtension):
def initialize(self, connection, driver_type):
"""Initialize agent extension."""
@ -134,9 +67,6 @@ class FipQosAgentExtension(l3_extension.L3AgentExtension):
self.fip_qos_map = RouterFipRateLimitMaps()
self._register_rpc_consumers()
def consume_api(self, agent_api):
self.agent_api = agent_api
@lockutils.synchronized('qos-fip')
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
@ -144,15 +74,11 @@ class FipQosAgentExtension(l3_extension.L3AgentExtension):
for qos_policy in qos_policies:
self._process_update_policy(qos_policy)
def _policy_rules_modified(self, old_policy, policy):
return not (len(old_policy.rules) == len(policy.rules) and
all(i in old_policy.rules for i in policy.rules))
def _process_update_policy(self, qos_policy):
old_qos_policy = self.fip_qos_map.get_policy(qos_policy.id)
if old_qos_policy:
if self._policy_rules_modified(old_qos_policy, qos_policy):
for fip in self.fip_qos_map.get_fips(qos_policy):
for fip in self.fip_qos_map.get_resources(qos_policy):
router_id = self.fip_qos_map.find_fip_router_id(fip)
router_info = self._get_router_info(router_id)
if not router_info:
@ -172,27 +98,13 @@ class FipQosAgentExtension(l3_extension.L3AgentExtension):
self.fip_qos_map.update_policy(qos_policy)
def _process_reset_fip(self, fip):
self.fip_qos_map.clean_by_fip(fip)
def _register_rpc_consumers(self):
registry.register(self._handle_notification, resources.QOS_POLICY)
self._connection = n_rpc.Connection()
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
topic = resources_rpc.resource_type_versioned_topic(
resources.QOS_POLICY)
self._connection.create_consumer(topic, endpoints, fanout=True)
self._connection.consume_in_threads()
def _get_tc_wrapper(self, device):
return tc_lib.FloatingIPTcCommand(device.name,
namespace=device.namespace)
self.fip_qos_map.clean_by_resource(fip)
def process_ip_rate_limit(self, ip, direction, device, rate, burst):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
old_rate, old_burst = rate_limits.get(ip, (FIP_DEFAULT_RATE,
FIP_DEFAULT_BURST))
old_rate, old_burst = rate_limits.get(ip, (qos_base.IP_DEFAULT_RATE,
qos_base.IP_DEFAULT_BURST))
if old_rate == rate and old_burst == burst:
# Two possibilities here:
@ -202,7 +114,8 @@ class FipQosAgentExtension(l3_extension.L3AgentExtension):
tc_wrapper = self._get_tc_wrapper(device)
if rate == FIP_DEFAULT_RATE and burst == FIP_DEFAULT_BURST:
if (rate == qos_base.IP_DEFAULT_RATE and
burst == qos_base.IP_DEFAULT_BURST):
# According to the agreements of default value definition,
# floating IP bandwidth was changed to default value (no limit).
# NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound.
@ -255,37 +168,17 @@ class FipQosAgentExtension(l3_extension.L3AgentExtension):
self._process_reset_fip(fip)
# process_ip_rate_limit will treat value 0 as
# cleaning the tc filters if exits or no action.
return {constants.INGRESS_DIRECTION: {"rate": FIP_DEFAULT_RATE,
"burst": FIP_DEFAULT_BURST},
constants.EGRESS_DIRECTION: {"rate": FIP_DEFAULT_RATE,
"burst": FIP_DEFAULT_BURST}}
return {constants.INGRESS_DIRECTION: {
"rate": qos_base.IP_DEFAULT_RATE,
"burst": qos_base.IP_DEFAULT_BURST},
constants.EGRESS_DIRECTION: {
"rate": qos_base.IP_DEFAULT_RATE,
"burst": qos_base.IP_DEFAULT_BURST}}
policy = self.resource_rpc.pull(
context, resources.QOS_POLICY, policy_id)
self.fip_qos_map.set_fip_policy(fip, policy)
self.fip_qos_map.set_resource_policy(fip, policy)
return self.get_policy_rates(policy)
def get_policy_rates(self, policy):
rates = {}
for rule in policy.rules:
# NOTE(liuyulong): for now, the L3 agent floating IP QoS
# extension only uses ``bandwidth_limit`` rules..
if rule.rule_type in SUPPORTED_RULES:
if rule.direction not in rates:
rates[rule.direction] = {"rate": rule.max_kbps,
"burst": rule.max_burst_kbps}
# The return rates dict must contain all directions. If there is no
# one specific direction QoS rule, use the default values.
for direction in constants.VALID_DIRECTIONS:
if direction not in rates:
LOG.debug("Policy %(id)s does not have '%(direction)s' "
"bandwidth_limit rule, use default value instead.",
{"id": policy.id,
"direction": direction})
rates[direction] = {"rate": FIP_DEFAULT_RATE,
"burst": FIP_DEFAULT_BURST}
return rates
def process_ip_rates(self, fip, device, rates, with_cache=True):
for direction in constants.VALID_DIRECTIONS:
rate = rates.get(direction)
@ -380,14 +273,6 @@ class FipQosAgentExtension(l3_extension.L3AgentExtension):
self._remove_fip_rate_limit(dvr_fip_device, fip)
self._process_reset_fip(fip)
def _get_router_info(self, router_id):
router_info = self.agent_api.get_router_info(router_id)
if router_info:
return router_info
LOG.debug("Router %s is not managed by this agent. "
"It was possibly deleted concurrently.",
router_id)
@lockutils.synchronized('qos-fip')
def add_router(self, context, data):
router_info = self._get_router_info(data['id'])

View File

@ -18,7 +18,7 @@ from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3.extensions import fip_qos
from neutron.agent.l3.extensions.qos import fip as fip_qos
from neutron.agent.linux import ip_lib
from neutron.common import exceptions
from neutron.common import utils as common_utils

View File

@ -0,0 +1,81 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from neutron.agent.l3.extensions.qos import base as qos_base
from neutron.objects.qos import policy
from neutron.tests import base
_uuid = uuidutils.generate_uuid
TEST_POLICY = policy.QosPolicy(context=None,
name='test1', id=_uuid())
TEST_POLICY2 = policy.QosPolicy(context=None,
name='test2', id=_uuid())
TEST_RES_1 = "res1"
TEST_RES_2 = "res2"
class RateLimitMapsTestCase(base.BaseTestCase):
def setUp(self):
super(RateLimitMapsTestCase, self).setUp()
self.policy_map = qos_base.RateLimitMaps()
def test_update_policy(self):
self.policy_map.update_policy(TEST_POLICY)
self.assertEqual(TEST_POLICY,
self.policy_map.known_policies[TEST_POLICY.id])
def _set_resources(self):
self.policy_map.set_resource_policy(TEST_RES_1, TEST_POLICY)
self.policy_map.set_resource_policy(TEST_RES_2, TEST_POLICY2)
def test_set_resource_policy(self):
self._set_resources()
self.assertEqual(TEST_POLICY,
self.policy_map.known_policies[TEST_POLICY.id])
self.assertIn(TEST_RES_1,
self.policy_map.qos_policy_resources[TEST_POLICY.id])
def test_get_resource_policy(self):
self._set_resources()
self.assertEqual(TEST_POLICY,
self.policy_map.get_resource_policy(TEST_RES_1))
self.assertEqual(TEST_POLICY2,
self.policy_map.get_resource_policy(TEST_RES_2))
def test_get_resources(self):
self._set_resources()
self.assertEqual([TEST_RES_1],
list(self.policy_map.get_resources(TEST_POLICY)))
self.assertEqual([TEST_RES_2],
list(self.policy_map.get_resources(TEST_POLICY2)))
def test_clean_by_resource(self):
self._set_resources()
self.policy_map.clean_by_resource(TEST_RES_1)
self.assertNotIn(TEST_POLICY.id, self.policy_map.known_policies)
self.assertNotIn(TEST_RES_1, self.policy_map.resource_policies)
self.assertIn(TEST_POLICY2.id, self.policy_map.known_policies)
def test_clean_by_resource_for_unknown_resource(self):
self.policy_map._clean_policy_info = mock.Mock()
self.policy_map.clean_by_resource(TEST_RES_1)
self.policy_map._clean_policy_info.assert_not_called()

View File

@ -20,7 +20,7 @@ from neutron_lib.services.qos import constants as qos_consts
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3.extensions import fip_qos
from neutron.agent.l3.extensions.qos import fip as fip_qos
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import router_info as l3router
from neutron.api.rpc.callbacks.consumer import registry
@ -32,11 +32,6 @@ from neutron.tests import base
from neutron.tests.unit.agent.l3 import test_agent
_uuid = uuidutils.generate_uuid
TEST_POLICY = policy.QosPolicy(context=None,
name='test1', id=_uuid())
TEST_POLICY2 = policy.QosPolicy(context=None,
name='test2', id=_uuid())
TEST_QOS_FIP = "3.3.3.3"
@ -382,50 +377,6 @@ class RouterFipRateLimitMapsTestCase(base.BaseTestCase):
super(RouterFipRateLimitMapsTestCase, self).setUp()
self.policy_map = fip_qos.RouterFipRateLimitMaps()
def test_update_policy(self):
self.policy_map.update_policy(TEST_POLICY)
self.assertEqual(TEST_POLICY,
self.policy_map.known_policies[TEST_POLICY.id])
def _set_fips(self):
self.policy_map.set_fip_policy(TEST_FIP, TEST_POLICY)
self.policy_map.set_fip_policy(TEST_FIP2, TEST_POLICY2)
def test_set_fip_policy(self):
self._set_fips()
self.assertEqual(TEST_POLICY,
self.policy_map.known_policies[TEST_POLICY.id])
self.assertIn(TEST_FIP,
self.policy_map.qos_policy_fips[TEST_POLICY.id])
def test_get_fip_policy(self):
self._set_fips()
self.assertEqual(TEST_POLICY,
self.policy_map.get_fip_policy(TEST_FIP))
self.assertEqual(TEST_POLICY2,
self.policy_map.get_fip_policy(TEST_FIP2))
def test_get_fips(self):
self._set_fips()
self.assertEqual([TEST_FIP],
list(self.policy_map.get_fips(TEST_POLICY)))
self.assertEqual([TEST_FIP2],
list(self.policy_map.get_fips(TEST_POLICY2)))
def test_clean_by_fip(self):
self._set_fips()
self.policy_map.clean_by_fip(TEST_FIP)
self.assertNotIn(TEST_POLICY.id, self.policy_map.known_policies)
self.assertNotIn(TEST_FIP, self.policy_map.fip_policies)
self.assertIn(TEST_POLICY2.id, self.policy_map.known_policies)
def test_clean_by_fip_for_unknown_fip(self):
self.policy_map._clean_policy_info = mock.Mock()
self.policy_map.clean_by_fip(TEST_FIP)
self.policy_map._clean_policy_info.assert_not_called()
def test_find_fip_router_id(self):
router_id = _uuid()
self.policy_map.router_floating_ips[router_id] = set([TEST_FIP,

View File

@ -106,7 +106,7 @@ neutron.agent.l2.extensions =
fdb = neutron.agent.l2.extensions.fdb_population:FdbPopulationAgentExtension
log = neutron.services.logapi.agent.log_extension:LoggingExtension
neutron.agent.l3.extensions =
fip_qos = neutron.agent.l3.extensions.fip_qos:FipQosAgentExtension
fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension
neutron.services.logapi.drivers =
ovs = neutron.services.logapi.drivers.openvswitch.ovs_firewall_log:OVSFirewallLoggingDriver
neutron.qos.agent_drivers =