From ab57410ec8e5fae97dec052a5940debd614e4e50 Mon Sep 17 00:00:00 2001
From: LIU Yulong
Date: Sat, 13 Apr 2019 11:01:02 +0800
Subject: [PATCH] Minimizing L3 agent QoS extensions lock granularity

If the agent is concurrently processing a large set of resources,
a single bottleneck lock multiplies the processing time of those
resources that have been waiting for the lock for a long time.

This patch moves the lock into the core resource cache and applies
per-resource coordination locks to the resource processing and
notification handling functions.

Closes-Bug: #1824911
Change-Id: Id43829b11631727f1a46362ffea5c22d2177fd79
---
 neutron/agent/l3/extensions/qos/base.py       |  76 +++++++++----
 neutron/agent/l3/extensions/qos/fip.py        | 100 +++++++++++++-----
 neutron/agent/l3/extensions/qos/gateway_ip.py |  10 +-
 .../unit/agent/l3/extensions/qos/test_base.py |   2 +-
 .../unit/agent/l3/extensions/qos/test_fip.py  |  34 ++++++
 ...ing-lock-granularity-8bc2f893d9389cf8.yaml |   6 ++
 6 files changed, 174 insertions(+), 54 deletions(-)
 create mode 100644 releasenotes/notes/Minimizing-lock-granularity-8bc2f893d9389cf8.yaml

diff --git a/neutron/agent/l3/extensions/qos/base.py b/neutron/agent/l3/extensions/qos/base.py
index 372ddac6eb0..f1f3f15f87e 100644
--- a/neutron/agent/l3/extensions/qos/base.py
+++ b/neutron/agent/l3/extensions/qos/base.py
@@ -19,6 +19,7 @@ from neutron_lib import constants
 from neutron_lib.db import constants as db_consts
 from neutron_lib import rpc as n_rpc
 from neutron_lib.services.qos import constants as qos_consts
+from oslo_concurrency import lockutils
 from oslo_log import log as logging
 
 from neutron.agent.linux import l3_tc_lib as tc_lib
@@ -51,23 +52,44 @@ IP_DEFAULT_BURST = 0
 
 
 class RateLimitMaps(object):
-    def __init__(self):
+    def __init__(self, lock_name):
         self.qos_policy_resources = collections.defaultdict(dict)
         self.known_policies = {}
         self.resource_policies = {}
+        self.lock_name = lock_name
 
     def update_policy(self, policy):
-        self.known_policies[policy.id] = policy
+
+        @lockutils.synchronized(self.lock_name)
+        def _update_policy():
+            self.known_policies[policy.id] = policy
+
+        return _update_policy()
 
     def get_policy(self, policy_id):
-        return self.known_policies.get(policy_id)
+
+        @lockutils.synchronized(self.lock_name)
+        def _get_policy():
+            return self.known_policies.get(policy_id)
+
+        return _get_policy()
 
     def get_resources(self, policy):
-        return self.qos_policy_resources[policy.id].values()
+
+        @lockutils.synchronized(self.lock_name)
+        def _get_resources():
+            return self.qos_policy_resources[policy.id].values()
+
+        return _get_resources()
 
     def get_resource_policy(self, resource):
-        policy_id = self.resource_policies.get(resource)
-        return self.get_policy(policy_id)
+
+        @lockutils.synchronized(self.lock_name)
+        def _get_resource_policy():
+            policy_id = self.resource_policies.get(resource)
+            return self.known_policies.get(policy_id)
+
+        return _get_resource_policy()
 
     def set_resource_policy(self, resource, policy):
         """Attach a resource to policy
@@ -75,12 +97,17 @@ class RateLimitMaps(object):
         and return any previous policy on resource.
""" - old_policy = self.get_resource_policy(resource) - self.update_policy(policy) - self.resource_policies[resource] = policy.id - self.qos_policy_resources[policy.id][resource] = resource - if old_policy and old_policy.id != policy.id: - del self.qos_policy_resources[old_policy.id][resource] + @lockutils.synchronized(self.lock_name) + def _set_resource_policy(): + policy_id = self.resource_policies.get(resource) + old_policy = self.known_policies.get(policy_id) + self.known_policies[policy.id] = policy + self.resource_policies[resource] = policy.id + self.qos_policy_resources[policy.id][resource] = resource + if old_policy and old_policy.id != policy.id: + del self.qos_policy_resources[old_policy.id][resource] + + _set_resource_policy() def clean_by_resource(self, resource): """Detach resource from policy @@ -88,16 +115,21 @@ class RateLimitMaps(object): and cleanup data we don't need anymore. """ - if resource in self.resource_policies: - del self.resource_policies[resource] - for qos_policy_id, res_dict in self.qos_policy_resources.items(): - if resource in res_dict: - del res_dict[resource] - if not res_dict: - self._clean_policy_info(qos_policy_id) - return - LOG.debug("L3 QoS extension did not have " - "information on floating IP %s", resource) + @lockutils.synchronized(self.lock_name) + def _clean_by_resource(): + if resource in self.resource_policies: + del self.resource_policies[resource] + for (qos_policy_id, + res_dict) in self.qos_policy_resources.items(): + if resource in res_dict: + del res_dict[resource] + if not res_dict: + self._clean_policy_info(qos_policy_id) + return + LOG.debug("L3 QoS extension did not have " + "information on floating IP %s", resource) + + _clean_by_resource() def _clean_policy_info(self, qos_policy_id): del self.qos_policy_resources[qos_policy_id] diff --git a/neutron/agent/l3/extensions/qos/fip.py b/neutron/agent/l3/extensions/qos/fip.py index e9b6d6f96f7..f72fbcceb7f 100644 --- a/neutron/agent/l3/extensions/qos/fip.py +++ b/neutron/agent/l3/extensions/qos/fip.py @@ -24,11 +24,14 @@ from neutron.agent.linux import ip_lib from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import coordination LOG = logging.getLogger(__name__) class RouterFipRateLimitMaps(qos_base.RateLimitMaps): + LOCK_NAME = "fip-qos-cache" + def __init__(self): """Initialize RouterFipRateLimitMaps @@ -51,12 +54,58 @@ class RouterFipRateLimitMaps(qos_base.RateLimitMaps): """ self.ingress_ratelimits = {} self.egress_ratelimits = {} - super(RouterFipRateLimitMaps, self).__init__() + super(RouterFipRateLimitMaps, self).__init__(self.LOCK_NAME) def find_fip_router_id(self, fip): - for router_id, ips in self.router_floating_ips.items(): - if fip in ips: - return router_id + + @lockutils.synchronized(self.lock_name) + def _find_fip_router_id(): + for router_id, ips in self.router_floating_ips.items(): + if fip in ips: + return router_id + + return _find_fip_router_id() + + def get_router_floating_ips(self, router_id): + + @lockutils.synchronized(self.lock_name) + def _get_router_floating_ips(): + return self.router_floating_ips.pop( + router_id, []) + + return _get_router_floating_ips() + + def remove_fip_ratelimit_cache(self, direction, fip): + + @lockutils.synchronized(self.lock_name) + def _remove_fip_ratelimit_cache(): + rate_limits_direction = direction + "_ratelimits" + rate_limits = getattr(self, rate_limits_direction, {}) + rate_limits.pop(fip, None) + + 
_remove_fip_ratelimit_cache() + + def set_fip_ratelimit_cache(self, direction, fip, rate, burst): + + @lockutils.synchronized(self.lock_name) + def _set_fip_ratelimit_cache(): + rate_limits_direction = direction + "_ratelimits" + rate_limits = getattr(self, rate_limits_direction, {}) + rate_limits[fip] = (rate, burst) + + _set_fip_ratelimit_cache() + + def get_fip_ratelimit_cache(self, direction, fip): + + @lockutils.synchronized(self.lock_name) + def _get_fip_ratelimit_cache(): + rate_limits_direction = direction + "_ratelimits" + rate_limits = getattr(self, rate_limits_direction, {}) + rate, burst = rate_limits.get(fip, (qos_base.IP_DEFAULT_RATE, + qos_base.IP_DEFAULT_BURST)) + return rate, burst + + return _get_fip_ratelimit_cache() class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, @@ -68,7 +117,6 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, self.fip_qos_map = RouterFipRateLimitMaps() self._register_rpc_consumers() - @lockutils.synchronized('qos-fip') def _handle_notification(self, context, resource_type, qos_policies, event_type): if event_type == events.UPDATED: @@ -98,20 +146,16 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, fip, dvr_fip_device, rates, with_cache=False) self.fip_qos_map.update_policy(qos_policy) + def _remove_fip_rate_limit_cache(self, fip): + for direction in constants.VALID_DIRECTIONS: + self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip) + def _process_reset_fip(self, fip): self.fip_qos_map.clean_by_resource(fip) - def process_ip_rate_limit(self, ip, direction, device, rate, burst): - rate_limits_direction = direction + "_ratelimits" - rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {}) - old_rate, old_burst = rate_limits.get(ip, (qos_base.IP_DEFAULT_RATE, - qos_base.IP_DEFAULT_BURST)) - - if old_rate == rate and old_burst == burst: - # Two possibilities here: - # 1. Floating IP rate limit does not change. - # 2. Floating IP bandwidth does not limit. - return + @coordination.synchronized('qos-floating-ip-{ip}') + def process_ip_rate_limit(self, ip, direction, + device, rate, burst): tc_wrapper = self._get_tc_wrapper(device) @@ -121,12 +165,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, # floating IP bandwidth was changed to default value (no limit). # NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound. tc_wrapper.clear_ip_rate_limit(direction, ip) - rate_limits.pop(ip, None) + self.fip_qos_map.remove_fip_ratelimit_cache(direction, ip) return # Finally just set it, l3_tc_lib will clean the old rules if exists. 
tc_wrapper.set_ip_rate_limit(direction, ip, rate, burst) - rate_limits[ip] = (rate, burst) def _get_rate_limit_ip_device(self, router_info): ex_gw_port = router_info.get_ex_gw_port() @@ -152,17 +195,12 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, namespace = router_info.get_gw_ns_name() return ip_lib.IPDevice(name, namespace=namespace) - def _remove_ip_rate_limit_cache(self, ip, direction): - rate_limits_direction = direction + "_ratelimits" - rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {}) - rate_limits.pop(ip, None) - def _remove_fip_rate_limit(self, device, fip_ip): tc_wrapper = self._get_tc_wrapper(device) for direction in constants.VALID_DIRECTIONS: if device.exists(): tc_wrapper.clear_ip_rate_limit(direction, fip_ip) - self._remove_ip_rate_limit_cache(fip_ip, direction) + self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip_ip) def get_fip_qos_rates(self, context, fip, policy_id): if policy_id is None: @@ -184,9 +222,21 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, for direction in constants.VALID_DIRECTIONS: rate = rates.get(direction) if with_cache: + + old_rate, old_burst = self.fip_qos_map.get_fip_ratelimit_cache( + direction, fip) + if old_rate == rate['rate'] and old_burst == rate['burst']: + # Two possibilities here: + # 1. Floating IP rate limit does not change. + # 2. Floating IP bandwidth does not limit. + continue + self.process_ip_rate_limit( fip, direction, device, rate['rate'], rate['burst']) + + self.fip_qos_map.set_fip_ratelimit_cache( + direction, fip, rate['rate'], rate['burst']) else: tc_wrapper = self._get_tc_wrapper(device) if (rate['rate'] == qos_base.IP_DEFAULT_RATE and @@ -280,13 +330,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, self._remove_fip_rate_limit(dvr_fip_device, fip) self._process_reset_fip(fip) - @lockutils.synchronized('qos-fip') def add_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: self.process_floating_ip_addresses(context, router_info) - @lockutils.synchronized('qos-fip') def update_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: diff --git a/neutron/agent/l3/extensions/qos/gateway_ip.py b/neutron/agent/l3/extensions/qos/gateway_ip.py index d8abbc57ece..316127d3657 100644 --- a/neutron/agent/l3/extensions/qos/gateway_ip.py +++ b/neutron/agent/l3/extensions/qos/gateway_ip.py @@ -17,7 +17,6 @@ import netaddr from neutron_lib.agent import l3_extension from neutron_lib import constants -from oslo_concurrency import lockutils from oslo_log import log as logging @@ -26,6 +25,7 @@ from neutron.agent.linux import ip_lib from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import coordination LOG = logging.getLogger(__name__) @@ -37,9 +37,9 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, """Initialize agent extension.""" self.resource_rpc = resources_rpc.ResourcesPullRpcApi() self._register_rpc_consumers() - self.gateway_ip_qos_map = qos_base.RateLimitMaps() + self.gateway_ip_qos_map = qos_base.RateLimitMaps( + "gateway-ip-qos-cache") - @lockutils.synchronized('qos-gateway-ip') def _handle_notification(self, context, resource_type, qos_policies, event_type): if event_type == events.UPDATED: @@ -81,13 +81,11 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, router_id, qos_policy) 
self.gateway_ip_qos_map.update_policy(qos_policy) - @lockutils.synchronized('qos-gateway-ip') def add_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: self.process_gateway_rate_limit(context, router_info) - @lockutils.synchronized('qos-gateway-ip') def update_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: @@ -120,6 +118,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, self._handle_router_gateway_rate_limit(context, router_info) + @coordination.synchronized('qos-gateway-ip-{router_info.router_id}') def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper): self.gateway_ip_qos_map.clean_by_resource(router_info.router_id) for ip in router_info.qos_gateway_ips: @@ -172,6 +171,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, router_info.router_id, policy) return self.get_policy_rates(policy) + @coordination.synchronized('qos-gateway-ip-{router_info.router_id}') def _set_gateway_tc_rules(self, router_info, tc_wrapper, ex_gw_port, rates): for ip_addr in ex_gw_port['fixed_ips']: diff --git a/neutron/tests/unit/agent/l3/extensions/qos/test_base.py b/neutron/tests/unit/agent/l3/extensions/qos/test_base.py index 5c626cfaf8a..e43fafb9970 100644 --- a/neutron/tests/unit/agent/l3/extensions/qos/test_base.py +++ b/neutron/tests/unit/agent/l3/extensions/qos/test_base.py @@ -34,7 +34,7 @@ class RateLimitMapsTestCase(base.BaseTestCase): def setUp(self): super(RateLimitMapsTestCase, self).setUp() - self.policy_map = qos_base.RateLimitMaps() + self.policy_map = qos_base.RateLimitMaps("cache-lock") def test_update_policy(self): self.policy_map.update_policy(TEST_POLICY) diff --git a/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py b/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py index 09ee853e90f..2def99ead6b 100644 --- a/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py +++ b/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py @@ -384,3 +384,37 @@ class RouterFipRateLimitMapsTestCase(base.BaseTestCase): self.assertIsNone(self.policy_map.find_fip_router_id("8.8.8.8")) self.assertEqual(router_id, self.policy_map.find_fip_router_id(TEST_FIP)) + + def test_get_router_floating_ips(self): + router_id = _uuid() + test_ips = [TEST_FIP, TEST_FIP2] + self.policy_map.router_floating_ips[router_id] = set([TEST_FIP, + TEST_FIP2]) + get_ips = self.policy_map.get_router_floating_ips(router_id) + self.assertEqual(len(test_ips), len(get_ips)) + + def test_remove_fip_ratelimit_cache(self): + fip = "1.1.1.1" + self.policy_map.set_fip_ratelimit_cache( + "ingress", fip, 100, 200) + self.policy_map.set_fip_ratelimit_cache( + "egress", fip, 100, 200) + self.policy_map.remove_fip_ratelimit_cache("ingress", fip) + self.assertIsNone(self.policy_map.ingress_ratelimits.get(fip)) + self.policy_map.remove_fip_ratelimit_cache("egress", fip) + self.assertIsNone(self.policy_map.egress_ratelimits.get(fip)) + + def test_set_fip_ratelimit_cache(self): + fip = "1.1.1.1" + self.policy_map.set_fip_ratelimit_cache( + "ingress", fip, 100, 200) + self.policy_map.set_fip_ratelimit_cache( + "egress", fip, 300, 400) + in_rate, in_burst = self.policy_map.get_fip_ratelimit_cache( + "ingress", fip) + self.assertEqual(100, in_rate) + self.assertEqual(200, in_burst) + e_rate, e_burst = self.policy_map.get_fip_ratelimit_cache( + "egress", fip) + self.assertEqual(300, e_rate) + self.assertEqual(400, e_burst) diff --git 
new file mode 100644
index 00000000000..b76c6d22803
--- /dev/null
+++ b/releasenotes/notes/Minimizing-lock-granularity-8bc2f893d9389cf8.yaml
@@ -0,0 +1,6 @@
+---
+fixes:
+  - |
+    Minimize the lock granularity of the L3 agent QoS extensions by
+    moving the lock into the resource cache and using per-resource
+    coordination locks for resource processing and notification handling.
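
Note on the locking approach: the single per-extension locks
(lockutils.synchronized('qos-fip') and 'qos-gateway-ip') around whole handler
methods are replaced by two finer-grained mechanisms. The RateLimitMaps cache
now takes its own lock only around cache reads and writes, and
coordination.synchronized('qos-floating-ip-{ip}') /
'qos-gateway-ip-{router_info.router_id}' builds the lock name from the
decorated function's arguments, so only calls that touch the same floating IP
or router serialize. The snippet below is a minimal sketch of how such a
per-resource lock decorator can be built; it is an illustration only, not the
actual neutron.common.coordination implementation, and the names
per_resource_synchronized and FipProcessor are hypothetical.

    # Minimal illustrative sketch; not the real neutron.common.coordination code.
    import inspect

    from oslo_concurrency import lockutils


    def per_resource_synchronized(lock_name_template):
        """Serialize calls per resource by deriving the lock name from args."""
        def decorator(func):
            sig = inspect.signature(func)

            def wrapper(*args, **kwargs):
                # Map the actual call arguments to parameter names, then build
                # the lock name, e.g. 'qos-floating-ip-172.24.4.10'.
                bound = sig.bind(*args, **kwargs)
                bound.apply_defaults()
                lock_name = lock_name_template.format(**bound.arguments)
                with lockutils.lock(lock_name):
                    return func(*args, **kwargs)
            return wrapper
        return decorator


    class FipProcessor(object):
        # Hypothetical class reusing the lock-name pattern from the patch:
        # calls for different floating IPs take different locks and can run
        # concurrently, while calls for the same IP still serialize.
        @per_resource_synchronized('qos-floating-ip-{ip}')
        def process_ip_rate_limit(self, ip, direction, device, rate, burst):
            print('processing %s %s' % (ip, direction))

With this pattern, two workers handling rate limits for different floating IPs
no longer wait on each other, which is the contention described in bug
#1824911; only the short critical sections inside the RateLimitMaps cache
remain serialized per cache.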