Merge "Remove the rest of v2.0 legacy"

This commit is contained in:
Zuul 2018-06-09 00:59:11 +00:00 committed by Gerrit Code Review
commit ee4fbf619b
13 changed files with 9 additions and 977 deletions

View File

@ -33,30 +33,6 @@ LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
def v2_ec2_deprecated(f):
@six.wraps(f)
def wrapper(*args, **kwargs):
deprecated = versionutils.deprecated(
what=f.__name__ + ' of the v2 EC2 APIs',
as_of=versionutils.deprecated.MITAKA,
in_favor_of=('a similar function in the v3 Credential APIs'),
remove_in=+7)
return deprecated(f)
return wrapper()
def v2_auth_deprecated(f):
@six.wraps(f)
def wrapper(*args, **kwargs):
deprecated = versionutils.deprecated(
what=f.__name__ + ' of the v2 Authentication APIs',
as_of=versionutils.deprecated.MITAKA,
in_favor_of=('a similar function in the v3 Authentication APIs'),
remove_in=+7)
return deprecated(f)
return wrapper()
def protected(callback=None):
"""Wrap API calls with role based access controls (RBAC).
@ -146,77 +122,6 @@ def protected_wrapper(self, f, check_function, request, filter_attr,
check_function(self, request, prep_info, *args, **kwargs)
class V2Controller(provider_api.ProviderAPIMixin, wsgi.Application):
"""Base controller class for Identity API v2."""
@staticmethod
def v3_to_v2_user(ref):
"""Convert a user_ref from v3 to v2 compatible.
* v2.0 users are not domain aware, and should have domain_id removed
* v2.0 users expect the use of tenantId instead of default_project_id
* v2.0 users have a username attribute
* v2.0 remove password_expires_at
If ref is a list type, we will iterate through each element and do the
conversion.
"""
def _format_default_project_id(ref):
"""Convert default_project_id to tenantId for v2 calls."""
default_project_id = ref.pop('default_project_id', None)
if default_project_id is not None:
ref['tenantId'] = default_project_id
elif 'tenantId' in ref:
# NOTE(morganfainberg): To avoid v2.0 confusion if somehow a
# tenantId property sneaks its way into the extra blob on the
# user, we remove it here. If default_project_id is set, we
# would override it in either case.
del ref['tenantId']
def _normalize_and_filter_user_properties(ref):
_format_default_project_id(ref)
ref.pop('password_expires_at', None)
ref.pop('domain', None)
ref.pop('domain_id', None)
if 'username' not in ref and 'name' in ref:
ref['username'] = ref['name']
return ref
if isinstance(ref, dict):
return _normalize_and_filter_user_properties(ref)
elif isinstance(ref, list):
return [_normalize_and_filter_user_properties(x) for x in ref]
else:
raise ValueError(_('Expected dict or list: %s') % type(ref))
@staticmethod
def v3_to_v2_project(ref):
"""Convert a project_ref from v3 to v2.
* v2.0 projects are not domain aware, and should have domain_id removed
* v2.0 projects are not hierarchy aware, and should have parent_id
removed
This method should only be applied to project_refs being returned from
the v2.0 controller(s).
If ref is a list type, we will iterate through each element and do the
conversion.
"""
def _filter_project_properties(ref):
ref.pop('domain_id', None)
ref.pop('parent_id', None)
ref.pop('is_domain', None)
return ref
if isinstance(ref, dict):
return _filter_project_properties(ref)
elif isinstance(ref, list):
return [_filter_project_properties(x) for x in ref]
else:
raise ValueError(_('Expected dict or list: %s') % type(ref))
class V3Controller(provider_api.ProviderAPIMixin, wsgi.Application):
"""Base controller class for Identity API v3.

View File

@ -15,5 +15,4 @@
from keystone.contrib.ec2 import controllers # noqa
from keystone.contrib.ec2.core import * # noqa
from keystone.contrib.ec2 import routers # noqa
from keystone.contrib.ec2.routers import Ec2Extension # noqa
from keystone.contrib.ec2.routers import Routers as Ec2ExtensionV3 # noqa

View File

@ -41,7 +41,6 @@ from oslo_serialization import jsonutils
import six
from six.moves import http_client
from keystone.common import authorization
from keystone.common import controller
from keystone.common import provider_api
from keystone.common import utils
@ -55,177 +54,6 @@ CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class V2TokenDataHelper(provider_api.ProviderAPIMixin, object):
"""Create V2 token data."""
def v3_to_v2_token(self, v3_token_data, token_id):
"""Convert v3 token data into v2.0 token data.
This method expects a dictionary generated from
V3TokenDataHelper.get_token_data() and converts it to look like a v2.0
token dictionary.
:param v3_token_data: dictionary formatted for v3 tokens
:param token_id: ID of the token being converted
:returns: dictionary formatted for v2 tokens
:raises keystone.exception.Unauthorized: If a specific token type is
not supported in v2.
"""
token_data = {}
# Build v2 token
v3_token = v3_token_data['token']
# NOTE(lbragstad): Version 2.0 tokens don't know about any domain other
# than the default domain specified in the configuration.
domain_id = v3_token.get('domain', {}).get('id')
if domain_id and CONF.identity.default_domain_id != domain_id:
msg = ('Unable to validate domain-scoped tokens outside of the '
'default domain')
raise exception.Unauthorized(msg)
token = {}
token['expires'] = v3_token.get('expires_at')
token['issued_at'] = v3_token.get('issued_at')
token['audit_ids'] = v3_token.get('audit_ids')
if v3_token.get('bind'):
token['bind'] = v3_token['bind']
token['id'] = token_id
if 'project' in v3_token:
# v3 token_data does not contain all tenant attributes
tenant = PROVIDERS.resource_api.get_project(
v3_token['project']['id'])
# Drop domain specific fields since v2 calls are not domain-aware.
token['tenant'] = controller.V2Controller.v3_to_v2_project(
tenant)
token_data['token'] = token
# Build v2 user
v3_user = v3_token['user']
user = controller.V2Controller.v3_to_v2_user(v3_user)
if 'OS-TRUST:trust' in v3_token:
v3_trust = v3_token['OS-TRUST:trust']
# if token is scoped to trust, both trustor and trustee must
# be in the default domain. Furthermore, the delegated project
# must also be in the default domain
msg = _('Non-default domain is not supported')
if CONF.trust.enabled:
try:
trust_ref = PROVIDERS.trust_api.get_trust(v3_trust['id'])
except exception.TrustNotFound:
raise exception.TokenNotFound(token_id=token_id)
trustee_user_ref = PROVIDERS.identity_api.get_user(
trust_ref['trustee_user_id'])
if (trustee_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
trustor_user_ref = PROVIDERS.identity_api.get_user(
trust_ref['trustor_user_id'])
if (trustor_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
if trust_ref.get('project_id'):
project_ref = PROVIDERS.resource_api.get_project(
trust_ref['project_id'])
if (project_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
token_data['trust'] = {
'impersonation': v3_trust['impersonation'],
'id': v3_trust['id'],
'trustee_user_id': v3_trust['trustee_user']['id'],
'trustor_user_id': v3_trust['trustor_user']['id']
}
if 'OS-OAUTH1' in v3_token:
msg = ('Unable to validate Oauth tokens using the version v2.0 '
'API.')
raise exception.Unauthorized(msg)
if 'OS-FEDERATION' in v3_token['user']:
msg = _('Unable to validate Federation tokens using the version '
'v2.0 API.')
raise exception.Unauthorized(msg)
# Set user roles
user['roles'] = []
role_ids = []
for role in v3_token.get('roles', []):
role_ids.append(role.pop('id'))
user['roles'].append(role)
user['roles_links'] = []
token_data['user'] = user
# Get and build v2 service catalog
token_data['serviceCatalog'] = []
if 'tenant' in token:
catalog_ref = PROVIDERS.catalog_api.get_catalog(
user['id'], token['tenant']['id'])
if catalog_ref:
token_data['serviceCatalog'] = self.format_catalog(catalog_ref)
# Build v2 metadata
metadata = {}
metadata['roles'] = role_ids
# Setting is_admin to keep consistency in v2 response
metadata['is_admin'] = 0
token_data['metadata'] = metadata
return {'access': token_data}
@classmethod
def format_catalog(cls, catalog_ref):
"""Munge catalogs from internal to output format.
Internal catalogs look like::
{$REGION: {
{$SERVICE: {
$key1: $value1,
...
}
}
}
The legacy api wants them to look like::
[{'name': $SERVICE[name],
'type': $SERVICE,
'endpoints': [{
'tenantId': $tenant_id,
...
'region': $REGION,
}],
'endpoints_links': [],
}]
"""
if not catalog_ref:
return []
services = {}
for region, region_ref in catalog_ref.items():
for service, service_ref in region_ref.items():
new_service_ref = services.get(service, {})
new_service_ref['name'] = service_ref.pop('name')
new_service_ref['type'] = service
new_service_ref['endpoints_links'] = []
service_ref['region'] = region
endpoints_ref = new_service_ref.get('endpoints', [])
endpoints_ref.append(service_ref)
new_service_ref['endpoints'] = endpoints_ref
services[service] = new_service_ref
return list(services.values())
@six.add_metaclass(abc.ABCMeta)
class Ec2ControllerCommon(provider_api.ProviderAPIMixin, object):
def check_signature(self, creds_ref, credentials):
@ -440,93 +268,6 @@ class Ec2ControllerCommon(provider_api.ProviderAPIMixin, object):
headers=headers)
class Ec2Controller(Ec2ControllerCommon, controller.V2Controller):
@controller.v2_ec2_deprecated
def authenticate(self, request, credentials=None, ec2Credentials=None):
(user_ref, project_ref, roles_ref, catalog_ref) = self._authenticate(
credentials=credentials, ec2credentials=ec2Credentials
)
method_names = ['ec2credential']
token_id, token_data = self.token_provider_api.issue_token(
user_ref['id'], method_names, project_id=project_ref['id'])
v2_helper = V2TokenDataHelper()
token_data = v2_helper.v3_to_v2_token(token_data, token_id)
return token_data
@controller.v2_ec2_deprecated
def get_credential(self, request, user_id, credential_id):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
return super(Ec2Controller, self).get_credential(user_id,
credential_id)
@controller.v2_ec2_deprecated
def get_credentials(self, request, user_id):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
return super(Ec2Controller, self).get_credentials(user_id)
@controller.v2_ec2_deprecated
def create_credential(self, request, user_id, tenant_id):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
return super(Ec2Controller, self).create_credential(
request, user_id, tenant_id)
@controller.v2_ec2_deprecated
def delete_credential(self, request, user_id, credential_id):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
self._assert_owner(user_id, credential_id)
return super(Ec2Controller, self).delete_credential(user_id,
credential_id)
def _assert_identity(self, context, user_id):
"""Check that the provided token belongs to the user.
:param context: standard context
:param user_id: id of user
:raises keystone.exception.Forbidden: when token is invalid
"""
token_ref = authorization.get_token_ref(context)
if token_ref.user_id != user_id:
raise exception.Forbidden(_('Token belongs to another user'))
def _is_admin(self, request):
"""Wrap admin assertion error return statement.
:param context: standard context
:returns: bool: success
"""
try:
# NOTE(morganfainberg): policy_api is required for assert_admin
# to properly perform policy enforcement.
self.assert_admin(request)
return True
except (exception.Forbidden, exception.Unauthorized):
return False
def _assert_owner(self, user_id, credential_id):
"""Ensure the provided user owns the credential.
:param user_id: expected credential owner
:param credential_id: id of credential object
:raises keystone.exception.Forbidden: on failure
"""
ec2_credential_id = utils.hash_access_key(credential_id)
cred_ref = self.credential_api.get_credential(ec2_credential_id)
if user_id != cred_ref['user_id']:
raise exception.Forbidden(_('Credential belongs to another user'))
class Ec2ControllerV3(Ec2ControllerCommon, controller.V3Controller):
collection_name = 'credentials'

View File

@ -24,39 +24,6 @@ build_resource_relation = functools.partial(
extension_version='1.0')
class Ec2Extension(wsgi.ExtensionRouter):
def add_routes(self, mapper):
ec2_controller = controllers.Ec2Controller()
# validation
mapper.connect(
'/ec2tokens',
controller=ec2_controller,
action='authenticate',
conditions=dict(method=['POST']))
# crud
mapper.connect(
'/users/{user_id}/credentials/OS-EC2',
controller=ec2_controller,
action='create_credential',
conditions=dict(method=['POST']))
mapper.connect(
'/users/{user_id}/credentials/OS-EC2',
controller=ec2_controller,
action='get_credentials',
conditions=dict(method=['GET']))
mapper.connect(
'/users/{user_id}/credentials/OS-EC2/{credential_id}',
controller=ec2_controller,
action='get_credential',
conditions=dict(method=['GET']))
mapper.connect(
'/users/{user_id}/credentials/OS-EC2/{credential_id}',
controller=ec2_controller,
action='delete_credential',
conditions=dict(method=['DELETE']))
class Routers(wsgi.RoutersBase):
_path_prefixes = ('ec2tokens', 'users')

View File

@ -215,47 +215,6 @@ class RestfulTestCase(unit.TestCase):
def admin_request(self, **kwargs):
return self._request(app=self.admin_app, **kwargs)
def _get_token(self, body):
"""Convenience method so that we can test authenticated requests."""
r = self.public_request(method='POST', path='/v2.0/tokens', body=body)
return self._get_token_id(r)
def get_admin_token(self):
return self._get_token({
'auth': {
'passwordCredentials': {
'username': self.user_req_admin['name'],
'password': self.user_req_admin['password']
},
'tenantId': default_fixtures.SERVICE_TENANT_ID
}
})
def get_unscoped_token(self):
"""Convenience method so that we can test authenticated requests."""
return self._get_token({
'auth': {
'passwordCredentials': {
'username': self.user_foo['name'],
'password': self.user_foo['password'],
},
},
})
def get_scoped_token(self, tenant_id=None):
"""Convenience method so that we can test authenticated requests."""
if not tenant_id:
tenant_id = self.tenant_bar['id']
return self._get_token({
'auth': {
'passwordCredentials': {
'username': self.user_foo['name'],
'password': self.user_foo['password'],
},
'tenantId': tenant_id,
},
})
def _get_token_id(self, r):
"""Helper method to return a token ID from a response.

View File

@ -18,123 +18,12 @@ from six.moves import http_client
from keystone.common import provider_api
from keystone.contrib.ec2 import controllers
from keystone.tests import unit
from keystone.tests.unit import rest
from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs
class EC2ContribCoreV2(rest.RestfulTestCase):
def setUp(self):
super(EC2ContribCoreV2, self).setUp()
# TODO(morgan): remove test class, v2.0 has been deleted
self.skipTest('V2.0 has been deleted, test is nolonger valid')
def config_overrides(self):
super(EC2ContribCoreV2, self).config_overrides()
def assertValidAuthenticationResponse(self, r):
self.assertIsNotNone(r.result.get('access'))
self.assertIsNotNone(r.result['access'].get('token'))
self.assertIsNotNone(r.result['access'].get('user'))
# validate token
self.assertIsNotNone(r.result['access']['token'].get('id'))
self.assertIsNotNone(r.result['access']['token'].get('expires'))
tenant = r.result['access']['token'].get('tenant')
if tenant is not None:
# validate tenant
self.assertIsNotNone(tenant.get('id'))
self.assertIsNotNone(tenant.get('name'))
# validate user
self.assertIsNotNone(r.result['access']['user'].get('id'))
self.assertIsNotNone(r.result['access']['user'].get('name'))
def assertValidErrorResponse(self, r):
resp = r.result
self.assertIsNotNone(resp.get('error'))
self.assertIsNotNone(resp['error'].get('code'))
self.assertIsNotNone(resp['error'].get('title'))
self.assertIsNotNone(resp['error'].get('message'))
self.assertEqual(int(resp['error']['code']), r.status_code)
def test_valid_authentication_response_with_proper_secret(self):
cred_blob, credential = unit.new_ec2_credential(
self.user_foo['id'], self.tenant_bar['id'])
PROVIDERS.credential_api.create_credential(
credential['id'], credential)
signer = ec2_utils.Ec2Signer(cred_blob['secret'])
credentials = {
'access': cred_blob['access'],
'secret': cred_blob['secret'],
'host': 'localhost',
'verb': 'GET',
'path': '/',
'params': {
'SignatureVersion': '2',
'Action': 'Test',
'Timestamp': '2007-01-31T23:59:59Z'
},
}
credentials['signature'] = signer.generate(credentials)
resp = self.public_request(
method='POST',
path='/v2.0/ec2tokens',
body={'credentials': credentials},
expected_status=http_client.OK)
self.assertValidAuthenticationResponse(resp)
def test_authenticate_with_empty_body_returns_bad_request(self):
self.public_request(
method='POST',
path='/v2.0/ec2tokens',
body={},
expected_status=http_client.BAD_REQUEST)
def test_authenticate_without_json_request_returns_bad_request(self):
self.public_request(
method='POST',
path='/v2.0/ec2tokens',
body='not json',
expected_status=http_client.BAD_REQUEST)
def test_authenticate_without_request_body_returns_bad_request(self):
self.public_request(
method='POST',
path='/v2.0/ec2tokens',
expected_status=http_client.BAD_REQUEST)
def test_authenticate_without_proper_secret_returns_unauthorized(self):
cred_blob, credential = unit.new_ec2_credential(
self.user_foo['id'], self.tenant_bar['id'])
PROVIDERS.credential_api.create_credential(
credential['id'], credential)
signer = ec2_utils.Ec2Signer('totally not the secret')
credentials = {
'access': cred_blob['access'],
'secret': 'totally not the secret',
'host': 'localhost',
'verb': 'GET',
'path': '/',
'params': {
'SignatureVersion': '2',
'Action': 'Test',
'Timestamp': '2007-01-31T23:59:59Z'
},
}
credentials['signature'] = signer.generate(credentials)
self.public_request(
method='POST',
path='/v2.0/ec2tokens',
body={'credentials': credentials},
expected_status=http_client.UNAUTHORIZED)
class EC2ContribCoreV3(test_v3.RestfulTestCase):
def setUp(self):
super(EC2ContribCoreV3, self).setUp()

View File

@ -1,283 +0,0 @@
# Copyright 2015 UnitedStack, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystoneclient.contrib.ec2 import utils as ec2_utils
from six.moves import http_client
from keystone.common import context
from keystone.common import provider_api
from keystone.common import request
from keystone.common import utils
from keystone.contrib.ec2 import controllers
from keystone.credential.providers import fernet as credential_fernet
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit import test_v3 as rest
CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2
PROVIDERS = provider_api.ProviderAPIs
class V2CredentialEc2TestCase(rest.RestfulTestCase):
def _get_token_id(self, r):
return r.headers.get('X-Subject-Token')
def _get_ec2_cred(self):
uri = self._get_ec2_cred_uri()
r = self.public_request(method='POST', token=self.get_scoped_token(),
path=uri, body={'tenant_id': self.project_id})
return r.result['credential']
def _get_ec2_cred_uri(self):
return '/v2.0/users/%s/credentials/OS-EC2' % self.user_id
def test_ec2_cannot_get_non_ec2_credential(self):
# TODO(morgan): remove this test class, V2 has been removed.
self.skipTest('V2.0 has been removed, remove the whole test class.')
access_key = uuid.uuid4().hex
cred_id = utils.hash_access_key(access_key)
non_ec2_cred = unit.new_credential_ref(
user_id=self.user_id,
project_id=self.project_id)
non_ec2_cred['id'] = cred_id
PROVIDERS.credential_api.create_credential(cred_id, non_ec2_cred)
# if access_key is not found, ec2 controller raises Unauthorized
# exception
path = '/'.join([self._get_ec2_cred_uri(), access_key])
self.public_request(method='GET', token=self.get_scoped_token(),
path=path,
expected_status=http_client.UNAUTHORIZED)
def assertValidErrorResponse(self, r):
# FIXME(wwwjfy): it's copied from test_v3.py. The logic of this method
# in test_v2.py and test_v3.py (both are inherited from rest.py) has no
# difference, so they should be refactored into one place. Also, the
# function signatures in both files don't match the one in the parent
# class in rest.py.
resp = r.result
self.assertIsNotNone(resp.get('error'))
self.assertIsNotNone(resp['error'].get('code'))
self.assertIsNotNone(resp['error'].get('title'))
self.assertIsNotNone(resp['error'].get('message'))
self.assertEqual(int(resp['error']['code']), r.status_code)
def test_ec2_list_credentials(self):
# TODO(morgan): remove this test class, V2 has been removed.
self.skipTest('V2.0 has been removed, remove the whole test class.')
self._get_ec2_cred()
uri = self._get_ec2_cred_uri()
r = self.public_request(method='GET', token=self.get_scoped_token(),
path=uri)
cred_list = r.result['credentials']
self.assertEqual(1, len(cred_list))
# non-EC2 credentials won't be fetched
non_ec2_cred = unit.new_credential_ref(
user_id=self.user_id,
project_id=self.project_id)
non_ec2_cred['type'] = uuid.uuid4().hex
PROVIDERS.credential_api.create_credential(
non_ec2_cred['id'], non_ec2_cred
)
r = self.public_request(method='GET', token=self.get_scoped_token(),
path=uri)
cred_list_2 = r.result['credentials']
# still one element because non-EC2 credentials are not returned.
self.assertEqual(1, len(cred_list_2))
self.assertEqual(cred_list[0], cred_list_2[0])
class V2CredentialEc2Controller(unit.TestCase):
def setUp(self):
super(V2CredentialEc2Controller, self).setUp()
# TODO(morgan): remove the whole test case/class, v2.0 is dead.
self.skipTest('V2.0 has been removed, test case is not valid.')
self.useFixture(database.Database())
self.useFixture(
ksfixtures.KeyRepository(
self.config_fixture,
'credential',
credential_fernet.MAX_ACTIVE_KEYS
)
)
self.load_backends()
self.load_fixtures(default_fixtures)
self.user_id = self.user_foo['id']
self.project_id = self.tenant_bar['id']
self.controller = controllers.Ec2Controller()
self.blob, tmp_ref = unit.new_ec2_credential(
user_id=self.user_id,
project_id=self.project_id)
self.creds_ref = (controllers.Ec2Controller
._convert_v3_to_ec2_credential(tmp_ref))
def test_signature_validate_no_host_port(self):
"""Test signature validation with the access/secret provided."""
access = self.blob['access']
secret = self.blob['secret']
signer = ec2_utils.Ec2Signer(secret)
params = {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'AWSAccessKeyId': access}
request = {'host': 'foo',
'verb': 'GET',
'path': '/bar',
'params': params}
signature = signer.generate(request)
sig_ref = {'access': access,
'signature': signature,
'host': 'foo',
'verb': 'GET',
'path': '/bar',
'params': params}
# Now validate the signature based on the dummy request
self.assertTrue(self.controller.check_signature(self.creds_ref,
sig_ref))
def test_signature_validate_with_host_port(self):
"""Test signature validation when host is bound with port.
Host is bound with a port, generally, the port here is not the
standard port for the protocol, like '80' for HTTP and port 443
for HTTPS, the port is not omitted by the client library.
"""
access = self.blob['access']
secret = self.blob['secret']
signer = ec2_utils.Ec2Signer(secret)
params = {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'AWSAccessKeyId': access}
request = {'host': 'foo:8181',
'verb': 'GET',
'path': '/bar',
'params': params}
signature = signer.generate(request)
sig_ref = {'access': access,
'signature': signature,
'host': 'foo:8181',
'verb': 'GET',
'path': '/bar',
'params': params}
# Now validate the signature based on the dummy request
self.assertTrue(self.controller.check_signature(self.creds_ref,
sig_ref))
def test_signature_validate_with_missed_host_port(self):
"""Test signature validation when host is bound with well-known port.
Host is bound with a port, but the port is well-know port like '80'
for HTTP and port 443 for HTTPS, sometimes, client library omit
the port but then make the request with the port.
see (How to create the string to sign): 'http://docs.aws.amazon.com/
general/latest/gr/signature-version-2.html'.
Since "credentials['host']" is not set by client library but is
taken from "req.host", so caused the differences.
"""
access = self.blob['access']
secret = self.blob['secret']
signer = ec2_utils.Ec2Signer(secret)
params = {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'AWSAccessKeyId': access}
# Omit the port to generate the signature.
cnt_req = {'host': 'foo',
'verb': 'GET',
'path': '/bar',
'params': params}
signature = signer.generate(cnt_req)
sig_ref = {'access': access,
'signature': signature,
'host': 'foo:8080',
'verb': 'GET',
'path': '/bar',
'params': params}
# Now validate the signature based on the dummy request
# Check the signature again after omitting the port.
self.assertTrue(self.controller.check_signature(self.creds_ref,
sig_ref))
def test_signature_validate_no_signature(self):
"""Signature is not presented in signature reference data."""
access = self.blob['access']
params = {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'AWSAccessKeyId': access}
sig_ref = {'access': access,
'signature': None,
'host': 'foo:8080',
'verb': 'GET',
'path': '/bar',
'params': params}
# Now validate the signature based on the dummy request
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
self.creds_ref, sig_ref)
def test_signature_validate_invalid_signature(self):
"""Signature is not signed on the correct data."""
access = self.blob['access']
secret = self.blob['secret']
signer = ec2_utils.Ec2Signer(secret)
params = {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'AWSAccessKeyId': access}
request = {'host': 'bar',
'verb': 'GET',
'path': '/bar',
'params': params}
signature = signer.generate(request)
sig_ref = {'access': access,
'signature': signature,
'host': 'foo:8080',
'verb': 'GET',
'path': '/bar',
'params': params}
# Now validate the signature based on the dummy request
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
self.creds_ref, sig_ref)
def test_check_non_admin_user(self):
"""Checking if user is admin causes uncaught error.
When checking if a user is an admin, keystone.exception.Unauthorized
is raised but not caught if the user is not an admin.
"""
# make a non-admin user
req = request.Request.blank('/')
req.context = context.RequestContext(is_admin=False)
req.context_dict['is_admin'] = False
req.context_dict['token_id'] = uuid.uuid4().hex
# check if user is admin
# no exceptions should be raised
self.controller._is_admin(req)

View File

@ -41,19 +41,19 @@ SAMPLE_V3_TOKEN = {
"id": "02850c5d1d094887bdc46e81e1e15dc7",
"interface": "admin",
"region": "RegionOne",
"url": "http://localhost:35357/v2.0"
"url": "http://localhost:35357/v3"
},
{
"id": "446e244b75034a9ab4b0811e82d0b7c8",
"interface": "internal",
"region": "RegionOne",
"url": "http://localhost:5000/v2.0"
"url": "http://localhost:5000/v3"
},
{
"id": "47fa3d9f499240abb5dfcf2668f168cd",
"interface": "public",
"region": "RegionOne",
"url": "http://localhost:5000/v2.0"
"url": "http://localhost:5000/v3"
}
],
"id": "26d7541715a44a4d9adad96f9872b633",
@ -231,19 +231,19 @@ SAMPLE_V3_TOKEN_WITH_EMBEDED_VERSION = {
"id": "02850c5d1d094887bdc46e81e1e15dc7",
"interface": "admin",
"region": "RegionOne",
"url": "http://localhost:35357/v2.0"
"url": "http://localhost:35357/v3"
},
{
"id": "446e244b75034a9ab4b0811e82d0b7c8",
"interface": "internal",
"region": "RegionOne",
"url": "http://localhost:5000/v2.0"
"url": "http://localhost:5000/v3"
},
{
"id": "47fa3d9f499240abb5dfcf2668f168cd",
"interface": "public",
"region": "RegionOne",
"url": "http://localhost:5000/v2.0"
"url": "http://localhost:5000/v3"
}
],
"id": "26d7541715a44a4d9adad96f9872b633",

View File

@ -29,39 +29,6 @@ from keystone.tests.unit import utils
from keystone.version import controllers
v2_MEDIA_TYPES = [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.identity-v2.0+json"
}
]
v2_HTML_DESCRIPTION = {
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"
}
v2_EXPECTED_RESPONSE = {
"id": "v2.0",
"status": "deprecated",
"updated": "2016-08-04T00:00:00Z",
"links": [
{
"rel": "self",
"href": "", # Will get filled in after initialization
},
v2_HTML_DESCRIPTION
],
"media-types": v2_MEDIA_TYPES
}
v2_VERSION_RESPONSE = {
"version": v2_EXPECTED_RESPONSE
}
v3_MEDIA_TYPES = [
{
"base": "application/json",
@ -745,11 +712,6 @@ class VersionTestCase(unit.TestCase):
if version['id'].startswith('v3'):
self._paste_in_port(
version, 'http://localhost:%s/v3/' % self.public_port)
elif version['id'] == 'v2.0':
# TODO(morgan): remove this if block in future patch,
# v2.0 has been removed.
self._paste_in_port(
version, 'http://localhost:%s/v2.0/' % self.public_port)
self.assertThat(data, _VersionsEqual(expected))
def test_admin_versions(self):
@ -762,11 +724,6 @@ class VersionTestCase(unit.TestCase):
if version['id'].startswith('v3'):
self._paste_in_port(
version, 'http://localhost:%s/v3/' % self.admin_port)
elif version['id'] == 'v2.0':
# TODO(morgan): remove this if block in future patch,
# v2.0 has been removed.
self._paste_in_port(
version, 'http://localhost:%s/v2.0/' % self.admin_port)
self.assertThat(data, _VersionsEqual(expected))
def test_use_site_url_if_endpoint_unset(self):
@ -783,50 +740,8 @@ class VersionTestCase(unit.TestCase):
if version['id'].startswith('v3'):
self._paste_in_port(
version, 'http://localhost/v3/')
elif version['id'] == 'v2.0':
# TODO(morgan): remove this if block in future patch,
# v2.0 has been removed.
self._paste_in_port(
version, 'http://localhost/v2.0/')
self.assertThat(data, _VersionsEqual(expected))
def test_public_version_v2(self):
# TODO(morgan): Remove this test in a future patch.
self.skipTest('Test is not Valid, v2.0 has been removed.')
client = TestClient(self.public_app)
resp = client.get('/v2.0/')
self.assertEqual(http_client.OK, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v2.0/' % self.public_port)
self.assertEqual(expected, data)
def test_admin_version_v2(self):
# TODO(morgan): Remove this test in a future patch.
self.skipTest('Test is not Valid, v2.0 has been removed.')
client = TestClient(self.admin_app)
resp = client.get('/v2.0/')
self.assertEqual(http_client.OK, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v2.0/' % self.admin_port)
self.assertEqual(expected, data)
def test_use_site_url_if_endpoint_unset_v2(self):
# TODO(morgan): Remove this test in a future patch.
self.skipTest('Test is not Valid, v2.0 has been removed.')
self.config_fixture.config(public_endpoint=None, admin_endpoint=None)
for app in (self.public_app, self.admin_app):
client = TestClient(app)
resp = client.get('/v2.0/')
self.assertEqual(http_client.OK, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'], 'http://localhost/v2.0/')
self.assertEqual(data, expected)
def test_public_version_v3(self):
client = TestClient(self.public_app)
resp = client.get('/v3/')
@ -893,38 +808,6 @@ class VersionTestCase(unit.TestCase):
data = jsonutils.loads(resp.body)
self.assertEqual(v3_only_response, data)
def test_v3_disabled(self):
# TODO(morgan): Remove this test in a future patch.
self.skipTest('Test is not Valid, v2.0 has been removed.')
client = TestClient(self.public_app)
# request to /v3 should fail
resp = client.get('/v3/')
self.assertEqual(http_client.NOT_FOUND, resp.status_int)
# request to /v2.0 should pass
resp = client.get('/v2.0/')
self.assertEqual(http_client.OK, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v2.0/' % self.public_port)
self.assertEqual(expected, data)
# only v2 information should be displayed by requests to /
v2_only_response = {
"versions": {
"values": [
v2_EXPECTED_RESPONSE
]
}
}
self._paste_in_port(v2_only_response['versions']['values'][0],
'http://localhost:%s/v2.0/' % self.public_port)
resp = client.get('/')
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
self.assertEqual(v2_only_response, data)
def _test_json_home(self, path, exp_json_home_data):
client = TestClient(self.public_app)
resp = client.get(path, headers={'Accept': 'application/json-home'})
@ -1052,11 +935,6 @@ class VersionSingleAppTestCase(unit.TestCase):
if version['id'].startswith('v3'):
self._paste_in_port(
version, 'http://localhost:%s/v3/' % app_port())
elif version['id'] == 'v2.0':
# TODO(morgan): remove this if block in future patch,
# v2.0 has been removed.
self._paste_in_port(
version, 'http://localhost:%s/v2.0/' % app_port())
self.assertThat(data, _VersionsEqual(expected))
def test_public(self):
@ -1087,8 +965,6 @@ class VersionBehindSslTestCase(unit.TestCase):
for version in expected['versions']['values']:
if version['id'].startswith('v3'):
self._paste_in_port(version, host + 'v3/')
elif version['id'] == 'v2.0':
self._paste_in_port(version, host + 'v2.0/')
return expected
def test_versions_without_headers(self):

View File

@ -282,14 +282,6 @@ class ApplicationTest(BaseWSGITest):
resp = req.get_response(FakeApp())
self.assertEqual(b"http://foo:1234/identity", resp.body)
# make sure version portion of the SCRIPT_NAME, '/v2.0', is stripped
# from base url
req = self._make_request(url='/')
req.environ.update({'HTTP_HOST': 'foo:80',
'SCRIPT_NAME': '/bar/identity/v2.0'})
resp = req.get_response(FakeApp())
self.assertEqual(b"http://foo/bar/identity", resp.body)
# make sure version portion of the SCRIPT_NAME, '/v3' is stripped from
# base url
req = self._make_request(url='/')

View File

@ -56,6 +56,8 @@ class TestFernetTokenProvider(unit.TestCase):
self.assertIn(token_id, u'%s' % e)
def test_invalid_v2_token_raises_token_not_found(self):
# NOTE(morgan): Kept for regression testing, should not be deleted
# even though it references V2.0
token_id = uuid.uuid4().hex
e = self.assertRaises(
exception.TokenNotFound,

View File

@ -545,12 +545,6 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
if 'token_version' in token_data:
if token_data['token_version'] in token_model.VERSIONS:
return token_data['token_version']
# FIXME(morganfainberg): deprecate the following logic in future
# revisions. It is better to just specify the token_version in
# the token_data itself. This way we can support future versions
# that might have the same fields.
if 'access' in token_data:
return token_model.V2
if 'token' in token_data and 'methods' in token_data['token']:
return token_model.V3
raise exception.UnsupportedTokenVersionException()

View File

@ -179,15 +179,6 @@ class Version(wsgi.Application):
}
})
def get_version_v2(self, request):
versions = self._get_versions_list(request.context_dict)
if 'v2.0' in _VERSIONS:
return wsgi.render_response(body={
'version': versions['v2.0']
})
else:
raise exception.VersionNotFound(version='v2.0')
def _get_json_home_v3(self):
def all_resources():