barbican/barbican/tests/plugin/test_kmip.py

833 lines
35 KiB
Python

# Copyright (c) 2014 Johns Hopkins University Applied Physics Laboratory
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import ssl
import stat
import testtools
import mock
from barbican.plugin.interface import secret_store
from barbican.tests import keys
from barbican.tests import utils
from kmip.core import enums
from kmip.pie import client
from kmip.pie import objects
from barbican.plugin import kmip_secret_store as kss
from barbican.plugin.util import translations
def get_sample_opaque_secret():
return objects.OpaqueObject(
base64.b64decode(utils.get_symmetric_key()),
enums.OpaqueDataType.NONE)
def get_sample_symmetric_key(key_b64=utils.get_symmetric_key(),
key_length=128,
algorithm=enums.CryptographicAlgorithm.AES):
return objects.SymmetricKey(
algorithm,
key_length,
base64.b64decode(key_b64))
def get_sample_public_key(pkcs1=False):
if pkcs1:
public_key_value = kss.get_public_key_der_pkcs1(
keys.get_public_key_pem())
key_format_type = enums.KeyFormatType.PKCS_1
else:
public_key_value = keys.get_public_key_der()
key_format_type = enums.KeyFormatType.X_509
return objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
2048,
public_key_value,
key_format_type)
def get_sample_private_key(pkcs1=False):
if pkcs1:
private_key_value = kss.get_private_key_der_pkcs1(
keys.get_private_key_pem())
key_format_type = enums.KeyFormatType.PKCS_1
else:
private_key_value = keys.get_private_key_der()
key_format_type = enums.KeyFormatType.PKCS_8
return objects.PrivateKey(
enums.CryptographicAlgorithm.RSA,
2048,
private_key_value,
key_format_type)
def get_sample_certificate():
return objects.X509Certificate(
keys.get_certificate_der())
@utils.parameterized_test_case
class WhenTestingKMIPSecretStore(utils.BaseTestCase):
"""Test using the KMIP server backend for SecretStore."""
def setUp(self):
super(WhenTestingKMIPSecretStore, self).setUp()
self.expected_username = "sample_username"
self.expected_password = "sample_password"
CONF = kss.CONF
CONF.kmip_plugin.username = self.expected_username
CONF.kmip_plugin.password = self.expected_password
CONF.kmip_plugin.keyfile = None
CONF.kmip_plugin.pkcs1_only = False
# get the latest protocol that SSL supports
protocol_dict = ssl.__dict__.get('_PROTOCOL_NAMES')
latest_protocol = protocol_dict.get(max(protocol_dict.keys()))
if not latest_protocol.startswith('PROTOCOL_'):
latest_protocol = 'PROTOCOL_' + latest_protocol
CONF.kmip_plugin.ssl_version = latest_protocol
self.secret_store = kss.KMIPSecretStore(CONF)
self.credential = self.secret_store.credential
self.symmetric_type = secret_store.SecretType.SYMMETRIC
self.sample_secret_features = {
'key_format_type': enums.KeyFormatType.RAW,
'key_value': {
'bytes': bytearray(b'\x00\x00\x00')
},
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
'cryptographic_length': 128
}
self.symmetric_key_uuid = 'dde870ad-cea3-41a3-9bb9-e8ab579a2f91'
self.public_key_uuid = 'cb908abb-d363-4d9f-8ef2-5e84d27dd25c'
self.private_key_uuid = '2d4c0544-4ec6-45b7-81cd-b23c75744eac'
self.sample_secret = get_sample_symmetric_key()
self.secret_store.client.open = mock.MagicMock(
spec=client.ProxyKmipClient.open)
self.secret_store.client.close = mock.MagicMock(
spec=client.ProxyKmipClient.close)
self.secret_store.client.create = mock.MagicMock(
return_value=self.symmetric_key_uuid)
self.secret_store.client.create_key_pair = mock.MagicMock(
return_value=(self.public_key_uuid, self.private_key_uuid))
self.secret_store.client.register = mock.MagicMock(
return_value='uuid')
self.secret_store.client.destroy = mock.MagicMock(
return_value=None)
self.secret_store.client.get = mock.MagicMock(
return_value=self.sample_secret)
# --------------- TEST CONFIG OPTIONS ---------------------------------
def test_enable_pkcs1_only_config_option(self):
CONF = kss.CONF
CONF.kmip_plugin.pkcs1_only = True
secret_store = kss.KMIPSecretStore(CONF)
self.assertTrue(secret_store.pkcs1_only)
@testtools.skipIf(not getattr(ssl, "PROTOCOL_TLSv1_2", None),
"TLSv1.2 is not available on this system")
def test_enable_tlsv12_config_option(self):
ssl_version = "PROTOCOL_TLSv1_2"
CONF = kss.CONF
CONF.kmip_plugin.ssl_version = ssl_version
kss.KMIPSecretStore(CONF)
self.assertEqual(ssl_version, CONF.kmip_plugin.ssl_version)
@testtools.skipIf(not getattr(ssl, "PROTOCOL_TLSv1", None),
"TLSv1 is not available on this system")
def test_enable_tlsv1_config_option(self):
ssl_version = "PROTOCOL_TLSv1"
CONF = kss.CONF
CONF.kmip_plugin.ssl_version = ssl_version
kss.KMIPSecretStore(CONF)
self.assertEqual(ssl_version, CONF.kmip_plugin.ssl_version)
# --------------- TEST GENERATE_SUPPORTS ---------------------------------
def test_generate_supports_aes(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
None, 'mode')
for x in [128, 192, 256]:
key_spec.bit_length = x
self.assertTrue(self.secret_store.generate_supports(key_spec))
def test_generate_supports_des(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.DES,
None, 'mode')
for x in [56]:
key_spec.bit_length = x
self.assertTrue(self.secret_store.generate_supports(key_spec))
def test_generate_supports_desede(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.DESEDE,
None, 'mode')
for x in [56, 112, 168]:
key_spec.bit_length = x
self.assertTrue(self.secret_store.generate_supports(key_spec))
def test_generate_supports_rsa(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
None, 'mode')
for x in [2048, 3072, 4096]:
key_spec.bit_length = x
self.assertTrue(self.secret_store.generate_supports(key_spec))
def test_generate_supports_dsa(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.DSA,
None, 'mode')
for x in [2048, 3072]:
key_spec.bit_length = x
self.assertTrue(self.secret_store.generate_supports(key_spec))
def test_generate_supports_with_invalid_alg(self):
key_spec = secret_store.KeySpec('invalid_alg', 56, 'mode')
self.assertFalse(self.secret_store.generate_supports(key_spec))
def test_generate_supports_with_valid_alg_invalid_bit_length(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
56, 'mode')
self.assertFalse(self.secret_store.generate_supports(key_spec))
# ------------ TEST GENERATE_SYMMETRIC -----------------------------------
def test_generate_symmetric_key_assert_called(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
self.secret_store.generate_symmetric_key(key_spec)
self.secret_store.client.create.assert_called_once_with(
enums.CryptographicAlgorithm.AES,
128)
def test_generate_symmetric_key_return_value(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
return_value = self.secret_store.generate_symmetric_key(key_spec)
expected = {kss.KMIPSecretStore.KEY_UUID:
self.symmetric_key_uuid}
self.assertEqual(expected, return_value)
def test_generate_symmetric_key_server_error_occurs(self):
self.secret_store.client.create.side_effect = Exception
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.generate_symmetric_key,
key_spec)
def test_generate_symmetric_key_invalid_algorithm(self):
key_spec = secret_store.KeySpec('invalid_algorithm',
128, 'mode')
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.generate_symmetric_key,
key_spec)
def test_generate_symmetric_key_valid_algorithm_invalid_bit_length(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
56, 'mode')
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.generate_symmetric_key,
key_spec)
def test_generate_symmetric_key_not_symmetric_algorithm(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
2048, 'mode')
self.assertRaises(
kss.KMIPSecretStoreError,
self.secret_store.generate_symmetric_key,
key_spec)
def test_generate_symmetric_key_error_opening_connection(self):
self.secret_store.client.open.side_effect = Exception
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.generate_symmetric_key,
key_spec)
# ---------------- TEST GENERATE_ASYMMETRIC ------------------------------
def test_generate_asymmetric_key_assert_called(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
2048, 'mode')
self.secret_store.generate_asymmetric_key(key_spec)
self.secret_store.client.create_key_pair.assert_called_once_with(
enums.CryptographicAlgorithm.RSA,
2048)
def test_generate_asymmetric_key_return_value(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
2048, 'mode')
return_value = self.secret_store.generate_asymmetric_key(key_spec)
expected_private_key_meta = {
kss.KMIPSecretStore.KEY_UUID:
self.private_key_uuid}
expected_public_key_meta = {
kss.KMIPSecretStore.KEY_UUID:
self.public_key_uuid}
expected_passphrase_meta = None
self.assertEqual(
expected_private_key_meta, return_value.private_key_meta)
self.assertEqual(
expected_public_key_meta, return_value.public_key_meta)
self.assertEqual(
expected_passphrase_meta, return_value.passphrase_meta)
def test_generate_asymmetric_key_server_error_occurs(self):
self.secret_store.client.create_key_pair.side_effect = Exception
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
2048, 'mode')
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.generate_asymmetric_key,
key_spec)
def test_generate_asymmetric_key_invalid_algorithm(self):
key_spec = secret_store.KeySpec('invalid_algorithm', 160, 'mode')
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.generate_asymmetric_key,
key_spec)
def test_generate_asymmetric_key_valid_algorithm_invalid_bit_length(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
56, 'mode')
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.generate_asymmetric_key,
key_spec)
def test_generate_asymmetric_key_not_asymmetric_algorithm(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.generate_asymmetric_key,
key_spec)
def test_generate_asymmetric_key_check_for_passphrase(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
2048, 'mode', 'passphrase')
self.assertRaises(
kss.KMIPSecretStoreActionNotSupported,
self.secret_store.generate_asymmetric_key,
key_spec)
def test_generate_asymmetric_key_error_opening_connection(self):
self.secret_store.client.open.side_effect = Exception
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
2048, 'mode')
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.generate_asymmetric_key,
key_spec)
# ----------------- TEST STORE -------------------------------------------
def test_store_symmetric_secret_assert_called(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
sym_key = utils.get_symmetric_key()
secret_dto = secret_store.SecretDTO(secret_store.SecretType.SYMMETRIC,
sym_key,
key_spec,
'content_type',
transport_key=None)
self.secret_store.store_secret(secret_dto)
self.secret_store.client.register.assert_called_once_with(
objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
base64.b64decode(utils.get_symmetric_key())))
def test_store_symmetric_secret_return_value(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
sym_key = utils.get_symmetric_key()
secret_dto = secret_store.SecretDTO(secret_store.SecretType.SYMMETRIC,
sym_key,
key_spec,
'content_type',
transport_key=None)
return_value = self.secret_store.store_secret(secret_dto)
expected = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
self.assertEqual(expected, return_value)
def test_store_passphrase_secret_assert_called(self):
key_spec = secret_store.KeySpec(None, None, None)
passphrase = base64.b64encode(b"supersecretpassphrase")
secret_dto = secret_store.SecretDTO(secret_store.SecretType.PASSPHRASE,
passphrase,
key_spec,
'content_type',
transport_key=None)
self.secret_store.store_secret(secret_dto)
self.secret_store.client.register.assert_called_once_with(
objects.SecretData(
base64.b64decode(passphrase),
enums.SecretDataType.PASSWORD))
def test_store_passphrase_secret_return_value(self):
key_spec = secret_store.KeySpec(None, None, None)
passphrase = b"supersecretpassphrase"
secret_dto = secret_store.SecretDTO(secret_store.SecretType.PASSPHRASE,
base64.b64encode(passphrase),
key_spec,
'content_type',
transport_key=None)
return_value = self.secret_store.store_secret(secret_dto)
expected = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
self.assertEqual(expected, return_value)
def test_store_opaque_secret_assert_called(self):
key_spec = secret_store.KeySpec(None, None, None)
opaque = base64.b64encode(b'\x00\x01\x02\x03\x04\x05\x06\x07')
secret_dto = secret_store.SecretDTO(secret_store.SecretType.OPAQUE,
opaque,
key_spec,
'content_type',
transport_key=None)
self.secret_store.store_secret(secret_dto)
self.secret_store.client.register.assert_called_once_with(
objects.OpaqueObject(
base64.b64decode(opaque),
enums.OpaqueDataType.NONE))
def test_store_opaque_secret_return_value(self):
key_spec = secret_store.KeySpec(None, None, None)
opaque = b'\x00\x01\x02\x03\x04\x05\x06\x07'
secret_dto = secret_store.SecretDTO(secret_store.SecretType.OPAQUE,
base64.b64encode(opaque),
key_spec,
'content_type',
transport_key=None)
return_value = self.secret_store.store_secret(secret_dto)
expected = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
self.assertEqual(expected, return_value)
@utils.parameterized_dataset({
'private_pkcs8': [secret_store.SecretType.PRIVATE,
keys.get_private_key_pem(),
enums.ObjectType.PRIVATE_KEY,
keys.get_private_key_der(),
False],
'private_pkcs1': [secret_store.SecretType.PRIVATE,
keys.get_private_key_pem(),
enums.ObjectType.PRIVATE_KEY,
kss.get_private_key_der_pkcs1(
keys.get_private_key_pem()),
True],
'public_pkcs8': [secret_store.SecretType.PUBLIC,
keys.get_public_key_pem(),
enums.ObjectType.PUBLIC_KEY,
keys.get_public_key_der(),
False],
'public_pkcs1': [secret_store.SecretType.PUBLIC,
keys.get_public_key_pem(),
enums.ObjectType.PUBLIC_KEY,
kss.get_public_key_der_pkcs1(
keys.get_public_key_pem()),
True],
})
def test_store_asymmetric_key_secret_assert_called(self,
barbican_type,
barbican_key,
kmip_type,
kmip_key,
pkcs1_only):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
secret_value = base64.b64encode(barbican_key)
secret_dto = secret_store.SecretDTO(barbican_type,
secret_value,
key_spec,
'content_type')
self.secret_store.pkcs1_only = pkcs1_only
self.secret_store.store_secret(secret_dto)
secret_value = base64.b64decode(secret_value)
if not pkcs1_only:
secret_value = translations.convert_pem_to_der(
secret_value,
barbican_type)
if kmip_type == enums.ObjectType.PUBLIC_KEY:
if pkcs1_only:
secret_value = kss.get_public_key_der_pkcs1(secret_value)
secret = objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
2048,
secret_value,
enums.KeyFormatType.X_509)
else:
if pkcs1_only:
secret_value = kss.get_private_key_der_pkcs1(secret_value)
secret = objects.PrivateKey(
enums.CryptographicAlgorithm.RSA,
2048,
secret_value,
enums.KeyFormatType.PKCS_8)
self.secret_store.client.register.assert_called_once_with(secret)
@utils.parameterized_dataset({
'private_pkcs8': [secret_store.SecretType.PRIVATE,
keys.get_private_key_pem(),
False],
'private_pkcs1': [secret_store.SecretType.PRIVATE,
keys.get_private_key_pem(),
True],
'public_pkcs8': [secret_store.SecretType.PUBLIC,
keys.get_public_key_pem(),
False],
'public_pkcs1': [secret_store.SecretType.PUBLIC,
keys.get_public_key_pem(),
True],
})
def test_store_asymmetric_key_secret_return_value(self,
barbican_type,
barbican_key,
pkcs1_only):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
secret_dto = secret_store.SecretDTO(barbican_type,
base64.b64encode(barbican_key),
key_spec,
'content_type')
self.secret_store.pkcs1_only = pkcs1_only
return_value = self.secret_store.store_secret(secret_dto)
expected = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
self.assertEqual(expected, return_value)
@utils.parameterized_dataset({
'rsa': [secret_store.KeyAlgorithm.RSA, 2048],
'no_key_spec': [None, None]
})
def test_store_certificate_secret_assert_called(
self, algorithm, bit_length):
key_spec = secret_store.KeySpec(algorithm, bit_length)
certificate_value = base64.b64encode(keys.get_certificate_pem())
secret_dto = secret_store.SecretDTO(
secret_store.SecretType.CERTIFICATE,
certificate_value,
key_spec,
'content_type')
self.secret_store.store_secret(secret_dto)
self.secret_store.client.register.assert_called_once_with(
objects.X509Certificate(translations.convert_pem_to_der(
base64.b64decode(certificate_value),
secret_store.SecretType.CERTIFICATE)))
def test_store_certificate_secret_return_value(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
secret_dto = secret_store.SecretDTO(
secret_store.SecretType.CERTIFICATE,
base64.b64encode(keys.get_certificate_pem()),
key_spec,
'content_type')
return_value = self.secret_store.store_secret(secret_dto)
expected = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
self.assertEqual(expected, return_value)
def test_store_secret_server_error_occurs(self):
self.secret_store.client.register.side_effect = Exception
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
secret_dto = secret_store.SecretDTO(secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
key_spec,
'content_type',
transport_key=None)
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.store_secret,
secret_dto)
def test_store_secret_invalid_algorithm(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.DSA,
128, 'mode')
secret_dto = secret_store.SecretDTO(secret_store.SecretType.SYMMETRIC,
"AAAA",
key_spec,
'content_type',
transport_key=None)
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.store_secret,
secret_dto)
def test_store_secret_valid_algorithm_invalid_bit_length(self):
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
56, 'mode')
secret_dto = secret_store.SecretDTO(secret_store.SecretType.SYMMETRIC,
"AAAA",
key_spec,
'content_type',
transport_key=None)
self.assertRaises(
secret_store.SecretAlgorithmNotSupportedException,
self.secret_store.store_secret,
secret_dto)
def test_store_secret_error_opening_connection(self):
self.secret_store.client.open.side_effect = Exception
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
128, 'mode')
secret_dto = secret_store.SecretDTO(secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
key_spec,
'content_type',
transport_key=None)
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.store_secret,
secret_dto)
# --------------- TEST GET -----------------------------------------------
@utils.parameterized_dataset({
'symmetric': [get_sample_symmetric_key(),
secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
False],
'hmac_sha1': [get_sample_symmetric_key(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA1),
secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
False],
'hmac_sha256': [get_sample_symmetric_key(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256),
secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
False],
'hmac_sha384': [get_sample_symmetric_key(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA384),
secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
False],
'hmac_sha512': [get_sample_symmetric_key(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA512),
secret_store.SecretType.SYMMETRIC,
utils.get_symmetric_key(),
False],
'triple_des': [get_sample_symmetric_key(
key_b64=utils.get_triple_des_key(),
key_length=192,
algorithm=enums.CryptographicAlgorithm.TRIPLE_DES),
secret_store.SecretType.SYMMETRIC,
utils.get_triple_des_key(),
False],
'opaque': [get_sample_opaque_secret(),
secret_store.SecretType.OPAQUE,
utils.get_symmetric_key(),
False],
'public_key': [get_sample_public_key(),
secret_store.SecretType.PUBLIC,
base64.b64encode(keys.get_public_key_pem()),
False],
'public_key_pkcs1': [get_sample_public_key(pkcs1=True),
secret_store.SecretType.PUBLIC,
base64.b64encode(keys.get_public_key_pem()),
True],
'private_key': [get_sample_private_key(),
secret_store.SecretType.PRIVATE,
base64.b64encode(keys.get_private_key_pem()),
False],
'private_key_pkcs1': [get_sample_private_key(pkcs1=True),
secret_store.SecretType.PRIVATE,
base64.b64encode(keys.get_private_key_pem()),
True],
'certificate': [get_sample_certificate(),
secret_store.SecretType.CERTIFICATE,
base64.b64encode(keys.get_certificate_pem()),
False]
})
def test_get_secret(self, kmip_secret, secret_type, expected_secret,
pkcs1_only):
self.secret_store.pkcs1_only = pkcs1_only
self.secret_store.client.get.return_value = kmip_secret
uuid = utils.generate_test_uuid(0)
metadata = {kss.KMIPSecretStore.KEY_UUID: uuid}
secret_dto = self.secret_store.get_secret(secret_type, metadata)
self.secret_store.client.get.assert_called_once_with(uuid)
self.assertEqual(secret_store.SecretDTO, type(secret_dto))
self.assertEqual(secret_type, secret_dto.type)
self.assertEqual(expected_secret, secret_dto.secret)
def test_get_secret_symmetric_return_value_invalid_key_material_type(self):
invalid_secret = self.sample_secret
invalid_secret.value = list('invalid')
self.secret_store.client.get.return_value = invalid_secret
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.get_secret,
self.symmetric_type, metadata)
def test_get_secret_symmetric_server_error_occurs(self):
self.secret_store.client.get.side_effect = Exception
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.get_secret,
self.symmetric_type, metadata)
def test_get_secret_symmetric_error_opening_connection(self):
self.secret_store.client.open.side_effect = Exception
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.get_secret,
self.symmetric_type, metadata)
# ---------------- TEST DELETE -------------------------------------------
def test_delete_with_null_metadata_values(self):
metadata = {kss.KMIPSecretStore.KEY_UUID: None}
self.assertIsNone(self.secret_store.delete_secret(metadata))
def test_delete_secret_assert_called(self):
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
self.secret_store.delete_secret(metadata)
self.secret_store.client.destroy.assert_called_once_with(
self.symmetric_key_uuid)
def test_delete_secret_return_value(self):
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
return_value = self.secret_store.delete_secret(metadata)
self.assertIsNone(return_value)
def test_delete_secret_server_error_occurs(self):
self.secret_store.client.destroy.side_effect = Exception
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.delete_secret,
metadata)
def test_delete_secret_error_opening_connection(self):
self.secret_store.client.open.side_effect = Exception
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.delete_secret,
metadata)
# -------------- TEST HELPER FUNCTIONS -----------------------------------
def test_credential(self):
actual_credential = self.secret_store.credential
self.assertEqual(
self.expected_username,
actual_credential.credential_value.username.value)
self.assertEqual(
self.expected_password,
actual_credential.credential_value.password.value)
def test_credential_None(self):
CONF = kss.CONF
CONF.kmip_plugin.username = None
CONF.kmip_plugin.password = None
CONF.kmip_plugin.keyfile = None
secret_store = kss.KMIPSecretStore(CONF)
self.assertIsNone(secret_store.credential)
def test_map_type_ss_to_kmip_valid_type(self):
ss_types = [secret_store.SecretType.SYMMETRIC,
secret_store.SecretType.PUBLIC,
secret_store.SecretType.PRIVATE]
for ss_type in ss_types:
self.assertIsNotNone(
self.secret_store._map_type_ss_to_kmip(ss_type))
def test_map_type_ss_to_kmip_invalid_type(self):
object_type, key_format_type = (
self.secret_store._map_type_ss_to_kmip('bad_type'))
self.assertIsNone(object_type)
self.assertIsNone(key_format_type)
def test_validate_keyfile_permissions_good(self):
config = {'return_value.st_mode':
(stat.S_IRUSR | stat.S_IFREG)}
with mock.patch('os.stat', **config):
self.assertIsNone(
self.secret_store._validate_keyfile_permissions('/some/path/'))
def test_check_keyfile_permissions_bad(self):
config = {'return_value.st_mode':
(stat.S_IWOTH | stat.S_IFREG)}
with mock.patch('os.stat', **config):
self.assertRaises(
kss.KMIPSecretStoreError,
self.secret_store._validate_keyfile_permissions,
'/some/path/')
def test_checks_keyfile_permissions(self):
config = {'return_value': True}
func = ("barbican.plugin.kmip_secret_store."
"KMIPSecretStore._validate_keyfile_permissions")
with mock.patch(func, **config) as m:
CONF = kss.CONF
CONF.kmip_plugin.keyfile = '/some/path'
kss.KMIPSecretStore(CONF)
self.assertEqual(1, len(m.mock_calls))
def test_get_plugin_name(self):
CONF = kss.CONF
CONF.kmip_plugin.plugin_name = "Test KMIP Plugin"
secret_store = kss.KMIPSecretStore(CONF)
self.assertEqual("Test KMIP Plugin", secret_store.get_plugin_name())