Improve validation of supported QoS rules

New way of validation of QoS policy can be applied on port/network based on
port_types. It can validate if port can support rule by rule_type and also by
rule parameters or some values of parameters.

For example bandwidth_limit_rule can have parameter "direction" with
possible values "ingress" and "egress". Some of mechanism drivers can support
only one of those directions.

If user will try to apply policy with rule unsupported by port_type then it
will get error message and policy will not be applied.
Such validation is made during:
* port create
* port update
* network update
* QoS rule create
* QoS rule update

Co-Authored-By: Miguel Angel Ajo <majopela@redhat.com>

Implements blueprint qos-rules-validation
Closes-Bug: #1586056

Change-Id: I75bd18b3a1875daa5639dd141fb7bbd6e1c54118
This commit is contained in:
Sławek Kapłoński 2017-01-30 21:56:38 +00:00
parent bd13a42365
commit e2226fe373
11 changed files with 663 additions and 24 deletions

View File

@ -111,6 +111,10 @@ class InvalidAllocationPool(e.BadRequest):
message = _("The allocation pool %(pool)s is not valid.")
class QosRuleNotSupported(e.Conflict):
message = _("Rule %(rule_type)s is not supported by port %(port_id)s")
class UnsupportedPortDeviceOwner(e.Conflict):
message = _("Operation %(op)s is not supported for device_owner "
"%(device_owner)s on port %(port_id)s.")

View File

@ -14,10 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api import validators as lib_validators
from oslo_log import log as logging
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.services.qos import qos_consts
LOG = logging.getLogger(__name__)
@registry.has_registry_receivers
class DriverBase(object):
@ -64,7 +69,34 @@ class DriverBase(object):
return vnic_type in self.vnic_types
def is_rule_supported(self, rule):
return rule.rule_type in self.supported_rules
supported_parameters = self.supported_rules.get(rule.rule_type)
if not supported_parameters:
LOG.debug("Rule type %(rule_type)s is not supported by "
"%(driver_name)s",
{'rule_type': rule.rule_type,
'driver_name': self.name})
return False
for parameter, validators in supported_parameters.items():
parameter_value = rule.get(parameter)
for validator_type, validator_data in validators.items():
validator_function = lib_validators.get_validator(
validator_type)
validate_result = validator_function(parameter_value,
validator_data)
# NOTE(slaweq): validator functions returns None if data is
# valid or string with reason why data is not valid
if validate_result:
LOG.debug("Parameter %(parameter)s=%(value)s in "
"rule type %(rule_type)s is not "
"supported by %(driver_name)s. "
"Validate result: %(validate_result)s",
{'parameter': parameter,
'value': parameter_value,
'rule_type': rule.rule_type,
'driver_name': self.name,
'validate_result': validate_result})
return False
return True
def create_policy(self, context, policy):
"""Create policy invocation.

View File

@ -16,6 +16,7 @@
from neutron_lib.api.definitions import portbindings
from oslo_log import log as logging
from neutron.common import constants
from neutron.services.qos.drivers import base
from neutron.services.qos import qos_consts
@ -23,8 +24,17 @@ LOG = logging.getLogger(__name__)
DRIVER = None
SUPPORTED_RULES = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
qos_consts.RULE_TYPE_DSCP_MARKING]
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]}
},
qos_consts.RULE_TYPE_DSCP_MARKING: {
qos_consts.DSCP_MARK: {'type:values': constants.VALID_DSCP_MARKS}
}
}
class LinuxBridgeDriver(base.DriverBase):

View File

@ -10,10 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import portbindings
from oslo_log import log as logging
from oslo_utils import excutils
from neutron._i18n import _LE, _LW
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks.producer import registry as rpc_registry
@ -30,6 +30,12 @@ qos_mgr.register_qos_plugin_opts()
LOG = logging.getLogger(__name__)
SKIPPED_VIF_TYPES = [
portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_BINDING_FAILED
]
class QosServiceDriverManager(object):
def __init__(self, enable_rpc=False):
@ -61,6 +67,28 @@ class QosServiceDriverManager(object):
policy = policy_object.QosPolicy.get_object(context, id=policy_id)
return policy
@staticmethod
def _validate_vnic_type(driver, vnic_type, port_id):
if driver.is_vnic_compatible(vnic_type):
return True
LOG.debug("vnic_type %(vnic_type)s of port %(port_id)s "
"is not compatible with QoS driver %(driver)s",
{'vnic_type': vnic_type,
'port_id': port_id,
'driver': driver.name})
return False
@staticmethod
def _validate_vif_type(driver, vif_type, port_id):
if driver.is_vif_type_compatible(vif_type):
return True
LOG.debug("vif_type %(vif_type)s of port %(port_id)s "
"is not compatible with QoS driver %(driver)s",
{'vif_type': vif_type,
'port_id': port_id,
'driver': driver.name})
return False
def call(self, method_name, *args, **kwargs):
"""Helper method for calling a method across all extension drivers."""
for driver in self._drivers:
@ -93,6 +121,22 @@ class QosServiceDriverManager(object):
self._drivers.append(driver)
self.rpc_notifications_required |= driver.requires_rpc_notifications
def validate_rule_for_port(self, rule, port):
for driver in self._drivers:
vif_type = port.binding.vif_type
if vif_type not in SKIPPED_VIF_TYPES:
if not self._validate_vif_type(driver, vif_type, port['id']):
continue
else:
vnic_type = port.binding.vnic_type
if not self._validate_vnic_type(driver, vnic_type, port['id']):
continue
if driver.is_rule_supported(rule):
return True
return False
@property
def supported_rule_types(self):
if not self._drivers:

View File

@ -16,6 +16,7 @@
from neutron_lib.api.definitions import portbindings
from oslo_log import log as logging
from neutron.common import constants
from neutron.services.qos.drivers import base
from neutron.services.qos import qos_consts
@ -23,8 +24,17 @@ LOG = logging.getLogger(__name__)
DRIVER = None
SUPPORTED_RULES = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
qos_consts.RULE_TYPE_DSCP_MARKING]
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]}
},
qos_consts.RULE_TYPE_DSCP_MARKING: {
qos_consts.DSCP_MARK: {'type:values': constants.VALID_DSCP_MARKS}
}
}
class OVSDriver(base.DriverBase):

View File

@ -16,6 +16,7 @@
from neutron_lib.api.definitions import portbindings
from oslo_log import log as logging
from neutron.common import constants
from neutron.services.qos.drivers import base
from neutron.services.qos import qos_consts
@ -23,8 +24,19 @@ LOG = logging.getLogger(__name__)
DRIVER = None
SUPPORTED_RULES = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH]
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]}
},
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH: {
qos_consts.MIN_KBPS: {
'type:range': [0, constants.DB_INTEGER_MAX_VALUE]},
qos_consts.DIRECTION: {'type:values': constants.VALID_DIRECTIONS}
}
}
class SRIOVNICSwitchDriver(base.DriverBase):

View File

@ -21,6 +21,13 @@ VALID_RULE_TYPES = [RULE_TYPE_BANDWIDTH_LIMIT,
RULE_TYPE_MINIMUM_BANDWIDTH,
]
# Names of rules' attributes
MAX_KBPS = "max_kbps"
MAX_BURST = "max_burst_kbps"
MIN_KBPS = "min_kbps"
DIRECTION = "direction"
DSCP_MARK = "dscp_mark"
QOS_POLICY_ID = 'qos_policy_id'
QOS_PLUGIN = 'qos_plugin'

View File

@ -13,11 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.callbacks import events as callbacks_events
from neutron.callbacks import registry as callbacks_registry
from neutron.callbacks import resources as callbacks_resources
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.extensions import qos
from neutron.objects import base as base_obj
from neutron.objects import network as network_object
from neutron.objects import ports as ports_object
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule_type as rule_type_object
from neutron.services.qos.drivers import manager
@ -47,6 +52,101 @@ class QoSPlugin(qos.QoSPluginBase):
self.driver_manager = manager.QosServiceDriverManager(enable_rpc=(
self.notification_driver_manager.has_message_queue_driver))
callbacks_registry.subscribe(
self._validate_create_port_callback,
callbacks_resources.PORT,
callbacks_events.PRECOMMIT_CREATE)
callbacks_registry.subscribe(
self._validate_update_port_callback,
callbacks_resources.PORT,
callbacks_events.PRECOMMIT_UPDATE)
callbacks_registry.subscribe(
self._validate_update_network_callback,
callbacks_resources.NETWORK,
callbacks_events.PRECOMMIT_UPDATE)
def _get_ports_with_policy(self, context, policy):
networks_ids = policy.get_bound_networks()
ports_with_net_policy = ports_object.Port.get_objects(
context, network_id=networks_ids)
# Filter only this ports which don't have overwritten policy
ports_with_net_policy = [
port for port in ports_with_net_policy if
port.qos_policy_id is None
]
ports_ids = policy.get_bound_ports()
ports_with_policy = ports_object.Port.get_objects(
context, id=ports_ids)
return list(set(ports_with_policy + ports_with_net_policy))
def _validate_create_port_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
port_id = kwargs['port']['id']
port = ports_object.Port.get_object(context, id=port_id)
network = network_object.Network.get_object(context,
id=port.network_id)
policy_id = port.qos_policy_id or network.qos_policy_id
if policy_id is None:
return
policy = policy_object.QosPolicy.get_object(context, id=policy_id)
self.validate_policy_for_port(policy, port)
def _validate_update_port_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
original_policy_id = kwargs['original_port'].get(
qos_consts.QOS_POLICY_ID)
policy_id = kwargs['port'].get(qos_consts.QOS_POLICY_ID)
if policy_id is None or policy_id == original_policy_id:
return
updated_port = ports_object.Port.get_object(
context, id=kwargs['port']['id'])
policy = policy_object.QosPolicy.get_object(context, id=policy_id)
self.validate_policy_for_port(policy, updated_port)
def _validate_update_network_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
original_network = kwargs['original_network']
updated_network = kwargs['network']
original_policy_id = original_network.get(qos_consts.QOS_POLICY_ID)
policy_id = updated_network.get(qos_consts.QOS_POLICY_ID)
if policy_id is None or policy_id == original_policy_id:
return
policy = policy_object.QosPolicy.get_object(context, id=policy_id)
ports = ports_object.Port.get_objects(
context, network_id=updated_network['id'])
# Filter only this ports which don't have overwritten policy
ports = [
port for port in ports if port.qos_policy_id is None
]
self.validate_policy_for_ports(policy, ports)
def validate_policy(self, context, policy):
ports = self._get_ports_with_policy(context, policy)
self.validate_policy_for_ports(policy, ports)
def validate_policy_for_ports(self, policy, ports):
for port in ports:
self.validate_policy_for_port(policy, port)
def validate_policy_for_port(self, policy, port):
for rule in policy.rules:
if not self.driver_manager.validate_rule_for_port(rule, port):
raise n_exc.QosRuleNotSupported(rule_type=rule.rule_type,
port_id=port['id'])
@db_base_plugin_common.convert_result_to_dict
def create_policy(self, context, policy):
"""Create a QoS policy.
@ -203,6 +303,7 @@ class QoSPlugin(qos.QoSPluginBase):
rule = rule_cls(context, qos_policy_id=policy_id, **rule_data)
rule.create()
policy.reload_rules()
self.validate_policy(context, policy)
self.driver_manager.call('update_policy', context, policy)
@ -241,6 +342,7 @@ class QoSPlugin(qos.QoSPluginBase):
rule.update_fields(rule_data, reset_changes=True)
rule.update()
policy.reload_rules()
self.validate_policy(context, policy)
self.driver_manager.call('update_policy', context, policy)

View File

@ -0,0 +1,91 @@
# Copyright 2017 OVH SAS
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import portbindings
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.common import constants
from neutron.objects.qos import rule as rule_object
from neutron.services.qos.drivers import base as qos_base_driver
from neutron.services.qos import qos_consts
from neutron.tests import base
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH: {
"min_kbps": {'type:values': None},
'direction': {'type:values': [constants.EGRESS_DIRECTION]}
}
}
class FakeDriver(qos_base_driver.DriverBase):
@staticmethod
def create():
return FakeDriver(
name='fake_driver',
vif_types=[portbindings.VIF_TYPE_OVS],
vnic_types=[portbindings.VNIC_NORMAL],
supported_rules=SUPPORTED_RULES,
requires_rpc_notifications=False)
class TestDriverBase(base.BaseTestCase):
def setUp(self):
super(TestDriverBase, self).setUp()
self.driver = FakeDriver.create()
self.rule_data = {
'minimum_bandwidth_rule': {
'id': uuidutils.generate_uuid(),
'min_kbps': 100,
'direction': constants.EGRESS_DIRECTION
},
'dscp_marking_rule': {
'id': uuidutils.generate_uuid(),
'dscp_mark': 16
}
}
ctxt = context.Context('fake_user', 'fake_tenant')
self.minimum_bandwidth_rule = rule_object.QosMinimumBandwidthRule(
ctxt, **self.rule_data['minimum_bandwidth_rule'])
self.dscp_rule = rule_object.QosDscpMarkingRule(
ctxt, **self.rule_data['dscp_marking_rule'])
def test_is_vif_type_compatible(self):
self.assertFalse(
self.driver.is_vif_type_compatible(portbindings.VIF_TYPE_OTHER))
self.assertTrue(
self.driver.is_vif_type_compatible(portbindings.VIF_TYPE_OVS))
def test_is_vnic_compatible(self):
self.assertFalse(
self.driver.is_vnic_compatible(portbindings.VNIC_BAREMETAL))
self.assertTrue(
self.driver.is_vnic_compatible(portbindings.VNIC_NORMAL))
def test_is_rule_supported(self):
# Rule which is in SUPPORTED_RULES should be supported
self.assertTrue(
self.driver.is_rule_supported(self.minimum_bandwidth_rule))
# Rule which is not in SUPPORTED_RULES should not be supported
self.assertFalse(self.driver.is_rule_supported(self.dscp_rule))
# Rule which is in SUPPORTED_RULES but got not supported parameter
# should not be supported
self.minimum_bandwidth_rule.direction = constants.INGRESS_DIRECTION
self.assertFalse(
self.driver.is_rule_supported(self.minimum_bandwidth_rule))

View File

@ -10,9 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib.api.definitions import portbindings
from neutron_lib import context
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.common import constants
from neutron.conf.services import qos_driver_manager as notif_driver_mgr_config
from neutron.objects import ports as ports_object
from neutron.objects.qos import rule as rule_object
from neutron.services.qos.drivers import base as qos_driver_base
from neutron.services.qos.drivers import manager as driver_mgr
from neutron.services.qos import qos_consts
@ -69,26 +76,135 @@ class TestQosDriversManagerMulti(TestQosDriversManagerBase):
self.assertEqual(len(driver_manager._drivers), 2)
class TestQoSDriversRulesValidations(TestQosDriversManagerBase):
"""Test validation of rules for port"""
def setUp(self):
super(TestQoSDriversRulesValidations, self).setUp()
self.ctxt = context.Context('fake_user', 'fake_tenant')
def _get_port(self, vif_type, vnic_type):
port_id = uuidutils.generate_uuid()
port_binding = ports_object.PortBinding(
self.ctxt, port_id=port_id, vif_type=vif_type, vnic_type=vnic_type)
return ports_object.Port(
self.ctxt, id=uuidutils.generate_uuid(), binding=port_binding)
def _test_validate_rule_for_port(self, port, expected_result):
driver_manager = self._create_manager_with_drivers({
'driver-A': {
'is_loaded': True,
'rules': {
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH: {
"min_kbps": {'type:values': None},
'direction': {
'type:values': constants.VALID_DIRECTIONS}
}
},
'vif_types': [portbindings.VIF_TYPE_OVS],
'vnic_types': [portbindings.VNIC_NORMAL]
}
})
rule = rule_object.QosMinimumBandwidthRule(
self.ctxt, id=uuidutils.generate_uuid())
is_rule_supported_mock = mock.Mock()
if expected_result:
is_rule_supported_mock.return_value = expected_result
driver_manager._drivers[0].is_rule_supported = is_rule_supported_mock
self.assertEqual(expected_result,
driver_manager.validate_rule_for_port(rule, port))
if expected_result:
is_rule_supported_mock.assert_called_once_with(rule)
else:
is_rule_supported_mock.assert_not_called()
def test_validate_rule_for_port_rule_vif_type_supported(self):
port = self._get_port(
portbindings.VIF_TYPE_OVS, portbindings.VNIC_NORMAL)
self._test_validate_rule_for_port(
port, expected_result=True)
def test_validate_rule_for_port_vif_type_not_supported(self):
port = self._get_port(
portbindings.VIF_TYPE_OTHER, portbindings.VNIC_NORMAL)
self._test_validate_rule_for_port(
port, expected_result=False)
def test_validate_rule_for_port_unbound_vnic_type_supported(self):
port = self._get_port(
portbindings.VIF_TYPE_UNBOUND, portbindings.VNIC_NORMAL)
self._test_validate_rule_for_port(
port, expected_result=True)
def test_validate_rule_for_port_unbound_vnic_type_not_supported(self):
port = self._get_port(
portbindings.VIF_TYPE_UNBOUND, portbindings.VNIC_BAREMETAL)
self._test_validate_rule_for_port(
port, expected_result=False)
class TestQosDriversManagerRules(TestQosDriversManagerBase):
"""Test supported rules"""
def test_available_rules_one_in_common(self):
driver_manager = self._create_manager_with_drivers(
{'driver-A': {'is_loaded': True,
'rules': [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH]},
'driver-B': {'is_loaded': True,
'rules': [qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH,
qos_consts.RULE_TYPE_DSCP_MARKING]}
})
driver_manager = self._create_manager_with_drivers({
'driver-A': {
'is_loaded': True,
'rules': {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
"max_kbps": {'type:values': None},
"max_burst_kbps": {'type:values': None}
},
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH: {
"min_kbps": {'type:values': None},
'direction': {
'type:values': constants.VALID_DIRECTIONS}
}
}
},
'driver-B': {
'is_loaded': True,
'rules': {
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH: {
"min_kbps": {'type:values': None},
'direction': {
'type:values': constants.VALID_DIRECTIONS}
},
qos_consts.RULE_TYPE_DSCP_MARKING: {
"dscp_mark": {
'type:values': constants.VALID_DSCP_MARKS}
}
}
}
})
self.assertEqual(driver_manager.supported_rule_types,
set([qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH]))
def test_available_rules_no_rule_in_common(self):
driver_manager = self._create_manager_with_drivers(
{'driver-A': {'is_loaded': True,
'rules': [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT]},
'driver-B': {'is_loaded': True,
'rules': [qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH,
qos_consts.RULE_TYPE_DSCP_MARKING]}
})
driver_manager = self._create_manager_with_drivers({
'driver-A': {
'is_loaded': True,
'rules': {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
"max_kbps": {'type:values': None},
"max_burst_kbps": {'type:values': None}
}
}
},
'driver-B': {
'is_loaded': True,
'rules': {
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH: {
"min_kbps": {'type:values': None},
'direction': {
'type:values': constants.VALID_DIRECTIONS}
},
qos_consts.RULE_TYPE_DSCP_MARKING: {
"dscp_mark": {
'type:values': constants.VALID_DSCP_MARKS}
}
}
}
})
self.assertEqual(driver_manager.supported_rule_types, set([]))

View File

@ -101,6 +101,214 @@ class TestQosPlugin(base.BaseQosTestCase):
policy_object.QosPolicy
)
def test_get_ports_with_policy(self):
network_ports = [
mock.MagicMock(qos_policy_id=None),
mock.MagicMock(qos_policy_id=uuidutils.generate_uuid()),
mock.MagicMock(qos_policy_id=None)
]
ports = [
mock.MagicMock(qos_policy_id=self.policy.id),
]
expected_network_ports = [
port for port in network_ports if port.qos_policy_id is None]
expected_ports = ports + expected_network_ports
with mock.patch(
'neutron.objects.ports.Port.get_objects',
side_effect=[network_ports, ports]
), mock.patch.object(
self.policy, "get_bound_networks"
), mock.patch.object(
self.policy, "get_bound_ports"
):
policy_ports = self.qos_plugin._get_ports_with_policy(
self.ctxt, self.policy)
self.assertEqual(
len(expected_ports), len(policy_ports))
for port in expected_ports:
self.assertIn(port, policy_ports)
def _test_validate_create_port_callback(self, policy_id=None,
network_policy_id=None):
port_id = uuidutils.generate_uuid()
kwargs = {
"context": self.ctxt,
"port": {"id": port_id}
}
port_mock = mock.MagicMock(id=port_id, qos_policy_id=policy_id)
network_mock = mock.MagicMock(
id=uuidutils.generate_uuid(), qos_policy_id=network_policy_id)
policy_mock = mock.MagicMock(id=policy_id)
expected_policy_id = policy_id or network_policy_id
with mock.patch(
'neutron.objects.ports.Port.get_object',
return_value=port_mock
), mock.patch(
'neutron.objects.network.Network.get_object',
return_value=network_mock
), mock.patch(
'neutron.objects.qos.policy.QosPolicy.get_object',
return_value=policy_mock
) as get_policy, mock.patch.object(
self.qos_plugin, "validate_policy_for_port"
) as validate_policy_for_port:
self.qos_plugin._validate_create_port_callback(
"PORT", "precommit_create", "test_plugin", **kwargs)
if policy_id or network_policy_id:
get_policy.assert_called_once_with(self.ctxt,
id=expected_policy_id)
validate_policy_for_port.assert_called_once_with(policy_mock,
port_mock)
else:
get_policy.assert_not_called()
validate_policy_for_port.assert_not_called()
def test_validate_create_port_callback_policy_on_port(self):
self._test_validate_create_port_callback(
policy_id=uuidutils.generate_uuid())
def test_validate_create_port_callback_policy_on_port_and_network(self):
self._test_validate_create_port_callback(
policy_id=uuidutils.generate_uuid(),
network_policy_id=uuidutils.generate_uuid())
def test_validate_create_port_callback_policy_on_network(self):
self._test_validate_create_port_callback(
network_policy_id=uuidutils.generate_uuid())
def test_validate_create_port_callback_no_policy(self):
self._test_validate_create_port_callback()
def _test_validate_update_port_callback(self, policy_id=None,
original_policy_id=None):
port_id = uuidutils.generate_uuid()
kwargs = {
"context": self.ctxt,
"port": {
"id": port_id,
qos_consts.QOS_POLICY_ID: policy_id
},
"original_port": {
"id": port_id,
qos_consts.QOS_POLICY_ID: original_policy_id
}
}
port_mock = mock.MagicMock(id=port_id, qos_policy_id=policy_id)
policy_mock = mock.MagicMock(id=policy_id)
with mock.patch(
'neutron.objects.ports.Port.get_object',
return_value=port_mock
) as get_port, mock.patch(
'neutron.objects.qos.policy.QosPolicy.get_object',
return_value=policy_mock
) as get_policy, mock.patch.object(
self.qos_plugin, "validate_policy_for_port"
) as validate_policy_for_port:
self.qos_plugin._validate_update_port_callback(
"PORT", "precommit_update", "test_plugin", **kwargs)
if policy_id is None or policy_id == original_policy_id:
get_port.assert_not_called()
get_policy.assert_not_called()
validate_policy_for_port.assert_not_called()
else:
get_port.assert_called_once_with(self.ctxt, id=port_id)
get_policy.assert_called_once_with(self.ctxt, id=policy_id)
validate_policy_for_port.assert_called_once_with(policy_mock,
port_mock)
def test_validate_update_port_callback_policy_changed(self):
self._test_validate_update_port_callback(
policy_id=uuidutils.generate_uuid())
def test_validate_update_port_callback_policy_not_changed(self):
policy_id = uuidutils.generate_uuid()
self._test_validate_update_port_callback(
policy_id=policy_id, original_policy_id=policy_id)
def test_validate_update_port_callback_policy_removed(self):
self._test_validate_update_port_callback(
policy_id=None, original_policy_id=uuidutils.generate_uuid())
def _test_validate_update_network_callback(self, policy_id=None,
original_policy_id=None):
network_id = uuidutils.generate_uuid()
kwargs = {
"context": self.ctxt,
"network": {
"id": network_id,
qos_consts.QOS_POLICY_ID: policy_id
},
"original_network": {
"id": network_id,
qos_consts.QOS_POLICY_ID: original_policy_id
}
}
port_mock_with_own_policy = mock.MagicMock(
id=uuidutils.generate_uuid(),
qos_policy_id=uuidutils.generate_uuid())
port_mock_without_own_policy = mock.MagicMock(
id=uuidutils.generate_uuid(), qos_policy_id=None)
ports = [port_mock_with_own_policy, port_mock_without_own_policy]
policy_mock = mock.MagicMock(id=policy_id)
with mock.patch(
'neutron.objects.ports.Port.get_objects',
return_value=ports
) as get_ports, mock.patch(
'neutron.objects.qos.policy.QosPolicy.get_object',
return_value=policy_mock
) as get_policy, mock.patch.object(
self.qos_plugin, "validate_policy_for_ports"
) as validate_policy_for_ports:
self.qos_plugin._validate_update_network_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
if policy_id is None or policy_id == original_policy_id:
get_policy.assert_not_called()
get_ports.assert_not_called()
validate_policy_for_ports.assert_not_called()
else:
get_policy.assert_called_once_with(self.ctxt, id=policy_id)
get_ports.assert_called_once_with(self.ctxt,
network_id=network_id)
validate_policy_for_ports.assert_called_once_with(
policy_mock, [port_mock_without_own_policy])
def test_validate_update_network_callback_policy_changed(self):
self._test_validate_update_network_callback(
policy_id=uuidutils.generate_uuid())
def test_validate_update_network_callback_policy_not_changed(self):
policy_id = uuidutils.generate_uuid()
self._test_validate_update_network_callback(
policy_id=policy_id, original_policy_id=policy_id)
def test_validate_update_network_callback_policy_removed(self):
self._test_validate_update_network_callback(
policy_id=None, original_policy_id=uuidutils.generate_uuid())
def test_validate_policy_for_port_rule_not_valid(self):
port = {'id': uuidutils.generate_uuid()}
with mock.patch.object(
self.qos_plugin.driver_manager, "validate_rule_for_port",
return_value=False
):
self.policy.rules = [self.rule]
self.assertRaises(
n_exc.QosRuleNotSupported,
self.qos_plugin.validate_policy_for_port,
self.policy, port)
def test_validate_policy_for_port_all_rules_valid(self):
port = {'id': uuidutils.generate_uuid()}
with mock.patch.object(
self.qos_plugin.driver_manager, "validate_rule_for_port",
return_value=True
):
self.policy.rules = [self.rule]
try:
self.qos_plugin.validate_policy_for_port(self.policy, port)
except n_exc.QosRuleNotSupported:
self.fail("QosRuleNotSupported exception unexpectedly raised")
@mock.patch(
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
'.create_rbac_policy')
@ -449,8 +657,10 @@ class TestQosPlugin(base.BaseQosTestCase):
self.assertTrue(mock_manager.mock_calls.index(policy_mock_call) <
mock_manager.mock_calls.index(notify_mock_call))
@mock.patch('neutron.objects.ports.Port')
@mock.patch('neutron.objects.qos.policy.QosPolicy')
def test_rule_notification_and_driver_ordering(self, qos_policy_mock):
def test_rule_notification_and_driver_ordering(self, qos_policy_mock,
port_mock):
rule_cls_mock = mock.Mock()
rule_cls_mock.rule_type = 'fake'
@ -468,6 +678,7 @@ class TestQosPlugin(base.BaseQosTestCase):
mock_manager = mock.Mock()
mock_manager.attach_mock(qos_policy_mock, 'QosPolicy')
mock_manager.attach_mock(port_mock, 'Port')
mock_manager.attach_mock(rule_cls_mock, 'RuleCls')
mock_manager.attach_mock(self.qos_plugin.notification_driver_manager,
'notification_driver')