From 1fab17828ce6aa46c8e0c00e3057d49b810a7794 Mon Sep 17 00:00:00 2001 From: Kaitlin Farr Date: Mon, 21 Sep 2015 13:21:06 -0400 Subject: [PATCH] Update Barbican functional tests The Barbican wrapper functional tests now reflect the changes to the API that supported operations on the manager object hierarchy. Reorganizes the functional tests to put tests that should pass against any backend into a base class that can be shared. Also adds parameterized test utilities. Change-Id: Ibbddddc760eda06db620c90b033e3292a2148fe8 --- .../key_manager/test_barbican_key_manager.py | 107 +++++--------- .../key_manager/test_key_manager.py | 131 +++++++++++++++++- .../key_manager/test_barbican_key_manager.py | 18 +-- castellan/tests/utils.py | 103 ++++++++++++++ 4 files changed, 265 insertions(+), 94 deletions(-) diff --git a/castellan/tests/functional/key_manager/test_barbican_key_manager.py b/castellan/tests/functional/key_manager/test_barbican_key_manager.py index b44a8a88..df2249f7 100644 --- a/castellan/tests/functional/key_manager/test_barbican_key_manager.py +++ b/castellan/tests/functional/key_manager/test_barbican_key_manager.py @@ -21,12 +21,12 @@ Note: This requires local running instances of Barbican and Keystone. import uuid -from barbicanclient import exceptions as barbican_exceptions from keystoneclient.v3 import client +from oslo_config import cfg from oslo_context import context +from oslotest import base from castellan.common import exception -from castellan.common.objects import symmetric_key from castellan.key_manager import barbican_key_manager from castellan.tests.functional import config from castellan.tests.functional.key_manager import test_key_manager @@ -35,10 +35,11 @@ from castellan.tests.functional.key_manager import test_key_manager CONF = config.get_config() -class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): +class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase, + base.BaseTestCase): def _create_key_manager(self): - return barbican_key_manager.BarbicanKeyManager() + return barbican_key_manager.BarbicanKeyManager(cfg.CONF) def setUp(self): super(BarbicanKeyManagerTestCase, self).setUp() @@ -49,100 +50,58 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): keystone_client = client.Client(username=username, password=password, project_name=project_name, - auth_url=auth_url) + auth_url=auth_url, + project_domain_id='default') + project_list = keystone_client.projects.list(name=project_name) + self.ctxt = context.RequestContext( - auth_token=keystone_client.auth_token) + auth_token=keystone_client.auth_token, + tenant=project_list[0].id) def tearDown(self): super(BarbicanKeyManagerTestCase, self).tearDown() - def test_create_key(self): - key_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) - self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid) - self.assertIsNotNone(key_uuid) - def test_create_null_context(self): self.assertRaises(exception.Forbidden, self.key_mgr.create_key, None, 'AES', 256) - def test_delete_symmetric_key(self): - key_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) - self.key_mgr.delete_key(self.ctxt, key_uuid) - try: - self.key_mgr.get_key(self.ctxt, key_uuid) - except barbican_exceptions.HTTPClientError as e: - self.assertEqual(404, e.status_code) - else: - self.fail('No exception when deleting non-existent key') + def test_create_key_pair_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.create_key_pair, None, 'RSA', 2048) def test_delete_null_context(self): - key_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) - self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid) + key_uuid = self._get_valid_object_uuid( + test_key_manager._get_test_symmetric_key()) + self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid) self.assertRaises(exception.Forbidden, - self.key_mgr.delete_key, None, key_uuid) + self.key_mgr.delete, None, key_uuid) - def test_delete_null_key(self): + def test_delete_null_object(self): self.assertRaises(exception.KeyManagerError, - self.key_mgr.delete_key, self.ctxt, None) + self.key_mgr.delete, self.ctxt, None) - def test_delete_unknown_key(self): - bad_key_uuid = str(uuid.uuid4()) - self.assertRaises(barbican_exceptions.HTTPClientError, - self.key_mgr.delete_key, self.ctxt, bad_key_uuid) - - def test_get_key(self): - secret_key = b'\x01\x02\xA0\xB3' - key = symmetric_key.SymmetricKey('AES', secret_key) - - uuid = self.key_mgr.store_key(self.ctxt, key) - self.addCleanup(self.key_mgr.delete_key, self.ctxt, uuid) - - retrieved_key = self.key_mgr.get_key(self.ctxt, uuid) - self.assertEqual(key.get_encoded(), retrieved_key.get_encoded()) + def test_delete_unknown_object(self): + unknown_uuid = str(uuid.uuid4()) + self.assertRaises(exception.ManagedObjectNotFoundError, + self.key_mgr.delete, self.ctxt, unknown_uuid) def test_get_null_context(self): - key_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) + key_uuid = self._get_valid_object_uuid( + test_key_manager._get_test_symmetric_key()) self.assertRaises(exception.Forbidden, - self.key_mgr.get_key, None, key_uuid) + self.key_mgr.get, None, key_uuid) - def test_get_null_key(self): - key_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) - self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid) + def test_get_null_object(self): self.assertRaises(exception.KeyManagerError, - self.key_mgr.get_key, self.ctxt, None) + self.key_mgr.get, self.ctxt, None) def test_get_unknown_key(self): - key_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) - self.addCleanup(self.key_mgr.delete_key, self.ctxt, key_uuid) bad_key_uuid = str(uuid.uuid4()) - self.assertRaises(barbican_exceptions.HTTPClientError, - self.key_mgr.get_key, self.ctxt, bad_key_uuid) - - def test_store(self): - secret_key = b'\x01\x02\xA0\xB3' - key = symmetric_key.SymmetricKey('AES', secret_key) - - uuid = self.key_mgr.store_key(self.ctxt, key) - self.addCleanup(self.key_mgr.delete_key, self.ctxt, uuid) - - retrieved_key = self.key_mgr.get_key(self.ctxt, uuid) - self.assertEqual(key.get_encoded(), retrieved_key.get_encoded()) + self.assertRaises(exception.ManagedObjectNotFoundError, + self.key_mgr.get, self.ctxt, bad_key_uuid) def test_store_null_context(self): - secret_key = b'\x01\x02\xA0\xB3' - key = symmetric_key.SymmetricKey('AES', secret_key) + key = test_key_manager._get_test_symmetric_key() self.assertRaises(exception.Forbidden, - self.key_mgr.store_key, None, key) + self.key_mgr.store, None, key) diff --git a/castellan/tests/functional/key_manager/test_key_manager.py b/castellan/tests/functional/key_manager/test_key_manager.py index 5b29b1da..657e3e17 100644 --- a/castellan/tests/functional/key_manager/test_key_manager.py +++ b/castellan/tests/functional/key_manager/test_key_manager.py @@ -14,18 +14,141 @@ # under the License. """ -Test cases for the key manager. +Test cases for a key manager. + +These test cases should pass against any key manager. """ -from castellan.tests import base +from castellan.common import exception +from castellan.common.objects import opaque_data +from castellan.common.objects import passphrase +from castellan.common.objects import private_key +from castellan.common.objects import public_key +from castellan.common.objects import symmetric_key +from castellan.common.objects import x_509 +from castellan.tests import utils -class KeyManagerTestCase(base.TestCase): +def _get_test_symmetric_key(): + key_bytes = bytes(utils.get_symmetric_key()) + bit_length = 128 + key = symmetric_key.SymmetricKey('AES', bit_length, key_bytes) + return key + + +def _get_test_public_key(): + key_bytes = bytes(utils.get_public_key_der()) + bit_length = 2048 + key = public_key.PublicKey('RSA', bit_length, key_bytes) + return key + + +def _get_test_private_key(): + key_bytes = bytes(utils.get_private_key_der()) + bit_length = 2048 + key = private_key.PrivateKey('RSA', bit_length, key_bytes) + return key + + +def _get_test_certificate(): + data = bytes(utils.get_certificate_der()) + cert = x_509.X509(data) + return cert + + +def _get_test_opaque_data(): + data = bytes(b'opaque data') + opaque_object = opaque_data.OpaqueData(data) + return opaque_object + + +def _get_test_passphrase(): + data = bytes(b'passphrase') + passphrase_object = passphrase.Passphrase(data) + return passphrase_object + + +@utils.parameterized_test_case +class KeyManagerTestCase(object): def _create_key_manager(self): raise NotImplementedError() def setUp(self): super(KeyManagerTestCase, self).setUp() - self.key_mgr = self._create_key_manager() + + def _get_valid_object_uuid(self, managed_object): + object_uuid = self.key_mgr.store(self.ctxt, managed_object) + self.assertIsNotNone(object_uuid) + return object_uuid + + def test_create_key(self): + key_uuid = self.key_mgr.create_key(self.ctxt, + algorithm='AES', + length=256) + self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid) + self.assertIsNotNone(key_uuid) + + def test_create_key_pair(self): + private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair( + self.ctxt, + algorithm='RSA', + length=2048) + + self.addCleanup(self.key_mgr.delete, self.ctxt, private_key_uuid) + self.addCleanup(self.key_mgr.delete, self.ctxt, public_key_uuid) + + self.assertIsNotNone(private_key_uuid) + self.assertIsNotNone(public_key_uuid) + self.assertNotEqual(private_key_uuid, public_key_uuid) + + @utils.parameterized_dataset({ + 'symmetric_key': [_get_test_symmetric_key()], + 'public_key': [_get_test_public_key()], + 'private_key': [_get_test_private_key()], + 'certificate': [_get_test_certificate()], + 'passphrase': [_get_test_passphrase()], + 'opaque_data': [_get_test_opaque_data()], + }) + def test_delete(self, managed_object): + object_uuid = self._get_valid_object_uuid(managed_object) + self.key_mgr.delete(self.ctxt, object_uuid) + try: + self.key_mgr.get(self.ctxt, object_uuid) + except exception.ManagedObjectNotFoundError: + pass + else: + self.fail('No exception when deleting non-existent key') + + @utils.parameterized_dataset({ + 'symmetric_key': [_get_test_symmetric_key()], + 'public_key': [_get_test_public_key()], + 'private_key': [_get_test_private_key()], + 'certificate': [_get_test_certificate()], + 'passphrase': [_get_test_passphrase()], + 'opaque_data': [_get_test_opaque_data()], + }) + def test_get(self, managed_object): + uuid = self._get_valid_object_uuid(managed_object) + self.addCleanup(self.key_mgr.delete, self.ctxt, uuid) + + retrieved_object = self.key_mgr.get(self.ctxt, uuid) + self.assertEqual(managed_object.get_encoded(), + retrieved_object.get_encoded()) + + @utils.parameterized_dataset({ + 'symmetric_key': [_get_test_symmetric_key()], + 'public_key': [_get_test_public_key()], + 'private_key': [_get_test_private_key()], + 'certificate': [_get_test_certificate()], + 'passphrase': [_get_test_passphrase()], + 'opaque_data': [_get_test_opaque_data()], + }) + def test_store(self, managed_object): + uuid = self.key_mgr.store(self.ctxt, managed_object) + self.addCleanup(self.key_mgr.delete, self.ctxt, uuid) + + retrieved_object = self.key_mgr.get(self.ctxt, uuid) + self.assertEqual(managed_object.get_encoded(), + retrieved_object.get_encoded()) diff --git a/castellan/tests/unit/key_manager/test_barbican_key_manager.py b/castellan/tests/unit/key_manager/test_barbican_key_manager.py index ebc2e69f..e2ef7218 100644 --- a/castellan/tests/unit/key_manager/test_barbican_key_manager.py +++ b/castellan/tests/unit/key_manager/test_barbican_key_manager.py @@ -171,17 +171,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): self.key_mgr.delete(self.ctxt, self.key_id) self.delete.assert_called_once_with(self.secret_ref) - def test_delete_none_key(self): + def test_delete_unknown_key(self): self.assertRaises(exception.KeyManagerError, self.key_mgr.delete, self.ctxt, None) - def test_delete_unkown_key(self): - side_effect = barbican_exceptions.HTTPClientError('key not found') - side_effect.status_code = 404 - self.mock_barbican.secrets.delete = mock.Mock(side_effect=side_effect) - self.assertRaises(exception.ManagedObjectNotFoundError, - self.key_mgr.delete, self.ctxt, self.key_id) - def test_delete_with_error(self): self.mock_barbican.secrets.delete = mock.Mock( side_effect=barbican_exceptions.HTTPClientError('test error')) @@ -207,17 +200,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): self.assertRaises(exception.Forbidden, self.key_mgr.get, None, self.key_id) - def test_get_none_key(self): + def test_get_unknown_key(self): self.assertRaises(exception.KeyManagerError, self.key_mgr.get, self.ctxt, None) - def test_get_unknown_key(self): - side_effect = barbican_exceptions.HTTPClientError('key not found') - side_effect.status_code = 404 - self.mock_barbican.secrets.get = mock.Mock(side_effect=side_effect) - self.assertRaises(exception.ManagedObjectNotFoundError, - self.key_mgr.get, self.ctxt, self.key_id) - def test_get_with_error(self): self.mock_barbican.secrets.get = mock.Mock( side_effect=barbican_exceptions.HTTPClientError('test error')) diff --git a/castellan/tests/utils.py b/castellan/tests/utils.py index 49924a55..4e2aac32 100644 --- a/castellan/tests/utils.py +++ b/castellan/tests/utils.py @@ -16,6 +16,99 @@ """These utility functions are borrowed from Barbican's testing utilities.""" +import functools +import types + +import six + + +def construct_new_test_function(original_func, name, build_params): + """Builds a new test function based on parameterized data. + + :param original_func: The original test function that is used as a template + :param name: The fullname of the new test function + :param build_params: A dictionary or list containing args or kwargs + for the new test + :return: A new function object + """ + new_func = types.FunctionType( + six.get_function_code(original_func), + six.get_function_globals(original_func), + name=name, + argdefs=six.get_function_defaults(original_func) + ) + + for key, val in six.iteritems(original_func.__dict__): + if key != 'build_data': + new_func.__dict__[key] = val + + # Support either an arg list or kwarg dict for our data + build_args = build_params if isinstance(build_params, list) else [] + build_kwargs = build_params if isinstance(build_params, dict) else {} + + # Build a test wrapper to execute with our kwargs + def test_wrapper(func, test_args, test_kwargs): + @functools.wraps(func) + def wrapper(self): + return func(self, *test_args, **test_kwargs) + return wrapper + + return test_wrapper(new_func, build_args, build_kwargs) + + +def process_parameterized_function(name, func_obj, build_data): + """Build lists of functions to add and remove to a test case.""" + to_remove = [] + to_add = [] + + for subtest_name, params in six.iteritems(build_data): + # Build new test function + func_name = '{0}_{1}'.format(name, subtest_name) + new_func = construct_new_test_function(func_obj, func_name, params) + + # Mark the new function as needed to be added to the class + to_add.append((func_name, new_func)) + + # Mark key for removal + to_remove.append(name) + + return to_remove, to_add + + +def parameterized_test_case(cls): + """Class decorator to process parameterized tests + + This allows for parameterization to be used for potentially any + unittest compatible runner; including testr and py.test. + """ + tests_to_remove = [] + tests_to_add = [] + for key, val in six.iteritems(vars(cls)): + # Only process tests with build data on them + if key.startswith('test_') and val.__dict__.get('build_data'): + to_remove, to_add = process_parameterized_function( + name=key, + func_obj=val, + build_data=val.__dict__.get('build_data') + ) + tests_to_remove.extend(to_remove) + tests_to_add.extend(to_add) + + # Add all new test functions + [setattr(cls, name, func) for name, func in tests_to_add] + + # Remove all old test function templates (if they still exist) + [delattr(cls, key) for key in tests_to_remove if hasattr(cls, key)] + return cls + + +def parameterized_dataset(build_data): + """Simple decorator to mark a test method for processing.""" + def decorator(func): + func.__dict__['build_data'] = build_data + return func + return decorator + def get_certificate_der(): """Returns an X509 certificate in DER format @@ -211,3 +304,13 @@ def get_public_key_der(): b'\x01\x15\xcd\x52\x83\x3f\x06\x67\xfd\xa1\x2d\x2b\x07\xba\x32' b'\x62\x21\x07\x2f\x02\x03\x01\x00\x01') return key_der + + +def get_symmetric_key(): + """Returns symmetric key bytes + + 16 bytes that were randomly generated. Form a 128 bit key. + """ + symmetric_key = ( + b'\x92\xcf\x1e\xd9\x54\xea\x30\x70\xd8\xc2\x48\xae\xc1\xc8\x72\xa3') + return symmetric_key