Handle NotFound when listing role assignments for deleted users

Keystone can use an external identity store for the users, and
store assignments for these users in the SQL database that it
manages. When a user has been deleted directly in the external
identity store, these assignments will persist. Therefore when
listing role assignments and asking for names to be included,
keystone will try to get information of the user and fail with
NotFound.

This catches the NotFound exception of the get_user and get_group
calls and fills the user values with and empty string.

Change-Id: Iec3e12f6cd1402e1e3f192b0ede5d608bd41ca1d
Closes-Bug: 1684820
(cherry picked from commit 0392b36a0d)
This commit is contained in:
Kristi Nikolla 2017-04-21 15:31:49 -04:00 committed by Divya K Konoor
parent 955fd6ca37
commit e1ee00f1df
2 changed files with 82 additions and 12 deletions

View File

@ -949,19 +949,43 @@ class Manager(manager.Manager):
_domain = self.resource_api.get_domain(value)
new_assign['domain_name'] = _domain['name']
elif key == 'user_id':
_user = self.identity_api.get_user(value)
new_assign['user_name'] = _user['name']
new_assign['user_domain_id'] = _user['domain_id']
new_assign['user_domain_name'] = (
self.resource_api.get_domain(_user['domain_id'])
['name'])
try:
# Note(knikolla): Try to get the user, otherwise
# if the user wasn't found in the backend
# use empty values.
_user = self.identity_api.get_user(value)
except exception.UserNotFound:
msg = ('User %(user)s not found in the'
' backend but still has role assignments.')
LOG.warning(msg, {'user': value})
new_assign['user_name'] = ''
new_assign['user_domain_id'] = ''
new_assign['user_domain_name'] = ''
else:
new_assign['user_name'] = _user['name']
new_assign['user_domain_id'] = _user['domain_id']
new_assign['user_domain_name'] = (
self.resource_api.get_domain(_user['domain_id'])
['name'])
elif key == 'group_id':
_group = self.identity_api.get_group(value)
new_assign['group_name'] = _group['name']
new_assign['group_domain_id'] = _group['domain_id']
new_assign['group_domain_name'] = (
self.resource_api.get_domain(_group['domain_id'])
['name'])
try:
# Note(knikolla): Try to get the group, otherwise
# if the group wasn't found in the backend
# use empty values.
_group = self.identity_api.get_group(value)
except exception.GroupNotFound:
msg = ('Group %(group)s not found in the'
' backend but still has role assignments.')
LOG.warning(msg, {'group': value})
new_assign['group_name'] = ''
new_assign['group_domain_id'] = ''
new_assign['group_domain_name'] = ''
else:
new_assign['group_name'] = _group['name']
new_assign['group_domain_id'] = _group['domain_id']
new_assign['group_domain_name'] = (
self.resource_api.get_domain(_group['domain_id'])
['name'])
elif key == 'project_id':
_project = self.resource_api.get_project(value)
new_assign['project_name'] = _project['name']

View File

@ -578,6 +578,52 @@ class AssignmentTests(AssignmentTestHelperMixin):
role_id=uuid.uuid4().hex)
self.assertEqual([], assignment_list)
def test_list_role_assignments_user_not_found(self):
def _user_not_found(value):
raise exception.UserNotFound(user_id=value)
# Note(knikolla): Patch get_user to return UserNotFound
# this simulates the possibility of a user being deleted
# directly in the backend and still having lingering role
# assignments.
with mock.patch.object(self.identity_api, 'get_user',
_user_not_found):
assignment_list = self.assignment_api.list_role_assignments(
include_names=True
)
self.assertNotEqual([], assignment_list)
for assignment in assignment_list:
if 'user_name' in assignment:
# Note(knikolla): In the case of a not found user we
# populate the values with empty strings.
self.assertEqual('', assignment['user_name'])
self.assertEqual('', assignment['user_domain_id'])
self.assertEqual('', assignment['user_domain_name'])
def test_list_role_assignments_group_not_found(self):
def _group_not_found(value):
raise exception.GroupNotFound(group_id=value)
# Note(knikolla): Patch get_group to return GroupNotFound
# this simulates the case of a group being deleted
# directly in the backend and still having lingering role
# assignments.
with mock.patch.object(self.identity_api, 'get_group',
_group_not_found):
assignment_list = self.assignment_api.list_role_assignments(
include_names=True
)
self.assertNotEqual([], assignment_list)
for assignment in assignment_list:
if 'group_name' in assignment:
# Note(knikolla): In the case of a not found group we
# populate the values with empty strings.
self.assertEqual('', assignment['group_name'])
self.assertEqual('', assignment['group_domain_id'])
self.assertEqual('', assignment['group_domain_name'])
def test_add_duplicate_role_grant(self):
roles_ref = self.assignment_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])