Implement manager logic for group+system roles

This change implements all the manager APIs to grant roles to groups
at a system level. A subsequent patch will:

  - wire up the controllers and expose the assignment API publicly
  - return system roles when asking for user and group assignments

bp system-scope

Change-Id: I681023995ac2a605669a575abd43aac11e1ae012
This commit is contained in:
Lance Bragstad 2017-10-17 14:18:20 +00:00
parent 05e3ddb954
commit 420f50e6c7
2 changed files with 209 additions and 0 deletions

View File

@ -58,6 +58,7 @@ class Manager(manager.Manager):
_SYSTEM_SCOPE_TOKEN = 'system'
_USER_SYSTEM = 'UserSystem'
_GROUP_SYSTEM = 'GroupSystem'
_PROJECT = 'project'
_ROLE_REMOVED_FROM_USER = 'role_removed_from_user'
_INVALIDATION_USER_PROJECT_TOKENS = 'invalidate_user_project_tokens'
@ -1122,6 +1123,75 @@ class Manager(manager.Manager):
inherited = False
self.driver.delete_system_grant(role_id, user_id, target_id, inherited)
def check_system_grant_for_group(self, group_id, role_id):
"""Check if a group has a specific role on the system.
:param group_id: the ID of the group in the assignment
:param role_id: the ID of the system role in the assignment
:raises keystone.exception.RoleAssignmentNotFound: if the group doesn't
have a role assignment matching the role_id on the system
"""
target_id = self._SYSTEM_SCOPE_TOKEN
inherited = False
return self.driver.check_system_grant(
role_id, group_id, target_id, inherited
)
def list_system_grants_for_group(self, group_id):
"""Return a list of roles the group has on the system.
:param group_id: the ID of the group
:returns: a list of role assignments the group has system-wide
"""
target_id = self._SYSTEM_SCOPE_TOKEN
assignment_type = self._GROUP_SYSTEM
return self.driver.list_system_grants(
group_id, target_id, assignment_type
)
def create_system_grant_for_group(self, group_id, role_id):
"""Grant a group a role on the system.
:param group_id: the ID of the group
:param role_id: the ID of the role to grant on the system
"""
role = self.role_api.get_role(role_id)
if role.get('domain_id'):
raise exception.ValidationError(
'Role %(role_id)s is a domain-specific role. Unable to use '
'a domain-specific role in a system assignment.' % {
'role_id': role_id
}
)
target_id = self._SYSTEM_SCOPE_TOKEN
assignment_type = self._GROUP_SYSTEM
inherited = False
self.driver.create_system_grant(
role_id, group_id, target_id, assignment_type, inherited
)
def delete_system_grant_for_group(self, group_id, role_id):
"""Remove a system grant from a group.
:param group_id: the ID of the group
:param role_id: the ID of the role to remove from the group on the
system
:raises keystone.exception.RoleAssignmentNotFound: if the group doesn't
have a role assignment with role_id on the system
"""
target_id = self._SYSTEM_SCOPE_TOKEN
inherited = False
self.driver.delete_system_grant(
role_id, group_id, target_id, inherited
)
class RoleManager(manager.Manager):
"""Default pivot point for the Role backend."""

View File

@ -3839,3 +3839,142 @@ class SystemAssignmentTests(AssignmentTestHelperMixin):
user_id,
role['id']
)
def test_create_system_grant_for_group(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
role_ref = self._create_role()
self.assignment_api.create_system_grant_for_group(
group_id, role_ref['id']
)
system_roles = self.assignment_api.list_system_grants_for_group(
group_id
)
self.assertEqual(len(system_roles), 1)
self.assertEqual(system_roles[0]['type'], 'GroupSystem')
self.assertEqual(system_roles[0]['target_id'], 'system')
self.assertEqual(system_roles[0]['actor_id'], group_id)
self.assertFalse(system_roles[0]['inherited'])
def test_list_system_grants_for_group(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
first_role = self._create_role()
second_role = self._create_role()
self.assignment_api.create_system_grant_for_group(
group_id, first_role['id']
)
system_roles = self.assignment_api.list_system_grants_for_group(
group_id
)
self.assertEqual(len(system_roles), 1)
self.assignment_api.create_system_grant_for_group(
group_id, second_role['id']
)
system_roles = self.assignment_api.list_system_grants_for_group(
group_id
)
self.assertEqual(len(system_roles), 2)
def test_check_system_grant_for_group(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
role = self._create_role()
self.assertRaises(
exception.RoleAssignmentNotFound,
self.assignment_api.check_system_grant_for_group,
group_id,
role['id']
)
self.assignment_api.create_system_grant_for_group(group_id, role['id'])
self.assignment_api.check_system_grant_for_group(group_id, role['id'])
def test_delete_system_grant_for_group(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
role = self._create_role()
self.assignment_api.create_system_grant_for_group(group_id, role['id'])
system_roles = self.assignment_api.list_system_grants_for_group(
group_id
)
self.assertEqual(len(system_roles), 1)
self.assignment_api.delete_system_grant_for_group(group_id, role['id'])
system_roles = self.assignment_api.list_system_grants_for_group(
group_id
)
self.assertEqual(len(system_roles), 0)
def test_check_system_grant_for_group_with_invalid_role_fails(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
self.assertRaises(
exception.RoleAssignmentNotFound,
self.assignment_api.check_system_grant_for_group,
group_id,
uuid.uuid4().hex
)
def test_check_system_grant_for_group_with_invalid_group_fails(self):
role = self._create_role()
self.assertRaises(
exception.RoleAssignmentNotFound,
self.assignment_api.check_system_grant_for_group,
uuid.uuid4().hex,
role['id']
)
def test_delete_system_grant_for_group_with_invalid_role_fails(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
role = self._create_role()
self.assignment_api.create_system_grant_for_group(group_id, role['id'])
self.assertRaises(
exception.RoleAssignmentNotFound,
self.assignment_api.delete_system_grant_for_group,
group_id,
uuid.uuid4().hex
)
def test_delete_system_grant_for_group_with_invalid_group_fails(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
role = self._create_role()
self.assignment_api.create_system_grant_for_group(group_id, role['id'])
self.assertRaises(
exception.RoleAssignmentNotFound,
self.assignment_api.delete_system_grant_for_group,
uuid.uuid4().hex,
role['id']
)
def test_list_system_grants_for_group_returns_empty_list(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
system_roles = self.assignment_api.list_system_grants_for_group(
group_id
)
self.assertFalse(system_roles)
def test_create_system_grant_for_group_fails_with_domain_role(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_ref)['id']
role = self._create_role(CONF.identity.default_domain_id)
self.assertRaises(
exception.ValidationError,
self.assignment_api.create_system_grant_for_group,
group_id,
role['id']
)