Federated Kerberos plugin

A plugin to support kerberos configured via keystone federation.

Closes-Bug: #1451299
Change-Id: I766c024089dad94f3f54e2578d578f65a246bdae
This commit is contained in:
Jamie Lennox 2015-04-15 07:57:28 +10:00
parent a7c6a7c04c
commit 3074933d4c
6 changed files with 99 additions and 2 deletions

View File

@ -20,5 +20,7 @@ __version__ = pbr.version.VersionInfo(
'python-keystoneclient-kerberos').version_string()
V3Kerberos = v3.Kerberos
V3FederatedKerberos = v3.FederatedKerberos
__all__ = ['V3Kerberos']
__all__ = ['V3FederatedKerberos',
'V3Kerberos']

View File

@ -30,6 +30,7 @@ class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""
TEST_ROOT_URL = utils.TEST_ROOT_URL
TEST_V3_URL = TEST_ROOT_URL + 'v3'
def setUp(self):
super(TestCase, self).setUp()

View File

@ -0,0 +1,72 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystoneclient import fixture as ks_fixture
from keystoneclient import session
from keystoneclient_kerberos.tests import base
from keystoneclient_kerberos import v3
class TestFederatedAuth(base.TestCase):
def setUp(self):
super(TestFederatedAuth, self).setUp()
self.protocol = uuid.uuid4().hex
self.identity_provider = uuid.uuid4().hex
@property
def token_url(self):
return "%s/OS-FEDERATION/identity_providers/%s/protocols/%s/auth" % (
self.TEST_V3_URL,
self.identity_provider,
self.protocol)
def test_unscoped_federated_auth(self):
token_id, _ = self.kerberos_mock.mock_auth_success(url=self.token_url,
method='GET')
plugin = v3.FederatedKerberos(auth_url=self.TEST_V3_URL,
protocol=self.protocol,
identity_provider=self.identity_provider)
sess = session.Session()
tok = plugin.get_token(sess)
self.assertEqual(token_id, tok)
def test_project_scoped_federated_auth(self):
self.kerberos_mock.mock_auth_success(url=self.token_url, method='GET')
scoped_id = uuid.uuid4().hex
scoped_body = ks_fixture.V3Token()
scoped_body.set_project_scope()
self.requests_mock.post('%s/auth/tokens' % self.TEST_V3_URL,
json=scoped_body,
headers={'X-Subject-Token': scoped_id,
'Content-Type': 'application/json'})
plugin = v3.FederatedKerberos(auth_url=self.TEST_V3_URL,
protocol=self.protocol,
identity_provider=self.identity_provider,
project_id=scoped_body.project_id)
sess = session.Session()
tok = plugin.get_token(sess)
proj = plugin.get_project_id(sess)
self.assertEqual(scoped_id, tok)
self.assertEqual(scoped_body.project_id, proj)

View File

@ -55,6 +55,7 @@ class KerberosMock(fixtures.Fixture):
def mock_auth_success(self,
token_id=None,
token_body=None,
method='POST',
url=TEST_ROOT_URL + 'v3/auth/tokens'):
if not token_id:
token_id = uuid.uuid4().hex
@ -70,6 +71,8 @@ class KerberosMock(fixtures.Fixture):
'status_code': 200,
'json': token_body}]
self.requests_mock.post(url, response_list=response_list)
self.requests_mock.register_uri(method,
url,
response_list=response_list)
return token_id, token_body

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import access
from keystoneclient.auth.identity import v3
import requests_kerberos
@ -27,3 +28,20 @@ class KerberosMethod(v3.AuthMethod):
class Kerberos(v3.AuthConstructor):
_auth_method_class = KerberosMethod
class FederatedKerberos(v3.FederatedBaseAuth):
"""Authenticate using Kerberos via the keystone federation mechanisms.
This is not technically federation. However, federation is the term which
keystone uses for all mapped authentication. This uses the OS-FEDERATION
extension to gain an unscoped token and then use the standard keystone auth
process to scope that to any given project.
"""
def get_unscoped_auth_ref(self, session, **kwargs):
resp = session.get(self.federated_token_url,
requests_auth=requests_kerberos.HTTPKerberosAuth(),
authenticated=False)
return access.AccessInfo.factory(body=resp.json(), resp=resp)

View File

@ -24,6 +24,7 @@ packages =
[entry_points]
keystoneclient.auth.plugin =
v3kerberos = keystoneclient_kerberos.v3:Kerberos
v3fedkerb = keystoneclient_kerberos.v3:FederatedKerberos
[wheel]
universal = 1