Merge "Add name to Castellan Objects and Barbican Key Manager"

This commit is contained in:
Jenkins 2015-10-05 16:53:01 +00:00 committed by Gerrit Code Review
commit c347f4d4b2
16 changed files with 164 additions and 26 deletions

View File

@ -29,6 +29,18 @@ import six
class ManagedObject(object):
"""Base class to represent all managed objects."""
def __init__(self, name=None):
"""Managed Object has a name, defaulted to None."""
self._name = name
@property
def name(self):
"""Returns the name.
Returns the object's name or None if this object does not have one.
"""
return self._name
@abc.abstractproperty
def format(self):
"""Returns the encoding format.

View File

@ -25,12 +25,13 @@ from castellan.common.objects import managed_object
class OpaqueData(managed_object.ManagedObject):
"""This class represents opaque data."""
def __init__(self, data):
def __init__(self, data, name=None):
"""Create a new OpaqueData object.
Expected type for data is a bytestring.
"""
self._data = data
super(OpaqueData, self).__init__(name=name)
@property
def format(self):
@ -43,7 +44,8 @@ class OpaqueData(managed_object.ManagedObject):
def __eq__(self, other):
if isinstance(other, OpaqueData):
return self._data == other._data
return (self._data == other._data and
self._name == other._name)
else:
return False

View File

@ -25,12 +25,13 @@ from castellan.common.objects import managed_object
class Passphrase(managed_object.ManagedObject):
"""This class represents a passphrase."""
def __init__(self, passphrase):
def __init__(self, passphrase, name=None):
"""Create a new Passphrase object.
The expected type for the passphrase is a bytestring.
"""
self._passphrase = passphrase
super(Passphrase, self).__init__(name=name)
@property
def format(self):
@ -43,7 +44,8 @@ class Passphrase(managed_object.ManagedObject):
def __eq__(self, other):
if isinstance(other, Passphrase):
return self._passphrase == other._passphrase
return (self._passphrase == other._passphrase and
self._name == other._name)
else:
return False

View File

@ -25,7 +25,7 @@ from castellan.common.objects import key
class PrivateKey(key.Key):
"""This class represents private keys."""
def __init__(self, algorithm, bit_length, key):
def __init__(self, algorithm, bit_length, key, name=None):
"""Create a new PrivateKey object.
The arguments specify the algorithm and bit length for the asymmetric
@ -34,6 +34,7 @@ class PrivateKey(key.Key):
self._alg = algorithm
self._bit_length = bit_length
self._key = key
super(PrivateKey, self).__init__(name=name)
@property
def algorithm(self):
@ -57,7 +58,8 @@ class PrivateKey(key.Key):
def __eq__(self, other):
if isinstance(other, PrivateKey):
return (self._alg == other._alg and
self._key == other._key)
self._key == other._key and
self._name == other._name)
else:
return False

View File

@ -25,7 +25,7 @@ from castellan.common.objects import key
class PublicKey(key.Key):
"""This class represents public keys."""
def __init__(self, algorithm, bit_length, key):
def __init__(self, algorithm, bit_length, key, name=None):
"""Create a new PublicKey object.
The arguments specify the algorithm and bit length for the asymmetric
@ -35,6 +35,7 @@ class PublicKey(key.Key):
self._alg = algorithm
self._bit_length = bit_length
self._key = key
super(PublicKey, self).__init__(name=name)
@property
def algorithm(self):
@ -58,7 +59,8 @@ class PublicKey(key.Key):
def __eq__(self, other):
if isinstance(other, PublicKey):
return (self._alg == other._alg and
self._key == other._key)
self._key == other._key and
self._name == other._name)
else:
return False

View File

@ -25,7 +25,7 @@ from castellan.common.objects import key
class SymmetricKey(key.Key):
"""This class represents symmetric keys."""
def __init__(self, algorithm, bit_length, key):
def __init__(self, algorithm, bit_length, key, name=None):
"""Create a new SymmetricKey object.
The arguments specify the algorithm and bit length for the symmetric
@ -34,6 +34,7 @@ class SymmetricKey(key.Key):
self._alg = algorithm
self._bit_length = bit_length
self._key = key
super(SymmetricKey, self).__init__(name=name)
@property
def algorithm(self):
@ -58,7 +59,8 @@ class SymmetricKey(key.Key):
if isinstance(other, SymmetricKey):
return (self._alg == other._alg and
self._bit_length == other._bit_length and
self._key == other._key)
self._key == other._key and
self._name == other._name)
else:
return False

View File

@ -25,12 +25,13 @@ from castellan.common.objects import certificate
class X509(certificate.Certificate):
"""This class represents X.509 certificates."""
def __init__(self, data):
def __init__(self, data, name=None):
"""Create a new X509 object.
The data should be in a bytestring.
"""
self._data = data
super(X509, self).__init__(name=name)
@property
def format(self):
@ -43,7 +44,8 @@ class X509(certificate.Certificate):
def __eq__(self, other):
if isinstance(other, X509):
return (self._data == other._data)
return (self._data == other._data and
self._name == other._name)
else:
return False

View File

@ -166,13 +166,15 @@ class BarbicanKeyManager(key_manager.KeyManager):
endpoint, api_version)
return base_url
def create_key(self, context, algorithm, length, expiration=None):
def create_key(self, context, algorithm, length,
expiration=None, name=None):
"""Creates a symmetric key.
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param algorithm: the algorithm associated with the secret
:param length: the bit length of the secret
:param name: the name of the key
:param expiration: the date the key will expire
:return: the UUID of the new key
:raises KeyManagerError: if key creation fails
@ -181,6 +183,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
try:
key_order = barbican_client.orders.create_key(
name=name,
algorithm=algorithm,
bit_length=length,
expiration=expiration)
@ -193,13 +196,15 @@ class BarbicanKeyManager(key_manager.KeyManager):
LOG.error(u._LE("Error creating key: %s"), e)
raise exception.KeyManagerError(reason=e)
def create_key_pair(self, context, algorithm, length, expiration=None):
def create_key_pair(self, context, algorithm, length,
expiration=None, name=None):
"""Creates an asymmetric key pair.
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param algorithm: the algorithm associated with the secret
:param length: the bit length of the secret
:param name: the name of the key
:param expiration: the date the key will expire
:return: the UUIDs of the new key, in the order (private, public)
:raises NotImplementedError: until implemented
@ -211,6 +216,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
key_pair_order = barbican_client.orders.create_asymmetric(
algorithm=algorithm,
bit_length=length,
name=name,
expiration=expiration)
order_ref = key_pair_order.submit()
@ -230,6 +236,8 @@ class BarbicanKeyManager(key_manager.KeyManager):
def _get_barbican_object(self, barbican_client, managed_object):
"""Converts the Castellan managed_object to a Barbican secret."""
name = getattr(managed_object, 'name', None)
try:
algorithm = managed_object.algorithm
bit_length = managed_object.bit_length
@ -244,6 +252,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
secret = barbican_client.secrets.create(payload=payload,
algorithm=algorithm,
bit_length=bit_length,
name=name,
secret_type=secret_type)
return secret
@ -283,8 +292,8 @@ class BarbicanKeyManager(key_manager.KeyManager):
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param managed_object: the unencrypted secret data. Known as "payload"
to the barbicanclient api
:param managed_object: a secret object with unencrypted payload.
Known as "secret" to the barbicanclient api
:param expiration: the expiration time of the secret in ISO 8601
format
:returns: the UUID of the stored object
@ -415,9 +424,11 @@ class BarbicanKeyManager(key_manager.KeyManager):
if issubclass(secret_type, key_base_class.Key):
return secret_type(secret.algorithm,
secret.bit_length,
secret_data)
secret_data,
secret.name)
else:
return secret_type(secret_data)
return secret_type(secret_data,
secret.name)
def _get_secret(self, context, object_id):
"""Returns the metadata of the secret.

View File

@ -40,7 +40,8 @@ class KeyManager(object):
pass
@abc.abstractmethod
def create_key(self, context, algorithm, length, expiration=None):
def create_key(self, context, algorithm, length,
expiration=None, name=None):
"""Creates a symmetric key.
This method creates a symmetric key and returns the key's UUID. If the
@ -50,7 +51,8 @@ class KeyManager(object):
pass
@abc.abstractmethod
def create_key_pair(self, context, algorithm, length, expiration=None):
def create_key_pair(self, context, algorithm, length,
expiration=None, name=None):
"""Creates an asymmetric key pair.
This method creates an asymmetric key pair and returns the pair of key

View File

@ -186,6 +186,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
original_secret_metadata.algorithm = mock.sentinel.alg
original_secret_metadata.bit_length = mock.sentinel.bit
original_secret_metadata.secret_type = 'symmetric'
key_name = 'my key'
original_secret_metadata.name = key_name
original_secret_data = b'test key'
original_secret_metadata.payload = original_secret_data
@ -193,6 +197,7 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
key = self.key_mgr.get(self.ctxt, self.key_id)
self.get.assert_called_once_with(self.secret_ref)
self.assertEqual(key_name, key.name)
self.assertEqual(original_secret_data, key.get_encoded())
def test_get_null_context(self):
@ -228,10 +233,36 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.create.assert_called_once_with(algorithm='AES',
bit_length=key_length,
name=None,
payload=secret_key,
secret_type='symmetric')
self.assertEqual(self.key_id, returned_uuid)
def test_store_key_with_name(self):
# Create Key to store
secret_key = bytes(b'\x01\x02\xA0\xB3')
key_length = len(secret_key) * 8
secret_name = 'My Secret'
_key = sym_key.SymmetricKey('AES',
key_length,
secret_key,
secret_name)
# Define the return values
secret = mock.Mock()
self.create.return_value = secret
secret.store.return_value = self.secret_ref
# Store the Key
returned_uuid = self.key_mgr.store(self.ctxt, _key)
self.create.assert_called_once_with(algorithm='AES',
bit_length=key_length,
payload=secret_key,
name=secret_name,
secret_type='symmetric')
self.assertEqual(self.key_id, returned_uuid)
def test_store_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,

View File

@ -24,10 +24,11 @@ from castellan.tests import base
class OpaqueDataTestCase(base.TestCase):
def _create_data(self):
return opaque_data.OpaqueData(self.data)
return opaque_data.OpaqueData(self.data, self.name)
def setUp(self):
self.data = bytes(b"secret opaque data")
self.name = 'my opaque'
self.opaque_data = self._create_data()
super(OpaqueDataTestCase, self).setUp()
@ -38,6 +39,9 @@ class OpaqueDataTestCase(base.TestCase):
def test_get_encoded(self):
self.assertEqual(self.data, self.opaque_data.get_encoded())
def test_get_name(self):
self.assertEqual(self.name, self.opaque_data.name)
def test___eq__(self):
self.assertTrue(self.opaque_data == self.opaque_data)
@ -46,6 +50,11 @@ class OpaqueDataTestCase(base.TestCase):
def test___ne__(self):
self.assertFalse(self.opaque_data != self.opaque_data)
self.assertFalse(self.name != self.name)
self.assertTrue(self.opaque_data is not None)
self.assertTrue(None != self.opaque_data)
def test___ne__name(self):
other_opaque = opaque_data.OpaqueData(self.data, "other opaque")
self.assertTrue(self.opaque_data != other_opaque)

View File

@ -24,10 +24,12 @@ from castellan.tests import base
class PassphraseTestCase(base.TestCase):
def _create_passphrase(self):
return passphrase.Passphrase(self.passphrase_data)
return passphrase.Passphrase(self.passphrase_data,
self.name)
def setUp(self):
self.passphrase_data = bytes(b"secret passphrase")
self.name = 'my phrase'
self.passphrase = self._create_passphrase()
super(PassphraseTestCase, self).setUp()
@ -38,6 +40,9 @@ class PassphraseTestCase(base.TestCase):
def test_get_encoded(self):
self.assertEqual(self.passphrase_data, self.passphrase.get_encoded())
def test_get_name(self):
self.assertEqual(self.name, self.passphrase.name)
def test___eq__(self):
self.assertTrue(self.passphrase == self.passphrase)
@ -46,6 +51,12 @@ class PassphraseTestCase(base.TestCase):
def test___ne__(self):
self.assertFalse(self.passphrase != self.passphrase)
self.assertFalse(self.name != self.name)
self.assertTrue(self.passphrase is not None)
self.assertTrue(None != self.passphrase)
def test___ne__name(self):
other_phrase = passphrase.Passphrase(self.passphrase_data,
"other phrase")
self.assertTrue(self.passphrase_data != other_phrase)

View File

@ -27,12 +27,14 @@ class PrivateKeyTestCase(base.KeyTestCase):
def _create_key(self):
return private_key.PrivateKey(self.algorithm,
self.length,
self.encoded)
self.encoded,
self.name)
def setUp(self):
self.algorithm = 'RSA'
self.length = 2048
self.encoded = bytes(utils.get_private_key_der())
self.name = 'my key'
super(PrivateKeyTestCase, self).setUp()
@ -42,6 +44,9 @@ class PrivateKeyTestCase(base.KeyTestCase):
def test_get_length(self):
self.assertEqual(self.length, self.key.bit_length)
def test_get_name(self):
self.assertEqual(self.name, self.key.name)
def test_get_format(self):
self.assertEqual('PKCS8', self.key.format)
@ -56,6 +61,14 @@ class PrivateKeyTestCase(base.KeyTestCase):
def test___ne__(self):
self.assertFalse(self.key != self.key)
self.assertFalse(self.name != self.name)
self.assertTrue(self.key is not None)
self.assertTrue(None != self.key)
def test___ne__name(self):
other_key = private_key.PrivateKey(self.algorithm,
self.length,
self.encoded,
'other key')
self.assertTrue(self.key != other_key)

View File

@ -25,12 +25,16 @@ from castellan.tests import utils
class PublicKeyTestCase(base.KeyTestCase):
def _create_key(self):
return public_key.PublicKey(self.algorithm, self.length, self.encoded)
return public_key.PublicKey(self.algorithm,
self.length,
self.encoded,
self.name)
def setUp(self):
self.algorithm = 'RSA'
self.length = 2048
self.encoded = bytes(utils.get_public_key_der())
self.name = 'my key'
super(PublicKeyTestCase, self).setUp()
@ -40,6 +44,9 @@ class PublicKeyTestCase(base.KeyTestCase):
def test_get_length(self):
self.assertEqual(self.length, self.key.bit_length)
def test_get_name(self):
self.assertEqual(self.name, self.key.name)
def test_get_format(self):
self.assertEqual('SubjectPublicKeyInfo', self.key.format)
@ -54,6 +61,14 @@ class PublicKeyTestCase(base.KeyTestCase):
def test___ne__(self):
self.assertFalse(self.key != self.key)
self.assertFalse(self.name != self.name)
self.assertTrue(self.key is not None)
self.assertTrue(None != self.key)
def test___ne__name(self):
other_key = public_key.PublicKey(self.algorithm,
self.length,
self.encoded,
'other key')
self.assertTrue(self.key != other_key)

View File

@ -26,18 +26,23 @@ class SymmetricKeyTestCase(base.KeyTestCase):
def _create_key(self):
return sym_key.SymmetricKey(self.algorithm,
self.bit_length,
self.encoded)
self.encoded,
self.name)
def setUp(self):
self.algorithm = 'AES'
self.encoded = bytes(b'0' * 64)
self.bit_length = len(self.encoded) * 8
self.name = 'my key'
super(SymmetricKeyTestCase, self).setUp()
def test_get_format(self):
self.assertEqual('RAW', self.key.format)
def test_get_name(self):
self.assertEqual(self.name, self.key.name)
def test_get_encoded(self):
self.assertEqual(self.encoded, self.key.get_encoded())
@ -55,6 +60,14 @@ class SymmetricKeyTestCase(base.KeyTestCase):
def test___ne__(self):
self.assertFalse(self.key != self.key)
self.assertFalse(self.name != self.name)
self.assertTrue(self.key is not None)
self.assertTrue(None != self.key)
def test___ne__name(self):
other_key = sym_key.SymmetricKey(self.algorithm,
self.bit_length,
self.encoded,
'other key')
self.assertTrue(self.key != other_key)

View File

@ -25,16 +25,20 @@ from castellan.tests import utils
class X509TestCase(base.CertificateTestCase):
def _create_cert(self):
return x_509.X509(self.data)
return x_509.X509(self.data, self.name)
def setUp(self):
self.data = utils.get_certificate_der()
self.name = 'my cert'
super(X509TestCase, self).setUp()
def test_get_format(self):
self.assertEqual('X.509', self.cert.format)
def test_get_name(self):
self.assertEqual(self.name, self.cert.name)
def test_get_encoded(self):
self.assertEqual(self.data, self.cert.get_encoded())
@ -46,6 +50,11 @@ class X509TestCase(base.CertificateTestCase):
def test___ne__(self):
self.assertFalse(self.cert != self.cert)
self.assertFalse(self.name != self.name)
self.assertTrue(self.cert is not None)
self.assertTrue(None != self.cert)
def test___ne__name(self):
other_x509 = x_509.X509(self.data, "other x509")
self.assertTrue(self.cert != other_x509)