Support implied rules

Using keystone API[0] to get all role inference rules and makes it
possible to extend the used list of roles with implied roles.

[0] https://developer.openstack.org/api-ref/identity/v3/#list-all-role-inference-rules
Change-Id: Ia57351f3b21a82f4556ec61323abd295b427fc1e
This commit is contained in:
Sergey Vilgelm 2019-01-07 11:59:41 -06:00
parent 2daa5e6017
commit 19e3becfc7
No known key found for this signature in database
GPG Key ID: 08D0E2FF778887E6
10 changed files with 225 additions and 49 deletions

View File

@ -265,14 +265,6 @@ class PolicyAuthority(RbacAuthority):
return CONF.identity.admin_role in roles
def _get_access_token(self, roles):
roles = {r.lower() for r in roles if r}
# Extend roles for an user with admin or member role
if 'admin' in roles:
roles.add('member')
if 'member' in roles:
roles.add('reader')
access_token = {
"token": {
"roles": [{'name': r} for r in roles],

View File

@ -344,6 +344,9 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
if not roles:
roles.append(CONF.patrole.rbac_test_role)
# Adding implied roles
roles = test_obj.rbac_utils.get_all_needed_roles(roles)
# Test RBAC against custom requirements. Otherwise use oslo.policy.
if CONF.patrole.test_custom_requirements:
authority = requirements_authority.RequirementsAuthority(

View File

@ -119,6 +119,10 @@ class RbacUtils(object):
:param test_obj: An instance of `tempest.test.BaseTestCase`.
"""
self.admin_role_id = None
self.rbac_role_ids = None
self._role_map = None
# Intialize the admin roles_client to perform role switching.
admin_mgr = clients.Manager(
credentials.get_configured_admin_credentials())
@ -132,12 +136,83 @@ class RbacUtils(object):
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self._role_inferences_mapping = self._prepare_role_inferences_mapping()
# Change default role to admin
self._override_role(test_obj, False)
admin_role_id = None
rbac_role_ids = None
def _prepare_role_inferences_mapping(self):
"""Preparing roles mapping to support role inferences
Making query to `list-all-role-inference-rules`_ keystone API
returns all inference rules, which makes it possible to prepare
roles mapping.
It walks recursively through the raw data::
{"role_inferences": [
{
"implies": [{"id": "3", "name": "reader"}],
"prior_role": {"id": "2", "name": "member"}
},
{
"implies": [{"id": "2", "name": "member"}],
"prior_role": {"id": "1", "name": "admin"}
}
]
}
and converts it to the mapping::
{
"2": ["3"], # "member": ["reader"],
"1": ["2", "3"] # "admin": ["member", "reader"]
}
.. _list-all-role-inference-rules: https://developer.openstack.org/api-ref/identity/v3/#list-all-role-inference-rules
""" # noqa: E501
def process_roles(role_id, data):
roles = data.get(role_id, set())
for rid in roles.copy():
roles.update(process_roles(rid, data))
return roles
def convert_data(data):
res = {}
for rule in data:
prior_role = rule['prior_role']['id']
implies = {r['id'] for r in rule['implies']}
res[prior_role] = implies
return res
raw_data = self.admin_roles_client.list_all_role_inference_rules()
data = convert_data(raw_data['role_inferences'])
res = {}
for role_id in data:
res[role_id] = process_roles(role_id, data)
return res
def get_all_needed_roles(self, roles):
"""Extending given roles with roles from mapping
Examples::
["admin"] >> ["admin", "member", "reader"]
["member"] >> ["member", "reader"]
["reader"] >> ["reader"]
["custom_role"] >> ["custom_role"]
:param roles: list of roles
:return: extended list of roles
"""
res = set(r for r in roles)
for role in res.copy():
role_id = self._role_map.get(role)
implied_roles = self._role_inferences_mapping.get(role_id, set())
role_names = {self._role_map[rid] for rid in implied_roles}
res.update(role_names)
LOG.debug('All needed roles: %s; Base roles: %s', res, roles)
return list(res)
@contextlib.contextmanager
def override_role(self, test_obj):
@ -233,8 +308,8 @@ class RbacUtils(object):
def _get_roles_by_name(self):
available_roles = self.admin_roles_client.list_roles()['roles']
role_map = {r['name']: r['id'] for r in available_roles}
LOG.debug('Available roles: %s', list(role_map.keys()))
self._role_map = {r['name']: r['id'] for r in available_roles}
LOG.debug('Available roles: %s', list(self._role_map.keys()))
rbac_role_ids = []
roles = CONF.patrole.rbac_test_roles
@ -244,9 +319,9 @@ class RbacUtils(object):
roles.append(CONF.patrole.rbac_test_role)
for role_name in roles:
rbac_role_ids.append(role_map.get(role_name))
rbac_role_ids.append(self._role_map.get(role_name))
admin_role_id = role_map.get(CONF.identity.admin_role)
admin_role_id = self._role_map.get(CONF.identity.admin_role)
if not all([admin_role_id, all(rbac_role_ids)]):
missing_roles = []
@ -257,15 +332,18 @@ class RbacUtils(object):
missing_roles.append(CONF.identity.admin_role)
if not all(rbac_role_ids):
missing_roles += [role_name for role_name in roles
if not role_map.get(role_name)]
if not self._role_map.get(role_name)]
msg += " Following roles were not found: %s." % (
", ".join(missing_roles))
msg += " Available roles: %s." % ", ".join(list(role_map.keys()))
msg += " Available roles: %s." % ", ".join(list(
self._role_map.keys()))
raise rbac_exceptions.RbacResourceSetupFailed(msg)
self.admin_role_id = admin_role_id
self.rbac_role_ids = rbac_role_ids
# Adding backward mapping
self._role_map.update({v: k for k, v in self._role_map.items()})
def _create_user_role_on_project(self, role_ids):
for role_id in role_ids:

View File

@ -14,9 +14,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
from tempest.tests import base
class TestCase(base.BaseTestCase):
class TestCase(base.TestCase):
"""Test case base class for all unit tests."""
def get_all_needed_roles(self, roles):
role_inferences_mapping = {
"admin": {"member", "reader"},
"member": {"reader"}
}
res = set(r.lower() for r in roles)
for role in res.copy():
res.update(role_inferences_mapping.get(role, set()))
return list(res)

View File

@ -94,6 +94,8 @@ class RbacUtilsFixture(fixtures.Fixture):
clients, 'Manager', spec=clients.Manager,
roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
self.admin_roles_client.list_all_role_inference_rules.return_value = {
"role_inferences": []}
self.set_roles(['admin', 'member'], [])
@ -157,3 +159,28 @@ class RbacUtilsFixture(fixtures.Fixture):
self.admin_roles_client.list_roles.return_value = available_roles
self.admin_roles_client.list_user_roles_on_project.return_value = (
available_project_roles)
def get_all_needed_roles(self, roles):
self.admin_roles_client.list_all_role_inference_rules.return_value = {
"role_inferences": [
{
"implies": [{"id": "3", "name": "reader"}],
"prior_role": {"id": "2", "name": "member"}
},
{
"implies": [{"id": "2", "name": "member"}],
"prior_role": {"id": "1", "name": "admin"}
}
]
}
# Call real get_all_needed_roles function
with mock.patch.object(rbac_utils.RbacUtils, '_override_role',
autospec=True):
obj = rbac_utils.RbacUtils(mock.Mock())
obj._role_map = {
"1": "admin", "admin": "1",
"2": "member", "member": "2",
"3": "reader", "reader": "3"
}
return obj.get_all_needed_roles(roles)

View File

@ -17,10 +17,10 @@ import mock
import os
from tempest import config
from tempest.tests import base
from patrole_tempest_plugin import policy_authority
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures
CONF = config.CONF
@ -96,6 +96,8 @@ class PolicyAuthorityTest(base.TestCase):
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, service)
roles = self.get_all_needed_roles(roles)
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
@ -286,7 +288,8 @@ class PolicyAuthorityTest(base.TestCase):
}
for rule in allowed_rules:
allowed = authority.allowed(rule, ['member'])
allowed = authority.allowed(
rule, self.get_all_needed_roles(['member']))
self.assertTrue(allowed)
# for sure that roles are in same order
mock_try_rule.call_args[0][2]["roles"] = sorted(

View File

@ -22,11 +22,11 @@ import fixtures
from tempest.lib import exceptions
from tempest import manager
from tempest import test
from tempest.tests import base
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
from patrole_tempest_plugin import rbac_utils
from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
CONF = cfg.CONF
@ -34,19 +34,22 @@ CONF = cfg.CONF
class BaseRBACRuleValidationTest(base.TestCase):
test_roles = ['member']
def setUp(self):
super(BaseRBACRuleValidationTest, self).setUp()
self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
self.mock_test_args.rbac_utils = mock.Mock(
spec_set=rbac_utils.RbacUtils)
self.mock_test_args.rbac_utils.get_all_needed_roles.side_effect = \
self.get_all_needed_roles
# Setup credentials for mock client manager.
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
project_id=mock.sentinel.project_id)
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.test_roles = ['member']
self.useFixture(
patrole_fixtures.ConfPatcher(rbac_test_roles=self.test_roles,
group='patrole'))
@ -56,28 +59,9 @@ class BaseRBACRuleValidationTest(base.TestCase):
group='patrole_log'))
class BaseRBACMultiRoleRuleValidationTest(base.TestCase):
class BaseRBACMultiRoleRuleValidationTest(BaseRBACRuleValidationTest):
def setUp(self):
super(BaseRBACMultiRoleRuleValidationTest, self).setUp()
self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
self.mock_test_args.rbac_utils = mock.Mock(
spec_set=rbac_utils.RbacUtils)
# Setup credentials for mock client manager.
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
project_id=mock.sentinel.project_id)
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.test_roles = ['member', 'anotherrole']
self.useFixture(
patrole_fixtures.ConfPatcher(rbac_test_roles=self.test_roles,
group='patrole'))
# Disable patrole log for unit tests.
self.useFixture(
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))
test_roles = ['member', 'anotherrole']
class RBACRuleValidationTest(BaseRBACRuleValidationTest):
@ -549,7 +533,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
mock.sentinel.action,
CONF.patrole.rbac_test_roles)
self.get_all_needed_roles(CONF.patrole.rbac_test_roles))
mock_log.error.assert_not_called()
@ -561,6 +545,8 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
evaluated correctly.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
expected_roles = self.get_all_needed_roles(
CONF.patrole.rbac_test_roles)
def partial_func(x):
return "foo" if x == "bar" else "qux"
@ -581,14 +567,14 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"foo",
CONF.patrole.rbac_test_roles)
expected_roles)
policy_authority.allowed.reset_mock()
test_bar_policy(self.mock_test_args)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"qux",
CONF.patrole.rbac_test_roles)
expected_roles)
mock_log.error.assert_not_called()
@ -639,7 +625,10 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def _assert_policy_authority_called_with(self, rules, mock_authority):
m_authority = mock_authority.PolicyAuthority.return_value
m_authority.allowed.assert_has_calls([
mock.call(rule, CONF.patrole.rbac_test_roles) for rule in rules
mock.call(
rule,
self.get_all_needed_roles(CONF.patrole.rbac_test_roles)
) for rule in rules
])
m_authority.allowed.reset_mock()

View File

@ -18,10 +18,10 @@ import testtools
from tempest.lib import exceptions as lib_exc
from tempest import test
from tempest.tests import base
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_utils
from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
@ -213,6 +213,58 @@ class RBACUtilsTest(base.TestCase):
m_override_role.assert_called_once_with(test_obj)
m_validate.assert_called_once()
def test_prepare_role_inferences_mapping(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
_rbac_utils.admin_roles_client.list_all_role_inference_rules.\
return_value = {
"role_inferences": [
{
"implies": [{"id": "3", "name": "reader"}],
"prior_role": {"id": "2", "name": "member"}
},
{
"implies": [{"id": "2", "name": "member"}],
"prior_role": {"id": "1", "name": "admin"}
}
]
}
expected_role_inferences_mapping = {
"2": {"3"}, # "member": ["reader"],
"1": {"2", "3"} # "admin": ["member", "reader"]
}
actual_role_inferences_mapping = _rbac_utils.\
_prepare_role_inferences_mapping()
self.assertEqual(expected_role_inferences_mapping,
actual_role_inferences_mapping)
def test_get_all_needed_roles(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
_rbac_utils._role_inferences_mapping = {
"2": {"3"}, # "member": ["reader"],
"1": {"2", "3"} # "admin": ["member", "reader"]
}
_rbac_utils._role_map = {
"1": "admin", "admin": "1",
"2": "member", "member": "2",
"3": "reader", "reader": "3"
}
for roles, expected_roles in (
(['admin'], ['admin', 'member', 'reader']),
(['member'], ['member', 'reader']),
(['reader'], ['reader']),
(['custom_role'], ['custom_role']),
(['custom_role', 'member'], ['custom_role', 'member', 'reader']),
(['admin', 'member'], ['admin', 'member', 'reader']),
):
expected_roles = sorted(expected_roles)
actual_roles = sorted(_rbac_utils.get_all_needed_roles(roles))
self.assertEqual(expected_roles, actual_roles)
class RBACUtilsMixinTest(base.TestCase):

View File

@ -15,10 +15,10 @@
import os
from tempest.lib import exceptions
from tempest.tests import base
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import requirements_authority as req_auth
from patrole_tempest_plugin.tests.unit import base
class BaseRequirementsAuthorityTest(base.TestCase):

View File

@ -0,0 +1,22 @@
---
features:
- |
Supporting the role inference rules API gives Patrole an ability of testing
role chains, when one role implies the second which can also imply the
third:
``admin`` implies ``member`` implies ``reader``
Now in a case of testing against an ``admin`` role (``[patole]
rbac_test_roles`` = ``admin``) the ``rbac_rule_validation.action`` calls
the ``rbac_utils.get_all_needed_roles`` function to extend the roles
and validates a policy rule against the full list of possible roles:
["admin", "member", "reader"]
Here is few examples:
["admin"] >> ["admin", "member", "reader"]
["member"] >> ["member", "reader"]
["reader"] >> ["reader"]
["custom_role"] >> ["custom_role"]
["custom_role", "member"] >> ["custom_role", "member", "reader"]