Merge "Update Barbican functional tests"

This commit is contained in:
Jenkins 2015-09-28 14:01:43 +00:00 committed by Gerrit Code Review
commit e2603c899b
4 changed files with 265 additions and 94 deletions

View File

@ -21,12 +21,12 @@ Note: This requires local running instances of Barbican and Keystone.
import uuid
from barbicanclient import exceptions as barbican_exceptions
from keystoneclient.v3 import client
from oslo_config import cfg
from oslo_context import context
from oslotest import base
from castellan.common import exception
from castellan.common.objects import symmetric_key
from castellan.key_manager import barbican_key_manager
from castellan.tests.functional import config
from castellan.tests.functional.key_manager import test_key_manager
@ -35,10 +35,11 @@ from castellan.tests.functional.key_manager import test_key_manager
CONF = config.get_config()
class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase,
base.BaseTestCase):
def _create_key_manager(self):
return barbican_key_manager.BarbicanKeyManager()
return barbican_key_manager.BarbicanKeyManager(cfg.CONF)
def setUp(self):
super(BarbicanKeyManagerTestCase, self).setUp()
@ -49,100 +50,58 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
keystone_client = client.Client(username=username,
password=password,
project_name=project_name,
auth_url=auth_url)
auth_url=auth_url,
project_domain_id='default')
project_list = keystone_client.projects.list(name=project_name)
self.ctxt = context.RequestContext(
auth_token=keystone_client.auth_token)
auth_token=keystone_client.auth_token,
tenant=project_list[0].id)
def tearDown(self):
super(BarbicanKeyManagerTestCase, self).tearDown()
def test_create_key(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid)
self.assertIsNotNone(key_uuid)
def test_create_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key, None, 'AES', 256)
def test_delete_symmetric_key(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.key_mgr.delete_key(self.ctxt, key_uuid)
try:
self.key_mgr.get_key(self.ctxt, key_uuid)
except barbican_exceptions.HTTPClientError as e:
self.assertEqual(404, e.status_code)
else:
self.fail('No exception when deleting non-existent key')
def test_create_key_pair_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key_pair, None, 'RSA', 2048)
def test_delete_null_context(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid)
key_uuid = self._get_valid_object_uuid(
test_key_manager._get_test_symmetric_key())
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
self.assertRaises(exception.Forbidden,
self.key_mgr.delete_key, None, key_uuid)
self.key_mgr.delete, None, key_uuid)
def test_delete_null_key(self):
def test_delete_null_object(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete_key, self.ctxt, None)
self.key_mgr.delete, self.ctxt, None)
def test_delete_unknown_key(self):
bad_key_uuid = str(uuid.uuid4())
self.assertRaises(barbican_exceptions.HTTPClientError,
self.key_mgr.delete_key, self.ctxt, bad_key_uuid)
def test_get_key(self):
secret_key = b'\x01\x02\xA0\xB3'
key = symmetric_key.SymmetricKey('AES', secret_key)
uuid = self.key_mgr.store_key(self.ctxt, key)
self.addCleanup(self.key_mgr.delete_key, self.ctxt, uuid)
retrieved_key = self.key_mgr.get_key(self.ctxt, uuid)
self.assertEqual(key.get_encoded(), retrieved_key.get_encoded())
def test_delete_unknown_object(self):
unknown_uuid = str(uuid.uuid4())
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.delete, self.ctxt, unknown_uuid)
def test_get_null_context(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
key_uuid = self._get_valid_object_uuid(
test_key_manager._get_test_symmetric_key())
self.assertRaises(exception.Forbidden,
self.key_mgr.get_key, None, key_uuid)
self.key_mgr.get, None, key_uuid)
def test_get_null_key(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid)
def test_get_null_object(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get_key, self.ctxt, None)
self.key_mgr.get, self.ctxt, None)
def test_get_unknown_key(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid)
bad_key_uuid = str(uuid.uuid4())
self.assertRaises(barbican_exceptions.HTTPClientError,
self.key_mgr.get_key, self.ctxt, bad_key_uuid)
def test_store(self):
secret_key = b'\x01\x02\xA0\xB3'
key = symmetric_key.SymmetricKey('AES', secret_key)
uuid = self.key_mgr.store_key(self.ctxt, key)
self.addCleanup(self.key_mgr.delete_key, self.ctxt, uuid)
retrieved_key = self.key_mgr.get_key(self.ctxt, uuid)
self.assertEqual(key.get_encoded(), retrieved_key.get_encoded())
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.get, self.ctxt, bad_key_uuid)
def test_store_null_context(self):
secret_key = b'\x01\x02\xA0\xB3'
key = symmetric_key.SymmetricKey('AES', secret_key)
key = test_key_manager._get_test_symmetric_key()
self.assertRaises(exception.Forbidden,
self.key_mgr.store_key, None, key)
self.key_mgr.store, None, key)

View File

@ -14,18 +14,141 @@
# under the License.
"""
Test cases for the key manager.
Test cases for a key manager.
These test cases should pass against any key manager.
"""
from castellan.tests import base
from castellan.common import exception
from castellan.common.objects import opaque_data
from castellan.common.objects import passphrase
from castellan.common.objects import private_key
from castellan.common.objects import public_key
from castellan.common.objects import symmetric_key
from castellan.common.objects import x_509
from castellan.tests import utils
class KeyManagerTestCase(base.TestCase):
def _get_test_symmetric_key():
key_bytes = bytes(utils.get_symmetric_key())
bit_length = 128
key = symmetric_key.SymmetricKey('AES', bit_length, key_bytes)
return key
def _get_test_public_key():
key_bytes = bytes(utils.get_public_key_der())
bit_length = 2048
key = public_key.PublicKey('RSA', bit_length, key_bytes)
return key
def _get_test_private_key():
key_bytes = bytes(utils.get_private_key_der())
bit_length = 2048
key = private_key.PrivateKey('RSA', bit_length, key_bytes)
return key
def _get_test_certificate():
data = bytes(utils.get_certificate_der())
cert = x_509.X509(data)
return cert
def _get_test_opaque_data():
data = bytes(b'opaque data')
opaque_object = opaque_data.OpaqueData(data)
return opaque_object
def _get_test_passphrase():
data = bytes(b'passphrase')
passphrase_object = passphrase.Passphrase(data)
return passphrase_object
@utils.parameterized_test_case
class KeyManagerTestCase(object):
def _create_key_manager(self):
raise NotImplementedError()
def setUp(self):
super(KeyManagerTestCase, self).setUp()
self.key_mgr = self._create_key_manager()
def _get_valid_object_uuid(self, managed_object):
object_uuid = self.key_mgr.store(self.ctxt, managed_object)
self.assertIsNotNone(object_uuid)
return object_uuid
def test_create_key(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
self.assertIsNotNone(key_uuid)
def test_create_key_pair(self):
private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair(
self.ctxt,
algorithm='RSA',
length=2048)
self.addCleanup(self.key_mgr.delete, self.ctxt, private_key_uuid)
self.addCleanup(self.key_mgr.delete, self.ctxt, public_key_uuid)
self.assertIsNotNone(private_key_uuid)
self.assertIsNotNone(public_key_uuid)
self.assertNotEqual(private_key_uuid, public_key_uuid)
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_delete(self, managed_object):
object_uuid = self._get_valid_object_uuid(managed_object)
self.key_mgr.delete(self.ctxt, object_uuid)
try:
self.key_mgr.get(self.ctxt, object_uuid)
except exception.ManagedObjectNotFoundError:
pass
else:
self.fail('No exception when deleting non-existent key')
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_get(self, managed_object):
uuid = self._get_valid_object_uuid(managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
self.assertEqual(managed_object.get_encoded(),
retrieved_object.get_encoded())
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_store(self, managed_object):
uuid = self.key_mgr.store(self.ctxt, managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
self.assertEqual(managed_object.get_encoded(),
retrieved_object.get_encoded())

View File

@ -171,17 +171,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.key_mgr.delete(self.ctxt, self.key_id)
self.delete.assert_called_once_with(self.secret_ref)
def test_delete_none_key(self):
def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete, self.ctxt, None)
def test_delete_unkown_key(self):
side_effect = barbican_exceptions.HTTPClientError('key not found')
side_effect.status_code = 404
self.mock_barbican.secrets.delete = mock.Mock(side_effect=side_effect)
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.delete, self.ctxt, self.key_id)
def test_delete_with_error(self):
self.mock_barbican.secrets.delete = mock.Mock(
side_effect=barbican_exceptions.HTTPClientError('test error'))
@ -207,17 +200,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.assertRaises(exception.Forbidden,
self.key_mgr.get, None, self.key_id)
def test_get_none_key(self):
def test_get_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get, self.ctxt, None)
def test_get_unknown_key(self):
side_effect = barbican_exceptions.HTTPClientError('key not found')
side_effect.status_code = 404
self.mock_barbican.secrets.get = mock.Mock(side_effect=side_effect)
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.get, self.ctxt, self.key_id)
def test_get_with_error(self):
self.mock_barbican.secrets.get = mock.Mock(
side_effect=barbican_exceptions.HTTPClientError('test error'))

View File

@ -16,6 +16,99 @@
"""These utility functions are borrowed from Barbican's testing utilities."""
import functools
import types
import six
def construct_new_test_function(original_func, name, build_params):
"""Builds a new test function based on parameterized data.
:param original_func: The original test function that is used as a template
:param name: The fullname of the new test function
:param build_params: A dictionary or list containing args or kwargs
for the new test
:return: A new function object
"""
new_func = types.FunctionType(
six.get_function_code(original_func),
six.get_function_globals(original_func),
name=name,
argdefs=six.get_function_defaults(original_func)
)
for key, val in six.iteritems(original_func.__dict__):
if key != 'build_data':
new_func.__dict__[key] = val
# Support either an arg list or kwarg dict for our data
build_args = build_params if isinstance(build_params, list) else []
build_kwargs = build_params if isinstance(build_params, dict) else {}
# Build a test wrapper to execute with our kwargs
def test_wrapper(func, test_args, test_kwargs):
@functools.wraps(func)
def wrapper(self):
return func(self, *test_args, **test_kwargs)
return wrapper
return test_wrapper(new_func, build_args, build_kwargs)
def process_parameterized_function(name, func_obj, build_data):
"""Build lists of functions to add and remove to a test case."""
to_remove = []
to_add = []
for subtest_name, params in six.iteritems(build_data):
# Build new test function
func_name = '{0}_{1}'.format(name, subtest_name)
new_func = construct_new_test_function(func_obj, func_name, params)
# Mark the new function as needed to be added to the class
to_add.append((func_name, new_func))
# Mark key for removal
to_remove.append(name)
return to_remove, to_add
def parameterized_test_case(cls):
"""Class decorator to process parameterized tests
This allows for parameterization to be used for potentially any
unittest compatible runner; including testr and py.test.
"""
tests_to_remove = []
tests_to_add = []
for key, val in six.iteritems(vars(cls)):
# Only process tests with build data on them
if key.startswith('test_') and val.__dict__.get('build_data'):
to_remove, to_add = process_parameterized_function(
name=key,
func_obj=val,
build_data=val.__dict__.get('build_data')
)
tests_to_remove.extend(to_remove)
tests_to_add.extend(to_add)
# Add all new test functions
[setattr(cls, name, func) for name, func in tests_to_add]
# Remove all old test function templates (if they still exist)
[delattr(cls, key) for key in tests_to_remove if hasattr(cls, key)]
return cls
def parameterized_dataset(build_data):
"""Simple decorator to mark a test method for processing."""
def decorator(func):
func.__dict__['build_data'] = build_data
return func
return decorator
def get_certificate_der():
"""Returns an X509 certificate in DER format
@ -211,3 +304,13 @@ def get_public_key_der():
b'\x01\x15\xcd\x52\x83\x3f\x06\x67\xfd\xa1\x2d\x2b\x07\xba\x32'
b'\x62\x21\x07\x2f\x02\x03\x01\x00\x01')
return key_der
def get_symmetric_key():
"""Returns symmetric key bytes
16 bytes that were randomly generated. Form a 128 bit key.
"""
symmetric_key = (
b'\x92\xcf\x1e\xd9\x54\xea\x30\x70\xd8\xc2\x48\xae\xc1\xc8\x72\xa3')
return symmetric_key