Merge "Introduce new TokenModel object"

This commit is contained in:
Zuul 2018-07-25 20:31:36 +00:00 committed by Gerrit Code Review
commit dd41096323
3 changed files with 1006 additions and 0 deletions

View File

@ -12,14 +12,23 @@
"""Unified in-memory token model."""
import itertools
from oslo_log import log
from oslo_utils import reflection
from oslo_utils import timeutils
import six
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone.federation import constants
from keystone.i18n import _
CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
PROVIDERS = provider_api.ProviderAPIs
# supported token versions
V3 = 'v3.0'
VERSIONS = frozenset([V3])
@ -289,3 +298,518 @@ class KeystoneToken(dict):
@property
def methods(self):
return self.get('methods', [])
class TokenModel(object):
"""An object that represents a token emitted by keystone.
This is a queryable object that other parts of keystone can use to reason
about a user's authentication or authorization.
"""
def __init__(self):
self.user_id = None
self.__user = None
self.__user_domain = None
self.methods = None
self.bind = None
self.audit_id = None
self.parent_audit_id = None
self.__expires_at = None
self.__issued_at = None
self.system = None
self.domain_id = None
self.__domain = None
self.project_id = None
self.__project = None
self.__project_domain = None
self.trust_id = None
self.__trust = None
self.__trustor = None
self.__trustee = None
self.__trust_project = None
self.__trust_project_domain = None
self.is_federated = False
self.identity_provider_id = None
self.protocol_id = None
self.federated_groups = None
self.access_token_id = None
self.__access_token = None
self.application_credential_id = None
self.__application_credential = None
def __repr__(self):
"""Return string representation of TokenModel."""
desc = ('<%(type)s (audit_id=%(audit_id)s, '
'audit_chain_id=%(audit_ids)s) at %(loc)s>')
self_cls_name = reflection.get_class_name(self, fully_qualified=False)
return desc % {'type': self_cls_name,
'audit_id': self.audit_id,
'audit_ids': self.audit_ids,
'loc': hex(id(self))}
@property
def audit_ids(self):
if self.parent_audit_id:
return [self.audit_id, self.parent_audit_id]
return [self.audit_id]
@property
def expires_at(self):
return self.__expires_at
@expires_at.setter
def expires_at(self, value):
if not isinstance(value, six.string_types):
raise ValueError('expires_at must be a string.')
self.__expires_at = value
@property
def issued_at(self):
return self.__issued_at
@issued_at.setter
def issued_at(self, value):
if not isinstance(value, six.string_types):
raise ValueError('issued_at must be a string.')
self.__issued_at = value
@property
def unscoped(self):
return not any(
[self.system_scoped, self.domain_scoped, self.project_scoped,
self.trust_scoped]
)
@property
def system_scoped(self):
return self.system is not None
@property
def user(self):
if not self.__user:
if self.user_id:
self.__user = PROVIDERS.identity_api.get_user(self.user_id)
return self.__user
@property
def user_domain(self):
if not self.__user_domain:
if self.user:
self.__user_domain = PROVIDERS.resource_api.get_domain(
self.user['domain_id']
)
return self.__user_domain
@property
def domain(self):
if not self.__domain:
if self.domain_id:
self.__domain = PROVIDERS.resource_api.get_domain(
self.domain_id
)
return self.__domain
@property
def domain_scoped(self):
return self.domain_id is not None
@property
def project(self):
if not self.__project:
if self.project_id:
self.__project = PROVIDERS.resource_api.get_project(
self.project_id
)
return self.__project
@property
def project_scoped(self):
return self.project_id is not None
@property
def project_domain(self):
if not self.__project_domain:
if self.project and self.project.get('domain_id'):
self.__project_domain = PROVIDERS.resource_api.get_domain(
self.project['domain_id']
)
return self.__project_domain
@property
def application_credential(self):
if not self.__application_credential:
if self.application_credential_id:
app_cred_api = PROVIDERS.application_credential_api
self.__application_credential = (
app_cred_api.get_application_credential(
self.application_credential_id
)
)
return self.__application_credential
@property
def oauth_scoped(self):
return self.access_token_id is not None
@property
def access_token(self):
if not self.__access_token:
if self.access_token_id:
self.__access_token = PROVIDERS.oauth_api.get_access_token(
self.access_token_id
)
return self.__access_token
@property
def trust_scoped(self):
return self.trust_id is not None
@property
def trust(self):
if not self.__trust:
if self.trust_id:
self.__trust = PROVIDERS.trust_api.get_trust(self.trust_id)
return self.__trust
@property
def trustor(self):
if not self.__trustor:
if self.trust:
self.__trustor = PROVIDERS.identity_api.get_user(
self.trust['trustor_user_id']
)
return self.__trustor
@property
def trustee(self):
if not self.__trustee:
if self.trust:
self.__trustee = PROVIDERS.identity_api.get_user(
self.trust['trustee_user_id']
)
return self.__trustee
@property
def trust_project(self):
if not self.__trust_project:
if self.trust:
self.__trust_project = PROVIDERS.resource_api.get_project(
self.trust['project_id']
)
return self.__trust_project
@property
def trust_project_domain(self):
if not self.__trust_project_domain:
if self.trust:
self.__trust_project_domain = (
PROVIDERS.resource_api.get_domain(
self.trust_project['domain_id']
)
)
return self.__trust_project_domain
def _get_system_roles(self):
roles = []
groups = PROVIDERS.identity_api.list_groups_for_user(self.user_id)
all_group_roles = []
for group in groups:
group_roles = (
PROVIDERS.assignment_api.list_system_grants_for_group(
group['id']
)
)
for role in group_roles:
all_group_roles.append(role)
user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
self.user_id
)
for role in itertools.chain(all_group_roles, user_roles):
roles.append({'id': role['id'], 'name': role['name']})
return roles
def _get_trust_roles(self):
roles = []
# If redelegated_trust_id is set, then we must traverse the trust_chain
# in order to determine who the original trustor is. We need to do this
# because the user ID of the original trustor helps us determine scope
# in the redelegated context.
if self.trust.get('redelegated_trust_id'):
trust_chain = PROVIDERS.trust_api.get_trust_pedigree(
self.trust_id
)
original_trustor_id = trust_chain[-1]['trustor_user_id']
else:
original_trustor_id = self.trustor['id']
trust_roles = [
{'role_id': role['id']} for role in self.trust['roles']
]
effective_trust_roles = (
PROVIDERS.assignment_api.add_implied_roles(trust_roles)
)
effective_trust_role_ids = (
set([r['role_id'] for r in effective_trust_roles])
)
trustor_assignments = (
PROVIDERS.assignment_api.list_role_assignments(
user_id=original_trustor_id,
project_id=self.trust.get('project_id'),
effective=True, strip_domain_roles=False
)
)
current_effective_trustor_roles = (
set([x['role_id'] for x in trustor_assignments])
)
for trust_role_id in effective_trust_role_ids:
if trust_role_id in current_effective_trustor_roles:
role = PROVIDERS.role_api.get_role(trust_role_id)
if role['domain_id'] is None:
roles.append(role)
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
return roles
def _get_federated_roles(self):
roles = []
group_ids = [group['id'] for group in self.federated_groups]
federated_roles = PROVIDERS.assignment_api.get_roles_for_groups(
group_ids, self.project_id, self.domain_id
)
for group_id in group_ids:
group_roles = (
PROVIDERS.assignment_api.list_system_grants_for_group(
group_id
)
)
for role in group_roles:
federated_roles.append(role)
user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
self.user_id
)
for role in user_roles:
federated_roles.append(role)
if self.domain_id:
domain_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_domain(
self.user_id, self.domain_id
)
)
for role in domain_roles:
federated_roles.append(role)
if self.project_id:
project_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_project(
self.user_id, self.project_id
)
)
for role in project_roles:
federated_roles.append(role)
# NOTE(lbragstad): Remove duplicate role references from a list of
# roles. It is often suggested that this be done with:
#
# roles = [dict(t) for t in set([tuple(d.items()) for d in roles])]
#
# But that doesn't actually remove duplicates in all cases and
# causes transient failures because dictionaries are unordered
# objects. This means {'id': 1, 'foo': 'bar'} and {'foo': 'bar',
# 'id': 1} won't actually resolve to a single entity in the above
# logic since they are both considered unique. By using `in` we're
# performing a containment check, which also does a deep comparison
# of the objects, which is what we want.
for role in federated_roles:
if not isinstance(role, dict):
role = PROVIDERS.role_api.get_role(role)
if role not in roles:
roles.append(role)
return roles
def _get_domain_roles(self):
roles = []
domain_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_domain(
self.user_id, self.domain_id
)
)
for role_id in domain_roles:
role = PROVIDERS.role_api.get_role(role_id)
roles.append({'id': role['id'], 'name': role['name']})
return roles
def _get_project_roles(self):
roles = []
project_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_project(
self.user_id, self.project_id
)
)
for role_id in project_roles:
r = PROVIDERS.role_api.get_role(role_id)
roles.append({'id': r['id'], 'name': r['name']})
return roles
def _get_application_credential_roles(self):
roles = []
app_cred_roles = self.application_credential['roles']
for role in app_cred_roles:
try:
r = PROVIDERS.assignment_api.get_grant(
role['id'], user_id=self.user_id,
domain_id=self.domain_id, project_id=self.project_id)
roles.append({'id': r['id'], 'name': r['name']})
except exception.RoleAssignmentNotFound:
pass
return roles
@property
def roles(self):
if self.system_scoped:
roles = self._get_system_roles()
elif self.trust_scoped:
roles = self._get_trust_roles()
elif self.is_federated and not self.unscoped:
roles = self._get_federated_roles()
elif self.domain_scoped:
roles = self._get_domain_roles()
elif self.application_credential_id and self.project_id:
roles = self._get_application_credential_roles()
elif self.project_scoped:
roles = self._get_project_roles()
else:
roles = []
return roles
def _validate_token_resources(self):
if self.project and not self.project.get('enabled'):
msg = _('Unable to validate token because project %(id)s is '
'disabled') % {'id': self.project_id}
LOG.warning(msg)
raise exception.ProjectNotFound(msg)
if self.project and not self.project_domain.get('enabled'):
msg = _('Unable to validate token because domain %(id)s is '
'disabled') % {'id': self.project_domain['id']}
LOG.warning(msg)
raise exception.DomainNotFound(msg)
def _validate_token_user(self):
if self.trust_scoped:
if self.user_id != self.trustee['id']:
raise exception.Forbidden(_('User is not a trustee.'))
try:
PROVIDERS.resource_api.assert_domain_enabled(
self.trustor['domain_id']
)
except AssertionError:
raise exception.TokenNotFound(_('Trustor domain is disabled.'))
try:
PROVIDERS.resource_api.assert_domain_enabled(
self.trustee['domain_id']
)
except AssertionError:
raise exception.TokenNotFound(_('Trustee domain is disabled.'))
try:
PROVIDERS.identity_api.assert_user_enabled(
self.trustor['id']
)
except AssertionError:
raise exception.Forbidden(_('Trustor is disabled.'))
if not self.user_domain.get('enabled'):
msg = _('Unable to validate token because domain %(id)s is '
'disabled') % {'id': self.user_domain['id']}
LOG.warning(msg)
raise exception.DomainNotFound(msg)
def _validate_system_scope(self):
if self.system_scoped and not self.roles:
msg = _(
'User %(user_id)s has no access to the system'
) % {'user_id': self.user_id}
LOG.debug(msg)
raise exception.Unauthorized(msg)
def _validate_domain_scope(self):
if self.domain_scoped and not self.roles:
msg = _(
'User %(user_id)s has no access to domain %(domain_id)s'
) % {'user_id': self.user_id, 'domain_id': self.domain_id}
LOG.debug(msg)
raise exception.Unauthorized(msg)
def _validate_project_scope(self):
if self.project_scoped and not self.roles:
msg = _(
'User %(user_id)s has no access to project %(project_id)s'
) % {'user_id': self.user_id, 'project_id': self.project_id}
LOG.debug(msg)
raise exception.Unauthorized(msg)
def _validate_trust_scope(self):
trust_roles = []
if self.trust_id:
refs = [{'role_id': role['id']} for role in self.trust['roles']]
effective_trust_roles = PROVIDERS.assignment_api.add_implied_roles(
refs
)
effective_trust_role_ids = (
set([r['role_id'] for r in effective_trust_roles])
)
assignments = PROVIDERS.assignment_api.list_role_assignments(
user_id=self.trustor['id'], system=self.system,
project_id=self.project_id, effective=True,
strip_domain_roles=False
)
current_effective_trustor_roles = (
set([x['role_id'] for x in assignments])
)
# Go through each of the effective trust roles, making sure the
# trustor still has them, if any have been removed, then we
# will treat the trust as invalid
for trust_role_id in effective_trust_role_ids:
if trust_role_id in current_effective_trustor_roles:
role = PROVIDERS.role_api.get_role(trust_role_id)
if role['domain_id'] is None:
trust_roles.append(role)
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
def mint(self, token_id, issued_at):
"""Set the ``id`` and ``issued_at`` attributes of a token.
The process of building a token requires setting attributes about the
authentication and authorization context, like ``user_id`` and
``project_id`` for example. Once a Token object accurately represents
this information it should be "minted". Tokens are minted when they get
an ``id`` attribute and their creation time is recorded.
"""
self._validate_token_resources()
self._validate_token_user()
self._validate_system_scope()
self._validate_domain_scope()
self._validate_project_scope()
self._validate_trust_scope()
self.id = token_id
self.issued_at = issued_at

View File

@ -0,0 +1,65 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture as config_fixture
from keystone.cmd import bootstrap
from keystone.common import provider_api
import keystone.conf
from keystone.tests.unit import core
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestCaseWithBootstrap(core.BaseTestCase):
"""A simpler version of TestCase that uses bootstrap.
Re-implementation of TestCase that doesn't load a bunch of fixtures by
hand and instead uses the bootstrap process. This makes it so that our base
tests have the same things available to us as operators after they run
boostrap. It also makes our tests DRY and pushes setup required for
specific tests into the actual test class, instead of pushing it into a
generic structure that gets loaded for every test.
"""
def setUp(self):
self.useFixture(database.Database())
super(TestCaseWithBootstrap, self).setUp()
self.config_fixture = self.useFixture(config_fixture.Config(CONF))
self.useFixture(
ksfixtures.KeyRepository(
self.config_fixture,
'fernet_tokens',
CONF.fernet_tokens.max_active_keys
)
)
self.bootstrapper = bootstrap.Bootstrapper()
self.addCleanup(provider_api.ProviderAPIs._clear_registry_instances)
self.addCleanup(self.clean_default_domain)
self.bootstrapper.admin_password = 'password'
self.bootstrapper.admin_username = 'admin'
self.bootstrapper.project_name = 'admin'
self.bootstrapper.admin_role_name = 'admin'
self.bootstrapper.service_name = 'keystone'
self.bootstrapper.public_url = 'http://localhost/identity/'
self.bootstrapper.bootstrap()
def clean_default_domain(self):
PROVIDERS.resource_api.update_domain(
CONF.identity.default_domain_id, {'enabled': False}
)
PROVIDERS.resource_api.delete_domain(CONF.identity.default_domain_id)

View File

@ -11,16 +11,25 @@
# under the License.
import copy
import datetime
import uuid
from oslo_utils import timeutils
from six.moves import range
from keystone.common import provider_api
from keystone.common import utils as ks_utils
import keystone.conf
from keystone import exception
from keystone.federation import constants as federation_constants
from keystone.models import token_model
from keystone.tests.unit import base_classes
from keystone.tests.unit import core
from keystone.tests.unit import test_token_provider
from keystone.token.providers import common as provider_common
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestKeystoneTokenModel(core.TestCase):
@ -168,3 +177,411 @@ class TestKeystoneTokenModel(core.TestCase):
token_data['is_admin_project'] = False
self.assertFalse(token_data.is_admin_project)
class TokenModelTests(base_classes.TestCaseWithBootstrap):
def setUp(self):
super(TokenModelTests, self).setUp()
self.admin_user_id = self.bootstrapper.admin_user_id
self.admin_username = self.bootstrapper.admin_username
self.admin_password = self.bootstrapper.admin_password
self.project_id = self.bootstrapper.project_id
self.project_name = self.bootstrapper.project_name
self.admin_role_id = self.bootstrapper.admin_role_id
self.member_role_id = self.bootstrapper.member_role_id
self.reader_role_id = self.bootstrapper.reader_role_id
self.token_id = uuid.uuid4().hex
issued_at = datetime.datetime.utcnow()
self.issued_at = ks_utils.isotime(at=issued_at, subsecond=True)
def assertTokenContainsRole(self, token, role):
"""Ensure a role reference exists in a token's roles.
:param token: instance of ``keystone.models.token_model.TokenModel``
:param role: a dictionary reference of the expected role
"""
self.assertIn(role, token.roles)
def test_audit_id_attributes(self):
token = token_model.TokenModel()
audit_id = provider_common.random_urlsafe_str()
token.audit_id = audit_id
self.assertTrue(len(token.audit_ids) == 1)
parent_audit_id = provider_common.random_urlsafe_str()
token.parent_audit_id = parent_audit_id
self.assertTrue(len(token.audit_ids) == 2)
self.assertEqual(audit_id, token.audit_ids[0])
self.assertEqual(parent_audit_id, token.audit_ids[-1])
def test_token_model_user_attributes(self):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.user_domain_id = token.user['domain_id']
self.assertEqual(self.admin_user_id, token.user_id)
self.assertIsNotNone(token.user)
self.assertIsNotNone(token.user_domain)
self.assertEqual(self.admin_username, token.user['name'])
self.assertEqual(CONF.identity.default_domain_id, token.user_domain_id)
self.assertEqual(
CONF.identity.default_domain_id, token.user_domain['id']
)
def test_mint_unscoped_token(self):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.mint(self.token_id, self.issued_at)
self.assertTrue(token.unscoped)
self.assertTrue(len(token.roles) == 0)
def test_mint_system_scoped_token(self):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.system = {'all': True}
token.mint(self.token_id, self.issued_at)
self.assertTrue(token.system_scoped)
self.assertFalse(token.domain_scoped)
self.assertFalse(token.project_scoped)
self.assertFalse(token.trust_scoped)
self.assertFalse(token.unscoped)
self.assertIsNotNone(token.system)
self.assertTrue(len(token.roles) == 1)
admin_role = {'id': self.admin_role_id, 'name': 'admin'}
self.assertTokenContainsRole(token, admin_role)
def test_mint_system_scoped_token_with_multiple_roles(self):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.system = {'all': True}
self.assertTrue(token.system_scoped)
self.assertFalse(token.domain_scoped)
self.assertFalse(token.project_scoped)
self.assertFalse(token.trust_scoped)
self.assertFalse(token.unscoped)
role = core.new_role_ref()
PROVIDERS.role_api.create_role(role['id'], role)
role.pop('domain_id')
PROVIDERS.assignment_api.create_system_grant_for_user(
self.admin_user_id, role['id']
)
self.assertIsNotNone(token.system)
self.assertTrue(len(token.roles) == 2)
admin_role = {'id': self.admin_role_id, 'name': 'admin'}
self.assertTokenContainsRole(token, admin_role)
self.assertTokenContainsRole(token, role)
def test_mint_system_scoped_token_without_roles_fails(self):
user = core.new_user_ref(CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
token = token_model.TokenModel()
token.user_id = user['id']
token.system = 'all'
token.audit_id = provider_common.random_urlsafe_str()
self.assertRaises(
exception.Unauthorized, token.mint, self.token_id, self.issued_at
)
def test_mint_system_token_with_effective_role_assignment(self):
user = core.new_user_ref(CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
group = core.new_group_ref(CONF.identity.default_domain_id)
group = PROVIDERS.identity_api.create_group(group)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
PROVIDERS.assignment_api.create_system_grant_for_group(
group['id'], self.admin_role_id
)
token = token_model.TokenModel()
token.user_id = user['id']
token.system = 'all'
token.mint(self.token_id, self.issued_at)
exp_role = {'id': self.admin_role_id, 'name': 'admin'}
self.assertTokenContainsRole(token, exp_role)
def test_mint_domain_scoped_token(self):
PROVIDERS.assignment_api.create_grant(
self.admin_role_id, user_id=self.admin_user_id,
domain_id=CONF.identity.default_domain_id
)
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.domain_id = CONF.identity.default_domain_id
token.mint(self.token_id, self.issued_at)
self.assertTrue(token.domain_scoped)
self.assertFalse(token.system_scoped)
self.assertFalse(token.project_scoped)
self.assertFalse(token.trust_scoped)
self.assertFalse(token.unscoped)
self.assertIsNotNone(token.domain)
exp_domain = PROVIDERS.resource_api.get_domain(
CONF.identity.default_domain_id
)
self.assertEqual(exp_domain['id'], token.domain_id)
self.assertEqual(exp_domain['name'], token.domain['name'])
self.assertTrue(len(token.roles) == 3)
exp_roles = [
{'id': self.admin_role_id, 'name': 'admin'},
{'id': self.member_role_id, 'name': 'member'},
{'id': self.reader_role_id, 'name': 'reader'}
]
for role in exp_roles:
self.assertTokenContainsRole(token, role)
def test_mint_domain_scoped_token_fails_without_roles(self):
user = core.new_user_ref(CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
token = token_model.TokenModel()
token.user_id = user['id']
token.domain_id = CONF.identity.default_domain_id
token.audit_id = provider_common.random_urlsafe_str()
self.assertRaises(
exception.Unauthorized, token.mint, self.token_id, self.issued_at
)
def test_mint_project_scoped_token(self):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.project_id = self.project_id
token.mint(self.token_id, self.issued_at)
self.assertTrue(token.project_scoped)
self.assertFalse(token.system_scoped)
self.assertFalse(token.domain_scoped)
self.assertFalse(token.trust_scoped)
self.assertFalse(token.unscoped)
self.assertIsNotNone(token.project)
self.assertEqual(self.project_name, token.project['name'])
self.assertTrue(len(token.roles) == 3)
exp_roles = [
{'id': self.admin_role_id, 'name': 'admin'},
{'id': self.member_role_id, 'name': 'member'},
{'id': self.reader_role_id, 'name': 'reader'}
]
for role in exp_roles:
self.assertTokenContainsRole(token, role)
def test_mint_project_scoped_token_fails_without_roles(self):
user = core.new_user_ref(CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
token = token_model.TokenModel()
token.user_id = user['id']
token.project_id = self.project_id
token.audit_id = provider_common.random_urlsafe_str()
self.assertRaises(
exception.Unauthorized, token.mint, self.token_id, self.issued_at
)
def test_mint_project_scoped_token_fails_when_project_is_disabled(self):
PROVIDERS.resource_api.update_project(
self.project_id, {'enabled': False}
)
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.project_id = self.project_id
token.audit_id = provider_common.random_urlsafe_str()
self.assertRaises(
exception.ProjectNotFound, token.mint, self.token_id,
self.issued_at
)
def test_mint_project_scoped_token_fails_when_domain_is_disabled(self):
project = PROVIDERS.resource_api.get_project(self.project_id)
PROVIDERS.resource_api.update_domain(
project['domain_id'], {'enabled': False}
)
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.project_id = self.project_id
token.audit_id = provider_common.random_urlsafe_str()
self.assertRaises(
exception.DomainNotFound, token.mint, self.token_id, self.issued_at
)
def test_mint_application_credential_token(self):
app_cred = {
'id': uuid.uuid4().hex,
'name': 'monitoring-application',
'user_id': self.admin_user_id,
'roles': [{'id': self.admin_role_id}],
'project_id': self.project_id,
'secret': uuid.uuid4().hex
}
PROVIDERS.application_credential_api.create_application_credential(
app_cred
)
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.application_credential_id = app_cred['id']
token.project_id = self.project_id
token.mint(self.token_id, self.issued_at)
self.assertIsNotNone(token.application_credential_id)
self.assertIsNotNone(token.application_credential)
exp_role = {'id': self.admin_role_id, 'name': 'admin'}
self.assertTokenContainsRole(token, exp_role)
class TrustScopedTokenModelTests(TokenModelTests):
def setUp(self):
super(TrustScopedTokenModelTests, self).setUp()
trustor_domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, core.new_domain_ref()
)
trustee_domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, core.new_domain_ref()
)
self.trustor = PROVIDERS.identity_api.create_user(
core.new_user_ref(trustor_domain['id'])
)
self.trustee = PROVIDERS.identity_api.create_user(
core.new_user_ref(trustee_domain['id'])
)
PROVIDERS.assignment_api.create_grant(
self.admin_role_id, user_id=self.trustor['id'],
project_id=self.project_id
)
def test_mint_trust_scoped_token(self):
roles = [{'id': self.admin_role_id}]
trust = core.new_trust_ref(
self.trustor['id'], self.trustee['id'], project_id=self.project_id
)
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
token = token_model.TokenModel()
token.trust_id = trust['id']
token.user_id = self.trustee['id']
token.mint(self.token_id, self.issued_at)
self.assertEqual(self.trustee['id'], token.user_id)
self.assertEqual(self.trustee['id'], token.trustee['id'])
self.assertEqual(self.trustor['id'], token.trustor['id'])
self.assertEqual(self.project_id, token.trust_project['id'])
self.assertEqual(
CONF.identity.default_domain_id, token.trust_project_domain['id']
)
# NOTE(lbragstad): The domain key here should be removed once
# https://bugs.launchpad.net/keystone/+bug/1763510 is fixed.
exp_role = {
'id': self.admin_role_id, 'name': 'admin', 'domain_id': None
}
self.assertTokenContainsRole(token, exp_role)
def test_mint_trust_scoped_token_fails_when_trustee_domain_disabled(self):
roles = [{'id': self.admin_role_id}]
trust = core.new_trust_ref(
self.trustor['id'], self.trustee['id'], project_id=self.project_id
)
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
PROVIDERS.resource_api.update_domain(
self.trustee['domain_id'], {'enabled': False}
)
token = token_model.TokenModel()
token.trust_id = trust['id']
token.user_id = self.trustee['id']
self.assertRaises(
exception.TokenNotFound, token.mint, self.token_id, self.issued_at
)
def test_mint_trust_scoped_token_fails_when_trustor_domain_disabled(self):
roles = [{'id': self.admin_role_id}]
trust = core.new_trust_ref(
self.trustor['id'], self.trustee['id'], project_id=self.project_id
)
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
PROVIDERS.resource_api.update_domain(
self.trustor['domain_id'], {'enabled': False}
)
token = token_model.TokenModel()
token.trust_id = trust['id']
token.user_id = self.trustee['id']
self.assertRaises(
exception.TokenNotFound, token.mint, self.token_id, self.issued_at
)
def test_mint_trust_scoped_token_fails_when_trustor_is_disabled(self):
roles = [{'id': self.admin_role_id}]
trust = core.new_trust_ref(
self.trustor['id'], self.trustee['id'], project_id=self.project_id
)
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
PROVIDERS.identity_api.update_user(
self.trustor['id'], {'enabled': False}
)
token = token_model.TokenModel()
token.trust_id = trust['id']
token.user_id = self.trustee['id']
self.assertRaises(
exception.Forbidden, token.mint, self.token_id, self.issued_at
)
def test_mint_trust_scoped_token_with_mismatching_users_fails(self):
user = core.new_user_ref(CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
roles = [{'id': self.admin_role_id}]
trust = core.new_trust_ref(
self.trustor['id'], self.trustee['id'], project_id=self.project_id
)
trust = PROVIDERS.trust_api.create_trust(trust['id'], trust, roles)
token = token_model.TokenModel()
token.trust_id = trust['id']
token.user_id = user['id']
self.assertRaises(
exception.Forbidden, token.mint, self.token_id, self.issued_at
)