Disable optional authentication for plugin

Rework the tests to extract a mock fixture that is capable of mocking
server authenticated requests. Set the plugin to mutual authentication
by default.

Closes-Bug: #1482468
Change-Id: I06daea59d25cd63c552f2db9f663bbc33e659ed5
This commit is contained in:
Jamie Lennox 2015-06-04 19:32:54 +10:00
parent 4b3ce88f95
commit a7c6a7c04c
4 changed files with 112 additions and 44 deletions

View File

@ -16,8 +16,36 @@
# under the License.
from oslotest import base
from requests_mock.contrib import fixture as requests_fixture
from keystoneclient_kerberos.tests import utils
REQUEST = {'auth': {'identity': {'methods': ['kerberos'],
'kerberos': {}}}}
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""
TEST_ROOT_URL = utils.TEST_ROOT_URL
def setUp(self):
super(TestCase, self).setUp()
self.requests_mock = self.useFixture(requests_fixture.Fixture())
km = utils.KerberosMock(self.requests_mock)
self.kerberos_mock = self.useFixture(km)
def assertRequestBody(self, body=None):
"""Ensure the request body is the standard kerberos auth request.
:param dict body: the body to compare. If not provided the last request
body will be used.
"""
if not body:
body = self.requests_mock.last_request.json()
self.assertEqual(REQUEST, body)

View File

@ -10,14 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import uuid
from keystoneclient import fixture as ks_fixture
from keystoneclient import session
import mock
import requests_kerberos
from requests_mock.contrib import fixture as requests_fixture
from keystoneclient_kerberos.tests import base
from keystoneclient_kerberos import v3
@ -25,43 +18,16 @@ from keystoneclient_kerberos import v3
class TestKerberosAuth(base.TestCase):
TEST_ROOT_URL = 'http://keystoneserver.test.com:5000/'
def setUp(self):
super(TestKerberosAuth, self).setUp()
self.token_id = uuid.uuid4().hex
self.token_body = ks_fixture.V3Token()
self.requests = self.useFixture(requests_fixture.Fixture())
@mock.patch.object(requests_kerberos.HTTPKerberosAuth,
'generate_request_header')
def test_authenticate_with_kerberos_domain_scoped(self, request_header):
header = 'Negotiate %s' % uuid.uuid4().hex
request_header.return_value = header
fail_resp = {'text': 'Fail',
'status_code': 401,
'headers': {'WWW-Authenticate': 'Negotiate'}}
pass_resp = {'json': self.token_body,
'status_code': 200,
'headers': {'X-Subject-Token': self.token_id,
'Content-Type': 'application/json'}}
self.requests.register_uri('POST',
self.TEST_ROOT_URL + 'v3/auth/tokens',
response_list=[fail_resp, pass_resp])
def test_authenticate_with_kerberos_domain_scoped(self):
token_id, token_body = self.kerberos_mock.mock_auth_success()
a = v3.Kerberos(self.TEST_ROOT_URL + 'v3')
s = session.Session(a)
token = a.get_token(s)
req = {'auth': {'identity': {'methods': ['kerberos'],
'kerberos': {}}}}
self.assertEqual(req, json.loads(self.requests.last_request.body))
self.assertEqual(header,
self.requests.last_request.headers['Authorization'])
self.assertEqual(self.token_id, a.auth_ref.auth_token)
self.assertEqual(self.token_id, token)
self.assertRequestBody()
self.assertEqual(
self.kerberos_mock.challenge_header,
self.requests_mock.last_request.headers['Authorization'])
self.assertEqual(token_id, a.auth_ref.auth_token)
self.assertEqual(token_id, token)

View File

@ -0,0 +1,75 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import fixtures
from keystoneclient import fixture as ks_fixture
from oslotest import mockpatch
import requests_kerberos
# any old base url for test mocking
TEST_ROOT_URL = 'http://keystoneserver.test.com:5000/'
class KerberosMock(fixtures.Fixture):
def __init__(self, requests_mock):
super(KerberosMock, self).__init__()
self.challenge_header = 'Negotiate %s' % uuid.uuid4().hex
self.pass_header = 'Negotiate %s' % uuid.uuid4().hex
self.requests_mock = requests_mock
def setUp(self):
super(KerberosMock, self).setUp()
m = mockpatch.PatchObject(requests_kerberos.HTTPKerberosAuth,
'generate_request_header',
self._generate_request_header)
self.header_fixture = self.useFixture(m)
m = mockpatch.PatchObject(requests_kerberos.HTTPKerberosAuth,
'authenticate_server',
self._authenticate_server)
self.authenticate_fixture = self.useFixture(m)
def _generate_request_header(self, *args, **kwargs):
return self.challenge_header
def _authenticate_server(self, response):
return response.headers.get('www-authenticate') == self.pass_header
def mock_auth_success(self,
token_id=None,
token_body=None,
url=TEST_ROOT_URL + 'v3/auth/tokens'):
if not token_id:
token_id = uuid.uuid4().hex
if not token_body:
token_body = ks_fixture.V3Token()
response_list = [{'text': 'Fail',
'status_code': 401,
'headers': {'WWW-Authenticate': 'Negotiate'}},
{'headers': {'X-Subject-Token': token_id,
'Content-Type': 'application/json',
'WWW-Authenticate': self.pass_header},
'status_code': 200,
'json': token_body}]
self.requests_mock.post(url, response_list=response_list)
return token_id, token_body

View File

@ -21,8 +21,7 @@ class KerberosMethod(v3.AuthMethod):
def get_auth_data(self, session, auth, headers, request_kwargs, **kwargs):
# NOTE(jamielennox): request_kwargs is passed as a kwarg however it is
# required and always present when called from keystoneclient.
request_kwargs['requests_auth'] = requests_kerberos.HTTPKerberosAuth(
mutual_authentication=requests_kerberos.OPTIONAL)
request_kwargs['requests_auth'] = requests_kerberos.HTTPKerberosAuth()
return 'kerberos', {}