Implement GET /role_assignment API call

Add support for the GET /role_assignment call as a first step
to making role_assignment a first class entity.

This patch also enables v3 collection filtering to match against
attributes of entities being returned in the list, using the same
dot notation (e.g. user.id) that we already support for policy file
checking against filters.

Limitations:

- The current implementation uses the standard v3 collections wrapper
  mechanism for filtering.  Given the potential numbers of role
  assignments in a large system, this may have performance and resource
  impacts.  A future improvement would pass the filters into the
  driver layer to keep the internal assignment processing to a minimum.

- The LDAP backend is not currently supported

Implements bp get-role-assignments

Change-Id: I6ff2ea780e39d7097a88214fbb3ddee1b924c30c
This commit is contained in:
Henry Nash 2013-06-21 22:50:40 +01:00
parent 62d948a66b
commit fa10d4945c
11 changed files with 872 additions and 1 deletions

View File

@ -66,6 +66,8 @@
"identity:create_grant": [["rule:admin_required"]],
"identity:revoke_grant": [["rule:admin_required"]],
"identity:list_role_assignments": [["rule:admin_required"]],
"identity:get_policy": [["rule:admin_required"]],
"identity:list_policies": [["rule:admin_required"]],
"identity:create_policy": [["rule:admin_required"]],

View File

@ -280,7 +280,8 @@ class V3Controller(V2Controller):
if attr in context['query_string']:
value = context['query_string'][attr]
return [r for r in refs if _attr_match(r[attr], value)]
return [r for r in refs if _attr_match(
flatten(r).get(attr), value)]
return refs
def _require_matching_id(self, value, ref):

View File

@ -180,6 +180,45 @@ class Identity(kvs.Base, identity.Driver):
else:
self.update_metadata(user_id, tenant_id, metadata_ref)
def list_role_assignments(self):
"""List the role assignments.
The kvs backend stores role assignments as key-values:
"metadata-{target}-{actor}", with the value being a role list
i.e. "metadata-MyProjectID-MyUserID" [role1, role2]
...so we enumerate the list and extract the targets, actors
and roles.
"""
assignment_list = []
metadata_keys = filter(lambda x: x.startswith("metadata-"),
self.db.keys())
for key in metadata_keys:
template = {}
meta_id1 = key.split('-')[1]
meta_id2 = key.split('-')[2]
try:
self.get_project(meta_id1)
template['project_id'] = meta_id1
except exception.NotFound:
template['domain_id'] = meta_id1
try:
self._get_user(meta_id2)
template['user_id'] = meta_id2
except exception.NotFound:
template['group_id'] = meta_id2
entry = self.db.get(key)
for r in entry.get('roles', []):
role_assignment = template.copy()
role_assignment['role_id'] = r
assignment_list.append(role_assignment)
return assignment_list
# CRUD
def create_user(self, user_id, user):
try:

View File

@ -355,6 +355,43 @@ class Identity(sql.Base, identity.Driver):
self.update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
def list_role_assignments(self):
# TODO(henry-nash): The current implementation is really simulating
# us having a common role assignment table, rather than having the
# four different grant tables we have today. When we move to role
# assignment as a first class entity, we should create the single
# assignment table, simplifying the logic of this (and many other)
# functions.
session = self.get_session()
assignment_list = []
refs = session.query(UserDomainGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'user_id': x.user_id,
'domain_id': x.domain_id,
'role_id': r})
refs = session.query(UserProjectGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'user_id': x.user_id,
'project_id': x.project_id,
'role_id': r})
refs = session.query(GroupDomainGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'group_id': x.group_id,
'domain_id': x.domain_id,
'role_id': r})
refs = session.query(GroupProjectGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'group_id': x.group_id,
'project_id': x.project_id,
'role_id': r})
return assignment_list
def list_projects(self):
session = self.get_session()
tenant_refs = session.query(Project).all()

View File

@ -16,6 +16,7 @@
"""Workflow Logic the Identity service."""
import copy
import urllib
import urlparse
import uuid
@ -794,3 +795,195 @@ class RoleV3(controller.V3Controller):
self._delete_tokens_for_user(user_id)
else:
self._delete_tokens_for_group(group_id)
class RoleAssignmentV3(controller.V3Controller):
# TODO(henry-nash): The current implementation does not provide a full
# first class entity for role-assignment. There is no role_assignment_id
# and only the list_role_assignment call is supported. Further, since it
# is not a first class entity, the links for the individual entities
# reference the individual role grant APIs.
collection_name = 'role_assignments'
member_name = 'role_assignment'
@classmethod
def wrap_member(cls, context, ref):
# NOTE(henry-nash): Since we are not yet a true collection, we override
# the wrapper as have already included the links in the entities
pass
def _format_entity(self, entity):
formatted_entity = {}
if 'user_id' in entity:
formatted_entity['user'] = {'id': entity['user_id']}
actor_link = '/users/%s' % entity['user_id']
if 'group_id' in entity:
formatted_entity['group'] = {'id': entity['group_id']}
actor_link = '/groups/%s' % entity['group_id']
if 'role_id' in entity:
formatted_entity['role'] = {'id': entity['role_id']}
if 'project_id' in entity:
formatted_entity['scope'] = (
{'project': {'id': entity['project_id']}})
target_link = '/projects/%s' % entity['project_id']
if 'domain_id' in entity:
formatted_entity['scope'] = (
{'domain': {'id': entity['domain_id']}})
target_link = '/domains/%s' % entity['domain_id']
formatted_entity.setdefault('links', {})
formatted_entity['links']['assignment'] = (
self.base_url(target_link + actor_link +
'/roles/%s' % entity['role_id']))
return formatted_entity
def _expand_indirect_assignments(self, refs):
"""Processes entity list into all-direct assignments.
For any group role assignments in the list, create a role assignment
entity for each member of that group, and then remove the group
assignment entity itself from the list.
For any new entity created by virtue of group membership, add in an
additional link to that membership.
"""
def _get_group_members(ref):
"""Get a list of group members.
Get the list of group members. If this fails with
GroupNotFound, then log this as a warning, but allow
overall processing to continue.
"""
try:
members = self.identity_api.list_users_in_group(
ref['group']['id'])
except exception.GroupNotFound:
members = []
# The group is missing, which should not happen since
# group deletion should remove any related assignments, so
# log a warning
if 'domain' in ref:
target = 'Domain: %s' % ref['domain'].get('domain_id')
elif 'project' in ref:
target = 'Project: %s' % ref['project'].get('project_id')
else:
# Should always be a domain or project, but since to get
# here things have gone astray, let's be cautious.
target = 'Unknown'
LOG.warning(
_('Group %(group)s not found for role-assignment - '
'%(target)s with Role: %(role)s') % {
'group': ref['group_id'], 'target': target,
'role': ref.get('role_id')})
return members
def _build_equivalent_user_assignment(user, group_id, template):
"""Create a user assignment equivalent to the group one.
The template has had the 'group' entity removed, so
substitute a 'user' one, modify the 'assignment' link
to match, and add a 'membership' link.
"""
user_entry = copy.deepcopy(template)
user_entry['user'] = {'id': user['id']}
scope = user_entry.get('scope')
if 'domain' in scope:
target_link = (
'/domains/%s' % scope['domain']['id'])
else:
target_link = (
'/projects/%s' % scope['project']['id'])
user_entry['links']['assignment'] = (
self.base_url('%s/users/%s/roles/%s' %
(target_link, m['id'],
user_entry['role']['id'])))
user_entry['links']['membership'] = (
self.base_url('/groups/%s/users/%s' %
(group_id, user['id'])))
return user_entry
# Scan the list of entities for any group assignments, expanding
# them into equivalent user entities. Due to potential large
# expansion of group entities, rather than modify the
# list we are enumerating, we build a new one as we go.
new_refs = []
for r in refs:
if 'group' in r:
# As it is a group role assignment, first get the list of
# members.
members = _get_group_members(r)
# Now replace that group role assignment entry with an
# equivalent user role assignment for each of the group members
base_entry = copy.deepcopy(r)
group_id = base_entry['group']['id']
base_entry.pop('group')
for m in members:
user_entry = _build_equivalent_user_assignment(
m, group_id, base_entry)
new_refs.append(user_entry)
else:
new_refs.append(r)
return new_refs
def _query_filter_is_true(self, filter_value):
"""Determine if bool query param is 'True'.
We treat this the same way as we do for policy
enforcement:
{bool_param}=0 is treated as False
Any other value is considered to be equivalent to
True, including the absence of a value
"""
if (isinstance(filter_value, basestring) and
filter_value == '0'):
val = False
else:
val = True
return val
@controller.filterprotected('group.id', 'role.id',
'scope.domain.id', 'scope.project.id',
'user.id')
def list_role_assignments(self, context, filters):
# TODO(henry-nash): This implementation uses the standard filtering
# in the V3.wrap_collection. Given the large number of individual
# assignments, this is pretty inefficient. An alternative would be
# to pass the filters into the driver call, so that the list size is
# kept a minimum.
refs = self.identity_api.list_role_assignments()
formatted_refs = [self._format_entity(x) for x in refs]
if ('effective' in context['query_string'] and
self._query_filter_is_true(
context['query_string']['effective'])):
formatted_refs = self._expand_indirect_assignments(formatted_refs)
return self.wrap_collection(context, formatted_refs, filters)
@controller.protected
def get_role_assignment(self, context):
raise exception.NotImplemented()
@controller.protected
def update_role_assignment(self, context):
raise exception.NotImplemented()
@controller.protected
def delete_role_assignment(self, context):
raise exception.NotImplemented()

View File

@ -504,6 +504,10 @@ class Driver(object):
"""
raise exception.NotImplemented()
def list_role_assignments(self):
raise exception.NotImplemented()
# group crud
def create_group(self, group_id, group):

View File

@ -173,3 +173,7 @@ def append_v3_routers(mapper, routers):
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))
routers.append(
router.Router(controllers.RoleAssignmentV3(),
'role_assignments', 'role_assignment'))

View File

@ -482,6 +482,72 @@ class IdentityTests(object):
self.identity_api.get_project,
'fake2')
def test_list_role_assignments_unfiltered(self):
"""Test for unfiltered listing role assignments.
Test Plan:
- Create a domain, with a user, group & project
- Find how many role assignments already exist (from default
fixtures)
- Create a grant of each type (user/group on project/domain)
- Check the number of assignments has gone up by 4 and that
the entries we added are in the list returned
"""
new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(new_domain['id'], new_domain)
new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
self.identity_api.create_user(new_user['id'],
new_user)
new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': new_domain['id']}
self.identity_api.create_project(new_project['id'], new_project)
# First check how many role grant already exist
existing_assignments = len(self.identity_api.list_role_assignments())
# Now create the grants (roles are defined in default_fixtures)
self.identity_api.create_grant(user_id=new_user['id'],
domain_id=new_domain['id'],
role_id='member')
self.identity_api.create_grant(user_id=new_user['id'],
project_id=new_project['id'],
role_id='other')
self.identity_api.create_grant(group_id=new_group['id'],
domain_id=new_domain['id'],
role_id='admin')
self.identity_api.create_grant(group_id=new_group['id'],
project_id=new_project['id'],
role_id='admin')
# Read back the list of assignments - check it is gone up by 4
assignment_list = self.identity_api.list_role_assignments()
self.assertEquals(len(assignment_list), existing_assignments + 4)
# Now check that each of our four new entries are in the list
self.assertIn(
{'user_id': new_user['id'], 'domain_id': new_domain['id'],
'role_id': 'member'},
assignment_list)
self.assertIn(
{'user_id': new_user['id'], 'project_id': new_project['id'],
'role_id': 'other'},
assignment_list)
self.assertIn(
{'group_id': new_group['id'], 'domain_id': new_domain['id'],
'role_id': 'admin'},
assignment_list)
self.assertIn(
{'group_id': new_group['id'], 'project_id': new_project['id'],
'role_id': 'admin'},
assignment_list)
def test_add_duplicate_role_grant(self):
roles_ref = self.identity_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])

View File

@ -458,6 +458,9 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
def test_get_and_remove_correct_role_grant_from_a_mix(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_list_role_assignments_unfiltered(self):
raise nose.exc.SkipTest('Blocked by bug 1195019')
def test_project_crud(self):
# NOTE(topol): LDAP implementation does not currently support the
# updating of a project name so this method override

View File

@ -715,6 +715,87 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertEqual(ref['name'], entity['name'])
return entity
def assertValidRoleAssignmentListResponse(self, resp, ref=None,
expected_length=None):
entities = resp.result.get('role_assignments')
if expected_length is not None:
self.assertEqual(len(entities), expected_length)
elif ref is not None:
# we're at least expecting the ref
self.assertNotEmpty(entities)
# collections should have relational links
self.assertValidListLinks(resp.result.get('links'))
for entity in entities:
self.assertIsNotNone(entity)
self.assertValidRoleAssignment(entity)
if ref:
self.assertValidRoleAssignment(entity, ref)
return entities
def assertValidRoleAssignment(self, entity, ref=None, url=None):
self.assertIsNotNone(entity.get('role'))
self.assertIsNotNone(entity.get('scope'))
# Only one of user or group should be present
self.assertIsNotNone(entity.get('user') or
entity.get('group'))
self.assertIsNone(entity.get('user') and
entity.get('group'))
# Only one of domain or project should be present
self.assertIsNotNone(entity['scope'].get('project') or
entity['scope'].get('domain'))
self.assertIsNone(entity['scope'].get('project') and
entity['scope'].get('domain'))
if entity['scope'].get('project'):
self.assertIsNotNone(entity['scope']['project'].get('id'))
else:
self.assertIsNotNone(entity['scope']['domain'].get('id'))
self.assertIsNotNone(entity.get('links'))
self.assertIsNotNone(entity['links'].get('assignment'))
if ref:
if ref.get('user'):
self.assertEqual(ref['user']['id'], entity['user']['id'])
if ref.get('group'):
self.assertEqual(ref['group']['id'], entity['group']['id'])
if ref.get('role'):
self.assertEqual(ref['role']['id'], entity['role']['id'])
if ref['scope'].get('project'):
self.assertEqual(ref['scope']['project']['id'],
entity['scope']['project']['id'])
if ref['scope'].get('domain'):
self.assertEqual(ref['scope']['domain']['id'],
entity['scope']['domain']['id'])
if url:
self.assertIn(url, entity['links']['assignment'])
def assertRoleAssignmentInListResponse(
self, resp, ref, link_url=None, expected=1):
found_count = 0
for entity in resp.result.get('role_assignments'):
try:
self.assertValidRoleAssignment(
entity, ref=ref, url=link_url)
except Exception:
# It doesn't match, so let's go onto the next one
pass
else:
found_count += 1
self.assertEqual(found_count, expected)
def assertRoleAssignmentNotInListResponse(
self, resp, ref, link_url=None):
self.assertRoleAssignmentInListResponse(
resp, ref=ref, link_url=link_url, expected=0)
# policy validation
def assertValidPolicyListResponse(self, resp, *args, **kwargs):

View File

@ -627,3 +627,444 @@ class IdentityTestCase(test_v3.RestfulTestCase):
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0)
self.assertIn(collection_url, r.result['links']['self'])
def _build_role_assignment_url_and_entity(
self, role_id, user_id=None, group_id=None, domain_id=None,
project_id=None):
if user_id and domain_id:
url = ('/domains/%(domain_id)s/users/%(user_id)s'
'/roles/%(role_id)s' % {
'domain_id': domain_id,
'user_id': user_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'user': {'id': user_id},
'scope': {'domain': {'id': domain_id}}}
elif user_id and project_id:
url = ('/projects/%(project_id)s/users/%(user_id)s'
'/roles/%(role_id)s' % {
'project_id': project_id,
'user_id': user_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'user': {'id': user_id},
'scope': {'project': {'id': project_id}}}
if group_id and domain_id:
url = ('/domains/%(domain_id)s/groups/%(group_id)s'
'/roles/%(role_id)s' % {
'domain_id': domain_id,
'group_id': group_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'group': {'id': group_id},
'scope': {'domain': {'id': domain_id}}}
elif group_id and project_id:
url = ('/projects/%(project_id)s/groups/%(group_id)s'
'/roles/%(role_id)s' % {
'project_id': project_id,
'group_id': group_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'group': {'id': group_id},
'scope': {'project': {'id': project_id}}}
return (url, entity)
def test_get_role_assignments(self):
"""Call ``GET /role_assignments``.
The sample data set up already has a user, group and project
that is part of self.domain. We use these plus a new user
we create as our data set, making sure we ignore any
role assignments that are already in existence.
Since we don't yet support a first class entity for role
assignments, we are only testing the LIST API. To create
and delete the role assignments we use the old grant APIs.
Test Plan:
- Create extra user for tests
- Get a list of all existing role assignments
- Add a new assignment for each of the four combinations, i.e.
group+domain, user+domain, group+project, user+project, using
the same role each time
- Get a new list of all role assignments, checking these four new
ones have been added
- Then delete the four we added
- Get a new list of all role assignments, checking the four have
been removed
"""
# Since the default fixtures already assign some roles to the
# user it creates, we also need a new user that will not have any
# existing assignments
self.user1 = self.new_user_ref(
domain_id=self.domain['id'])
self.user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user1['id'], self.user1)
collection_url = '/role_assignments'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertIn(collection_url, r.result['links']['self'])
existing_assignments = len(r.result.get('role_assignments'))
# Now add one of each of the four types of assignment, making sure
# that we get them all back.
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 1)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role_id)
self.put(ud_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
gp_url, gp_entity = self._build_role_assignment_url_and_entity(
project_id=self.project_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gp_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 3)
self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url)
up_url, up_entity = self._build_role_assignment_url_and_entity(
project_id=self.project_id, user_id=self.user1['id'],
role_id=self.role_id)
self.put(up_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 4)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
# Now delete the four we added and make sure they are removed
# from the collection.
self.delete(gd_url)
self.delete(ud_url)
self.delete(gp_url)
self.delete(up_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments)
self.assertRoleAssignmentNotInListResponse(r, gd_entity)
self.assertRoleAssignmentNotInListResponse(r, ud_entity)
self.assertRoleAssignmentNotInListResponse(r, gp_entity)
self.assertRoleAssignmentNotInListResponse(r, up_entity)
def test_get_effective_role_assignments(self):
"""Call ``GET /role_assignments?effective``.
Test Plan:
- Create two extra user for tests
- Add these users to a group
- Add a role assignment for the group on a domain
- Get a list of all role assignments, checking one has been added
- Then get a list of all effective role assignments - the group
assignment should have turned into assignments on the domain
for each of the group members.
"""
self.user1 = self.new_user_ref(
domain_id=self.domain['id'])
self.user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user1['id'], self.user1)
self.user2 = self.new_user_ref(
domain_id=self.domain['id'])
self.user2['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user2['id'], self.user2)
self.identity_api.add_user_to_group(self.user1['id'], self.group['id'])
self.identity_api.add_user_to_group(self.user2['id'], self.group['id'])
collection_url = '/role_assignments'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertIn(collection_url, r.result['links']['self'])
existing_assignments = len(r.result.get('role_assignments'))
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 1)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
# Now re-read the collection asking for effective roles - this
# should mean the group assignment is translated into the two
# member user assignments
collection_url = '/role_assignments?effective'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user2['id'],
role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
def test_check_effective_values_for_role_assignments(self):
"""Call ``GET /role_assignments?effective=value``.
Check the various ways of specifying the 'effective'
query parameter. If the 'effective' query parameter
is included then this should always be treated as
as meaning 'True' unless it is specified as:
{url}?effective=0
This is by design to match the agreed way of handling
policy checking on query/filter parameters.
Test Plan:
- Create two extra user for tests
- Add these users to a group
- Add a role assignment for the group on a domain
- Get a list of all role assignments, checking one has been added
- Then issue various request with different ways of defining
the 'effective' query parameter. As we have tested the
correctness of the data coming back when we get effective roles
in other tests, here we just use the count of entities to
know if we are getting effective roles or not
"""
self.user1 = self.new_user_ref(
domain_id=self.domain['id'])
self.user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user1['id'], self.user1)
self.user2 = self.new_user_ref(
domain_id=self.domain['id'])
self.user2['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user2['id'], self.user2)
self.identity_api.add_user_to_group(self.user1['id'], self.group['id'])
self.identity_api.add_user_to_group(self.user2['id'], self.group['id'])
collection_url = '/role_assignments'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
existing_assignments = len(r.result.get('role_assignments'))
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 1)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
# Now re-read the collection asking for effective roles,
# using the most common way of defining "effective'. This
# should mean the group assignment is translated into the two
# member user assignments
collection_url = '/role_assignments?effective'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
# Now set 'effective' to false explicitly - should get
# back the regular roles
collection_url = '/role_assignments?effective=0'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 1)
# Now try setting 'effective' to 'False' explicitly- this is
# NOT supported as a way of setting a query or filter
# parameter to false by design. Hence we should get back
# effective roles.
collection_url = '/role_assignments?effective=False'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
# Now set 'effective' to True explicitly
collection_url = '/role_assignments?effective=True'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
def test_filtered_role_assignments(self):
"""Call ``GET /role_assignments?filters``.
Test Plan:
- Create extra users, group, role and project for tests
- Make the following assignments:
Give group1, role1 on project1 and domain
Give user1, role2 on project1 and domain
Make User1 a member of Group1
- Test a series of single filter list calls, checking that
the correct results are obtained
- Test a multi-filtered list call
- Test listing all effective roles for a given user
- Test the equivalent of the list of roles in a project scoped
token (all effective roles for a user on a project)
"""
# Since the default fixtures already assign some roles to the
# user it creates, we also need a new user that will not have any
# existing assignments
self.user1 = self.new_user_ref(
domain_id=self.domain['id'])
self.user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user1['id'], self.user1)
self.user2 = self.new_user_ref(
domain_id=self.domain['id'])
self.user2['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.user2['id'], self.user2)
self.group1 = self.new_group_ref(
domain_id=self.domain['id'])
self.identity_api.create_group(self.group1['id'], self.group1)
self.identity_api.add_user_to_group(self.user1['id'],
self.group1['id'])
self.identity_api.add_user_to_group(self.user2['id'],
self.group1['id'])
self.project1 = self.new_project_ref(
domain_id=self.domain['id'])
self.identity_api.create_project(self.project1['id'], self.project1)
self.role1 = self.new_role_ref()
self.identity_api.create_role(self.role1['id'], self.role1)
self.role2 = self.new_role_ref()
self.identity_api.create_role(self.role2['id'], self.role2)
# Now add one of each of the four types of assignment
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group1['id'],
role_id=self.role1['id'])
self.put(gd_url)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role2['id'])
self.put(ud_url)
gp_url, gp_entity = self._build_role_assignment_url_and_entity(
project_id=self.project1['id'], group_id=self.group1['id'],
role_id=self.role1['id'])
self.put(gp_url)
up_url, up_entity = self._build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role2['id'])
self.put(up_url)
# Now list by various filters to make sure we get back the right ones
collection_url = ('/role_assignments?scope.project.id=%s' %
self.project1['id'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url)
collection_url = ('/role_assignments?scope.domain.id=%s' %
self.domain['id'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
collection_url = '/role_assignments?user.id=%s' % self.user1['id']
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
collection_url = '/role_assignments?group.id=%s' % self.group1['id']
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url)
collection_url = '/role_assignments?role.id=%s' % self.role1['id']
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url)
# Let's try combining two filers together....
collection_url = (
'/role_assignments?user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
'user_id': self.user1['id'],
'project_id': self.project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 1)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
# Now for a harder one - filter for user with effective
# roles - this should return role assignment that were directly
# assigned as well as by virtue of group membership
collection_url = ('/role_assignments?effective&user.id=%s' %
self.user1['id'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 4)
# Should have the two direct roles...
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
# ...and the two via group membership...
up1_url, up1_entity = self._build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role1['id'])
ud1_url, ud1_entity = self._build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up1_entity,
link_url=up1_url)
self.assertRoleAssignmentInListResponse(r, ud1_entity,
link_url=ud1_url)
# ...and for the grand-daddy of them all, simulate the request
# that would generate the list of effective roles in a project
# scoped token.
collection_url = (
'/role_assignments?effective&user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
'user_id': self.user1['id'],
'project_id': self.project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
# Should have one direct role and one from group membership...
up1_url, up1_entity = self._build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, up1_entity,
link_url=up1_url)