Remove KeystoneToken object
This commit removes the original KeystoneToken object in favor of the new TokenModel object. Since we have a token provider that knows how to deal with TokenModel object, we don't really need another object that uses reflection at all. Closes-Bug: 1778945 Change-Id: I778cab0a6449184ecf7d5ccfbfa12791be139236
This commit is contained in:
parent
0ab1f91704
commit
140a34b439
|
@ -95,10 +95,8 @@ class TrustResource(ks_flask.ResourceBase):
|
||||||
|
|
||||||
def _check_unrestricted(self):
|
def _check_unrestricted(self):
|
||||||
token = self.auth_context['token']
|
token = self.auth_context['token']
|
||||||
auth_methods = token['methods']
|
if 'application_credential' in token.methods:
|
||||||
if 'application_credential' in auth_methods:
|
if not token.application_credential['unrestricted']:
|
||||||
td = token.token_data['token']
|
|
||||||
if td['application_credential']['restricted']:
|
|
||||||
action = _("Using method 'application_credential' is not "
|
action = _("Using method 'application_credential' is not "
|
||||||
"allowed for managing trusts.")
|
"allowed for managing trusts.")
|
||||||
raise exception.ForbiddenAction(action=action)
|
raise exception.ForbiddenAction(action=action)
|
||||||
|
|
|
@ -84,10 +84,8 @@ class ApplicationCredentialV3(controller.V3Controller):
|
||||||
return {cls.member_name: ref}
|
return {cls.member_name: ref}
|
||||||
|
|
||||||
def _check_unrestricted(self, token):
|
def _check_unrestricted(self, token):
|
||||||
auth_methods = token['methods']
|
if 'application_credential' in token.methods:
|
||||||
if 'application_credential' in auth_methods:
|
if not token.application_credential['unrestricted']:
|
||||||
td = token.token_data['token']
|
|
||||||
if td['application_credential']['restricted']:
|
|
||||||
action = _("Using method 'application_credential' is not "
|
action = _("Using method 'application_credential' is not "
|
||||||
"allowed for managing additional application "
|
"allowed for managing additional application "
|
||||||
"credentials.")
|
"credentials.")
|
||||||
|
@ -112,7 +110,7 @@ class ApplicationCredentialV3(controller.V3Controller):
|
||||||
app_cred['user_id'] = user_id
|
app_cred['user_id'] = user_id
|
||||||
app_cred['project_id'] = project_id
|
app_cred['project_id'] = project_id
|
||||||
app_cred['roles'] = self._normalize_role_list(
|
app_cred['roles'] = self._normalize_role_list(
|
||||||
app_cred.get('roles', token['roles']))
|
app_cred.get('roles', token.roles))
|
||||||
if app_cred.get('expires_at'):
|
if app_cred.get('expires_at'):
|
||||||
app_cred['expires_at'] = utils.parse_expiration_date(
|
app_cred['expires_at'] = utils.parse_expiration_date(
|
||||||
app_cred['expires_at'])
|
app_cred['expires_at'])
|
||||||
|
|
|
@ -24,7 +24,6 @@ from keystone import exception
|
||||||
from keystone.federation import constants as federation_constants
|
from keystone.federation import constants as federation_constants
|
||||||
from keystone.federation import utils
|
from keystone.federation import utils
|
||||||
from keystone.i18n import _
|
from keystone.i18n import _
|
||||||
from keystone.models import token_model
|
|
||||||
from keystone import notifications
|
from keystone import notifications
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
@ -37,9 +36,7 @@ class Mapped(base.AuthMethodHandler):
|
||||||
|
|
||||||
def _get_token_ref(self, auth_payload):
|
def _get_token_ref(self, auth_payload):
|
||||||
token_id = auth_payload['id']
|
token_id = auth_payload['id']
|
||||||
response = PROVIDERS.token_provider_api.validate_token(token_id)
|
return PROVIDERS.token_provider_api.validate_token(token_id)
|
||||||
return token_model.KeystoneToken(token_id=token_id,
|
|
||||||
token_data=response)
|
|
||||||
|
|
||||||
def authenticate(self, request, auth_payload):
|
def authenticate(self, request, auth_payload):
|
||||||
"""Authenticate mapped user and set an authentication context.
|
"""Authenticate mapped user and set an authentication context.
|
||||||
|
|
|
@ -162,11 +162,11 @@ def check_policy(controller, request, action,
|
||||||
|
|
||||||
|
|
||||||
def get_token_ref(context):
|
def get_token_ref(context):
|
||||||
"""Retrieve KeystoneToken object from the auth context and returns it.
|
"""Retrieve TokenModel object from the auth context and returns it.
|
||||||
|
|
||||||
:param dict context: The request context.
|
:param dict context: The request context.
|
||||||
:raises keystone.exception.Unauthorized: If auth context cannot be found.
|
:raises keystone.exception.Unauthorized: If auth context cannot be found.
|
||||||
:returns: The KeystoneToken object.
|
:returns: The TokenModel object.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Retrieve the auth context that was prepared by AuthContextMiddleware.
|
# Retrieve the auth context that was prepared by AuthContextMiddleware.
|
||||||
|
|
|
@ -538,12 +538,12 @@ class V3Controller(provider_api.ProviderAPIMixin, wsgi.Application):
|
||||||
if domain_id:
|
if domain_id:
|
||||||
return domain_id
|
return domain_id
|
||||||
|
|
||||||
token_ref = authorization.get_token_ref(request.context_dict)
|
token = authorization.get_token_ref(request.context_dict)
|
||||||
|
|
||||||
if token_ref.domain_scoped:
|
if token.domain_scoped:
|
||||||
return token_ref.domain_id
|
return token.domain_id
|
||||||
elif token_ref.project_scoped:
|
elif token.project_scoped:
|
||||||
return token_ref.project_domain_id
|
return token.project_domain['id']
|
||||||
else:
|
else:
|
||||||
msg = _('No domain information specified as part of list request')
|
msg = _('No domain information specified as part of list request')
|
||||||
LOG.warning(msg)
|
LOG.warning(msg)
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import copy
|
||||||
import functools
|
import functools
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
|
@ -19,6 +20,7 @@ from oslo_utils import strutils
|
||||||
|
|
||||||
from keystone.common import authorization
|
from keystone.common import authorization
|
||||||
from keystone.common import context
|
from keystone.common import context
|
||||||
|
from keystone.common import controller
|
||||||
from keystone.common import policies
|
from keystone.common import policies
|
||||||
from keystone.common import provider_api
|
from keystone.common import provider_api
|
||||||
from keystone.common import utils
|
from keystone.common import utils
|
||||||
|
@ -77,6 +79,19 @@ class RBACEnforcer(object):
|
||||||
extra.update(exc=exception.ForbiddenAction, action=action,
|
extra.update(exc=exception.ForbiddenAction, action=action,
|
||||||
do_raise=do_raise)
|
do_raise=do_raise)
|
||||||
|
|
||||||
|
# NOTE(lbragstad): If there is a token in the credentials dictionary,
|
||||||
|
# it's going to be an instance of a TokenModel. We'll need to convert
|
||||||
|
# it to the a token response or dictionary before passing it to
|
||||||
|
# oslo.policy for enforcement. This is because oslo.policy shouldn't
|
||||||
|
# know how to deal with an internal object only used within keystone.
|
||||||
|
if 'token' in credentials:
|
||||||
|
token_ref = controller.render_token_response_from_model(
|
||||||
|
credentials['token']
|
||||||
|
)
|
||||||
|
credentials_copy = copy.deepcopy(credentials)
|
||||||
|
credentials_copy['token'] = token_ref
|
||||||
|
credentials = credentials_copy
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return self._enforcer.enforce(
|
return self._enforcer.enforce(
|
||||||
rule=action, target=target, creds=credentials, **extra)
|
rule=action, target=target, creds=credentials, **extra)
|
||||||
|
|
|
@ -28,6 +28,7 @@ from keystone.models import token_model
|
||||||
|
|
||||||
CONF = keystone.conf.CONF
|
CONF = keystone.conf.CONF
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
PROVIDERS = provider_api.ProviderAPIs
|
||||||
|
|
||||||
__all__ = ('AuthContextMiddleware',)
|
__all__ = ('AuthContextMiddleware',)
|
||||||
|
|
||||||
|
@ -169,16 +170,20 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin,
|
||||||
# parallel; only ione or the other should be used for access.
|
# parallel; only ione or the other should be used for access.
|
||||||
request_context.is_admin_project = False
|
request_context.is_admin_project = False
|
||||||
request_context.domain_id = token.domain_id
|
request_context.domain_id = token.domain_id
|
||||||
request_context.domain_name = token.domain_name
|
request_context.domain_name = token.domain['name']
|
||||||
if token.oauth_scoped:
|
if token.oauth_scoped:
|
||||||
request_context.is_delegated_auth = True
|
request_context.is_delegated_auth = True
|
||||||
request_context.oauth_consumer_id = token.oauth_consumer_id
|
request_context.oauth_consumer_id = (
|
||||||
request_context.oauth_access_token_id = token.oauth_access_token_id
|
token.access_token['consumer_id']
|
||||||
|
)
|
||||||
|
request_context.oauth_access_token_id = token.access_token_id
|
||||||
if token.trust_scoped:
|
if token.trust_scoped:
|
||||||
request_context.is_delegated_auth = True
|
request_context.is_delegated_auth = True
|
||||||
request_context.trust_id = token.trust_id
|
request_context.trust_id = token.trust_id
|
||||||
if token.is_federated_user:
|
if token.is_federated:
|
||||||
request_context.group_ids = token.federation_group_ids
|
request_context.group_ids = []
|
||||||
|
for group in token.federated_groups:
|
||||||
|
request_context.group_ids.append(group['id'])
|
||||||
else:
|
else:
|
||||||
request_context.group_ids = []
|
request_context.group_ids = []
|
||||||
|
|
||||||
|
@ -212,8 +217,9 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin,
|
||||||
elif request.token_auth.has_user_token:
|
elif request.token_auth.has_user_token:
|
||||||
# Keystone enforces policy on some values that other services
|
# Keystone enforces policy on some values that other services
|
||||||
# do not, and should not, use. This adds them in to the context.
|
# do not, and should not, use. This adds them in to the context.
|
||||||
token = token_model.KeystoneToken(token_id=request.user_token,
|
token = PROVIDERS.token_provider_api.validate_token(
|
||||||
token_data=request.token_info)
|
request.user_token
|
||||||
|
)
|
||||||
self._keystone_specific_values(token, request_context)
|
self._keystone_specific_values(token, request_context)
|
||||||
request_context.auth_token = request.user_token
|
request_context.auth_token = request.user_token
|
||||||
auth_context = request_context.to_policy_values()
|
auth_context = request_context.to_policy_values()
|
||||||
|
|
|
@ -17,13 +17,11 @@ import itertools
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_serialization import msgpackutils
|
from oslo_serialization import msgpackutils
|
||||||
from oslo_utils import reflection
|
from oslo_utils import reflection
|
||||||
from oslo_utils import timeutils
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from keystone.common import cache
|
from keystone.common import cache
|
||||||
from keystone.common import provider_api
|
from keystone.common import provider_api
|
||||||
from keystone import exception
|
from keystone import exception
|
||||||
from keystone.federation import constants
|
|
||||||
from keystone.i18n import _
|
from keystone.i18n import _
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
@ -34,268 +32,6 @@ V3 = 'v3.0'
|
||||||
VERSIONS = frozenset([V3])
|
VERSIONS = frozenset([V3])
|
||||||
|
|
||||||
|
|
||||||
def _parse_and_normalize_time(time_data):
|
|
||||||
if isinstance(time_data, six.string_types):
|
|
||||||
time_data = timeutils.parse_isotime(time_data)
|
|
||||||
return timeutils.normalize_time(time_data)
|
|
||||||
|
|
||||||
|
|
||||||
class KeystoneToken(dict):
|
|
||||||
"""An in-memory representation that unifies v3 tokens."""
|
|
||||||
|
|
||||||
# TODO(morganfainberg): Align this in-memory representation with the
|
|
||||||
# objects in keystoneclient. This object should be eventually updated
|
|
||||||
# to be the source of token data with the ability to emit any version
|
|
||||||
# of the token instead of only consuming the token dict and providing
|
|
||||||
# property accessors for the underlying data.
|
|
||||||
|
|
||||||
def __init__(self, token_id, token_data):
|
|
||||||
self.token_data = token_data
|
|
||||||
self.token_id = token_id
|
|
||||||
try:
|
|
||||||
super(KeystoneToken, self).__init__(**token_data['token'])
|
|
||||||
except KeyError:
|
|
||||||
raise exception.UnsupportedTokenVersionException()
|
|
||||||
|
|
||||||
if self.project_scoped and self.domain_scoped:
|
|
||||||
raise exception.UnexpectedError(_('Found invalid token: scoped to '
|
|
||||||
'both project and domain.'))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
"""Return string representation of KeystoneToken."""
|
|
||||||
desc = ('<%(type)s (audit_id=%(audit_id)s, '
|
|
||||||
'audit_chain_id=%(audit_chain_id)s) at %(loc)s>')
|
|
||||||
self_cls_name = reflection.get_class_name(self,
|
|
||||||
fully_qualified=False)
|
|
||||||
return desc % {'type': self_cls_name,
|
|
||||||
'audit_id': self.audit_id,
|
|
||||||
'audit_chain_id': self.audit_chain_id,
|
|
||||||
'loc': hex(id(self))}
|
|
||||||
|
|
||||||
@property
|
|
||||||
def expires(self):
|
|
||||||
return _parse_and_normalize_time(self['expires_at'])
|
|
||||||
|
|
||||||
@property
|
|
||||||
def issued(self):
|
|
||||||
return _parse_and_normalize_time(self['issued_at'])
|
|
||||||
|
|
||||||
@property
|
|
||||||
def audit_id(self):
|
|
||||||
return self.get('audit_ids', [None])[0]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def audit_chain_id(self):
|
|
||||||
return self.get('audit_ids', [None])[-1]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def auth_token(self):
|
|
||||||
return self.token_id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user_id(self):
|
|
||||||
return self['user']['id']
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user_name(self):
|
|
||||||
return self['user']['name']
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user_domain_name(self):
|
|
||||||
try:
|
|
||||||
return self['user']['domain']['name']
|
|
||||||
except KeyError: # nosec
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
pass
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user_password_expires_at(self):
|
|
||||||
try:
|
|
||||||
return self['user']['password_expires_at']
|
|
||||||
except KeyError:
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
pass
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user_domain_id(self):
|
|
||||||
try:
|
|
||||||
return self['user']['domain']['id']
|
|
||||||
except KeyError: # nosec
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
pass
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def domain_id(self):
|
|
||||||
try:
|
|
||||||
return self['domain']['id']
|
|
||||||
except KeyError:
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def domain_name(self):
|
|
||||||
try:
|
|
||||||
return self['domain']['name']
|
|
||||||
except KeyError:
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def project_id(self):
|
|
||||||
try:
|
|
||||||
return self['project']['id']
|
|
||||||
except KeyError:
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def project_name(self):
|
|
||||||
try:
|
|
||||||
return self['project']['name']
|
|
||||||
except KeyError:
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def project_domain_id(self):
|
|
||||||
try:
|
|
||||||
return self['project']['domain']['id']
|
|
||||||
except KeyError: # nosec
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
pass
|
|
||||||
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def project_domain_name(self):
|
|
||||||
try:
|
|
||||||
return self['project']['domain']['name']
|
|
||||||
except KeyError: # nosec
|
|
||||||
# Do not raise KeyError, raise UnexpectedError
|
|
||||||
pass
|
|
||||||
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_domain(self):
|
|
||||||
if 'is_domain' in self:
|
|
||||||
return self['is_domain']
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
|
||||||
def system_scoped(self):
|
|
||||||
return 'system' in self
|
|
||||||
|
|
||||||
@property
|
|
||||||
def project_scoped(self):
|
|
||||||
return 'project' in self
|
|
||||||
|
|
||||||
@property
|
|
||||||
def domain_scoped(self):
|
|
||||||
return 'domain' in self
|
|
||||||
|
|
||||||
@property
|
|
||||||
def scoped(self):
|
|
||||||
return self.project_scoped or self.domain_scoped or self.system_scoped
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_admin_project(self):
|
|
||||||
# Prevent domain scoped tokens from acting as is_admin_project
|
|
||||||
if self.domain_scoped:
|
|
||||||
return False
|
|
||||||
# TODO(ayoung/edmondsw): Having is_admin_project default to True is
|
|
||||||
# essential for fixing bug #968696. If an admin project is not
|
|
||||||
# configured, we can add checks for is_admin_project:True and not
|
|
||||||
# block anyone that hasn't configured an admin_project. Do not change
|
|
||||||
# this until we can assume admin_project is actually set
|
|
||||||
return self.get('is_admin_project', True)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def trust_id(self):
|
|
||||||
return self.get('OS-TRUST:trust', {}).get('id')
|
|
||||||
|
|
||||||
@property
|
|
||||||
def trust_scoped(self):
|
|
||||||
return 'OS-TRUST:trust' in self
|
|
||||||
|
|
||||||
@property
|
|
||||||
def trustee_user_id(self):
|
|
||||||
return self.get('OS-TRUST:trust', {}).get('trustee_user_id')
|
|
||||||
|
|
||||||
@property
|
|
||||||
def trustor_user_id(self):
|
|
||||||
return self.get('OS-TRUST:trust', {}).get('trustor_user_id')
|
|
||||||
|
|
||||||
@property
|
|
||||||
def trust_impersonation(self):
|
|
||||||
return self.get('OS-TRUST:trust', {}).get('impersonation')
|
|
||||||
|
|
||||||
@property
|
|
||||||
def oauth_scoped(self):
|
|
||||||
return 'OS-OAUTH1' in self
|
|
||||||
|
|
||||||
@property
|
|
||||||
def oauth_access_token_id(self):
|
|
||||||
if self.oauth_scoped:
|
|
||||||
return self['OS-OAUTH1']['access_token_id']
|
|
||||||
return None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def oauth_consumer_id(self):
|
|
||||||
if self.oauth_scoped:
|
|
||||||
return self['OS-OAUTH1']['consumer_id']
|
|
||||||
return None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def role_ids(self):
|
|
||||||
return [r['id'] for r in self.get('roles', [])]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def role_names(self):
|
|
||||||
return [r['name'] for r in self.get('roles', [])]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_federated_user(self):
|
|
||||||
try:
|
|
||||||
return constants.FEDERATION in self['user']
|
|
||||||
except KeyError:
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def federation_group_ids(self):
|
|
||||||
if self.is_federated_user:
|
|
||||||
try:
|
|
||||||
groups = self['user'][constants.FEDERATION].get('groups', [])
|
|
||||||
return [g['id'] for g in groups]
|
|
||||||
except KeyError:
|
|
||||||
raise exception.UnexpectedError()
|
|
||||||
return []
|
|
||||||
|
|
||||||
@property
|
|
||||||
def federation_idp_id(self):
|
|
||||||
if self.is_federated_user:
|
|
||||||
return (
|
|
||||||
self['user'][constants.FEDERATION]['identity_provider']['id']
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def federation_protocol_id(self):
|
|
||||||
if self.is_federated_user:
|
|
||||||
return self['user'][constants.FEDERATION]['protocol']['id']
|
|
||||||
return None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def metadata(self):
|
|
||||||
return self.get('metadata', {})
|
|
||||||
|
|
||||||
@property
|
|
||||||
def methods(self):
|
|
||||||
return self.get('methods', [])
|
|
||||||
|
|
||||||
|
|
||||||
class TokenModel(object):
|
class TokenModel(object):
|
||||||
"""An object that represents a token emitted by keystone.
|
"""An object that represents a token emitted by keystone.
|
||||||
|
|
||||||
|
|
|
@ -1,587 +0,0 @@
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import copy
|
|
||||||
import datetime
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from oslo_utils import timeutils
|
|
||||||
from six.moves import range
|
|
||||||
|
|
||||||
from keystone.common import provider_api
|
|
||||||
from keystone.common import utils as ks_utils
|
|
||||||
import keystone.conf
|
|
||||||
from keystone import exception
|
|
||||||
from keystone.federation import constants as federation_constants
|
|
||||||
from keystone.models import token_model
|
|
||||||
from keystone.tests.unit import base_classes
|
|
||||||
from keystone.tests.unit import core
|
|
||||||
from keystone.tests.unit import test_token_provider
|
|
||||||
from keystone.token import provider
|
|
||||||
|
|
||||||
CONF = keystone.conf.CONF
|
|
||||||
PROVIDERS = provider_api.ProviderAPIs
|
|
||||||
|
|
||||||
|
|
||||||
class TestKeystoneTokenModel(core.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
super(TestKeystoneTokenModel, self).setUp()
|
|
||||||
self.v3_sample_token = copy.deepcopy(
|
|
||||||
test_token_provider.SAMPLE_V3_TOKEN)
|
|
||||||
|
|
||||||
def test_token_model_v3(self):
|
|
||||||
token_data = token_model.KeystoneToken(uuid.uuid4().hex,
|
|
||||||
self.v3_sample_token)
|
|
||||||
expires = timeutils.normalize_time(timeutils.parse_isotime(
|
|
||||||
self.v3_sample_token['token']['expires_at']))
|
|
||||||
issued = timeutils.normalize_time(timeutils.parse_isotime(
|
|
||||||
self.v3_sample_token['token']['issued_at']))
|
|
||||||
self.assertEqual(expires, token_data.expires)
|
|
||||||
self.assertEqual(issued, token_data.issued)
|
|
||||||
self.assertEqual(self.v3_sample_token['token']['user']['id'],
|
|
||||||
token_data.user_id)
|
|
||||||
self.assertEqual(self.v3_sample_token['token']['user']['name'],
|
|
||||||
token_data.user_name)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['user']['password_expires_at'],
|
|
||||||
token_data.user_password_expires_at)
|
|
||||||
self.assertEqual(self.v3_sample_token['token']['user']['domain']['id'],
|
|
||||||
token_data.user_domain_id)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['user']['domain']['name'],
|
|
||||||
token_data.user_domain_name)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['project']['domain']['id'],
|
|
||||||
token_data.project_domain_id)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['project']['domain']['name'],
|
|
||||||
token_data.project_domain_name)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['is_domain'], token_data.is_domain)
|
|
||||||
self.assertEqual(self.v3_sample_token['token']['OS-TRUST:trust']['id'],
|
|
||||||
token_data.trust_id)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['OS-TRUST:trust']['trustor_user_id'],
|
|
||||||
token_data.trustor_user_id)
|
|
||||||
self.assertEqual(
|
|
||||||
self.v3_sample_token['token']['OS-TRUST:trust']['trustee_user_id'],
|
|
||||||
token_data.trustee_user_id)
|
|
||||||
|
|
||||||
# Project Scoped Token
|
|
||||||
self.assertRaises(exception.UnexpectedError, getattr, token_data,
|
|
||||||
'domain_id')
|
|
||||||
self.assertRaises(exception.UnexpectedError, getattr, token_data,
|
|
||||||
'domain_name')
|
|
||||||
self.assertFalse(token_data.domain_scoped)
|
|
||||||
self.assertEqual(self.v3_sample_token['token']['project']['id'],
|
|
||||||
token_data.project_id)
|
|
||||||
self.assertEqual(self.v3_sample_token['token']['project']['name'],
|
|
||||||
token_data.project_name)
|
|
||||||
self.assertTrue(token_data.project_scoped)
|
|
||||||
self.assertTrue(token_data.scoped)
|
|
||||||
self.assertTrue(token_data.trust_scoped)
|
|
||||||
|
|
||||||
# by default admin project is True for project scoped tokens
|
|
||||||
self.assertTrue(token_data.is_admin_project)
|
|
||||||
|
|
||||||
self.assertEqual(
|
|
||||||
[r['id'] for r in self.v3_sample_token['token']['roles']],
|
|
||||||
token_data.role_ids)
|
|
||||||
self.assertEqual(
|
|
||||||
[r['name'] for r in self.v3_sample_token['token']['roles']],
|
|
||||||
token_data.role_names)
|
|
||||||
|
|
||||||
# Domain Scoped Token
|
|
||||||
token_data.pop('project')
|
|
||||||
self.assertFalse(token_data.project_scoped)
|
|
||||||
self.assertFalse(token_data.scoped)
|
|
||||||
self.assertRaises(exception.UnexpectedError, getattr, token_data,
|
|
||||||
'project_id')
|
|
||||||
self.assertRaises(exception.UnexpectedError, getattr, token_data,
|
|
||||||
'project_name')
|
|
||||||
self.assertFalse(token_data.project_scoped)
|
|
||||||
domain_id = uuid.uuid4().hex
|
|
||||||
domain_name = uuid.uuid4().hex
|
|
||||||
token_data['domain'] = {'id': domain_id,
|
|
||||||
'name': domain_name}
|
|
||||||
self.assertEqual(domain_id, token_data.domain_id)
|
|
||||||
self.assertEqual(domain_name, token_data.domain_name)
|
|
||||||
self.assertTrue(token_data.domain_scoped)
|
|
||||||
|
|
||||||
token_data['audit_ids'] = [uuid.uuid4().hex]
|
|
||||||
self.assertEqual(token_data.audit_id,
|
|
||||||
token_data['audit_ids'][0])
|
|
||||||
self.assertEqual(token_data.audit_chain_id,
|
|
||||||
token_data['audit_ids'][0])
|
|
||||||
token_data['audit_ids'].append(uuid.uuid4().hex)
|
|
||||||
self.assertEqual(token_data.audit_chain_id,
|
|
||||||
token_data['audit_ids'][1])
|
|
||||||
del token_data['audit_ids']
|
|
||||||
self.assertIsNone(token_data.audit_id)
|
|
||||||
self.assertIsNone(token_data.audit_chain_id)
|
|
||||||
|
|
||||||
# by default admin project is False for domain scoped tokens
|
|
||||||
self.assertFalse(token_data.is_admin_project)
|
|
||||||
|
|
||||||
def test_token_model_v3_federated_user(self):
|
|
||||||
token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
|
||||||
token_data=self.v3_sample_token)
|
|
||||||
federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
|
|
||||||
'protocol': {'id': 'saml2'},
|
|
||||||
'groups': [{'id': uuid.uuid4().hex}
|
|
||||||
for x in range(1, 5)]}
|
|
||||||
|
|
||||||
self.assertFalse(token_data.is_federated_user)
|
|
||||||
self.assertEqual([], token_data.federation_group_ids)
|
|
||||||
self.assertIsNone(token_data.federation_protocol_id)
|
|
||||||
self.assertIsNone(token_data.federation_idp_id)
|
|
||||||
|
|
||||||
token_data['user'][federation_constants.FEDERATION] = federation_data
|
|
||||||
|
|
||||||
self.assertTrue(token_data.is_federated_user)
|
|
||||||
self.assertEqual([x['id'] for x in federation_data['groups']],
|
|
||||||
token_data.federation_group_ids)
|
|
||||||
self.assertEqual(federation_data['protocol']['id'],
|
|
||||||
token_data.federation_protocol_id)
|
|
||||||
self.assertEqual(federation_data['identity_provider']['id'],
|
|
||||||
token_data.federation_idp_id)
|
|
||||||
|
|
||||||
def test_token_model_unknown(self):
|
|
||||||
self.assertRaises(exception.UnsupportedTokenVersionException,
|
|
||||||
token_model.KeystoneToken,
|
|
||||||
token_id=uuid.uuid4().hex,
|
|
||||||
token_data={'bogus_data': uuid.uuid4().hex})
|
|
||||||
|
|
||||||
def test_token_model_dual_scoped_token(self):
|
|
||||||
domain = {'id': uuid.uuid4().hex,
|
|
||||||
'name': uuid.uuid4().hex}
|
|
||||||
self.v3_sample_token['token']['domain'] = domain
|
|
||||||
|
|
||||||
self.assertRaises(exception.UnexpectedError,
|
|
||||||
token_model.KeystoneToken,
|
|
||||||
token_id=uuid.uuid4().hex,
|
|
||||||
token_data=self.v3_sample_token)
|
|
||||||
|
|
||||||
def test_token_model_is_admin_project(self):
|
|
||||||
token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
|
||||||
token_data=self.v3_sample_token)
|
|
||||||
|
|
||||||
token_data['is_admin_project'] = False
|
|
||||||
self.assertFalse(token_data.is_admin_project)
|
|
||||||
|
|
||||||
|
|
||||||
class TokenModelTests(base_classes.TestCaseWithBootstrap):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TokenModelTests, self).setUp()
|
|
||||||
self.admin_user_id = self.bootstrapper.admin_user_id
|
|
||||||
self.admin_username = self.bootstrapper.admin_username
|
|
||||||
self.admin_password = self.bootstrapper.admin_password
|
|
||||||
self.project_id = self.bootstrapper.project_id
|
|
||||||
self.project_name = self.bootstrapper.project_name
|
|
||||||
self.admin_role_id = self.bootstrapper.admin_role_id
|
|
||||||
self.member_role_id = self.bootstrapper.member_role_id
|
|
||||||
self.reader_role_id = self.bootstrapper.reader_role_id
|
|
||||||
|
|
||||||
self.token_id = uuid.uuid4().hex
|
|
||||||
issued_at = datetime.datetime.utcnow()
|
|
||||||
self.issued_at = ks_utils.isotime(at=issued_at, subsecond=True)
|
|
||||||
|
|
||||||
def assertTokenContainsRole(self, token, role):
|
|
||||||
"""Ensure a role reference exists in a token's roles.
|
|
||||||
|
|
||||||
:param token: instance of ``keystone.models.token_model.TokenModel``
|
|
||||||
:param role: a dictionary reference of the expected role
|
|
||||||
"""
|
|
||||||
self.assertIn(role, token.roles)
|
|
||||||
|
|
||||||
def test_audit_id_attributes(self):
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
audit_id = provider.random_urlsafe_str()
|
|
||||||
token.audit_id = audit_id
|
|
||||||
|
|
||||||
self.assertTrue(len(token.audit_ids) == 1)
|
|
||||||
|
|
||||||
parent_audit_id = provider.random_urlsafe_str()
|
|
||||||
token.parent_audit_id = parent_audit_id
|
|
||||||
|
|
||||||
self.assertTrue(len(token.audit_ids) == 2)
|
|
||||||
|
|
||||||
self.assertEqual(audit_id, token.audit_ids[0])
|
|
||||||
self.assertEqual(parent_audit_id, token.audit_ids[-1])
|
|
||||||
|
|
||||||
def test_token_model_user_attributes(self):
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.user_domain_id = token.user['domain_id']
|
|
||||||
|
|
||||||
self.assertEqual(self.admin_user_id, token.user_id)
|
|
||||||
self.assertIsNotNone(token.user)
|
|
||||||
self.assertIsNotNone(token.user_domain)
|
|
||||||
self.assertEqual(self.admin_username, token.user['name'])
|
|
||||||
self.assertEqual(CONF.identity.default_domain_id, token.user_domain_id)
|
|
||||||
self.assertEqual(
|
|
||||||
CONF.identity.default_domain_id, token.user_domain['id']
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_unscoped_token(self):
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
|
|
||||||
self.assertTrue(token.unscoped)
|
|
||||||
self.assertTrue(len(token.roles) == 0)
|
|
||||||
|
|
||||||
def test_mint_system_scoped_token(self):
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.system = {'all': True}
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
|
|
||||||
self.assertTrue(token.system_scoped)
|
|
||||||
self.assertFalse(token.domain_scoped)
|
|
||||||
self.assertFalse(token.project_scoped)
|
|
||||||
self.assertFalse(token.trust_scoped)
|
|
||||||
self.assertFalse(token.unscoped)
|
|
||||||
|
|
||||||
self.assertIsNotNone(token.system)
|
|
||||||
self.assertTrue(len(token.roles) == 1)
|
|
||||||
admin_role = {'id': self.admin_role_id, 'name': 'admin'}
|
|
||||||
self.assertTokenContainsRole(token, admin_role)
|
|
||||||
|
|
||||||
def test_mint_system_scoped_token_with_multiple_roles(self):
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.system = {'all': True}
|
|
||||||
|
|
||||||
self.assertTrue(token.system_scoped)
|
|
||||||
self.assertFalse(token.domain_scoped)
|
|
||||||
self.assertFalse(token.project_scoped)
|
|
||||||
self.assertFalse(token.trust_scoped)
|
|
||||||
self.assertFalse(token.unscoped)
|
|
||||||
|
|
||||||
role = core.new_role_ref()
|
|
||||||
PROVIDERS.role_api.create_role(role['id'], role)
|
|
||||||
role.pop('domain_id')
|
|
||||||
PROVIDERS.assignment_api.create_system_grant_for_user(
|
|
||||||
self.admin_user_id, role['id']
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertIsNotNone(token.system)
|
|
||||||
self.assertTrue(len(token.roles) == 2)
|
|
||||||
admin_role = {'id': self.admin_role_id, 'name': 'admin'}
|
|
||||||
self.assertTokenContainsRole(token, admin_role)
|
|
||||||
self.assertTokenContainsRole(token, role)
|
|
||||||
|
|
||||||
def test_mint_system_scoped_token_without_roles_fails(self):
|
|
||||||
user = core.new_user_ref(CONF.identity.default_domain_id)
|
|
||||||
user = PROVIDERS.identity_api.create_user(user)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = user['id']
|
|
||||||
token.system = 'all'
|
|
||||||
token.audit_id = provider.random_urlsafe_str()
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.Unauthorized, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_system_token_with_effective_role_assignment(self):
|
|
||||||
user = core.new_user_ref(CONF.identity.default_domain_id)
|
|
||||||
user = PROVIDERS.identity_api.create_user(user)
|
|
||||||
|
|
||||||
group = core.new_group_ref(CONF.identity.default_domain_id)
|
|
||||||
group = PROVIDERS.identity_api.create_group(group)
|
|
||||||
|
|
||||||
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
|
|
||||||
|
|
||||||
PROVIDERS.assignment_api.create_system_grant_for_group(
|
|
||||||
group['id'], self.admin_role_id
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = user['id']
|
|
||||||
token.system = 'all'
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
|
|
||||||
exp_role = {'id': self.admin_role_id, 'name': 'admin'}
|
|
||||||
self.assertTokenContainsRole(token, exp_role)
|
|
||||||
|
|
||||||
def test_mint_domain_scoped_token(self):
|
|
||||||
PROVIDERS.assignment_api.create_grant(
|
|
||||||
self.admin_role_id, user_id=self.admin_user_id,
|
|
||||||
domain_id=CONF.identity.default_domain_id
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.domain_id = CONF.identity.default_domain_id
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
|
|
||||||
self.assertTrue(token.domain_scoped)
|
|
||||||
self.assertFalse(token.system_scoped)
|
|
||||||
self.assertFalse(token.project_scoped)
|
|
||||||
self.assertFalse(token.trust_scoped)
|
|
||||||
self.assertFalse(token.unscoped)
|
|
||||||
|
|
||||||
self.assertIsNotNone(token.domain)
|
|
||||||
exp_domain = PROVIDERS.resource_api.get_domain(
|
|
||||||
CONF.identity.default_domain_id
|
|
||||||
)
|
|
||||||
self.assertEqual(exp_domain['id'], token.domain_id)
|
|
||||||
self.assertEqual(exp_domain['name'], token.domain['name'])
|
|
||||||
|
|
||||||
self.assertTrue(len(token.roles) == 3)
|
|
||||||
exp_roles = [
|
|
||||||
{'id': self.admin_role_id, 'name': 'admin'},
|
|
||||||
{'id': self.member_role_id, 'name': 'member'},
|
|
||||||
{'id': self.reader_role_id, 'name': 'reader'}
|
|
||||||
]
|
|
||||||
for role in exp_roles:
|
|
||||||
self.assertTokenContainsRole(token, role)
|
|
||||||
|
|
||||||
def test_mint_domain_scoped_token_fails_without_roles(self):
|
|
||||||
user = core.new_user_ref(CONF.identity.default_domain_id)
|
|
||||||
user = PROVIDERS.identity_api.create_user(user)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = user['id']
|
|
||||||
token.domain_id = CONF.identity.default_domain_id
|
|
||||||
token.audit_id = provider.random_urlsafe_str()
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.Unauthorized, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_project_scoped_token(self):
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.project_id = self.project_id
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
|
|
||||||
self.assertTrue(token.project_scoped)
|
|
||||||
self.assertFalse(token.system_scoped)
|
|
||||||
self.assertFalse(token.domain_scoped)
|
|
||||||
self.assertFalse(token.trust_scoped)
|
|
||||||
self.assertFalse(token.unscoped)
|
|
||||||
|
|
||||||
self.assertIsNotNone(token.project)
|
|
||||||
self.assertEqual(self.project_name, token.project['name'])
|
|
||||||
|
|
||||||
self.assertTrue(len(token.roles) == 3)
|
|
||||||
exp_roles = [
|
|
||||||
{'id': self.admin_role_id, 'name': 'admin'},
|
|
||||||
{'id': self.member_role_id, 'name': 'member'},
|
|
||||||
{'id': self.reader_role_id, 'name': 'reader'}
|
|
||||||
]
|
|
||||||
for role in exp_roles:
|
|
||||||
self.assertTokenContainsRole(token, role)
|
|
||||||
|
|
||||||
def test_mint_project_scoped_token_fails_without_roles(self):
|
|
||||||
user = core.new_user_ref(CONF.identity.default_domain_id)
|
|
||||||
user = PROVIDERS.identity_api.create_user(user)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = user['id']
|
|
||||||
token.project_id = self.project_id
|
|
||||||
token.audit_id = provider.random_urlsafe_str()
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.Unauthorized, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_project_scoped_token_fails_when_project_is_disabled(self):
|
|
||||||
PROVIDERS.resource_api.update_project(
|
|
||||||
self.project_id, {'enabled': False}
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.project_id = self.project_id
|
|
||||||
token.audit_id = provider.random_urlsafe_str()
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.ProjectNotFound, token.mint, self.token_id,
|
|
||||||
self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_project_scoped_token_fails_when_domain_is_disabled(self):
|
|
||||||
project = PROVIDERS.resource_api.get_project(self.project_id)
|
|
||||||
PROVIDERS.resource_api.update_domain(
|
|
||||||
project['domain_id'], {'enabled': False}
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.project_id = self.project_id
|
|
||||||
token.audit_id = provider.random_urlsafe_str()
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.DomainNotFound, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_application_credential_token(self):
|
|
||||||
app_cred = {
|
|
||||||
'id': uuid.uuid4().hex,
|
|
||||||
'name': 'monitoring-application',
|
|
||||||
'user_id': self.admin_user_id,
|
|
||||||
'roles': [{'id': self.admin_role_id}],
|
|
||||||
'project_id': self.project_id,
|
|
||||||
'secret': uuid.uuid4().hex
|
|
||||||
}
|
|
||||||
|
|
||||||
PROVIDERS.application_credential_api.create_application_credential(
|
|
||||||
app_cred
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.user_id = self.admin_user_id
|
|
||||||
token.application_credential_id = app_cred['id']
|
|
||||||
token.project_id = self.project_id
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
self.assertIsNotNone(token.application_credential_id)
|
|
||||||
self.assertIsNotNone(token.application_credential)
|
|
||||||
exp_role = {'id': self.admin_role_id, 'name': 'admin'}
|
|
||||||
self.assertTokenContainsRole(token, exp_role)
|
|
||||||
|
|
||||||
|
|
||||||
class TrustScopedTokenModelTests(TokenModelTests):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TrustScopedTokenModelTests, self).setUp()
|
|
||||||
|
|
||||||
trustor_domain = PROVIDERS.resource_api.create_domain(
|
|
||||||
uuid.uuid4().hex, core.new_domain_ref()
|
|
||||||
)
|
|
||||||
trustee_domain = PROVIDERS.resource_api.create_domain(
|
|
||||||
uuid.uuid4().hex, core.new_domain_ref()
|
|
||||||
)
|
|
||||||
|
|
||||||
self.trustor = PROVIDERS.identity_api.create_user(
|
|
||||||
core.new_user_ref(trustor_domain['id'])
|
|
||||||
)
|
|
||||||
self.trustee = PROVIDERS.identity_api.create_user(
|
|
||||||
core.new_user_ref(trustee_domain['id'])
|
|
||||||
)
|
|
||||||
|
|
||||||
PROVIDERS.assignment_api.create_grant(
|
|
||||||
self.admin_role_id, user_id=self.trustor['id'],
|
|
||||||
project_id=self.project_id
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_trust_scoped_token(self):
|
|
||||||
roles = [{'id': self.admin_role_id}]
|
|
||||||
trust = core.new_trust_ref(
|
|
||||||
self.trustor['id'], self.trustee['id'], project_id=self.project_id
|
|
||||||
)
|
|
||||||
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.trust_id = trust['id']
|
|
||||||
token.user_id = self.trustee['id']
|
|
||||||
|
|
||||||
token.mint(self.token_id, self.issued_at)
|
|
||||||
|
|
||||||
self.assertEqual(self.trustee['id'], token.user_id)
|
|
||||||
self.assertEqual(self.trustee['id'], token.trustee['id'])
|
|
||||||
self.assertEqual(self.trustor['id'], token.trustor['id'])
|
|
||||||
self.assertEqual(self.project_id, token.trust_project['id'])
|
|
||||||
self.assertEqual(
|
|
||||||
CONF.identity.default_domain_id, token.trust_project_domain['id']
|
|
||||||
)
|
|
||||||
# NOTE(lbragstad): The domain key here should be removed once
|
|
||||||
# https://bugs.launchpad.net/keystone/+bug/1763510 is fixed.
|
|
||||||
exp_role = {
|
|
||||||
'id': self.admin_role_id, 'name': 'admin', 'domain_id': None
|
|
||||||
}
|
|
||||||
self.assertTokenContainsRole(token, exp_role)
|
|
||||||
|
|
||||||
def test_mint_trust_scoped_token_fails_when_trustee_domain_disabled(self):
|
|
||||||
roles = [{'id': self.admin_role_id}]
|
|
||||||
trust = core.new_trust_ref(
|
|
||||||
self.trustor['id'], self.trustee['id'], project_id=self.project_id
|
|
||||||
)
|
|
||||||
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
|
|
||||||
|
|
||||||
PROVIDERS.resource_api.update_domain(
|
|
||||||
self.trustee['domain_id'], {'enabled': False}
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.trust_id = trust['id']
|
|
||||||
token.user_id = self.trustee['id']
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.TokenNotFound, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_trust_scoped_token_fails_when_trustor_domain_disabled(self):
|
|
||||||
roles = [{'id': self.admin_role_id}]
|
|
||||||
trust = core.new_trust_ref(
|
|
||||||
self.trustor['id'], self.trustee['id'], project_id=self.project_id
|
|
||||||
)
|
|
||||||
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
|
|
||||||
|
|
||||||
PROVIDERS.resource_api.update_domain(
|
|
||||||
self.trustor['domain_id'], {'enabled': False}
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.trust_id = trust['id']
|
|
||||||
token.user_id = self.trustee['id']
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.TokenNotFound, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_trust_scoped_token_fails_when_trustor_is_disabled(self):
|
|
||||||
roles = [{'id': self.admin_role_id}]
|
|
||||||
trust = core.new_trust_ref(
|
|
||||||
self.trustor['id'], self.trustee['id'], project_id=self.project_id
|
|
||||||
)
|
|
||||||
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
|
|
||||||
|
|
||||||
PROVIDERS.identity_api.update_user(
|
|
||||||
self.trustor['id'], {'enabled': False}
|
|
||||||
)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.trust_id = trust['id']
|
|
||||||
token.user_id = self.trustee['id']
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.Forbidden, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_mint_trust_scoped_token_with_mismatching_users_fails(self):
|
|
||||||
user = core.new_user_ref(CONF.identity.default_domain_id)
|
|
||||||
user = PROVIDERS.identity_api.create_user(user)
|
|
||||||
|
|
||||||
roles = [{'id': self.admin_role_id}]
|
|
||||||
trust = core.new_trust_ref(
|
|
||||||
self.trustor['id'], self.trustee['id'], project_id=self.project_id
|
|
||||||
)
|
|
||||||
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
|
|
||||||
|
|
||||||
token = token_model.TokenModel()
|
|
||||||
token.trust_id = trust['id']
|
|
||||||
token.user_id = user['id']
|
|
||||||
|
|
||||||
self.assertRaises(
|
|
||||||
exception.Forbidden, token.mint, self.token_id, self.issued_at
|
|
||||||
)
|
|
Loading…
Reference in New Issue