Merge remote-tracking branch 'origin/feature/qos' into merge-branch

Change-Id: I683102e617202e0ffc953a0d3cc179879f8faf82
This commit is contained in:
Ihar Hrachyshka 2015-08-05 17:15:40 +02:00
commit cc0ae6dd49
102 changed files with 5407 additions and 20 deletions

View File

@ -2,3 +2,4 @@
host=review.openstack.org
port=29418
project=openstack/neutron.git
defaultbranch=feature/qos

View File

@ -48,8 +48,10 @@ Neutron Internals
plugin-api
db_layer
rpc_api
rpc_callbacks
layer3
l2_agents
quality_of_service
advanced_services
oslo-incubator
callbacks

View File

@ -0,0 +1,262 @@
==================
Quality of Service
==================
Quality of Service advanced service is designed as a service plugin. The
service is decoupled from the rest of Neutron code on multiple levels (see
below).
QoS is the first service/api extension to extend core resources (ports,
networks) without using mixins inherited from plugins.
Details about the DB models, API extension, and use cases can be found here: `qos spec <http://specs.openstack.org/openstack/neutron-specs/specs/liberty/qos-api-extension.html>`_
.
Service side design
===================
* neutron.extensions.qos:
base extension + API controller definition.
* neutron.services.qos.qos_plugin:
QoSPlugin, service plugin that implements 'qos' extension, receiving and
handling API calls to create/modify policies and rules. It also handles core
plugin requests to associate ports and networks with a QoS policy.
* neutron.services.qos.drivers.qos_base:
the interface class for server-side QoS backend which will receive {create,
update, delete} events on any rule change.
* neutron.services.qos.drivers.rpc.mq_qos:
message queue based reference backend driver which provides messaging
notifications to any interested agent, using `RPC callbacks <rpc_callbacks.html>`_.
Supported QoS rule types
------------------------
Any plugin or Ml2 mechanism driver can claim support for some QoS rule types by
providing a plugin/driver class property called 'supported_qos_rule_types' that
should return a list of strings that correspond to QoS rule types (for the list
of all rule types, see: neutron.extensions.qos.VALID_RULE_TYPES).
In the most simple case, the property can be represented by a simple Python
list defined on the class.
For Ml2 plugin, the list of supported QoS rule types is defined as a common
subset of rules supported by all active mechanism drivers.
QoS resources
-------------
QoS design defines the following two conceptual resources to define QoS rules
for a port or a network:
* QoS policy
* QoS rule (type specific)
Each QoS policy contains zero or more QoS rules. A policy is then applied to a
network or a port, making all rules of the policy applied to the corresponding
Neutron resource (for a network, applying a policy means that the policy will
be applied to all ports that belong to it).
From database point of view, following objects are defined in schema:
* QosPolicy: directly maps to the conceptual policy resource.
* QosNetworkPolicyBinding, QosPortPolicyBinding: defines attachment between a
Neutron resource and a QoS policy.
* QosBandwidthLimitRule: defines the only rule type available at the moment.
All database models are defined under:
* neutron.db.qos.models
There is a long history of passing database dictionaries directly into business
logic of Neutron. This path is not the one we wanted to take for QoS effort, so
we've also introduced a new objects middleware to encapsulate the database logic
from the rest of the Neutron code that works with QoS resources. For this, we've
adopted oslo.versionedobjects library and introduced a new NeutronObject class
that is a base for all other objects that will belong to the middle layer.
There is an expectation that Neutron will evolve into using objects for all
resources it handles, though that part is obviously out of scope for the QoS
effort.
Every NeutronObject supports the following operations:
* get_by_id: returns specific object that is represented by the id passed as an
argument.
* get_objects: returns all objects of the type, potentially with a filter
applied.
* create/update/delete: usual persistence operations.
Base object class is defined in:
* neutron.objects.base
For QoS, new neutron objects were implemented:
* QosPolicy: directly maps to the conceptual policy resource, as defined above.
* QosBandwidthLimitRule: class that represents the only rule type supported by
initial QoS design.
Those are defined in:
* neutron.objects.qos.policy
* neutron.objects.qos.rule
For QosPolicy neutron object, the following public methods were implemented:
* get_network_policy/get_port_policy: returns a policy object that is attached
to the corresponding Neutron resource.
* attach_network/attach_port: attach a policy to the corresponding Neutron
resource.
* detach_network/detach_port: detach a policy from the corresponding Neutron
resource.
In addition to the fields that belong to QoS policy database object itself,
synthetic fields were added to the object that represent lists of rules that
belong to the policy. To get a list of all rules for a specific policy, a
consumer of the object can just access the corresponding attribute via:
* policy.rules
Implementation is done in a way that will allow adding a new rule list field
with little or no modifications in the policy object itself. This is achieved
by smart introspection of existing available rule object definitions and
automatic definition of those fields on the policy class.
Note that rules are loaded in a non lazy way, meaning they are all fetched from
the database on policy fetch.
For Qos<type>Rule objects, an extendable approach was taken to allow easy
addition of objects for new rule types. To accomodate this, fields common to
all types are put into a base class called QosRule that is then inherited into
type-specific rule implementations that, ideally, only define additional fields
and some other minor things.
Note that the QosRule base class is not registered with oslo.versionedobjects
registry, because it's not expected that 'generic' rules should be
instantiated (and to enforce just that, the base rule class is marked as ABC).
QoS objects rely on some primitive database API functions that are added in:
* neutron.db.api
* neutron.db.qos.api
Callback changes
----------------
TODO(QoS): We're changing strategy here to not rely on AFTER_READ callbacks,
and foster discussion about how to do decouple core resource
extension in the community. So, update next phrase when that
happens.
To extend ports and networks with qos_policy_id field, AFTER_READ callback
event is introduced.
Note: a better mechanism is being built by @armax to make resource extensions
more explicit and under control. We will migrate to that better mechanism as
soon as it's available.
RPC communication
-----------------
Details on RPC communication implemented in reference backend driver are
discussed in `a separate page <rpc_callbacks.html>`_.
One thing that should be mentioned here explicitly is that RPC callback
endpoints communicate using real versioned objects (as defined by serialization
for oslo.versionedobjects library), not vague json dictionaries. Meaning,
oslo.versionedobjects are on the wire and not just used internally inside a
component.
There is expectation that after RPC callbacks are introduced in Neutron, we
will be able to migrate propagation from server to agents for other resources
(f.e. security groups) to the new mechanism. This will need to wait until those
resources get proper NeutronObject implementations.
Agent side design
=================
To facilitate code reusability between agents and agent extensions without
patching the agent code itself, agent extensions were introduced. They can be
especially interesting to third parties that don't want to maintain their code
in Neutron tree.
Extensions are meant to receive basic events like port update or delete, and do
whatever they need with it.
* neutron.agent.l2.agent_extension:
extension interface definition.
* neutron.agent.l2.agent_extensions_manager:
manager that allows to register multiple extensions, and pass events down to
all enabled extensions.
* neutron.agent.l2.extensions.qos_agent:
defines QoSAgentExtension that is also pluggable using QoSAgentDriver
implementations that are specific to agent backends being used.
* neutron.agent.l2.l2_agent:
provides the API entry point for process_{network,subnet,port}_extension,
and holds an agent extension manager inside.
TODO(QoS): clarify what this is for, I don't follow a bit.
ML2
---
TODO(QoS): there is work ongoing that will need to be reflected here.
Agent backends
--------------
TODO(QoS): this section needs rework.
Open vSwitch
* neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers.qos_driver
This module implements the QoSAgentDriver interface used by the
QosAgentExtension.
* neutron.agent.common.ovs_lib
* neutron.agent.ovsdb.api
* neutron.agent.ovsdb.impl_idl
* neutron.agent.ovsdb.impl_vsctl
* neutron.agent.ovsdb.native.commands
SR-IOV
Configuration
=============
TODO(QoS)
Testing strategy
================
Neutron objects
---------------
Base unit test classes to validate neutron objects were implemented in a way
that allows code reuse when introducing a new object type.
There are two test classes that are utilized for that:
* BaseObjectIfaceTestCase: class to validate basic object operations (mostly
CRUD) with database layer isolated.
* BaseDbObjectTestCase: class to validate the same operations with models in
place and database layer unmocked.
Every new object implemented on top of one of those classes is expected to
either inherit existing test cases as is, or reimplement it, if it makes sense
in terms of how those objects are implemented. Specific test classes can
obviously extend the set of test cases as they see needed (f.e. you need to
define new test cases for those additional methods that you may add to your
object implementations on top of base semantics common to all neutron objects).

View File

@ -0,0 +1,229 @@
=================================
Neutron Messaging Callback System
=================================
Neutron already has a callback system [link-to: callbacks.rst] for
in-process resource callbacks where publishers and subscribers are able
to publish, subscribe and extend resources.
This system is different, and is intended to be used for inter-process
callbacks, via the messaging fanout mechanisms.
In Neutron, agents may need to subscribe to specific resource details which
may change over time. And the purpose of this messaging callback system
is to allow agent subscription to those resources without the need to extend
modify existing RPC calls, or creating new RPC messages.
A few resource which can benefit of this system:
* security groups members
* security group rules,
* QoS policies.
Using a remote publisher/subscriber pattern, the information about such
resources could be published using fanout queues to all interested nodes,
minimizing messaging requests from agents to server since the agents
get subscribed for their whole lifecycle (unless they unsubscribe).
Within an agent, there could be multiple subscriber callbacks to the same
resource events, the resources updates would be dispatched to the subscriber
callbacks from a single message. Any update would come in a single message,
doing only a single oslo versioned objects deserialization on each receiving
agent.
This publishing/subscription mechanism is highly dependent on the format
of the resources passed around. This is why the library only allows
versioned objects to be published and subscribed. Oslo versioned objects
allow object version down/up conversion. #[vo_mkcompat]_ #[vo_mkcptests]_
For the VO's versioning schema look here: #[vo_versioning]_
versioned_objects serialization/deserialization with the
obj_to_primitive(target_version=..) and primitive_to_obj() #[ov_serdes]_
methods is used internally to convert/retrieve objects before/after messaging.
Considering rolling upgrades, there are several scenarios to look at:
* publisher (generally neutron-server or a service) and subscriber (agent)
know the same version of the objects, so they serialize, and deserialize
without issues.
* publisher knows (and sends) an older version of the object, subscriber
will get the object updated to latest version on arrival before any
callback is called.
* publisher sends a newer version of the object, subscriber won't be able
to deserialize the object, in this case (PLEASE DISCUSS), we can think of two
strategies:
a) During upgrades, we pin neutron-server to a compatible version for resource
fanout updates, and server sends both the old, and the newer version to
different topic, queues. Old agents receive the updates on the old version
topic, new agents receive updates on the new version topic.
When the whole system upgraded, we un-pin the compatible version fanout.
A variant of this could be using a single fanout queue, and sending the
pinned version of the object to all. Newer agents can deserialize to the
latest version and upgrade any fields internally. Again at the end, we
unpin the version and restart the service.
b) The subscriber will rpc call the publisher to start publishing also a downgraded
version of the object on every update on a separate queue. The complication
of this version, is the need to ignore new version objects as long as we keep
receiving the downgraded ones, and otherwise resend the request to send the
downgraded objects after a certain timeout (thinking of the case where the
request for downgraded queue is done, but the publisher restarted).
This approach is more complicated to implement, but more automated from the
administrator point of view. We may want to look into it as a second step
from a
c) The subscriber will send a registry.get_info for the latest specific version
he knows off. This can have scalability issues during upgrade as any outdated
agent will require a flow of two messages (request, and response). This is
indeed very bad at scale if you have hundreds or thousands of agents.
Option a seems like a reasonable strategy, similar to what nova does now with
versioned objects.
Serialized versioned objects look like::
{'versioned_object.version': '1.0',
'versioned_object.name': 'QoSProfile',
'versioned_object.data': {'rules': [
{'versioned_object.version': '1.0',
'versioned_object.name': 'QoSRule',
'versioned_object.data': {'name': u'a'},
'versioned_object.namespace': 'versionedobjects'}
],
'uuid': u'abcde',
'name': u'aaa'},
'versioned_object.namespace': 'versionedobjects'}
Topic names for the fanout queues
=================================
if we adopted option a:
neutron-<resouce_type>_<resource_id>-<vo_version>
[neutron-<resouce_type>_<resource_id>-<vo_version_compat>]
if we adopted option b for rolling upgrades:
neutron-<resource_type>-<resource_id>
neutron-<resource_type>-<resource_id>-<vo_version>
for option c, just:
neutron-<resource_type>-<resource_id>
Subscribing to resources
========================
Imagine that you have agent A, which just got to handle a new port, which
has an associated security group, and QoS policy.
The agent code processing port updates may look like::
from neutron.rpc_resources import events
from neutron.rpc_resources import resources
from neutron.rpc_resources import registry
def process_resource_updates(resource_type, resource_id, resource_list, action_type):
# send to the right handler which will update any control plane
# details related to the updated resource...
def port_update(...):
# here we extract sg_id and qos_policy_id from port..
registry.subscribe(resources.SG_RULES, sg_id,
callback=process_resource_updates)
sg_rules = registry.get_info(resources.SG_RULES, sg_id)
registry.subscribe(resources.SG_MEMBERS, sg_id,
callback=process_resource_updates)
sg_members = registry.get_info(resources.SG_MEMBERS, sg_id)
registry.subscribe(resources.QOS_RULES, qos_policy_id,
callback=process_resource_updates)
qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id,
callback=process_resource_updates)
cleanup_subscriptions()
def cleanup_subscriptions()
sg_ids = determine_unreferenced_sg_ids()
qos_policy_id = determine_unreferenced_qos_policy_ids()
registry.unsubscribe_info(resource.SG_RULES, sg_ids)
registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids)
registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id)
Another unsubscription strategy could be to lazily unsubscribe resources when
we receive updates for them, and we discover that they are not needed anymore.
Deleted resources are automatically unsubscribed as we receive the delete event.
NOTE(irenab): this could be extended to core resources like ports, making use
of the standard neutron in-process callbacks at server side and propagating
AFTER_UPDATE events, for example, but we may need to wait until those callbacks
are used with proper versioned objects.
Unsubscribing to resources
==========================
There are a few options to unsubscribe registered callbacks:
* unsubscribe_resource_id(): it selectively unsubscribes an specific
resource type + id.
* unsubscribe_resource_type(): it unsubscribes from an specific resource type,
any ID.
* unsubscribe_all(): it unsubscribes all subscribed resources and ids.
Sending resource updates
========================
On the server side, resource updates could come from anywhere, a service plugin,
an extension, anything that updates the resource and that it's of any interest
to the agents.
The server/publisher side may look like::
from neutron.rpc_resources import events
from neutron.rpc_resources import resources
from neutron.rpc_resources import registry as rpc_registry
def add_qos_x_rule(...):
update_the_db(...)
send_rpc_updates_on_qos_policy(qos_policy_id)
def del_qos_x_rule(...):
update_the_db(...)
send_rpc_deletion_of_qos_policy(qos_policy_id)
def send_rpc_updates_on_qos_policy(qos_policy_id):
rules = get_qos_policy_rules_versioned_object(qos_policy_id)
rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE)
def send_rpc_deletion_of_qos_policy(qos_policy_id):
rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE)
# This part is added for the registry mechanism, to be able to request
# older versions of the notified objects if any oudated agent requires
# them.
def retrieve_older_version_callback(qos_policy_id, version):
return get_qos_policy_rules_versioned_object(qos_policy_id, version)
rpc_registry.register_retrieve_callback(resource.QOS_RULES,
retrieve_older_version_callback)
References
==========
.. [#ov_serdes] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L621
.. [#vo_mkcompat] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L460
.. [#vo_mkcptests] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L111
.. [#vo_versioning] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L236

View File

@ -75,7 +75,7 @@
# of its entrypoint name.
#
# service_plugins =
# Example: service_plugins = router,firewall,lbaas,vpnaas,metering
# Example: service_plugins = router,firewall,lbaas,vpnaas,metering,qos
# Paste configuration file
# api_paste_config = api-paste.ini
@ -1017,3 +1017,7 @@ lock_path = $state_path/lock
# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake (boolean value)
# Deprecated group/name - [DEFAULT]/fake_rabbit
# fake_rabbit = false
[qos]
# Drivers list to use to send the update notification
# notification_drivers = message_queue

View File

@ -133,6 +133,11 @@
#
# quitting_rpc_timeout = 10
# (ListOpt) Extensions list to use
# Example: extensions = qos
#
# extensions =
[securitygroup]
# Firewall driver for realizing neutron security group function.
# firewall_driver = neutron.agent.firewall.NoopFirewallDriver
@ -142,6 +147,10 @@
# It should be false when you use nova security group.
# enable_security_group = True
[qos]
# QoS agent driver
# agent_driver = ovs
#-----------------------------------------------------------------------------
# Sample Configurations.
#-----------------------------------------------------------------------------

View File

@ -39,12 +39,14 @@
"get_network:provider:physical_network": "rule:admin_only",
"get_network:provider:segmentation_id": "rule:admin_only",
"get_network:queue_id": "rule:admin_only",
"get_network:qos_policy_id": "rule:admin_only",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
"create_network:segments": "rule:admin_only",
"create_network:provider:network_type": "rule:admin_only",
"create_network:provider:physical_network": "rule:admin_only",
"create_network:provider:segmentation_id": "rule:admin_only",
"create_network:qos_policy_id": "rule:admin_only",
"update_network": "rule:admin_or_owner",
"update_network:segments": "rule:admin_only",
"update_network:shared": "rule:admin_only",
@ -52,6 +54,7 @@
"update_network:provider:physical_network": "rule:admin_only",
"update_network:provider:segmentation_id": "rule:admin_only",
"update_network:router:external": "rule:admin_only",
"update_network:qos_policy_id": "rule:admin_only",
"delete_network": "rule:admin_or_owner",
"create_port": "",
@ -62,12 +65,14 @@
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
"create_port:qos_policy_id": "rule:admin_only",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"get_port:qos_policy_id": "rule:admin_only",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
@ -76,6 +81,7 @@
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
"update_port:qos_policy_id": "rule:admin_only",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",

View File

@ -489,6 +489,81 @@ class OVSBridge(BaseOVS):
txn.add(self.ovsdb.db_set('Controller',
controller_uuid, *attr))
def _create_qos_bw_limit_queue(self, port_name, max_bw_in_bits,
max_burst_in_bits):
external_ids = {'id': port_name}
queue_other_config = {'min-rate': max_bw_in_bits,
'max-rate': max_bw_in_bits,
'burst': max_burst_in_bits}
self.ovsdb.db_create(
'Queue', external_ids=external_ids,
other_config=queue_other_config).execute(check_error=True)
def _create_qos_bw_limit_profile(self, port_name, max_bw_in_bits):
external_ids = {'id': port_name}
queue = self.ovsdb.db_find(
'Queue',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(
check_error=True)
queues = {}
queues[0] = queue[0]['_uuid']
qos_other_config = {'max-rate': max_bw_in_bits}
self.ovsdb.db_create('QoS', external_ids=external_ids,
other_config=qos_other_config,
type='linux-htb',
queues=queues).execute(check_error=True)
def create_qos_bw_limit_for_port(self, port_name, max_kbps,
max_burst_kbps):
# TODO(QoS) implement this with transactions,
# or roll back on failure
max_bw_in_bits = str(max_kbps * 1000)
max_burst_in_bits = str(max_burst_kbps * 1000)
self._create_qos_bw_limit_queue(port_name, max_bw_in_bits,
max_burst_in_bits)
self._create_qos_bw_limit_profile(port_name, max_bw_in_bits)
qos = self.ovsdb.db_find('QoS',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(check_error=True)
qos_profile = qos[0]['_uuid']
self.set_db_attribute('Port', port_name, 'qos', qos_profile,
check_error=True)
def get_qos_bw_limit_for_port(self, port_name):
res = self.ovsdb.db_find(
'Queue',
('external_ids', '=', {'id': port_name}),
columns=['other_config']).execute(check_error=True)
if res is None or len(res) == 0:
return None, None
other_config = res[0]['other_config']
max_kbps = int(other_config['max-rate']) / 1000
max_burst_kbps = int(other_config['burst']) / 1000
return max_kbps, max_burst_kbps
def del_qos_bw_limit_for_port(self, port_name):
qos = self.ovsdb.db_find('QoS',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(check_error=True)
qos_row = qos[0]['_uuid']
queue = self.ovsdb.db_find('Queue',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(check_error=True)
queue_row = queue[0]['_uuid']
with self.ovsdb.transaction(check_error=True) as txn:
txn.add(self.ovsdb.db_set('Port', port_name, ('qos', [])))
txn.add(self.ovsdb.db_destroy('QoS', qos_row))
txn.add(self.ovsdb.db_destroy('Queue', queue_row))
def __enter__(self):
self.create()
return self

View File

View File

@ -0,0 +1,41 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AgentCoreResourceExtension(object):
"""Define stable abstract interface for agent extensions.
An agent extension extends the agent core functionality.
"""
def initialize(self):
"""Perform agent core resource extension initialization.
Called after all extensions have been loaded.
No port handling will be called before this method.
"""
@abc.abstractmethod
def handle_port(self, context, data):
"""handle agent extension for port.
:param context - rpc context
:param data - port data
"""

View File

@ -0,0 +1,65 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log
import stevedore
from neutron.i18n import _LE, _LI
LOG = log.getLogger(__name__)
L2_AGENT_EXT_MANAGER_NAMESPACE = 'neutron.agent.l2.extensions'
L2_AGENT_EXT_MANAGER_OPTS = [
cfg.ListOpt('extensions',
default=[],
help=_('Extensions list to use')),
]
def register_opts(conf):
conf.register_opts(L2_AGENT_EXT_MANAGER_OPTS, 'agent')
class AgentExtensionsManager(stevedore.named.NamedExtensionManager):
"""Manage agent extensions."""
def __init__(self, conf):
super(AgentExtensionsManager, self).__init__(
L2_AGENT_EXT_MANAGER_NAMESPACE, conf.agent.extensions,
invoke_on_load=True, name_order=True)
LOG.info(_LI("Loaded agent extensions: %s"), self.names())
def initialize(self):
# Initialize each agent extension in the list.
for extension in self:
LOG.info(_LI("Initializing agent extension '%s'"), extension.name)
extension.obj.initialize()
def handle_port(self, context, data):
"""Notify all agent extensions to handle port."""
for extension in self:
try:
extension.obj.handle_port(context, data)
# TODO(QoS) add agent extensions exception and catch them here
except AttributeError:
LOG.exception(
_LE("Agent Extension '%(name)s' failed "
"while handling port update"),
{'name': extension.name}
)
#TODO(Qos) we are missing how to handle delete. we can pass action
#type in all the handle methods or add handle_delete_resource methods

View File

View File

@ -0,0 +1,126 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
from oslo_config import cfg
import six
from neutron.agent.l2 import agent_extension
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron import manager
@six.add_metaclass(abc.ABCMeta)
class QosAgentDriver(object):
"""Define stable abstract interface for QoS Agent Driver.
QoS Agent driver defines the interface to be implemented by Agent
for applying QoS Rules on a port.
"""
@abc.abstractmethod
def initialize(self):
"""Perform QoS agent driver initialization.
"""
pass
@abc.abstractmethod
def create(self, port, qos_policy):
"""Apply QoS rules on port for the first time.
:param port: port object.
:param qos_policy: the QoS policy to be apply on port.
"""
#TODO(QoS) we may want to provide default implementations of calling
#delete and then update
pass
@abc.abstractmethod
def update(self, port, qos_policy):
"""Apply QoS rules on port.
:param port: port object.
:param qos_policy: the QoS policy to be apply on port.
"""
pass
@abc.abstractmethod
def delete(self, port, qos_policy):
"""Remove QoS rules from port.
:param port: port object.
:param qos_policy: the QoS policy to be removed from port.
"""
pass
class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
def initialize(self):
"""Perform Agent Extension initialization.
"""
super(QosAgentExtension, self).initialize()
self.resource_rpc = resources_rpc.ResourcesServerRpcApi()
self.qos_driver = manager.NeutronManager.load_class_for_provider(
'neutron.qos.agent_drivers', cfg.CONF.qos.agent_driver)()
self.qos_driver.initialize()
self.qos_policy_ports = collections.defaultdict(dict)
self.known_ports = set()
def handle_port(self, context, port):
"""Handle agent QoS extension for port.
This method subscribes to qos_policy_id changes
with a callback and get all the qos_policy_ports and apply
them using the QoS driver.
Updates and delete event should be handle by the registered
callback.
"""
port_id = port['port_id']
qos_policy_id = port.get('qos_policy_id')
if qos_policy_id is None:
#TODO(QoS): we should also handle removing policy
return
#Note(moshele) check if we have seen this port
#and it has the same policy we do nothing.
if (port_id in self.known_ports and
port_id in self.qos_policy_ports[qos_policy_id]):
return
self.qos_policy_ports[qos_policy_id][port_id] = port
self.known_ports.add(port_id)
#TODO(QoS): handle updates when implemented
# we have two options:
# 1. to add new api for subscribe
# registry.subscribe(self._process_policy_updates,
# resources.QOS_POLICY, qos_policy_id)
# 2. combine get_info rpc to also subscribe to the resource
qos_policy = self.resource_rpc.get_info(
context,
resources.QOS_POLICY,
qos_policy_id)
self._process_policy_updates(
port, resources.QOS_POLICY, qos_policy_id,
qos_policy, 'create')
def _process_policy_updates(
self, port, resource_type, resource_id,
qos_policy, action_type):
getattr(self.qos_driver, action_type)(port, qos_policy)

View File

@ -161,6 +161,29 @@ class API(object):
:returns: :class:`Command` with field value result
"""
@abc.abstractmethod
def db_create(self, table, **col_values):
"""Create a command to create new record
:param table: The OVS table containing the record to be created
:type table: string
:param col_values: The columns and their associated values
to be set after create
:type col_values: Dictionary of columns id's and values
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_destroy(self, table, record):
"""Create a command to destroy a record
:param table: The OVS table containing the record to be destroyed
:type table: string
:param record: The record id (name/uuid) to be destroyed
:type record: uuid/string
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_set(self, table, record, *col_values):
"""Create a command to set fields in a record

View File

@ -168,6 +168,12 @@ class OvsdbIdl(api.API):
def br_set_external_id(self, name, field, value):
return cmd.BrSetExternalIdCommand(self, name, field, value)
def db_create(self, table, **col_values):
return cmd.DbCreateCommand(self, table, **col_values)
def db_destroy(self, table, record):
return cmd.DbDestroyCommand(self, table, record)
def db_set(self, table, record, *col_values):
return cmd.DbSetCommand(self, table, record, *col_values)

View File

@ -184,6 +184,15 @@ class OvsdbVsctl(ovsdb.API):
return BaseCommand(self.context, 'br-get-external-id',
args=[name, field])
def db_create(self, table, **col_values):
args = [table]
args += _set_colval_args(*col_values.items())
return BaseCommand(self.context, 'create', args=args)
def db_destroy(self, table, record):
args = [table, record]
return BaseCommand(self.context, 'destroy', args=args)
def db_set(self, table, record, *col_values):
args = [table, record]
args += _set_colval_args(*col_values)
@ -259,8 +268,11 @@ def _set_colval_args(*col_values):
col, k, op, ovsdb.py_to_val(v)) for k, v in val.items()]
elif (isinstance(val, collections.Sequence)
and not isinstance(val, six.string_types)):
args.append(
"%s%s%s" % (col, op, ",".join(map(ovsdb.py_to_val, val))))
if len(val) == 0:
args.append("%s%s%s" % (col, op, "[]"))
else:
args.append(
"%s%s%s" % (col, op, ",".join(map(ovsdb.py_to_val, val))))
else:
args.append("%s%s%s" % (col, op, ovsdb.py_to_val(val)))
return args

View File

@ -148,6 +148,30 @@ class BrSetExternalIdCommand(BaseCommand):
br.external_ids = external_ids
class DbCreateCommand(BaseCommand):
def __init__(self, api, table, **columns):
super(DbCreateCommand, self).__init__(api)
self.table = table
self.columns = columns
def run_idl(self, txn):
row = txn.insert(self.api._tables[self.table])
for col, val in self.columns.items():
setattr(row, col, val)
self.result = row
class DbDestroyCommand(BaseCommand):
def __init__(self, api, table, record):
super(DbDestroyCommand, self).__init__(api)
self.table = table
self.record = record
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
record.delete()
class DbSetCommand(BaseCommand):
def __init__(self, api, table, record, *col_values):
super(DbSetCommand, self).__init__(api)

View File

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
UPDATED = 'updated'
DELETED = 'deleted'
VALID = (
UPDATED,
DELETED
)

View File

@ -0,0 +1,87 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import resource_manager
from neutron.api.rpc.callbacks import resources
from neutron.common import exceptions
# TODO(ajo): consider adding locking
CALLBACK_MANAGER = None
def _get_resources_callback_manager():
global CALLBACK_MANAGER
if CALLBACK_MANAGER is None:
CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager()
return CALLBACK_MANAGER
class CallbackReturnedWrongObjectType(exceptions.NeutronException):
message = _('Callback for %(resource_type)s returned wrong object type')
class CallbackNotFound(exceptions.NeutronException):
message = _('Callback for %(resource_type)s not found')
#resource implementation callback registration functions
def get_info(resource_type, resource_id, **kwargs):
"""Get information about resource type with resource id.
The function will check the providers for a specific remotable
resource and get the resource.
:returns: NeutronObject
"""
callback = _get_resources_callback_manager().get_callback(resource_type)
if not callback:
raise CallbackNotFound(resource_type=resource_type)
obj = callback(resource_type, resource_id, **kwargs)
if obj:
expected_cls = resources.get_resource_cls(resource_type)
if not isinstance(obj, expected_cls):
raise CallbackReturnedWrongObjectType(
resource_type=resource_type)
return obj
def register_provider(callback, resource_type):
_get_resources_callback_manager().register(callback, resource_type)
# resource RPC callback for pub/sub
#Agent side
def subscribe(callback, resource_type, resource_id):
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
def unsubscribe(callback, resource_type, resource_id):
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
def unsubscribe_all():
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
#Server side
def notify(resource_type, event, obj):
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
def clear():
_get_resources_callback_manager().clear()

View File

@ -0,0 +1,67 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from oslo_log import log as logging
from neutron.api.rpc.callbacks import resources
from neutron.callbacks import exceptions
LOG = logging.getLogger(__name__)
class ResourcesCallbacksManager(object):
"""A callback system that allows information providers in a loose manner.
"""
def __init__(self):
self.clear()
def register(self, callback, resource_type):
"""Register a callback for a resource type.
Only one callback can be registered for a resource type.
:param callback: the callback. It must raise or return NeutronObject.
:param resource_type: must be a valid resource type.
"""
LOG.debug("register: %(callback)s %(resource_type)s",
{'callback': callback, 'resource_type': resource_type})
if not resources.is_valid_resource_type(resource_type):
raise exceptions.Invalid(element='resource', value=resource_type)
self._callbacks[resource_type] = callback
def unregister(self, resource_type):
"""Unregister callback from the registry.
:param resource: must be a valid resource type.
"""
LOG.debug("Unregister: %s", resource_type)
if not resources.is_valid_resource_type(resource_type):
raise exceptions.Invalid(element='resource', value=resource_type)
self._callbacks[resource_type] = None
def clear(self):
"""Brings the manager to a clean state."""
self._callbacks = collections.defaultdict(dict)
def get_callback(self, resource_type):
"""Return the callback if found, None otherwise.
:param resource_type: must be a valid resource type.
"""
if not resources.is_valid_resource_type(resource_type):
raise exceptions.Invalid(element='resource', value=resource_type)
return self._callbacks[resource_type]

View File

@ -0,0 +1,49 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects.qos import policy
_QOS_POLICY_CLS = policy.QosPolicy
_VALID_CLS = (
_QOS_POLICY_CLS,
)
_VALID_TYPES = [cls.obj_name() for cls in _VALID_CLS]
# Supported types
QOS_POLICY = _QOS_POLICY_CLS.obj_name()
_TYPE_TO_CLS_MAP = {
QOS_POLICY: _QOS_POLICY_CLS,
}
def get_resource_type(resource_cls):
if not resource_cls:
return None
if not hasattr(resource_cls, 'obj_name'):
return None
return resource_cls.obj_name()
def is_valid_resource_type(resource_type):
return resource_type in _VALID_TYPES
def get_resource_cls(resource_type):
return _TYPE_TO_CLS_MAP.get(resource_type)

View File

@ -0,0 +1,111 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from neutron.api.rpc.callbacks import registry
from neutron.api.rpc.callbacks import resources
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
LOG = logging.getLogger(__name__)
class ResourcesRpcError(exceptions.NeutronException):
pass
class InvalidResourceTypeClass(ResourcesRpcError):
message = _("Invalid resource type %(resource_type)s")
class ResourceNotFound(ResourcesRpcError):
message = _("Resource %(resource_id)s of type %(resource_type)s "
"not found")
def _validate_resource_type(resource_type):
if not resources.is_valid_resource_type(resource_type):
raise InvalidResourceTypeClass(resource_type=resource_type)
class ResourcesServerRpcApi(object):
"""Agent-side RPC (stub) for agent-to-plugin interaction.
This class implements the client side of an rpc interface. The server side
can be found below: ResourcesServerRpcCallback. For more information on
this RPC interface, see doc/source/devref/rpc_callbacks.rst.
"""
def __init__(self):
target = oslo_messaging.Target(
topic=topics.PLUGIN, version='1.0',
namespace=constants.RPC_NAMESPACE_RESOURCES)
self.client = n_rpc.get_client(target)
@log_helpers.log_method_call
def get_info(self, context, resource_type, resource_id):
_validate_resource_type(resource_type)
# we've already validated the resource type, so we are pretty sure the
# class is there => no need to validate it specifically
resource_type_cls = resources.get_resource_cls(resource_type)
cctxt = self.client.prepare()
primitive = cctxt.call(context, 'get_info',
resource_type=resource_type,
version=resource_type_cls.VERSION, resource_id=resource_id)
if primitive is None:
raise ResourceNotFound(resource_type=resource_type,
resource_id=resource_id)
obj = resource_type_cls.obj_from_primitive(primitive)
obj.obj_reset_changes()
return obj
class ResourcesServerRpcCallback(object):
"""Plugin-side RPC (implementation) for agent-to-plugin interaction.
This class implements the server side of an rpc interface. The client side
can be found above: ResourcesServerRpcApi. For more information on
this RPC interface, see doc/source/devref/rpc_callbacks.rst.
"""
# History
# 1.0 Initial version
target = oslo_messaging.Target(
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
def get_info(self, context, resource_type, version, resource_id):
_validate_resource_type(resource_type)
obj = registry.get_info(
resource_type,
resource_id,
context=context)
if obj:
# don't request a backport for the latest known version
if version == obj.VERSION:
version = None
return obj.obj_to_primitive(target_version=version)

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
NETWORK = 'network'
PORT = 'port'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
@ -19,6 +20,7 @@ SECURITY_GROUP_RULE = 'security_group_rule'
SUBNET = 'subnet'
VALID = (
NETWORK,
PORT,
ROUTER,
ROUTER_GATEWAY,

View File

@ -127,22 +127,24 @@ def arp_header_match_supported():
def vf_management_supported():
is_supported = True
required_caps = (
ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_STATE,
ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_SPOOFCHK)
ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_SPOOFCHK,
ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_RATE)
try:
vf_section = ip_link_support.IpLinkSupport.get_vf_mgmt_section()
for cap in required_caps:
if not ip_link_support.IpLinkSupport.vf_mgmt_capability_supported(
vf_section, cap):
is_supported = False
LOG.debug("ip link command does not support "
"vf capability '%(cap)s'", cap)
return False
except ip_link_support.UnsupportedIpLinkCommand:
LOG.exception(_LE("Unexpected exception while checking supported "
"ip link command"))
return False
return True
return is_supported
def netns_read_requires_helper():

View File

@ -180,6 +180,8 @@ RPC_NAMESPACE_SECGROUP = None
RPC_NAMESPACE_DVR = None
# RPC interface for reporting state back to the plugin
RPC_NAMESPACE_STATE = None
# RPC interface for agent to plugin resources API
RPC_NAMESPACE_RESOURCES = None
# Default network MTU value when not configured
DEFAULT_NETWORK_MTU = 0

View File

@ -77,6 +77,10 @@ class AdminRequired(NotAuthorized):
message = _("User does not have admin privileges: %(reason)s")
class ObjectNotFound(NotFound):
message = _("Object %(id)s not found.")
class NetworkNotFound(NotFound):
message = _("Network %(net_id)s could not be found")
@ -93,11 +97,30 @@ class PortNotFound(NotFound):
message = _("Port %(port_id)s could not be found")
class QosPolicyNotFound(NotFound):
message = _("QoS policy %(policy_id)s could not be found")
class QosRuleNotFound(NotFound):
message = _("QoS rule %(rule_id)s for policy %(policy_id)s "
"could not be found")
class PortNotFoundOnNetwork(NotFound):
message = _("Port %(port_id)s could not be found "
"on network %(net_id)s")
class PortQosBindingNotFound(NotFound):
message = _("QoS binding for port %(port_id)s and policy %(policy_id)s "
"could not be found")
class NetworkQosBindingNotFound(NotFound):
message = _("QoS binding for network %(net_id)s and policy %(policy_id)s "
"could not be found")
class PolicyFileNotFound(NotFound):
message = _("Policy configuration policy.json could not be found")
@ -118,6 +141,11 @@ class InUse(NeutronException):
message = _("The resource is inuse")
class QosPolicyInUse(InUse):
message = _("QoS Policy %(policy_id)s is used by "
"%(object_type)s %(object_id)s.")
class NetworkInUse(InUse):
message = _("Unable to complete operation on network %(net_id)s. "
"There are one or more ports still in use on the network.")
@ -474,3 +502,7 @@ class DeviceNotFoundError(NeutronException):
class NetworkSubnetPoolAffinityError(BadRequest):
message = _("Subnets hosted on the same network must be allocated from "
"the same subnet pool")
class ObjectActionError(NeutronException):
message = _('Object action %(action)s failed because: %(reason)s')

View File

@ -19,6 +19,7 @@ PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
DVR = 'dvr'
RESOURCES = 'resources'
CREATE = 'create'
DELETE = 'delete'

View File

@ -434,3 +434,7 @@ class DelayedStringRenderer(object):
def __str__(self):
return str(self.function(*self.args, **self.kwargs))
def camelize(s):
return ''.join(s.replace('_', ' ').title().split())

View File

@ -20,9 +20,13 @@ from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as os_db_exception
from oslo_db.sqlalchemy import session
from oslo_utils import uuidutils
from sqlalchemy import exc
from sqlalchemy import orm
from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin
_FACADE = None
@ -88,3 +92,48 @@ class convert_db_exception_to_retry(object):
except self.to_catch as e:
raise os_db_exception.RetryRequest(e)
return wrapper
# Common database operation implementations
def get_object(context, model, **kwargs):
with context.session.begin(subtransactions=True):
return (common_db_mixin.model_query(context, model)
.filter_by(**kwargs)
.first())
def get_objects(context, model, **kwargs):
with context.session.begin(subtransactions=True):
return (common_db_mixin.model_query(context, model)
.filter_by(**kwargs)
.all())
def create_object(context, model, values):
with context.session.begin(subtransactions=True):
if 'id' not in values:
values['id'] = uuidutils.generate_uuid()
db_obj = model(**values)
context.session.add(db_obj)
return db_obj.__dict__
def _safe_get_object(context, model, id):
db_obj = get_object(context, model, id=id)
if db_obj is None:
raise n_exc.ObjectNotFound(id=id)
return db_obj
def update_object(context, model, id, values):
with context.session.begin(subtransactions=True):
db_obj = _safe_get_object(context, model, id)
db_obj.update(values)
db_obj.save(session=context.session)
return db_obj.__dict__
def delete_object(context, model, id):
with context.session.begin(subtransactions=True):
db_obj = _safe_get_object(context, model, id)
context.session.delete(db_obj)

View File

@ -29,6 +29,40 @@ from neutron.db import models_v2
LOG = logging.getLogger(__name__)
def convert_result_to_dict(f):
@functools.wraps(f)
def inner(*args, **kwargs):
result = f(*args, **kwargs)
if result is None:
return None
elif isinstance(result, list):
return [r.to_dict() for r in result]
else:
return result.to_dict()
return inner
def filter_fields(f):
@functools.wraps(f)
def inner_filter(*args, **kwargs):
result = f(*args, **kwargs)
fields = kwargs.get('fields')
if not fields:
try:
pos = f.func_code.co_varnames.index('fields')
fields = args[pos]
except (IndexError, ValueError):
return result
do_filter = lambda d: {k: v for k, v in d.items() if k in fields}
if isinstance(result, list):
return [do_filter(obj) for obj in result]
else:
return do_filter(result)
return inner_filter
class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
"""Stores getters and helper methods for db_base_plugin_v2

View File

@ -1,3 +1,3 @@
1c844d1677f7
2a16083502f3
48153cb5f051
kilo

View File

@ -0,0 +1,69 @@
# Copyright 2015 Huawei Technologies India Pvt Ltd, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""qos db changes
Revision ID: 48153cb5f051
Revises: 1c844d1677f7
Create Date: 2015-06-24 17:03:34.965101
"""
# revision identifiers, used by Alembic.
revision = '48153cb5f051'
down_revision = '1c844d1677f7'
from alembic import op
import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
def upgrade():
op.create_table(
'qos_policies',
sa.Column('id', sa.String(length=36), primary_key=True),
sa.Column('name', sa.String(length=attrs.NAME_MAX_LEN)),
sa.Column('description', sa.String(length=attrs.DESCRIPTION_MAX_LEN)),
sa.Column('shared', sa.Boolean(), nullable=False),
sa.Column('tenant_id', sa.String(length=attrs.TENANT_ID_MAX_LEN),
index=True))
op.create_table(
'qos_network_policy_bindings',
sa.Column('policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
sa.Column('network_id', sa.String(length=36),
sa.ForeignKey('networks.id', ondelete='CASCADE'),
nullable=False, unique=True))
op.create_table(
'qos_port_policy_bindings',
sa.Column('policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
sa.Column('port_id', sa.String(length=36),
sa.ForeignKey('ports.id', ondelete='CASCADE'),
nullable=False, unique=True))
op.create_table(
'qos_bandwidth_limit_rules',
sa.Column('id', sa.String(length=36), primary_key=True),
sa.Column('qos_policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False, unique=True),
sa.Column('max_kbps', sa.Integer()),
sa.Column('max_burst_kbps', sa.Integer()))

View File

@ -41,6 +41,7 @@ from neutron.db import model_base
from neutron.db import models_v2 # noqa
from neutron.db import portbindings_db # noqa
from neutron.db import portsecurity_db # noqa
from neutron.db.qos import models as qos_models # noqa
from neutron.db.quota import models # noqa
from neutron.db import rbac_db_models # noqa
from neutron.db import securitygroups_db # noqa

View File

65
neutron/db/qos/api.py Normal file
View File

@ -0,0 +1,65 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db import exception as oslo_db_exception
from sqlalchemy.orm import exc as orm_exc
from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin as db
from neutron.db.qos import models
def create_policy_network_binding(context, policy_id, network_id):
try:
with context.session.begin(subtransactions=True):
db_obj = models.QosNetworkPolicyBinding(policy_id=policy_id,
network_id=network_id)
context.session.add(db_obj)
except oslo_db_exception.DBReferenceError:
raise n_exc.NetworkQosBindingNotFound(net_id=network_id,
policy_id=policy_id)
def delete_policy_network_binding(context, policy_id, network_id):
try:
with context.session.begin(subtransactions=True):
db_object = (db.model_query(context,
models.QosNetworkPolicyBinding)
.filter_by(policy_id=policy_id,
network_id=network_id).one())
context.session.delete(db_object)
except orm_exc.NoResultFound:
raise n_exc.NetworkQosBindingNotFound(net_id=network_id,
policy_id=policy_id)
def create_policy_port_binding(context, policy_id, port_id):
try:
with context.session.begin(subtransactions=True):
db_obj = models.QosPortPolicyBinding(policy_id=policy_id,
port_id=port_id)
context.session.add(db_obj)
except oslo_db_exception.DBReferenceError:
raise n_exc.PortQosBindingNotFound(port_id=port_id,
policy_id=policy_id)
def delete_policy_port_binding(context, policy_id, port_id):
try:
with context.session.begin(subtransactions=True):
db_object = (db.model_query(context, models.QosPortPolicyBinding)
.filter_by(policy_id=policy_id,
port_id=port_id).one())
context.session.delete(db_object)
except orm_exc.NoResultFound:
raise n_exc.PortQosBindingNotFound(port_id=port_id,
policy_id=policy_id)

86
neutron/db/qos/models.py Executable file
View File

@ -0,0 +1,86 @@
# Copyright 2015 Huawei Technologies India Pvt Ltd, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
from neutron.db import model_base
from neutron.db import models_v2
LOG = logging.getLogger(__name__)
class QosPolicy(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
__tablename__ = 'qos_policies'
name = sa.Column(sa.String(attrs.NAME_MAX_LEN))
description = sa.Column(sa.String(attrs.DESCRIPTION_MAX_LEN))
shared = sa.Column(sa.Boolean, nullable=False)
class QosNetworkPolicyBinding(model_base.BASEV2):
__tablename__ = 'qos_network_policy_bindings'
policy_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
nullable=False,
primary_key=True)
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id',
ondelete='CASCADE'),
nullable=False,
unique=True,
primary_key=True)
network = sa.orm.relationship(
models_v2.Network,
backref=sa.orm.backref("qos_policy_binding", uselist=False,
cascade='delete', lazy='joined'))
class QosPortPolicyBinding(model_base.BASEV2):
__tablename__ = 'qos_port_policy_bindings'
policy_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
nullable=False,
primary_key=True)
port_id = sa.Column(sa.String(36),
sa.ForeignKey('ports.id',
ondelete='CASCADE'),
nullable=False,
unique=True,
primary_key=True)
port = sa.orm.relationship(
models_v2.Port,
backref=sa.orm.backref("qos_policy_binding", uselist=False,
cascade='delete', lazy='joined'))
class QosRuleColumns(models_v2.HasId):
# NOTE(ihrachyshka): we may need to rework it later when we introduce types
# that should not enforce uniqueness
qos_policy_id = sa.Column(sa.String(36), nullable=False, unique=True)
__table_args__ = (
sa.ForeignKeyConstraint(['qos_policy_id'], ['qos_policies.id']),
model_base.BASEV2.__table_args__
)
class QosBandwidthLimitRule(QosRuleColumns, model_base.BASEV2):
__tablename__ = 'qos_bandwidth_limit_rules'
max_kbps = sa.Column(sa.Integer)
max_burst_kbps = sa.Column(sa.Integer)

236
neutron/extensions/qos.py Normal file
View File

@ -0,0 +1,236 @@
# Copyright (c) 2015 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import six
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
from neutron.api.v2 import base
from neutron.api.v2 import resource_helper
from neutron import manager
from neutron.plugins.common import constants
from neutron.services.qos import qos_consts
from neutron.services import service_base
QOS_PREFIX = "/qos"
# Attribute Map
QOS_RULE_COMMON_FIELDS = {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True,
'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
}
RESOURCE_ATTRIBUTE_MAP = {
'policies': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True, 'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': '',
'validate': {'type:string': None}},
'description': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': '',
'validate': {'type:string': None}},
'shared': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': False,
'convert_to': attr.convert_to_boolean},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
'rules': {'allow_post': False, 'allow_put': False, 'is_visible': True},
},
'rule_types': {
'type': {'allow_post': False, 'allow_put': False,
'is_visible': True}
}
}
SUB_RESOURCE_ATTRIBUTE_MAP = {
'bandwidth_limit_rules': {
'parent': {'collection_name': 'policies',
'member_name': 'policy'},
'parameters': dict(QOS_RULE_COMMON_FIELDS,
**{'max_kbps': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:non_negative': None}},
'max_burst_kbps': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': 0,
'validate': {'type:non_negative': None}}})
}
}
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {qos_consts.QOS_POLICY_ID: {
'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'validate': {'type:uuid_or_none': None}}},
'networks': {qos_consts.QOS_POLICY_ID: {
'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'validate': {'type:uuid_or_none': None}}}}
class Qos(extensions.ExtensionDescriptor):
"""Quality of service API extension."""
@classmethod
def get_name(cls):
return "qos"
@classmethod
def get_alias(cls):
return "qos"
@classmethod
def get_description(cls):
return "The Quality of Service extension."
@classmethod
def get_updated(cls):
return "2015-06-08T10:00:00-00:00"
@classmethod
def get_plugin_interface(cls):
return QoSPluginBase
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
special_mappings = {'policies': 'policy'}
plural_mappings = resource_helper.build_plural_mappings(
special_mappings, itertools.chain(RESOURCE_ATTRIBUTE_MAP,
SUB_RESOURCE_ATTRIBUTE_MAP))
attr.PLURALS.update(plural_mappings)
resources = resource_helper.build_resource_info(
plural_mappings,
RESOURCE_ATTRIBUTE_MAP,
constants.QOS,
translate_name=True,
allow_bulk=True)
plugin = manager.NeutronManager.get_service_plugins()[constants.QOS]
for collection_name in SUB_RESOURCE_ATTRIBUTE_MAP:
resource_name = collection_name[:-1]
parent = SUB_RESOURCE_ATTRIBUTE_MAP[collection_name].get('parent')
params = SUB_RESOURCE_ATTRIBUTE_MAP[collection_name].get(
'parameters')
controller = base.create_resource(collection_name, resource_name,
plugin, params,
allow_bulk=True,
parent=parent,
allow_pagination=True,
allow_sorting=True)
resource = extensions.ResourceExtension(
collection_name,
controller, parent,
path_prefix=QOS_PREFIX,
attr_map=params)
resources.append(resource)
return resources
def update_attributes_map(self, attributes, extension_attrs_map=None):
super(Qos, self).update_attributes_map(
attributes, extension_attrs_map=RESOURCE_ATTRIBUTE_MAP)
def get_extended_resources(self, version):
if version == "2.0":
return dict(EXTENDED_ATTRIBUTES_2_0.items() +
RESOURCE_ATTRIBUTE_MAP.items())
else:
return {}
@six.add_metaclass(abc.ABCMeta)
class QoSPluginBase(service_base.ServicePluginBase):
path_prefix = QOS_PREFIX
def get_plugin_description(self):
return "QoS Service Plugin for ports and networks"
def get_plugin_type(self):
return constants.QOS
@abc.abstractmethod
def get_policy(self, context, policy_id, fields=None):
pass
@abc.abstractmethod
def get_policies(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
@abc.abstractmethod
def create_policy(self, context, policy):
pass
@abc.abstractmethod
def update_policy(self, context, policy_id, policy):
pass
@abc.abstractmethod
def delete_policy(self, context, policy_id):
pass
@abc.abstractmethod
def get_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, fields=None):
pass
@abc.abstractmethod
def get_policy_bandwidth_limit_rules(self, context, policy_id,
filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
pass
@abc.abstractmethod
def create_policy_bandwidth_limit_rule(self, context, policy_id,
bandwidth_limit_rule):
pass
@abc.abstractmethod
def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
bandwidth_limit_rule):
pass
@abc.abstractmethod
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
pass
@abc.abstractmethod
def get_rule_types(self, context, filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
pass

View File

132
neutron/objects/base.py Normal file
View File

@ -0,0 +1,132 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_versionedobjects import base as obj_base
import six
from neutron.common import exceptions
from neutron.db import api as db_api
class NeutronObjectUpdateForbidden(exceptions.NeutronException):
message = _("Unable to update the following object fields: %(fields)s")
def get_updatable_fields(cls, fields):
fields = fields.copy()
for field in cls.fields_no_update:
if field in fields:
del fields[field]
return fields
@six.add_metaclass(abc.ABCMeta)
class NeutronObject(obj_base.VersionedObject,
obj_base.VersionedObjectDictCompat,
obj_base.ComparableVersionedObject):
def __init__(self, context=None, **kwargs):
super(NeutronObject, self).__init__(context, **kwargs)
self.obj_set_defaults()
def to_dict(self):
return dict(self.items())
@classmethod
def get_by_id(cls, context, id):
raise NotImplementedError()
@classmethod
@abc.abstractmethod
def get_objects(cls, context, **kwargs):
raise NotImplementedError()
def create(self):
raise NotImplementedError()
def update(self):
raise NotImplementedError()
def delete(self):
raise NotImplementedError()
class NeutronDbObject(NeutronObject):
# should be overridden for all persistent objects
db_model = None
synthetic_fields = []
fields_no_update = []
def from_db_object(self, *objs):
for field in self.fields:
for db_obj in objs:
if field in db_obj:
setattr(self, field, db_obj[field])
break
self.obj_reset_changes()
@classmethod
def get_by_id(cls, context, id):
db_obj = db_api.get_object(context, cls.db_model, id=id)
if db_obj:
obj = cls(context, **db_obj)
obj.obj_reset_changes()
return obj
@classmethod
def get_objects(cls, context, **kwargs):
db_objs = db_api.get_objects(context, cls.db_model, **kwargs)
objs = [cls(context, **db_obj) for db_obj in db_objs]
for obj in objs:
obj.obj_reset_changes()
return objs
def _get_changed_persistent_fields(self):
fields = self.obj_get_changes()
for field in self.synthetic_fields:
if field in fields:
del fields[field]
return fields
def _validate_changed_fields(self, fields):
fields = fields.copy()
# We won't allow id update anyway, so let's pop it out not to trigger
# update on id field touched by the consumer
fields.pop('id', None)
forbidden_updates = set(self.fields_no_update) & set(fields.keys())
if forbidden_updates:
raise NeutronObjectUpdateForbidden(fields=forbidden_updates)
return fields
def create(self):
fields = self._get_changed_persistent_fields()
db_obj = db_api.create_object(self._context, self.db_model, fields)
self.from_db_object(db_obj)
def update(self):
updates = self._get_changed_persistent_fields()
updates = self._validate_changed_fields(updates)
if updates:
db_obj = db_api.update_object(self._context, self.db_model,
self.id, updates)
self.from_db_object(self, db_obj)
def delete(self):
db_api.delete_object(self._context, self.db_model, self.id)

View File

View File

@ -0,0 +1,162 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.common import exceptions
from neutron.db import api as db_api
from neutron.db.qos import api as qos_db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
from neutron.objects.qos import rule as rule_obj_impl
@obj_base.VersionedObjectRegistry.register
class QosPolicy(base.NeutronDbObject):
db_model = qos_db_model.QosPolicy
port_binding_model = qos_db_model.QosPortPolicyBinding
network_binding_model = qos_db_model.QosNetworkPolicyBinding
fields = {
'id': obj_fields.UUIDField(),
'tenant_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'description': obj_fields.StringField(),
'shared': obj_fields.BooleanField(default=False),
'rules': obj_fields.ListOfObjectsField('QosRule', subclasses=True),
}
fields_no_update = ['id', 'tenant_id']
synthetic_fields = ['rules']
def to_dict(self):
dict_ = super(QosPolicy, self).to_dict()
if 'rules' in dict_:
dict_['rules'] = [rule.to_dict() for rule in dict_['rules']]
return dict_
def obj_load_attr(self, attrname):
if attrname != 'rules':
raise exceptions.ObjectActionError(
action='obj_load_attr', reason='unable to load %s' % attrname)
rules = rule_obj_impl.get_rules(self._context, self.id)
setattr(self, attrname, rules)
self.obj_reset_changes([attrname])
def _load_rules(self):
self.obj_load_attr('rules')
@staticmethod
def _is_policy_accessible(context, db_obj):
#TODO(QoS): Look at I3426b13eede8bfa29729cf3efea3419fb91175c4 for
# other possible solutions to this.
return (context.is_admin or
db_obj.shared or
db_obj.tenant_id == context.tenant_id)
@classmethod
def get_by_id(cls, context, id):
# We want to get the policy regardless of its tenant id. We'll make
# sure the tenant has permission to access the policy later on.
admin_context = context.elevated()
with db_api.autonested_transaction(admin_context.session):
policy_obj = super(QosPolicy, cls).get_by_id(admin_context, id)
if (not policy_obj or
not cls._is_policy_accessible(context, policy_obj)):
return
policy_obj._load_rules()
return policy_obj
@classmethod
def get_objects(cls, context, **kwargs):
# We want to get the policy regardless of its tenant id. We'll make
# sure the tenant has permission to access the policy later on.
admin_context = context.elevated()
with db_api.autonested_transaction(admin_context.session):
db_objs = db_api.get_objects(admin_context, cls.db_model, **kwargs)
objs = []
for db_obj in db_objs:
if not cls._is_policy_accessible(context, db_obj):
continue
obj = cls(context, **db_obj)
obj._load_rules()
objs.append(obj)
return objs
@classmethod
def _get_object_policy(cls, context, model, **kwargs):
with db_api.autonested_transaction(context.session):
binding_db_obj = db_api.get_object(context, model, **kwargs)
if binding_db_obj:
return cls.get_by_id(context, binding_db_obj['policy_id'])
@classmethod
def get_network_policy(cls, context, network_id):
return cls._get_object_policy(context, cls.network_binding_model,
network_id=network_id)
@classmethod
def get_port_policy(cls, context, port_id):
return cls._get_object_policy(context, cls.port_binding_model,
port_id=port_id)
# TODO(QoS): Consider extending base to trigger registered methods for us
def create(self):
with db_api.autonested_transaction(self._context.session):
super(QosPolicy, self).create()
self._load_rules()
def delete(self):
models = (
('network', self.network_binding_model),
('port', self.port_binding_model)
)
with db_api.autonested_transaction(self._context.session):
for object_type, model in models:
binding_db_obj = db_api.get_object(self._context, model,
policy_id=self.id)
if binding_db_obj:
raise exceptions.QosPolicyInUse(
policy_id=self.id,
object_type=object_type,
object_id=binding_db_obj['%s_id' % object_type])
super(QosPolicy, self).delete()
def attach_network(self, network_id):
qos_db_api.create_policy_network_binding(self._context,
policy_id=self.id,
network_id=network_id)
def attach_port(self, port_id):
qos_db_api.create_policy_port_binding(self._context,
policy_id=self.id,
port_id=port_id)
def detach_network(self, network_id):
qos_db_api.delete_policy_network_binding(self._context,
policy_id=self.id,
network_id=network_id)
def detach_port(self, port_id):
qos_db_api.delete_policy_port_binding(self._context,
policy_id=self.id,
port_id=port_id)

View File

@ -0,0 +1,71 @@
# Copyright 2015 Huawei Technologies India Pvt Ltd, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import sys
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
import six
from neutron.common import utils
from neutron.db import api as db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
from neutron.services.qos import qos_consts
def get_rules(context, qos_policy_id):
all_rules = []
with db_api.autonested_transaction(context.session):
for rule_type in qos_consts.VALID_RULE_TYPES:
rule_cls_name = 'Qos%sRule' % utils.camelize(rule_type)
rule_cls = getattr(sys.modules[__name__], rule_cls_name)
rules = rule_cls.get_objects(context, qos_policy_id=qos_policy_id)
all_rules.extend(rules)
return all_rules
@six.add_metaclass(abc.ABCMeta)
class QosRule(base.NeutronDbObject):
fields = {
'id': obj_fields.UUIDField(),
'qos_policy_id': obj_fields.UUIDField()
}
fields_no_update = ['id', 'qos_policy_id']
# should be redefined in subclasses
rule_type = None
def to_dict(self):
dict_ = super(QosRule, self).to_dict()
dict_['type'] = self.rule_type
return dict_
@obj_base.VersionedObjectRegistry.register
class QosBandwidthLimitRule(QosRule):
db_model = qos_db_model.QosBandwidthLimitRule
fields = {
'max_kbps': obj_fields.IntegerField(nullable=True),
'max_burst_kbps': obj_fields.IntegerField(nullable=True)
}
rule_type = qos_consts.RULE_TYPE_BANDWIDTH_LIMIT

View File

@ -0,0 +1,41 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron import manager
from neutron.objects import base
from neutron.services.qos import qos_consts
class RuleTypeField(obj_fields.BaseEnumField):
def __init__(self, **kwargs):
self.AUTO_TYPE = obj_fields.Enum(
valid_values=qos_consts.VALID_RULE_TYPES)
super(RuleTypeField, self).__init__(**kwargs)
@obj_base.VersionedObjectRegistry.register
class QosRuleType(base.NeutronObject):
fields = {
'type': RuleTypeField(),
}
# we don't receive context because we don't need db access at all
@classmethod
def get_objects(cls, **kwargs):
core_plugin = manager.NeutronManager.get_plugin()
return [cls(type=type_)
for type_ in core_plugin.supported_qos_rule_types]

View File

@ -23,6 +23,7 @@ VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"
FLAVORS = "FLAVORS"
QOS = "QOS"
# Maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
@ -33,7 +34,8 @@ EXT_TO_SERVICE_MAPPING = {
'vpnaas': VPN,
'metering': METERING,
'router': L3_ROUTER_NAT,
'flavors': FLAVORS
'flavors': FLAVORS,
'qos': QOS,
}
# Service operation status constants

View File

@ -911,12 +911,14 @@ class ExtensionDriver(object):
"""
pass
@abc.abstractproperty
@property
def extension_alias(self):
"""Supported extension alias.
Return the alias identifying the core API extension supported
by this driver.
by this driver. Do not declare if API extension handling will
be left to a service plugin, and we just need to provide
core resource extension and updates.
"""
pass

View File

@ -20,6 +20,7 @@ from neutron.common import constants
from neutron.extensions import portbindings
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers import mech_agent
from neutron.services.qos import qos_consts
LOG = log.getLogger(__name__)
@ -34,6 +35,12 @@ class LinuxbridgeMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
network.
"""
# TODO(QoS): really, there is no support for QoS in the driver. Leaving it
# here since API tests are executed against both ovs and lb drivers, and it
# effectively makes ml2 plugin return an empty list for supported rule
# types
supported_qos_rule_types = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT]
def __init__(self):
sg_enabled = securitygroups_rpc.is_firewall_enabled()
super(LinuxbridgeMechanismDriver, self).__init__(

View File

@ -122,6 +122,21 @@ class PciDeviceIPWrapper(ip_lib.IPWrapper):
raise exc.IpCommandError(dev_name=self.dev_name,
reason=str(e))
def set_vf_max_rate(self, vf_index, max_tx_rate):
"""sets vf max rate.
@param vf_index: vf index
@param max_tx_rate: vf max tx rate
"""
try:
self._as_root([], "link", ("set", self.dev_name, "vf",
str(vf_index), "rate",
str(max_tx_rate)))
except Exception as e:
LOG.exception(_LE("Failed executing ip command"))
raise exc.IpCommandError(dev_name=self.dev_name,
reason=e)
def _get_vf_link_show(self, vf_list, link_show_out):
"""Get link show output for VFs

View File

@ -100,7 +100,12 @@ agent_opts = [
"timeout won't be changed"))
]
qos_opts = [
cfg.StrOpt('agent_driver', default='ovs', help=_('QoS agent driver.')),
]
cfg.CONF.register_opts(ovs_opts, "OVS")
cfg.CONF.register_opts(agent_opts, "AGENT")
cfg.CONF.register_opts(qos_opts, "qos")
config.register_agent_state_opts_helper(cfg.CONF)

View File

@ -0,0 +1,89 @@
# Copyright (c) 2015 Openstack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.common import ovs_lib
from neutron.i18n import _LE, _LW
from neutron.agent.l2.extensions import qos_agent
from neutron.plugins.ml2.drivers.openvswitch.mech_driver import (
mech_openvswitch)
LOG = logging.getLogger(__name__)
class QosOVSAgentDriver(qos_agent.QosAgentDriver):
_SUPPORTED_RULES = (
mech_openvswitch.OpenvswitchMechanismDriver.supported_qos_rule_types)
def __init__(self):
super(QosOVSAgentDriver, self).__init__()
# TODO(QoS) check if we can get this configuration
# as constructor arguments
self.br_int_name = cfg.CONF.OVS.integration_bridge
self.br_int = None
def initialize(self):
self.br_int = ovs_lib.OVSBridge(self.br_int_name)
def create(self, port, qos_policy):
self._handle_rules('create', port, qos_policy)
def update(self, port, qos_policy):
self._handle_rules('update', port, qos_policy)
def delete(self, port, qos_policy):
self._handle_rules('delete', port, qos_policy)
def _handle_rules(self, action, port, qos_policy):
for rule in qos_policy.rules:
if rule.rule_type in self._SUPPORTED_RULES:
handler_name = ("".join(("_", action, "_", rule.rule_type)))
try:
handler = getattr(self, handler_name)
handler(port, rule)
except AttributeError:
LOG.error(
_LE('Failed to locate a handler for %(rule_type) '
'rules; skipping.'), handler_name)
else:
LOG.warning(_LW('Unsupported QoS rule type for %(rule_id)s: '
'%(rule_type)s; skipping'),
{'rule_id': rule.id, 'rule_type': rule.rule_type})
def _create_bandwidth_limit(self, port, rule):
self._update_bandwidth_limit(port, rule)
def _update_bandwidth_limit(self, port, rule):
port_name = port['vif_port'].port_name
max_kbps = rule.max_kbps
max_burst_kbps = rule.max_burst_kbps
current_max_kbps, current_max_burst = (
self.br_int.get_qos_bw_limit_for_port(port_name))
if current_max_kbps is not None or current_max_burst is not None:
self.br_int.del_qos_bw_limit_for_port(port_name)
self.br_int.create_qos_bw_limit_for_port(port_name,
max_kbps,
max_burst_kbps)
def _delete_bandwidth_limit(self, port, rule):
port_name = port['vif_port'].port_name
current_max_kbps, current_max_burst = (
self.br_int.get_qos_bw_limit_for_port(port_name))
if current_max_kbps is not None or current_max_burst is not None:
self.br_int.del_qos_bw_limit_for_port(port_name)

View File

@ -30,6 +30,7 @@ from six import moves
from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2 import agent_extensions_manager
from neutron.agent.linux import ip_lib
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
@ -225,6 +226,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
self.setup_rpc()
self.init_agent_extensions_mgr()
self.bridge_mappings = bridge_mappings
self.setup_physical_bridges(self.bridge_mappings)
self.local_vlan_map = {}
@ -365,6 +367,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
consumers,
start_listening=False)
def init_agent_extensions_mgr(self):
agent_extensions_manager.register_opts(self.conf)
self.agent_extensions_mgr = (
agent_extensions_manager.AgentExtensionsManager(self.conf))
self.agent_extensions_mgr.initialize()
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
if vif_id in vlan_mapping.vif_ports:
@ -1246,6 +1254,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
if 'port_id' in details:
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': details})
details['vif_port'] = port
need_binding = self.treat_vif_port(port, details['port_id'],
details['network_id'],
details['network_type'],
@ -1259,8 +1268,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.setup_arp_spoofing_protection(self.int_br,
port, details)
if need_binding:
details['vif_port'] = port
need_binding_devices.append(details)
self.agent_extensions_mgr.handle_port(self.context, details)
else:
LOG.warn(_LW("Device %s not defined on plugin"), device)
if (port and port.ofport != -1):

View File

@ -20,6 +20,7 @@ from neutron.common import constants
from neutron.extensions import portbindings
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers import mech_agent
from neutron.services.qos import qos_consts
LOG = log.getLogger(__name__)
@ -34,6 +35,8 @@ class OpenvswitchMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
network.
"""
supported_qos_rule_types = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT]
def __init__(self):
sg_enabled = securitygroups_rpc.is_firewall_enabled()
vif_details = {portbindings.CAP_PORT_FILTER: sg_enabled,

View File

@ -0,0 +1,50 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.plugins.ml2 import driver_api as api
from neutron.services.qos import qos_extension
LOG = logging.getLogger(__name__)
class QosExtensionDriver(api.ExtensionDriver):
def initialize(self):
self.qos_ext_handler = qos_extension.QosResourceExtensionHandler()
LOG.debug("QosExtensionDriver initialization complete")
def process_create_network(self, context, data, result):
self.qos_ext_handler.process_resource(
context, qos_extension.NETWORK, data, result)
process_update_network = process_create_network
def process_create_port(self, context, data, result):
self.qos_ext_handler.process_resource(
context, qos_extension.PORT, data, result)
process_update_port = process_create_port
def extend_network_dict(self, session, db_data, result):
result.update(
self.qos_ext_handler.extract_resource_fields(qos_extension.NETWORK,
db_data))
def extend_port_dict(self, session, db_data, result):
result.update(
self.qos_ext_handler.extract_resource_fields(qos_extension.PORT,
db_data))

View File

@ -25,11 +25,12 @@ from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron.extensions import vlantransparent
from neutron.i18n import _LE, _LI
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import models
from neutron.services.qos import qos_consts
LOG = log.getLogger(__name__)
@ -312,6 +313,40 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
LOG.info(_LI("Registered mechanism drivers: %s"),
[driver.name for driver in self.ordered_mech_drivers])
@property
def supported_qos_rule_types(self):
if not self.ordered_mech_drivers:
return []
rule_types = set(qos_consts.VALID_RULE_TYPES)
# Recalculate on every call to allow drivers determine supported rule
# types dynamically
for driver in self.ordered_mech_drivers:
if hasattr(driver.obj, 'supported_qos_rule_types'):
new_rule_types = \
rule_types & set(driver.obj.supported_qos_rule_types)
dropped_rule_types = new_rule_types - rule_types
if dropped_rule_types:
LOG.info(
_LI("%(rule_types)s rule types disabled for ml2 "
"because %(driver)s does not support them"),
{'rule_types': ', '.join(dropped_rule_types),
'driver': driver.name})
rule_types = new_rule_types
else:
# at least one of drivers does not support QoS, meaning there
# are no rule types supported by all of them
LOG.warn(
_LW("%s does not support QoS; no rule types available"),
driver.name)
return []
rule_types = list(rule_types)
LOG.debug("Supported QoS rule types "
"(common subset for all mech drivers): %s", rule_types)
return rule_types
def initialize(self):
for driver in self.ordered_mech_drivers:
LOG.info(_LI("Initializing mechanism driver '%s'"), driver.name)
@ -753,9 +788,10 @@ class ExtensionManager(stevedore.named.NamedExtensionManager):
exts = []
for driver in self.ordered_ext_drivers:
alias = driver.obj.extension_alias
exts.append(alias)
LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
{'alias': alias, 'drv': driver.name})
if alias:
exts.append(alias)
LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
{'alias': alias, 'drv': driver.name})
return exts
def _call_on_ext_drivers(self, method_name, plugin_context, data, result):

View File

@ -31,6 +31,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.callbacks import events
@ -76,6 +77,7 @@ from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import rpc
from neutron.quota import resource_registry
from neutron.services.qos import qos_consts
LOG = log.getLogger(__name__)
@ -161,7 +163,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback(),
metadata_rpc.MetadataRpcCallback()
metadata_rpc.MetadataRpcCallback(),
resources_rpc.ResourcesServerRpcCallback()
]
def _setup_dhcp(self):
@ -171,6 +174,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
)
self.start_periodic_dhcp_agent_status_check()
@property
def supported_qos_rule_types(self):
return self.mechanism_manager.supported_qos_rule_types
@log_helpers.log_method_call
def start_rpc_listeners(self):
"""Start the RPC loop to let the plugin communicate with agents."""
@ -623,6 +630,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def create_network(self, context, network):
result, mech_context = self._create_network_with_retries(context,
network)
self._notify_registry(
resources.NETWORK, events.AFTER_CREATE, context, result)
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
@ -635,6 +644,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def create_network_bulk(self, context, networks):
objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
for obj in objects:
self._notify_registry(resources.NETWORK,
events.AFTER_CREATE,
context,
obj)
return [obj['result'] for obj in objects]
def update_network(self, context, id, network):
@ -657,6 +672,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
original_network=original_network)
self.mechanism_manager.update_network_precommit(mech_context)
# Notifications must be sent after the above transaction is complete
self._notify_registry(
resources.NETWORK, events.AFTER_UPDATE, context, updated_network)
# TODO(apech) - handle errors raised by update_network, potentially
# by re-calling update_network with the previous attributes. For
# now the error is propogated to the caller, which is expected to
@ -1119,6 +1138,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
original_port[psec.PORTSECURITY] !=
updated_port[psec.PORTSECURITY]):
need_port_update_notify = True
# TODO(QoS): Move out to the extension framework somehow.
# Follow https://review.openstack.org/#/c/169223 for a solution.
if (qos_consts.QOS_POLICY_ID in attrs and
original_port[qos_consts.QOS_POLICY_ID] !=
updated_port[qos_consts.QOS_POLICY_ID]):
need_port_update_notify = True
if addr_pair.ADDRESS_PAIRS in attrs:
need_port_update_notify |= (
@ -1519,3 +1544,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if port:
return port.id
return device
def _notify_registry(self, resource_type, event_type, context, resource):
kwargs = {
'context': context,
resource_type: resource,
}
registry.notify(resource_type, event_type, self, **kwargs)

View File

@ -32,6 +32,7 @@ from neutron.i18n import _LE, _LW
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.services.qos import qos_consts
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
# mixins and eventually remove the direct dependencies on type_tunnel.
@ -108,6 +109,9 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
host,
port_context.network.current)
qos_policy_id = (port.get(qos_consts.QOS_POLICY_ID) or
port_context.network._network.get(
qos_consts.QOS_POLICY_ID))
entry = {'device': device,
'network_id': port['network_id'],
'port_id': port['id'],
@ -120,6 +124,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
'device_owner': port['device_owner'],
'allowed_address_pairs': port['allowed_address_pairs'],
'port_security_enabled': port.get(psec.PORTSECURITY, True),
'qos_policy_id': qos_policy_id,
'profile': port[portbindings.PROFILE]}
LOG.debug("Returning: %s", entry)
return entry

View File

View File

@ -0,0 +1,74 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from neutron.i18n import _LI
from neutron import manager
QOS_DRIVER_NAMESPACE = 'neutron.qos.notification_drivers'
QOS_PLUGIN_OPTS = [
cfg.ListOpt('notification_drivers',
default='message_queue',
help=_('Drivers list to use to send the update notification')),
]
cfg.CONF.register_opts(QOS_PLUGIN_OPTS, "qos")
LOG = logging.getLogger(__name__)
class QosServiceNotificationDriverManager(object):
def __init__(self):
self.notification_drivers = []
self._load_drivers(cfg.CONF.qos.notification_drivers)
def update_policy(self, qos_policy):
for driver in self.notification_drivers:
driver.update_policy(qos_policy)
def delete_policy(self, qos_policy):
for driver in self.notification_drivers:
driver.delete_policy(qos_policy)
def create_policy(self, qos_policy):
for driver in self.notification_drivers:
driver.create_policy(qos_policy)
def _load_drivers(self, notification_drivers):
"""Load all the instances of the configured QoS notification drivers
:param notification_drivers: comma separated string
"""
if not notification_drivers:
raise SystemExit(_('A QoS driver must be specified'))
LOG.debug("Loading QoS notification drivers: %s", notification_drivers)
for notification_driver in notification_drivers:
driver_ins = self._load_driver_instance(notification_driver)
self.notification_drivers.append(driver_ins)
def _load_driver_instance(self, notification_driver):
"""Returns an instance of the configured QoS notification driver
:returns: An instance of Driver for the QoS notification
"""
mgr = manager.NeutronManager
driver = mgr.load_class_for_provider(QOS_DRIVER_NAMESPACE,
notification_driver)
driver_instance = driver()
LOG.info(
_LI("Loading %(name)s (%(description)s) notification driver "
"for QoS plugin"),
{"name": notification_driver,
"description": driver_instance.get_description()})
return driver_instance

View File

@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
from neutron.i18n import _LW
from neutron.objects.qos import policy as policy_object
from neutron.services.qos.notification_drivers import qos_base
LOG = logging.getLogger(__name__)
def _get_qos_policy_cb(resource, policy_id, **kwargs):
context = kwargs.get('context')
if context is None:
LOG.warning(_LW(
'Received %(resource)s %(policy_id)s without context'),
{'resource': resource, 'policy_id': policy_id}
)
return
policy = policy_object.QosPolicy.get_by_id(context, policy_id)
return policy
class RpcQosServiceNotificationDriver(
qos_base.QosServiceNotificationDriverBase):
"""RPC message queue service notification driver for QoS."""
def __init__(self):
rpc_registry.register_provider(
_get_qos_policy_cb,
resources.QOS_POLICY)
def get_description(self):
return "Message queue updates"
def create_policy(self, policy):
#No need to update agents on create
pass
def update_policy(self, policy):
# TODO(QoS): this is temporary until we get notify() implemented
try:
rpc_registry.notify(resources.QOS_POLICY,
events.UPDATED,
policy)
except NotImplementedError:
pass
def delete_policy(self, policy):
# TODO(QoS): this is temporary until we get notify() implemented
try:
rpc_registry.notify(resources.QOS_POLICY,
events.DELETED,
policy)
except NotImplementedError:
pass

View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class QosServiceNotificationDriverBase(object):
"""QoS service notification driver base class."""
@abc.abstractmethod
def get_description(self):
"""Get the notification driver description.
"""
@abc.abstractmethod
def create_policy(self, policy):
"""Create the QoS policy."""
@abc.abstractmethod
def update_policy(self, policy):
"""Update the QoS policy.
Apply changes to the QoS policy.
"""
@abc.abstractmethod
def delete_policy(self, policy):
"""Delete the QoS policy.
Remove all rules for this policy and free up all the resources.
"""

View File

@ -0,0 +1,19 @@
# Copyright (c) 2015 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth_limit'
VALID_RULE_TYPES = [RULE_TYPE_BANDWIDTH_LIMIT]
QOS_POLICY_ID = 'qos_policy_id'

View File

@ -0,0 +1,87 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import api as db_api
from neutron import manager
from neutron.objects.qos import policy as policy_object
from neutron.plugins.common import constants as plugin_constants
from neutron.services.qos import qos_consts
NETWORK = 'network'
PORT = 'port'
# TODO(QoS): Add interface to define how this should look like
class QosResourceExtensionHandler(object):
@property
def plugin_loaded(self):
if not hasattr(self, '_plugin_loaded'):
service_plugins = manager.NeutronManager.get_service_plugins()
self._plugin_loaded = plugin_constants.QOS in service_plugins
return self._plugin_loaded
def _get_policy_obj(self, context, policy_id):
return policy_object.QosPolicy.get_by_id(context, policy_id)
def _update_port_policy(self, context, port, port_changes):
old_policy = policy_object.QosPolicy.get_port_policy(
context, port['id'])
if old_policy:
old_policy.detach_port(port['id'])
qos_policy_id = port_changes.get(qos_consts.QOS_POLICY_ID)
if qos_policy_id is not None:
policy = self._get_policy_obj(context, qos_policy_id)
#TODO(QoS): If the policy doesn't exist (or if it is not shared and
# the tenant id doesn't match the context's), this will
# raise an exception (policy is None).
policy.attach_port(port['id'])
port[qos_consts.QOS_POLICY_ID] = qos_policy_id
def _update_network_policy(self, context, network, network_changes):
old_policy = policy_object.QosPolicy.get_network_policy(
context, network['id'])
if old_policy:
old_policy.detach_network(network['id'])
qos_policy_id = network_changes.get(qos_consts.QOS_POLICY_ID)
if qos_policy_id:
policy = self._get_policy_obj(context, qos_policy_id)
#TODO(QoS): If the policy doesn't exist (or if it is not shared and
# the tenant id doesn't match the context's), this will
# raise an exception (policy is None).
policy.attach_network(network['id'])
network[qos_consts.QOS_POLICY_ID] = qos_policy_id
def _exec(self, method_name, context, kwargs):
with db_api.autonested_transaction(context.session):
return getattr(self, method_name)(context=context, **kwargs)
def process_resource(self, context, resource_type, requested_resource,
actual_resource):
if (qos_consts.QOS_POLICY_ID in requested_resource and
self.plugin_loaded):
self._exec('_update_%s_policy' % resource_type, context,
{resource_type: actual_resource,
"%s_changes" % resource_type: requested_resource})
def extract_resource_fields(self, resource_type, resource):
if not self.plugin_loaded:
return {}
binding = resource['qos_policy_binding']
qos_policy_id = binding['policy_id'] if binding else None
return {qos_consts.QOS_POLICY_ID: qos_policy_id}

View File

@ -0,0 +1,148 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.common import exceptions as n_exc
from neutron.db import db_base_plugin_common
from neutron.extensions import qos
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.objects.qos import rule_type as rule_type_object
from neutron.services.qos.notification_drivers import manager as driver_mgr
LOG = logging.getLogger(__name__)
class QoSPlugin(qos.QoSPluginBase):
"""Implementation of the Neutron QoS Service Plugin.
This class implements a Quality of Service plugin that
provides quality of service parameters over ports and
networks.
"""
supported_extension_aliases = ['qos']
def __init__(self):
super(QoSPlugin, self).__init__()
self.notification_driver_manager = (
driver_mgr.QosServiceNotificationDriverManager())
@db_base_plugin_common.convert_result_to_dict
def create_policy(self, context, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.create()
self.notification_driver_manager.create_policy(policy)
return policy
@db_base_plugin_common.convert_result_to_dict
def update_policy(self, context, policy_id, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.id = policy_id
policy.update()
self.notification_driver_manager.update_policy(policy)
return policy
def delete_policy(self, context, policy_id):
policy = policy_object.QosPolicy(context)
policy.id = policy_id
self.notification_driver_manager.delete_policy(policy)
policy.delete()
def _get_policy_obj(self, context, policy_id):
obj = policy_object.QosPolicy.get_by_id(context, policy_id)
if obj is None:
raise n_exc.QosPolicyNotFound(policy_id=policy_id)
return obj
@db_base_plugin_common.filter_fields
@db_base_plugin_common.convert_result_to_dict
def get_policy(self, context, policy_id, fields=None):
return self._get_policy_obj(context, policy_id)
@db_base_plugin_common.filter_fields
@db_base_plugin_common.convert_result_to_dict
def get_policies(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
#TODO(QoS): Support all the optional parameters
return policy_object.QosPolicy.get_objects(context)
#TODO(QoS): Consider adding a proxy catch-all for rules, so
# we capture the API function call, and just pass
# the rule type as a parameter removing lots of
# future code duplication when we have more rules.
@db_base_plugin_common.convert_result_to_dict
def create_policy_bandwidth_limit_rule(self, context, policy_id,
bandwidth_limit_rule):
# validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(
context, qos_policy_id=policy_id,
**bandwidth_limit_rule['bandwidth_limit_rule'])
rule.create()
self.notification_driver_manager.update_policy(policy)
return rule
@db_base_plugin_common.convert_result_to_dict
def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
bandwidth_limit_rule):
# validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
rule.update()
self.notification_driver_manager.update_policy(policy)
return rule
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
# validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(context)
rule.id = rule_id
rule.delete()
self.notification_driver_manager.update_policy(policy)
@db_base_plugin_common.filter_fields
@db_base_plugin_common.convert_result_to_dict
def get_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, fields=None):
# validate that we have access to the policy
self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule.get_by_id(context, rule_id)
if not rule:
raise n_exc.QosRuleNotFound(policy_id=policy_id, rule_id=rule_id)
return rule
@db_base_plugin_common.filter_fields
@db_base_plugin_common.convert_result_to_dict
def get_policy_bandwidth_limit_rules(self, context, policy_id,
filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
#TODO(QoS): Support all the optional parameters
# validate that we have access to the policy
self._get_policy_obj(context, policy_id)
return rule_object.QosBandwidthLimitRule.get_objects(context)
@db_base_plugin_common.filter_fields
@db_base_plugin_common.convert_result_to_dict
def get_rule_types(self, context, filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
return rule_type_object.QosRuleType.get_objects()

View File

@ -85,6 +85,8 @@ class BaseNetworkTest(neutron.tests.tempest.test.BaseTestCase):
cls.metering_label_rules = []
cls.fw_rules = []
cls.fw_policies = []
cls.qos_rules = []
cls.qos_policies = []
cls.ethertype = "IPv" + str(cls._ip_version)
cls.address_scopes = []
cls.admin_address_scopes = []
@ -100,6 +102,14 @@ class BaseNetworkTest(neutron.tests.tempest.test.BaseTestCase):
for fw_rule in cls.fw_rules:
cls._try_delete_resource(cls.client.delete_firewall_rule,
fw_rule['id'])
# Clean up QoS policies
for qos_policy in cls.qos_policies:
cls._try_delete_resource(cls.admin_client.delete_qos_policy,
qos_policy['id'])
# Clean up QoS rules
for qos_rule in cls.qos_rules:
cls._try_delete_resource(cls.admin_client.delete_qos_rule,
qos_rule['id'])
# Clean up floating IPs
for floating_ip in cls.floating_ips:
cls._try_delete_resource(cls.client.delete_floatingip,
@ -206,9 +216,9 @@ class BaseNetworkTest(neutron.tests.tempest.test.BaseTestCase):
return network
@classmethod
def create_shared_network(cls, network_name=None):
def create_shared_network(cls, network_name=None, **post_body):
network_name = network_name or data_utils.rand_name('sharednetwork-')
post_body = {'name': network_name, 'shared': True}
post_body.update({'name': network_name, 'shared': True})
body = cls.admin_client.create_network(**post_body)
network = body['network']
cls.shared_networks.append(network)
@ -398,6 +408,25 @@ class BaseNetworkTest(neutron.tests.tempest.test.BaseTestCase):
cls.fw_policies.append(fw_policy)
return fw_policy
@classmethod
def create_qos_policy(cls, name, description, shared, tenant_id=None):
"""Wrapper utility that returns a test QoS policy."""
body = cls.admin_client.create_qos_policy(
name, description, shared, tenant_id)
qos_policy = body['policy']
cls.qos_policies.append(qos_policy)
return qos_policy
@classmethod
def create_qos_bandwidth_limit_rule(cls, policy_id,
max_kbps, max_burst_kbps):
"""Wrapper utility that returns a test QoS bandwidth limit rule."""
body = cls.admin_client.create_bandwidth_limit_rule(
policy_id, max_kbps, max_burst_kbps)
qos_rule = body['bandwidth_limit_rule']
cls.qos_rules.append(qos_rule)
return qos_rule
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])

View File

@ -0,0 +1,364 @@
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import exceptions
from neutron.services.qos import qos_consts
from neutron.tests.api import base
from neutron.tests.tempest import config
from neutron.tests.tempest import test
CONF = config.CONF
class QosTestJSON(base.BaseAdminNetworkTest):
@classmethod
def resource_setup(cls):
super(QosTestJSON, cls).resource_setup()
if not test.is_extension_enabled('qos', 'network'):
msg = "qos extension not enabled."
raise cls.skipException(msg)
@test.attr(type='smoke')
@test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
def test_create_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy desc',
shared=False)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test-policy', retrieved_policy['name'])
self.assertEqual('test policy desc', retrieved_policy['description'])
self.assertFalse(retrieved_policy['shared'])
# Test 'list policies'
policies = self.admin_client.list_qos_policies()['policies']
policies_ids = [p['id'] for p in policies]
self.assertIn(policy['id'], policies_ids)
@test.attr(type='smoke')
@test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
def test_policy_update(self):
policy = self.create_qos_policy(name='test-policy',
description='',
shared=False)
self.admin_client.update_qos_policy(policy['id'],
description='test policy desc',
shared=True)
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test policy desc', retrieved_policy['description'])
self.assertTrue(retrieved_policy['shared'])
self.assertEqual([], retrieved_policy['rules'])
@test.attr(type='smoke')
@test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
def test_delete_policy(self):
policy = self.admin_client.create_qos_policy(
'test-policy', 'desc', True)['policy']
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test-policy', retrieved_policy['name'])
self.admin_client.delete_qos_policy(policy['id'])
self.assertRaises(exceptions.NotFound,
self.admin_client.show_qos_policy, policy['id'])
@test.attr(type='smoke')
@test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
def test_list_rule_types(self):
# List supported rule types
expected_rule_types = qos_consts.VALID_RULE_TYPES
expected_rule_details = ['type']
rule_types = self.admin_client.list_qos_rule_types()
actual_list_rule_types = rule_types['rule_types']
actual_rule_types = [rule['type'] for rule in actual_list_rule_types]
# Verify that only required fields present in rule details
for rule in actual_list_rule_types:
self.assertEqual(tuple(rule.keys()), tuple(expected_rule_details))
# Verify if expected rules are present in the actual rules list
for rule in expected_rule_types:
self.assertIn(rule, actual_rule_types)
def _disassociate_network(self, client, network_id):
client.update_network(network_id, qos_policy_id=None)
updated_network = self.admin_client.show_network(network_id)
self.assertIsNone(updated_network['network']['qos_policy_id'])
@test.attr(type='smoke')
@test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
def test_policy_association_with_admin_network(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
network = self.create_shared_network('test network',
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
self._disassociate_network(self.admin_client, network['id'])
@test.attr(type='smoke')
@test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
def test_policy_association_with_tenant_network(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=True)
network = self.create_network('test network',
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
self._disassociate_network(self.client, network['id'])
@test.attr(type='smoke')
@test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
def test_policy_association_with_network_non_shared_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
#TODO(QoS): This currently raises an exception on the server side. See
# services/qos/qos_extension.py for comments on this subject.
network = self.create_network('test network',
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertIsNone(retrieved_network['network']['qos_policy_id'])
@test.attr(type='smoke')
@test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
def test_policy_update_association_with_admin_network(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
network = self.create_shared_network('test network')
retrieved_network = self.admin_client.show_network(network['id'])
self.assertIsNone(retrieved_network['network']['qos_policy_id'])
self.admin_client.update_network(network['id'],
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
self._disassociate_network(self.admin_client, network['id'])
def _disassociate_port(self, port_id):
self.client.update_port(port_id, qos_policy_id=None)
updated_port = self.admin_client.show_port(port_id)
self.assertIsNone(updated_port['port']['qos_policy_id'])
@test.attr(type='smoke')
@test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
def test_policy_association_with_port_shared_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=True)
network = self.create_shared_network('test network')
port = self.create_port(network, qos_policy_id=policy['id'])
retrieved_port = self.admin_client.show_port(port['id'])
self.assertEqual(
policy['id'], retrieved_port['port']['qos_policy_id'])
self._disassociate_port(port['id'])
@test.attr(type='smoke')
@test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
def test_policy_association_with_port_non_shared_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
network = self.create_shared_network('test network')
#TODO(QoS): This currently raises an exception on the server side. See
# services/qos/qos_extension.py for comments on this subject.
port = self.create_port(network, qos_policy_id=policy['id'])
retrieved_port = self.admin_client.show_port(port['id'])
self.assertIsNone(retrieved_port['port']['qos_policy_id'])
@test.attr(type='smoke')
@test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
def test_policy_update_association_with_port_shared_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=True)
network = self.create_shared_network('test network')
port = self.create_port(network)
retrieved_port = self.admin_client.show_port(port['id'])
self.assertIsNone(retrieved_port['port']['qos_policy_id'])
self.client.update_port(port['id'], qos_policy_id=policy['id'])
retrieved_port = self.admin_client.show_port(port['id'])
self.assertEqual(
policy['id'], retrieved_port['port']['qos_policy_id'])
self._disassociate_port(port['id'])
@test.attr(type='smoke')
@test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
def test_delete_not_allowed_if_policy_in_use_by_network(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=True)
network = self.create_shared_network(
'test network', qos_policy_id=policy['id'])
self.assertRaises(
exceptions.Conflict,
self.admin_client.delete_qos_policy, policy['id'])
self._disassociate_network(self.admin_client, network['id'])
self.admin_client.delete_qos_policy(policy['id'])
@test.attr(type='smoke')
@test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
def test_delete_not_allowed_if_policy_in_use_by_port(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=True)
network = self.create_shared_network('test network')
port = self.create_port(network, qos_policy_id=policy['id'])
self.assertRaises(
exceptions.Conflict,
self.admin_client.delete_qos_policy, policy['id'])
self._disassociate_port(port['id'])
self.admin_client.delete_qos_policy(policy['id'])
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
@classmethod
def resource_setup(cls):
super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()
if not test.is_extension_enabled('qos', 'network'):
msg = "qos extension not enabled."
raise cls.skipException(msg)
@test.attr(type='smoke')
@test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
def test_rule_create(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
max_kbps=200,
max_burst_kbps=1337)
# Test 'show rule'
retrieved_rule = self.admin_client.show_bandwidth_limit_rule(
policy['id'], rule['id'])
retrieved_rule = retrieved_rule['bandwidth_limit_rule']
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(200, retrieved_rule['max_kbps'])
self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
# Test 'list rules'
rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])
rules = rules['bandwidth_limit_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
policy_rules[0]['type'])
@test.attr(type='smoke')
@test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
def test_rule_create_fail_for_the_same_type(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
max_kbps=200,
max_burst_kbps=1337)
self.assertRaises(exceptions.ServerFault,
self.create_qos_bandwidth_limit_rule,
policy_id=policy['id'],
max_kbps=201, max_burst_kbps=1338)
@test.attr(type='smoke')
@test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
def test_rule_update(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
max_kbps=1,
max_burst_kbps=1)
self.admin_client.update_bandwidth_limit_rule(policy['id'],
rule['id'],
max_kbps=200,
max_burst_kbps=1337)
retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy['bandwidth_limit_rule']
self.assertEqual(200, retrieved_policy['max_kbps'])
self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
@test.attr(type='smoke')
@test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
def test_rule_delete(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
rule = self.admin_client.create_bandwidth_limit_rule(
policy['id'], 200, 1337)['bandwidth_limit_rule']
retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy['bandwidth_limit_rule']
self.assertEqual(rule['id'], retrieved_policy['id'])
self.admin_client.delete_bandwidth_limit_rule(policy['id'], rule['id'])
self.assertRaises(exceptions.NotFound,
self.admin_client.show_bandwidth_limit_rule,
policy['id'], rule['id'])
@test.attr(type='smoke')
@test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
def test_rule_create_rule_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
self.create_qos_bandwidth_limit_rule,
'policy', 200, 1337)
@test.attr(type='smoke')
@test.idempotent_id('3ba4abf9-7976-4eaf-a5d0-a934a6e09b2d')
def test_rule_association_nonshared_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False,
tenant_id='tenant-id')
self.assertRaises(
exceptions.NotFound,
self.client.create_bandwidth_limit_rule,
policy['id'], 200, 1337)

View File

@ -39,12 +39,14 @@
"get_network:provider:physical_network": "rule:admin_only",
"get_network:provider:segmentation_id": "rule:admin_only",
"get_network:queue_id": "rule:admin_only",
"get_network:qos_policy_id": "rule:admin_only",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
"create_network:segments": "rule:admin_only",
"create_network:provider:network_type": "rule:admin_only",
"create_network:provider:physical_network": "rule:admin_only",
"create_network:provider:segmentation_id": "rule:admin_only",
"create_network:qos_policy_id": "rule:admin_only",
"update_network": "rule:admin_or_owner",
"update_network:segments": "rule:admin_only",
"update_network:shared": "rule:admin_only",
@ -52,6 +54,7 @@
"update_network:provider:physical_network": "rule:admin_only",
"update_network:provider:segmentation_id": "rule:admin_only",
"update_network:router:external": "rule:admin_only",
"update_network:qos_policy_id": "rule:admin_only",
"delete_network": "rule:admin_or_owner",
"create_port": "",
@ -62,12 +65,14 @@
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
"create_port:qos_policy_id": "rule:admin_only",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"get_port:qos_policy_id": "rule:admin_only",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
@ -76,6 +81,7 @@
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
"update_port:qos_policy_id": "rule:admin_only",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",

View File

@ -310,6 +310,17 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
controller,
'connection_mode'))
def test_qos_bw_limit(self):
port_name, _ = self.create_ovs_port()
self.br.create_qos_bw_limit_for_port(port_name, 700, 70)
max_rate, burst = self.br.get_qos_bw_limit_for_port(port_name)
self.assertEqual(700, max_rate)
self.assertEqual(70, burst)
self.br.del_qos_bw_limit_for_port(port_name)
max_rate, burst = self.br.get_qos_bw_limit_for_port(port_name)
self.assertIsNone(max_rate)
self.assertIsNone(burst)
class OVSLibTestCase(base.BaseOVSLinuxTestCase):

View File

@ -65,7 +65,10 @@ class NetworkClientJSON(service_client.ServiceClient):
'metering_label_rules': 'metering',
'firewall_rules': 'fw',
'firewall_policies': 'fw',
'firewalls': 'fw'
'firewalls': 'fw',
'policies': 'qos',
'bandwidth_limit_rules': 'qos',
'rule_types': 'qos',
}
service_prefix = service_resource_prefix_map.get(
plural_name)
@ -90,7 +93,8 @@ class NetworkClientJSON(service_client.ServiceClient):
'ikepolicy': 'ikepolicies',
'ipsec_site_connection': 'ipsec-site-connections',
'quotas': 'quotas',
'firewall_policy': 'firewall_policies'
'firewall_policy': 'firewall_policies',
'qos_policy': 'policies'
}
return resource_plural_map.get(resource_name, resource_name + 's')
@ -620,3 +624,84 @@ class NetworkClientJSON(service_client.ServiceClient):
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_qos_policies(self):
uri = '%s/qos/policies' % self.uri_prefix
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def create_qos_policy(self, name, description, shared, tenant_id=None):
uri = '%s/qos/policies' % self.uri_prefix
post_data = {'policy': {
'name': name,
'description': description,
'shared': shared
}}
if tenant_id is not None:
post_data['policy']['tenant_id'] = tenant_id
resp, body = self.post(uri, self.serialize(post_data))
body = self.deserialize_single(body)
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def update_qos_policy(self, policy_id, **kwargs):
uri = '%s/qos/policies/%s' % (self.uri_prefix, policy_id)
post_data = self.serialize({'policy': kwargs})
resp, body = self.put(uri, post_data)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def create_bandwidth_limit_rule(self, policy_id, max_kbps, max_burst_kbps):
uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
self.uri_prefix, policy_id)
post_data = self.serialize(
{'bandwidth_limit_rule': {
'max_kbps': max_kbps,
'max_burst_kbps': max_burst_kbps}
})
resp, body = self.post(uri, post_data)
self.expected_success(201, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_bandwidth_limit_rules(self, policy_id):
uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
self.uri_prefix, policy_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def show_bandwidth_limit_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def update_bandwidth_limit_rule(self, policy_id, rule_id, **kwargs):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
post_data = {'bandwidth_limit_rule': kwargs}
resp, body = self.put(uri, json.dumps(post_data))
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def delete_bandwidth_limit_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_qos_rule_types(self):
uri = '%s/qos/rule-types' % self.uri_prefix
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)

View File

View File

@ -0,0 +1,91 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from neutron.agent.l2.extensions import qos_agent
from neutron.api.rpc.callbacks import resources
from neutron import context
from neutron.tests import base
# This is a minimalistic mock of rules to be passed/checked around
# which should be exteneded as needed to make real rules
TEST_GET_INFO_RULES = ['rule1', 'rule2']
class QosAgentExtensionTestCase(base.BaseTestCase):
def setUp(self):
super(QosAgentExtensionTestCase, self).setUp()
self.qos_agent = qos_agent.QosAgentExtension()
self.context = context.get_admin_context()
# Don't rely on used driver
mock.patch(
'neutron.manager.NeutronManager.load_class_for_provider',
return_value=lambda: mock.Mock(spec=qos_agent.QosAgentDriver)
).start()
self.qos_agent.initialize()
self._create_fake_resource_rpc()
def _create_fake_resource_rpc(self):
self.get_info_mock = mock.Mock(return_value=TEST_GET_INFO_RULES)
self.qos_agent.resource_rpc.get_info = self.get_info_mock
def _create_test_port_dict(self):
return {'port_id': uuidutils.generate_uuid(),
'qos_policy_id': uuidutils.generate_uuid()}
def test_handle_port_with_no_policy(self):
port = self._create_test_port_dict()
del port['qos_policy_id']
self.qos_agent._process_rules_updates = mock.Mock()
self.qos_agent.handle_port(self.context, port)
self.assertFalse(self.qos_agent._process_rules_updates.called)
def test_handle_unknown_port(self):
port = self._create_test_port_dict()
qos_policy_id = port['qos_policy_id']
port_id = port['port_id']
self.qos_agent.handle_port(self.context, port)
# we make sure the underlaying qos driver is called with the
# right parameters
self.qos_agent.qos_driver.create.assert_called_once_with(
port, TEST_GET_INFO_RULES)
self.assertEqual(port,
self.qos_agent.qos_policy_ports[qos_policy_id][port_id])
self.assertTrue(port_id in self.qos_agent.known_ports)
def test_handle_known_port(self):
port_obj1 = self._create_test_port_dict()
port_obj2 = dict(port_obj1)
self.qos_agent.handle_port(self.context, port_obj1)
self.qos_agent.qos_driver.reset_mock()
self.qos_agent.handle_port(self.context, port_obj2)
self.assertFalse(self.qos_agent.qos_driver.create.called)
def test_handle_known_port_change_policy_id(self):
port = self._create_test_port_dict()
self.qos_agent.handle_port(self.context, port)
self.qos_agent.resource_rpc.get_info.reset_mock()
port['qos_policy_id'] = uuidutils.generate_uuid()
self.qos_agent.handle_port(self.context, port)
self.get_info_mock.assert_called_once_with(
self.context, resources.QOS_POLICY,
port['qos_policy_id'])
#TODO(QoS): handle qos_driver.update call check when
# we do that

View File

@ -0,0 +1,44 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.agent.l2 import agent_extensions_manager
from neutron.tests import base
class TestAgentExtensionsManager(base.BaseTestCase):
def setUp(self):
super(TestAgentExtensionsManager, self).setUp()
mock.patch('neutron.agent.l2.extensions.qos_agent.QosAgentExtension',
autospec=True).start()
conf = cfg.CONF
agent_extensions_manager.register_opts(conf)
cfg.CONF.set_override('extensions', ['qos'], 'agent')
self.manager = agent_extensions_manager.AgentExtensionsManager(conf)
def _get_extension(self):
return self.manager.extensions[0].obj
def test_initialize(self):
self.manager.initialize()
ext = self._get_extension()
self.assertTrue(ext.initialize.called)
def test_handle_port(self):
context = object()
data = object()
self.manager.handle_port(context, data)
ext = self._get_extension()
ext.handle_port.assert_called_once_with(context, data)

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.api.rpc.callbacks import registry
from neutron.api.rpc.callbacks import resource_manager
from neutron.api.rpc.callbacks import resources
from neutron.objects.qos import policy
from neutron.tests import base
class GetInfoTestCase(base.BaseTestCase):
def setUp(self):
super(GetInfoTestCase, self).setUp()
mgr = resource_manager.ResourcesCallbacksManager()
mgr_p = mock.patch.object(
registry, '_get_resources_callback_manager', return_value=mgr)
mgr_p.start()
def test_returns_callback_result(self):
policy_obj = policy.QosPolicy(context=None)
def _fake_policy_cb(*args, **kwargs):
return policy_obj
registry.register_provider(_fake_policy_cb, resources.QOS_POLICY)
self.assertEqual(policy_obj,
registry.get_info(resources.QOS_POLICY, 'fake_id'))
def test_does_not_raise_on_none(self):
def _wrong_type_cb(*args, **kwargs):
pass
registry.register_provider(_wrong_type_cb, resources.QOS_POLICY)
obj = registry.get_info(resources.QOS_POLICY, 'fake_id')
self.assertIsNone(obj)
def test_raises_on_wrong_object_type(self):
def _wrong_type_cb(*args, **kwargs):
return object()
registry.register_provider(_wrong_type_cb, resources.QOS_POLICY)
self.assertRaises(
registry.CallbackReturnedWrongObjectType,
registry.get_info, resources.QOS_POLICY, 'fake_id')
def test_raises_on_callback_not_found(self):
self.assertRaises(
registry.CallbackNotFound,
registry.get_info, resources.QOS_POLICY, 'fake_id')

View File

@ -0,0 +1,61 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests import base
class ResourcesCallbackRequestTestCase(base.BaseTestCase):
def setUp(self):
super(ResourcesCallbackRequestTestCase, self).setUp()
self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4'
self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793'
def test_resource_callback_request(self):
def _get_qos_policy_cb(resource, policy_id, **kwargs):
context = kwargs.get('context')
qos_policy = policy.QosPolicy(context,
tenant_id="8d4c70a21fed4aeba121a1a429ba0d04",
id="46ebaec0-0570-43ac-82f6-60d2b03168c4",
name="10Mbit",
description="This policy limits the ports to 10Mbit max.",
shared=False,
rules=[
rule.QosBandwidthLimitRule(context,
id="5f126d84-551a-4dcf-bb01-0e9c0df0c793",
max_kbps=10000,
max_burst_kbps=0)
]
)
qos_policy.obj_reset_changes()
return qos_policy
rpc_registry.register_provider(
_get_qos_policy_cb,
resources.QOS_POLICY)
self.ctx = None
kwargs = {'context': self.ctx}
qos_policy = rpc_registry.get_info(
resources.QOS_POLICY,
self.resource_id,
**kwargs)
self.assertEqual(self.resource_id, qos_policy['id'])

View File

@ -0,0 +1,54 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import resources
from neutron.objects.qos import policy
from neutron.tests import base
class GetResourceTypeTestCase(base.BaseTestCase):
def test_get_resource_type_none(self):
self.assertIsNone(resources.get_resource_type(None))
def test_get_resource_type_wrong_type(self):
self.assertIsNone(resources.get_resource_type(object()))
def test_get_resource_type(self):
# we could use any other registered NeutronObject type here
self.assertEqual(policy.QosPolicy.obj_name(),
resources.get_resource_type(policy.QosPolicy()))
class IsValidResourceTypeTestCase(base.BaseTestCase):
def test_known_type(self):
# it could be any other NeutronObject, assuming it's known to RPC
# callbacks
self.assertTrue(resources.is_valid_resource_type(
policy.QosPolicy.obj_name()))
def test_unknown_type(self):
self.assertFalse(
resources.is_valid_resource_type('unknown-resource-type'))
class GetResourceClsTestCase(base.BaseTestCase):
def test_known_type(self):
# it could be any other NeutronObject, assuming it's known to RPC
# callbacks
self.assertEqual(policy.QosPolicy,
resources.get_resource_cls(resources.QOS_POLICY))
def test_unknown_type(self):
self.assertIsNone(resources.get_resource_cls('unknown-resource-type'))

View File

@ -0,0 +1,127 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_utils import uuidutils
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron import context
from neutron.objects.qos import policy
from neutron.tests import base
class ResourcesRpcBaseTestCase(base.BaseTestCase):
def setUp(self):
super(ResourcesRpcBaseTestCase, self).setUp()
self.context = context.get_admin_context()
def _create_test_policy_dict(self):
return {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid(),
'name': 'test',
'description': 'test',
'shared': False}
def _create_test_policy(self, policy_dict):
policy_obj = policy.QosPolicy(self.context, **policy_dict)
policy_obj.obj_reset_changes()
return policy_obj
class ResourcesServerRpcApiTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesServerRpcApiTestCase, self).setUp()
self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
self.client = self.client_p.start()
self.rpc = resources_rpc.ResourcesServerRpcApi()
self.mock_cctxt = self.rpc.client.prepare.return_value
def test_get_info(self):
policy_dict = self._create_test_policy_dict()
expected_policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
self.mock_cctxt.call.return_value = (
expected_policy_obj.obj_to_primitive())
get_info_result = self.rpc.get_info(
self.context, resources.QOS_POLICY, qos_policy_id)
self.mock_cctxt.call.assert_called_once_with(
self.context, 'get_info', resource_type=resources.QOS_POLICY,
version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
self.assertEqual(expected_policy_obj, get_info_result)
def test_get_info_invalid_resource_type_cls(self):
self.assertRaises(
resources_rpc.InvalidResourceTypeClass, self.rpc.get_info,
self.context, 'foo_type', 'foo_id')
def test_get_info_resource_not_found(self):
policy_dict = self._create_test_policy_dict()
qos_policy_id = policy_dict['id']
self.mock_cctxt.call.return_value = None
self.assertRaises(
resources_rpc.ResourceNotFound, self.rpc.get_info, self.context,
resources.QOS_POLICY, qos_policy_id)
class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesServerRpcCallbackTestCase, self).setUp()
self.callbacks = resources_rpc.ResourcesServerRpcCallback()
def test_get_info(self):
policy_dict = self._create_test_policy_dict()
policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
with mock.patch.object(resources_rpc.registry, 'get_info',
return_value=policy_obj) as registry_mock:
primitive = self.callbacks.get_info(
self.context, resource_type=resources.QOS_POLICY,
version=policy.QosPolicy.VERSION,
resource_id=qos_policy_id)
registry_mock.assert_called_once_with(
resources.QOS_POLICY,
qos_policy_id, context=self.context)
self.assertEqual(policy_dict, primitive['versioned_object.data'])
self.assertEqual(policy_obj.obj_to_primitive(), primitive)
@mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
def test_get_info_no_backport_for_latest_version(self, to_prim_mock):
policy_dict = self._create_test_policy_dict()
policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
with mock.patch.object(resources_rpc.registry, 'get_info',
return_value=policy_obj):
self.callbacks.get_info(
self.context, resource_type=resources.QOS_POLICY,
version=policy.QosPolicy.VERSION,
resource_id=qos_policy_id)
to_prim_mock.assert_called_with(target_version=None)
@mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
def test_get_info_backports_to_older_version(self, to_prim_mock):
policy_dict = self._create_test_policy_dict()
policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
with mock.patch.object(resources_rpc.registry, 'get_info',
return_value=policy_obj):
self.callbacks.get_info(
self.context, resource_type=resources.QOS_POLICY,
version='0.9', # less than initial version 1.0
resource_id=qos_policy_id)
to_prim_mock.assert_called_with(target_version='0.9')

View File

@ -679,3 +679,14 @@ class TestEnsureDir(base.BaseTestCase):
def test_ensure_dir_calls_makedirs(self, makedirs):
utils.ensure_dir("/etc/create/directory")
makedirs.assert_called_once_with("/etc/create/directory", 0o755)
class TestCamelize(base.BaseTestCase):
def test_camelize(self):
data = {'bandwidth_limit': 'BandwidthLimit',
'test': 'Test',
'some__more__dashes': 'SomeMoreDashes',
'a_penguin_walks_into_a_bar': 'APenguinWalksIntoABar'}
for s, expected in data.items():
self.assertEqual(expected, utils.camelize(s))

View File

@ -0,0 +1,93 @@
# Copyright (c) 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import db_base_plugin_common
from neutron.tests import base
class DummyObject(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
def to_dict(self):
return self.kwargs
class ConvertToDictTestCase(base.BaseTestCase):
@db_base_plugin_common.convert_result_to_dict
def method_dict(self, fields=None):
return DummyObject(one=1, two=2, three=3)
@db_base_plugin_common.convert_result_to_dict
def method_list(self):
return [DummyObject(one=1, two=2, three=3)] * 3
def test_simple_object(self):
expected = {'one': 1, 'two': 2, 'three': 3}
observed = self.method_dict()
self.assertEqual(expected, observed)
def test_list_of_objects(self):
expected = [{'one': 1, 'two': 2, 'three': 3}] * 3
observed = self.method_list()
self.assertEqual(expected, observed)
class FilterFieldsTestCase(base.BaseTestCase):
@db_base_plugin_common.filter_fields
def method_dict(self, fields=None):
return {'one': 1, 'two': 2, 'three': 3}
@db_base_plugin_common.filter_fields
def method_list(self, fields=None):
return [self.method_dict() for _ in range(3)]
@db_base_plugin_common.filter_fields
def method_multiple_arguments(self, not_used, fields=None,
also_not_used=None):
return {'one': 1, 'two': 2, 'three': 3}
def test_no_fields(self):
expected = {'one': 1, 'two': 2, 'three': 3}
observed = self.method_dict()
self.assertEqual(expected, observed)
def test_dict(self):
expected = {'two': 2}
observed = self.method_dict(['two'])
self.assertEqual(expected, observed)
def test_list(self):
expected = [{'two': 2}, {'two': 2}, {'two': 2}]
observed = self.method_list(['two'])
self.assertEqual(expected, observed)
def test_multiple_arguments_positional(self):
expected = {'two': 2}
observed = self.method_multiple_arguments(list(), ['two'])
self.assertEqual(expected, observed)
def test_multiple_arguments_positional_and_keywords(self):
expected = {'two': 2}
observed = self.method_multiple_arguments(fields=['two'],
not_used=None)
self.assertEqual(expected, observed)
def test_multiple_arguments_keyword(self):
expected = {'two': 2}
observed = self.method_multiple_arguments(list(), fields=['two'])
self.assertEqual(expected, observed)

View File

View File

@ -0,0 +1,267 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import models_v2
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
_test_class = policy.QosPolicy
def setUp(self):
super(QosPolicyObjectTestCase, self).setUp()
# qos_policy_ids will be incorrect, but we don't care in this test
self.db_qos_bandwidth_rules = [
self.get_random_fields(rule.QosBandwidthLimitRule)
for _ in range(3)]
self.model_map = {
self._test_class.db_model: self.db_objs,
rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules}
def fake_get_objects(self, context, model, **kwargs):
return self.model_map[model]
def fake_get_object(self, context, model, id):
objects = self.model_map[model]
return [obj for obj in objects if obj['id'] == id][0]
def test_get_objects(self):
admin_context = self.context.elevated()
with mock.patch.object(
db_api, 'get_objects',
side_effect=self.fake_get_objects) as get_objects_mock:
with mock.patch.object(
db_api, 'get_object',
side_effect=self.fake_get_object):
with mock.patch.object(
self.context,
'elevated',
return_value=admin_context) as context_mock:
objs = self._test_class.get_objects(self.context)
context_mock.assert_called_once_with()
get_objects_mock.assert_any_call(
admin_context, self._test_class.db_model)
self._validate_objects(self.db_objs, objs)
def test_get_by_id(self):
admin_context = self.context.elevated()
with mock.patch.object(db_api, 'get_object',
return_value=self.db_obj) as get_object_mock:
with mock.patch.object(self.context,
'elevated',
return_value=admin_context) as context_mock:
obj = self._test_class.get_by_id(self.context, id='fake_id')
self.assertTrue(self._is_test_class(obj))
self.assertEqual(self.db_obj, test_base.get_obj_db_fields(obj))
context_mock.assert_called_once_with()
get_object_mock.assert_called_once_with(
admin_context, self._test_class.db_model, id='fake_id')
class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = policy.QosPolicy
def setUp(self):
super(QosPolicyDbObjectTestCase, self).setUp()
self._create_test_network()
self._create_test_port(self._network)
def _create_test_policy(self):
policy_obj = policy.QosPolicy(self.context, **self.db_obj)
policy_obj.create()
return policy_obj
def _create_test_policy_with_rule(self):
policy_obj = self._create_test_policy()
rule_fields = self.get_random_fields(
obj_cls=rule.QosBandwidthLimitRule)
rule_fields['qos_policy_id'] = policy_obj.id
rule_obj = rule.QosBandwidthLimitRule(self.context, **rule_fields)
rule_obj.create()
return policy_obj, rule_obj
def _create_test_network(self):
# TODO(ihrachys): replace with network.create() once we get an object
# implementation for networks
self._network = db_api.create_object(self.context, models_v2.Network,
{'name': 'test-network1'})
def _create_test_port(self, network):
# TODO(ihrachys): replace with port.create() once we get an object
# implementation for ports
self._port = db_api.create_object(self.context, models_v2.Port,
{'name': 'test-port1',
'network_id': network['id'],
'mac_address': 'fake_mac',
'admin_state_up': True,
'status': 'ACTIVE',
'device_id': 'fake_device',
'device_owner': 'fake_owner'})
def test_attach_network_get_network_policy(self):
obj = self._create_test_policy()
policy_obj = policy.QosPolicy.get_network_policy(self.context,
self._network['id'])
self.assertIsNone(policy_obj)
# Now attach policy and repeat
obj.attach_network(self._network['id'])
policy_obj = policy.QosPolicy.get_network_policy(self.context,
self._network['id'])
self.assertEqual(obj, policy_obj)
def test_attach_network_nonexistent_network(self):
obj = self._create_test_policy()
self.assertRaises(n_exc.NetworkQosBindingNotFound,
obj.attach_network, 'non-existent-network')
def test_attach_port_nonexistent_port(self):
obj = self._create_test_policy()
self.assertRaises(n_exc.PortQosBindingNotFound,
obj.attach_port, 'non-existent-port')
def test_attach_network_nonexistent_policy(self):
policy_obj = policy.QosPolicy(self.context, **self.db_obj)
self.assertRaises(n_exc.NetworkQosBindingNotFound,
policy_obj.attach_network, self._network['id'])
def test_attach_port_nonexistent_policy(self):
policy_obj = policy.QosPolicy(self.context, **self.db_obj)
self.assertRaises(n_exc.PortQosBindingNotFound,
policy_obj.attach_port, self._port['id'])
def test_attach_port_get_port_policy(self):
obj = self._create_test_policy()
policy_obj = policy.QosPolicy.get_network_policy(self.context,
self._network['id'])
self.assertIsNone(policy_obj)
# Now attach policy and repeat
obj.attach_port(self._port['id'])
policy_obj = policy.QosPolicy.get_port_policy(self.context,
self._port['id'])
self.assertEqual(obj, policy_obj)
def test_detach_port(self):
obj = self._create_test_policy()
obj.attach_port(self._port['id'])
obj.detach_port(self._port['id'])
policy_obj = policy.QosPolicy.get_port_policy(self.context,
self._port['id'])
self.assertIsNone(policy_obj)
def test_detach_network(self):
obj = self._create_test_policy()
obj.attach_network(self._network['id'])
obj.detach_network(self._network['id'])
policy_obj = policy.QosPolicy.get_network_policy(self.context,
self._network['id'])
self.assertIsNone(policy_obj)
def test_detach_port_nonexistent_port(self):
obj = self._create_test_policy()
self.assertRaises(n_exc.PortQosBindingNotFound,
obj.detach_port, 'non-existent-port')
def test_detach_network_nonexistent_network(self):
obj = self._create_test_policy()
self.assertRaises(n_exc.NetworkQosBindingNotFound,
obj.detach_network, 'non-existent-port')
def test_detach_port_nonexistent_policy(self):
policy_obj = policy.QosPolicy(self.context, **self.db_obj)
self.assertRaises(n_exc.PortQosBindingNotFound,
policy_obj.detach_port, self._port['id'])
def test_detach_network_nonexistent_policy(self):
policy_obj = policy.QosPolicy(self.context, **self.db_obj)
self.assertRaises(n_exc.NetworkQosBindingNotFound,
policy_obj.detach_network, self._network['id'])
def test_synthetic_rule_fields(self):
policy_obj, rule_obj = self._create_test_policy_with_rule()
policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
self.assertEqual([rule_obj], policy_obj.rules)
def test_get_by_id_fetches_rules_non_lazily(self):
policy_obj, rule_obj = self._create_test_policy_with_rule()
policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
primitive = policy_obj.obj_to_primitive()
self.assertNotEqual([], (primitive['versioned_object.data']['rules']))
def test_to_dict_returns_rules_as_dicts(self):
policy_obj, rule_obj = self._create_test_policy_with_rule()
policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
obj_dict = policy_obj.to_dict()
rule_dict = rule_obj.to_dict()
# first make sure that to_dict() is still sane and does not return
# objects
for obj in (rule_dict, obj_dict):
self.assertIsInstance(obj, dict)
self.assertEqual(rule_dict, obj_dict['rules'][0])
def test_shared_default(self):
self.db_obj.pop('shared')
obj = self._test_class(self.context, **self.db_obj)
self.assertEqual(False, obj.shared)
def test_delete_not_allowed_if_policy_in_use_by_port(self):
obj = self._create_test_policy()
obj.attach_port(self._port['id'])
self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
obj.detach_port(self._port['id'])
obj.delete()
def test_delete_not_allowed_if_policy_in_use_by_network(self):
obj = self._create_test_policy()
obj.attach_network(self._network['id'])
self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
obj.detach_network(self._network['id'])
obj.delete()

View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.services.qos import qos_consts
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class QosBandwidthLimitRuleObjectTestCase(test_base.BaseObjectIfaceTestCase):
_test_class = rule.QosBandwidthLimitRule
def test_to_dict_returns_type(self):
obj = rule.QosBandwidthLimitRule(self.context, **self.db_obj)
dict_ = obj.to_dict()
self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT, dict_['type'])
class QosBandwidthLimitRuleDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = rule.QosBandwidthLimitRule
def setUp(self):
super(QosBandwidthLimitRuleDbObjectTestCase, self).setUp()
# Prepare policy to be able to insert a rule
generated_qos_policy_id = self.db_obj['qos_policy_id']
policy_obj = policy.QosPolicy(self.context,
id=generated_qos_policy_id)
policy_obj.create()

View File

@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# rule types are so different from other objects that we don't base the test
# class on the common base class for all objects
import mock
from neutron import manager
from neutron.objects.qos import rule_type
from neutron.services.qos import qos_consts
from neutron.tests import base as test_base
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class QosRuleTypeObjectTestCase(test_base.BaseTestCase):
def setUp(self):
self.config_parse()
self.setup_coreplugin(DB_PLUGIN_KLASS)
super(QosRuleTypeObjectTestCase, self).setUp()
def test_get_objects(self):
core_plugin = manager.NeutronManager.get_plugin()
rule_types_mock = mock.PropertyMock(
return_value=qos_consts.VALID_RULE_TYPES)
with mock.patch.object(core_plugin, 'supported_qos_rule_types',
new_callable=rule_types_mock,
create=True):
types = rule_type.QosRuleType.get_objects()
self.assertEqual(sorted(qos_consts.VALID_RULE_TYPES),
sorted(type_['type'] for type_ in types))
def test_wrong_type(self):
self.assertRaises(ValueError, rule_type.QosRuleType, type='bad_type')

View File

@ -0,0 +1,284 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import string
import mock
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import api as db_api
from neutron.objects import base
from neutron.tests import base as test_base
SQLALCHEMY_COMMIT = 'sqlalchemy.engine.Connection._commit_impl'
class FakeModel(object):
def __init__(self, *args, **kwargs):
pass
@obj_base.VersionedObjectRegistry.register
class FakeNeutronObject(base.NeutronDbObject):
db_model = FakeModel
fields = {
'id': obj_fields.UUIDField(),
'field1': obj_fields.StringField(),
'field2': obj_fields.StringField()
}
fields_no_update = ['id']
def _random_string(n=10):
return ''.join(random.choice(string.ascii_lowercase) for _ in range(n))
def _random_boolean():
return bool(random.getrandbits(1))
def _random_integer():
return random.randint(0, 1000)
FIELD_TYPE_VALUE_GENERATOR_MAP = {
obj_fields.BooleanField: _random_boolean,
obj_fields.IntegerField: _random_integer,
obj_fields.StringField: _random_string,
obj_fields.UUIDField: _random_string,
obj_fields.ListOfObjectsField: lambda: []
}
def get_obj_db_fields(obj):
return {field: getattr(obj, field) for field in obj.fields
if field not in obj.synthetic_fields}
class _BaseObjectTestCase(object):
_test_class = FakeNeutronObject
def setUp(self):
super(_BaseObjectTestCase, self).setUp()
self.context = context.get_admin_context()
self.db_objs = list(self.get_random_fields() for _ in range(3))
self.db_obj = self.db_objs[0]
@classmethod
def get_random_fields(cls, obj_cls=None):
obj_cls = obj_cls or cls._test_class
fields = {}
for field, field_obj in obj_cls.fields.items():
if field not in obj_cls.synthetic_fields:
generator = FIELD_TYPE_VALUE_GENERATOR_MAP[type(field_obj)]
fields[field] = generator()
return fields
def get_updatable_fields(self, fields):
return base.get_updatable_fields(self._test_class, fields)
@classmethod
def _is_test_class(cls, obj):
return isinstance(obj, cls._test_class)
class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
def test_get_by_id(self):
with mock.patch.object(db_api, 'get_object',
return_value=self.db_obj) as get_object_mock:
obj = self._test_class.get_by_id(self.context, id='fake_id')
self.assertTrue(self._is_test_class(obj))
self.assertEqual(self.db_obj, get_obj_db_fields(obj))
get_object_mock.assert_called_once_with(
self.context, self._test_class.db_model, id='fake_id')
def test_get_by_id_missing_object(self):
with mock.patch.object(db_api, 'get_object', return_value=None):
obj = self._test_class.get_by_id(self.context, id='fake_id')
self.assertIsNone(obj)
def test_get_objects(self):
with mock.patch.object(db_api, 'get_objects',
return_value=self.db_objs) as get_objects_mock:
objs = self._test_class.get_objects(self.context)
self._validate_objects(self.db_objs, objs)
get_objects_mock.assert_called_once_with(
self.context, self._test_class.db_model)
def _validate_objects(self, expected, observed):
self.assertFalse(
filter(lambda obj: not self._is_test_class(obj), observed))
self.assertEqual(
sorted(expected),
sorted(get_obj_db_fields(obj) for obj in observed))
def _check_equal(self, obj, db_obj):
self.assertEqual(
sorted(db_obj),
sorted(get_obj_db_fields(obj)))
def test_create(self):
with mock.patch.object(db_api, 'create_object',
return_value=self.db_obj) as create_mock:
obj = self._test_class(self.context, **self.db_obj)
self._check_equal(obj, self.db_obj)
obj.create()
self._check_equal(obj, self.db_obj)
create_mock.assert_called_once_with(
self.context, self._test_class.db_model, self.db_obj)
def test_create_updates_from_db_object(self):
with mock.patch.object(db_api, 'create_object',
return_value=self.db_obj):
obj = self._test_class(self.context, **self.db_objs[1])
self._check_equal(obj, self.db_objs[1])
obj.create()
self._check_equal(obj, self.db_obj)
@mock.patch.object(db_api, 'update_object')
def test_update_no_changes(self, update_mock):
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value={}):
obj = self._test_class(self.context)
obj.update()
self.assertFalse(update_mock.called)
@mock.patch.object(db_api, 'update_object')
def test_update_changes(self, update_mock):
fields_to_update = self.get_updatable_fields(self.db_obj)
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value=fields_to_update):
obj = self._test_class(self.context, **self.db_obj)
obj.update()
update_mock.assert_called_once_with(
self.context, self._test_class.db_model,
self.db_obj['id'], fields_to_update)
@mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value={'a': 'a', 'b': 'b', 'c': 'c'})
def test_update_changes_forbidden(self, *mocks):
with mock.patch.object(
self._test_class,
'fields_no_update',
new_callable=mock.PropertyMock(return_value=['a', 'c']),
create=True):
obj = self._test_class(self.context, **self.db_obj)
self.assertRaises(base.NeutronObjectUpdateForbidden, obj.update)
def test_update_updates_from_db_object(self):
with mock.patch.object(db_api, 'update_object',
return_value=self.db_obj):
obj = self._test_class(self.context, **self.db_objs[1])
fields_to_update = self.get_updatable_fields(self.db_objs[1])
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value=fields_to_update):
obj.update()
self._check_equal(obj, self.db_obj)
@mock.patch.object(db_api, 'delete_object')
def test_delete(self, delete_mock):
obj = self._test_class(self.context, **self.db_obj)
self._check_equal(obj, self.db_obj)
obj.delete()
self._check_equal(obj, self.db_obj)
delete_mock.assert_called_once_with(
self.context, self._test_class.db_model, self.db_obj['id'])
class BaseDbObjectTestCase(_BaseObjectTestCase):
def test_get_by_id_create_update_delete(self):
obj = self._test_class(self.context, **self.db_obj)
obj.create()
new = self._test_class.get_by_id(self.context, id=obj.id)
self.assertEqual(obj, new)
obj = new
for key, val in self.get_updatable_fields(self.db_objs[1]).items():
setattr(obj, key, val)
obj.update()
new = self._test_class.get_by_id(self.context, id=obj.id)
self.assertEqual(obj, new)
obj = new
new.delete()
new = self._test_class.get_by_id(self.context, id=obj.id)
self.assertIsNone(new)
def test_update_non_existent_object_raises_not_found(self):
obj = self._test_class(self.context, **self.db_obj)
obj.obj_reset_changes()
for key, val in self.get_updatable_fields(self.db_obj).items():
setattr(obj, key, val)
self.assertRaises(n_exc.ObjectNotFound, obj.update)
def test_delete_non_existent_object_raises_not_found(self):
obj = self._test_class(self.context, **self.db_obj)
self.assertRaises(n_exc.ObjectNotFound, obj.delete)
@mock.patch(SQLALCHEMY_COMMIT)
def test_create_single_transaction(self, mock_commit):
obj = self._test_class(self.context, **self.db_obj)
obj.create()
self.assertEqual(1, mock_commit.call_count)
def test_update_single_transaction(self):
obj = self._test_class(self.context, **self.db_obj)
obj.create()
for key, val in self.get_updatable_fields(self.db_obj).items():
setattr(obj, key, val)
with mock.patch(SQLALCHEMY_COMMIT) as mock_commit:
obj.update()
self.assertEqual(1, mock_commit.call_count)
def test_delete_single_transaction(self):
obj = self._test_class(self.context, **self.db_obj)
obj.create()
with mock.patch(SQLALCHEMY_COMMIT) as mock_commit:
obj.delete()
self.assertEqual(1, mock_commit.call_count)
@mock.patch(SQLALCHEMY_COMMIT)
def test_get_objects_single_transaction(self, mock_commit):
self._test_class.get_objects(self.context)
self.assertEqual(1, mock_commit.call_count)
@mock.patch(SQLALCHEMY_COMMIT)
def test_get_by_id_single_transaction(self, mock_commit):
obj = self._test_class(self.context, **self.db_obj)
obj.create()
obj = self._test_class.get_by_id(self.context, obj.id)
self.assertEqual(2, mock_commit.call_count)

View File

@ -114,3 +114,20 @@ class TestPciLib(base.BaseTestCase):
self.pci_wrapper.set_vf_spoofcheck,
self.VF_INDEX,
True)
def test_set_vf_max_rate(self):
with mock.patch.object(self.pci_wrapper, "_as_root") \
as mock_as_root:
result = self.pci_wrapper.set_vf_max_rate(self.VF_INDEX, 1000)
self.assertIsNone(result)
mock_as_root.assert_called_once_with([], "link",
("set", self.DEV_NAME, "vf", str(self.VF_INDEX), "rate", '1000'))
def test_set_vf_max_rate_fail(self):
with mock.patch.object(self.pci_wrapper,
"_execute") as mock_exec:
mock_exec.side_effect = Exception()
self.assertRaises(exc.IpCommandError,
self.pci_wrapper.set_vf_max_rate,
self.VF_INDEX,
1000)

View File

@ -0,0 +1,106 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from neutron import context
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers import (
qos_driver)
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent import (
ovs_test_base)
class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
def setUp(self):
super(QosOVSAgentDriverTestCase, self).setUp()
self.context = context.get_admin_context()
self.qos_driver = qos_driver.QosOVSAgentDriver()
self.qos_driver.initialize()
self.qos_driver.br_int = mock.Mock()
self.qos_driver.br_int.get_qos_bw_limit_for_port = mock.Mock(
return_value=(1000, 10))
self.get = self.qos_driver.br_int.get_qos_bw_limit_for_port
self.qos_driver.br_int.del_qos_bw_limit_for_port = mock.Mock()
self.delete = self.qos_driver.br_int.del_qos_bw_limit_for_port
self.qos_driver.br_int.create_qos_bw_limit_for_port = mock.Mock()
self.create = self.qos_driver.br_int.create_qos_bw_limit_for_port
self.rule = self._create_bw_limit_rule_obj()
self.qos_policy = self._create_qos_policy_obj([self.rule])
self.port = self._create_fake_port()
def _create_bw_limit_rule_obj(self):
rule_obj = rule.QosBandwidthLimitRule()
rule_obj.id = uuidutils.generate_uuid()
rule_obj.max_kbps = 2
rule_obj.max_burst_kbps = 200
rule_obj.obj_reset_changes()
return rule_obj
def _create_qos_policy_obj(self, rules):
policy_dict = {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid(),
'name': 'test',
'description': 'test',
'shared': False,
'rules': rules}
policy_obj = policy.QosPolicy(self.context, **policy_dict)
policy_obj.obj_reset_changes()
return policy_obj
def _create_fake_port(self):
self.port_name = 'fakeport'
class FakeVifPort(object):
port_name = self.port_name
return {'vif_port': FakeVifPort()}
def test_create_new_rule(self):
self.qos_driver.br_int.get_qos_bw_limit_for_port = mock.Mock(
return_value=(None, None))
self.qos_driver.create(self.port, self.qos_policy)
# Assert create is the last call
self.assertEqual(
'create_qos_bw_limit_for_port',
self.qos_driver.br_int.method_calls[-1][0])
self.assertEqual(0, self.delete.call_count)
self.create.assert_called_once_with(
self.port_name, self.rule.max_kbps,
self.rule.max_burst_kbps)
def test_create_existing_rules(self):
self.qos_driver.create(self.port, self.qos_policy)
self._assert_rule_create_updated()
def test_update_rules(self):
self.qos_driver.update(self.port, self.qos_policy)
self._assert_rule_create_updated()
def test_delete_rules(self):
self.qos_driver.delete(self.port, self.qos_policy)
self.delete.assert_called_once_with(self.port_name)
def _assert_rule_create_updated(self):
# Assert create is the last call
self.assertEqual(
'create_qos_bw_limit_for_port',
self.qos_driver.br_int.method_calls[-1][0])
self.delete.assert_called_once_with(self.port_name)
self.create.assert_called_once_with(
self.port_name, self.rule.max_kbps,
self.rule.max_burst_kbps)

View File

@ -402,6 +402,29 @@ class TestOvsNeutronAgent(object):
self.assertTrue(self._mock_treat_devices_added_updated(
details, mock.Mock(), 'treat_vif_port'))
def test_treat_devices_added_updated_sends_vif_port_into_extension_manager(
self, *args):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
port = mock.MagicMock()
def fake_handle_port(context, port):
self.assertIn('vif_port', port)
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [details],
'failed_devices': None}),\
mock.patch.object(self.agent.agent_extensions_mgr,
'handle_port', new=fake_handle_port),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={details['device']: port}),\
mock.patch.object(self.agent, 'treat_vif_port',
return_value=False):
self.agent.treat_devices_added_or_updated([{}], False)
def test_treat_devices_added_updated_skips_if_port_not_found(self):
dev_mock = mock.MagicMock()
dev_mock.__getitem__.return_value = 'the_skipped_one'

View File

@ -49,6 +49,7 @@ from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.drivers import type_vlan
from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron.services.qos import qos_consts
from neutron.tests import base
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.agent import test_securitygroups_rpc as test_sg_rpc
@ -139,6 +140,37 @@ class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
self.assertFalse(self._skip_native_bulk)
class TestMl2SupportedQosRuleTypes(Ml2PluginV2TestCase):
def test_empty_driver_list(self, *mocks):
mech_drivers_mock = mock.PropertyMock(return_value=[])
with mock.patch.object(self.driver.mechanism_manager,
'ordered_mech_drivers',
new_callable=mech_drivers_mock):
self.assertEqual(
[], self.driver.mechanism_manager.supported_qos_rule_types)
def test_no_rule_types_in_common(self):
self.assertEqual(
[], self.driver.mechanism_manager.supported_qos_rule_types)
@mock.patch.object(mech_logger.LoggerMechanismDriver,
'supported_qos_rule_types',
new_callable=mock.PropertyMock,
create=True)
@mock.patch.object(mech_test.TestMechanismDriver,
'supported_qos_rule_types',
new_callable=mock.PropertyMock,
create=True)
def test_rule_type_in_common(self, *mocks):
# make sure both plugins have the same supported qos rule types
for mock_ in mocks:
mock_.return_value = qos_consts.VALID_RULE_TYPES
self.assertEqual(
qos_consts.VALID_RULE_TYPES,
self.driver.mechanism_manager.supported_qos_rule_types)
class TestMl2BasicGet(test_plugin.TestBasicGet,
Ml2PluginV2TestCase):
pass
@ -1630,3 +1662,75 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
# run the transaction balancing function defined in this test
plugin.delete_port(self.context, 'fake_id')
self.assertTrue(self.notify.call_count)
class TestMl2PluginCreateUpdateNetwork(base.BaseTestCase):
def setUp(self):
super(TestMl2PluginCreateUpdateNetwork, self).setUp()
self.context = mock.MagicMock()
self.notify_p = mock.patch('neutron.callbacks.registry.notify')
self.notify = self.notify_p.start()
def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True)
enter = transaction.__enter__.call_count
exit = transaction.__exit__.call_count
self.assertEqual(enter, exit)
def _create_plugin_for_create_update_network(self):
plugin = ml2_plugin.Ml2Plugin()
plugin.extension_manager = mock.Mock()
plugin.type_manager = mock.Mock()
plugin.mechanism_manager = mock.Mock()
plugin.notifier = mock.Mock()
mock.patch('neutron.extensions.providernet.'
'_raise_if_updates_provider_attributes').start()
self.notify.side_effect = (
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
return plugin
def test_create_network_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'create_network'):
init.return_value = None
plugin = self._create_plugin_for_create_update_network()
plugin.create_network(self.context, mock.MagicMock())
kwargs = {'context': self.context, 'network': mock.ANY}
self.notify.assert_called_once_with('network', 'after_create',
plugin, **kwargs)
def test_create_network_bulk_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'create_network'):
init.return_value = None
plugin = self._create_plugin_for_create_update_network()
plugin.create_network_bulk(self.context,
{'networks':
[mock.MagicMock(), mock.MagicMock()]})
self.assertEqual(2, self.notify.call_count)
def test_update_network_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'update_network'):
init.return_value = None
plugin = self._create_plugin_for_create_update_network()
plugin.update_network(self.context, 'fake_id', mock.MagicMock())
kwargs = {
'context': self.context,
'network': mock.ANY,
}
self.notify.assert_called_once_with('network', 'after_update',
plugin, **kwargs)

View File

@ -32,6 +32,7 @@ from neutron.common import topics
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc as plugin_rpc
from neutron.services.qos import qos_consts
from neutron.tests import base
@ -135,6 +136,34 @@ class RpcCallbacksTestCase(base.BaseTestCase):
self.callbacks.get_device_details(mock.Mock())
self.assertTrue(self.plugin.update_port_status.called)
def test_get_device_details_qos_policy_id_none(self):
port = collections.defaultdict(lambda: 'fake_port')
self.plugin.get_bound_port_context().current = port
self.plugin.get_bound_port_context().network._network = (
{"id": "fake_network"})
res = self.callbacks.get_device_details(mock.Mock(), host='fake')
self.assertIsNone(res['qos_policy_id'])
def test_get_device_details_qos_policy_id_inherited_from_network(self):
port = collections.defaultdict(lambda: 'fake_port')
self.plugin.get_bound_port_context().current = port
self.plugin.get_bound_port_context().network._network = (
{"id": "fake_network",
qos_consts.QOS_POLICY_ID: 'test-policy-id'})
res = self.callbacks.get_device_details(mock.Mock(), host='fake')
self.assertEqual('test-policy-id', res['qos_policy_id'])
def test_get_device_details_qos_policy_id_taken_from_port(self):
port = collections.defaultdict(
lambda: 'fake_port',
{qos_consts.QOS_POLICY_ID: 'test-port-policy-id'})
self.plugin.get_bound_port_context().current = port
self.plugin.get_bound_port_context().network._network = (
{"id": "fake_network",
qos_consts.QOS_POLICY_ID: 'test-net-policy-id'})
res = self.callbacks.get_device_details(mock.Mock(), host='fake')
self.assertEqual('test-port-policy-id', res['qos_policy_id'])
def _test_get_devices_list(self, callback, side_effect, expected):
devices = [1, 2, 3, 4, 5]
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}

View File

@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.services.qos.notification_drivers import qos_base
class DummyQosServiceNotificationDriver(
qos_base.QosServiceNotificationDriverBase):
"""Dummy service notification driver for QoS."""
def get_description(self):
return "Dummy"
def create_policy(self, policy):
pass
def update_policy(self, policy):
pass
def delete_policy(self, policy):
pass

View File

@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron import context
from neutron.objects.qos import policy as policy_object
from neutron.services.qos.notification_drivers import manager as driver_mgr
from neutron.services.qos.notification_drivers import message_queue
from neutron.tests import base
DUMMY_DRIVER = ("neutron.tests.unit.services.qos.notification_drivers."
"dummy.DummyQosServiceNotificationDriver")
def _load_multiple_drivers():
cfg.CONF.set_override(
"notification_drivers",
["message_queue", DUMMY_DRIVER],
"qos")
class TestQosDriversManager(base.BaseTestCase):
def setUp(self):
super(TestQosDriversManager, self).setUp()
self.config_parse()
self.setup_coreplugin()
self.registry_p = mock.patch(
'neutron.api.rpc.callbacks.registry.notify')
self.registry_m = self.registry_p.start()
self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
config = cfg.ConfigOpts()
config.register_opts(driver_mgr.QOS_PLUGIN_OPTS, "qos")
self.policy_data = {'policy': {
'id': 7777777,
'tenant_id': 888888,
'name': 'test-policy',
'description': 'test policy description',
'shared': True}}
self.policy = policy_object.QosPolicy(context,
**self.policy_data['policy'])
ctxt = None
self.kwargs = {'context': ctxt}
def _validate_registry_params(self, event_type, policy):
self.assertTrue(self.registry_m.called, policy)
self.registry_m.assert_called_with(
resources.QOS_POLICY,
event_type,
policy)
def test_create_policy_default_configuration(self):
#RPC driver should be loaded by default
self.driver_manager.create_policy(self.policy)
self.assertFalse(self.registry_m.called)
def test_update_policy_default_configuration(self):
#RPC driver should be loaded by default
self.driver_manager.update_policy(self.policy)
self._validate_registry_params(events.UPDATED, self.policy)
def test_delete_policy_default_configuration(self):
#RPC driver should be loaded by default
self.driver_manager.delete_policy(self.policy)
self._validate_registry_params(events.DELETED, self.policy)
def _test_multi_drivers_configuration_op(self, op):
_load_multiple_drivers()
# create a new manager with new configuration
driver_manager = driver_mgr.QosServiceNotificationDriverManager()
handler = '%s_policy' % op
with mock.patch('.'.join([DUMMY_DRIVER, handler])) as dummy_mock:
rpc_driver = message_queue.RpcQosServiceNotificationDriver
with mock.patch.object(rpc_driver, handler) as rpc_mock:
getattr(driver_manager, handler)(self.policy)
for mock_ in (dummy_mock, rpc_mock):
mock_.assert_called_with(self.policy)
def test_multi_drivers_configuration_create(self):
self._test_multi_drivers_configuration_op('create')
def test_multi_drivers_configuration_update(self):
self._test_multi_drivers_configuration_op('update')
def test_multi_drivers_configuration_delete(self):
self._test_multi_drivers_configuration_op('delete')

View File

@ -0,0 +1,72 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron import context
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.services.qos.notification_drivers import message_queue
from neutron.tests import base
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class TestQosRpcNotificationDriver(base.BaseTestCase):
def setUp(self):
super(TestQosRpcNotificationDriver, self).setUp()
registry_p = mock.patch(
'neutron.api.rpc.callbacks.registry.notify')
self.registry_m = registry_p.start()
self.driver = message_queue.RpcQosServiceNotificationDriver()
self.policy_data = {'policy': {
'id': 7777777,
'tenant_id': 888888,
'name': 'testi-policy',
'description': 'test policyi description',
'shared': True}}
self.rule_data = {'bandwidth_limit_rule': {
'id': 7777777,
'max_kbps': 100,
'max_burst_kbps': 150}}
self.policy = policy_object.QosPolicy(context,
**self.policy_data['policy'])
self.rule = rule_object.QosBandwidthLimitRule(
context,
**self.rule_data['bandwidth_limit_rule'])
def _validate_registry_params(self, event_type, policy):
self.assertTrue(self.registry_m.called, policy)
self.registry_m.assert_called_once_with(
resources.QOS_POLICY,
event_type,
policy)
def test_create_policy(self):
self.driver.create_policy(self.policy)
self.assertFalse(self.registry_m.called)
def test_update_policy(self):
self.driver.update_policy(self.policy)
self._validate_registry_params(events.UPDATED, self.policy)
def test_delete_policy(self):
self.driver.delete_policy(self.policy)
self._validate_registry_params(events.DELETED, self.policy)

View File

@ -0,0 +1,154 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import context
from neutron.plugins.common import constants as plugin_constants
from neutron.services.qos import qos_consts
from neutron.services.qos import qos_extension
from neutron.tests import base
def _get_test_dbdata(qos_policy_id):
return {'id': None, 'qos_policy_binding': {'policy_id': qos_policy_id,
'network_id': 'fake_net_id'}}
class QosResourceExtensionHandlerTestCase(base.BaseTestCase):
def setUp(self):
super(QosResourceExtensionHandlerTestCase, self).setUp()
self.ext_handler = qos_extension.QosResourceExtensionHandler()
policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy')
self.policy_m = policy_p.start()
self.context = context.get_admin_context()
def test_process_resource_no_qos_policy_id(self):
self.ext_handler.process_resource(
self.context, qos_extension.PORT, {}, None)
self.assertFalse(self.policy_m.called)
def _mock_plugin_loaded(self, plugin_loaded):
plugins = {}
if plugin_loaded:
plugins[plugin_constants.QOS] = None
return mock.patch('neutron.manager.NeutronManager.get_service_plugins',
return_value=plugins)
def test_process_resource_no_qos_plugin_loaded(self):
with self._mock_plugin_loaded(False):
self.ext_handler.process_resource(
self.context, qos_extension.PORT,
{qos_consts.QOS_POLICY_ID: None}, None)
self.assertFalse(self.policy_m.called)
def test_process_resource_port_new_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
actual_port = {'id': mock.Mock(),
qos_consts.QOS_POLICY_ID: qos_policy_id}
qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
self.ext_handler.process_resource(
self.context, qos_extension.PORT,
{qos_consts.QOS_POLICY_ID: qos_policy_id},
actual_port)
qos_policy.attach_port.assert_called_once_with(actual_port['id'])
def test_process_resource_port_updated_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
port_id = mock.Mock()
actual_port = {'id': port_id,
qos_consts.QOS_POLICY_ID: qos_policy_id}
old_qos_policy = mock.MagicMock()
self.policy_m.get_port_policy = mock.Mock(
return_value=old_qos_policy)
new_qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
self.ext_handler.process_resource(
self.context, qos_extension.PORT,
{qos_consts.QOS_POLICY_ID: qos_policy_id},
actual_port)
old_qos_policy.detach_port.assert_called_once_with(port_id)
new_qos_policy.attach_port.assert_called_once_with(port_id)
def test_process_resource_network_new_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
actual_network = {'id': mock.Mock(),
qos_consts.QOS_POLICY_ID: qos_policy_id}
qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
self.ext_handler.process_resource(
self.context, qos_extension.NETWORK,
{qos_consts.QOS_POLICY_ID: qos_policy_id}, actual_network)
qos_policy.attach_network.assert_called_once_with(
actual_network['id'])
def test_process_resource_network_updated_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
network_id = mock.Mock()
actual_network = {'id': network_id,
qos_consts.QOS_POLICY_ID: qos_policy_id}
old_qos_policy = mock.MagicMock()
self.policy_m.get_network_policy = mock.Mock(
return_value=old_qos_policy)
new_qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
self.ext_handler.process_resource(
self.context, qos_extension.NETWORK,
{qos_consts.QOS_POLICY_ID: qos_policy_id}, actual_network)
old_qos_policy.detach_network.assert_called_once_with(network_id)
new_qos_policy.attach_network.assert_called_once_with(network_id)
def test_extract_resource_fields_plugin_not_loaded(self):
with self._mock_plugin_loaded(False):
fields = self.ext_handler.extract_resource_fields(None, None)
self.assertEqual({}, fields)
def _test_extract_resource_fields_for_port(self, qos_policy_id):
with self._mock_plugin_loaded(True):
fields = self.ext_handler.extract_resource_fields(
qos_extension.PORT, _get_test_dbdata(qos_policy_id))
self.assertEqual({qos_consts.QOS_POLICY_ID: qos_policy_id}, fields)
def test_extract_resource_fields_no_port_policy(self):
self._test_extract_resource_fields_for_port(None)
def test_extract_resource_fields_port_policy_exists(self):
qos_policy_id = mock.Mock()
self._test_extract_resource_fields_for_port(qos_policy_id)
def _test_extract_resource_fields_for_network(self, qos_policy_id):
with self._mock_plugin_loaded(True):
fields = self.ext_handler.extract_resource_fields(
qos_extension.NETWORK, _get_test_dbdata(qos_policy_id))
self.assertEqual({qos_consts.QOS_POLICY_ID: qos_policy_id}, fields)
def test_extract_resource_fields_no_network_policy(self):
self._test_extract_resource_fields_for_network(None)
def test_extract_resource_fields_network_policy_exists(self):
qos_policy_id = mock.Mock()
qos_policy = mock.Mock()
qos_policy.id = qos_policy_id
self._test_extract_resource_fields_for_network(qos_policy_id)

View File

@ -0,0 +1,162 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
from neutron.objects import base as base_object
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.plugins.common import constants
from neutron.tests import base
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class TestQosPlugin(base.BaseTestCase):
def setUp(self):
super(TestQosPlugin, self).setUp()
self.setup_coreplugin()
mock.patch('neutron.db.api.create_object').start()
mock.patch('neutron.db.api.update_object').start()
mock.patch('neutron.db.api.delete_object').start()
mock.patch('neutron.db.api.get_object').start()
mock.patch(
'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
self.registry_p = mock.patch(
'neutron.api.rpc.callbacks.registry.notify')
self.registry_m = self.registry_p.start()
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
cfg.CONF.set_override("service_plugins", ["qos"])
mgr = manager.NeutronManager.get_instance()
self.qos_plugin = mgr.get_service_plugins().get(
constants.QOS)
self.ctxt = context.Context('fake_user', 'fake_tenant')
self.policy_data = {
'policy': {'id': 7777777,
'tenant_id': 888888,
'name': 'test-policy',
'description': 'Test policy description',
'shared': True}}
self.rule_data = {
'bandwidth_limit_rule': {'id': 7777777,
'max_kbps': 100,
'max_burst_kbps': 150}}
self.policy = policy_object.QosPolicy(
context, **self.policy_data['policy'])
self.rule = rule_object.QosBandwidthLimitRule(
context, **self.rule_data['bandwidth_limit_rule'])
def _validate_registry_params(self, event_type):
self.registry_m.assert_called_once_with(
resources.QOS_POLICY,
event_type,
mock.ANY)
self.assertIsInstance(
self.registry_m.call_args[0][2], policy_object.QosPolicy)
def test_add_policy(self):
self.qos_plugin.create_policy(self.ctxt, self.policy_data)
self.assertFalse(self.registry_m.called)
def test_update_policy(self):
fields = base_object.get_updatable_fields(
policy_object.QosPolicy, self.policy_data['policy'])
self.qos_plugin.update_policy(
self.ctxt, self.policy.id, {'policy': fields})
self._validate_registry_params(events.UPDATED)
@mock.patch('neutron.db.api.get_object', return_value=None)
def test_delete_policy(self, *mocks):
self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
self._validate_registry_params(events.DELETED)
def test_create_policy_rule(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):
self.qos_plugin.create_policy_bandwidth_limit_rule(
self.ctxt, self.policy.id, self.rule_data)
self._validate_registry_params(events.UPDATED)
def test_update_policy_rule(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):
self.qos_plugin.update_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id, self.rule_data)
self._validate_registry_params(events.UPDATED)
def test_delete_policy_rule(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):
self.qos_plugin.delete_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id)
self._validate_registry_params(events.UPDATED)
def test_get_policy_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):
self.assertRaises(
n_exc.QosPolicyNotFound,
self.qos_plugin.get_policy,
self.ctxt, self.policy.id)
def test_get_policy_bandwidth_limit_rule_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):
self.assertRaises(
n_exc.QosPolicyNotFound,
self.qos_plugin.get_policy_bandwidth_limit_rule,
self.ctxt, self.rule.id, self.policy.id)
def test_get_policy_bandwidth_limit_rules_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):
self.assertRaises(
n_exc.QosPolicyNotFound,
self.qos_plugin.get_policy_bandwidth_limit_rules,
self.ctxt, self.policy.id)
def test_create_policy_rule_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):
self.assertRaises(
n_exc.QosPolicyNotFound,
self.qos_plugin.create_policy_bandwidth_limit_rule,
self.ctxt, self.policy.id, self.rule_data)
def test_update_policy_rule_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):
self.assertRaises(
n_exc.QosPolicyNotFound,
self.qos_plugin.update_policy_bandwidth_limit_rule,
self.ctxt, self.rule.id, self.policy.id, self.rule_data)
def test_delete_policy_rule_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):
self.assertRaises(
n_exc.QosPolicyNotFound,
self.qos_plugin.delete_policy_bandwidth_limit_rule,
self.ctxt, self.rule.id, self.policy.id)

Some files were not shown because too many files have changed in this diff Show More