Add support for Credentials in Identity Manager

* Needed for later plugins and MFA

Change-Id: I0bcf290c047e218089f6c8f85b97ed76a903ed5f
This commit is contained in:
Amelia Cordwell 2017-11-29 15:36:38 +13:00
parent f8b8e0c97f
commit 571c89f98f
2 changed files with 66 additions and 2 deletions

View File

@ -46,7 +46,8 @@ class FakeProject(object):
class FakeUser(object):
def __init__(self, name, password, domain_id='default',
enabled=True, default_project_id=None, **kwargs):
enabled=True, default_project_id=None,
**kwargs):
self.id = uuid4().hex
self.name = name
self.password = password
@ -66,6 +67,16 @@ class FakeRole(object):
self.name = name
class FakeCredential(object):
def __init__(self, user_id, cred_type, blob, project_id=None):
self.id = uuid4().hex
self.user_id = user_id
self.project_id = project_id
self.type = cred_type
self.blob = blob
class FakeRoleAssignment(object):
def __init__(self, scope, role=None, role_name=None, user=None,
@ -87,13 +98,15 @@ class FakeRoleAssignment(object):
def setup_identity_cache(projects=None, users=None, role_assignments=None,
extra_roles=[]):
credentials=None, extra_roles=[]):
if not projects:
projects = []
if not users:
users = []
if not role_assignments:
role_assignments = []
if not credentials:
credentials = []
default_domain = FakeProject(
name="Default", is_domain=True)
@ -138,6 +151,7 @@ def setup_identity_cache(projects=None, users=None, role_assignments=None,
'domains': {
default_domain.id: default_domain,
},
'credentials': credentials,
}
@ -401,6 +415,37 @@ class FakeManager(object):
global identity_cache
return identity_cache['regions'].values()
def list_credentials(self, user_id, cred_type=None):
global identity_cache
found = []
for cred in identity_cache['credentials']:
if cred.user_id == user_id:
if cred_type and cred.type == cred_type:
found.append(cred)
elif cred_type is None:
found.append(cred)
return found
def add_credential(self, user, cred_type, blob, project=None):
global identity_cache
user = self._user_from_id(user)
project = self._project_from_id(project)
cred = FakeCredential(
user_id=user.id, blob=blob, cred_type=cred_type)
if project:
cred.project_id = project.id
identity_cache['credentials'].append(cred)
return cred
def clear_credential_type(self, user_id, cred_type):
global identity_cache
found = []
for cred in identity_cache['credentials']:
if cred.user_id == user_id and cred.type == cred_type:
found.append(cred)
for cred in found:
identity_cache['credentials'].remove(cred)
class FakeOpenstackClient(object):
class Quotas(object):

View File

@ -328,3 +328,22 @@ class IdentityManager(object): # pragma: no cover
def list_regions(self, **kwargs):
return self.ks_client.regions.list(**kwargs)
def list_credentials(self, user_id, cred_type=None):
return self.ks_client.credentials.list(
user_id=user_id, type=cred_type)
def add_credential(self, user, cred_type, blob, project=None):
return self.ks_client.credentials.create(
user=user, type=cred_type, blob=blob, project=project)
def delete_credential(self, credential):
return self.ks_client.credentials.delete(credential)
def clear_credential_type(self, user_id, cred_type):
# list credentials of the type for the user
credentials = self.ks_client.credentials.list(
user_id=user_id, type=cred_type)
for cred in credentials:
if cred.user_id == user_id and cred.type == cred_type:
self.ks_client.credentials.delete(cred)