Allow Barbican Key Manager to accept different auth credentials

This patch alters the barbican key manager to be able to use
Token, Password, and OSLO Credentials. It is the third of
several patches which will implement the
"Allow different Keystone Auth Support in Castellan" blueprint.

Other patches will add:
1.) documentation on usage

Needs Functional Tests
Change-Id: Ib3bb9d4e167f0b85bcf7a9053743239c9e6e6dae
Implements: blueprint remove-keystone-dependency
This commit is contained in:
Fernando Diaz 2016-01-29 04:17:34 +00:00
parent 6090ae8032
commit 01f6801c0d
2 changed files with 193 additions and 14 deletions

View File

@ -103,12 +103,6 @@ class BarbicanKeyManager(key_manager.KeyManager):
LOG.error(msg)
raise exception.Forbidden(msg)
if not hasattr(context, 'tenant') or context.tenant is None:
msg = u._("Unable to create Barbican Client without tenant "
"attribute in context object.")
LOG.error(msg)
raise exception.KeyManagerError(reason=msg)
if self._barbican_client and self._current_context == context:
return self._barbican_client
@ -133,14 +127,48 @@ class BarbicanKeyManager(key_manager.KeyManager):
return self._barbican_client
def _get_keystone_auth(self, context):
# TODO(kfarr): support keystone v2
auth = identity.v3.Token(
auth_url=self.conf.barbican.auth_endpoint,
token=context.auth_token,
project_id=context.tenant,
domain_id=context.user_domain,
project_domain_id=context.project_domain)
return auth
auth_url = self.conf.barbican.auth_endpoint
if context.__class__.__name__ is 'KeystonePassword':
return identity.v3.Password(
auth_url=auth_url,
username=context.username,
password=context.password,
user_id=context.user_id,
user_domain_id=context.user_domain_id,
user_domain_name=context.user_domain_name,
trust_id=context.trust_id,
domain_id=context.domain_id,
domain_name=context.domain_name,
project_id=context.project_id,
project_name=context.project_name,
project_domain_id=context.project_domain_id,
project_domain_name=context.project_domain_name,
reauthenticate=context.reauthenticate)
elif context.__class__.__name__ is 'KeystoneToken':
return identity.v3.Token(
auth_url=auth_url,
token=context.token,
trust_id=context.trust_id,
domain_id=context.domain_id,
domain_name=context.domain_name,
project_id=context.project_id,
project_name=context.project_name,
project_domain_id=context.project_domain_id,
project_domain_name=context.project_domain_name,
reauthenticate=context.reauthenticate)
# this will be kept for oslo.context compatibility until
# projects begin to use utils.credential_factory
elif context.__class__.__name__ is 'RequestContext':
return identity.v3.Token(
auth_url=auth_url,
token=context.auth_token,
project_id=context.tenant)
else:
msg = "context must be of type KeystonePassword, KeystoneToken, "
"or RequestContext."
LOG.error(msg)
raise exception.Forbidden(reason=msg)
def _get_barbican_endpoint(self, auth, sess):
if self.conf.barbican.barbican_endpoint:

View File

@ -28,6 +28,8 @@ from oslo_config import cfg
from oslo_context import context
from oslotest import base
from castellan.common.credentials import keystone_password
from castellan.common.credentials import keystone_token
from castellan.common import exception
from castellan.key_manager import barbican_key_manager
from castellan.tests.functional import config
@ -114,3 +116,152 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase,
self.assertRaises(exception.Forbidden,
self.key_mgr.store, None, key)
class BarbicanKeyManagerKSPasswordTestCase(test_key_manager.KeyManagerTestCase,
base.BaseTestCase):
def _create_key_manager(self):
return barbican_key_manager.BarbicanKeyManager(cfg.CONF)
def setUp(self):
super(BarbicanKeyManagerKSPasswordTestCase, self).setUp()
username = CONF.identity.username
password = CONF.identity.password
project_name = CONF.identity.project_name
user_domain_name = CONF.identity.user_domain_name
project_domain_name = CONF.identity.project_domain_name
self.ctxt = keystone_password.KeystonePassword(
username=username,
password=password,
project_name=project_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name)
def tearDown(self):
super(BarbicanKeyManagerKSPasswordTestCase, self).tearDown()
def test_create_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key, None, 'AES', 256)
def test_create_key_pair_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key_pair, None, 'RSA', 2048)
def test_delete_null_context(self):
key_uuid = self._get_valid_object_uuid(
test_key_manager._get_test_symmetric_key())
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
self.assertRaises(exception.Forbidden,
self.key_mgr.delete, None, key_uuid)
def test_delete_null_object(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete, self.ctxt, None)
def test_delete_unknown_object(self):
unknown_uuid = str(uuid.uuid4())
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.delete, self.ctxt, unknown_uuid)
def test_get_null_context(self):
key_uuid = self._get_valid_object_uuid(
test_key_manager._get_test_symmetric_key())
self.assertRaises(exception.Forbidden,
self.key_mgr.get, None, key_uuid)
def test_get_null_object(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get, self.ctxt, None)
def test_get_unknown_key(self):
bad_key_uuid = str(uuid.uuid4())
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.get, self.ctxt, bad_key_uuid)
def test_store_null_context(self):
key = test_key_manager._get_test_symmetric_key()
self.assertRaises(exception.Forbidden,
self.key_mgr.store, None, key)
class BarbicanKeyManagerKSTokenTestCase(test_key_manager.KeyManagerTestCase,
base.BaseTestCase):
def _create_key_manager(self):
return barbican_key_manager.BarbicanKeyManager(cfg.CONF)
def setUp(self):
super(BarbicanKeyManagerKSTokenTestCase, self).setUp()
username = CONF.identity.username
password = CONF.identity.password
project_name = CONF.identity.project_name
auth_url = CONF.identity.auth_url
user_domain_name = CONF.identity.user_domain_name
project_domain_name = CONF.identity.project_domain_name
auth = v3.Password(auth_url=auth_url,
username=username,
password=password,
project_name=project_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name)
sess = session.Session(auth=auth)
keystone_client = client.Client(session=sess)
project_list = keystone_client.projects.list(name=project_name)
self.ctxt = keystone_token.KeystoneToken(
token=auth.auth_ref.auth_token,
project_id=project_list[0].id)
def tearDown(self):
super(BarbicanKeyManagerKSTokenTestCase, self).tearDown()
def test_create_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key, None, 'AES', 256)
def test_create_key_pair_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key_pair, None, 'RSA', 2048)
def test_delete_null_context(self):
key_uuid = self._get_valid_object_uuid(
test_key_manager._get_test_symmetric_key())
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
self.assertRaises(exception.Forbidden,
self.key_mgr.delete, None, key_uuid)
def test_delete_null_object(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete, self.ctxt, None)
def test_delete_unknown_object(self):
unknown_uuid = str(uuid.uuid4())
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.delete, self.ctxt, unknown_uuid)
def test_get_null_context(self):
key_uuid = self._get_valid_object_uuid(
test_key_manager._get_test_symmetric_key())
self.assertRaises(exception.Forbidden,
self.key_mgr.get, None, key_uuid)
def test_get_null_object(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get, self.ctxt, None)
def test_get_unknown_key(self):
bad_key_uuid = str(uuid.uuid4())
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.get, self.ctxt, bad_key_uuid)
def test_store_null_context(self):
key = test_key_manager._get_test_symmetric_key()
self.assertRaises(exception.Forbidden,
self.key_mgr.store, None, key)