From 140a34b439d9aa60712a164b21622e721a60dfc2 Mon Sep 17 00:00:00 2001 From: Lance Bragstad Date: Fri, 22 Jun 2018 22:02:04 +0000 Subject: [PATCH] Remove KeystoneToken object This commit removes the original KeystoneToken object in favor of the new TokenModel object. Since we have a token provider that knows how to deal with TokenModel object, we don't really need another object that uses reflection at all. Closes-Bug: 1778945 Change-Id: I778cab0a6449184ecf7d5ccfbfa12791be139236 --- keystone/api/trusts.py | 6 +- .../application_credential/controllers.py | 8 +- keystone/auth/plugins/mapped.py | 5 +- keystone/common/authorization.py | 4 +- keystone/common/controller.py | 10 +- keystone/common/rbac_enforcer/enforcer.py | 15 + keystone/middleware/auth.py | 20 +- keystone/models/token_model.py | 264 -------- keystone/tests/unit/token/test_token_model.py | 587 ------------------ 9 files changed, 41 insertions(+), 878 deletions(-) delete mode 100644 keystone/tests/unit/token/test_token_model.py diff --git a/keystone/api/trusts.py b/keystone/api/trusts.py index 027efd0d57..ce11cf1320 100644 --- a/keystone/api/trusts.py +++ b/keystone/api/trusts.py @@ -95,10 +95,8 @@ class TrustResource(ks_flask.ResourceBase): def _check_unrestricted(self): token = self.auth_context['token'] - auth_methods = token['methods'] - if 'application_credential' in auth_methods: - td = token.token_data['token'] - if td['application_credential']['restricted']: + if 'application_credential' in token.methods: + if not token.application_credential['unrestricted']: action = _("Using method 'application_credential' is not " "allowed for managing trusts.") raise exception.ForbiddenAction(action=action) diff --git a/keystone/application_credential/controllers.py b/keystone/application_credential/controllers.py index 0cb2df2c81..4350b3b3ef 100644 --- a/keystone/application_credential/controllers.py +++ b/keystone/application_credential/controllers.py @@ -84,10 +84,8 @@ class ApplicationCredentialV3(controller.V3Controller): return {cls.member_name: ref} def _check_unrestricted(self, token): - auth_methods = token['methods'] - if 'application_credential' in auth_methods: - td = token.token_data['token'] - if td['application_credential']['restricted']: + if 'application_credential' in token.methods: + if not token.application_credential['unrestricted']: action = _("Using method 'application_credential' is not " "allowed for managing additional application " "credentials.") @@ -112,7 +110,7 @@ class ApplicationCredentialV3(controller.V3Controller): app_cred['user_id'] = user_id app_cred['project_id'] = project_id app_cred['roles'] = self._normalize_role_list( - app_cred.get('roles', token['roles'])) + app_cred.get('roles', token.roles)) if app_cred.get('expires_at'): app_cred['expires_at'] = utils.parse_expiration_date( app_cred['expires_at']) diff --git a/keystone/auth/plugins/mapped.py b/keystone/auth/plugins/mapped.py index dcb9bc8ae0..6a67f1c5a8 100644 --- a/keystone/auth/plugins/mapped.py +++ b/keystone/auth/plugins/mapped.py @@ -24,7 +24,6 @@ from keystone import exception from keystone.federation import constants as federation_constants from keystone.federation import utils from keystone.i18n import _ -from keystone.models import token_model from keystone import notifications LOG = log.getLogger(__name__) @@ -37,9 +36,7 @@ class Mapped(base.AuthMethodHandler): def _get_token_ref(self, auth_payload): token_id = auth_payload['id'] - response = PROVIDERS.token_provider_api.validate_token(token_id) - return token_model.KeystoneToken(token_id=token_id, - token_data=response) + return PROVIDERS.token_provider_api.validate_token(token_id) def authenticate(self, request, auth_payload): """Authenticate mapped user and set an authentication context. diff --git a/keystone/common/authorization.py b/keystone/common/authorization.py index c521ef61e7..6daf258462 100644 --- a/keystone/common/authorization.py +++ b/keystone/common/authorization.py @@ -162,11 +162,11 @@ def check_policy(controller, request, action, def get_token_ref(context): - """Retrieve KeystoneToken object from the auth context and returns it. + """Retrieve TokenModel object from the auth context and returns it. :param dict context: The request context. :raises keystone.exception.Unauthorized: If auth context cannot be found. - :returns: The KeystoneToken object. + :returns: The TokenModel object. """ try: # Retrieve the auth context that was prepared by AuthContextMiddleware. diff --git a/keystone/common/controller.py b/keystone/common/controller.py index b3004a04d6..2fe1f06724 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -538,12 +538,12 @@ class V3Controller(provider_api.ProviderAPIMixin, wsgi.Application): if domain_id: return domain_id - token_ref = authorization.get_token_ref(request.context_dict) + token = authorization.get_token_ref(request.context_dict) - if token_ref.domain_scoped: - return token_ref.domain_id - elif token_ref.project_scoped: - return token_ref.project_domain_id + if token.domain_scoped: + return token.domain_id + elif token.project_scoped: + return token.project_domain['id'] else: msg = _('No domain information specified as part of list request') LOG.warning(msg) diff --git a/keystone/common/rbac_enforcer/enforcer.py b/keystone/common/rbac_enforcer/enforcer.py index 12ad1eb255..272232463c 100644 --- a/keystone/common/rbac_enforcer/enforcer.py +++ b/keystone/common/rbac_enforcer/enforcer.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import functools import flask @@ -19,6 +20,7 @@ from oslo_utils import strutils from keystone.common import authorization from keystone.common import context +from keystone.common import controller from keystone.common import policies from keystone.common import provider_api from keystone.common import utils @@ -77,6 +79,19 @@ class RBACEnforcer(object): extra.update(exc=exception.ForbiddenAction, action=action, do_raise=do_raise) + # NOTE(lbragstad): If there is a token in the credentials dictionary, + # it's going to be an instance of a TokenModel. We'll need to convert + # it to the a token response or dictionary before passing it to + # oslo.policy for enforcement. This is because oslo.policy shouldn't + # know how to deal with an internal object only used within keystone. + if 'token' in credentials: + token_ref = controller.render_token_response_from_model( + credentials['token'] + ) + credentials_copy = copy.deepcopy(credentials) + credentials_copy['token'] = token_ref + credentials = credentials_copy + try: return self._enforcer.enforce( rule=action, target=target, creds=credentials, **extra) diff --git a/keystone/middleware/auth.py b/keystone/middleware/auth.py index b7553e9478..1aebf6cb78 100644 --- a/keystone/middleware/auth.py +++ b/keystone/middleware/auth.py @@ -28,6 +28,7 @@ from keystone.models import token_model CONF = keystone.conf.CONF LOG = log.getLogger(__name__) +PROVIDERS = provider_api.ProviderAPIs __all__ = ('AuthContextMiddleware',) @@ -169,16 +170,20 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin, # parallel; only ione or the other should be used for access. request_context.is_admin_project = False request_context.domain_id = token.domain_id - request_context.domain_name = token.domain_name + request_context.domain_name = token.domain['name'] if token.oauth_scoped: request_context.is_delegated_auth = True - request_context.oauth_consumer_id = token.oauth_consumer_id - request_context.oauth_access_token_id = token.oauth_access_token_id + request_context.oauth_consumer_id = ( + token.access_token['consumer_id'] + ) + request_context.oauth_access_token_id = token.access_token_id if token.trust_scoped: request_context.is_delegated_auth = True request_context.trust_id = token.trust_id - if token.is_federated_user: - request_context.group_ids = token.federation_group_ids + if token.is_federated: + request_context.group_ids = [] + for group in token.federated_groups: + request_context.group_ids.append(group['id']) else: request_context.group_ids = [] @@ -212,8 +217,9 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin, elif request.token_auth.has_user_token: # Keystone enforces policy on some values that other services # do not, and should not, use. This adds them in to the context. - token = token_model.KeystoneToken(token_id=request.user_token, - token_data=request.token_info) + token = PROVIDERS.token_provider_api.validate_token( + request.user_token + ) self._keystone_specific_values(token, request_context) request_context.auth_token = request.user_token auth_context = request_context.to_policy_values() diff --git a/keystone/models/token_model.py b/keystone/models/token_model.py index 20d2f449f0..937e97ab3f 100644 --- a/keystone/models/token_model.py +++ b/keystone/models/token_model.py @@ -17,13 +17,11 @@ import itertools from oslo_log import log from oslo_serialization import msgpackutils from oslo_utils import reflection -from oslo_utils import timeutils import six from keystone.common import cache from keystone.common import provider_api from keystone import exception -from keystone.federation import constants from keystone.i18n import _ LOG = log.getLogger(__name__) @@ -34,268 +32,6 @@ V3 = 'v3.0' VERSIONS = frozenset([V3]) -def _parse_and_normalize_time(time_data): - if isinstance(time_data, six.string_types): - time_data = timeutils.parse_isotime(time_data) - return timeutils.normalize_time(time_data) - - -class KeystoneToken(dict): - """An in-memory representation that unifies v3 tokens.""" - - # TODO(morganfainberg): Align this in-memory representation with the - # objects in keystoneclient. This object should be eventually updated - # to be the source of token data with the ability to emit any version - # of the token instead of only consuming the token dict and providing - # property accessors for the underlying data. - - def __init__(self, token_id, token_data): - self.token_data = token_data - self.token_id = token_id - try: - super(KeystoneToken, self).__init__(**token_data['token']) - except KeyError: - raise exception.UnsupportedTokenVersionException() - - if self.project_scoped and self.domain_scoped: - raise exception.UnexpectedError(_('Found invalid token: scoped to ' - 'both project and domain.')) - - def __repr__(self): - """Return string representation of KeystoneToken.""" - desc = ('<%(type)s (audit_id=%(audit_id)s, ' - 'audit_chain_id=%(audit_chain_id)s) at %(loc)s>') - self_cls_name = reflection.get_class_name(self, - fully_qualified=False) - return desc % {'type': self_cls_name, - 'audit_id': self.audit_id, - 'audit_chain_id': self.audit_chain_id, - 'loc': hex(id(self))} - - @property - def expires(self): - return _parse_and_normalize_time(self['expires_at']) - - @property - def issued(self): - return _parse_and_normalize_time(self['issued_at']) - - @property - def audit_id(self): - return self.get('audit_ids', [None])[0] - - @property - def audit_chain_id(self): - return self.get('audit_ids', [None])[-1] - - @property - def auth_token(self): - return self.token_id - - @property - def user_id(self): - return self['user']['id'] - - @property - def user_name(self): - return self['user']['name'] - - @property - def user_domain_name(self): - try: - return self['user']['domain']['name'] - except KeyError: # nosec - # Do not raise KeyError, raise UnexpectedError - pass - raise exception.UnexpectedError() - - @property - def user_password_expires_at(self): - try: - return self['user']['password_expires_at'] - except KeyError: - # Do not raise KeyError, raise UnexpectedError - pass - raise exception.UnexpectedError() - - @property - def user_domain_id(self): - try: - return self['user']['domain']['id'] - except KeyError: # nosec - # Do not raise KeyError, raise UnexpectedError - pass - raise exception.UnexpectedError() - - @property - def domain_id(self): - try: - return self['domain']['id'] - except KeyError: - # Do not raise KeyError, raise UnexpectedError - raise exception.UnexpectedError() - - @property - def domain_name(self): - try: - return self['domain']['name'] - except KeyError: - # Do not raise KeyError, raise UnexpectedError - raise exception.UnexpectedError() - - @property - def project_id(self): - try: - return self['project']['id'] - except KeyError: - # Do not raise KeyError, raise UnexpectedError - raise exception.UnexpectedError() - - @property - def project_name(self): - try: - return self['project']['name'] - except KeyError: - # Do not raise KeyError, raise UnexpectedError - raise exception.UnexpectedError() - - @property - def project_domain_id(self): - try: - return self['project']['domain']['id'] - except KeyError: # nosec - # Do not raise KeyError, raise UnexpectedError - pass - - raise exception.UnexpectedError() - - @property - def project_domain_name(self): - try: - return self['project']['domain']['name'] - except KeyError: # nosec - # Do not raise KeyError, raise UnexpectedError - pass - - raise exception.UnexpectedError() - - @property - def is_domain(self): - if 'is_domain' in self: - return self['is_domain'] - return False - - @property - def system_scoped(self): - return 'system' in self - - @property - def project_scoped(self): - return 'project' in self - - @property - def domain_scoped(self): - return 'domain' in self - - @property - def scoped(self): - return self.project_scoped or self.domain_scoped or self.system_scoped - - @property - def is_admin_project(self): - # Prevent domain scoped tokens from acting as is_admin_project - if self.domain_scoped: - return False - # TODO(ayoung/edmondsw): Having is_admin_project default to True is - # essential for fixing bug #968696. If an admin project is not - # configured, we can add checks for is_admin_project:True and not - # block anyone that hasn't configured an admin_project. Do not change - # this until we can assume admin_project is actually set - return self.get('is_admin_project', True) - - @property - def trust_id(self): - return self.get('OS-TRUST:trust', {}).get('id') - - @property - def trust_scoped(self): - return 'OS-TRUST:trust' in self - - @property - def trustee_user_id(self): - return self.get('OS-TRUST:trust', {}).get('trustee_user_id') - - @property - def trustor_user_id(self): - return self.get('OS-TRUST:trust', {}).get('trustor_user_id') - - @property - def trust_impersonation(self): - return self.get('OS-TRUST:trust', {}).get('impersonation') - - @property - def oauth_scoped(self): - return 'OS-OAUTH1' in self - - @property - def oauth_access_token_id(self): - if self.oauth_scoped: - return self['OS-OAUTH1']['access_token_id'] - return None - - @property - def oauth_consumer_id(self): - if self.oauth_scoped: - return self['OS-OAUTH1']['consumer_id'] - return None - - @property - def role_ids(self): - return [r['id'] for r in self.get('roles', [])] - - @property - def role_names(self): - return [r['name'] for r in self.get('roles', [])] - - @property - def is_federated_user(self): - try: - return constants.FEDERATION in self['user'] - except KeyError: - raise exception.UnexpectedError() - - @property - def federation_group_ids(self): - if self.is_federated_user: - try: - groups = self['user'][constants.FEDERATION].get('groups', []) - return [g['id'] for g in groups] - except KeyError: - raise exception.UnexpectedError() - return [] - - @property - def federation_idp_id(self): - if self.is_federated_user: - return ( - self['user'][constants.FEDERATION]['identity_provider']['id'] - ) - - @property - def federation_protocol_id(self): - if self.is_federated_user: - return self['user'][constants.FEDERATION]['protocol']['id'] - return None - - @property - def metadata(self): - return self.get('metadata', {}) - - @property - def methods(self): - return self.get('methods', []) - - class TokenModel(object): """An object that represents a token emitted by keystone. diff --git a/keystone/tests/unit/token/test_token_model.py b/keystone/tests/unit/token/test_token_model.py deleted file mode 100644 index 10f457d7e4..0000000000 --- a/keystone/tests/unit/token/test_token_model.py +++ /dev/null @@ -1,587 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import datetime -import uuid - -from oslo_utils import timeutils -from six.moves import range - -from keystone.common import provider_api -from keystone.common import utils as ks_utils -import keystone.conf -from keystone import exception -from keystone.federation import constants as federation_constants -from keystone.models import token_model -from keystone.tests.unit import base_classes -from keystone.tests.unit import core -from keystone.tests.unit import test_token_provider -from keystone.token import provider - -CONF = keystone.conf.CONF -PROVIDERS = provider_api.ProviderAPIs - - -class TestKeystoneTokenModel(core.TestCase): - def setUp(self): - super(TestKeystoneTokenModel, self).setUp() - self.v3_sample_token = copy.deepcopy( - test_token_provider.SAMPLE_V3_TOKEN) - - def test_token_model_v3(self): - token_data = token_model.KeystoneToken(uuid.uuid4().hex, - self.v3_sample_token) - expires = timeutils.normalize_time(timeutils.parse_isotime( - self.v3_sample_token['token']['expires_at'])) - issued = timeutils.normalize_time(timeutils.parse_isotime( - self.v3_sample_token['token']['issued_at'])) - self.assertEqual(expires, token_data.expires) - self.assertEqual(issued, token_data.issued) - self.assertEqual(self.v3_sample_token['token']['user']['id'], - token_data.user_id) - self.assertEqual(self.v3_sample_token['token']['user']['name'], - token_data.user_name) - self.assertEqual( - self.v3_sample_token['token']['user']['password_expires_at'], - token_data.user_password_expires_at) - self.assertEqual(self.v3_sample_token['token']['user']['domain']['id'], - token_data.user_domain_id) - self.assertEqual( - self.v3_sample_token['token']['user']['domain']['name'], - token_data.user_domain_name) - self.assertEqual( - self.v3_sample_token['token']['project']['domain']['id'], - token_data.project_domain_id) - self.assertEqual( - self.v3_sample_token['token']['project']['domain']['name'], - token_data.project_domain_name) - self.assertEqual( - self.v3_sample_token['token']['is_domain'], token_data.is_domain) - self.assertEqual(self.v3_sample_token['token']['OS-TRUST:trust']['id'], - token_data.trust_id) - self.assertEqual( - self.v3_sample_token['token']['OS-TRUST:trust']['trustor_user_id'], - token_data.trustor_user_id) - self.assertEqual( - self.v3_sample_token['token']['OS-TRUST:trust']['trustee_user_id'], - token_data.trustee_user_id) - - # Project Scoped Token - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'domain_id') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'domain_name') - self.assertFalse(token_data.domain_scoped) - self.assertEqual(self.v3_sample_token['token']['project']['id'], - token_data.project_id) - self.assertEqual(self.v3_sample_token['token']['project']['name'], - token_data.project_name) - self.assertTrue(token_data.project_scoped) - self.assertTrue(token_data.scoped) - self.assertTrue(token_data.trust_scoped) - - # by default admin project is True for project scoped tokens - self.assertTrue(token_data.is_admin_project) - - self.assertEqual( - [r['id'] for r in self.v3_sample_token['token']['roles']], - token_data.role_ids) - self.assertEqual( - [r['name'] for r in self.v3_sample_token['token']['roles']], - token_data.role_names) - - # Domain Scoped Token - token_data.pop('project') - self.assertFalse(token_data.project_scoped) - self.assertFalse(token_data.scoped) - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_id') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_name') - self.assertFalse(token_data.project_scoped) - domain_id = uuid.uuid4().hex - domain_name = uuid.uuid4().hex - token_data['domain'] = {'id': domain_id, - 'name': domain_name} - self.assertEqual(domain_id, token_data.domain_id) - self.assertEqual(domain_name, token_data.domain_name) - self.assertTrue(token_data.domain_scoped) - - token_data['audit_ids'] = [uuid.uuid4().hex] - self.assertEqual(token_data.audit_id, - token_data['audit_ids'][0]) - self.assertEqual(token_data.audit_chain_id, - token_data['audit_ids'][0]) - token_data['audit_ids'].append(uuid.uuid4().hex) - self.assertEqual(token_data.audit_chain_id, - token_data['audit_ids'][1]) - del token_data['audit_ids'] - self.assertIsNone(token_data.audit_id) - self.assertIsNone(token_data.audit_chain_id) - - # by default admin project is False for domain scoped tokens - self.assertFalse(token_data.is_admin_project) - - def test_token_model_v3_federated_user(self): - token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=self.v3_sample_token) - federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, - 'protocol': {'id': 'saml2'}, - 'groups': [{'id': uuid.uuid4().hex} - for x in range(1, 5)]} - - self.assertFalse(token_data.is_federated_user) - self.assertEqual([], token_data.federation_group_ids) - self.assertIsNone(token_data.federation_protocol_id) - self.assertIsNone(token_data.federation_idp_id) - - token_data['user'][federation_constants.FEDERATION] = federation_data - - self.assertTrue(token_data.is_federated_user) - self.assertEqual([x['id'] for x in federation_data['groups']], - token_data.federation_group_ids) - self.assertEqual(federation_data['protocol']['id'], - token_data.federation_protocol_id) - self.assertEqual(federation_data['identity_provider']['id'], - token_data.federation_idp_id) - - def test_token_model_unknown(self): - self.assertRaises(exception.UnsupportedTokenVersionException, - token_model.KeystoneToken, - token_id=uuid.uuid4().hex, - token_data={'bogus_data': uuid.uuid4().hex}) - - def test_token_model_dual_scoped_token(self): - domain = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.v3_sample_token['token']['domain'] = domain - - self.assertRaises(exception.UnexpectedError, - token_model.KeystoneToken, - token_id=uuid.uuid4().hex, - token_data=self.v3_sample_token) - - def test_token_model_is_admin_project(self): - token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=self.v3_sample_token) - - token_data['is_admin_project'] = False - self.assertFalse(token_data.is_admin_project) - - -class TokenModelTests(base_classes.TestCaseWithBootstrap): - - def setUp(self): - super(TokenModelTests, self).setUp() - self.admin_user_id = self.bootstrapper.admin_user_id - self.admin_username = self.bootstrapper.admin_username - self.admin_password = self.bootstrapper.admin_password - self.project_id = self.bootstrapper.project_id - self.project_name = self.bootstrapper.project_name - self.admin_role_id = self.bootstrapper.admin_role_id - self.member_role_id = self.bootstrapper.member_role_id - self.reader_role_id = self.bootstrapper.reader_role_id - - self.token_id = uuid.uuid4().hex - issued_at = datetime.datetime.utcnow() - self.issued_at = ks_utils.isotime(at=issued_at, subsecond=True) - - def assertTokenContainsRole(self, token, role): - """Ensure a role reference exists in a token's roles. - - :param token: instance of ``keystone.models.token_model.TokenModel`` - :param role: a dictionary reference of the expected role - """ - self.assertIn(role, token.roles) - - def test_audit_id_attributes(self): - token = token_model.TokenModel() - audit_id = provider.random_urlsafe_str() - token.audit_id = audit_id - - self.assertTrue(len(token.audit_ids) == 1) - - parent_audit_id = provider.random_urlsafe_str() - token.parent_audit_id = parent_audit_id - - self.assertTrue(len(token.audit_ids) == 2) - - self.assertEqual(audit_id, token.audit_ids[0]) - self.assertEqual(parent_audit_id, token.audit_ids[-1]) - - def test_token_model_user_attributes(self): - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.user_domain_id = token.user['domain_id'] - - self.assertEqual(self.admin_user_id, token.user_id) - self.assertIsNotNone(token.user) - self.assertIsNotNone(token.user_domain) - self.assertEqual(self.admin_username, token.user['name']) - self.assertEqual(CONF.identity.default_domain_id, token.user_domain_id) - self.assertEqual( - CONF.identity.default_domain_id, token.user_domain['id'] - ) - - def test_mint_unscoped_token(self): - token = token_model.TokenModel() - token.user_id = self.admin_user_id - - token.mint(self.token_id, self.issued_at) - - self.assertTrue(token.unscoped) - self.assertTrue(len(token.roles) == 0) - - def test_mint_system_scoped_token(self): - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.system = {'all': True} - - token.mint(self.token_id, self.issued_at) - - self.assertTrue(token.system_scoped) - self.assertFalse(token.domain_scoped) - self.assertFalse(token.project_scoped) - self.assertFalse(token.trust_scoped) - self.assertFalse(token.unscoped) - - self.assertIsNotNone(token.system) - self.assertTrue(len(token.roles) == 1) - admin_role = {'id': self.admin_role_id, 'name': 'admin'} - self.assertTokenContainsRole(token, admin_role) - - def test_mint_system_scoped_token_with_multiple_roles(self): - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.system = {'all': True} - - self.assertTrue(token.system_scoped) - self.assertFalse(token.domain_scoped) - self.assertFalse(token.project_scoped) - self.assertFalse(token.trust_scoped) - self.assertFalse(token.unscoped) - - role = core.new_role_ref() - PROVIDERS.role_api.create_role(role['id'], role) - role.pop('domain_id') - PROVIDERS.assignment_api.create_system_grant_for_user( - self.admin_user_id, role['id'] - ) - - self.assertIsNotNone(token.system) - self.assertTrue(len(token.roles) == 2) - admin_role = {'id': self.admin_role_id, 'name': 'admin'} - self.assertTokenContainsRole(token, admin_role) - self.assertTokenContainsRole(token, role) - - def test_mint_system_scoped_token_without_roles_fails(self): - user = core.new_user_ref(CONF.identity.default_domain_id) - user = PROVIDERS.identity_api.create_user(user) - - token = token_model.TokenModel() - token.user_id = user['id'] - token.system = 'all' - token.audit_id = provider.random_urlsafe_str() - - self.assertRaises( - exception.Unauthorized, token.mint, self.token_id, self.issued_at - ) - - def test_mint_system_token_with_effective_role_assignment(self): - user = core.new_user_ref(CONF.identity.default_domain_id) - user = PROVIDERS.identity_api.create_user(user) - - group = core.new_group_ref(CONF.identity.default_domain_id) - group = PROVIDERS.identity_api.create_group(group) - - PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) - - PROVIDERS.assignment_api.create_system_grant_for_group( - group['id'], self.admin_role_id - ) - - token = token_model.TokenModel() - token.user_id = user['id'] - token.system = 'all' - - token.mint(self.token_id, self.issued_at) - - exp_role = {'id': self.admin_role_id, 'name': 'admin'} - self.assertTokenContainsRole(token, exp_role) - - def test_mint_domain_scoped_token(self): - PROVIDERS.assignment_api.create_grant( - self.admin_role_id, user_id=self.admin_user_id, - domain_id=CONF.identity.default_domain_id - ) - - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.domain_id = CONF.identity.default_domain_id - - token.mint(self.token_id, self.issued_at) - - self.assertTrue(token.domain_scoped) - self.assertFalse(token.system_scoped) - self.assertFalse(token.project_scoped) - self.assertFalse(token.trust_scoped) - self.assertFalse(token.unscoped) - - self.assertIsNotNone(token.domain) - exp_domain = PROVIDERS.resource_api.get_domain( - CONF.identity.default_domain_id - ) - self.assertEqual(exp_domain['id'], token.domain_id) - self.assertEqual(exp_domain['name'], token.domain['name']) - - self.assertTrue(len(token.roles) == 3) - exp_roles = [ - {'id': self.admin_role_id, 'name': 'admin'}, - {'id': self.member_role_id, 'name': 'member'}, - {'id': self.reader_role_id, 'name': 'reader'} - ] - for role in exp_roles: - self.assertTokenContainsRole(token, role) - - def test_mint_domain_scoped_token_fails_without_roles(self): - user = core.new_user_ref(CONF.identity.default_domain_id) - user = PROVIDERS.identity_api.create_user(user) - - token = token_model.TokenModel() - token.user_id = user['id'] - token.domain_id = CONF.identity.default_domain_id - token.audit_id = provider.random_urlsafe_str() - - self.assertRaises( - exception.Unauthorized, token.mint, self.token_id, self.issued_at - ) - - def test_mint_project_scoped_token(self): - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.project_id = self.project_id - - token.mint(self.token_id, self.issued_at) - - self.assertTrue(token.project_scoped) - self.assertFalse(token.system_scoped) - self.assertFalse(token.domain_scoped) - self.assertFalse(token.trust_scoped) - self.assertFalse(token.unscoped) - - self.assertIsNotNone(token.project) - self.assertEqual(self.project_name, token.project['name']) - - self.assertTrue(len(token.roles) == 3) - exp_roles = [ - {'id': self.admin_role_id, 'name': 'admin'}, - {'id': self.member_role_id, 'name': 'member'}, - {'id': self.reader_role_id, 'name': 'reader'} - ] - for role in exp_roles: - self.assertTokenContainsRole(token, role) - - def test_mint_project_scoped_token_fails_without_roles(self): - user = core.new_user_ref(CONF.identity.default_domain_id) - user = PROVIDERS.identity_api.create_user(user) - - token = token_model.TokenModel() - token.user_id = user['id'] - token.project_id = self.project_id - token.audit_id = provider.random_urlsafe_str() - - self.assertRaises( - exception.Unauthorized, token.mint, self.token_id, self.issued_at - ) - - def test_mint_project_scoped_token_fails_when_project_is_disabled(self): - PROVIDERS.resource_api.update_project( - self.project_id, {'enabled': False} - ) - - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.project_id = self.project_id - token.audit_id = provider.random_urlsafe_str() - - self.assertRaises( - exception.ProjectNotFound, token.mint, self.token_id, - self.issued_at - ) - - def test_mint_project_scoped_token_fails_when_domain_is_disabled(self): - project = PROVIDERS.resource_api.get_project(self.project_id) - PROVIDERS.resource_api.update_domain( - project['domain_id'], {'enabled': False} - ) - - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.project_id = self.project_id - token.audit_id = provider.random_urlsafe_str() - - self.assertRaises( - exception.DomainNotFound, token.mint, self.token_id, self.issued_at - ) - - def test_mint_application_credential_token(self): - app_cred = { - 'id': uuid.uuid4().hex, - 'name': 'monitoring-application', - 'user_id': self.admin_user_id, - 'roles': [{'id': self.admin_role_id}], - 'project_id': self.project_id, - 'secret': uuid.uuid4().hex - } - - PROVIDERS.application_credential_api.create_application_credential( - app_cred - ) - - token = token_model.TokenModel() - token.user_id = self.admin_user_id - token.application_credential_id = app_cred['id'] - token.project_id = self.project_id - - token.mint(self.token_id, self.issued_at) - self.assertIsNotNone(token.application_credential_id) - self.assertIsNotNone(token.application_credential) - exp_role = {'id': self.admin_role_id, 'name': 'admin'} - self.assertTokenContainsRole(token, exp_role) - - -class TrustScopedTokenModelTests(TokenModelTests): - - def setUp(self): - super(TrustScopedTokenModelTests, self).setUp() - - trustor_domain = PROVIDERS.resource_api.create_domain( - uuid.uuid4().hex, core.new_domain_ref() - ) - trustee_domain = PROVIDERS.resource_api.create_domain( - uuid.uuid4().hex, core.new_domain_ref() - ) - - self.trustor = PROVIDERS.identity_api.create_user( - core.new_user_ref(trustor_domain['id']) - ) - self.trustee = PROVIDERS.identity_api.create_user( - core.new_user_ref(trustee_domain['id']) - ) - - PROVIDERS.assignment_api.create_grant( - self.admin_role_id, user_id=self.trustor['id'], - project_id=self.project_id - ) - - def test_mint_trust_scoped_token(self): - roles = [{'id': self.admin_role_id}] - trust = core.new_trust_ref( - self.trustor['id'], self.trustee['id'], project_id=self.project_id - ) - trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles) - - token = token_model.TokenModel() - token.trust_id = trust['id'] - token.user_id = self.trustee['id'] - - token.mint(self.token_id, self.issued_at) - - self.assertEqual(self.trustee['id'], token.user_id) - self.assertEqual(self.trustee['id'], token.trustee['id']) - self.assertEqual(self.trustor['id'], token.trustor['id']) - self.assertEqual(self.project_id, token.trust_project['id']) - self.assertEqual( - CONF.identity.default_domain_id, token.trust_project_domain['id'] - ) - # NOTE(lbragstad): The domain key here should be removed once - # https://bugs.launchpad.net/keystone/+bug/1763510 is fixed. - exp_role = { - 'id': self.admin_role_id, 'name': 'admin', 'domain_id': None - } - self.assertTokenContainsRole(token, exp_role) - - def test_mint_trust_scoped_token_fails_when_trustee_domain_disabled(self): - roles = [{'id': self.admin_role_id}] - trust = core.new_trust_ref( - self.trustor['id'], self.trustee['id'], project_id=self.project_id - ) - trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles) - - PROVIDERS.resource_api.update_domain( - self.trustee['domain_id'], {'enabled': False} - ) - - token = token_model.TokenModel() - token.trust_id = trust['id'] - token.user_id = self.trustee['id'] - - self.assertRaises( - exception.TokenNotFound, token.mint, self.token_id, self.issued_at - ) - - def test_mint_trust_scoped_token_fails_when_trustor_domain_disabled(self): - roles = [{'id': self.admin_role_id}] - trust = core.new_trust_ref( - self.trustor['id'], self.trustee['id'], project_id=self.project_id - ) - trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles) - - PROVIDERS.resource_api.update_domain( - self.trustor['domain_id'], {'enabled': False} - ) - - token = token_model.TokenModel() - token.trust_id = trust['id'] - token.user_id = self.trustee['id'] - - self.assertRaises( - exception.TokenNotFound, token.mint, self.token_id, self.issued_at - ) - - def test_mint_trust_scoped_token_fails_when_trustor_is_disabled(self): - roles = [{'id': self.admin_role_id}] - trust = core.new_trust_ref( - self.trustor['id'], self.trustee['id'], project_id=self.project_id - ) - trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles) - - PROVIDERS.identity_api.update_user( - self.trustor['id'], {'enabled': False} - ) - - token = token_model.TokenModel() - token.trust_id = trust['id'] - token.user_id = self.trustee['id'] - - self.assertRaises( - exception.Forbidden, token.mint, self.token_id, self.issued_at - ) - - def test_mint_trust_scoped_token_with_mismatching_users_fails(self): - user = core.new_user_ref(CONF.identity.default_domain_id) - user = PROVIDERS.identity_api.create_user(user) - - roles = [{'id': self.admin_role_id}] - trust = core.new_trust_ref( - self.trustor['id'], self.trustee['id'], project_id=self.project_id - ) - trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles) - - token = token_model.TokenModel() - token.trust_id = trust['id'] - token.user_id = user['id'] - - self.assertRaises( - exception.Forbidden, token.mint, self.token_id, self.issued_at - )