Invalidate trust when the trustor or trustee is deleted

The trust without a valid trustee, trustor is useless and will
no longer be active since the id of user is a random number and
only assigned when it created.

The patch delete trust if the related trustee or trustor is
deleted if the user is maintained by keystone.

Change-Id: I67dac6b7bac8cb94575ceda4a3277847a2bcc2d8
Partial-Bug: #1622310
This commit is contained in:
Dave Chen 2016-09-13 19:00:11 +08:00
parent 2102498b36
commit 52642cc562
3 changed files with 64 additions and 1 deletions

View File

@ -4880,10 +4880,11 @@ class TestTrustChain(test_v3.RestfulTestCase):
self.identity_api.delete_user(self.user_list[0]['id'])
# Bypass policy enforcement
# Delete trustee will invalidate the trust.
with mock.patch.object(rules, 'enforce', return_value=True):
headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers,
expected_status=http_client.FORBIDDEN)
expected_status=http_client.NOT_FOUND)
class TestAuthContext(unit.TestCase):

View File

@ -16,6 +16,7 @@ import uuid
from six.moves import http_client
import keystone.conf
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import test_v3
@ -460,3 +461,50 @@ class TestTrustOperations(test_v3.RestfulTestCase):
token=resp.headers.get('X-Subject-Token'),
method='POST',
expected_status=http_client.FORBIDDEN)
def test_trust_deleted_when_user_deleted(self):
# create trust
ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
impersonation=False,
role_ids=[self.role_id],
allow_redelegation=True)
resp = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(resp)
# list all trusts
r = self.get('/OS-TRUST/trusts')
self.assertEqual(1, len(r.result['trusts']))
# delete the trustee will delete the trust
self.delete(
'/users/%(user_id)s' % {'user_id': trust['trustee_user_id']})
self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=http_client.NOT_FOUND)
# create another user as the new trustee
trustee_user = unit.create_user(self.identity_api,
domain_id=self.domain_id)
trustee_user_id = trustee_user['id']
# create the trust again
ref['trustee_user_id'] = trustee_user_id
resp = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(resp)
r = self.get('/OS-TRUST/trusts')
self.assertEqual(1, len(r.result['trusts']))
# delete the trustor will delete the trust
self.delete(
'/users/%(user_id)s' % {'user_id': trust['trustor_user_id']})
# call the backend method directly to bypass authentication since the
# user has been deleted.
self.assertRaises(exception.TrustNotFound,
self.trust_api.get_trust,
trust['id'])

View File

@ -43,6 +43,20 @@ class Manager(manager.Manager):
def __init__(self):
super(Manager, self).__init__(CONF.trust.driver)
notifications.register_event_callback(
notifications.ACTIONS.deleted, 'user',
self._on_user_delete)
def _on_user_delete(self, service, resource_type, operation,
payload):
# NOTE(davechen): Only delete the user that is maintained by
# keystone will delete the related trust, since we don't know
# when a LDAP user or federation user is deleted.
user_id = payload['resource_info']
trusts = self.driver.list_trusts_for_trustee(user_id)
trusts = trusts + self.driver.list_trusts_for_trustor(user_id)
for trust in trusts:
self.driver.delete_trust(trust['id'])
@staticmethod
def _validate_redelegation(redelegated_trust, trust):