Minimizing L3 agent QoS extensions lock granularity

If the agent is concurrently processing a large set of resources,
a single bottleneck lock multiplies the processing time of the
resources that have been waiting on it for a long time.

This patch moves the lock into the core cache resource and applies
the coordination lock to the resource processing and notification
thread functions.

Closes-Bug: #1824911
Change-Id: Id43829b11631727f1a46362ffea5c22d2177fd79
LIU Yulong 2019-04-13 11:01:02 +08:00
parent 771a2a191c
commit ab57410ec8
6 changed files with 174 additions and 54 deletions
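
A condensed sketch of the caching pattern this change introduces, trimmed from the first file in the diff below: the RateLimitMaps cache now receives a lock name and serializes only its own read/modify operations, instead of callers wrapping whole notification handlers in one coarse "qos-fip" / "qos-gateway-ip" lock. The snippet assumes oslo.concurrency is installed; the names match the patch.

import collections

from oslo_concurrency import lockutils


class RateLimitMaps(object):
    """Policy/resource cache that guards only its own state."""

    def __init__(self, lock_name):
        self.known_policies = {}
        self.resource_policies = {}
        self.qos_policy_resources = collections.defaultdict(dict)
        self.lock_name = lock_name

    def update_policy(self, policy):
        # lockutils.synchronized() needs the lock name at decoration
        # time, so each method wraps a small nested function with the
        # per-instance name and calls it immediately.
        @lockutils.synchronized(self.lock_name)
        def _update_policy():
            self.known_policies[policy.id] = policy
        return _update_policy()

    def get_resource_policy(self, resource):
        @lockutils.synchronized(self.lock_name)
        def _get_resource_policy():
            policy_id = self.resource_policies.get(resource)
            return self.known_policies.get(policy_id)
        return _get_resource_policy()

Because the critical section now covers only the dictionary updates, a slow tc operation for one router no longer blocks cache lookups for every other resource.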


@@ -19,6 +19,7 @@ from neutron_lib import constants
from neutron_lib.db import constants as db_consts
from neutron_lib import rpc as n_rpc
from neutron_lib.services.qos import constants as qos_consts
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import l3_tc_lib as tc_lib
@@ -51,23 +52,44 @@ IP_DEFAULT_BURST = 0
class RateLimitMaps(object):
def __init__(self):
def __init__(self, lock_name):
self.qos_policy_resources = collections.defaultdict(dict)
self.known_policies = {}
self.resource_policies = {}
self.lock_name = lock_name
def update_policy(self, policy):
self.known_policies[policy.id] = policy
@lockutils.synchronized(self.lock_name)
def _update_policy():
self.known_policies[policy.id] = policy
return _update_policy()
def get_policy(self, policy_id):
return self.known_policies.get(policy_id)
@lockutils.synchronized(self.lock_name)
def _get_policy():
return self.known_policies.get(policy_id)
return _get_policy()
def get_resources(self, policy):
return self.qos_policy_resources[policy.id].values()
@lockutils.synchronized(self.lock_name)
def _get_resources():
return self.qos_policy_resources[policy.id].values()
return _get_resources()
def get_resource_policy(self, resource):
policy_id = self.resource_policies.get(resource)
return self.get_policy(policy_id)
@lockutils.synchronized(self.lock_name)
def _get_resource_policy():
policy_id = self.resource_policies.get(resource)
return self.known_policies.get(policy_id)
return _get_resource_policy()
def set_resource_policy(self, resource, policy):
"""Attach a resource to policy
@@ -75,12 +97,17 @@ class RateLimitMaps(object):
and return any previous policy on resource.
"""
old_policy = self.get_resource_policy(resource)
self.update_policy(policy)
self.resource_policies[resource] = policy.id
self.qos_policy_resources[policy.id][resource] = resource
if old_policy and old_policy.id != policy.id:
del self.qos_policy_resources[old_policy.id][resource]
@lockutils.synchronized(self.lock_name)
def _set_resource_policy():
policy_id = self.resource_policies.get(resource)
old_policy = self.known_policies.get(policy_id)
self.known_policies[policy.id] = policy
self.resource_policies[resource] = policy.id
self.qos_policy_resources[policy.id][resource] = resource
if old_policy and old_policy.id != policy.id:
del self.qos_policy_resources[old_policy.id][resource]
_set_resource_policy()
def clean_by_resource(self, resource):
"""Detach resource from policy
@@ -88,16 +115,21 @@ class RateLimitMaps(object):
and cleanup data we don't need anymore.
"""
if resource in self.resource_policies:
del self.resource_policies[resource]
for qos_policy_id, res_dict in self.qos_policy_resources.items():
if resource in res_dict:
del res_dict[resource]
if not res_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("L3 QoS extension did not have "
"information on floating IP %s", resource)
@lockutils.synchronized(self.lock_name)
def _clean_by_resource():
if resource in self.resource_policies:
del self.resource_policies[resource]
for (qos_policy_id,
res_dict) in self.qos_policy_resources.items():
if resource in res_dict:
del res_dict[resource]
if not res_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("L3 QoS extension did not have "
"information on floating IP %s", resource)
_clean_by_resource()
def _clean_policy_info(self, qos_policy_id):
del self.qos_policy_resources[qos_policy_id]


@@ -24,11 +24,14 @@ from neutron.agent.linux import ip_lib
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import coordination
LOG = logging.getLogger(__name__)
class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
LOCK_NAME = "fip-qos-cache"
def __init__(self):
"""Initialize RouterFipRateLimitMaps
@@ -51,12 +54,58 @@ class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
"""
self.ingress_ratelimits = {}
self.egress_ratelimits = {}
super(RouterFipRateLimitMaps, self).__init__()
super(RouterFipRateLimitMaps, self).__init__(self.LOCK_NAME)
def find_fip_router_id(self, fip):
for router_id, ips in self.router_floating_ips.items():
if fip in ips:
return router_id
@lockutils.synchronized(self.lock_name)
def _find_fip_router_id():
for router_id, ips in self.router_floating_ips.items():
if fip in ips:
return router_id
return _find_fip_router_id()
def get_router_floating_ips(self, router_id):
@lockutils.synchronized(self.lock_name)
def _get_router_floating_ips():
return self.router_floating_ips.pop(
router_id, [])
return _get_router_floating_ips()
def remove_fip_ratelimit_cache(self, direction, fip):
@lockutils.synchronized(self.lock_name)
def _remove_fip_ratelimit_cache():
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self, rate_limits_direction, {})
rate_limits.pop(fip, None)
_remove_fip_ratelimit_cache()
def set_fip_ratelimit_cache(self, direction, fip, rate, burst):
@lockutils.synchronized(self.lock_name)
def _set_fip_ratelimit_cache():
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self, rate_limits_direction, {})
rate_limits[fip] = (rate, burst)
_set_fip_ratelimit_cache()
def get_fip_ratelimit_cache(self, direction, fip):
@lockutils.synchronized(self.lock_name)
def _get_fip_ratelimit_cache():
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self, rate_limits_direction, {})
rate, burst = rate_limits.get(fip, (qos_base.IP_DEFAULT_RATE,
qos_base.IP_DEFAULT_BURST))
return rate, burst
return _get_fip_ratelimit_cache()
class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
@@ -68,7 +117,6 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
self.fip_qos_map = RouterFipRateLimitMaps()
self._register_rpc_consumers()
@lockutils.synchronized('qos-fip')
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
if event_type == events.UPDATED:
@@ -98,20 +146,16 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
fip, dvr_fip_device, rates, with_cache=False)
self.fip_qos_map.update_policy(qos_policy)
def _remove_fip_rate_limit_cache(self, fip):
for direction in constants.VALID_DIRECTIONS:
self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip)
def _process_reset_fip(self, fip):
self.fip_qos_map.clean_by_resource(fip)
def process_ip_rate_limit(self, ip, direction, device, rate, burst):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
old_rate, old_burst = rate_limits.get(ip, (qos_base.IP_DEFAULT_RATE,
qos_base.IP_DEFAULT_BURST))
if old_rate == rate and old_burst == burst:
# Two possibilities here:
# 1. Floating IP rate limit does not change.
# 2. Floating IP bandwidth does not limit.
return
@coordination.synchronized('qos-floating-ip-{ip}')
def process_ip_rate_limit(self, ip, direction,
device, rate, burst):
tc_wrapper = self._get_tc_wrapper(device)
@@ -121,12 +165,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
# floating IP bandwidth was changed to default value (no limit).
# NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound.
tc_wrapper.clear_ip_rate_limit(direction, ip)
rate_limits.pop(ip, None)
self.fip_qos_map.remove_fip_ratelimit_cache(direction, ip)
return
# Finally just set it, l3_tc_lib will clean the old rules if exists.
tc_wrapper.set_ip_rate_limit(direction, ip, rate, burst)
rate_limits[ip] = (rate, burst)
def _get_rate_limit_ip_device(self, router_info):
ex_gw_port = router_info.get_ex_gw_port()
@@ -152,17 +195,12 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
namespace = router_info.get_gw_ns_name()
return ip_lib.IPDevice(name, namespace=namespace)
def _remove_ip_rate_limit_cache(self, ip, direction):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
rate_limits.pop(ip, None)
def _remove_fip_rate_limit(self, device, fip_ip):
tc_wrapper = self._get_tc_wrapper(device)
for direction in constants.VALID_DIRECTIONS:
if device.exists():
tc_wrapper.clear_ip_rate_limit(direction, fip_ip)
self._remove_ip_rate_limit_cache(fip_ip, direction)
self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip_ip)
def get_fip_qos_rates(self, context, fip, policy_id):
if policy_id is None:
@@ -184,9 +222,21 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
for direction in constants.VALID_DIRECTIONS:
rate = rates.get(direction)
if with_cache:
old_rate, old_burst = self.fip_qos_map.get_fip_ratelimit_cache(
direction, fip)
if old_rate == rate['rate'] and old_burst == rate['burst']:
# Two possibilities here:
# 1. The floating IP rate limit has not changed.
# 2. The floating IP has no bandwidth limit.
continue
self.process_ip_rate_limit(
fip, direction, device,
rate['rate'], rate['burst'])
self.fip_qos_map.set_fip_ratelimit_cache(
direction, fip, rate['rate'], rate['burst'])
else:
tc_wrapper = self._get_tc_wrapper(device)
if (rate['rate'] == qos_base.IP_DEFAULT_RATE and
@@ -280,13 +330,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
self._remove_fip_rate_limit(dvr_fip_device, fip)
self._process_reset_fip(fip)
@lockutils.synchronized('qos-fip')
def add_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
self.process_floating_ip_addresses(context, router_info)
@lockutils.synchronized('qos-fip')
def update_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:


@@ -17,7 +17,6 @@ import netaddr
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from oslo_concurrency import lockutils
from oslo_log import log as logging
@@ -26,6 +25,7 @@ from neutron.agent.linux import ip_lib
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import coordination
LOG = logging.getLogger(__name__)
@@ -37,9 +37,9 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
"""Initialize agent extension."""
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self._register_rpc_consumers()
self.gateway_ip_qos_map = qos_base.RateLimitMaps()
self.gateway_ip_qos_map = qos_base.RateLimitMaps(
"gateway-ip-qos-cache")
@lockutils.synchronized('qos-gateway-ip')
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
if event_type == events.UPDATED:
@@ -81,13 +81,11 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
router_id, qos_policy)
self.gateway_ip_qos_map.update_policy(qos_policy)
@lockutils.synchronized('qos-gateway-ip')
def add_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
self.process_gateway_rate_limit(context, router_info)
@lockutils.synchronized('qos-gateway-ip')
def update_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
@@ -120,6 +118,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
self._handle_router_gateway_rate_limit(context, router_info)
@coordination.synchronized('qos-gateway-ip-{router_info.router_id}')
def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper):
self.gateway_ip_qos_map.clean_by_resource(router_info.router_id)
for ip in router_info.qos_gateway_ips:
@@ -172,6 +171,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
router_info.router_id, policy)
return self.get_policy_rates(policy)
@coordination.synchronized('qos-gateway-ip-{router_info.router_id}')
def _set_gateway_tc_rules(self, router_info, tc_wrapper,
ex_gw_port, rates):
for ip_addr in ex_gw_port['fixed_ips']:


@@ -34,7 +34,7 @@ class RateLimitMapsTestCase(base.BaseTestCase):
def setUp(self):
super(RateLimitMapsTestCase, self).setUp()
self.policy_map = qos_base.RateLimitMaps()
self.policy_map = qos_base.RateLimitMaps("cache-lock")
def test_update_policy(self):
self.policy_map.update_policy(TEST_POLICY)


@@ -384,3 +384,37 @@ class RouterFipRateLimitMapsTestCase(base.BaseTestCase):
self.assertIsNone(self.policy_map.find_fip_router_id("8.8.8.8"))
self.assertEqual(router_id,
self.policy_map.find_fip_router_id(TEST_FIP))
def test_get_router_floating_ips(self):
router_id = _uuid()
test_ips = [TEST_FIP, TEST_FIP2]
self.policy_map.router_floating_ips[router_id] = set([TEST_FIP,
TEST_FIP2])
get_ips = self.policy_map.get_router_floating_ips(router_id)
self.assertEqual(len(test_ips), len(get_ips))
def test_remove_fip_ratelimit_cache(self):
fip = "1.1.1.1"
self.policy_map.set_fip_ratelimit_cache(
"ingress", fip, 100, 200)
self.policy_map.set_fip_ratelimit_cache(
"egress", fip, 100, 200)
self.policy_map.remove_fip_ratelimit_cache("ingress", fip)
self.assertIsNone(self.policy_map.ingress_ratelimits.get(fip))
self.policy_map.remove_fip_ratelimit_cache("egress", fip)
self.assertIsNone(self.policy_map.egress_ratelimits.get(fip))
def test_set_fip_ratelimit_cache(self):
fip = "1.1.1.1"
self.policy_map.set_fip_ratelimit_cache(
"ingress", fip, 100, 200)
self.policy_map.set_fip_ratelimit_cache(
"egress", fip, 300, 400)
in_rate, in_burst = self.policy_map.get_fip_ratelimit_cache(
"ingress", fip)
self.assertEqual(100, in_rate)
self.assertEqual(200, in_burst)
e_rate, e_burst = self.policy_map.get_fip_ratelimit_cache(
"egress", fip)
self.assertEqual(300, e_rate)
self.assertEqual(400, e_burst)


@@ -0,0 +1,6 @@
---
fixes:
- |
The coordination lock is now applied to the resource processing
and notification thread functions to minimize the lock
granularity.
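
The per-resource lock names used in the patch ("qos-floating-ip-{ip}", "qos-gateway-ip-{router_info.router_id}") come from neutron.common.coordination, whose synchronized decorator builds the lock name from the decorated function's call arguments. Below is a rough, self-contained sketch of that idea using a hand-rolled decorator on top of oslo.concurrency; it illustrates the pattern only and is not neutron's actual implementation, and FakeQosExtension is a made-up class for the usage example.

import functools
import inspect

from oslo_concurrency import lockutils


def synchronized(lock_name_template):
    """Serialize only the calls that format to the same lock name."""
    def decorator(func):
        sig = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            # "{ip}" or "{router_info.router_id}" is filled in from the
            # actual call arguments, giving one lock per resource.
            lock_name = lock_name_template.format(**bound.arguments)
            with lockutils.lock(lock_name):
                return func(*args, **kwargs)
        return wrapper
    return decorator


class FakeQosExtension(object):
    @synchronized('qos-floating-ip-{ip}')
    def process_ip_rate_limit(self, ip, direction, device, rate, burst):
        # Calls for the same floating IP serialize here; calls for
        # different floating IPs proceed in parallel.
        pass

Compared to the removed module-wide "qos-fip" and "qos-gateway-ip" locks, the notification and router-processing threads now contend only when they touch the same floating IP or router.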