Add the field "rbac_actions" to the model query hooks

This patch adds a new field to the models query hook: "rbac_actions".
This field stores the multiple RBAC actions to filter the query in
case of making a non-admin query.

The default RBAC action are "access_as_shared" and "access_as_readonly".
For networks and subnets, the action "access_as_external" is needed in
this query. Instead of adding a query hook, this additional field allows
to modify the required RBAC action filter.

Closes-Bug: #2059236
Change-Id: Ic41461223719c535ac9d2bbadfb1b6077c898903
This commit is contained in:
Rodolfo Alonso Hernandez 2024-03-27 10:45:11 +00:00
parent f6852071ef
commit 91cebe97bf
3 changed files with 123 additions and 14 deletions

View File

@ -28,6 +28,11 @@ from neutron_lib.objects import utils as obj_utils
from neutron_lib.utils import helpers
DEFAULT_RBAC_ACTIONS = {constants.ACCESS_SHARED,
constants.ACCESS_READONLY,
}
# Classes implementing extensions will register hooks into this dictionary
# for "augmenting" the "core way" of building a query for retrieving objects
# from a model class. Hooks are registered by invoking register_hook().
@ -37,11 +42,13 @@ _model_query_hooks = {
# 'query': query_hook,
# 'filter': filter_hook,
# 'result_filters': result_filters
# 'rbac_actions': rbac_actions
# },
# hook2: {
# 'query': query_hook,
# 'filter': filter_hook,
# 'result_filters': result_filters
# 'rbac_actions': rbac_actions
# },
# ...
# },
@ -50,11 +57,13 @@ _model_query_hooks = {
# 'query': query_hook,
# 'filter': filter_hook,
# 'result_filters': result_filters
# 'rbac_actions': rbac_actions
# },
# hook2: {
# 'query': query_hook,
# 'filter': filter_hook,
# 'result_filters': result_filters
# 'rbac_actions': rbac_actions
# },
# ...
# },
@ -63,18 +72,20 @@ _model_query_hooks = {
def register_hook(model, name, query_hook, filter_hook,
result_filters=None):
result_filters=None, rbac_actions=None):
"""Register a hook to be invoked when a query is executed.
Adds the hook components to the _model_query_hooks dict. Models are the
keys of this dict, whereas the value is another dict mapping hook names
to callables performing the hook.
to callables performing the hook or in the case of ``rbac_actions``, a set
of RBAC actions to filter the model.
:param model: The DB Model that the hook applies to.
:param name: A name for the hook.
:param query_hook: The method to be called to augment the query.
:param filter_hook: A method to be called to augment the query filter.
:param result_filters: A Method to be called to filter the query result.
:param rbac_actions: An iterable of RBAC actions or a single one (string).
:returns: None.
"""
if callable(query_hook):
@ -83,10 +94,16 @@ def register_hook(model, name, query_hook, filter_hook,
filter_hook = helpers.make_weak_ref(filter_hook)
if callable(result_filters):
result_filters = helpers.make_weak_ref(result_filters)
if rbac_actions is not None:
if isinstance(rbac_actions, (set, list, tuple)):
rbac_actions = set(rbac_actions)
else:
rbac_actions = {rbac_actions}
_model_query_hooks.setdefault(model, {})[name] = {
'query': query_hook,
'filter': filter_hook,
'result_filters': result_filters
'result_filters': result_filters,
'rbac_actions': rbac_actions,
}
@ -99,6 +116,23 @@ def get_hooks(model):
return _model_query_hooks.get(model, {}).values()
def get_rbac_actions(model):
"""Retrieve and combine all RBAC actions requested in a model
:param model: The DB Model to look up for query hooks.
:returns: A set of RBAC actions defined in the model or the default RBAC
actions ('access_as_shared', 'access_as_readonly')
"""
rbac_actions = None
for hook in get_hooks(model):
hook_rbac_actions = hook.get('rbac_actions')
if hook_rbac_actions is not None:
if rbac_actions is None:
rbac_actions = set()
rbac_actions.update(hook_rbac_actions)
return rbac_actions if rbac_actions is not None else DEFAULT_RBAC_ACTIONS
def query_with_hooks(context, model, field=None, lazy_fields=None):
"""Query with hooks using the said context and model.
@ -126,8 +160,7 @@ def query_with_hooks(context, model, field=None, lazy_fields=None):
rbac_model = model.rbac_entries.property.mapper.class_
query_filter = (
(model.tenant_id == context.tenant_id) |
(rbac_model.action.in_(
[constants.ACCESS_SHARED, constants.ACCESS_READONLY]) &
(rbac_model.action.in_(get_rbac_actions(model)) &
((rbac_model.target_project == context.tenant_id) |
(rbac_model.target_project == '*'))))
# This "group_by" clause will limit the number of registers

View File

@ -12,6 +12,9 @@
from unittest import mock
from oslo_utils import uuidutils
from neutron_lib import constants
from neutron_lib.db import model_query
from neutron_lib import fixture
from neutron_lib.tests import _base
@ -29,6 +32,12 @@ class TestHooks(_base.BaseTestCase):
def _mock_hook(self, x):
return x
def check_registered_hooks(self, registered_hooks, expected=None):
expected = expected if expected else {}
for d in registered_hooks:
for k in d.keys():
self.assertEqual(expected[k], d.get(k))
def test_register_hook(self):
mock_model = mock.Mock()
model_query.register_hook(
@ -38,9 +47,11 @@ class TestHooks(_base.BaseTestCase):
hook_ref = helpers.make_weak_ref(self._mock_hook)
registered_hooks = model_query.get_hooks(mock_model)
self.assertEqual(1, len(registered_hooks))
for d in registered_hooks:
for k in d.keys():
self.assertEqual(hook_ref, d.get(k))
expected_hooks = {'query': hook_ref,
'filter': hook_ref,
'result_filters': hook_ref,
'rbac_actions': None}
self.check_registered_hooks(registered_hooks, expected_hooks)
def test_register_hook_non_callables(self):
mock_model = mock.Mock()
@ -50,12 +61,44 @@ class TestHooks(_base.BaseTestCase):
hook_ref = helpers.make_weak_ref(self._mock_hook)
registered_hooks = model_query.get_hooks(mock_model)
self.assertEqual(1, len(registered_hooks))
for d in registered_hooks:
for k in d.keys():
if k == 'query':
self.assertEqual(hook_ref, d.get(k))
else:
self.assertEqual({}, d.get(k))
expected_hooks = {'query': hook_ref,
'filter': {},
'result_filters': {},
'rbac_actions': None}
self.check_registered_hooks(registered_hooks, expected_hooks)
def test_register_hook_with_mutiple_rbacs(self):
mock_model = mock.Mock()
model_query.register_hook(
mock_model, 'hook1', self._mock_hook,
self._mock_hook, result_filters=self._mock_hook,
rbac_actions=[constants.ACCESS_SHARED, constants.ACCESS_EXTERNAL])
self.assertEqual(1, len(model_query._model_query_hooks.keys()))
hook_ref = helpers.make_weak_ref(self._mock_hook)
registered_hooks = model_query.get_hooks(mock_model)
self.assertEqual(1, len(registered_hooks))
expected_hooks = {'query': hook_ref,
'filter': hook_ref,
'result_filters': hook_ref,
'rbac_actions': {constants.ACCESS_SHARED,
constants.ACCESS_EXTERNAL}}
self.check_registered_hooks(registered_hooks, expected_hooks)
def test_register_hook_with_one_rbac(self):
mock_model = mock.Mock()
model_query.register_hook(
mock_model, 'hook1', self._mock_hook,
self._mock_hook, result_filters=self._mock_hook,
rbac_actions=constants.ACCESS_READONLY)
self.assertEqual(1, len(model_query._model_query_hooks.keys()))
hook_ref = helpers.make_weak_ref(self._mock_hook)
registered_hooks = model_query.get_hooks(mock_model)
self.assertEqual(1, len(registered_hooks))
expected_hooks = {'query': hook_ref,
'filter': hook_ref,
'result_filters': hook_ref,
'rbac_actions': {constants.ACCESS_READONLY}}
self.check_registered_hooks(registered_hooks, expected_hooks)
def test_get_values(self):
mock_model = mock.Mock()
@ -68,3 +111,28 @@ class TestHooks(_base.BaseTestCase):
self.assertEqual(['value1', 'value2'], values)
query_with_hooks.assert_called_with(
mock_context, mock_model, field='fake_field')
def test_get_rbac_actions_no_rbacs(self):
model = uuidutils.generate_uuid()
model_query.register_hook(model, 'hook1', mock.ANY, mock.ANY)
rbacs = model_query.get_rbac_actions(model)
self.assertEqual(model_query.DEFAULT_RBAC_ACTIONS, rbacs)
def test_get_rbac_actions_one_rbac(self):
model = uuidutils.generate_uuid()
model_query.register_hook(model, 'hook1', mock.ANY, mock.ANY,
rbac_actions=constants.ACCESS_READONLY)
rbacs = model_query.get_rbac_actions(model)
self.assertEqual({constants.ACCESS_READONLY}, rbacs)
def test_get_rbac_actions_multiple_rbac(self):
model = uuidutils.generate_uuid()
model_query.register_hook(model, 'hook1', mock.ANY, mock.ANY,
rbac_actions=constants.ACCESS_READONLY)
model_query.register_hook(model, 'hook2', mock.ANY, mock.ANY,
rbac_actions=constants.ACCESS_READONLY)
model_query.register_hook(model, 'hook3', mock.ANY, mock.ANY,
rbac_actions=constants.ACCESS_SHARED)
rbacs = model_query.get_rbac_actions(model)
self.assertEqual({constants.ACCESS_READONLY, constants.ACCESS_SHARED},
rbacs)

View File

@ -0,0 +1,8 @@
---
features:
- |
A new field is added to the model query hooks: "rbac_actions". This field
can modify the RBAC entries filter, setting the required RBAC actions for
this specific model. This field is implemented initially for the "network"
and the "subnet" model, that require an additional RBAC action
("access_as_external").