objects: expose database model for NeutronDbObject instances
Sometimes object users need access to the corresponding models that are used to persist object data. While it's not encouraged, and object consumers should try to rely solely on the object API and fields, we should fulfill this special need, at least for now. One of the use cases for accessing the corresponding database model is the set of functions registered by plugins to extend core resources. Those functions are passed into register_dict_extend_funcs and expect the model as one of their arguments. Later, when more objects are adopted in base plugin code, and we are ready to switch extensions to objects, we can pass to those functions some wrappers that would trigger deprecation warnings on attempts to access attributes that are not available on objects; and then, after a while, finally switch to passing objects directly instead of those wrappers. Of course, that would not happen overnight, and the path would take several cycles. To avoid the stored reference to the model influencing other code fetching from the session, we detach (expunge) the model from the active database session on every fetch. We also refresh the model before detaching it when the corresponding object had synthetic fields changed, because that's usually an indication that some relationships may be stale on the model. Since we now consistently detach the model from the active session on each fetch, we cannot reuse it. So every time we hit update, we now need to refetch the model from the session; otherwise we would hit an error trying to refresh and/or detach an already detached model. Hence the change in NeutronDbObject.update to always trigger update_object irrespective of whether any persistent fields were changed. This makes the test_update_no_changes test case incorrect, hence its removal. Due to the way the RBAC metaclass works, it may trigger cls.get_object in the middle of object creation (to validate a newly created RBAC entry against the object). 
It results in duplicate expunge calls for the same object model (one during object creation, another when fetching the same object to validate it for RBAC). To avoid that, the RBAC code was switched from the objects API to direct objects.db_api.get_object calls, which avoid triggering the whole model expunge/refresh machinery. Now that we have models stored on objects, the patch switches plugin code back to passing models in places where we previously, by mistake, were passing objects into extensions. Specifically, the switch for allowed address pairs occurred with I3c937267ce789ed510373616713b3fa9517c18ac. For subnetpools, it happened in I1415c7a29af86d377ed31cce40888631a34d4811. Neither of those was released in Mitaka, so it did not break anyone using major releases. Also, we have not heard from any trunk chaser that would be affected by the mistake. There are no other objects used in database code that we would pass into extensions, so we should be good. Closes-Bug: #1621837 Change-Id: I130609194f15b89df89e5606fb8193849edd14d8 Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
This commit is contained in:
parent
9f0647e181
commit
10ada71486
|
@ -58,7 +58,7 @@ class AllowedAddressPairsMixin(object):
|
|||
|
||||
def get_allowed_address_pairs(self, context, port_id):
    """Return the allowed address pairs of *port_id* as a list of dicts.

    The dict-extend machinery expects database models, so each pair
    object's captured model (``db_obj``) is handed to the dict maker.
    """
    pair_objs = self._get_allowed_address_pairs_objs(context, port_id)
    return [
        self._make_allowed_address_pairs_dict(pair_obj.db_obj)
        for pair_obj in pair_objs
    ]
|
||||
|
||||
def _get_allowed_address_pairs_objs(self, context, port_id):
|
||||
|
|
|
@ -167,8 +167,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
|
|||
'max_prefixlen': max_prefixlen,
|
||||
'is_default': subnetpool['is_default'],
|
||||
'shared': subnetpool['shared'],
|
||||
'prefixes': [str(prefix)
|
||||
for prefix in subnetpool['prefixes']],
|
||||
'prefixes': [prefix.cidr for prefix in subnetpool['prefixes']],
|
||||
'ip_version': subnetpool['ip_version'],
|
||||
'default_quota': subnetpool['default_quota'],
|
||||
'address_scope_id': subnetpool['address_scope_id']}
|
||||
|
|
|
@ -1012,7 +1012,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
|||
subnetpool = subnetpool_obj.SubnetPool(context, **pool_args)
|
||||
subnetpool.create()
|
||||
|
||||
return self._make_subnetpool_dict(subnetpool)
|
||||
return self._make_subnetpool_dict(subnetpool.db_obj)
|
||||
|
||||
def update_subnetpool(self, context, id, subnetpool):
|
||||
new_sp = subnetpool['subnetpool']
|
||||
|
@ -1048,12 +1048,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
|||
# Stringify the prefix-length values for the API response.
# NOTE: the original wrote to the literal key 'key' (updated['key'] = ...)
# instead of the field being iterated, leaving the three prefixlen fields
# unconverted and polluting the dict with a bogus 'key' entry.
for key in ['min_prefixlen', 'max_prefixlen', 'default_prefixlen']:
    updated[key] = str(updated[key])
|
||||
self._apply_dict_extend_functions(attributes.SUBNETPOOLS,
|
||||
updated, orig_sp)
|
||||
updated, orig_sp.db_obj)
|
||||
return updated
|
||||
|
||||
def get_subnetpool(self, context, id, fields=None):
    """Return a single subnet pool as a dict, optionally limited to fields."""
    pool = self._get_subnetpool(context, id)
    # extensions expect the database model, hence db_obj
    return self._make_subnetpool_dict(pool.db_obj, fields)
|
||||
|
||||
def get_subnetpools(self, context, filters=None, fields=None,
|
||||
sorts=None, limit=None, marker=None,
|
||||
|
@ -1062,7 +1062,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
|||
subnetpools = subnetpool_obj.SubnetPool.get_objects(
|
||||
context, _pager=pager, **filters)
|
||||
return [
|
||||
self._make_subnetpool_dict(pool, fields)
|
||||
self._make_subnetpool_dict(pool.db_obj, fields)
|
||||
for pool in subnetpools
|
||||
]
|
||||
|
||||
|
|
|
@ -358,7 +358,7 @@ class SubnetPoolReader(object):
|
|||
for prefix in prefix_list:
|
||||
ip_set.add(netaddr.IPNetwork(prefix))
|
||||
ip_set.compact()
|
||||
return [str(x.cidr) for x in ip_set.iter_cidrs()]
|
||||
return [x.cidr for x in ip_set.iter_cidrs()]
|
||||
|
||||
|
||||
class SubnetPoolHelper(object):
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
import abc
|
||||
import copy
|
||||
import functools
|
||||
import itertools
|
||||
|
||||
from neutron_lib import exceptions
|
||||
|
@ -222,6 +223,24 @@ class NeutronObject(obj_base.VersionedObject,
|
|||
return len(cls.get_objects(context, **kwargs))
|
||||
|
||||
|
||||
def _detach_db_obj(func):
    """Decorator that expunges the captured db model after *func* runs.

    If any synthetic fields were changed before the call, the model is
    refreshed first, since relationship-backed attributes may be stale.
    The model is then detached so later session fetches don't reuse it.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # capture before the call: func may reset the change tracking
        had_synthetic_changes = bool(self._get_changed_synthetic_fields())
        result = func(self, *args, **kwargs)
        session = self.obj_context.session
        if had_synthetic_changes:
            # some relationship based fields may have changed since we
            # captured the model; refresh for the latest database state
            # TODO(ihrachys) consider refreshing just changed attributes
            session.refresh(self.db_obj)
        # detach the model so that consequent fetches don't reuse it
        session.expunge(self.db_obj)
        return result
    return wrapper
|
||||
|
||||
|
||||
class DeclarativeObject(abc.ABCMeta):
|
||||
|
||||
def __init__(cls, name, bases, dct):
|
||||
|
@ -244,19 +263,23 @@ class DeclarativeObject(abc.ABCMeta):
|
|||
fields_no_update_set.add(key)
|
||||
cls.fields_no_update = list(fields_no_update_set)
|
||||
|
||||
# generate unique_keys from the model
|
||||
model = getattr(cls, 'db_model', None)
|
||||
if model and not getattr(cls, 'unique_keys', None):
|
||||
cls.unique_keys = []
|
||||
obj_field_names = set(cls.fields.keys())
|
||||
model_to_obj_translation = {
|
||||
v: k for (k, v) in cls.fields_need_translation.items()}
|
||||
if model:
|
||||
# generate unique_keys from the model
|
||||
if not getattr(cls, 'unique_keys', None):
|
||||
cls.unique_keys = []
|
||||
obj_field_names = set(cls.fields.keys())
|
||||
model_to_obj_translation = {
|
||||
v: k for (k, v) in cls.fields_need_translation.items()}
|
||||
|
||||
for model_unique_key in model_base.get_unique_keys(model):
|
||||
obj_unique_key = [model_to_obj_translation.get(key, key)
|
||||
for key in model_unique_key]
|
||||
if obj_field_names.issuperset(obj_unique_key):
|
||||
cls.unique_keys.append(obj_unique_key)
|
||||
for model_unique_key in model_base.get_unique_keys(model):
|
||||
obj_unique_key = [model_to_obj_translation.get(key, key)
|
||||
for key in model_unique_key]
|
||||
if obj_field_names.issuperset(obj_unique_key):
|
||||
cls.unique_keys.append(obj_unique_key)
|
||||
# detach db_obj right after object is loaded from the model
|
||||
cls.create = _detach_db_obj(cls.create)
|
||||
cls.update = _detach_db_obj(cls.update)
|
||||
|
||||
if (hasattr(cls, 'has_standard_attributes') and
|
||||
cls.has_standard_attributes()):
|
||||
|
@ -303,12 +326,22 @@ class NeutronDbObject(NeutronObject):
|
|||
# in to_dict()
|
||||
# obj_extra_fields = []
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(NeutronDbObject, self).__init__(*args, **kwargs)
|
||||
self._captured_db_model = None
|
||||
|
||||
@property
|
||||
def db_obj(self):
|
||||
'''Return a database model that persists object data.'''
|
||||
return self._captured_db_model
|
||||
|
||||
def from_db_object(self, db_obj):
    """Populate this object's fields from *db_obj* and capture the model.

    Only non-synthetic fields present in the translated model data are
    assigned; synthetic fields are loaded separately. The model itself is
    stored so consumers can reach it via the ``db_obj`` property.
    """
    translated = self.modify_fields_from_db(db_obj)
    for name in self.fields:
        if name in translated and not self.is_synthetic(name):
            setattr(self, name, translated[name])
    self.load_synthetic_db_fields(db_obj)
    self._captured_db_model = db_obj
    self.obj_reset_changes()
|
||||
|
||||
@classmethod
|
||||
|
@ -364,6 +397,8 @@ class NeutronDbObject(NeutronObject):
|
|||
def _load_object(cls, context, db_obj):
    """Build an instance of *cls* from *db_obj* and detach the model."""
    instance = cls(context)
    instance.from_db_object(db_obj)
    # detach the model so that consequent fetches don't reuse it
    context.session.expunge(instance.db_obj)
    return instance
|
||||
|
||||
def obj_load_attr(self, attrname):
|
||||
|
@ -441,6 +476,13 @@ class NeutronDbObject(NeutronObject):
|
|||
del fields[field]
|
||||
return fields
|
||||
|
||||
def _get_changed_synthetic_fields(self):
    """Return the subset of changed fields that are synthetic.

    Starts from all pending changes and drops every field that is also a
    changed persistent field, leaving only synthetic (non-persisted) ones.
    """
    changes = self.obj_get_changes()
    for name in self._get_changed_persistent_fields():
        changes.pop(name, None)
    return changes
|
||||
|
||||
def _validate_changed_fields(self, fields):
|
||||
fields = fields.copy()
|
||||
forbidden_updates = set(self.fields_no_update) & set(fields.keys())
|
||||
|
@ -492,8 +534,7 @@ class NeutronDbObject(NeutronObject):
|
|||
if synth_db_objs is not None:
|
||||
if not isinstance(synth_db_objs, list):
|
||||
synth_db_objs = [synth_db_objs]
|
||||
synth_objs = [objclass._load_object(self.obj_context,
|
||||
objclass.modify_fields_from_db(obj))
|
||||
synth_objs = [objclass._load_object(self.obj_context, obj)
|
||||
for obj in synth_db_objs]
|
||||
else:
|
||||
synth_objs = objclass.get_objects(
|
||||
|
@ -546,19 +587,19 @@ class NeutronDbObject(NeutronObject):
|
|||
updates = self._get_changed_persistent_fields()
|
||||
updates = self._validate_changed_fields(updates)
|
||||
|
||||
if updates:
|
||||
with db_api.autonested_transaction(self.obj_context.session):
|
||||
db_obj = obj_db_api.update_object(
|
||||
self.obj_context, self.db_model,
|
||||
self.modify_fields_to_db(updates),
|
||||
**self.modify_fields_to_db(
|
||||
self._get_composite_keys()))
|
||||
self.from_db_object(db_obj)
|
||||
with db_api.autonested_transaction(self.obj_context.session):
|
||||
db_obj = obj_db_api.update_object(
|
||||
self.obj_context, self.db_model,
|
||||
self.modify_fields_to_db(updates),
|
||||
**self.modify_fields_to_db(
|
||||
self._get_composite_keys()))
|
||||
self.from_db_object(db_obj)
|
||||
|
||||
def delete(self):
    """Remove the corresponding database row and clear the cached model."""
    keys = self.modify_fields_to_db(self._get_composite_keys())
    obj_db_api.delete_object(self.obj_context, self.db_model, **keys)
    # the model no longer corresponds to a persisted row
    self._captured_db_model = None
|
||||
|
||||
@classmethod
|
||||
def count(cls, context, **kwargs):
|
||||
|
|
|
@ -145,7 +145,8 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
|
|||
if policy['action'] != models.ACCESS_SHARED:
|
||||
return
|
||||
target_tenant = policy['target_tenant']
|
||||
db_obj = cls.get_object(context, id=policy['object_id'])
|
||||
db_obj = obj_db_api.get_object(
|
||||
context.elevated(), cls.db_model, id=policy['object_id'])
|
||||
if db_obj.tenant_id == target_tenant:
|
||||
return
|
||||
cls._validate_rbac_policy_delete(context=context,
|
||||
|
@ -182,7 +183,8 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
|
|||
# (hopefully) melded with this one.
|
||||
if object_type != cls.rbac_db_model.object_type:
|
||||
return
|
||||
db_obj = cls.get_object(context.elevated(), id=policy['object_id'])
|
||||
db_obj = obj_db_api.get_object(
|
||||
context.elevated(), cls.db_model, id=policy['object_id'])
|
||||
if event in (events.BEFORE_CREATE, events.BEFORE_UPDATE):
|
||||
if (not context.is_admin and
|
||||
db_obj['tenant_id'] != context.tenant_id):
|
||||
|
@ -207,8 +209,9 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
|
|||
def update_shared(self, is_shared_new, obj_id):
|
||||
admin_context = self.obj_context.elevated()
|
||||
shared_prev = obj_db_api.get_object(admin_context, self.rbac_db_model,
|
||||
object_id=obj_id, target_tenant='*',
|
||||
action=models.ACCESS_SHARED)
|
||||
object_id=obj_id,
|
||||
target_tenant='*',
|
||||
action=models.ACCESS_SHARED)
|
||||
is_shared_prev = bool(shared_prev)
|
||||
if is_shared_prev == is_shared_new:
|
||||
return
|
||||
|
|
|
@ -97,13 +97,6 @@ class SubnetPool(base.NeutronDbObject):
|
|||
obj.reload_prefixes()
|
||||
return objs
|
||||
|
||||
def _get_changed_synthetic_fields(self):
|
||||
fields = self.obj_get_changes()
|
||||
for field in self._get_changed_persistent_fields():
|
||||
if field in fields:
|
||||
del fields[field]
|
||||
return fields
|
||||
|
||||
# TODO(ihrachys): Consider extending base to trigger registered methods
|
||||
def create(self):
|
||||
synthetic_changes = self._get_changed_synthetic_fields()
|
||||
|
|
|
@ -511,6 +511,11 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
|
|||
self.model_map = collections.defaultdict(list)
|
||||
self.model_map[self._test_class.db_model] = self.db_objs
|
||||
self.pager_map = collections.defaultdict(lambda: None)
|
||||
# don't validate refresh and expunge in tests that don't touch database
|
||||
# because otherwise it will fail due to db models not being injected
|
||||
# into active session in the first place
|
||||
mock.patch.object(self.context.session, 'refresh').start()
|
||||
mock.patch.object(self.context.session, 'expunge').start()
|
||||
|
||||
# TODO(ihrachys) document the intent of all common test cases in docstrings
|
||||
def test_get_object(self):
|
||||
|
@ -745,16 +750,6 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
|
|||
project_id = self.obj_fields[0]['project_id']
|
||||
self.assertEqual(project_id, obj.tenant_id)
|
||||
|
||||
@mock.patch.object(obj_db_api, 'update_object')
|
||||
def test_update_no_changes(self, update_mock):
|
||||
with mock.patch.object(base.NeutronDbObject,
|
||||
'_get_changed_persistent_fields',
|
||||
return_value={}):
|
||||
obj_keys = self.generate_object_keys(self._test_class)
|
||||
obj = self._test_class(self.context, **obj_keys)
|
||||
obj.update()
|
||||
self.assertFalse(update_mock.called)
|
||||
|
||||
@mock.patch.object(obj_db_api, 'update_object')
|
||||
def test_update_changes(self, update_mock):
|
||||
fields_to_update = self.get_updatable_fields(
|
||||
|
@ -1234,6 +1229,12 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
|||
objclass = self._get_ovo_object_class(cls_, field)
|
||||
if not objclass:
|
||||
continue
|
||||
|
||||
# check that the stored database model does not have non-empty
|
||||
# relationships
|
||||
dbattr = obj.fields_need_translation.get(field, field)
|
||||
self.assertFalse(getattr(obj.db_obj, dbattr, None))
|
||||
|
||||
objclass_fields = self._get_non_synth_fields(objclass,
|
||||
db_obj[field][0])
|
||||
|
||||
|
@ -1245,17 +1246,19 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
|||
synth_field_obj = objclass(self.context, **objclass_fields)
|
||||
synth_field_obj.create()
|
||||
|
||||
# populate the base object synthetic fields with created children
|
||||
if isinstance(cls_.fields[field], obj_fields.ObjectField):
|
||||
setattr(obj, field, synth_field_obj)
|
||||
else:
|
||||
setattr(obj, field, [synth_field_obj])
|
||||
# reload the parent object under test
|
||||
obj = cls_.get_object(self.context, **obj._get_composite_keys())
|
||||
|
||||
# check that the stored database model now has filled relationships
|
||||
dbattr = obj.fields_need_translation.get(field, field)
|
||||
self.assertTrue(getattr(obj.db_obj, dbattr, None))
|
||||
|
||||
# reset the object so that we can compare it to other clean objects
|
||||
obj.obj_reset_changes([field])
|
||||
|
||||
return obj
|
||||
|
||||
def test_get_object_with_synthetic_fields(self):
|
||||
def _test_get_with_synthetic_fields(self, getter):
|
||||
object_fields = self._get_object_synthetic_fields(self._test_class)
|
||||
if not object_fields:
|
||||
self.skipTest(
|
||||
|
@ -1263,11 +1266,21 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
|||
'in test class %r' % self._test_class
|
||||
)
|
||||
obj = self._create_object_with_synthetic_fields(self.db_objs[0])
|
||||
listed_obj = self._test_class.get_object(
|
||||
self.context, **obj._get_composite_keys())
|
||||
listed_obj = getter(self.context, **obj._get_composite_keys())
|
||||
self.assertTrue(listed_obj)
|
||||
self.assertEqual(obj, listed_obj)
|
||||
|
||||
def test_get_object_with_synthetic_fields(self):
    """Fetch via get_object and verify synthetic fields round-trip."""
    self._test_get_with_synthetic_fields(self._test_class.get_object)
|
||||
|
||||
def test_get_objects_with_synthetic_fields(self):
    """Fetch via get_objects and verify synthetic fields round-trip."""
    def single_getter(*args, **kwargs):
        # get_objects returns a list; exactly one match is expected here
        matches = self._test_class.get_objects(*args, **kwargs)
        self.assertEqual(1, len(matches))
        return matches[0]

    self._test_get_with_synthetic_fields(single_getter)
|
||||
|
||||
# NOTE(korzen) _list method is used in neutron.tests.db.unit.db.
|
||||
# test_db_base_plugin_v2.DbOperationBoundMixin in _list_and_count_queries()
|
||||
# This is used in test_subnet for asserting that number of queries is
|
||||
|
@ -1290,6 +1303,25 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
|||
self.assertEqual(
|
||||
len(self.obj_fields), self._test_class.count(self.context))
|
||||
|
||||
def test_db_obj(self):
    """db_obj lifecycle: None before create, set after, None after delete."""
    obj = self._make_object(self.obj_fields[0])
    self.assertIsNone(obj.db_obj)

    obj.create()
    self.assertIsNotNone(obj.db_obj)

    updatable = self.get_updatable_fields(self.obj_fields[1])
    if updatable:
        # update() refetches the model, so the captured one must change
        prev_model = copy.deepcopy(obj.db_obj)
        for name, value in updatable.items():
            setattr(obj, name, value)
        obj.update()
        self.assertIsNotNone(obj.db_obj)
        self.assertNotEqual(prev_model, obj.db_obj)

    obj.delete()
    self.assertIsNone(obj.db_obj)
|
||||
|
||||
|
||||
class UniqueObjectBase(test_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -19,6 +19,7 @@ from oslo_versionedobjects import fields as obj_fields
|
|||
import sqlalchemy as sa
|
||||
|
||||
from neutron.callbacks import events
|
||||
from neutron import context as n_context
|
||||
from neutron.db import rbac_db_models
|
||||
from neutron.extensions import rbac as ext_rbac
|
||||
from neutron.objects.db import api as obj_db_api
|
||||
|
@ -129,7 +130,7 @@ class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|||
mock_validate_rbac_update.assert_not_called()
|
||||
|
||||
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
|
||||
@mock.patch.object(_test_class, 'get_object',
|
||||
@mock.patch.object(obj_db_api, 'get_object',
|
||||
return_value={'tenant_id': 'tyrion_lannister'})
|
||||
def test_validate_rbac_policy_change_allowed_for_admin_or_owner(
|
||||
self, mock_get_object, mock_validate_update):
|
||||
|
@ -143,7 +144,7 @@ class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|||
self.assertTrue(self._test_class.validate_rbac_policy_update.called)
|
||||
|
||||
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
|
||||
@mock.patch.object(_test_class, 'get_object',
|
||||
@mock.patch.object(obj_db_api, 'get_object',
|
||||
return_value={'tenant_id': 'king_beyond_the_wall'})
|
||||
def test_validate_rbac_policy_change_forbidden_for_outsiders(
|
||||
self, mock_get_object, mock_validate_update):
|
||||
|
@ -162,7 +163,7 @@ class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|||
self, policy, mock_validate_delete):
|
||||
self._test_class.validate_rbac_policy_delete(
|
||||
resource=mock.Mock(), event=events.BEFORE_DELETE,
|
||||
trigger='dummy_trigger', context=mock.Mock(),
|
||||
trigger='dummy_trigger', context=n_context.get_admin_context(),
|
||||
object_type=self._test_class.rbac_db_model.object_type,
|
||||
policy=policy)
|
||||
mock_validate_delete.assert_not_called()
|
||||
|
@ -171,7 +172,7 @@ class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|||
self._test_validate_rbac_policy_delete_handles_policy(
|
||||
{'action': 'unknown_action'})
|
||||
|
||||
@mock.patch.object(_test_class, 'get_object')
|
||||
@mock.patch.object(obj_db_api, 'get_object')
|
||||
def test_validate_rbac_policy_delete_skips_db_object_owner(self,
|
||||
mock_get_object):
|
||||
policy = {'action': rbac_db_models.ACCESS_SHARED,
|
||||
|
@ -181,7 +182,7 @@ class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|||
mock_get_object.return_value.tenant_id = policy['target_tenant']
|
||||
self._test_validate_rbac_policy_delete_handles_policy(policy)
|
||||
|
||||
@mock.patch.object(_test_class, 'get_object')
|
||||
@mock.patch.object(obj_db_api, 'get_object')
|
||||
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
|
||||
return_value='tenant_id_shared_with')
|
||||
def test_validate_rbac_policy_delete_fails_single_tenant_and_in_use(
|
||||
|
@ -237,7 +238,7 @@ class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|||
'tenant_id': 'object_owner_tenant_id',
|
||||
'object_id': 'fake_obj_id'}
|
||||
context = mock.Mock()
|
||||
with mock.patch.object(self._test_class, 'get_object'):
|
||||
with mock.patch.object(obj_db_api, 'get_object'):
|
||||
self.assertRaises(
|
||||
ext_rbac.RbacPolicyInUse,
|
||||
self._test_class.validate_rbac_policy_delete,
|
||||
|
|
|
@ -56,6 +56,9 @@ class TestQosPlugin(base.BaseQosTestCase):
|
|||
self.qos_plugin.notification_driver_manager = mock.Mock()
|
||||
|
||||
self.ctxt = context.Context('fake_user', 'fake_tenant')
|
||||
mock.patch.object(self.ctxt.session, 'refresh').start()
|
||||
mock.patch.object(self.ctxt.session, 'expunge').start()
|
||||
|
||||
self.policy_data = {
|
||||
'policy': {'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
|
|
Loading…
Reference in New Issue