Merge "Add ability to get only metadata"
This commit is contained in:
commit
edcea82959
|
@ -77,3 +77,7 @@ class ManagedObject(object):
|
|||
specified.
|
||||
"""
|
||||
pass
|
||||
|
||||
def is_metadata_only(self):
|
||||
"""Returns if the associated object is only metadata or not."""
|
||||
return self.get_encoded() is None
|
||||
|
|
|
@ -454,10 +454,17 @@ class BarbicanKeyManager(key_manager.KeyManager):
|
|||
else:
|
||||
return secret.payload
|
||||
|
||||
def _get_castellan_object(self, secret):
|
||||
def _get_castellan_object(self, secret, metadata_only=False):
|
||||
"""Creates a Castellan managed object given the Barbican secret.
|
||||
|
||||
:param secret: the secret from barbican with the payload of data
|
||||
The python barbicanclient lazy-loads the secret data, i.e. the secret
|
||||
data is not requested until secret.payload is called. If the user
|
||||
specifies metadata_only=True, secret.payload is never called,
|
||||
preventing unnecessary loading of secret data.
|
||||
|
||||
:param secret: the barbican secret object
|
||||
:metadata_only: boolean indicating if the secret bytes should be
|
||||
included in the managed object
|
||||
:returns: the castellan object
|
||||
"""
|
||||
secret_type = op_data.OpaqueData
|
||||
|
@ -465,7 +472,10 @@ class BarbicanKeyManager(key_manager.KeyManager):
|
|||
if barbican_type == secret.secret_type:
|
||||
secret_type = castellan_type
|
||||
|
||||
secret_data = self._get_secret_data(secret)
|
||||
if metadata_only:
|
||||
secret_data = None
|
||||
else:
|
||||
secret_data = self._get_secret_data(secret)
|
||||
|
||||
# convert created ISO8601 in Barbican to POSIX
|
||||
if secret.created:
|
||||
|
@ -514,19 +524,20 @@ class BarbicanKeyManager(key_manager.KeyManager):
|
|||
else:
|
||||
return False
|
||||
|
||||
def get(self, context, managed_object_id):
|
||||
def get(self, context, managed_object_id, metadata_only=False):
|
||||
"""Retrieves the specified managed object.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param managed_object_id: the UUID of the object to retrieve
|
||||
:param metadata_only: whether secret data should be included
|
||||
:return: ManagedObject representation of the managed object
|
||||
:raises KeyManagerError: if object retrieval fails
|
||||
:raises ManagedObjectNotFoundError: if object not found
|
||||
"""
|
||||
try:
|
||||
secret = self._get_secret(context, managed_object_id)
|
||||
return self._get_castellan_object(secret)
|
||||
return self._get_castellan_object(secret, metadata_only)
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
|
|
|
@ -74,7 +74,7 @@ class KeyManager(object):
|
|||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, context, managed_object_id):
|
||||
def get(self, context, managed_object_id, metadata_only=False):
|
||||
"""Retrieves the specified managed object.
|
||||
|
||||
Implementations should verify that the caller has permissions to
|
||||
|
@ -82,6 +82,9 @@ class KeyManager(object):
|
|||
as context. If the user lacks permission then a NotAuthorized
|
||||
exception is raised.
|
||||
|
||||
If the caller requests only metadata, then the object that is
|
||||
returned will contain only the secret metadata and no secret bytes.
|
||||
|
||||
If the specified object does not exist, then a KeyError should be
|
||||
raised. Implementations should preclude users from discerning the
|
||||
UUIDs of objects that belong to other users by repeatedly calling
|
||||
|
|
|
@ -136,6 +136,25 @@ class KeyManagerTestCase(object):
|
|||
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
|
||||
self.assertEqual(managed_object.get_encoded(),
|
||||
retrieved_object.get_encoded())
|
||||
self.assertFalse(managed_object.is_metadata_only())
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'symmetric_key': [_get_test_symmetric_key()],
|
||||
'public_key': [_get_test_public_key()],
|
||||
'private_key': [_get_test_private_key()],
|
||||
'certificate': [_get_test_certificate()],
|
||||
'passphrase': [_get_test_passphrase()],
|
||||
'opaque_data': [_get_test_opaque_data()],
|
||||
})
|
||||
def test_get_metadata(self, managed_object):
|
||||
uuid = self._get_valid_object_uuid(managed_object)
|
||||
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
|
||||
|
||||
retrieved_object = self.key_mgr.get(self.ctxt,
|
||||
uuid,
|
||||
metadata_only=True)
|
||||
self.assertFalse(managed_object.is_metadata_only())
|
||||
self.assertTrue(retrieved_object.is_metadata_only())
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'symmetric_key': [_get_test_symmetric_key()],
|
||||
|
|
|
@ -166,7 +166,7 @@ class MockKeyManager(key_manager.KeyManager):
|
|||
|
||||
return key_id
|
||||
|
||||
def get(self, context, managed_object_id, **kwargs):
|
||||
def get(self, context, managed_object_id, metadata_only=False, **kwargs):
|
||||
"""Retrieves the key identified by the specified id.
|
||||
|
||||
This implementation returns the key that is associated with the
|
||||
|
@ -176,7 +176,15 @@ class MockKeyManager(key_manager.KeyManager):
|
|||
if context is None:
|
||||
raise exception.Forbidden()
|
||||
|
||||
return self.keys[managed_object_id]
|
||||
obj = self.keys[managed_object_id]
|
||||
if metadata_only:
|
||||
if hasattr(obj, "_key"):
|
||||
obj._key = None
|
||||
if hasattr(obj, "_data"):
|
||||
obj._data = None
|
||||
if hasattr(obj, "_passphrase"):
|
||||
obj._passphrase = None
|
||||
return obj
|
||||
|
||||
def delete(self, context, managed_object_id, **kwargs):
|
||||
"""Deletes the object identified by the specified id.
|
||||
|
|
|
@ -148,6 +148,17 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
|
|||
actual_key = self.key_mgr.get(self.context, key_id)
|
||||
self.assertEqual(_key, actual_key)
|
||||
|
||||
def test_store_key_and_get_metadata(self):
|
||||
secret_key = bytes(b'0' * 64)
|
||||
_key = sym_key.SymmetricKey('AES', 64 * 8, secret_key)
|
||||
key_id = self.key_mgr.store(self.context, _key)
|
||||
|
||||
actual_key = self.key_mgr.get(self.context,
|
||||
key_id,
|
||||
metadata_only=True)
|
||||
self.assertIsNone(actual_key.get_encoded())
|
||||
self.assertTrue(actual_key.is_metadata_only())
|
||||
|
||||
def test_store_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.store, None, None)
|
||||
|
|
|
@ -35,6 +35,13 @@ class OpaqueDataTestCase(base.TestCase):
|
|||
|
||||
super(OpaqueDataTestCase, self).setUp()
|
||||
|
||||
def test_is_not_only_metadata(self):
|
||||
self.assertFalse(self.opaque_data.is_metadata_only())
|
||||
|
||||
def test_is_only_metadata(self):
|
||||
d = opaque_data.OpaqueData(None, self.name, self.created)
|
||||
self.assertTrue(d.is_metadata_only())
|
||||
|
||||
def test_get_format(self):
|
||||
self.assertEqual('Opaque', self.opaque_data.format)
|
||||
|
||||
|
|
|
@ -35,6 +35,13 @@ class PassphraseTestCase(base.TestCase):
|
|||
|
||||
super(PassphraseTestCase, self).setUp()
|
||||
|
||||
def test_is_not_only_metadata(self):
|
||||
self.assertFalse(self.passphrase.is_metadata_only())
|
||||
|
||||
def test_is_only_metadata(self):
|
||||
p = passphrase.Passphrase(None, self.name, self.created)
|
||||
self.assertTrue(p.is_metadata_only())
|
||||
|
||||
def test_get_format(self):
|
||||
self.assertEqual('RAW', self.passphrase.format)
|
||||
|
||||
|
|
|
@ -39,6 +39,17 @@ class PrivateKeyTestCase(base.KeyTestCase):
|
|||
|
||||
super(PrivateKeyTestCase, self).setUp()
|
||||
|
||||
def test_is_not_only_metadata(self):
|
||||
self.assertFalse(self.key.is_metadata_only())
|
||||
|
||||
def test_is_only_metadata(self):
|
||||
k = private_key.PrivateKey(self.algorithm,
|
||||
self.bit_length,
|
||||
None,
|
||||
self.name,
|
||||
self.created)
|
||||
self.assertTrue(k.is_metadata_only())
|
||||
|
||||
def test_get_algorithm(self):
|
||||
self.assertEqual(self.algorithm, self.key.algorithm)
|
||||
|
||||
|
|
|
@ -39,6 +39,17 @@ class PublicKeyTestCase(base.KeyTestCase):
|
|||
|
||||
super(PublicKeyTestCase, self).setUp()
|
||||
|
||||
def test_is_not_only_metadata(self):
|
||||
self.assertFalse(self.key.is_metadata_only())
|
||||
|
||||
def test_is_only_metadata(self):
|
||||
k = public_key.PublicKey(self.algorithm,
|
||||
self.bit_length,
|
||||
None,
|
||||
self.name,
|
||||
self.created)
|
||||
self.assertTrue(k.is_metadata_only())
|
||||
|
||||
def test_get_algorithm(self):
|
||||
self.assertEqual(self.algorithm, self.key.algorithm)
|
||||
|
||||
|
|
|
@ -38,6 +38,17 @@ class SymmetricKeyTestCase(base.KeyTestCase):
|
|||
|
||||
super(SymmetricKeyTestCase, self).setUp()
|
||||
|
||||
def test_is_not_only_metadata(self):
|
||||
self.assertFalse(self.key.is_metadata_only())
|
||||
|
||||
def test_is_only_metadata(self):
|
||||
k = sym_key.SymmetricKey(self.algorithm,
|
||||
self.bit_length,
|
||||
None,
|
||||
self.name,
|
||||
self.created)
|
||||
self.assertTrue(k.is_metadata_only())
|
||||
|
||||
def test_get_format(self):
|
||||
self.assertEqual('RAW', self.key.format)
|
||||
|
||||
|
|
|
@ -35,6 +35,13 @@ class X509TestCase(base.CertificateTestCase):
|
|||
|
||||
super(X509TestCase, self).setUp()
|
||||
|
||||
def test_is_not_only_metadata(self):
|
||||
self.assertFalse(self.cert.is_metadata_only())
|
||||
|
||||
def test_is_only_metadata(self):
|
||||
c = x_509.X509(None, self.name, self.created)
|
||||
self.assertTrue(c.is_metadata_only())
|
||||
|
||||
def test_get_format(self):
|
||||
self.assertEqual('X.509', self.cert.format)
|
||||
|
||||
|
|
Loading…
Reference in New Issue