Qos policy RBAC DB setup and migration

This patch implements a new database model required for the
qos-policy RBAC support. In addition it migrates the current qos-policy
'shared' attribute to leverage the new 'qospolicyrbacs' table.

'shared' is no longer a property of the QosPolicy DB model. Its status
is based on the tenant ID of the API caller. From an API perspective the
logic remains the same - tenants will see qos-policies as 'shared=True'
in case the qos-policy is shared with them). However, internal callers
(e.g. plugins, drivers, services) must not check for the 'shared'
attribute on qos-policy db objects any more.

DocImpact
APIImpact

Blueprint: rbac-qos
Related-bug: #1512587

Change-Id: I1c59073daa181005a3e878bc2fe033a0709fbf31
This commit is contained in:
Haim Daniel 2015-11-25 18:49:45 -05:00
parent e3210bc880
commit aeaf77a529
15 changed files with 1092 additions and 27 deletions

View File

@ -1 +1 @@
e3278ee65050
c6c112992c9

View File

@ -1 +1 @@
b4caf27aae4
15e43b934f81

View File

@ -0,0 +1,69 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""rbac_qos_policy
Revision ID: c6c112992c9
Revises: 8a6d8bdae39
Create Date: 2015-11-25 18:45:03.831359
"""
from alembic import op
from oslo_utils import uuidutils
import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
from neutron.db import rbac_db_models
# revision identifiers, used by Alembic.
revision = 'c6c112992c9'
down_revision = 'e3278ee65050'
depends_on = ('15e43b934f81',)
qos_rbacs = sa.Table(
'qospolicyrbacs', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=attrs.TENANT_ID_MAX_LEN),
nullable=True),
sa.Column('target_tenant', sa.String(length=attrs.TENANT_ID_MAX_LEN),
nullable=False),
sa.Column('action', sa.String(length=255), nullable=False),
sa.Column('object_id', sa.String(length=36), nullable=False))
# A simple model of the qos_policies table with only the fields needed for
# the migration.
qos_policy = sa.Table('qos_policies', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id',
sa.String(length=attrs.TENANT_ID_MAX_LEN)),
sa.Column('shared', sa.Boolean(), nullable=False))
def upgrade():
op.bulk_insert(qos_rbacs, get_values())
op.drop_column('qos_policies', 'shared')
def get_values():
session = sa.orm.Session(bind=op.get_bind())
values = []
for row in session.query(qos_policy).filter(qos_policy.c.shared).all():
values.append({'id': uuidutils.generate_uuid(), 'object_id': row[0],
'tenant_id': row[1], 'target_tenant': '*',
'action': rbac_db_models.ACCESS_SHARED})
session.commit()
return values

View File

@ -0,0 +1,54 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""rbac_qos_policy
Revision ID: 15e43b934f81
Revises: 1df244e556f5
Create Date: 2015-11-25 18:45:03.819115
"""
from alembic import op
import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
# revision identifiers, used by Alembic.
revision = '15e43b934f81'
down_revision = 'b4caf27aae4'
def upgrade():
op.create_table('qospolicyrbacs',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id',
sa.String(length=attrs.TENANT_ID_MAX_LEN),
nullable=True),
sa.Column('target_tenant',
sa.String(length=attrs.TENANT_ID_MAX_LEN),
nullable=False),
sa.Column('action', sa.String(length=255), nullable=False),
sa.Column('object_id', sa.String(length=36),
nullable=False),
sa.ForeignKeyConstraint(['object_id'],
['qos_policies.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('target_tenant',
'object_id', 'action'),
mysql_engine='InnoDB')
op.create_index(op.f('ix_qospolicyrbacs_tenant_id'), 'qospolicyrbacs',
['tenant_id'], unique=False)

View File

@ -18,13 +18,16 @@ import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
from neutron.db import model_base
from neutron.db import models_v2
from neutron.db import rbac_db_models
class QosPolicy(model_base.BASEV2, model_base.HasId, model_base.HasTenant):
__tablename__ = 'qos_policies'
name = sa.Column(sa.String(attrs.NAME_MAX_LEN))
description = sa.Column(sa.String(attrs.DESCRIPTION_MAX_LEN))
shared = sa.Column(sa.Boolean, nullable=False)
rbac_entries = sa.orm.relationship(rbac_db_models.QosPolicyRBAC,
backref='qos_policy', lazy='joined',
cascade='all, delete, delete-orphan')
class QosNetworkPolicyBinding(model_base.BASEV2):

View File

@ -24,6 +24,9 @@ from neutron.common import exceptions as n_exc
from neutron.db import model_base
ACCESS_SHARED = 'access_as_shared'
class InvalidActionForType(n_exc.InvalidInput):
message = _("Invalid action '%(action)s' for object type "
"'%(object_type)s'. Valid actions: %(valid_actions)s")
@ -75,13 +78,27 @@ def get_type_model_map():
return {table.object_type: table for table in RBACColumns.__subclasses__()}
def _object_id_column(foreign_key):
return sa.Column(sa.String(36),
sa.ForeignKey(foreign_key, ondelete="CASCADE"),
nullable=False)
class NetworkRBAC(RBACColumns, model_base.BASEV2):
"""RBAC table for networks."""
object_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
nullable=False)
object_id = _object_id_column('networks.id')
object_type = 'network'
def get_valid_actions(self):
return ('access_as_shared',)
return (ACCESS_SHARED,)
class QosPolicyRBAC(RBACColumns, model_base.BASEV2):
"""RBAC table for qos policies."""
object_id = _object_id_column('qos_policies.id')
object_type = 'qos_policy'
def get_valid_actions(self):
return (ACCESS_SHARED,)

View File

@ -129,6 +129,11 @@ class NeutronDbObject(NeutronObject):
obj.obj_reset_changes()
return objs
@classmethod
def is_accessible(cls, context, db_obj):
return (context.is_admin or
context.tenant_id == db_obj.tenant_id)
def _get_changed_persistent_fields(self):
fields = self.obj_get_changes()
for field in self.synthetic_fields:

View File

@ -13,23 +13,32 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from six import add_metaclass
from neutron._i18n import _
from neutron.common import exceptions
from neutron.db import api as db_api
from neutron.db import models_v2
from neutron.db.qos import api as qos_db_api
from neutron.db.qos import models as qos_db_model
from neutron.db.rbac_db_models import QosPolicyRBAC
from neutron.objects import base
from neutron.objects.qos import rule as rule_obj_impl
from neutron.objects import rbac_db
@obj_base.VersionedObjectRegistry.register
@add_metaclass(rbac_db.RbacNeutronMetaclass)
class QosPolicy(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
# required by RbacNeutronMetaclass
rbac_db_model = QosPolicyRBAC
db_model = qos_db_model.QosPolicy
port_binding_model = qos_db_model.QosPortPolicyBinding
@ -48,6 +57,9 @@ class QosPolicy(base.NeutronDbObject):
synthetic_fields = ['rules']
binding_models = {'network': network_binding_model,
'port': port_binding_model}
def to_dict(self):
dict_ = super(QosPolicy, self).to_dict()
if 'rules' in dict_:
@ -80,14 +92,6 @@ class QosPolicy(base.NeutronDbObject):
raise exceptions.QosRuleNotFound(policy_id=self.id,
rule_id=rule_id)
@staticmethod
def _is_policy_accessible(context, db_obj):
#TODO(QoS): Look at I3426b13eede8bfa29729cf3efea3419fb91175c4 for
# other possible solutions to this.
return (context.is_admin or
db_obj.shared or
db_obj.tenant_id == context.tenant_id)
@classmethod
def get_by_id(cls, context, id):
# We want to get the policy regardless of its tenant id. We'll make
@ -96,7 +100,7 @@ class QosPolicy(base.NeutronDbObject):
with db_api.autonested_transaction(admin_context.session):
policy_obj = super(QosPolicy, cls).get_by_id(admin_context, id)
if (not policy_obj or
not cls._is_policy_accessible(context, policy_obj)):
not cls.is_accessible(context, policy_obj)):
return
policy_obj.reload_rules()
@ -112,7 +116,7 @@ class QosPolicy(base.NeutronDbObject):
**kwargs)
result = []
for obj in objs:
if not cls._is_policy_accessible(context, obj):
if not cls.is_accessible(context, obj):
continue
obj.reload_rules()
result.append(obj)
@ -142,12 +146,8 @@ class QosPolicy(base.NeutronDbObject):
self.reload_rules()
def delete(self):
models = (
('network', self.network_binding_model),
('port', self.port_binding_model)
)
with db_api.autonested_transaction(self._context.session):
for object_type, model in models:
for object_type, model in self.binding_models.items():
binding_db_obj = db_api.get_object(self._context, model,
policy_id=self.id)
if binding_db_obj:
@ -177,3 +177,30 @@ class QosPolicy(base.NeutronDbObject):
qos_db_api.delete_policy_port_binding(self._context,
policy_id=self.id,
port_id=port_id)
@classmethod
def _get_bound_tenant_ids(cls, session, binding_db, bound_db,
binding_db_id_column, policy_id):
return list(itertools.chain.from_iterable(
session.query(bound_db.tenant_id).join(
binding_db, bound_db.id == binding_db_id_column).filter(
binding_db.policy_id == policy_id).all()))
@classmethod
def get_bound_tenant_ids(cls, context, policy_id):
"""Implements RbacNeutronObject.get_bound_tenant_ids.
:returns: set -- a set of tenants' ids dependant on QosPolicy.
"""
net = models_v2.Network
qosnet = qos_db_model.QosNetworkPolicyBinding
port = models_v2.Port
qosport = qos_db_model.QosPortPolicyBinding
bound_tenants = []
with db_api.autonested_transaction(context.session):
bound_tenants.extend(cls._get_bound_tenant_ids(
context.session, qosnet, net, qosnet.network_id, policy_id))
bound_tenants.extend(
cls._get_bound_tenant_ids(context.session, qosport, port,
qosport.port_id, policy_id))
return set(bound_tenants)

301
neutron/objects/rbac_db.py Normal file
View File

@ -0,0 +1,301 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
from six import add_metaclass
from sqlalchemy import and_
from neutron._i18n import _
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.db import rbac_db_mixin
from neutron.db import rbac_db_models as models
from neutron.extensions import rbac as ext_rbac
from neutron.objects import base
@add_metaclass(abc.ABCMeta)
class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
base.NeutronDbObject):
rbac_db_model = None
@classmethod
@abc.abstractmethod
def get_bound_tenant_ids(cls, context, obj_id):
"""Returns ids of all tenants depending on this db object.
Has to be implemented by classes using RbacNeutronMetaclass.
The tenants are the ones that need the sharing or 'visibility' of the
object to them. E.g: for QosPolicy that would be the tenants using the
Networks and Ports with the shared QosPolicy applied to them.
:returns: set -- a set of tenants' ids dependent on this object.
"""
@classmethod
def is_shared_with_tenant(cls, context, obj_id, tenant_id):
ctx = context.elevated()
rbac_db_model = cls.rbac_db_model
with ctx.session.begin(subtransactions=True):
return (common_db_mixin.model_query(ctx, rbac_db_model).filter(
and_(rbac_db_model.object_id == obj_id,
rbac_db_model.action == models.ACCESS_SHARED,
rbac_db_model.target_tenant.in_(
['*', tenant_id]))).count() != 0)
@classmethod
def is_accessible(cls, context, db_obj):
return (super(
RbacNeutronDbObjectMixin, cls).is_accessible(context, db_obj) or
cls.is_shared_with_tenant(context, db_obj.id,
context.tenant_id))
@classmethod
def _get_db_obj_rbac_entries(cls, context, rbac_obj_id, rbac_action):
rbac_db_model = cls.rbac_db_model
return common_db_mixin.model_query(context, rbac_db_model).filter(
and_(rbac_db_model.object_id == rbac_obj_id,
rbac_db_model.action == rbac_action))
@classmethod
def _get_tenants_with_shared_access_to_db_obj(cls, context, obj_id):
return set(itertools.chain.from_iterable(context.session.query(
cls.rbac_db_model.target_tenant).filter(
and_(cls.rbac_db_model.object_id == obj_id,
cls.rbac_db_model.action == models.ACCESS_SHARED,
cls.rbac_db_model.target_tenant != '*'))))
@classmethod
def _validate_rbac_policy_delete(cls, context, obj_id, target_tenant):
ctx_admin = context.elevated()
rb_model = cls.rbac_db_model
bound_tenant_ids = cls.get_bound_tenant_ids(ctx_admin, obj_id)
db_obj_sharing_entries = cls._get_db_obj_rbac_entries(
ctx_admin, obj_id, models.ACCESS_SHARED)
def raise_policy_in_use():
raise ext_rbac.RbacPolicyInUse(
object_id=obj_id,
details='tenant_id={}'.format(target_tenant))
if target_tenant != '*':
# if there is a wildcard rule, we can return early because it
# shares the object globally
wildcard_sharing_entries = db_obj_sharing_entries.filter(
rb_model.target_tenant == '*')
if wildcard_sharing_entries.count():
return
if target_tenant in bound_tenant_ids:
raise_policy_in_use()
return
# for the wildcard we need to query all of the rbac entries to
# see if any allow the object sharing
other_target_tenants = cls._get_tenants_with_shared_access_to_db_obj(
ctx_admin, obj_id)
if not bound_tenant_ids.issubset(other_target_tenants):
raise_policy_in_use()
@classmethod
def validate_rbac_policy_delete(cls, resource, event, trigger, context,
object_type, policy, **kwargs):
"""Callback to handle RBAC_POLICY, BEFORE_DELETE callback.
:raises: RbacPolicyInUse -- in case the policy is in use.
"""
if policy['action'] != models.ACCESS_SHARED:
return
target_tenant = policy['target_tenant']
db_obj = cls.get_by_id(context, policy['object_id'])
if db_obj.tenant_id == target_tenant:
return
cls._validate_rbac_policy_delete(context=context,
obj_id=policy['object_id'],
target_tenant=target_tenant)
@classmethod
def validate_rbac_policy_update(cls, resource, event, trigger, context,
object_type, policy, **kwargs):
"""Callback to handle RBAC_POLICY, BEFORE_UPDATE callback.
:raises: RbacPolicyInUse -- in case the update is forbidden.
"""
prev_tenant = policy['target_tenant']
new_tenant = kwargs['policy_update']['target_tenant']
if prev_tenant == new_tenant:
return
if new_tenant != '*':
return cls.validate_rbac_policy_delete(
resource, event, trigger, context, object_type, policy)
@classmethod
def validate_rbac_policy_change(cls, resource, event, trigger, context,
object_type, policy, **kwargs):
"""Callback to validate RBAC_POLICY changes.
This is the dispatching function for create, update and delete
callbacks. On creation and update, verify that the creator is an admin
or owns the resource being shared.
"""
# TODO(hdaniel): As this code was shamelessly stolen from
# NeutronDbPluginV2.validate_network_rbac_policy_change(), those pieces
# should be synced and contain the same bugs, until Network RBAC logic
# (hopefully) melded with this one.
if object_type != cls.rbac_db_model.object_type:
return
db_obj = cls.get_by_id(context.elevated(), policy['object_id'])
if event in (events.BEFORE_CREATE, events.BEFORE_UPDATE):
if (not context.is_admin and
db_obj['tenant_id'] != context.tenant_id):
msg = _("Only admins can manipulate policies on objects "
"they do not own")
raise n_exc.InvalidInput(error_message=msg)
callback_map = {events.BEFORE_UPDATE: cls.validate_rbac_policy_update,
events.BEFORE_DELETE: cls.validate_rbac_policy_delete}
if event in callback_map:
return callback_map[event](resource, event, trigger, context,
object_type, policy, **kwargs)
def attach_rbac(self, obj_id, tenant_id, target_tenant='*'):
obj_type = self.rbac_db_model.object_type
rbac_policy = {'rbac_policy': {'object_id': obj_id,
'target_tenant': target_tenant,
'tenant_id': tenant_id,
'object_type': obj_type,
'action': models.ACCESS_SHARED}}
return self.create_rbac_policy(self._context, rbac_policy)
def update_shared(self, is_shared_new, obj_id):
admin_context = self._context.elevated()
shared_prev = db_api.get_object(admin_context, self.rbac_db_model,
object_id=obj_id, target_tenant='*',
action=models.ACCESS_SHARED)
is_shared_prev = bool(shared_prev)
if is_shared_prev == is_shared_new:
return
# 'shared' goes False -> True
if not is_shared_prev and is_shared_new:
self.attach_rbac(obj_id, self._context.tenant_id)
return
# 'shared' goes True -> False is actually an attempt to delete
# rbac rule for sharing obj_id with target_tenant = '*'
self._validate_rbac_policy_delete(self._context, obj_id, '*')
return self._context.session.delete(shared_prev)
def _update_post(self):
self.update_shared(self.shared, self.id)
def _update_hook(self, update_orig):
with db_api.autonested_transaction(self._context.session):
update_orig(self)
_update_post(self)
def _create_post(self):
if self.shared:
self.attach_rbac(self.id, self._context.tenant_id)
def _create_hook(self, orig_create):
with db_api.autonested_transaction(self._context.session):
orig_create(self)
_create_post(self)
def _to_dict_hook(self, to_dict_orig):
dct = to_dict_orig(self)
dct['shared'] = self.is_shared_with_tenant(self._context,
self.id,
self._context.tenant_id)
return dct
class RbacNeutronMetaclass(type):
"""Adds support for RBAC in NeutronDbObjects.
Injects code for CRUD operations and modifies existing ops to do so.
"""
@classmethod
def _get_attribute(mcs, attribute_name, bases):
for b in bases:
attribute = getattr(b, attribute_name, None)
if attribute:
return attribute
@classmethod
def get_attribute(mcs, attribute_name, bases, dct):
return (dct.get(attribute_name, None) or
mcs._get_attribute(attribute_name, bases))
@classmethod
def update_synthetic_fields(mcs, bases, dct):
if not dct.get('synthetic_fields', None):
synthetic_attr = mcs.get_attribute('synthetic_fields', bases, dct)
dct['synthetic_fields'] = synthetic_attr or []
if 'shared' in dct['synthetic_fields']:
raise n_exc.ObjectActionError(
action=_('shared attribute switching to synthetic'),
reason=_('already a synthetic attribute'))
dct['synthetic_fields'].append('shared')
@staticmethod
def subscribe_to_rbac_events(class_instance):
for e in (events.BEFORE_CREATE, events.BEFORE_UPDATE,
events.BEFORE_DELETE):
registry.subscribe(class_instance.validate_rbac_policy_change,
rbac_db_mixin.RBAC_POLICY, e)
@staticmethod
def validate_existing_attrs(cls_name, dct):
if 'shared' not in dct['fields']:
raise KeyError(_('No shared key in %s fields') % cls_name)
if 'rbac_db_model' not in dct:
raise AttributeError(_('rbac_db_model not found in %s') % cls_name)
@staticmethod
def get_replaced_method(orig_method, new_method):
def func(self):
return new_method(self, orig_method)
return func
@classmethod
def replace_class_methods_with_hooks(mcs, bases, dct):
methods_replacement_map = {'create': _create_hook,
'update': _update_hook,
'to_dict': _to_dict_hook}
for orig_method_name, new_method in methods_replacement_map.items():
orig_method = mcs.get_attribute(orig_method_name, bases, dct)
hook_method = mcs.get_replaced_method(orig_method,
new_method)
dct[orig_method_name] = hook_method
def __new__(mcs, name, bases, dct):
mcs.validate_existing_attrs(name, dct)
mcs.update_synthetic_fields(bases, dct)
mcs.replace_class_methods_with_hooks(bases, dct)
cls = type(name, (RbacNeutronDbObjectMixin,) + bases, dct)
mcs.subscribe_to_rbac_events(cls)
return cls

View File

@ -12,8 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
from tempest import test
import testtools
from neutron.services.qos import qos_consts
@ -448,3 +450,257 @@ class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertNotIn(rule2['id'], rules_ids)
class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
force_tenant_isolation = True
credentials = ['primary', 'alt', 'admin']
@classmethod
def resource_setup(cls):
super(RbacSharedQosPoliciesTest, cls).resource_setup()
if not test.is_extension_enabled('qos', 'network'):
msg = "qos extension not enabled."
raise cls.skipException(msg)
cls.client2 = cls.alt_manager.network_client
def _create_qos_policy(self, tenant_id=None):
args = {'name': data_utils.rand_name('test-policy'),
'description': 'test policy',
'shared': False,
'tenant_id': tenant_id}
qos_policy = self.admin_client.create_qos_policy(**args)['policy']
self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])
return qos_policy
def _make_admin_policy_shared_to_tenant_id(self, tenant_id):
policy = self._create_qos_policy()
rbac_policy = self.admin_client.create_rbac_policy(
object_type='qos_policy',
object_id=policy['id'],
action='access_as_shared',
target_tenant=tenant_id,
)['rbac_policy']
return {'policy': policy, 'rbac_policy': rbac_policy}
def _create_network(self, qos_policy_id, client, should_cleanup=True):
net = client.create_network(
name=data_utils.rand_name('test-network'),
qos_policy_id=qos_policy_id)['network']
if should_cleanup:
self.addCleanup(client.delete_network, net['id'])
return net
@test.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
def test_policy_sharing_with_wildcard(self):
qos_pol = self.create_qos_policy(
name=data_utils.rand_name('test-policy'),
description='test-shared-policy', shared=False)
self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
# test update shared False -> True
self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
qos_pol['shared'] = True
self.client2.show_qos_policy(qos_pol['id'])
rbac_pol = {'target_tenant': '*',
'tenant_id': self.admin_client.tenant_id,
'object_type': 'qos_policy',
'object_id': qos_pol['id'],
'action': 'access_as_shared'}
rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
rbac_policies = [r for r in rbac_policies if r.pop('id')]
self.assertIn(rbac_pol, rbac_policies)
# update shared True -> False should fail because the policy is bound
# to a network
net = self._create_network(qos_pol['id'], self.admin_client, False)
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
# delete the network, and update shared True -> False should pass now
self.admin_client.delete_network(net['id'])
self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
qos_pol['shared'] = False
self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
def _create_net_bound_qos_rbacs(self):
res = self._make_admin_policy_shared_to_tenant_id(
self.client.tenant_id)
qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']
# add a wildcard rbac rule - now the policy globally shared
rbac_wildcard = self.admin_client.create_rbac_policy(
object_type='qos_policy',
object_id=qos_policy['id'],
action='access_as_shared',
target_tenant='*',
)['rbac_policy']
# tenant1 now uses qos policy for net
self._create_network(qos_policy['id'], self.client)
return rbac_for_client_tenant, rbac_wildcard
@test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remove(self):
client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
# globally unshare the qos-policy, the specific share should remain
self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
self.client.list_rbac_policies(id=client_rbac['id'])
@test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remains(self):
client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
# remove client_rbac policy the wildcard share should remain
self.admin_client.delete_rbac_policy(client_rbac['id'])
self.client.list_rbac_policies(id=wildcard_rbac['id'])
@test.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
def test_policy_sharing_with_wildcard_and_tenant_id(self):
res = self._make_admin_policy_shared_to_tenant_id(
self.client.tenant_id)
qos_policy, rbac = res['policy'], res['rbac_policy']
qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
self.assertTrue(qos_pol['shared'])
with testtools.ExpectedException(exceptions.NotFound):
self.client2.show_qos_policy(qos_policy['id'])
# make the qos-policy globally shared
self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
self.assertTrue(qos_pol['shared'])
# globally unshare the qos-policy, the specific share should remain
self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
self.client.show_qos_policy(qos_policy['id'])
with testtools.ExpectedException(exceptions.NotFound):
self.client2.show_qos_policy(qos_policy['id'])
self.assertIn(rbac,
self.admin_client.list_rbac_policies()['rbac_policies'])
@test.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
def test_policy_target_update(self):
res = self._make_admin_policy_shared_to_tenant_id(
self.client.tenant_id)
# change to client2
update_res = self.admin_client.update_rbac_policy(
res['rbac_policy']['id'], target_tenant=self.client2.tenant_id)
self.assertEqual(self.client2.tenant_id,
update_res['rbac_policy']['target_tenant'])
# make sure everything else stayed the same
res['rbac_policy'].pop('target_tenant')
update_res['rbac_policy'].pop('target_tenant')
self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])
@test.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
def test_network_presence_prevents_policy_rbac_policy_deletion(self):
res = self._make_admin_policy_shared_to_tenant_id(
self.client2.tenant_id)
qos_policy_id = res['policy']['id']
self._create_network(qos_policy_id, self.client2)
# a network with shared qos-policy should prevent the deletion of an
# rbac-policy required for it to be shared
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
# a wildcard policy should allow the specific policy to be deleted
# since it allows the remaining port
wild = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=res['policy']['id'],
action='access_as_shared', target_tenant='*')['rbac_policy']
self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
# now that wildcard is the only remaining, it should be subjected to
# the same restriction
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.delete_rbac_policy(wild['id'])
# we can't update the policy to a different tenant
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.update_rbac_policy(
wild['id'], target_tenant=self.client2.tenant_id)
@test.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
def test_regular_client_shares_to_another_regular_client(self):
# owned by self.admin_client
policy = self._create_qos_policy()
with testtools.ExpectedException(exceptions.NotFound):
self.client.show_qos_policy(policy['id'])
rbac_policy = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared',
target_tenant=self.client.tenant_id)['rbac_policy']
self.client.show_qos_policy(policy['id'])
self.assertIn(rbac_policy,
self.admin_client.list_rbac_policies()['rbac_policies'])
# ensure that 'client2' can't see the rbac-policy sharing the
# qos-policy to it because the rbac-policy belongs to 'client'
self.assertNotIn(rbac_policy['id'], [p['id'] for p in
self.client2.list_rbac_policies()['rbac_policies']])
@test.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
def test_filter_fields(self):
policy = self._create_qos_policy()
self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared', target_tenant=self.client2.tenant_id)
field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
('tenant_id', 'target_tenant'))
for fields in field_args:
res = self.admin_client.list_rbac_policies(fields=fields)
self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))
@test.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
def test_rbac_policy_show(self):
res = self._make_admin_policy_shared_to_tenant_id(
self.client.tenant_id)
p1 = res['rbac_policy']
p2 = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=res['policy']['id'],
action='access_as_shared',
target_tenant='*')['rbac_policy']
self.assertEqual(
p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
self.assertEqual(
p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
@test.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df')
def test_filter_rbac_policies(self):
policy = self._create_qos_policy()
rbac_pol1 = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared',
target_tenant=self.client2.tenant_id)['rbac_policy']
rbac_pol2 = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared',
target_tenant=self.admin_client.tenant_id)['rbac_policy']
res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[
'rbac_policies']
res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[
'rbac_policies']
self.assertEqual(1, len(res1))
self.assertEqual(1, len(res2))
self.assertEqual(rbac_pol1['id'], res1[0]['id'])
self.assertEqual(rbac_pol2['id'], res2[0]['id'])
@test.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df')
def test_regular_client_blocked_from_sharing_anothers_policy(self):
qos_policy = self._make_admin_policy_shared_to_tenant_id(
self.client.tenant_id)['policy']
with testtools.ExpectedException(exceptions.BadRequest):
self.client.create_rbac_policy(
object_type='qos_policy', object_id=qos_policy['id'],
action='access_as_shared',
target_tenant=self.client2.tenant_id)
# make sure the rbac-policy is invisible to the tenant for which it's
# being shared
self.assertFalse(self.client.list_rbac_policies()['rbac_policies'])

View File

@ -246,7 +246,6 @@ class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
self.assertEqual(rule_dict, obj_dict['rules'][0])
def test_shared_default(self):
self.db_obj.pop('shared')
obj = self._test_class(self.context, **self.db_obj)
self.assertFalse(obj.shared)
@ -274,3 +273,13 @@ class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
policy_obj.reload_rules()
self.assertEqual([rule_obj], policy_obj.rules)
def test_get_bound_tenant_ids_returns_set_of_tenant_ids(self):
obj = self._create_test_policy()
obj.attach_port(self._port['id'])
ids = self._test_class.get_bound_tenant_ids(self.context, obj['id'])
self.assertEqual(ids.pop(), self._port['tenant_id'])
self.assertEqual(len(ids), 0)
obj.detach_port(self._port['id'])
obj.delete()

View File

@ -234,7 +234,7 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value={}):
obj = self._test_class(self.context)
obj = self._test_class(self.context, id=7777)
obj.update()
self.assertFalse(update_mock.called)
@ -311,7 +311,8 @@ class BaseDbObjectTestCase(_BaseObjectTestCase):
# TODO(ihrachys): replace with port.create() once we get an object
# implementation for ports
self._port = db_api.create_object(self.context, models_v2.Port,
{'name': 'test-port1',
{'tenant_id': 'fake_tenant_id',
'name': 'test-port1',
'network_id': network['id'],
'mac_address': 'fake_mac',
'admin_state_up': True,

View File

@ -0,0 +1,312 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from six import add_metaclass
import sqlalchemy as sa
from neutron.callbacks import events
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.db import rbac_db_models
from neutron.extensions import rbac as ext_rbac
from neutron.objects import base
from neutron.objects import rbac_db
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class FakeDbModel(object):
def __init__(self, *args, **kwargs):
pass
class FakeRbacModel(rbac_db_models.RBACColumns, model_base.BASEV2):
object_id = sa.Column(sa.String(36), nullable=False)
object_type = 'fake_rbac_object'
def get_valid_actions(self):
return (rbac_db_models.ACCESS_SHARED,)
@obj_base.VersionedObjectRegistry.register_if(False)
@add_metaclass(rbac_db.RbacNeutronMetaclass)
class FakeNeutronDbObject(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
rbac_db_model = FakeRbacModel
db_model = FakeDbModel
fields = {
'id': obj_fields.UUIDField(),
'field1': obj_fields.StringField(),
'field2': obj_fields.StringField(),
'shared': obj_fields.BooleanField(default=False),
}
fields_no_update = ['id']
synthetic_fields = ['field2']
def get_bound_tenant_ids(cls, context, policy_id):
pass
class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
testlib_api.SqlTestCase):
_test_class = FakeNeutronDbObject
def setUp(self):
super(RbacNeutronDbObjectTestCase, self).setUp()
FakeNeutronDbObject.update_post = mock.Mock()
@mock.patch.object(_test_class, 'rbac_db_model')
def test_get_tenants_with_shared_access_to_db_obj_return_tenant_ids(
self, *mocks):
ctx = mock.Mock()
fake_ids = {'tenant_id_' + str(i) for i in range(10)}
ctx.session.query.return_value.filter.return_value = [
(fake_id,) for fake_id in fake_ids]
ret_ids = self._test_class._get_tenants_with_shared_access_to_db_obj(
ctx, 'fake_db_obj_id')
self.assertEqual(fake_ids, ret_ids)
def test_is_accessible_for_admin(self):
ctx = mock.Mock(is_admin=True, tenant_id='we_dont_care')
self.assertTrue(self._test_class.is_accessible(ctx, None))
def test_is_accessible_for_db_object_owner(self):
ctx = mock.Mock(is_admin=False, tenant_id='db_object_owner')
db_obj = mock.Mock(tenant_id=ctx.tenant_id)
self.assertTrue(self._test_class.is_accessible(ctx, db_obj))
@mock.patch.object(_test_class, 'is_shared_with_tenant', return_value=True)
def test_is_accessible_if_shared_with_tenant(self, mock_is_shared):
ctx = mock.Mock(is_admin=False, tenant_id='db_object_shareholder')
db_obj = mock.Mock(tenant_id='db_object_owner')
self.assertTrue(self._test_class.is_accessible(ctx, db_obj))
mock_is_shared.assert_called_once_with(
mock.ANY, db_obj.id, ctx.tenant_id)
@mock.patch.object(_test_class, 'is_shared_with_tenant',
return_value=False)
def test_is_accessible_fails_for_unauthorized_tenant(self, mock_is_shared):
ctx = mock.Mock(is_admin=False, tenant_id='Billy_the_kid')
db_obj = mock.Mock(tenant_id='db_object_owner')
self.assertFalse(self._test_class.is_accessible(ctx, db_obj))
mock_is_shared.assert_called_once_with(
mock.ANY, db_obj.id, ctx.tenant_id)
def _rbac_policy_generate_change_events(self, resource, trigger,
context, object_type, policy,
event_list):
for event in event_list:
self._test_class.validate_rbac_policy_change(
resource, event, trigger, context, object_type, policy)
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
def test_validate_rbac_policy_change_handles_only_object_type(
self, mock_validate_rbac_update):
self._rbac_policy_generate_change_events(
resource=None, trigger='dummy_trigger', context=None,
object_type='dummy_object_type', policy=None,
event_list=(events.BEFORE_CREATE, events.BEFORE_UPDATE,
events.BEFORE_DELETE))
mock_validate_rbac_update.assert_not_called()
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
@mock.patch.object(_test_class, 'get_by_id',
return_value={'tenant_id': 'tyrion_lannister'})
def test_validate_rbac_policy_change_allowed_for_admin_or_owner(
self, mock_get_by_id, mock_validate_update):
context = mock.Mock(is_admin=True, tenant_id='db_obj_owner_id')
self._rbac_policy_generate_change_events(
resource=None, trigger='dummy_trigger', context=context,
object_type=self._test_class.rbac_db_model.object_type,
policy={'object_id': 'fake_object_id'},
event_list=(events.BEFORE_CREATE, events.BEFORE_UPDATE))
self.assertTrue(self._test_class.validate_rbac_policy_update.called)
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
@mock.patch.object(_test_class, 'get_by_id',
return_value={'tenant_id': 'king_beyond_the_wall'})
def test_validate_rbac_policy_change_forbidden_for_outsiders(
self, mock_get_by_id, mock_validate_update):
context = mock.Mock(is_admin=False, tenant_id='db_obj_owner_id')
self.assertRaises(
n_exc.InvalidInput,
self._rbac_policy_generate_change_events,
resource=mock.Mock(), trigger='dummy_trigger', context=context,
object_type=self._test_class.rbac_db_model.object_type,
policy={'object_id': 'fake_object_id'},
event_list=(events.BEFORE_CREATE, events.BEFORE_UPDATE))
self.assertFalse(mock_validate_update.called)
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
def _test_validate_rbac_policy_delete_handles_policy(
self, policy, mock_validate_delete):
self._test_class.validate_rbac_policy_delete(
resource=mock.Mock(), event=events.BEFORE_DELETE,
trigger='dummy_trigger', context=mock.Mock(),
object_type=self._test_class.rbac_db_model.object_type,
policy=policy)
mock_validate_delete.assert_not_called()
def test_validate_rbac_policy_delete_handles_shared_action(self):
self._test_validate_rbac_policy_delete_handles_policy(
{'action': 'unknown_action'})
@mock.patch.object(_test_class, 'get_by_id')
def test_validate_rbac_policy_delete_skips_db_object_owner(self,
mock_get_by_id):
policy = {'action': rbac_db_models.ACCESS_SHARED,
'target_tenant': 'fake_tenant_id',
'object_id': 'fake_obj_id',
'tenant_id': 'fake_tenant_id'}
mock_get_by_id.return_value.tenant_id = policy['target_tenant']
self._test_validate_rbac_policy_delete_handles_policy(policy)
@mock.patch.object(_test_class, 'get_by_id')
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
return_value='tenant_id_shared_with')
def test_validate_rbac_policy_delete_fails_single_tenant_and_in_use(
self, get_bound_tenant_ids_mock, mock_get_by_id):
policy = {'action': rbac_db_models.ACCESS_SHARED,
'target_tenant': 'tenant_id_shared_with',
'tenant_id': 'object_owner_tenant_id',
'object_id': 'fake_obj_id'}
context = mock.Mock()
with mock.patch.object(
self._test_class,
'_get_db_obj_rbac_entries') as target_tenants_mock:
filter_mock = target_tenants_mock.return_value.filter
filter_mock.return_value.count.return_value = 0
self.assertRaises(
ext_rbac.RbacPolicyInUse,
self._test_class.validate_rbac_policy_delete,
resource=None,
event=events.BEFORE_DELETE,
trigger='dummy_trigger',
context=context,
object_type=self._test_class.rbac_db_model.object_type,
policy=policy)
def test_validate_rbac_policy_delete_not_bound_tenant_success(self):
context = mock.Mock()
with mock.patch.object(
self._test_class, 'get_bound_tenant_ids',
return_value={'fake_tid2', 'fake_tid3'}), \
mock.patch.object(self._test_class,
'_get_db_obj_rbac_entries') as get_rbac_entries_mock, \
mock.patch.object(
self._test_class,
'_get_tenants_with_shared_access_to_db_obj') as sh_tids:
get_rbac_entries_mock.filter.return_value.count.return_value = 0
self._test_class._validate_rbac_policy_delete(
context=context,
obj_id='fake_obj_id',
target_tenant='fake_tid1')
sh_tids.assert_not_called()
@mock.patch.object(_test_class, '_get_db_obj_rbac_entries')
@mock.patch.object(_test_class,
'_get_tenants_with_shared_access_to_db_obj',
return_value=['some_other_tenant'])
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
return_value={'fake_id1'})
def test_validate_rbac_policy_delete_fails_single_used_wildcarded(
self, get_bound_tenant_ids_mock, mock_tenants_with_shared_access,
_get_db_obj_rbac_entries_mock):
policy = {'action': rbac_db_models.ACCESS_SHARED,
'target_tenant': '*',
'tenant_id': 'object_owner_tenant_id',
'object_id': 'fake_obj_id'}
context = mock.Mock()
with mock.patch.object(self._test_class, 'get_by_id'):
self.assertRaises(
ext_rbac.RbacPolicyInUse,
self._test_class.validate_rbac_policy_delete,
resource=mock.Mock(),
event=events.BEFORE_DELETE,
trigger='dummy_trigger',
context=context,
object_type=self._test_class.rbac_db_model.object_type,
policy=policy)
@mock.patch.object(_test_class, 'attach_rbac')
@mock.patch.object(db_api, 'get_object', return_value=['fake_rbac_policy'])
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
def test_update_shared_avoid_duplicate_update(
self, mock_validate_delete, get_object_mock, attach_rbac_mock):
obj_id = 'fake_obj_id'
self._test_class(mock.Mock()).update_shared(is_shared_new=True,
obj_id=obj_id)
get_object_mock.assert_called_with(
mock.ANY, self._test_class.rbac_db_model, object_id=obj_id,
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
self.assertFalse(mock_validate_delete.called)
self.assertFalse(attach_rbac_mock.called)
@mock.patch.object(_test_class, 'attach_rbac')
@mock.patch.object(db_api, 'get_object', return_value=[])
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
def test_update_shared_wildcard(
self, mock_validate_delete, get_object_mock, attach_rbac_mock):
obj_id = 'fake_obj_id'
test_neutron_obj = self._test_class(mock.Mock())
test_neutron_obj.update_shared(is_shared_new=True, obj_id=obj_id)
get_object_mock.assert_called_with(
mock.ANY, self._test_class.rbac_db_model, object_id=obj_id,
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
attach_rbac_mock.assert_called_with(
obj_id, test_neutron_obj._context.tenant_id)
@mock.patch.object(_test_class, 'attach_rbac')
@mock.patch.object(db_api, 'get_object', return_value=['fake_rbac_policy'])
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
def test_update_shared_remove_wildcard_sharing(
self, mock_validate_delete, get_object_mock, attach_rbac_mock):
obj_id = 'fake_obj_id'
self._test_class(mock.Mock()).update_shared(is_shared_new=False,
obj_id=obj_id)
get_object_mock.assert_called_with(
mock.ANY, self._test_class.rbac_db_model, object_id=obj_id,
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
self.assertFalse(attach_rbac_mock.attach_rbac.called)
mock_validate_delete.assert_called_with(mock.ANY, obj_id, '*')
@mock.patch.object(_test_class, 'create_rbac_policy')
def test_attach_rbac_returns_type(self, create_rbac_mock):
obj_id = 'fake_obj_id'
tenant_id = 'fake_tenant_id'
target_tenant = 'fake_target_tenant'
self._test_class(mock.Mock()).attach_rbac(obj_id, tenant_id,
target_tenant)
rbac_pol = create_rbac_mock.call_args_list[0][0][1]['rbac_policy']
self.assertEqual(rbac_pol['object_id'], obj_id)
self.assertEqual(rbac_pol['target_tenant'], target_tenant)
self.assertEqual(rbac_pol['action'], rbac_db_models.ACCESS_SHARED)
self.assertEqual(rbac_pol['object_type'],
self._test_class.rbac_db_model.object_type)

View File

@ -76,11 +76,17 @@ class TestQosPlugin(base.BaseQosTestCase):
self.assertIsInstance(
method.call_args[0][1], policy_object.QosPolicy)
def test_add_policy(self):
@mock.patch(
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
'.create_rbac_policy')
def test_add_policy(self, *mocks):
self.qos_plugin.create_policy(self.ctxt, self.policy_data)
self._validate_notif_driver_params('create_policy')
def test_update_policy(self):
@mock.patch(
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
'.create_rbac_policy')
def test_update_policy(self, *mocks):
fields = base_object.get_updatable_fields(
policy_object.QosPolicy, self.policy_data['policy'])
self.qos_plugin.update_policy(

View File

@ -0,0 +1,5 @@
---
prelude: >
RBAC support for QoS policies
features:
- Neutron now supports sharing of QoS policies between a subset of tenants.