Allow Barbican Key Manager to accept different auth credentials
This patch alters the barbican key manager to be able to use Token, Password, and OSLO Credentials. It is the third of several patches which will implement the "Allow different Keystone Auth Support in Castellan" blueprint. Other patches will add: 1.) documentation on usage Needs Functional Tests Change-Id: Ib3bb9d4e167f0b85bcf7a9053743239c9e6e6dae Implements: blueprint remove-keystone-dependency
This commit is contained in:
parent
6090ae8032
commit
01f6801c0d
|
@ -103,12 +103,6 @@ class BarbicanKeyManager(key_manager.KeyManager):
|
|||
LOG.error(msg)
|
||||
raise exception.Forbidden(msg)
|
||||
|
||||
if not hasattr(context, 'tenant') or context.tenant is None:
|
||||
msg = u._("Unable to create Barbican Client without tenant "
|
||||
"attribute in context object.")
|
||||
LOG.error(msg)
|
||||
raise exception.KeyManagerError(reason=msg)
|
||||
|
||||
if self._barbican_client and self._current_context == context:
|
||||
return self._barbican_client
|
||||
|
||||
|
@ -133,14 +127,48 @@ class BarbicanKeyManager(key_manager.KeyManager):
|
|||
return self._barbican_client
|
||||
|
||||
def _get_keystone_auth(self, context):
|
||||
# TODO(kfarr): support keystone v2
|
||||
auth = identity.v3.Token(
|
||||
auth_url=self.conf.barbican.auth_endpoint,
|
||||
token=context.auth_token,
|
||||
project_id=context.tenant,
|
||||
domain_id=context.user_domain,
|
||||
project_domain_id=context.project_domain)
|
||||
return auth
|
||||
auth_url = self.conf.barbican.auth_endpoint
|
||||
|
||||
if context.__class__.__name__ is 'KeystonePassword':
|
||||
return identity.v3.Password(
|
||||
auth_url=auth_url,
|
||||
username=context.username,
|
||||
password=context.password,
|
||||
user_id=context.user_id,
|
||||
user_domain_id=context.user_domain_id,
|
||||
user_domain_name=context.user_domain_name,
|
||||
trust_id=context.trust_id,
|
||||
domain_id=context.domain_id,
|
||||
domain_name=context.domain_name,
|
||||
project_id=context.project_id,
|
||||
project_name=context.project_name,
|
||||
project_domain_id=context.project_domain_id,
|
||||
project_domain_name=context.project_domain_name,
|
||||
reauthenticate=context.reauthenticate)
|
||||
elif context.__class__.__name__ is 'KeystoneToken':
|
||||
return identity.v3.Token(
|
||||
auth_url=auth_url,
|
||||
token=context.token,
|
||||
trust_id=context.trust_id,
|
||||
domain_id=context.domain_id,
|
||||
domain_name=context.domain_name,
|
||||
project_id=context.project_id,
|
||||
project_name=context.project_name,
|
||||
project_domain_id=context.project_domain_id,
|
||||
project_domain_name=context.project_domain_name,
|
||||
reauthenticate=context.reauthenticate)
|
||||
# this will be kept for oslo.context compatibility until
|
||||
# projects begin to use utils.credential_factory
|
||||
elif context.__class__.__name__ is 'RequestContext':
|
||||
return identity.v3.Token(
|
||||
auth_url=auth_url,
|
||||
token=context.auth_token,
|
||||
project_id=context.tenant)
|
||||
else:
|
||||
msg = "context must be of type KeystonePassword, KeystoneToken, "
|
||||
"or RequestContext."
|
||||
LOG.error(msg)
|
||||
raise exception.Forbidden(reason=msg)
|
||||
|
||||
def _get_barbican_endpoint(self, auth, sess):
|
||||
if self.conf.barbican.barbican_endpoint:
|
||||
|
|
|
@ -28,6 +28,8 @@ from oslo_config import cfg
|
|||
from oslo_context import context
|
||||
from oslotest import base
|
||||
|
||||
from castellan.common.credentials import keystone_password
|
||||
from castellan.common.credentials import keystone_token
|
||||
from castellan.common import exception
|
||||
from castellan.key_manager import barbican_key_manager
|
||||
from castellan.tests.functional import config
|
||||
|
@ -114,3 +116,152 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase,
|
|||
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.store, None, key)
|
||||
|
||||
|
||||
class BarbicanKeyManagerKSPasswordTestCase(test_key_manager.KeyManagerTestCase,
|
||||
base.BaseTestCase):
|
||||
|
||||
def _create_key_manager(self):
|
||||
return barbican_key_manager.BarbicanKeyManager(cfg.CONF)
|
||||
|
||||
def setUp(self):
|
||||
super(BarbicanKeyManagerKSPasswordTestCase, self).setUp()
|
||||
username = CONF.identity.username
|
||||
password = CONF.identity.password
|
||||
project_name = CONF.identity.project_name
|
||||
user_domain_name = CONF.identity.user_domain_name
|
||||
project_domain_name = CONF.identity.project_domain_name
|
||||
|
||||
self.ctxt = keystone_password.KeystonePassword(
|
||||
username=username,
|
||||
password=password,
|
||||
project_name=project_name,
|
||||
user_domain_name=user_domain_name,
|
||||
project_domain_name=project_domain_name)
|
||||
|
||||
def tearDown(self):
|
||||
super(BarbicanKeyManagerKSPasswordTestCase, self).tearDown()
|
||||
|
||||
def test_create_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key, None, 'AES', 256)
|
||||
|
||||
def test_create_key_pair_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key_pair, None, 'RSA', 2048)
|
||||
|
||||
def test_delete_null_context(self):
|
||||
key_uuid = self._get_valid_object_uuid(
|
||||
test_key_manager._get_test_symmetric_key())
|
||||
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.delete, None, key_uuid)
|
||||
|
||||
def test_delete_null_object(self):
|
||||
self.assertRaises(exception.KeyManagerError,
|
||||
self.key_mgr.delete, self.ctxt, None)
|
||||
|
||||
def test_delete_unknown_object(self):
|
||||
unknown_uuid = str(uuid.uuid4())
|
||||
self.assertRaises(exception.ManagedObjectNotFoundError,
|
||||
self.key_mgr.delete, self.ctxt, unknown_uuid)
|
||||
|
||||
def test_get_null_context(self):
|
||||
key_uuid = self._get_valid_object_uuid(
|
||||
test_key_manager._get_test_symmetric_key())
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.get, None, key_uuid)
|
||||
|
||||
def test_get_null_object(self):
|
||||
self.assertRaises(exception.KeyManagerError,
|
||||
self.key_mgr.get, self.ctxt, None)
|
||||
|
||||
def test_get_unknown_key(self):
|
||||
bad_key_uuid = str(uuid.uuid4())
|
||||
self.assertRaises(exception.ManagedObjectNotFoundError,
|
||||
self.key_mgr.get, self.ctxt, bad_key_uuid)
|
||||
|
||||
def test_store_null_context(self):
|
||||
key = test_key_manager._get_test_symmetric_key()
|
||||
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.store, None, key)
|
||||
|
||||
|
||||
class BarbicanKeyManagerKSTokenTestCase(test_key_manager.KeyManagerTestCase,
|
||||
base.BaseTestCase):
|
||||
|
||||
def _create_key_manager(self):
|
||||
return barbican_key_manager.BarbicanKeyManager(cfg.CONF)
|
||||
|
||||
def setUp(self):
|
||||
super(BarbicanKeyManagerKSTokenTestCase, self).setUp()
|
||||
username = CONF.identity.username
|
||||
password = CONF.identity.password
|
||||
project_name = CONF.identity.project_name
|
||||
auth_url = CONF.identity.auth_url
|
||||
user_domain_name = CONF.identity.user_domain_name
|
||||
project_domain_name = CONF.identity.project_domain_name
|
||||
|
||||
auth = v3.Password(auth_url=auth_url,
|
||||
username=username,
|
||||
password=password,
|
||||
project_name=project_name,
|
||||
user_domain_name=user_domain_name,
|
||||
project_domain_name=project_domain_name)
|
||||
sess = session.Session(auth=auth)
|
||||
keystone_client = client.Client(session=sess)
|
||||
|
||||
project_list = keystone_client.projects.list(name=project_name)
|
||||
|
||||
self.ctxt = keystone_token.KeystoneToken(
|
||||
token=auth.auth_ref.auth_token,
|
||||
project_id=project_list[0].id)
|
||||
|
||||
def tearDown(self):
|
||||
super(BarbicanKeyManagerKSTokenTestCase, self).tearDown()
|
||||
|
||||
def test_create_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key, None, 'AES', 256)
|
||||
|
||||
def test_create_key_pair_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key_pair, None, 'RSA', 2048)
|
||||
|
||||
def test_delete_null_context(self):
|
||||
key_uuid = self._get_valid_object_uuid(
|
||||
test_key_manager._get_test_symmetric_key())
|
||||
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.delete, None, key_uuid)
|
||||
|
||||
def test_delete_null_object(self):
|
||||
self.assertRaises(exception.KeyManagerError,
|
||||
self.key_mgr.delete, self.ctxt, None)
|
||||
|
||||
def test_delete_unknown_object(self):
|
||||
unknown_uuid = str(uuid.uuid4())
|
||||
self.assertRaises(exception.ManagedObjectNotFoundError,
|
||||
self.key_mgr.delete, self.ctxt, unknown_uuid)
|
||||
|
||||
def test_get_null_context(self):
|
||||
key_uuid = self._get_valid_object_uuid(
|
||||
test_key_manager._get_test_symmetric_key())
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.get, None, key_uuid)
|
||||
|
||||
def test_get_null_object(self):
|
||||
self.assertRaises(exception.KeyManagerError,
|
||||
self.key_mgr.get, self.ctxt, None)
|
||||
|
||||
def test_get_unknown_key(self):
|
||||
bad_key_uuid = str(uuid.uuid4())
|
||||
self.assertRaises(exception.ManagedObjectNotFoundError,
|
||||
self.key_mgr.get, self.ctxt, bad_key_uuid)
|
||||
|
||||
def test_store_null_context(self):
|
||||
key = test_key_manager._get_test_symmetric_key()
|
||||
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.store, None, key)
|
||||
|
|
Loading…
Reference in New Issue