
# 156 lines
# 5.7 KiB

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from oslo_log import log as logging

from neutron._i18n import _LW
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks.producer import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import exceptions
from neutron.objects.qos import policy as policy_object
from neutron.services.qos import qos_consts


LOG = logging.getLogger(__name__)

# VIF types for which validate_rule_for_port() skips the vif_type check and
# checks the port's vnic_type instead.
# NOTE(review): this constant is read by validate_rule_for_port() but its
# definition was missing from the file as received; the two members below
# are reconstructed from the (otherwise unused) portbindings import -
# confirm against the upstream module.
SKIPPED_VIF_TYPES = [
    portbindings.VIF_TYPE_UNBOUND,
    portbindings.VIF_TYPE_BINDING_FAILED,
]


class QosServiceDriverManager(object):
    """Dispatch QoS plugin operations to every registered QoS driver.

    Drivers add themselves through :meth:`register_driver`. The manager
    forwards method calls to all of them (:meth:`call`), answers whether a
    rule can be applied to a port (:meth:`validate_rule_for_port`), and
    pushes policy updates/deletions over RPC when at least one registered
    driver asked for RPC notifications.
    """

    def __init__(self):
        self._drivers = []
        self.notification_api = resources_rpc.ResourcesPushRpcApi()
        self.rpc_notifications_required = False
        rpc_registry.provide(self._get_qos_policy_cb, resources.QOS_POLICY)
        # notify any registered QoS driver that we're ready, those will
        # call the driver manager back with register_driver if they
        # are enabled
        registry.notify(qos_consts.QOS_PLUGIN, events.AFTER_INIT, self)

        # register_driver() calls made during the notification above may
        # have flipped this flag; the push API is only created in that case.
        if self.rpc_notifications_required:
            self.push_api = resources_rpc.ResourcesPushRpcApi()

    @staticmethod
    def _get_qos_policy_cb(resource, policy_id, **kwargs):
        """Return the QoS policy object for ``policy_id``.

        Registered in ``__init__`` as the provider for
        ``resources.QOS_POLICY``.

        :param resource: resource type being requested (used for logging).
        :param policy_id: id of the policy to load.
        :param kwargs: must carry ``context``; without it the lookup is
                       skipped.
        :returns: the result of ``QosPolicy.get_object`` or ``None`` when
                  no context was supplied.
        """
        context = kwargs.get('context')
        if context is None:
            LOG.warning(_LW(
                'Received %(resource)s %(policy_id)s without context'),
                {'resource': resource, 'policy_id': policy_id})
            # Nothing can be looked up without a context.
            return None

        policy = policy_object.QosPolicy.get_object(context, id=policy_id)
        return policy

    @staticmethod
    def _validate_vnic_type(driver, vnic_type, port_id):
        """Return True if ``driver`` reports ``vnic_type`` as compatible.

        ``port_id`` is only used in the debug message logged on mismatch.
        """
        if driver.is_vnic_compatible(vnic_type):
            return True
        LOG.debug("vnic_type %(vnic_type)s of port %(port_id)s "
                  "is not compatible with QoS driver %(driver)s",
                  {'vnic_type': vnic_type,
                   'port_id': port_id,
                   'driver': driver.name})
        return False

    @staticmethod
    def _validate_vif_type(driver, vif_type, port_id):
        """Return True if ``driver`` reports ``vif_type`` as compatible.

        ``port_id`` is only used in the debug message logged on mismatch.
        """
        if driver.is_vif_type_compatible(vif_type):
            return True
        LOG.debug("vif_type %(vif_type)s of port %(port_id)s "
                  "is not compatible with QoS driver %(driver)s",
                  {'vif_type': vif_type,
                   'port_id': port_id,
                   'driver': driver.name})
        return False

    def call(self, method_name, *args, **kwargs):
        """Helper method for calling a method across all extension drivers.

        Every driver is invoked even if an earlier one fails; failures are
        logged and collected, then reported together.

        :param method_name: name of the driver method to invoke.
        :param args: positional arguments forwarded to each driver; when
                     RPC notifications are required and the keywords are
                     absent, ``args[0]`` is used as the context and
                     ``args[1]`` as the policy object.
        :param kwargs: keyword arguments forwarded to each driver
                       (``context`` / ``policy_obj`` are also read here).
        :raises exceptions.DriverCallError: if at least one driver raised.
        """
        exc_list = []
        for driver in self._drivers:
            try:
                getattr(driver, method_name)(*args, **kwargs)
            except Exception as exc:
                exception_msg = ("Extension driver '%(name)s' failed in "
                                 "%(method)s")
                exception_data = {'name': driver.name, 'method': method_name}
                LOG.exception(exception_msg, exception_data)
                exc_list.append(exc)

        if exc_list:
            raise exceptions.DriverCallError(exc_list=exc_list)

        if self.rpc_notifications_required:
            context = kwargs.get('context') or args[0]
            policy_obj = kwargs.get('policy_obj') or args[1]
            # we don't push create_policy events since policies are empty
            # on creation, they only become of any use when rules get
            # attached to them.
            if method_name == qos_consts.UPDATE_POLICY:
                self.push_api.push(context, [policy_obj], rpc_events.UPDATED)
            elif method_name == qos_consts.DELETE_POLICY:
                self.push_api.push(context, [policy_obj], rpc_events.DELETED)

    def register_driver(self, driver):
        """Register driver with qos plugin.

        This method is called from drivers on INIT event.

        :param driver: driver object; its ``requires_rpc_notifications``
                       flag is OR-ed into the manager-wide flag.
        """
        self._drivers.append(driver)
        self.rpc_notifications_required |= driver.requires_rpc_notifications

    def validate_rule_for_port(self, rule, port):
        """Return True if some registered driver can apply ``rule`` to ``port``.

        A driver qualifies when it is compatible with the port and reports
        the rule as supported. Compatibility is checked against the port's
        vif_type, or against its vnic_type when the vif_type is listed in
        ``SKIPPED_VIF_TYPES``.

        NOTE(review): the ``continue`` / ``else`` lines were missing from
        the file as received and are reconstructed - confirm.
        """
        for driver in self._drivers:
            vif_type = port.binding.vif_type
            if vif_type not in SKIPPED_VIF_TYPES:
                if not self._validate_vif_type(driver, vif_type, port['id']):
                    continue
            else:
                vnic_type = port.binding.vnic_type
                if not self._validate_vnic_type(driver, vnic_type, port['id']):
                    continue

            if driver.is_rule_supported(rule):
                return True

        return False

    # NOTE(review): every decorator line was lost from the file as received
    # (see the staticmethods above); ``@property`` is restored here on that
    # basis - confirm against callers.
    @property
    def supported_rule_types(self):
        """Rule types supported by at least one registered driver.

        :returns: an empty list when no driver is registered, otherwise
                  the set union of every driver's ``supported_rules``.
        """
        if not self._drivers:
            return []

        rule_types = set()

        # Recalculate on every call to allow drivers determine supported rule
        # types dynamically
        for driver in self._drivers:
            rule_types |= set(driver.supported_rules)

        LOG.debug("Supported QoS rule types "
                  "(rules supported by at least one loaded QoS driver): %s",
                  rule_types)

        return rule_types