Implement system scope

This commit introduces the necessary bits in order to get system
scoped tokens from a keystone server.

bp system-scope

Change-Id: I538f2a6cd2b4113910dfdac250c14f17f80051f6
This commit is contained in:
Lance Bragstad 2017-12-21 18:10:34 +00:00
parent dc667f7354
commit f9ab615eb1
11 changed files with 119 additions and 5 deletions

View File

@ -219,7 +219,7 @@ class AccessInfo(object):
:returns: bool
"""
return self.project_scoped or self.domain_scoped
return self.project_scoped or self.domain_scoped or self.system_scoped
@property
def project_scoped(self):
@ -237,6 +237,14 @@ class AccessInfo(object):
"""
raise NotImplementedError()
@property
def system_scoped(self):
"""Return true if the auth token was scoped to the system.
:returns: bool
"""
raise NotImplementedError()
@property
def trust_id(self):
"""Return the trust id associated with the auth request.
@ -491,6 +499,10 @@ class AccessInfoV2(AccessInfo):
def domain_scoped(self):
return False
@property
def system_scoped(self):
return False
@property
def _trust(self):
return self._data['access']['trust']
@ -647,6 +659,10 @@ class AccessInfoV3(AccessInfo):
def username(self):
return self._user['name']
@_missingproperty
def system(self):
return self._data['token']['system']
@property
def _domain(self):
return self._data['token']['domain']
@ -690,6 +706,10 @@ class AccessInfoV3(AccessInfo):
except KeyError:
return False
@_missingproperty
def system_scoped(self):
return self._data['token']['system'].get('all', False)
@property
def _trust(self):
return self._data['token']['OS-TRUST:trust']

View File

@ -264,6 +264,14 @@ class Token(dict):
def domain_name(self, value):
self.root.setdefault('domain', {})['name'] = value
@property
def system(self):
return self.root.get('system', {})
@system.setter
def system(self, value):
return self.root.setdefault('system', value)
@property
def trust_id(self):
return self.root.get('OS-TRUST:trust', {}).get('id')
@ -363,12 +371,13 @@ class Token(dict):
def validate(self):
project = self.root.get('project')
domain = self.root.get('domain')
system = self.root.get('system')
trust = self.root.get('OS-TRUST:trust')
catalog = self.root.get('catalog')
roles = self.root.get('roles')
scoped = project or domain or trust
if sum((bool(project), bool(domain), bool(trust))) > 1:
if sum((bool(project), bool(domain), bool(trust), bool(system))) > 1:
msg = 'You cannot scope to multiple targets'
raise exception.FixtureValidationError(msg)
@ -412,6 +421,13 @@ class Token(dict):
self.domain_id = id or uuid.uuid4().hex
self.domain_name = name or uuid.uuid4().hex
def set_system_scope(self):
# NOTE(lbragstad): In the future it might be possible to scope a token
# to a subset of the entire system (e.g. a specific service, region, or
# service within a region). Until then, the only system scope is the
# entire system.
self.system = {'all': True}
def set_trust_scope(self, id=None, impersonation=False,
trustee_user_id=None, trustor_user_id=None):
self.trust_id = id or uuid.uuid4().hex

View File

@ -41,6 +41,7 @@ class BaseGenericPlugin(base.BaseIdentityPlugin):
project_domain_name=None,
domain_id=None,
domain_name=None,
system_scope=None,
trust_id=None,
default_domain_id=None,
default_domain_name=None,
@ -54,6 +55,7 @@ class BaseGenericPlugin(base.BaseIdentityPlugin):
self._project_domain_name = project_domain_name
self._domain_id = domain_id
self._domain_name = domain_name
self._system_scope = system_scope
self._trust_id = trust_id
self._default_domain_id = default_domain_id
self._default_domain_name = default_domain_name
@ -102,6 +104,7 @@ class BaseGenericPlugin(base.BaseIdentityPlugin):
def _v3_params(self):
"""Return the parameters that are common to v3 plugins."""
return {'trust_id': self._trust_id,
'system_scope': self._system_scope,
'project_id': self._project_id,
'project_name': self._project_name,
'project_domain_id': self.project_domain_id,

View File

@ -31,6 +31,7 @@ class BaseAuth(base.BaseIdentityPlugin):
:param string auth_url: Identity service endpoint for authentication.
:param string trust_id: Trust ID for trust scoping.
:param string system_scope: System information to scope to.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string project_id: Project ID for project scoping.
@ -45,6 +46,7 @@ class BaseAuth(base.BaseIdentityPlugin):
def __init__(self, auth_url,
trust_id=None,
system_scope=None,
domain_id=None,
domain_name=None,
project_id=None,
@ -56,6 +58,7 @@ class BaseAuth(base.BaseIdentityPlugin):
super(BaseAuth, self).__init__(auth_url=auth_url,
reauthenticate=reauthenticate)
self.trust_id = trust_id
self.system_scope = system_scope
self.domain_id = domain_id
self.domain_name = domain_name
self.project_id = project_id
@ -78,7 +81,7 @@ class BaseAuth(base.BaseIdentityPlugin):
"""Return true if parameters can be used to create a scoped token."""
return (self.domain_id or self.domain_name or
self.project_id or self.project_name or
self.trust_id)
self.trust_id or self.system_scope)
class Auth(BaseAuth):
@ -153,6 +156,15 @@ class Auth(BaseAuth):
body['auth']['scope'] = {'OS-TRUST:trust': {'id': self.trust_id}}
elif self.unscoped:
body['auth']['scope'] = 'unscoped'
elif self.system_scope:
# NOTE(lbragstad): Right now it's only possible to have role
# assignments on the entire system. In the future that might change
# so that users and groups can have roles on parts of the system,
# like a specific service in a specific region. If that happens,
# this will have to be accounted for here. Until then we'll only
# support scoping to the entire system.
if self.system_scope == 'all':
body['auth']['scope'] = {'system': {'all': True}}
# NOTE(jamielennox): we add nocatalog here rather than in token_url
# directly as some federation plugins require the base token_url

View File

@ -62,6 +62,7 @@ class Password(base.AuthConstructor):
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
:param string trust_id: Trust ID for trust scoping.
:param string system_scope: System information to scope to.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string project_id: Project ID for project scoping.

View File

@ -74,6 +74,7 @@ class BaseV3Loader(BaseIdentityLoader):
options = super(BaseV3Loader, self).get_options()
options.extend([
opts.Opt('system-scope', help='Scope for system operations'),
opts.Opt('domain-id', help='Domain ID to scope to'),
opts.Opt('domain-name', help='Domain name to scope to'),
opts.Opt('project-id', help='Project ID to scope to'),
@ -136,6 +137,7 @@ class BaseGenericLoader(BaseIdentityLoader):
options = super(BaseGenericLoader, self).get_options()
options.extend([
opts.Opt('system-scope', help='Scope for system operations'),
opts.Opt('domain-id', help='Domain ID to scope to'),
opts.Opt('domain-name', help='Domain name to scope to'),
opts.Opt('project-id', help='Project ID to scope to',

View File

@ -73,6 +73,54 @@ class AccessV3Test(utils.TestCase):
self.assertTrue(auth_ref.will_expire_soon(stale_duration=301))
self.assertFalse(auth_ref.will_expire_soon())
def test_building_system_scoped_assessinfo(self):
token = fixture.V3Token()
token.set_system_scope()
s = token.add_service(type='identity')
s.add_standard_endpoints(public='http://url')
token_id = uuid.uuid4().hex
auth_ref = access.create(body=token, auth_token=token_id)
self.assertTrue(auth_ref)
self.assertIn('methods', auth_ref._data['token'])
self.assertIn('catalog', auth_ref._data['token'])
self.assertTrue(auth_ref.has_service_catalog())
self.assertTrue(auth_ref._data['token']['catalog'])
self.assertEqual(token_id, auth_ref.auth_token)
self.assertEqual(token.user_name, auth_ref.username)
self.assertEqual(token.user_id, auth_ref.user_id)
self.assertEqual(token.role_ids, auth_ref.role_ids)
self.assertEqual(token.role_names, auth_ref.role_names)
self.assertEqual(token.domain_name, auth_ref.domain_name)
self.assertEqual(token.domain_id, auth_ref.domain_id)
self.assertEqual(token.user_domain_id, auth_ref.user_domain_id)
self.assertEqual(token.user_domain_name, auth_ref.user_domain_name)
self.assertIsNone(auth_ref.project_name)
self.assertIsNone(auth_ref.project_id)
self.assertIsNone(auth_ref.project_domain_id)
self.assertIsNone(auth_ref.project_domain_name)
self.assertIsNone(auth_ref.domain_name)
self.assertIsNone(auth_ref.domain_id)
self.assertEqual(token.system, auth_ref.system)
self.assertTrue(auth_ref.system_scoped)
self.assertFalse(auth_ref.domain_scoped)
self.assertFalse(auth_ref.project_scoped)
self.assertEqual(token.audit_id, auth_ref.audit_id)
self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id)
def test_building_domain_scoped_accessinfo(self):
token = fixture.V3Token()
token.set_domain_scope()

View File

@ -21,7 +21,8 @@ class FedKerbLoadingTests(test_utils.TestCase):
opts = [o.name for o in
loading.get_plugin_loader('v3fedkerb').get_options()]
allowed_opts = ['domain-id',
allowed_opts = ['system-scope',
'domain-id',
'domain-name',
'identity-provider',
'project-id',

View File

@ -20,7 +20,8 @@ class KerberosLoadingTests(test_utils.TestCase):
opts = [o.name for o in
loading.get_plugin_loader('v3kerberos').get_options()]
allowed_opts = ['domain-id',
allowed_opts = ['system-scope',
'domain-id',
'domain-name',
'project-id',
'project-name',

View File

@ -30,6 +30,7 @@ class PasswordTests(utils.TestCase):
'user-id',
'password',
'system-scope',
'domain-id',
'domain-name',
'project-id',
@ -70,6 +71,7 @@ class TokenTests(utils.TestCase):
opts = [o.name for o in generic.Token().get_options()]
allowed_opts = ['token',
'system-scope',
'domain-id',
'domain-name',
'project-id',

View File

@ -0,0 +1,8 @@
---
features:
- |
[`blueprint system-scope <https://blueprints.launchpad.net/keystone/+spec/system-scope>`_]
Keystoneauth now has the ability to authenticate for system-scoped tokens,
which were implemented during the Queens development cycle. System-scoped
tokens will eventually be required to separate system-level APIs from
project-level APIs, allowing for better security via scoped RBAC.