Merge "Remove foreign assignments when deleting a domain"
This commit is contained in:
commit
3c6091cc48
|
@ -264,6 +264,14 @@ class Assignment(keystone_assignment.AssignmentDriverV9):
|
|||
q = q.filter_by(role_id=role_id)
|
||||
q.delete(False)
|
||||
|
||||
def delete_domain_assignments(self, domain_id):
|
||||
with sql.session_for_write() as session:
|
||||
q = session.query(RoleAssignment)
|
||||
q = q.filter(RoleAssignment.target_id == domain_id).filter(
|
||||
(RoleAssignment.type == AssignmentType.USER_DOMAIN) |
|
||||
(RoleAssignment.type == AssignmentType.GROUP_DOMAIN))
|
||||
q.delete(False)
|
||||
|
||||
def delete_user_assignments(self, user_id):
|
||||
with sql.session_for_write() as session:
|
||||
q = session.query(RoleAssignment)
|
||||
|
|
|
@ -29,7 +29,7 @@ from keystone.common import driver_hints
|
|||
from keystone.common import manager
|
||||
from keystone import exception
|
||||
from keystone.i18n import _
|
||||
from keystone.i18n import _LI, _LE
|
||||
from keystone.i18n import _LI, _LE, _LW
|
||||
from keystone import notifications
|
||||
|
||||
|
||||
|
@ -49,6 +49,7 @@ MEMOIZE_COMPUTED_ASSIGNMENTS = cache.get_memoization_decorator(
|
|||
region=COMPUTED_ASSIGNMENTS_REGION)
|
||||
|
||||
|
||||
@notifications.listener
|
||||
@dependency.provider('assignment_api')
|
||||
@dependency.requires('credential_api', 'identity_api', 'resource_api',
|
||||
'revoke_api', 'role_api')
|
||||
|
@ -99,6 +100,17 @@ class Manager(manager.Manager):
|
|||
elif not isinstance(self.driver, AssignmentDriverV9):
|
||||
raise exception.UnsupportedDriverVersion(driver=assignment_driver)
|
||||
|
||||
self.event_callbacks = {
|
||||
notifications.ACTIONS.deleted: {
|
||||
'domain': [self._delete_domain_assignments],
|
||||
},
|
||||
}
|
||||
|
||||
def _delete_domain_assignments(self, service, resource_type, operations,
|
||||
payload):
|
||||
domain_id = payload['resource_info']
|
||||
self.driver.delete_domain_assignments(domain_id)
|
||||
|
||||
def _get_group_ids_for_user_id(self, user_id):
|
||||
# TODO(morganfainberg): Implement a way to get only group_ids
|
||||
# instead of the more expensive to_dict() call for each record.
|
||||
|
@ -1340,7 +1352,10 @@ class AssignmentDriverV9(AssignmentDriverBase):
|
|||
|
||||
"""
|
||||
|
||||
pass
|
||||
@abc.abstractmethod
|
||||
def delete_domain_assignments(self, domain_id):
|
||||
"""Deletes all assignments for a domain."""
|
||||
raise exception.NotImplemented()
|
||||
|
||||
|
||||
class V9AssignmentWrapperForV8Driver(AssignmentDriverV9):
|
||||
|
@ -1373,6 +1388,14 @@ class V9AssignmentWrapperForV8Driver(AssignmentDriverV9):
|
|||
def __init__(self, wrapped_driver):
|
||||
self.driver = wrapped_driver
|
||||
|
||||
def delete_domain_assignments(self, domain_id):
|
||||
"""Deletes all assignments for a domain."""
|
||||
msg = _LW('delete_domain_assignments method not found in custom '
|
||||
'assignment driver. Domain assignments for domain (%s) to '
|
||||
'users from other domains will not be removed. This was '
|
||||
'added in V9 of the assignment driver.')
|
||||
LOG.warning(msg, domain_id)
|
||||
|
||||
def default_role_driver(self):
|
||||
return self.driver.default_role_driver()
|
||||
|
||||
|
|
|
@ -2419,6 +2419,51 @@ class AssignmentTests(AssignmentTestHelperMixin):
|
|||
for assignment in group_assignments:
|
||||
self.assertThat(assignment.keys(), matchers.Contains('user_id'))
|
||||
|
||||
def test_remove_foreign_assignments_when_deleting_a_domain(self):
|
||||
# A user and a group are in default domain and have assigned a role on
|
||||
# two new domains. This test makes sure that when one of the new
|
||||
# domains is deleted, the role assignments for the user and the group
|
||||
# from the default domain are deleted only on that domain.
|
||||
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
||||
group = self.identity_api.create_group(group)
|
||||
|
||||
role = unit.new_role_ref()
|
||||
role = self.role_api.create_role(role['id'], role)
|
||||
|
||||
new_domains = [unit.new_domain_ref(), unit.new_domain_ref()]
|
||||
for new_domain in new_domains:
|
||||
self.resource_api.create_domain(new_domain['id'], new_domain)
|
||||
|
||||
self.assignment_api.create_grant(group_id=group['id'],
|
||||
domain_id=new_domain['id'],
|
||||
role_id=role['id'])
|
||||
self.assignment_api.create_grant(user_id=self.user_two['id'],
|
||||
domain_id=new_domain['id'],
|
||||
role_id=role['id'])
|
||||
|
||||
# Check there are 4 role assignments for that role
|
||||
role_assignments = self.assignment_api.list_role_assignments(
|
||||
role_id=role['id'])
|
||||
self.assertThat(role_assignments, matchers.HasLength(4))
|
||||
|
||||
# Delete first new domain and check only 2 assignments were left
|
||||
self.resource_api.update_domain(new_domains[0]['id'],
|
||||
{'enabled': False})
|
||||
self.resource_api.delete_domain(new_domains[0]['id'])
|
||||
|
||||
role_assignments = self.assignment_api.list_role_assignments(
|
||||
role_id=role['id'])
|
||||
self.assertThat(role_assignments, matchers.HasLength(2))
|
||||
|
||||
# Delete second new domain and check no assignments were left
|
||||
self.resource_api.update_domain(new_domains[1]['id'],
|
||||
{'enabled': False})
|
||||
self.resource_api.delete_domain(new_domains[1]['id'])
|
||||
|
||||
role_assignments = self.assignment_api.list_role_assignments(
|
||||
role_id=role['id'])
|
||||
self.assertEqual([], role_assignments)
|
||||
|
||||
|
||||
class InheritanceTests(AssignmentTestHelperMixin):
|
||||
|
||||
|
|
|
@ -1010,6 +1010,13 @@ class BaseLDAPIdentity(identity_tests.IdentityTests,
|
|||
super(BaseLDAPIdentity, self).
|
||||
test_create_project_with_domain_id_mismatch_to_parent_domain)
|
||||
|
||||
def test_remove_foreign_assignments_when_deleting_a_domain(self):
|
||||
"""Multiple domains are not supported."""
|
||||
self.assertRaises(
|
||||
(exception.ValidationError, exception.DomainNotFound),
|
||||
super(BaseLDAPIdentity,
|
||||
self).test_remove_foreign_assignments_when_deleting_a_domain)
|
||||
|
||||
|
||||
class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
|
||||
|
||||
|
@ -2715,6 +2722,12 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
|
|||
super(BaseLDAPIdentity, self).\
|
||||
test_create_project_with_domain_id_mismatch_to_parent_domain()
|
||||
|
||||
def test_remove_foreign_assignments_when_deleting_a_domain(self):
|
||||
# With multi LDAP this method should work, so override the override
|
||||
# from BaseLDAPIdentity
|
||||
base = super(BaseLDAPIdentity, self)
|
||||
base.test_remove_foreign_assignments_when_deleting_a_domain()
|
||||
|
||||
|
||||
class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
|
||||
"""Class to test the use of domain configs stored in the database.
|
||||
|
|
Loading…
Reference in New Issue