222 lines
8.4 KiB
Python
222 lines
8.4 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import uuid
|
|
|
|
from keystoneauth1 import fixture
|
|
from keystoneauth1 import loading
|
|
|
|
from keystonemiddleware.auth_token import _base
|
|
from keystonemiddleware.tests.unit.auth_token import base
|
|
|
|
# NOTE(jamielennox): just some sample values that we can use for testing
|
|
BASE_URI = 'https://keystone.example.com:1234'
|
|
AUTH_URL = 'https://keystone.auth.com:1234'
|
|
|
|
|
|
class BaseUserPluginTests(object):
|
|
|
|
def configure_middleware(self,
|
|
auth_type,
|
|
**kwargs):
|
|
opts = loading.get_auth_plugin_conf_options(auth_type)
|
|
self.cfg.register_opts(opts, group=_base.AUTHTOKEN_GROUP)
|
|
|
|
# Since these tests cfg.config() themselves rather than waiting for
|
|
# auth_token to do it on __init__ we need to register the base auth
|
|
# options (e.g., auth_plugin)
|
|
loading.register_auth_conf_options(self.cfg.conf,
|
|
group=_base.AUTHTOKEN_GROUP)
|
|
|
|
self.cfg.config(group=_base.AUTHTOKEN_GROUP,
|
|
auth_type=auth_type,
|
|
**kwargs)
|
|
|
|
def assertTokenDataEqual(self, token_id, token, token_data):
|
|
self.assertEqual(token_id, token_data.auth_token)
|
|
self.assertEqual(token.user_id, token_data.user_id)
|
|
try:
|
|
trust_id = token.trust_id
|
|
except KeyError:
|
|
trust_id = None
|
|
self.assertEqual(trust_id, token_data.trust_id)
|
|
self.assertEqual(self.get_role_names(token), token_data.role_names)
|
|
|
|
def get_plugin(self, token_id, service_token_id=None):
|
|
headers = {'X-Auth-Token': token_id}
|
|
|
|
if service_token_id:
|
|
headers['X-Service-Token'] = service_token_id
|
|
|
|
m = self.create_simple_middleware()
|
|
|
|
resp = self.call(m, headers=headers)
|
|
return resp.request.environ['keystone.token_auth']
|
|
|
|
def test_user_information(self):
|
|
token_id, token = self.get_token()
|
|
plugin = self.get_plugin(token_id)
|
|
|
|
self.assertTokenDataEqual(token_id, token, plugin.user)
|
|
self.assertFalse(plugin.has_service_token)
|
|
self.assertIsNone(plugin.service)
|
|
|
|
def test_with_service_information(self):
|
|
token_id, token = self.get_token()
|
|
service_id, service = self.get_token()
|
|
|
|
plugin = self.get_plugin(token_id, service_id)
|
|
|
|
self.assertTokenDataEqual(token_id, token, plugin.user)
|
|
self.assertTokenDataEqual(service_id, service, plugin.service)
|
|
|
|
|
|
class V2UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
|
|
|
|
def setUp(self):
|
|
super(V2UserPluginTests, self).setUp()
|
|
|
|
self.service_token = fixture.V2Token()
|
|
self.service_token.set_scope()
|
|
s = self.service_token.add_service('identity', name='keystone')
|
|
|
|
s.add_endpoint(public=BASE_URI,
|
|
admin=BASE_URI,
|
|
internal=BASE_URI)
|
|
|
|
self.configure_middleware(auth_type='v2password',
|
|
auth_url='%s/v2.0/' % AUTH_URL,
|
|
user_id=self.service_token.user_id,
|
|
password=uuid.uuid4().hex,
|
|
tenant_id=self.service_token.tenant_id)
|
|
|
|
auth_discovery = fixture.DiscoveryList(href=AUTH_URL, v3=False)
|
|
self.requests_mock.get(AUTH_URL, json=auth_discovery)
|
|
|
|
base_discovery = fixture.DiscoveryList(href=BASE_URI, v3=False)
|
|
self.requests_mock.get(BASE_URI, json=base_discovery)
|
|
|
|
url = '%s/v2.0/tokens' % AUTH_URL
|
|
self.requests_mock.post(url, json=self.service_token)
|
|
|
|
def get_role_names(self, token):
|
|
return set(x['name'] for x in token['access']['user'].get('roles', []))
|
|
|
|
def get_token(self):
|
|
token = fixture.V2Token()
|
|
token.set_scope()
|
|
token.add_role()
|
|
|
|
request_headers = {'X-Auth-Token': self.service_token.token_id}
|
|
|
|
url = '%s/v2.0/tokens/%s' % (BASE_URI, token.token_id)
|
|
self.requests_mock.get(url,
|
|
request_headers=request_headers,
|
|
json=token)
|
|
|
|
return token.token_id, token
|
|
|
|
def assertTokenDataEqual(self, token_id, token, token_data):
|
|
super(V2UserPluginTests, self).assertTokenDataEqual(token_id,
|
|
token,
|
|
token_data)
|
|
|
|
self.assertEqual(token.tenant_id, token_data.project_id)
|
|
self.assertIsNone(token_data.user_domain_id)
|
|
self.assertIsNone(token_data.project_domain_id)
|
|
|
|
def test_trust_scope(self):
|
|
token_id, token = self.get_token()
|
|
token.set_trust()
|
|
|
|
plugin = self.get_plugin(token_id)
|
|
self.assertEqual(token.trust_id, plugin.user.trust_id)
|
|
self.assertEqual(token.trustee_user_id, plugin.user.trustee_user_id)
|
|
self.assertIsNone(plugin.user.trustor_user_id)
|
|
|
|
|
|
class V3UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
|
|
|
|
def setUp(self):
|
|
super(V3UserPluginTests, self).setUp()
|
|
|
|
self.service_token_id = uuid.uuid4().hex
|
|
self.service_token = fixture.V3Token()
|
|
s = self.service_token.add_service('identity', name='keystone')
|
|
s.add_standard_endpoints(public=BASE_URI,
|
|
admin=BASE_URI,
|
|
internal=BASE_URI)
|
|
|
|
self.configure_middleware(auth_type='v3password',
|
|
auth_url='%s/v3/' % AUTH_URL,
|
|
user_id=self.service_token.user_id,
|
|
password=uuid.uuid4().hex,
|
|
project_id=self.service_token.project_id)
|
|
|
|
auth_discovery = fixture.DiscoveryList(href=AUTH_URL)
|
|
self.requests_mock.get(AUTH_URL, json=auth_discovery)
|
|
|
|
base_discovery = fixture.DiscoveryList(href=BASE_URI)
|
|
self.requests_mock.get(BASE_URI, json=base_discovery)
|
|
|
|
self.requests_mock.post(
|
|
'%s/v3/auth/tokens' % AUTH_URL,
|
|
headers={'X-Subject-Token': self.service_token_id},
|
|
json=self.service_token)
|
|
|
|
def get_role_names(self, token):
|
|
return set(x['name'] for x in token['token'].get('roles', []))
|
|
|
|
def get_token(self, project=True):
|
|
token_id = uuid.uuid4().hex
|
|
token = fixture.V3Token()
|
|
if project:
|
|
token.set_project_scope()
|
|
token.add_role()
|
|
|
|
request_headers = {'X-Auth-Token': self.service_token_id,
|
|
'X-Subject-Token': token_id}
|
|
headers = {'X-Subject-Token': token_id}
|
|
|
|
self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
|
|
request_headers=request_headers,
|
|
headers=headers,
|
|
json=token)
|
|
|
|
return token_id, token
|
|
|
|
def assertTokenDataEqual(self, token_id, token, token_data):
|
|
super(V3UserPluginTests, self).assertTokenDataEqual(token_id,
|
|
token,
|
|
token_data)
|
|
|
|
self.assertEqual(token.user_domain_id, token_data.user_domain_id)
|
|
self.assertEqual(token.project_id, token_data.project_id)
|
|
self.assertEqual(token.project_domain_id, token_data.project_domain_id)
|
|
|
|
def test_domain_scope(self):
|
|
token_id, token = self.get_token(project=False)
|
|
token.set_domain_scope()
|
|
|
|
plugin = self.get_plugin(token_id)
|
|
self.assertEqual(token.domain_id, plugin.user.domain_id)
|
|
self.assertIsNone(plugin.user.project_id)
|
|
|
|
def test_trust_scope(self):
|
|
token_id, token = self.get_token(project=False)
|
|
token.set_trust_scope()
|
|
|
|
plugin = self.get_plugin(token_id)
|
|
self.assertEqual(token.trust_id, plugin.user.trust_id)
|
|
self.assertEqual(token.trustor_user_id, plugin.user.trustor_user_id)
|
|
self.assertEqual(token.trustee_user_id, plugin.user.trustee_user_id)
|