From 0392b36a0d7d3e7cc479b357245da04c949924de Mon Sep 17 00:00:00 2001 From: Kristi Nikolla Date: Fri, 21 Apr 2017 15:31:49 -0400 Subject: [PATCH] Handle NotFound when listing role assignments for deleted users Keystone can use an external identity store for the users, and store assignments for these users in the SQL database that it manages. When a user has been deleted directly in the external identity store, these assignments will persist. Therefore when listing role assignments and asking for names to be included, keystone will try to get information of the user and fail with NotFound. This catches the NotFound exception of the get_user and get_group calls and fills the user values with and empty string. Change-Id: Iec3e12f6cd1402e1e3f192b0ede5d608bd41ca1d Closes-Bug: 1684820 --- keystone/assignment/core.py | 48 ++++++++++++++----- .../tests/unit/assignment/test_backends.py | 46 ++++++++++++++++++ 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index 68531e4111..be9b68d291 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -938,19 +938,43 @@ class Manager(manager.Manager): _domain = self.resource_api.get_domain(value) new_assign['domain_name'] = _domain['name'] elif key == 'user_id': - _user = self.identity_api.get_user(value) - new_assign['user_name'] = _user['name'] - new_assign['user_domain_id'] = _user['domain_id'] - new_assign['user_domain_name'] = ( - self.resource_api.get_domain(_user['domain_id']) - ['name']) + try: + # Note(knikolla): Try to get the user, otherwise + # if the user wasn't found in the backend + # use empty values. + _user = self.identity_api.get_user(value) + except exception.UserNotFound: + msg = ('User %(user)s not found in the' + ' backend but still has role assignments.') + LOG.warning(msg, {'user': value}) + new_assign['user_name'] = '' + new_assign['user_domain_id'] = '' + new_assign['user_domain_name'] = '' + else: + new_assign['user_name'] = _user['name'] + new_assign['user_domain_id'] = _user['domain_id'] + new_assign['user_domain_name'] = ( + self.resource_api.get_domain(_user['domain_id']) + ['name']) elif key == 'group_id': - _group = self.identity_api.get_group(value) - new_assign['group_name'] = _group['name'] - new_assign['group_domain_id'] = _group['domain_id'] - new_assign['group_domain_name'] = ( - self.resource_api.get_domain(_group['domain_id']) - ['name']) + try: + # Note(knikolla): Try to get the group, otherwise + # if the group wasn't found in the backend + # use empty values. + _group = self.identity_api.get_group(value) + except exception.GroupNotFound: + msg = ('Group %(group)s not found in the' + ' backend but still has role assignments.') + LOG.warning(msg, {'group': value}) + new_assign['group_name'] = '' + new_assign['group_domain_id'] = '' + new_assign['group_domain_name'] = '' + else: + new_assign['group_name'] = _group['name'] + new_assign['group_domain_id'] = _group['domain_id'] + new_assign['group_domain_name'] = ( + self.resource_api.get_domain(_group['domain_id']) + ['name']) elif key == 'project_id': _project = self.resource_api.get_project(value) new_assign['project_name'] = _project['name'] diff --git a/keystone/tests/unit/assignment/test_backends.py b/keystone/tests/unit/assignment/test_backends.py index 1612e4c1c2..5f5b29508c 100644 --- a/keystone/tests/unit/assignment/test_backends.py +++ b/keystone/tests/unit/assignment/test_backends.py @@ -579,6 +579,52 @@ class AssignmentTests(AssignmentTestHelperMixin): role_id=uuid.uuid4().hex) self.assertEqual([], assignment_list) + def test_list_role_assignments_user_not_found(self): + def _user_not_found(value): + raise exception.UserNotFound(user_id=value) + + # Note(knikolla): Patch get_user to return UserNotFound + # this simulates the possibility of a user being deleted + # directly in the backend and still having lingering role + # assignments. + with mock.patch.object(self.identity_api, 'get_user', + _user_not_found): + assignment_list = self.assignment_api.list_role_assignments( + include_names=True + ) + + self.assertNotEqual([], assignment_list) + for assignment in assignment_list: + if 'user_name' in assignment: + # Note(knikolla): In the case of a not found user we + # populate the values with empty strings. + self.assertEqual('', assignment['user_name']) + self.assertEqual('', assignment['user_domain_id']) + self.assertEqual('', assignment['user_domain_name']) + + def test_list_role_assignments_group_not_found(self): + def _group_not_found(value): + raise exception.GroupNotFound(group_id=value) + + # Note(knikolla): Patch get_group to return GroupNotFound + # this simulates the case of a group being deleted + # directly in the backend and still having lingering role + # assignments. + with mock.patch.object(self.identity_api, 'get_group', + _group_not_found): + assignment_list = self.assignment_api.list_role_assignments( + include_names=True + ) + + self.assertNotEqual([], assignment_list) + for assignment in assignment_list: + if 'group_name' in assignment: + # Note(knikolla): In the case of a not found group we + # populate the values with empty strings. + self.assertEqual('', assignment['group_name']) + self.assertEqual('', assignment['group_domain_id']) + self.assertEqual('', assignment['group_domain_name']) + def test_add_duplicate_role_grant(self): roles_ref = self.assignment_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id'])