Update the key manager API

Includes changes to the base API class to support managed objects
and creation of asymmetric key pairs. The current implementations
of the key manager only support symmetric keys for retrieval, and raise
NotImplementedErrors for generation of asymmetric key pairs. Full
functionality coming in later commits.

Change-Id: I69e0c22729413e95808f9419df59017011f14d99
This commit is contained in:
Kaitlin Farr 2015-07-17 15:45:06 -04:00
parent 39e139f88e
commit 3d031cb5af
7 changed files with 174 additions and 143 deletions

View File

@ -107,7 +107,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
return base_url return base_url
def create_key(self, context, algorithm, length, expiration=None): def create_key(self, context, algorithm, length, expiration=None):
"""Creates a key. """Creates a symmetric key.
:param context: contains information of the user and the environment :param context: contains information of the user and the environment
for the request (castellan/context.py) for the request (castellan/context.py)
@ -135,28 +135,48 @@ class BarbicanKeyManager(key_manager.KeyManager):
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error creating key: %s"), e) LOG.error(u._LE("Error creating key: %s"), e)
def store_key(self, context, key, expiration=None): def create_key_pair(self, context, algorithm, length, expiration=None):
"""Stores (i.e., registers) a key with the key manager. """Creates an asymmetric key pair.
Not implemented yet.
:param context: contains information of the user and the environment :param context: contains information of the user and the environment
for the request (castellan/context.py) for the request (castellan/context.py)
:param key: the unencrypted secret data. Known as "payload" to the :param algorithm: the algorithm associated with the secret
barbicanclient api :param length: the bit length of the secret
:param expiration: the expiration time of the secret in ISO 8601 :param expiration: the date the key will expire
format :return: TODO: the UUIDs of the new key, in the order (private, public)
:returns: the UUID of the stored key :raises NotImplementedError: until implemented
:raises HTTPAuthError: if key creation fails with 401 :raises HTTPAuthError: if key creation fails with 401
:raises HTTPClientError: if key creation failes with 4xx :raises HTTPClientError: if key creation failes with 4xx
:raises HTTPServerError: if key creation fails with 5xx :raises HTTPServerError: if key creation fails with 5xx
""" """
raise NotImplementedError()
def store(self, context, managed_object, expiration=None):
"""Stores (i.e., registers) an object with the key manager.
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param managed_object: the unencrypted secret data. Known as "payload"
to the barbicanclient api
:param expiration: the expiration time of the secret in ISO 8601
format
:returns: the UUID of the stored object
:raises HTTPAuthError: if object creation fails with 401
:raises HTTPClientError: if object creation failes with 4xx
:raises HTTPServerError: if object creation fails with 5xx
"""
barbican_client = self._get_barbican_client(context) barbican_client = self._get_barbican_client(context)
try: try:
if key.algorithm: if managed_object.algorithm:
algorithm = key.algorithm algorithm = managed_object.algorithm
encoded_key = key.get_encoded() else:
algorithm = None
encoded_object = managed_object.get_encoded()
# TODO(kfarr) add support for objects other than symmetric keys # TODO(kfarr) add support for objects other than symmetric keys
secret = barbican_client.secrets.create(payload=encoded_key, secret = barbican_client.secrets.create(payload=encoded_object,
algorithm=algorithm, algorithm=algorithm,
expiration=expiration) expiration=expiration)
secret_ref = secret.store() secret_ref = secret.store()
@ -165,32 +185,34 @@ class BarbicanKeyManager(key_manager.KeyManager):
barbican_exceptions.HTTPClientError, barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e: barbican_exceptions.HTTPServerError) as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error storing key: %s"), e) LOG.error(u._LE("Error storing object: %s"), e)
def copy_key(self, context, key_id): def copy(self, context, managed_object_id):
"""Copies (i.e., clones) a key stored by barbican. """Copies (i.e., clones) a managed object stored by barbican.
:param context: contains information of the user and the environment :param context: contains information of the user and the environment
for the request (castellan/context.py) for the request (castellan/context.py)
:param key_id: the UUID of the key to copy :param managed_object_id: the UUID of the object to copy
:return: the UUID of the key copy :return: the UUID of the object copy
:raises HTTPAuthError: if key creation fails with 401 :raises HTTPAuthError: if object creation fails with 401
:raises HTTPClientError: if key creation failes with 4xx :raises HTTPClientError: if object creation failes with 4xx
:raises HTTPServerError: if key creation fails with 5xx :raises HTTPServerError: if object creation fails with 5xx
""" """
try: try:
secret = self._get_secret(context, key_id) secret = self._get_secret(context, managed_object_id)
secret_data = self._get_secret_data(secret) secret_data = self._get_secret_data(secret)
# TODO(kfarr) modify to support other types of keys # TODO(kfarr) modify to support other types of keys
key = sym_key.SymmetricKey(secret.algorithm, secret_data) key = sym_key.SymmetricKey(secret.algorithm,
copy_uuid = self.store_key(context, key, secret.expiration) secret.bit_length,
secret_data)
copy_uuid = self.store(context, key, secret.expiration)
return copy_uuid return copy_uuid
except (barbican_exceptions.HTTPAuthError, except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError, barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e: barbican_exceptions.HTTPServerError) as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error copying key: %s"), e) LOG.error(u._LE("Error copying object: %s"), e)
def _create_secret_ref(self, key_id): def _create_secret_ref(self, key_id):
"""Creates the URL required for accessing a secret. """Creates the URL required for accessing a secret.
@ -235,9 +257,9 @@ class BarbicanKeyManager(key_manager.KeyManager):
for the request (castellan/context.py) for the request (castellan/context.py)
:param key_id: UUID of the secret :param key_id: UUID of the secret
:return: the secret's metadata :return: the secret's metadata
:raises HTTPAuthError: if key creation fails with 401 :raises HTTPAuthError: if object retrieval fails with 401
:raises HTTPClientError: if key creation failes with 4xx :raises HTTPClientError: if object retrieval fails with 4xx
:raises HTTPServerError: if key creation fails with 5xx :raises HTTPServerError: if object retrieval fails with 5xx
""" """
barbican_client = self._get_barbican_client(context) barbican_client = self._get_barbican_client(context)
@ -251,19 +273,21 @@ class BarbicanKeyManager(key_manager.KeyManager):
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error getting secret metadata: %s"), e) LOG.error(u._LE("Error getting secret metadata: %s"), e)
def get_key(self, context, key_id): def get(self, context, managed_object_id):
"""Retrieves the specified key. """Retrieves the specified managed object.
Currently only supports retrieving symmetric keys.
:param context: contains information of the user and the environment :param context: contains information of the user and the environment
for the request (castellan/context.py) for the request (castellan/context.py)
:param key_id: the UUID of the key to retrieve :param managed_object_id: the UUID of the object to retrieve
:return: SymmetricKey representation of the key :return: SymmetricKey representation of the key
:raises HTTPAuthError: if key creation fails with 401 :raises HTTPAuthError: if object retrieval fails with 401
:raises HTTPClientError: if key creation failes with 4xx :raises HTTPClientError: if object retrieval fails with 4xx
:raises HTTPServerError: if key creation fails with 5xx :raises HTTPServerError: if object retrieval fails with 5xx
""" """
try: try:
secret = self._get_secret(context, key_id) secret = self._get_secret(context, managed_object_id)
secret_data = self._get_secret_data(secret) secret_data = self._get_secret_data(secret)
# TODO(kfarr) add support for other objects # TODO(kfarr) add support for other objects
key = sym_key.SymmetricKey(secret.algorithm, key = sym_key.SymmetricKey(secret.algorithm,
@ -274,25 +298,25 @@ class BarbicanKeyManager(key_manager.KeyManager):
barbican_exceptions.HTTPClientError, barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e: barbican_exceptions.HTTPServerError) as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error getting key: %s"), e) LOG.error(u._LE("Error getting object: %s"), e)
def delete_key(self, context, key_id): def delete(self, context, managed_object_id):
"""Deletes the specified key. """Deletes the specified managed object.
:param context: contains information of the user and the environment :param context: contains information of the user and the environment
for the request (castellan/context.py) for the request (castellan/context.py)
:param key_id: the UUID of the key to delete :param managed_object_id: the UUID of the object to delete
:raises HTTPAuthError: if key creation fails with 401 :raises HTTPAuthError: if key deletion fails with 401
:raises HTTPClientError: if key creation failes with 4xx :raises HTTPClientError: if key deletion fails with 4xx
:raises HTTPServerError: if key creation fails with 5xx :raises HTTPServerError: if key deletion fails with 5xx
""" """
barbican_client = self._get_barbican_client(context) barbican_client = self._get_barbican_client(context)
try: try:
secret_ref = self._create_secret_ref(key_id) secret_ref = self._create_secret_ref(managed_object_id)
barbican_client.secrets.delete(secret_ref) barbican_client.secrets.delete(secret_ref)
except (barbican_exceptions.HTTPAuthError, except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError, barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e: barbican_exceptions.HTTPServerError) as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error deleting key: %s"), e) LOG.error(u._LE("Error deleting object: %s"), e)

View File

@ -31,70 +31,82 @@ class KeyManager(object):
""" """
@abc.abstractmethod @abc.abstractmethod
def create_key(self, context, algorithm, length, def create_key(self, context, algorithm, length, expiration=None):
expiration=None): """Creates a symmetric key.
"""Creates a key.
This method creates a key and returns the key's UUID. If the specified This method creates a symmetric key and returns the key's UUID. If the
context does not permit the creation of keys, then a NotAuthorized specified context does not permit the creation of keys, then a
exception should be raised. NotAuthorized exception should be raised.
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def store_key(self, context, key, expiration=None): def create_key_pair(self, context, algorithm, length, expiration=None):
"""Stores (i.e., registers) a key with the key manager. """Creates an asymmetric key pair.
This method stores the specified key and returns its UUID that This method creates an asymmetric key pair and returns the pair of key
identifies it within the key manager. If the specified context does UUIDs. If the specified context does not permit the creation of keys,
not permit the creation of keys, then a NotAuthorized exception should then a NotAuthorized exception should be raised. The order of the UUIDs
be raised. will be (private, public).
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def copy_key(self, context, key_id): def store(self, context, managed_object, expiration=None):
"""Copies (i.e., clones) a key stored by the key manager. """Stores a managed object with the key manager.
This method copies the specified key and returns the copy's UUID. If This method stores the specified managed object and returns its UUID
the specified context does not permit copying keys, then a that identifies it within the key manager. If the specified context
does not permit the creation of keys, then a NotAuthorized exception
should be raised.
"""
pass
@abc.abstractmethod
def copy(self, context, managed_object_id):
"""Copies (i.e., clones) a managed object stored by the key manager.
This method copies the specified managed object and returns the copy's
UUID. If the specified context does not permit copying objects, then a
NotAuthorized error should be raised. NotAuthorized error should be raised.
Implementation note: This method should behave identically to Implementation note: This method should behave identically to
store_key(context, get_key(context, <encryption key UUID>)) store(context, get(context, <object UUID>))
although it is preferable to perform this operation within the key although it is preferable to perform this operation within the key
manager to avoid unnecessary handling of the key material. manager to avoid unnecessary handling of the object material.
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def get_key(self, context, key_id): def get(self, context, managed_object_id):
"""Retrieves the specified key. """Retrieves the specified managed object.
Implementations should verify that the caller has permissions to Implementations should verify that the caller has permissions to
retrieve the key by checking the context object passed in as context. retrieve the managed object by checking the context object passed in
If the user lacks permission then a NotAuthorized exception is raised. as context. If the user lacks permission then a NotAuthorized
exception is raised.
If the specified key does not exist, then a KeyError should be raised. If the specified object does not exist, then a KeyError should be
Implementations should preclude users from discerning the UUIDs of raised. Implementations should preclude users from discerning the
keys that belong to other users by repeatedly calling this method. UUIDs of objects that belong to other users by repeatedly calling
That is, keys that belong to other users should be considered "non- this method. That is, objects that belong to other users should be
existent" and completely invisible. considered "non-existent" and completely invisible.
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def delete_key(self, context, key_id): def delete(self, context, managed_object_id):
"""Deletes the specified key. """Deletes the specified managed object.
Implementations should verify that the caller has permission to delete Implementations should verify that the caller has permission to delete
the key by checking the context object (context). A NotAuthorized the managed object by checking the context object (context). A
exception should be raised if the caller lacks permission. NotAuthorized exception should be raised if the caller lacks
permission.
If the specified key does not exist, then a KeyError should be raised. If the specified object does not exist, then a KeyError should be
Implementations should preclude users from discerning the UUIDs of raised. Implementations should preclude users from discerning the
keys that belong to other users by repeatedly calling this method. UUIDs of objects that belong to other users by repeatedly calling this
That is, keys that belong to other users should be considered "non- method. That is, objects that belong to other users should be
existent" and completely invisible. considered "non-existent" and completely invisible.
""" """
pass pass

View File

@ -29,14 +29,17 @@ class NotImplementedKeyManager(key_manager.KeyManager):
expiration=None, **kwargs): expiration=None, **kwargs):
raise NotImplementedError() raise NotImplementedError()
def store_key(self, context, key, expiration=None, **kwargs): def create_key_pair(self, context, algorithm, lengthm, expiration=None):
raise NotImplementedError() raise NotImplementedError()
def copy_key(self, context, key_id, **kwargs): def store(self, context, managed_object, expiration=None, **kwargs):
raise NotImplementedError() raise NotImplementedError()
def get_key(self, context, key_id, **kwargs): def copy(self, context, managed_object_id, **kwargs):
raise NotImplementedError() raise NotImplementedError()
def delete_key(self, context, key_id, **kwargs): def get(self, context, managed_object_id, **kwargs):
raise NotImplementedError()
def delete(self, context, managed_object_id, **kwargs):
raise NotImplementedError() raise NotImplementedError()

View File

@ -36,7 +36,6 @@ from castellan.key_manager import key_manager
class MockKeyManager(key_manager.KeyManager): class MockKeyManager(key_manager.KeyManager):
"""Mocking manager for integration tests. """Mocking manager for integration tests.
This mock key manager implementation supports all the methods specified This mock key manager implementation supports all the methods specified
@ -67,7 +66,7 @@ class MockKeyManager(key_manager.KeyManager):
bytes(binascii.unhexlify(_hex))) bytes(binascii.unhexlify(_hex)))
def create_key(self, context, **kwargs): def create_key(self, context, **kwargs):
"""Creates a key. """Creates a symmetric key.
This implementation returns a UUID for the created key. A This implementation returns a UUID for the created key. A
Forbidden exception is raised if the specified context is None. Forbidden exception is raised if the specified context is None.
@ -76,7 +75,10 @@ class MockKeyManager(key_manager.KeyManager):
raise exception.Forbidden() raise exception.Forbidden()
key = self._generate_key(**kwargs) key = self._generate_key(**kwargs)
return self.store_key(context, key) return self.store(context, key)
def create_key_pair(self, context, algorithm, length, expiration=None):
raise NotImplementedError()
def _generate_key_id(self): def _generate_key_id(self):
key_id = str(uuid.uuid4()) key_id = str(uuid.uuid4())
@ -85,26 +87,26 @@ class MockKeyManager(key_manager.KeyManager):
return key_id return key_id
def store_key(self, context, key, **kwargs): def store(self, context, managed_object, **kwargs):
"""Stores (i.e., registers) a key with the key manager.""" """Stores (i.e., registers) a key with the key manager."""
if context is None: if context is None:
raise exception.Forbidden() raise exception.Forbidden()
key_id = self._generate_key_id() key_id = self._generate_key_id()
self.keys[key_id] = key self.keys[key_id] = managed_object
return key_id return key_id
def copy_key(self, context, key_id, **kwargs): def copy(self, context, managed_object_id, **kwargs):
if context is None: if context is None:
raise exception.Forbidden() raise exception.Forbidden()
copied_key_id = self._generate_key_id() copied_key_id = self._generate_key_id()
self.keys[copied_key_id] = self.keys[key_id] self.keys[copied_key_id] = self.keys[managed_object_id]
return copied_key_id return copied_key_id
def get_key(self, context, key_id, **kwargs): def get(self, context, managed_object_id, **kwargs):
"""Retrieves the key identified by the specified id. """Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the This implementation returns the key that is associated with the
@ -114,10 +116,10 @@ class MockKeyManager(key_manager.KeyManager):
if context is None: if context is None:
raise exception.Forbidden() raise exception.Forbidden()
return self.keys[key_id] return self.keys[managed_object_id]
def delete_key(self, context, key_id, **kwargs): def delete(self, context, managed_object_id, **kwargs):
"""Deletes the key identified by the specified id. """Deletes the object identified by the specified id.
A Forbidden exception is raised if the context is None and a A Forbidden exception is raised if the context is None and a
KeyError is raised if the UUID is invalid. KeyError is raised if the UUID is invalid.
@ -125,7 +127,7 @@ class MockKeyManager(key_manager.KeyManager):
if context is None: if context is None:
raise exception.Forbidden() raise exception.Forbidden()
del self.keys[key_id] del self.keys[managed_object_id]
def _generate_password(self, length, symbolgroups): def _generate_password(self, length, symbolgroups):
"""Generate a random password from the supplied symbol groups. """Generate a random password from the supplied symbol groups.

View File

@ -68,17 +68,6 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.key_mgr._barbican_client = self.mock_barbican self.key_mgr._barbican_client = self.mock_barbican
self.key_mgr._current_context = self.ctxt self.key_mgr._current_context = self.ctxt
def _build_mock_symKey(self):
self.mock_symKey = mock.Mock()
def fake_sym_key(alg, key):
self.mock_symKey.get_encoded.return_value = key
p = mock.PropertyMock(return_value=alg)
type(self.mock_symKey).algorithm = p
return self.mock_symKey
self.original_key = key_manager_key.SymmetricKey
key_manager_key.SymmetricKey = fake_sym_key
def test_copy_key(self): def test_copy_key(self):
# Create metadata for original secret # Create metadata for original secret
original_secret_metadata = mock.Mock() original_secret_metadata = mock.Mock()
@ -101,16 +90,13 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.get.return_value = original_secret_metadata self.get.return_value = original_secret_metadata
self.create.return_value = copied_secret self.create.return_value = copied_secret
# Create the mock key
self._build_mock_symKey()
# Copy the original # Copy the original
self.key_mgr.copy_key(self.ctxt, self.key_id) self.key_mgr.copy(self.ctxt, self.key_id)
# Assert proper methods were called # Assert proper methods were called
self.get.assert_called_once_with(self.secret_ref) self.get.assert_called_once_with(self.secret_ref)
self.create.assert_called_once_with( self.create.assert_called_once_with(
payload=self.mock_symKey.get_encoded(), payload=original_secret_metadata.payload,
algorithm=mock.sentinel.alg, algorithm=mock.sentinel.alg,
expiration=mock.sentinel.expiration) expiration=mock.sentinel.expiration)
copied_secret.store.assert_called_once_with() copied_secret.store.assert_called_once_with()
@ -118,7 +104,7 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
def test_copy_null_context(self): def test_copy_null_context(self):
self.key_mgr._barbican_client = None self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.copy_key, None, self.key_id) self.key_mgr.copy, None, self.key_id)
def test_create_key(self): def test_create_key(self):
# Create order_ref_url and assign return value # Create order_ref_url and assign return value
@ -149,15 +135,15 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
def test_delete_null_context(self): def test_delete_null_context(self):
self.key_mgr._barbican_client = None self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.delete_key, None, self.key_id) self.key_mgr.delete, None, self.key_id)
def test_delete_key(self): def test_delete_key(self):
self.key_mgr.delete_key(self.ctxt, self.key_id) self.key_mgr.delete(self.ctxt, self.key_id)
self.delete.assert_called_once_with(self.secret_ref) self.delete.assert_called_once_with(self.secret_ref)
def test_delete_unknown_key(self): def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError, self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete_key, self.ctxt, None) self.key_mgr.delete, self.ctxt, None)
def test_get_key(self): def test_get_key(self):
original_secret_metadata = mock.Mock() original_secret_metadata = mock.Mock()
@ -167,7 +153,7 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
original_secret_metadata.payload = original_secret_data original_secret_metadata.payload = original_secret_data
self.mock_barbican.secrets.get.return_value = original_secret_metadata self.mock_barbican.secrets.get.return_value = original_secret_metadata
key = self.key_mgr.get_key(self.ctxt, self.key_id) key = self.key_mgr.get(self.ctxt, self.key_id)
self.get.assert_called_once_with(self.secret_ref) self.get.assert_called_once_with(self.secret_ref)
self.assertEqual(key.get_encoded(), original_secret_data) self.assertEqual(key.get_encoded(), original_secret_data)
@ -175,11 +161,11 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
def test_get_null_context(self): def test_get_null_context(self):
self.key_mgr._barbican_client = None self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.get_key, None, self.key_id) self.key_mgr.get, None, self.key_id)
def test_get_unknown_key(self): def test_get_unknown_key(self):
self.assertRaises(exception.KeyManagerError, self.assertRaises(exception.KeyManagerError,
self.key_mgr.get_key, self.ctxt, None) self.key_mgr.get, self.ctxt, None)
def test_store_key_base64(self): def test_store_key_base64(self):
# Create Key to store # Create Key to store
@ -194,7 +180,7 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
secret.store.return_value = self.secret_ref secret.store.return_value = self.secret_ref
# Store the Key # Store the Key
returned_uuid = self.key_mgr.store_key(self.ctxt, _key) returned_uuid = self.key_mgr.store(self.ctxt, _key)
self.create.assert_called_once_with(algorithm='AES', self.create.assert_called_once_with(algorithm='AES',
payload=secret_key, payload=secret_key,
@ -209,7 +195,7 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
secret_key_text) secret_key_text)
# Store the Key # Store the Key
self.key_mgr.store_key(self.ctxt, _key) self.key_mgr.store(self.ctxt, _key)
self.create.assert_called_once_with(algorithm='AES', self.create.assert_called_once_with(algorithm='AES',
payload=secret_key_text, payload=secret_key_text,
expiration=None) expiration=None)
@ -218,4 +204,4 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
def test_store_null_context(self): def test_store_null_context(self):
self.key_mgr._barbican_client = None self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.store_key, None, None) self.key_mgr.store, None, None)

View File

@ -44,7 +44,7 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def test_create_key_with_length(self): def test_create_key_with_length(self):
for length in [64, 128, 256]: for length in [64, 128, 256]:
key_id = self.key_mgr.create_key(self.context, key_length=length) key_id = self.key_mgr.create_key(self.context, key_length=length)
key = self.key_mgr.get_key(self.context, key_id) key = self.key_mgr.get(self.context, key_id)
self.assertEqual(length / 8, len(key.get_encoded())) self.assertEqual(length / 8, len(key.get_encoded()))
def test_create_null_context(self): def test_create_null_context(self):
@ -54,47 +54,47 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def test_store_and_get_key(self): def test_store_and_get_key(self):
secret_key = bytes(b'0' * 64) secret_key = bytes(b'0' * 64)
_key = sym_key.SymmetricKey('AES', 64 * 8, secret_key) _key = sym_key.SymmetricKey('AES', 64 * 8, secret_key)
key_id = self.key_mgr.store_key(self.context, _key) key_id = self.key_mgr.store(self.context, _key)
actual_key = self.key_mgr.get_key(self.context, key_id) actual_key = self.key_mgr.get(self.context, key_id)
self.assertEqual(_key, actual_key) self.assertEqual(_key, actual_key)
def test_store_null_context(self): def test_store_null_context(self):
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.store_key, None, None) self.key_mgr.store, None, None)
def test_copy_key(self): def test_copy_key(self):
key_id = self.key_mgr.create_key(self.context) key_id = self.key_mgr.create_key(self.context)
key = self.key_mgr.get_key(self.context, key_id) key = self.key_mgr.get(self.context, key_id)
copied_key_id = self.key_mgr.copy_key(self.context, key_id) copied_key_id = self.key_mgr.copy(self.context, key_id)
copied_key = self.key_mgr.get_key(self.context, copied_key_id) copied_key = self.key_mgr.get(self.context, copied_key_id)
self.assertNotEqual(key_id, copied_key_id) self.assertNotEqual(key_id, copied_key_id)
self.assertEqual(key, copied_key) self.assertEqual(key, copied_key)
def test_copy_null_context(self): def test_copy_null_context(self):
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.copy_key, None, None) self.key_mgr.copy, None, None)
def test_get_null_context(self): def test_get_null_context(self):
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.get_key, None, None) self.key_mgr.get, None, None)
def test_get_unknown_key(self): def test_get_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.get_key, self.context, None) self.assertRaises(KeyError, self.key_mgr.get, self.context, None)
def test_delete_key(self): def test_delete_key(self):
key_id = self.key_mgr.create_key(self.context) key_id = self.key_mgr.create_key(self.context)
self.key_mgr.delete_key(self.context, key_id) self.key_mgr.delete(self.context, key_id)
self.assertRaises(KeyError, self.key_mgr.get_key, self.context, self.assertRaises(KeyError, self.key_mgr.get, self.context,
key_id) key_id)
def test_delete_null_context(self): def test_delete_null_context(self):
self.assertRaises(exception.Forbidden, self.assertRaises(exception.Forbidden,
self.key_mgr.delete_key, None, None) self.key_mgr.delete, None, None)
def test_delete_unknown_key(self): def test_delete_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.delete_key, self.context, self.assertRaises(KeyError, self.key_mgr.delete, self.context,
None) None)

View File

@ -30,18 +30,22 @@ class NotImplementedKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.assertRaises(NotImplementedError, self.assertRaises(NotImplementedError,
self.key_mgr.create_key, None) self.key_mgr.create_key, None)
def test_store_key(self): def test_create_key_pair(self):
self.assertRaises(NotImplementedError, self.assertRaises(NotImplementedError,
self.key_mgr.store_key, None, None) self.key_mgr.create_key_pair, None, None, None)
def test_copy_key(self): def test_store(self):
self.assertRaises(NotImplementedError, self.assertRaises(NotImplementedError,
self.key_mgr.copy_key, None, None) self.key_mgr.store, None, None)
def test_get_key(self): def test_copy(self):
self.assertRaises(NotImplementedError, self.assertRaises(NotImplementedError,
self.key_mgr.get_key, None, None) self.key_mgr.copy, None, None)
def test_delete_key(self): def test_get(self):
self.assertRaises(NotImplementedError, self.assertRaises(NotImplementedError,
self.key_mgr.delete_key, None, None) self.key_mgr.get, None, None)
def test_delete(self):
self.assertRaises(NotImplementedError,
self.key_mgr.delete, None, None)