Merge "Add ability to get only metadata"

This commit is contained in:
Jenkins 2017-04-05 03:04:40 +00:00 committed by Gerrit Code Review
commit edcea82959
12 changed files with 118 additions and 8 deletions

View File

@ -77,3 +77,7 @@ class ManagedObject(object):
specified.
"""
pass
def is_metadata_only(self):
"""Returns if the associated object is only metadata or not."""
return self.get_encoded() is None

View File

@ -454,10 +454,17 @@ class BarbicanKeyManager(key_manager.KeyManager):
else:
return secret.payload
def _get_castellan_object(self, secret):
def _get_castellan_object(self, secret, metadata_only=False):
"""Creates a Castellan managed object given the Barbican secret.
:param secret: the secret from barbican with the payload of data
The python barbicanclient lazy-loads the secret data, i.e. the secret
data is not requested until secret.payload is called. If the user
specifies metadata_only=True, secret.payload is never called,
preventing unnecessary loading of secret data.
:param secret: the barbican secret object
:metadata_only: boolean indicating if the secret bytes should be
included in the managed object
:returns: the castellan object
"""
secret_type = op_data.OpaqueData
@ -465,7 +472,10 @@ class BarbicanKeyManager(key_manager.KeyManager):
if barbican_type == secret.secret_type:
secret_type = castellan_type
secret_data = self._get_secret_data(secret)
if metadata_only:
secret_data = None
else:
secret_data = self._get_secret_data(secret)
# convert created ISO8601 in Barbican to POSIX
if secret.created:
@ -514,19 +524,20 @@ class BarbicanKeyManager(key_manager.KeyManager):
else:
return False
def get(self, context, managed_object_id):
def get(self, context, managed_object_id, metadata_only=False):
"""Retrieves the specified managed object.
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param managed_object_id: the UUID of the object to retrieve
:param metadata_only: whether secret data should be included
:return: ManagedObject representation of the managed object
:raises KeyManagerError: if object retrieval fails
:raises ManagedObjectNotFoundError: if object not found
"""
try:
secret = self._get_secret(context, managed_object_id)
return self._get_castellan_object(secret)
return self._get_castellan_object(secret, metadata_only)
except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e:

View File

@ -74,7 +74,7 @@ class KeyManager(object):
pass
@abc.abstractmethod
def get(self, context, managed_object_id):
def get(self, context, managed_object_id, metadata_only=False):
"""Retrieves the specified managed object.
Implementations should verify that the caller has permissions to
@ -82,6 +82,9 @@ class KeyManager(object):
as context. If the user lacks permission then a NotAuthorized
exception is raised.
If the caller requests only metadata, then the object that is
returned will contain only the secret metadata and no secret bytes.
If the specified object does not exist, then a KeyError should be
raised. Implementations should preclude users from discerning the
UUIDs of objects that belong to other users by repeatedly calling

View File

@ -136,6 +136,25 @@ class KeyManagerTestCase(object):
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
self.assertEqual(managed_object.get_encoded(),
retrieved_object.get_encoded())
self.assertFalse(managed_object.is_metadata_only())
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_get_metadata(self, managed_object):
uuid = self._get_valid_object_uuid(managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
retrieved_object = self.key_mgr.get(self.ctxt,
uuid,
metadata_only=True)
self.assertFalse(managed_object.is_metadata_only())
self.assertTrue(retrieved_object.is_metadata_only())
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],

View File

@ -166,7 +166,7 @@ class MockKeyManager(key_manager.KeyManager):
return key_id
def get(self, context, managed_object_id, **kwargs):
def get(self, context, managed_object_id, metadata_only=False, **kwargs):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
@ -176,7 +176,15 @@ class MockKeyManager(key_manager.KeyManager):
if context is None:
raise exception.Forbidden()
return self.keys[managed_object_id]
obj = self.keys[managed_object_id]
if metadata_only:
if hasattr(obj, "_key"):
obj._key = None
if hasattr(obj, "_data"):
obj._data = None
if hasattr(obj, "_passphrase"):
obj._passphrase = None
return obj
def delete(self, context, managed_object_id, **kwargs):
"""Deletes the object identified by the specified id.

View File

@ -148,6 +148,17 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
actual_key = self.key_mgr.get(self.context, key_id)
self.assertEqual(_key, actual_key)
def test_store_key_and_get_metadata(self):
secret_key = bytes(b'0' * 64)
_key = sym_key.SymmetricKey('AES', 64 * 8, secret_key)
key_id = self.key_mgr.store(self.context, _key)
actual_key = self.key_mgr.get(self.context,
key_id,
metadata_only=True)
self.assertIsNone(actual_key.get_encoded())
self.assertTrue(actual_key.is_metadata_only())
def test_store_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.store, None, None)

View File

@ -35,6 +35,13 @@ class OpaqueDataTestCase(base.TestCase):
super(OpaqueDataTestCase, self).setUp()
def test_is_not_only_metadata(self):
self.assertFalse(self.opaque_data.is_metadata_only())
def test_is_only_metadata(self):
d = opaque_data.OpaqueData(None, self.name, self.created)
self.assertTrue(d.is_metadata_only())
def test_get_format(self):
self.assertEqual('Opaque', self.opaque_data.format)

View File

@ -35,6 +35,13 @@ class PassphraseTestCase(base.TestCase):
super(PassphraseTestCase, self).setUp()
def test_is_not_only_metadata(self):
self.assertFalse(self.passphrase.is_metadata_only())
def test_is_only_metadata(self):
p = passphrase.Passphrase(None, self.name, self.created)
self.assertTrue(p.is_metadata_only())
def test_get_format(self):
self.assertEqual('RAW', self.passphrase.format)

View File

@ -39,6 +39,17 @@ class PrivateKeyTestCase(base.KeyTestCase):
super(PrivateKeyTestCase, self).setUp()
def test_is_not_only_metadata(self):
self.assertFalse(self.key.is_metadata_only())
def test_is_only_metadata(self):
k = private_key.PrivateKey(self.algorithm,
self.bit_length,
None,
self.name,
self.created)
self.assertTrue(k.is_metadata_only())
def test_get_algorithm(self):
self.assertEqual(self.algorithm, self.key.algorithm)

View File

@ -39,6 +39,17 @@ class PublicKeyTestCase(base.KeyTestCase):
super(PublicKeyTestCase, self).setUp()
def test_is_not_only_metadata(self):
self.assertFalse(self.key.is_metadata_only())
def test_is_only_metadata(self):
k = public_key.PublicKey(self.algorithm,
self.bit_length,
None,
self.name,
self.created)
self.assertTrue(k.is_metadata_only())
def test_get_algorithm(self):
self.assertEqual(self.algorithm, self.key.algorithm)

View File

@ -38,6 +38,17 @@ class SymmetricKeyTestCase(base.KeyTestCase):
super(SymmetricKeyTestCase, self).setUp()
def test_is_not_only_metadata(self):
self.assertFalse(self.key.is_metadata_only())
def test_is_only_metadata(self):
k = sym_key.SymmetricKey(self.algorithm,
self.bit_length,
None,
self.name,
self.created)
self.assertTrue(k.is_metadata_only())
def test_get_format(self):
self.assertEqual('RAW', self.key.format)

View File

@ -35,6 +35,13 @@ class X509TestCase(base.CertificateTestCase):
super(X509TestCase, self).setUp()
def test_is_not_only_metadata(self):
self.assertFalse(self.cert.is_metadata_only())
def test_is_only_metadata(self):
c = x_509.X509(None, self.name, self.created)
self.assertTrue(c.is_metadata_only())
def test_get_format(self):
self.assertEqual('X.509', self.cert.format)