Add list capability

Adds ability to list secrets, and adds initial filtering ability. Can
filter by secret_type.

Depends-On: I583f27f91cb3c6bdb23438dff6b539407b4005ed
Depends-On: I99cd72724e11bab362bcaaeb773f33b2abfe815c
Change-Id: I245d5846aa8d3b9586bea6dc4e0b24db86c911c9
This commit is contained in:
Kaitlin Farr 2016-11-16 17:27:09 -05:00
parent 68e4056ed4
commit 1a13c2b203
8 changed files with 232 additions and 0 deletions

View File

@ -572,3 +572,44 @@ class BarbicanKeyManager(key_manager.KeyManager):
uuid=managed_object_id)
else:
raise exception.KeyManagerError(reason=e)
def list(self, context, object_type=None, metadata_only=False):
"""Retrieves a list of managed objects that match the criteria.
If no search criteria is given, all objects are returned.
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param object_type: the type of object to retrieve
:param metadata_only: whether secret data should be included
:raises KeyManagerError: if listing secrets fails
"""
objects = []
barbican_client = self._get_barbican_client(context)
if object_type and object_type not in self._secret_type_dict:
msg = _("Invalid secret type: %s") % object_type
LOG.error(msg)
raise exception.KeyManagerError(reason=msg)
secret_type = self._secret_type_dict.get(object_type)
try:
secrets = barbican_client.secrets.list(secret_type=secret_type)
except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e:
LOG.error(_("Error listing objects: %s"), e)
raise exception.KeyManagerError(reason=e)
for secret in secrets:
try:
obj = self._get_castellan_object(secret, metadata_only)
objects.append(obj)
except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e:
LOG.warn(_("Error occurred while retrieving object metadata,"
" not adding it to the list: %s"), e)
return objects

View File

@ -109,3 +109,18 @@ class KeyManager(object):
considered "non-existent" and completely invisible.
"""
pass
@abc.abstractmethod
def list(self, context, object_type=None, metadata_only=False):
"""Lists the managed objects given the criteria.
Implementations should verify that the caller has permission to list
the managed objects and should only list the objects the caller has
access to by checking the context object (context). A NotAuthorized
exception should be raised if the caller lacks permission.
A list of managed objects or managed object metadata should be
returned, depending on the metadata_only flag. If no objects are
found, an empty list should be returned instead.
"""
pass

View File

@ -45,5 +45,8 @@ class NotImplementedKeyManager(key_manager.KeyManager):
def get(self, context, managed_object_id, **kwargs):
raise NotImplementedError()
def list(self, context, object_type=None):
raise NotImplementedError()
def delete(self, context, managed_object_id, **kwargs):
raise NotImplementedError()

View File

@ -171,3 +171,66 @@ class KeyManagerTestCase(object):
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
self.assertEqual(managed_object.get_encoded(),
retrieved_object.get_encoded())
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_list(self, managed_object):
uuid = self.key_mgr.store(self.ctxt, managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
# the list command may return more objects than the one we just
# created if older objects were not cleaned up, so we will simply
# check if the object we created is in the list
retrieved_objects = self.key_mgr.list(self.ctxt)
self.assertTrue(managed_object in retrieved_objects)
for obj in retrieved_objects:
self.assertFalse(obj.is_metadata_only())
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_list_metadata_only(self, managed_object):
uuid = self.key_mgr.store(self.ctxt, managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
expected_obj = self.key_mgr.get(self.ctxt, uuid, metadata_only=True)
# the list command may return more objects than the one we just
# created if older objects were not cleaned up, so we will simply
# check if the object we created is in the list
retrieved_objects = self.key_mgr.list(self.ctxt, metadata_only=True)
self.assertTrue(expected_obj in retrieved_objects)
for obj in retrieved_objects:
self.assertTrue(obj.is_metadata_only())
@utils.parameterized_dataset({
'query_by_object_type': {
'object_1': _get_test_symmetric_key(),
'object_2': _get_test_public_key(),
'query_dict': dict(object_type=symmetric_key.SymmetricKey)
},
})
def test_list_with_filter(self, object_1, object_2, query_dict):
uuid1 = self.key_mgr.store(self.ctxt, object_1)
uuid2 = self.key_mgr.store(self.ctxt, object_2)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid1)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid2)
# the list command may return more objects than the one we just
# created if older objects were not cleaned up, so we will simply
# check that the returned objects have the expected type
retrieved_objects = self.key_mgr.list(self.ctxt, **query_dict)
for retrieved_object in retrieved_objects:
self.assertEqual(type(object_1), type(retrieved_object))
self.assertTrue(object_1 in retrieved_objects)

View File

@ -226,3 +226,19 @@ class MockKeyManager(key_manager.KeyManager):
random.shuffle(password)
return ''.join(password)
def list(self, context, object_type=None, metadata_only=False):
"""Retrieves a list of managed objects that match the criteria.
A Forbidden exception is raised if the context is None.
If no search criteria is given, all objects are returned.
"""
if context is None:
raise exception.Forbidden()
objects = []
for obj_id in self.keys:
obj = self.get(context, obj_id, metadata_only=metadata_only)
if type(obj) == object_type or object_type is None:
objects.append(obj)
return objects

View File

@ -72,6 +72,7 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.delete = self.mock_barbican.secrets.delete
self.store = self.mock_barbican.secrets.store
self.create = self.mock_barbican.secrets.create
self.list = self.mock_barbican.secrets.list
self.key_mgr._barbican_client = self.mock_barbican
self.key_mgr._current_context = self.ctxt
@ -348,3 +349,60 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
order_ref_url)
self.assertEqual(1, self.mock_barbican.orders.get.call_count)
def test_list_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,
self.key_mgr.list, None)
def test_list(self):
original_secret_metadata = mock.Mock()
original_secret_metadata.algorithm = mock.sentinel.alg
original_secret_metadata.bit_length = mock.sentinel.bit
original_secret_metadata.secret_type = 'symmetric'
created = timeutils.parse_isotime('2015-10-20 18:51:17+00:00')
original_secret_metadata.created = created
created_formatted = timeutils.parse_isotime(str(created))
created_posix = calendar.timegm(created_formatted.timetuple())
key_name = 'my key'
original_secret_metadata.name = key_name
original_secret_data = b'test key'
original_secret_metadata.payload = original_secret_data
self.mock_barbican.secrets.list.return_value = (
[original_secret_metadata])
# check metadata_only = False
key_list = self.key_mgr.list(self.ctxt)
self.assertEqual(1, len(key_list))
key = key_list[0]
self.list.assert_called_once()
self.assertEqual(key_name, key.name)
self.assertEqual(original_secret_data, key.get_encoded())
self.assertEqual(created_posix, key.created)
self.list.reset_mock()
# check metadata_only = True
key_list = self.key_mgr.list(self.ctxt, metadata_only=True)
self.assertEqual(1, len(key_list))
key = key_list[0]
self.list.assert_called_once()
self.assertEqual(key_name, key.name)
self.assertIsNone(key.get_encoded())
self.assertEqual(created_posix, key.created)
def test_list_with_error(self):
self.mock_barbican.secrets.list = mock.Mock(
side_effect=barbican_exceptions.HTTPClientError('test error'))
self.assertRaises(exception.KeyManagerError,
self.key_mgr.list, self.ctxt)
def test_list_with_invalid_object_type(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.list, self.ctxt, "invalid_type")

View File

@ -54,6 +54,11 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
self.context = context.RequestContext('fake', 'fake')
def cleanUp(self):
super(MockKeyManagerTestCase, self).cleanUp()
self.key_mgr.keys = {}
def test_create_key(self):
key_id_1 = self.key_mgr.create_key(self.context)
key_id_2 = self.key_mgr.create_key(self.context)
@ -201,3 +206,30 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def test_delete_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.delete, self.context,
None)
def test_list_null_context(self):
self.assertRaises(exception.Forbidden, self.key_mgr.list, None)
def test_list_keys(self):
key1 = sym_key.SymmetricKey('AES', 64 * 8, bytes(b'0' * 64))
self.key_mgr.store(self.context, key1)
key2 = sym_key.SymmetricKey('AES', 32 * 8, bytes(b'0' * 32))
self.key_mgr.store(self.context, key2)
keys = self.key_mgr.list(self.context)
self.assertEqual(2, len(keys))
self.assertTrue(key1 in keys)
self.assertTrue(key2 in keys)
def test_list_keys_metadata_only(self):
key1 = sym_key.SymmetricKey('AES', 64 * 8, bytes(b'0' * 64))
self.key_mgr.store(self.context, key1)
key2 = sym_key.SymmetricKey('AES', 32 * 8, bytes(b'0' * 32))
self.key_mgr.store(self.context, key2)
keys = self.key_mgr.list(self.context, metadata_only=True)
self.assertEqual(2, len(keys))
bit_length_list = [key1.bit_length, key2.bit_length]
for key in keys:
self.assertTrue(key.is_metadata_only())
self.assertTrue(key.bit_length in bit_length_list)

View File

@ -46,6 +46,10 @@ class NotImplementedKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.assertRaises(NotImplementedError,
self.key_mgr.get, None, None)
def test_list(self):
self.assertRaises(NotImplementedError,
self.key_mgr.list, None)
def test_delete(self):
self.assertRaises(NotImplementedError,
self.key_mgr.delete, None, None)