Add application credential auth plugin

Add an auth plugin for application credentials and update the common
auth utilities to understand an auth method of 'application_credential'
and validate and scope accordingly.

By default, application credentials should not be allowed to be used for
creating other application credentials or trusts. If a user creates an
application credential with flag `allow_application_credential_creation`
then that application should be allowed to be used for creating and
deleting other application credentials and trusts. Ensure a flag is set
in the token if this property is set to allow this behavior.

bp application-credentials

Change-Id: I15a03e79128a11314d06751b94343f22d533243a
This commit is contained in:
Colleen Murphy 2017-12-05 00:36:50 +01:00
parent 166eced28b
commit 29280b1f68
14 changed files with 568 additions and 28 deletions

View File

@ -83,6 +83,15 @@ class ApplicationCredentialV3(controller.V3Controller):
ref = cls.filter_params(ref)
return {cls.member_name: ref}
def _check_unrestricted(self, token):
auth_methods = token['methods']
if 'application_credential' in auth_methods:
if token.token_data['token']['application_credential_restricted']:
action = _("Using method 'application_credential' is not "
"allowed for managing additional application "
"credentials.")
raise exception.ForbiddenAction(action=action)
@controller.protected()
def create_application_credential(self, request, user_id,
application_credential):
@ -90,7 +99,7 @@ class ApplicationCredentialV3(controller.V3Controller):
application_credential)
token = request.auth_context['token']
self._check_unrestricted(token)
if request.context.user_id != user_id:
action = _("Cannot create an application credential for another "
"user")
@ -138,6 +147,8 @@ class ApplicationCredentialV3(controller.V3Controller):
@controller.protected()
def delete_application_credential(self, request, user_id,
application_credential_id):
token = request.auth_context['token']
self._check_unrestricted(token)
PROVIDERS.application_credential_api.delete_application_credential(
application_credential_id, initiator=request.audit_initiator
)

View File

@ -129,6 +129,11 @@ class Auth(controller.V3Controller):
method_names_set = set(auth_context.get('method_names', []))
method_names = list(method_names_set)
app_cred_id = None
if 'application_credential' in method_names:
token_auth = auth_info.auth['identity']
app_cred_id = token_auth['application_credential']['id']
# Do MFA Rule Validation for the user
if not self._mfa_rules_validator.check_auth_methods_against_rules(
auth_context['user_id'], method_names_set):
@ -145,7 +150,7 @@ class Auth(controller.V3Controller):
system=system, project_id=project_id,
is_domain=is_domain, domain_id=domain_id,
auth_context=auth_context, trust=trust,
include_catalog=include_catalog,
app_cred_id=app_cred_id, include_catalog=include_catalog,
parent_audit_id=token_audit_id)
# NOTE(wanghong): We consume a trust use only when we are using

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
import sys
from oslo_log import log
@ -18,6 +19,7 @@ from oslo_utils import importutils
import six
import stevedore
from keystone.common import driver_hints
from keystone.common import provider_api
from keystone.common import utils
import keystone.conf
@ -27,8 +29,8 @@ from keystone.identity.backends import resource_options as ro
LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
# registry of authentication methods
AUTH_METHODS = {}
@ -229,8 +231,57 @@ class AuthInfo(provider_api.ProviderAPIMixin, object):
trust = self.trust_api.get_trust(trust_id)
return trust
def _lookup_app_cred(self, app_cred_info):
app_cred_id = app_cred_info.get('id')
if app_cred_id:
get_app_cred = partial(
PROVIDERS.application_credential_api.get_application_credential
)
return get_app_cred(app_cred_id)
name = app_cred_info.get('name')
if not name:
raise exception.ValidationError(attribute='name or ID',
target='application credential')
user = app_cred_info.get('user')
if not user:
raise exception.ValidationError(attribute='user',
target='application credential')
user_id = user.get('id')
if not user_id:
if 'domain' not in user:
raise exception.ValidationError(attribute='domain',
target='user')
domain_ref = self._lookup_domain(user['domain'])
user_id = PROVIDERS.identity_api.get_user_by_name(
user['name'], domain_ref['id'])['id']
hints = driver_hints.Hints()
hints.add_filter('name', name)
app_cred_api = PROVIDERS.application_credential_api
app_creds = app_cred_api.list_application_credentials(
user_id, hints)
if len(app_creds) != 1:
message = "Could not find application credential: %s" % name
LOG.warning(six.text_type(message))
raise exception.Unauthorized(message)
return app_creds[0]
def _set_scope_from_app_cred(self, app_cred_info):
app_cred_ref = self._lookup_app_cred(app_cred_info)
self._scope_data = (None, app_cred_ref['project_id'], None, None, None)
return
def _validate_and_normalize_scope_data(self):
"""Validate and normalize scope data."""
if 'identity' in self.auth:
if 'application_credential' in self.auth['identity']['methods']:
# Application credentials can't choose their own scope
if 'scope' in self.auth:
detail = "Application credentials cannot request a scope."
raise exception.ApplicationCredentialAuthError(
detail=detail)
self._set_scope_from_app_cred(
self.auth['identity']['application_credential'])
return
if 'scope' not in self.auth:
return
if sum(['project' in self.auth['scope'],

View File

@ -0,0 +1,42 @@
# Copyright 2018 SUSE Linux GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.auth import plugins as auth_plugins
from keystone.auth.plugins import base
from keystone.common import provider_api
from keystone import exception
PROVIDERS = provider_api.ProviderAPIs
METHOD_NAME = 'application_credential'
class ApplicationCredential(base.AuthMethodHandler):
def authenticate(self, request, auth_payload):
"""Authenticate an application."""
response_data = {}
app_cred_info = auth_plugins.AppCredInfo.create(auth_payload,
METHOD_NAME)
try:
PROVIDERS.application_credential_api.authenticate(
request,
application_credential_id=app_cred_info.id,
secret=app_cred_info.secret)
except AssertionError as e:
raise exception.Unauthorized(e)
response_data['user_id'] = app_cred_info.user_id
return base.AuthHandlerResponse(status=True, response_body=None,
response_data=response_data)

View File

@ -17,6 +17,7 @@ import sys
from oslo_log import log
import six
from keystone.common import driver_hints
from keystone.common import provider_api
import keystone.conf
from keystone import exception
@ -24,6 +25,7 @@ from keystone import exception
CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
PROVIDERS = provider_api.ProviderAPIs
def construct_method_map_from_config():
@ -203,3 +205,35 @@ class TOTPUserInfo(BaseUserInfo):
auth_payload)
user_info = auth_payload['user']
self.passcode = user_info.get('passcode')
class AppCredInfo(BaseUserInfo):
def __init__(self):
super(AppCredInfo, self).__init__()
self.id = None
self.secret = None
def _validate_and_normalize_auth_data(self, auth_payload):
app_cred_api = PROVIDERS.application_credential_api
if auth_payload.get('id'):
app_cred = app_cred_api.get_application_credential(
auth_payload['id'])
self.user_id = app_cred['user_id']
if not auth_payload.get('user'):
auth_payload['user'] = {}
auth_payload['user']['id'] = self.user_id
super(AppCredInfo, self)._validate_and_normalize_auth_data(
auth_payload)
elif auth_payload.get('name'):
super(AppCredInfo, self)._validate_and_normalize_auth_data(
auth_payload)
hints = driver_hints.Hints()
hints.add_filter('name', auth_payload['name'])
app_cred = app_cred_api.list_application_credentials(
self.user_id, hints)[0]
auth_payload['id'] = app_cred['id']
else:
raise exception.ValidationError(attribute='id or name',
target='application credential')
self.id = auth_payload['id']
self.secret = auth_payload.get('secret')

View File

@ -302,6 +302,11 @@ class AuthMethodNotSupported(AuthPluginException):
self.authentication = {'methods': CONF.auth.methods}
class ApplicationCredentialAuthError(AuthPluginException):
message_format = _(
"Error authenticating with application credential: %(detail)s")
class AdditionalAuthRequired(AuthPluginException):
message_format = _("Additional authentications steps required.")

View File

@ -53,6 +53,21 @@ class AuthTestMixin(object):
return scope_data
def _build_user(self, user_id=None, username=None, user_domain_id=None,
user_domain_name=None):
user = {}
if user_id:
user['id'] = user_id
else:
user['name'] = username
if user_domain_id or user_domain_name:
user['domain'] = {}
if user_domain_id:
user['domain']['id'] = user_domain_id
else:
user['domain']['name'] = user_domain_name
return user
def _build_auth(self, user_id=None, username=None, user_domain_id=None,
user_domain_name=None, **kwargs):
@ -68,27 +83,36 @@ class AuthTestMixin(object):
message="_build_auth only supports 'passcode' "
"and 'password' secret types")
data = {'user': {}}
if user_id:
data['user']['id'] = user_id
else:
data['user']['name'] = username
if user_domain_id or user_domain_name:
data['user']['domain'] = {}
if user_domain_id:
data['user']['domain']['id'] = user_domain_id
else:
data['user']['domain']['name'] = user_domain_name
data = {}
data['user'] = self._build_user(user_id=user_id, username=username,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name)
data['user'][secret_type] = secret_value
return data
def _build_token_auth(self, token):
return {'id': token}
def _build_app_cred_auth(self, secret, app_cred_id=None,
app_cred_name=None, user_id=None, username=None,
user_domain_id=None, user_domain_name=None):
data = {'secret': secret}
if app_cred_id:
data['id'] = app_cred_id
else:
data['name'] = app_cred_name
data['user'] = self._build_user(user_id=user_id,
username=username,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name)
return data
def build_authentication_request(self, token=None, user_id=None,
username=None, user_domain_id=None,
user_domain_name=None, password=None,
kerberos=False, passcode=None, **kwargs):
kerberos=False, passcode=None,
app_cred_id=None, app_cred_name=None,
secret=None, **kwargs):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
@ -112,6 +136,14 @@ class AuthTestMixin(object):
auth_data['identity']['totp'] = self._build_auth(
user_id, username, user_domain_id, user_domain_name,
passcode=passcode)
if (app_cred_id or app_cred_name) and secret:
auth_data['identity']['methods'].append('application_credential')
identity = auth_data['identity']
identity['application_credential'] = self._build_app_cred_auth(
secret, app_cred_id=app_cred_id, app_cred_name=app_cred_name,
user_id=user_id, username=username,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name)
if kwargs:
auth_data['scope'] = self._build_auth_scope(**kwargs)
return {'auth': auth_data}

View File

@ -30,6 +30,11 @@ MEMBER_PATH_FMT = '/users/%(user_id)s/application_credentials/%(app_cred_id)s'
class ApplicationCredentialTestCase(test_v3.RestfulTestCase):
"""Test CRUD operations for application credentials."""
def config_overrides(self):
super(ApplicationCredentialTestCase, self).config_overrides()
self.config_fixture.config(group='auth',
methods='password,application_credential')
def _app_cred_body(self, roles=None, name=None, expires=None, secret=None):
name = name or uuid.uuid4().hex
description = 'Credential for backups'
@ -115,13 +120,44 @@ class ApplicationCredentialTestCase(test_v3.RestfulTestCase):
body=app_cred_body,
expected_status=http_client.BAD_REQUEST)
def test_create_application_credential_with_application_credential(self):
roles = [{'id': self.role_id}]
app_cred_body_1 = self._app_cred_body(roles=roles)
app_cred_1 = self.post(
'/users/%s/application_credentials' % self.user_id,
body=app_cred_body_1,
expected_status=http_client.CREATED)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_1.json['application_credential']['id'],
secret=app_cred_1.json['application_credential']['secret'])
token_data = self.v3_create_token(auth_data,
expected_status=http_client.CREATED)
app_cred_body_2 = self._app_cred_body(roles=roles)
self.post(
path='/users/%s/application_credentials' % self.user_id,
body=app_cred_body_2,
token=token_data.headers['x-subject-token'],
expected_status=http_client.FORBIDDEN)
def test_create_application_credential_allow_recursion(self):
roles = [{'id': self.role_id}]
app_cred_body = self._app_cred_body(roles=roles)
app_cred_body['application_credential']['unrestricted'] = True
self.post('/users/%s/application_credentials' % self.user_id,
body=app_cred_body,
expected_status=http_client.CREATED)
app_cred_body_1 = self._app_cred_body(roles=roles)
app_cred_body_1['application_credential']['unrestricted'] = True
app_cred_1 = self.post(
'/users/%s/application_credentials' % self.user_id,
body=app_cred_body_1,
expected_status=http_client.CREATED)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_1.json['application_credential']['id'],
secret=app_cred_1.json['application_credential']['secret'])
token_data = self.v3_create_token(auth_data,
expected_status=http_client.CREATED)
app_cred_body_2 = self._app_cred_body(roles=roles)
self.post(
path='/users/%s/application_credentials' % self.user_id,
body=app_cred_body_2,
token=token_data.headers['x-subject-token'],
expected_status=http_client.CREATED)
def test_list_application_credentials(self):
resp = self.get('/users/%s/application_credentials' % self.user_id,
@ -216,6 +252,45 @@ class ApplicationCredentialTestCase(test_v3.RestfulTestCase):
'app_cred_id': uuid.uuid4().hex},
expected_status=http_client.NOT_FOUND)
def test_delete_application_credential_with_application_credential(self):
roles = [{'id': self.role_id}]
app_cred_body = self._app_cred_body(roles=roles)
app_cred = self.post(
'/users/%s/application_credentials' % self.user_id,
body=app_cred_body,
expected_status=http_client.CREATED)
auth_data = self.build_authentication_request(
app_cred_id=app_cred.json['application_credential']['id'],
secret=app_cred.json['application_credential']['secret'])
token_data = self.v3_create_token(auth_data,
expected_status=http_client.CREATED)
self.delete(
path=MEMBER_PATH_FMT % {
'user_id': self.user_id,
'app_cred_id': app_cred.json['application_credential']['id']},
token=token_data.headers['x-subject-token'],
expected_status=http_client.FORBIDDEN)
def test_delete_application_credential_allow_recursion(self):
roles = [{'id': self.role_id}]
app_cred_body = self._app_cred_body(roles=roles)
app_cred_body['application_credential']['unrestricted'] = True
app_cred = self.post(
'/users/%s/application_credentials' % self.user_id,
body=app_cred_body,
expected_status=http_client.CREATED)
auth_data = self.build_authentication_request(
app_cred_id=app_cred.json['application_credential']['id'],
secret=app_cred.json['application_credential']['secret'])
token_data = self.v3_create_token(auth_data,
expected_status=http_client.CREATED)
self.delete(
path=MEMBER_PATH_FMT % {
'user_id': self.user_id,
'app_cred_id': app_cred.json['application_credential']['id']},
token=token_data.headers['x-subject-token'],
expected_status=http_client.NO_CONTENT)
def test_update_application_credential(self):
roles = [{'id': self.role_id}]
app_cred_body = self._app_cred_body(roles=roles)

View File

@ -34,6 +34,7 @@ from testtools import testcase
from keystone import auth
from keystone.auth.plugins import totp
from keystone.common import policy
from keystone.common import provider_api
from keystone.common import utils
import keystone.conf
from keystone.credential.providers import fernet as credential_fernet
@ -46,6 +47,7 @@ from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestMFARules(test_v3.RestfulTestCase):
@ -5317,3 +5319,160 @@ class UUIDFetchRevocationList(TestFetchRevocationList,
# NOTE(lbragstad): The Fernet token provider doesn't use Revocation lists so
# don't inherit TestFetchRevocationList here to test it.
class ApplicationCredentialAuth(test_v3.RestfulTestCase):
def setUp(self):
super(ApplicationCredentialAuth, self).setUp()
self.app_cred_api = PROVIDERS.application_credential_api
def config_overrides(self):
super(ApplicationCredentialAuth, self).config_overrides()
self.auth_plugin_config_override(
methods=['application_credential', 'password', 'token'])
def _make_app_cred(self, expires=None):
roles = [{'id': self.role_id}]
data = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'secret': uuid.uuid4().hex,
'user_id': self.user['id'],
'project_id': self.project['id'],
'description': uuid.uuid4().hex,
'roles': roles
}
if expires:
data['expires_at'] = expires
return data
def test_valid_application_credential_succeeds(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED)
def test_valid_application_credential_with_name_succeeds(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
auth_data = self.build_authentication_request(
app_cred_name=app_cred_ref['name'], secret=app_cred_ref['secret'],
user_id=self.user['id'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED)
def test_valid_application_credential_name_and_username_succeeds(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
auth_data = self.build_authentication_request(
app_cred_name=app_cred_ref['name'], secret=app_cred_ref['secret'],
username=self.user['name'], user_domain_id=self.user['domain_id'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED)
def test_application_credential_with_invalid_secret_fails(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret='badsecret')
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_unexpired_application_credential_succeeds(self):
expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
app_cred = self._make_app_cred(expires=expires_at)
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED)
def test_expired_application_credential_fails(self):
expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
app_cred = self._make_app_cred(expires=expires_at)
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
future = datetime.datetime.utcnow() + datetime.timedelta(minutes=2)
with freezegun.freeze_time(future):
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_application_credential_fails_when_user_deleted(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
PROVIDERS.identity_api.delete_user(self.user['id'])
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND)
def test_application_credential_fails_when_user_disabled(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
PROVIDERS.identity_api.update_user(self.user['id'],
{'enabled': False})
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_application_credential_fails_when_project_deleted(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
PROVIDERS.resource_api.delete_project(self.project['id'])
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND)
def test_application_credential_fails_when_role_deleted(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
PROVIDERS.role_api.delete_role(self.role_id)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND)
def test_application_credential_fails_when_role_unassigned(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
PROVIDERS.assignment_api.remove_role_from_user_and_project(
self.user['id'], self.project['id'],
self.role_id)
auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND)
def test_application_credential_cannot_scope(self):
app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential(
app_cred)
new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
# Create a new project and assign the user a valid role on it
new_project = PROVIDERS.resource_api.create_project(
new_project_ref['id'], new_project_ref)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user['id'], new_project['id'], self.role_id)
# Check that a password auth would work
password_auth = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=new_project['id'])
password_response = self.v3_create_token(password_auth)
self.assertValidProjectScopedTokenResponse(password_response)
# Should not be able to use that scope with an application credential
# even though the user has a valid assignment on it
app_cred_auth = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'],
project_id=new_project['id'])
self.v3_create_token(app_cred_auth,
expected_status=http_client.UNAUTHORIZED)

View File

@ -15,12 +15,14 @@ import uuid
from six.moves import http_client
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestTrustOperations(test_v3.RestfulTestCase):
@ -506,3 +508,73 @@ class TestTrustOperations(test_v3.RestfulTestCase):
self.assertRaises(exception.TrustNotFound,
self.trust_api.get_trust,
trust['id'])
class TrustsWithApplicationCredentials(test_v3.RestfulTestCase):
def setUp(self):
super(TrustsWithApplicationCredentials, self).setUp()
self.trustee_user = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domain_id)
self.trustee_user_id = self.trustee_user['id']
def config_overrides(self):
super(TrustsWithApplicationCredentials, self).config_overrides()
self.config_fixture.config(group='auth',
methods='password,application_credential')
def test_create_trust_with_application_credential(self):
app_cred = {
'id': uuid.uuid4().hex,
'user_id': self.user_id,
'project_id': self.project_id,
'name': uuid.uuid4().hex,
'roles': [{'id': self.role_id}],
'secret': uuid.uuid4().hex
}
app_cred_api = PROVIDERS.application_credential_api
app_cred_api.create_application_credential(app_cred)
auth_data = self.build_authentication_request(
app_cred_id=app_cred['id'], secret=app_cred['secret'])
token_data = self.v3_create_token(auth_data,
expected_status=http_client.CREATED)
trust_body = unit.new_trust_ref(trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
role_ids=[self.role_id])
self.post(
path='/OS-TRUST/trusts',
body={'trust': trust_body},
token=token_data.headers['x-subject-token'],
expected_status=http_client.FORBIDDEN)
def test_delete_trust_with_application_credential(self):
ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r, ref)
app_cred = {
'id': uuid.uuid4().hex,
'user_id': self.user_id,
'project_id': self.project_id,
'name': uuid.uuid4().hex,
'roles': [{'id': self.role_id}],
'secret': uuid.uuid4().hex
}
app_cred_api = PROVIDERS.application_credential_api
app_cred_api.create_application_credential(app_cred)
auth_data = self.build_authentication_request(
app_cred_id=app_cred['id'], secret=app_cred['secret'])
token_data = self.v3_create_token(auth_data,
expected_status=http_client.CREATED)
# delete the trust
self.delete(path='/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': trust['id']},
token=token_data.headers['x-subject-token'],
expected_status=http_client.FORBIDDEN)

View File

@ -198,12 +198,14 @@ class Manager(manager.Manager):
def issue_token(self, user_id, method_names, expires_at=None,
system=None, project_id=None, is_domain=False,
domain_id=None, auth_context=None, trust=None,
include_catalog=True, parent_audit_id=None):
app_cred_id=None, include_catalog=True,
parent_audit_id=None):
token_id, token_data = self.driver.issue_token(
user_id, method_names, expires_at=expires_at,
system=system, project_id=project_id,
domain_id=domain_id, auth_context=auth_context, trust=trust,
include_catalog=include_catalog, parent_audit_id=parent_audit_id)
app_cred_id=app_cred_id, include_catalog=include_catalog,
parent_audit_id=parent_audit_id)
if self._needs_persistence:
data = dict(key=token_id,

View File

@ -192,6 +192,20 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
user_id, project_id)
return [PROVIDERS.role_api.get_role(role_id) for role_id in roles]
def _get_app_cred_roles(self, app_cred, user_id, domain_id, project_id):
roles = app_cred['roles']
token_roles = []
for role in roles:
try:
role_ref = PROVIDERS.assignment_api.get_grant(
role['id'], user_id=user_id, domain_id=domain_id,
project_id=project_id)
token_roles.append(role_ref)
except exception.RoleAssignmentNotFound:
pass
return [
PROVIDERS.role_api.get_role(role['id']) for role in token_roles]
def populate_roles_for_federated_user(self, token_data, group_ids,
project_id=None, domain_id=None,
user_id=None, system=None):
@ -309,7 +323,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
'consumer_id': consumer_id})
def _populate_roles(self, token_data, user_id, system, domain_id,
project_id, trust, access_token):
project_id, trust, app_cred_id, access_token):
if 'roles' in token_data:
# no need to repopulate roles
return
@ -381,6 +395,16 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
elif app_cred_id:
app_cred_api = PROVIDERS.application_credential_api
app_cred_ref = app_cred_api.get_application_credential(
app_cred_id)
for role in self._get_app_cred_roles(app_cred_ref,
token_user_id,
token_domain_id,
token_project_id):
filtered_roles.append({'id': role['id'],
'name': role['name']})
else:
for role in self._get_roles_for_user(token_user_id,
system,
@ -458,10 +482,18 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
LOG.error(msg)
raise exception.UnexpectedError(msg)
def _populate_app_cred_restrictions(self, token_data, app_cred_id):
if app_cred_id:
app_cred_api = PROVIDERS.application_credential_api
app_cred = app_cred_api.get_application_credential(app_cred_id)
restricted = not app_cred['unrestricted']
token_data['application_credential_restricted'] = restricted
def get_token_data(self, user_id, method_names, system=None,
domain_id=None, project_id=None, expires=None,
trust=None, token=None, include_catalog=True, bind=None,
access_token=None, issued_at=None, audit_info=None):
app_cred_id=None, trust=None, token=None,
include_catalog=True, bind=None, access_token=None,
issued_at=None, audit_info=None):
token_data = {'methods': method_names}
# We've probably already written these to the token
@ -478,7 +510,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
self._populate_is_admin_project(token_data)
self._populate_user(token_data, user_id, trust)
self._populate_roles(token_data, user_id, system, domain_id,
project_id, trust, access_token)
project_id, trust, app_cred_id, access_token)
self._populate_audit_info(token_data, audit_info)
if include_catalog:
@ -489,6 +521,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
self._populate_token_dates(token_data, expires=expires,
issued_at=issued_at)
self._populate_oauth_section(token_data, access_token)
self._populate_app_cred_restrictions(token_data, app_cred_id)
return {'token': token_data}
@ -518,8 +551,8 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
def issue_token(self, user_id, method_names, expires_at=None,
system=None, project_id=None, domain_id=None,
auth_context=None, trust=None, include_catalog=True,
parent_audit_id=None):
auth_context=None, trust=None, app_cred_id=None,
include_catalog=True, parent_audit_id=None):
if auth_context and auth_context.get('bind'):
# NOTE(lbragstad): Check if the token provider being used actually
# supports bind authentication methods before proceeding.
@ -552,6 +585,7 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
project_id=project_id,
expires=expires_at,
trust=trust,
app_cred_id=app_cred_id,
bind=auth_context.get('bind') if auth_context else None,
token=token_ref,
include_catalog=include_catalog,

View File

@ -100,6 +100,14 @@ class TrustV3(controller.V3Controller):
redelegated_trust = None
return redelegated_trust
def _check_unrestricted(self, token):
auth_methods = token['methods']
if 'application_credential' in auth_methods:
if token.token_data['token']['application_credential_restricted']:
action = _("Using method 'application_credential' is not "
"allowed for managing trusts.")
raise exception.ForbiddenAction(action=action)
@controller.protected()
def create_trust(self, request, trust):
"""Create a new trust.
@ -108,6 +116,10 @@ class TrustV3(controller.V3Controller):
"""
validation.lazy_validate(schema.trust_create, trust)
token = request.auth_context['token']
self._check_unrestricted(token)
redelegated_trust = self._find_redelegated_trust(request)
if trust.get('project_id') and not trust.get('roles'):
@ -211,6 +223,9 @@ class TrustV3(controller.V3Controller):
@controller.protected()
def delete_trust(self, request, trust_id):
token = request.auth_context['token']
self._check_unrestricted(token)
trust = PROVIDERS.trust_api.get_trust(trust_id)
if (request.context.user_id != trust.get('trustor_user_id') and

View File

@ -84,6 +84,9 @@ wsgi_scripts =
keystone.assignment =
sql = keystone.assignment.backends.sql:Assignment
keystone.auth.application_credential =
default = keystone.auth.plugins.application_credential:ApplicationCredential
keystone.auth.external =
default = keystone.auth.plugins.external:DefaultDomain
DefaultDomain = keystone.auth.plugins.external:DefaultDomain