Support pps limitation for openvswitch agent

Add packet rate limit rule to the openvswitch QoS
driver SUPPORTED_RULES list. This patch adds the
ability to limit neutron port packet I/O rate. We
will leverage the ovs meter to achieve the limitation.

The meter action is only supoorted when datapath is
in user mode (with ovs >= 2.7) or ovs kernel datapath with
kernel version >= 4.15 (and ovs >= 2.10).

[1] https://docs.openvswitch.org/en/latest/faq/releases/

Partially-Implements: bp/packet-rate-limit
Related-Bug: #1938966
Related-Bug: #1912460
Change-Id: Ib6341ad539afc9f94f1783a721cf5f793ccdc7d8
This commit is contained in:
LIU Yulong 2021-09-02 16:22:03 +08:00
parent 0232ead2c3
commit 5765186516
3 changed files with 330 additions and 2 deletions

View File

@ -1,4 +1,5 @@
# Copyright (c) 2015 OpenStack Foundation
# Copyright (c) 2021-2022 Chinaunicom
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,6 +14,7 @@
# under the License.
import collections
import random
from neutron_lib import constants
from neutron_lib.services.qos import constants as qos_consts
@ -25,8 +27,242 @@ from neutron.services.qos.drivers.openvswitch import driver
LOG = logging.getLogger(__name__)
MAX_RETIES = 1000
class QosOVSAgentDriver(qos.QosLinuxAgentDriver):
class MeterRuleManager(object):
# This cache will be:
# PORT_METER_ID = {"port_id_1_ingress": 1,
# "port_id_1_egress: 2,
# "port_id_2_ingress": 3,
# "port_id_2_egress: 4}
PORT_METER_ID = {}
# This will be:
# PORT_INFO_INGRESS = {"port_id_1": (of_port_name, mac_1, local_vlan_1),
# "port_id_2": (of_port_name, mac_2, local_vlan_2),
PORT_INFO_INGRESS = {}
# PORT_INFO_EGRESS = {"port_id_1": (of_port_name, mac_1, of_port_1),
# "port_id_2": (of_port_name, mac_2, of_port_2),
PORT_INFO_EGRESS = {}
def __init__(self, br_int):
self.br_int = br_int
self._init_max_meter_id()
def _init_max_meter_id(self):
self.max_meter = 0
features = self.br_int.list_meter_features()
for f in features:
if f["max_meter"] > 0:
self.max_meter = f["max_meter"]
break
def get_data_key(self, port_id, direction):
return "%s_%s" % (port_id, direction)
def load_port_meter_id(self, port_name, port_id, direction):
key = self.get_data_key(port_id, direction)
meter_id = self.br_int.get_value_from_other_config(
port_name, key, value_type=int)
if meter_id:
self.PORT_METER_ID[key] = meter_id
else:
LOG.warning("Failed to load port %(port)s meter id in "
"direction %(direction)s",
{"direction": direction,
"port": port_id})
return meter_id
def store_port_meter_id_to_ovsdb(self, port_name, port_id,
direction, meter_id):
key = self.get_data_key(port_id, direction)
self.br_int.set_value_to_other_config(
port_name, key, meter_id)
def clean_port_meter_id_from_ovsdb(self, port_name, port_id, direction):
key = self.get_data_key(port_id, direction)
self.br_int.remove_value_from_other_config(
port_name, key)
def generate_meter_id(self):
if self.max_meter <= 0:
return
used_meter_ids = self.PORT_METER_ID.values()
cid = None
times = 0
while not cid or cid in used_meter_ids:
cid = random.randint(1, self.max_meter)
times += 1
if times >= MAX_RETIES:
LOG.warning("Failed to allocate meter "
"id after %d retries", times)
return
return cid
def allocate_meter_id(self, port_id, direction):
meter_id = self.generate_meter_id()
if not meter_id:
return
key = self.get_data_key(port_id, direction)
self.PORT_METER_ID[key] = meter_id
return meter_id
def remove_port_meter_id(self, port_id, direction):
key = self.get_data_key(port_id, direction)
return self.PORT_METER_ID.pop(key, None)
def set_port_info_ingress(self, port_id, port_name, mac, vlan):
self.PORT_INFO_INGRESS[port_id] = (port_name, mac, vlan)
def remove_port_info_ingress(self, port_id):
return self.PORT_INFO_INGRESS.pop(port_id, (None, None, None))
def set_port_info_egress(self, port_id, port_name, mac, ofport):
self.PORT_INFO_EGRESS[port_id] = (port_name, mac, ofport)
def remove_port_info_egress(self, port_id):
return self.PORT_INFO_EGRESS.pop(port_id, (None, None, None))
class OVSPacketRatelimitDriver(object):
SUPPORT_METER = None
def check_meter_features(self):
features = self.br_int.list_meter_features()
for f in features:
if (f["max_meter"] != 0 and f["band_types"] != 0 and
f["capabilities"] != 0 and f["max_bands"] != 0):
return True
return False
@property
def support_meter(self):
if self.SUPPORT_METER is None:
self.SUPPORT_METER = self.check_meter_features()
return self.SUPPORT_METER
def create_packet_rate_limit(self, port, rule):
self.update_packet_rate_limit(port, rule)
def update_packet_rate_limit(self, port, rule):
if not self.support_meter:
LOG.debug("Meter feature is not supported by ovs %s bridge",
self.br_int.br_name)
return
LOG.debug("Update packet rate limit for port: %s", port)
vif_port = port.get('vif_port')
if not vif_port:
port_id = port.get('port_id')
LOG.debug("update_packet_rate_limit was received for port %s but "
"vif_port was not found. It seems that port is already "
"deleted", port_id)
return
self.ports[port['port_id']][(qos_consts.RULE_TYPE_PACKET_RATE_LIMIT,
rule.direction)] = port
self._update_packet_rate_limit(vif_port, rule, rule.direction)
def delete_packet_rate_limit(self, port):
if not self.support_meter:
LOG.debug("Meter feature is not supported by ovs bridge")
return
self._delete_packet_rate_limit(port, constants.EGRESS_DIRECTION)
def delete_packet_rate_limit_ingress(self, port):
if not self.support_meter:
LOG.debug("Meter feature is not supported by ovs bridge")
return
self._delete_packet_rate_limit(port, constants.INGRESS_DIRECTION)
def _delete_packet_rate_limit(self, port, direction):
port_id = port.get('port_id')
LOG.debug("Delete %(direction)s packet rate limit for port %(port)s.",
{"direction": direction,
"port": port_id})
port = self.ports[port_id].pop(
(qos_consts.RULE_TYPE_PACKET_RATE_LIMIT, direction), None)
meter_id = self.meter_cache.remove_port_meter_id(
port_id, direction)
if direction == constants.INGRESS_DIRECTION:
port_name, mac, local_vlan = (
self.meter_cache.remove_port_info_ingress(port_id))
if mac is not None and local_vlan is not None:
self.br_int.remove_meter_from_port(
direction, mac, local_vlan=local_vlan)
if port_name is not None:
self.meter_cache.clean_port_meter_id_from_ovsdb(
port_name, port_id, direction)
else:
port_name, mac, ofport = (
self.meter_cache.remove_port_info_egress(port_id))
if mac is not None and ofport is not None:
self.br_int.remove_meter_from_port(
direction, mac, in_port=ofport)
if port_name is not None:
self.meter_cache.clean_port_meter_id_from_ovsdb(
port_name, port_id, direction)
if meter_id:
self.br_int.delete_meter(meter_id)
def _update_packet_rate_limit(self, vif_port, rule, direction):
port_name = vif_port.port_name
max_kpps = rule.max_kpps * 1000
max_burst_kpps = rule.max_burst_kpps * 1000 or 0
LOG.debug("Update port %(port)s %(direction)s packet rate limit "
"with rate: %(rate)s, burst: %(burst)s",
{"port": vif_port.vif_id,
"direction": direction,
"rate": rule.max_kpps,
"burst": rule.max_burst_kpps})
meter_id = self.meter_cache.load_port_meter_id(
port_name, vif_port.vif_id, direction)
if not meter_id:
meter_id = self.meter_cache.allocate_meter_id(
vif_port.vif_id, direction)
if not meter_id:
LOG.warning("Failed to retrieve and re-allocate meter id, "
"skipping updating port %(port)s "
"%(direction)s packet rate limit",
{"port": vif_port.vif_id,
"direction": direction})
return
self.meter_cache.store_port_meter_id_to_ovsdb(
port_name, vif_port.vif_id, direction, meter_id)
try:
self.br_int.create_meter(meter_id, max_kpps,
burst=max_burst_kpps)
except Exception:
self.br_int.update_meter(meter_id, max_kpps,
burst=max_burst_kpps)
local_vlan = self.br_int.get_port_tag_by_name(port_name)
if direction == constants.INGRESS_DIRECTION:
self.meter_cache.set_port_info_ingress(
vif_port.vif_id,
port_name, vif_port.vif_mac, local_vlan)
self.br_int.apply_meter_to_port(
meter_id, direction, vif_port.vif_mac,
local_vlan=local_vlan)
else:
self.meter_cache.set_port_info_egress(
vif_port.vif_id,
port_name, vif_port.vif_mac, vif_port.ofport)
self.br_int.apply_meter_to_port(
meter_id, direction, vif_port.vif_mac,
in_port=vif_port.ofport)
class QosOVSAgentDriver(qos.QosLinuxAgentDriver,
OVSPacketRatelimitDriver):
SUPPORTED_RULES = driver.SUPPORTED_RULES
@ -56,6 +292,7 @@ class QosOVSAgentDriver(qos.QosLinuxAgentDriver):
self.br_int = self.agent_api.request_int_br()
self.cookie = self.br_int.default_cookie
self._qos_bandwidth_initialize()
self.meter_cache = MeterRuleManager(self.br_int)
def create_bandwidth_limit(self, port, rule):
self.update_bandwidth_limit(port, rule)

View File

@ -36,6 +36,14 @@ SUPPORTED_RULES = {
qos_consts.DIRECTION: {
'type:values': constants.VALID_DIRECTIONS}
},
qos_consts.RULE_TYPE_PACKET_RATE_LIMIT: {
qos_consts.MAX_KPPS: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST_KPPS: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.DIRECTION: {
'type:values': constants.VALID_DIRECTIONS}
},
qos_consts.RULE_TYPE_DSCP_MARKING: {
qos_consts.DSCP_MARK: {'type:values': constants.VALID_DSCP_MARKS}
},

View File

@ -51,11 +51,20 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
{'phys1': ovs_bridge.OVSAgentBridge(
'br-phys1', os_ken_app=os_ken_app)})
self.qos_driver.consume_api(self.agent_api)
mock.patch.object(
qos_driver.MeterRuleManager, '_init_max_meter_id').start()
self.qos_driver.initialize()
self.qos_driver.br_int = mock.Mock()
self.qos_driver.br_int.get_dp = mock.Mock(return_value=(mock.Mock(),
mock.Mock(),
mock.Mock()))
self.qos_driver.meter_cache.br_int = self.qos_driver.br_int
self.qos_driver.meter_cache.max_meter = 65535
self.qos_driver.br_int.list_meter_features = mock.Mock(
return_value=[{"max_meter": 65535,
"band_types": 2,
"capabilities": 15,
"max_bands": 8}])
self.qos_driver.br_int.get_egress_bw_limit_for_port = mock.Mock(
return_value=(1000, 10))
self.get_egress = self.qos_driver.br_int.get_egress_bw_limit_for_port
@ -68,12 +77,28 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
self.qos_driver.br_int.create_egress_bw_limit_for_port)
self.update_ingress = (
self.qos_driver.br_int.update_ingress_bw_limit_for_port)
self.apply_meter_to_port = (
self.qos_driver.br_int.apply_meter_to_port)
self.remove_meter_from_port = (
self.qos_driver.br_int.remove_meter_from_port)
self.delete_meter = (
self.qos_driver.br_int.delete_meter)
self.create_meter = (
self.qos_driver.br_int.create_meter)
self.update_meter = (
self.qos_driver.br_int.update_meter)
self.rules = [
self._create_bw_limit_rule_obj(constants.EGRESS_DIRECTION),
self._create_bw_limit_rule_obj(constants.INGRESS_DIRECTION),
self._create_pps_limit_rule_obj(constants.EGRESS_DIRECTION),
self._create_pps_limit_rule_obj(constants.INGRESS_DIRECTION),
self._create_dscp_marking_rule_obj()]
self.qos_policy = self._create_qos_policy_obj(self.rules)
self.port = self._create_fake_port(self.qos_policy.id)
self.qos_driver.br_int.get_port_tag_by_name = mock.Mock(
return_value=1)
def _create_bw_limit_rule_obj(self, direction):
rule_obj = rule.QosBandwidthLimitRule()
@ -91,6 +116,15 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
rule_obj.obj_reset_changes()
return rule_obj
def _create_pps_limit_rule_obj(self, direction):
rule_obj = rule.QosPacketRateLimitRule()
rule_obj.id = uuidutils.generate_uuid()
rule_obj.max_kpps = 2000
rule_obj.max_burst_kpps = 200
rule_obj.direction = direction
rule_obj.obj_reset_changes()
return rule_obj
def _create_qos_policy_obj(self, rules):
policy_dict = {'id': uuidutils.generate_uuid(),
'project_id': uuidutils.generate_uuid(),
@ -108,17 +142,24 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
def _create_fake_port(self, policy_id):
self.port_name = 'fakeport'
port_id = uuidutils.generate_uuid()
class FakeVifPort(object):
port_name = self.port_name
ofport = 111
vif_id = port_id
vif_mac = "aa:bb:cc:dd:ee:ff"
return {'vif_port': FakeVifPort(),
'qos_policy_id': policy_id,
'qos_network_policy_id': None,
'port_id': uuidutils.generate_uuid(),
'port_id': port_id,
'device_owner': uuidutils.generate_uuid()}
def test_create_new_rules(self):
self.qos_driver.br_int.get_value_from_other_config = mock.Mock()
self.qos_driver.br_int.set_value_to_other_config = mock.Mock()
self.qos_driver.br_int.get_egress_bw_limit_for_port = mock.Mock(
return_value=(None, None))
self.qos_driver.create(self.port, self.qos_policy)
@ -132,6 +173,19 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
self.rules[1].max_burst_kbps)
self._assert_dscp_rule_create_updated()
self.create_meter.assert_has_calls(
[mock.call(mock.ANY, self.rules[2].max_kpps * 1000,
burst=self.rules[2].max_burst_kpps * 1000),
mock.call(mock.ANY, self.rules[3].max_kpps * 1000,
burst=self.rules[3].max_burst_kpps * 1000)])
self.apply_meter_to_port.assert_has_calls(
[mock.call(mock.ANY, constants.EGRESS_DIRECTION,
"aa:bb:cc:dd:ee:ff",
in_port=111),
mock.call(mock.ANY, constants.INGRESS_DIRECTION,
"aa:bb:cc:dd:ee:ff",
local_vlan=1)])
def test_create_existing_rules(self):
self.qos_driver.create(self.port, self.qos_policy)
self._assert_rules_create_updated()
@ -149,12 +203,24 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
self.create_egress.assert_not_called()
self.update_ingress.assert_not_called()
self.create_meter.assert_not_called()
self.apply_meter_to_port.assert_not_called()
def _test_delete_rules(self, qos_policy):
self.qos_driver.create(self.port, qos_policy)
self.qos_driver.delete(self.port, qos_policy)
self.delete_egress.assert_called_once_with(self.port_name)
self.delete_ingress.assert_called_once_with(self.port_name)
self.assertEqual(2, self.delete_meter.call_count)
self.remove_meter_from_port.assert_has_calls(
[mock.call(constants.EGRESS_DIRECTION,
"aa:bb:cc:dd:ee:ff",
in_port=111),
mock.call(constants.INGRESS_DIRECTION,
"aa:bb:cc:dd:ee:ff",
local_vlan=1)])
def _test_delete_rules_no_policy(self):
self.qos_driver.delete(self.port)
self.delete_egress.assert_called_once_with(self.port_name)
@ -172,6 +238,10 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
self.qos_driver.delete(port, self.qos_policy)
self.delete_egress.assert_not_called()
self.delete_ingress.assert_not_called()
self.delete_meter.assert_not_called()
self.delete_meter.assert_not_called()
self.remove_meter_from_port.assert_not_called()
def _assert_rules_create_updated(self):
self.create_egress.assert_called_once_with(
@ -181,6 +251,19 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
self.port_name, self.rules[1].max_kbps,
self.rules[1].max_burst_kbps)
self.create_meter.assert_has_calls(
[mock.call(mock.ANY, self.rules[2].max_kpps * 1000,
burst=self.rules[2].max_burst_kpps * 1000),
mock.call(mock.ANY, self.rules[3].max_kpps * 1000,
burst=self.rules[3].max_burst_kpps * 1000)])
self.apply_meter_to_port.assert_has_calls(
[mock.call(mock.ANY, constants.EGRESS_DIRECTION,
"aa:bb:cc:dd:ee:ff",
in_port=111),
mock.call(mock.ANY, constants.INGRESS_DIRECTION,
"aa:bb:cc:dd:ee:ff",
local_vlan=1)])
def _assert_dscp_rule_create_updated(self):
# Assert install_instructions is the last call
self.assertEqual(