# Copyright 2012 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import hashlib import uuid from oslo_config import cfg import webob from keystone.common import authorization from keystone.common import tokenless_auth from keystone.contrib.federation import constants as federation_constants from keystone import exception from keystone import middleware from keystone.tests import unit as tests from keystone.tests.unit import mapping_fixtures from keystone.tests.unit import test_backend_sql CONF = cfg.CONF def make_request(**kwargs): accept = kwargs.pop('accept', None) method = kwargs.pop('method', 'GET') body = kwargs.pop('body', None) req = webob.Request.blank('/', **kwargs) req.method = method if body is not None: req.body = body if accept is not None: req.accept = accept return req def make_response(**kwargs): body = kwargs.pop('body', None) return webob.Response(body) class TokenAuthMiddlewareTest(tests.TestCase): def test_request(self): req = make_request() req.headers[middleware.AUTH_TOKEN_HEADER] = 'MAGIC' middleware.TokenAuthMiddleware(None).process_request(req) context = req.environ[middleware.CONTEXT_ENV] self.assertEqual('MAGIC', context['token_id']) class AdminTokenAuthMiddlewareTest(tests.TestCase): def test_request_admin(self): req = make_request() req.headers[middleware.AUTH_TOKEN_HEADER] = CONF.admin_token middleware.AdminTokenAuthMiddleware(None).process_request(req) context = req.environ[middleware.CONTEXT_ENV] self.assertTrue(context['is_admin']) def test_request_non_admin(self): req = make_request() req.headers[middleware.AUTH_TOKEN_HEADER] = 'NOT-ADMIN' middleware.AdminTokenAuthMiddleware(None).process_request(req) context = req.environ[middleware.CONTEXT_ENV] self.assertFalse(context['is_admin']) class PostParamsMiddlewareTest(tests.TestCase): def test_request_with_params(self): req = make_request(body="arg1=one", method='POST') middleware.PostParamsMiddleware(None).process_request(req) params = req.environ[middleware.PARAMS_ENV] self.assertEqual({"arg1": "one"}, params) class JsonBodyMiddlewareTest(tests.TestCase): def test_request_with_params(self): req = make_request(body='{"arg1": "one", "arg2": ["a"]}', content_type='application/json', method='POST') middleware.JsonBodyMiddleware(None).process_request(req) params = req.environ[middleware.PARAMS_ENV] self.assertEqual({"arg1": "one", "arg2": ["a"]}, params) def test_malformed_json(self): req = make_request(body='{"arg1": "on', content_type='application/json', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) self.assertEqual(400, resp.status_int) def test_not_dict_body(self): req = make_request(body='42', content_type='application/json', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) self.assertEqual(400, resp.status_int) self.assertTrue('valid JSON object' in resp.json['error']['message']) def test_no_content_type(self): req = make_request(body='{"arg1": "one", "arg2": ["a"]}', method='POST') middleware.JsonBodyMiddleware(None).process_request(req) params = req.environ[middleware.PARAMS_ENV] self.assertEqual({"arg1": "one", "arg2": ["a"]}, params) def test_unrecognized_content_type(self): req = make_request(body='{"arg1": "one", "arg2": ["a"]}', content_type='text/plain', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) self.assertEqual(400, resp.status_int) def test_unrecognized_content_type_without_body(self): req = make_request(content_type='text/plain', method='GET') middleware.JsonBodyMiddleware(None).process_request(req) params = req.environ.get(middleware.PARAMS_ENV, {}) self.assertEqual({}, params) class AuthContextMiddlewareTest(test_backend_sql.SqlTests): def setUp(self): super(AuthContextMiddlewareTest, self).setUp() self.client_issuer = uuid.uuid4().hex self.untrusted_client_issuer = uuid.uuid4().hex self.trusted_issuer = self.client_issuer self.config_fixture.config(group='tokenless_auth', trusted_issuer=[self.trusted_issuer]) # This idp_id is calculated based on # sha256(self.client_issuer) hashed_idp = hashlib.sha256(self.client_issuer) self.idp_id = hashed_idp.hexdigest() self._load_sample_data() def _load_sample_data(self): self.domain_id = uuid.uuid4().hex self.domain_name = uuid.uuid4().hex self.project_id = uuid.uuid4().hex self.project_name = uuid.uuid4().hex self.user_name = uuid.uuid4().hex self.user_password = uuid.uuid4().hex self.user_email = uuid.uuid4().hex self.protocol_id = 'x509' self.role_id = uuid.uuid4().hex self.role_name = uuid.uuid4().hex # for ephemeral user self.group_name = uuid.uuid4().hex # 1) Create a domain for the user. self.domain = { 'description': uuid.uuid4().hex, 'enabled': True, 'id': self.domain_id, 'name': self.domain_name, } self.resource_api.create_domain(self.domain_id, self.domain) # 2) Create a project for the user. self.project = { 'description': uuid.uuid4().hex, 'domain_id': self.domain_id, 'enabled': True, 'id': self.project_id, 'name': self.project_name, } self.resource_api.create_project(self.project_id, self.project) # 3) Create a user in new domain. self.user = { 'name': self.user_name, 'domain_id': self.domain_id, 'project_id': self.project_id, 'password': self.user_password, 'email': self.user_email, } self.user = self.identity_api.create_user(self.user) # Add IDP self.idp = self._idp_ref(id=self.idp_id) self.federation_api.create_idp(self.idp['id'], self.idp) # Add a role self.role = { 'id': self.role_id, 'name': self.role_name, } self.role_api.create_role(self.role_id, self.role) # Add a group self.group = { 'name': self.group_name, 'domain_id': self.domain_id, } self.group = self.identity_api.create_group(self.group) # Assign a role to the user on a project self.assignment_api.add_role_to_user_and_project( user_id=self.user['id'], tenant_id=self.project_id, role_id=self.role_id) # Assign a role to the group on a project self.assignment_api.create_grant( role_id=self.role_id, group_id=self.group['id'], project_id=self.project_id) def _load_mapping_rules(self, rules): # Add a mapping self.mapping = self._mapping_ref(rules=rules) self.federation_api.create_mapping(self.mapping['id'], self.mapping) # Add protocols self.proto_x509 = self._proto_ref(mapping_id=self.mapping['id']) self.proto_x509['id'] = self.protocol_id self.federation_api.create_protocol(self.idp['id'], self.proto_x509['id'], self.proto_x509) def _idp_ref(self, id=None): idp = { 'id': id or uuid.uuid4().hex, 'enabled': True, 'description': uuid.uuid4().hex } return idp def _proto_ref(self, mapping_id=None): proto = { 'id': uuid.uuid4().hex, 'mapping_id': mapping_id or uuid.uuid4().hex } return proto def _mapping_ref(self, rules=None): if rules is None: mapped_rules = {} else: mapped_rules = rules.get('rules', {}) return { 'id': uuid.uuid4().hex, 'rules': mapped_rules } def _assert_tokenless_auth_context(self, context, ephemeral_user=False): self.assertIsNotNone(context) self.assertEqual(self.project_id, context['project_id']) self.assertIn(self.role_name, context['roles']) if ephemeral_user: self.assertEqual(self.group['id'], context['group_ids'][0]) self.assertEqual('ephemeral', context[federation_constants.PROTOCOL]) self.assertEqual(self.idp_id, context[federation_constants.IDENTITY_PROVIDER]) else: self.assertEqual(self.user['id'], context['user_id']) def _create_context(self, request, mapping_ref=None, exception_expected=False): """Builds the auth context from the given arguments. auth context will be returned from the AuthContextMiddleware based on what is being passed in the given request and what mapping is being setup in the backend DB. :param request: HTTP request :param mapping_ref: A mapping in JSON structure will be setup in the backend DB for mapping an user or a group. :param exception_expected: Sets to True when an exception is expected to raised based on the given arguments. :returns: context an auth context contains user and role information :rtype: dict """ if mapping_ref: self._load_mapping_rules(mapping_ref) if not exception_expected: (middleware.AuthContextMiddleware('Tokenless_auth_test'). process_request(request)) context = request.environ.get(authorization.AUTH_CONTEXT_ENV) else: context = middleware.AuthContextMiddleware('Tokenless_auth_test') return context def test_context_already_exists(self): req = make_request() token_id = uuid.uuid4().hex req.environ[authorization.AUTH_CONTEXT_ENV] = {'token_id': token_id} context = self._create_context(request=req) self.assertEqual(token_id, context['token_id']) def test_not_applicable_to_token_request(self): env = {} env['PATH_INFO'] = '/auth/tokens' env['REQUEST_METHOD'] = 'POST' req = make_request(environ=env) context = self._create_context(request=req) self.assertIsNone(context) def test_no_tokenless_attributes_request(self): req = make_request() context = self._create_context(request=req) self.assertIsNone(context) def test_no_issuer_attribute_request(self): env = {} env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex req = make_request(environ=env) context = self._create_context(request=req) self.assertIsNone(context) def test_has_only_issuer_and_project_name_request(self): env = {} # SSL_CLIENT_I_DN is the attribute name that wsgi env # references to issuer of the client certificate. env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex req = make_request(environ=env) context = self._create_context(request=req, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_has_only_issuer_and_project_domain_name_request(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex req = make_request(environ=env) context = self._create_context(request=req, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_has_only_issuer_and_project_domain_id_request(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_DOMAIN_ID'] = uuid.uuid4().hex req = make_request(environ=env) context = self._create_context(request=req, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_missing_both_domain_and_project_request(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer req = make_request(environ=env) context = self._create_context(request=req, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_empty_trusted_issuer_list(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', trusted_issuer=[]) context = self._create_context(request=req) self.assertIsNone(context) def test_client_issuer_not_trusted(self): env = {} env['SSL_CLIENT_I_DN'] = self.untrusted_client_issuer env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex req = make_request(environ=env) context = self._create_context(request=req) self.assertIsNone(context) def test_proj_scope_with_proj_id_and_proj_dom_id_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id # SSL_CLIENT_USER_NAME and SSL_CLIENT_DOMAIN_NAME are the types # defined in the mapping that will map to the user name and # domain name env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_id_only_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_name_and_proj_dom_id_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_name_and_proj_dom_name_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_name_only_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_id env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_mapping_with_userid_and_domainid_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_ID'] = self.user['id'] env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINID) self._assert_tokenless_auth_context(context) def test_mapping_with_userid_and_domainname_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_ID'] = self.user['id'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINNAME) self._assert_tokenless_auth_context(context) def test_mapping_with_username_and_domainid_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID) self._assert_tokenless_auth_context(context) def test_only_domain_name_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINNAME_ONLY, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_only_domain_id_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINID_ONLY, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_missing_domain_data_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_USER_NAME'] = self.user_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_ONLY, exception_expected=True) self.assertRaises(exception.ValidationError, context.process_request, req) def test_userid_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_USER_ID'] = self.user['id'] req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_ONLY) self._assert_tokenless_auth_context(context) def test_domain_disable_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id req = make_request(environ=env) self.domain['enabled'] = False self.domain = self.resource_api.update_domain( self.domain['id'], self.domain) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID, exception_expected=True) self.assertRaises(exception.Unauthorized, context.process_request, req) def test_user_disable_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id req = make_request(environ=env) self.user['enabled'] = False self.user = self.identity_api.update_user(self.user['id'], self.user) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID, exception_expected=True) self.assertRaises(AssertionError, context.process_request, req) def test_invalid_user_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name req = make_request(environ=env) context = self._create_context( request=req, mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME, exception_expected=True) self.assertRaises(exception.UserNotFound, context.process_request, req) def test_ephemeral_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] context = self._create_context( request=req, mapping_ref=mapping) self._assert_tokenless_auth_context(context, ephemeral_user=True) def test_ephemeral_with_default_user_type_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' # this mapping does not have the user type defined # and it should defaults to 'ephemeral' which is # the expected type for the test case. mapping = mapping_fixtures.MAPPING_FOR_DEFAULT_EPHEMERAL_USER.copy() mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] context = self._create_context( request=req, mapping_ref=mapping) self._assert_tokenless_auth_context(context, ephemeral_user=True) def test_ephemeral_any_user_success(self): """Ephemeral user does not need a specified user Keystone is not looking to match the user, but a corresponding group. """ env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] context = self._create_context( request=req, mapping_ref=mapping) self._assert_tokenless_auth_context(context, ephemeral_user=True) def test_ephemeral_invalid_scope_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex env['SSL_CLIENT_USER_NAME'] = self.user_name req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] context = self._create_context( request=req, mapping_ref=mapping, exception_expected=True) self.assertRaises(exception.Unauthorized, context.process_request, req) def test_ephemeral_no_group_found_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex context = self._create_context( request=req, mapping_ref=mapping, exception_expected=True) self.assertRaises(exception.MappedGroupNotFound, context.process_request, req) def test_ephemeral_incorrect_mapping_fail(self): """Ephemeral user picks up the non-ephemeral user mapping. Looking up the mapping with protocol Id 'x509' will load up the non-ephemeral user mapping, results unauthenticated. """ env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = self.user_name req = make_request(environ=env) # This will pick up the incorrect mapping self.config_fixture.config(group='tokenless_auth', protocol='x509') self.protocol_id = 'x509' mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex context = self._create_context( request=req, mapping_ref=mapping, exception_expected=True) self.assertRaises(exception.MappedGroupNotFound, context.process_request, req) def test_create_idp_id_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer auth = tokenless_auth.TokenlessAuthHelper(env) idp_id = auth._build_idp_id() self.assertEqual(self.idp_id, idp_id) def test_create_idp_id_attri_not_found_fail(self): env = {} env[uuid.uuid4().hex] = self.client_issuer auth = tokenless_auth.TokenlessAuthHelper(env) expected_msg = ('Could not determine Identity Provider ID. The ' 'configuration option %s was not found in the ' 'request environment.' % CONF.tokenless_auth.issuer_attribute) # Check the content of the exception message as well self.assertRaisesRegexp(exception.TokenlessAuthConfigError, expected_msg, auth._build_idp_id)