Add DeriveKey support to the server

This change adds the DeriveKey operation to the server. Unit tests
covering the new additions are included. The Query operation has
also been updated to reflect this addition.
This commit is contained in:
Peter Hamilton 2017-07-12 13:34:25 -04:00
parent 590313dcf9
commit 90fafe8713
3 changed files with 860 additions and 31 deletions

View File

@ -42,6 +42,7 @@ from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import derive_key
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt
@ -963,6 +964,8 @@ class KmipEngine(object):
return self._process_create_key_pair(payload)
elif operation == enums.Operation.REGISTER:
return self._process_register(payload)
elif operation == enums.Operation.DERIVE_KEY:
return self._process_derive_key(payload)
elif operation == enums.Operation.LOCATE:
return self._process_locate(payload)
elif operation == enums.Operation.GET:
@ -1330,6 +1333,188 @@ class KmipEngine(object):
return response_payload
@_kmip_version_supported('1.0')
def _process_derive_key(self, payload):
self._logger.info("Processing operation: DeriveKey")
object_attributes = {}
if payload.template_attribute:
object_attributes = self._process_template_attribute(
payload.template_attribute
)
if payload.object_type not in [
enums.ObjectType.SYMMETRIC_KEY,
enums.ObjectType.SECRET_DATA
]:
raise exceptions.InvalidField(
"Key derivation can only generate a SymmetricKey or "
"SecretData object."
)
# Retrieve existing managed objects to be used in the key derivation
# process. If any are unaccessible or not suitable for key derivation,
# raise an error.
existing_objects = []
for unique_identifier in payload.unique_identifiers:
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
if managed_object._object_type not in [
enums.ObjectType.SECRET_DATA,
enums.ObjectType.SYMMETRIC_KEY,
enums.ObjectType.PUBLIC_KEY,
enums.ObjectType.PRIVATE_KEY
]:
raise exceptions.InvalidField(
"Object {0} is not a suitable type for key "
"derivation. Please specify a key or secret data.".format(
unique_identifier
)
)
elif enums.CryptographicUsageMask.DERIVE_KEY not in \
managed_object.cryptographic_usage_masks:
raise exceptions.InvalidField(
"The DeriveKey bit must be set in the cryptographic usage "
"mask for object {0} for it to be used in key "
"derivation.".format(unique_identifier)
)
else:
existing_objects.append(managed_object)
if len(existing_objects) > 1:
self._logger.info(
"{0} derivation objects specified with the DeriveKey "
"request.".format(len(existing_objects))
)
# Select the derivation object to use as the keying material
keying_object = existing_objects[0]
self._logger.info(
"Object {0} will be used as the keying material for the "
"derivation process.".format(keying_object.unique_identifier)
)
derivation_parameters = payload.derivation_parameters
derivation_data = None
if derivation_parameters.derivation_data is None:
if len(existing_objects) > 1:
for alternate in existing_objects[1:]:
if alternate._object_type == enums.ObjectType.SECRET_DATA:
self._logger.info(
"Object {0} will be used as the derivation data "
"for the derivation process.".format(
alternate.unique_identifier
)
)
derivation_data = alternate.value
break
else:
derivation_data = derivation_parameters.derivation_data
iv = b''
if derivation_parameters.initialization_vector is not None:
iv = derivation_parameters.initialization_vector
# Get the derivation length from the template attribute. It is
# required so if it cannot be found, raise an error.
derivation_length = None
attribute = object_attributes.get('Cryptographic Length')
if attribute:
derivation_length = attribute.value
if (derivation_length % 8) == 0:
derivation_length //= 8
else:
raise exceptions.InvalidField(
"The cryptographic length must correspond to a valid "
"number of bytes (i.e., it must be a multiple of 8)."
)
else:
raise exceptions.InvalidField(
"The cryptographic length must be provided in the template "
"attribute."
)
cryptographic_algorithm = None
if payload.object_type == enums.ObjectType.SYMMETRIC_KEY:
attribute = object_attributes.get('Cryptographic Algorithm')
if attribute:
cryptographic_algorithm = attribute.value
else:
raise exceptions.InvalidField(
"The cryptographic algorithm must be provided in the "
"template attribute when deriving a symmetric key."
)
# TODO (peterhamilton): Pull cryptographic parameters from the keying
# object if none are provided with the payload
crypto_parameters = derivation_parameters.cryptographic_parameters
derived_data = self._cryptography_engine.derive_key(
derivation_method=payload.derivation_method,
derivation_length=derivation_length,
derivation_data=derivation_data,
key_material=keying_object.value,
hash_algorithm=crypto_parameters.hashing_algorithm,
salt=derivation_parameters.salt,
iteration_count=derivation_parameters.iteration_count,
encryption_algorithm=crypto_parameters.cryptographic_algorithm,
cipher_mode=crypto_parameters.block_cipher_mode,
padding_method=crypto_parameters.padding_method,
iv_nonce=iv
)
if derivation_length > len(derived_data):
raise exceptions.CryptographicFailure(
"The specified length exceeds the output of the derivation "
"method."
)
if payload.object_type == enums.ObjectType.SYMMETRIC_KEY:
managed_object = objects.SymmetricKey(
algorithm=cryptographic_algorithm,
length=(derivation_length * 8),
value=derived_data,
)
else:
managed_object = objects.SecretData(
value=derived_data,
data_type=enums.SecretDataType.SEED,
)
managed_object.names = []
if payload.object_type == enums.ObjectType.SECRET_DATA:
del object_attributes['Cryptographic Length']
self._set_attributes_on_managed_object(
managed_object,
object_attributes
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
managed_object.initial_date = int(time.time())
self._data_session.add(managed_object)
self._data_session.commit()
self._logger.info(
"Created a {0} with ID: {1}".format(
''.join(
[x.capitalize() for x in
payload.object_type.name.split('_')]
),
managed_object.unique_identifier
)
)
self._id_placeholder = str(managed_object.unique_identifier)
response_payload = derive_key.DeriveKeyResponsePayload(
unique_identifier=str(managed_object.unique_identifier)
)
return response_payload
@_kmip_version_supported('1.0')
def _process_locate(self, payload):
# TODO: Need to complete the filtering logic based on all given
@ -1734,6 +1919,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.CREATE),
contents.Operation(enums.Operation.CREATE_KEY_PAIR),
contents.Operation(enums.Operation.REGISTER),
contents.Operation(enums.Operation.DERIVE_KEY),
contents.Operation(enums.Operation.LOCATE),
contents.Operation(enums.Operation.GET),
contents.Operation(enums.Operation.GET_ATTRIBUTES),

View File

@ -1008,10 +1008,9 @@ def test_handle_symmetric_padding_undo(symmetric_padding_parameters):
#
# https://www.ietf.org/rfc/rfc6070.txt
#
# HMAC test vectors were obtained from IETF RFC 2202 and RFC 4231:
# HMAC test vectors were obtained from IETF RFC 5869:
#
# https://tools.ietf.org/html/rfc2202
# https://tools.ietf.org/html/rfc4231
# https://tools.ietf.org/html/rfc5869
#
# HASH test vectors for SHA1/SHA224/SHA256/SHA384/SHA512
# were obtained from the NIST CAVP test suite. Test vectors for MD5 were

View File

@ -43,6 +43,7 @@ from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import derive_key
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt
@ -3477,6 +3478,637 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_derive_key(self):
"""
Test that a DeriveKey request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=176,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
# Derive a SymmetricKey object.
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=(
b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7'
b'\xf8\xf9'
),
salt=(
b'\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0a\x0b\x0c'
)
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
336
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
response_payload = e._process_derive_key(payload)
e._logger.info.assert_any_call("Processing operation: DeriveKey")
e._logger.info.assert_any_call(
"Object 1 will be used as the keying material for the derivation "
"process."
)
e._logger.info.assert_any_call("Created a SymmetricKey with ID: 2")
self.assertEqual("2", response_payload.unique_identifier)
managed_object = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.SymmetricKey.unique_identifier == 2
).one()
self.assertEqual(
(
b'\x3c\xb2\x5f\x25\xfa\xac\xd5\x7a'
b'\x90\x43\x4f\x64\xd0\x36\x2f\x2a'
b'\x2d\x2d\x0a\x90\xcf\x1a\x5a\x4c'
b'\x5d\xb0\x2d\x56\xec\xc4\xc5\xbf'
b'\x34\x00\x72\x08\xd5\xb8\x87\x18'
b'\x58\x65'
),
managed_object.value
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
managed_object.cryptographic_algorithm
)
self.assertEqual(
336,
managed_object.cryptographic_length
)
self.assertIsNotNone(managed_object.initial_date)
e._logger.reset_mock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.BLOWFISH,
length=128,
value=(
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
# Derive a SecretData object.
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SECRET_DATA,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.ENCRYPT,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
block_cipher_mode=enums.BlockCipherMode.CBC,
padding_method=enums.PaddingMethod.PKCS5,
hashing_algorithm=enums.HashingAlgorithm.SHA_256,
cryptographic_algorithm=(
enums.CryptographicAlgorithm.BLOWFISH
)
),
initialization_vector=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10',
derivation_data=(
b'\x37\x36\x35\x34\x33\x32\x31\x20'
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
b'\x66\x6F\x72\x20\x00'
),
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
256
)
]
)
)
response_payload = e._process_derive_key(payload)
e._logger.info.assert_any_call("Processing operation: DeriveKey")
e._logger.info.assert_any_call(
"Object 3 will be used as the keying material for the derivation "
"process."
)
e._logger.info.assert_any_call("Created a SecretData with ID: 4")
self.assertEqual("4", response_payload.unique_identifier)
managed_object = e._data_session.query(
pie_objects.SecretData
).filter(
pie_objects.SecretData.unique_identifier == 4
).one()
self.assertEqual(
(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
managed_object.value
)
self.assertEqual(enums.SecretDataType.SEED, managed_object.data_type)
self.assertIsNotNone(managed_object.initial_date)
def test_derive_key_invalid_derivation_type(self):
"""
Test that the right error is thrown when an invalid derivation type
is provided with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.CERTIFICATE
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"Key derivation can only generate a SymmetricKey or SecretData "
"object.",
e._process_derive_key,
*args
)
def test_derive_key_invalid_base_key(self):
"""
Test that the right error is thrown when an object not suitable for
key derivation is provided as the base key with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
invalid_key = pie_objects.OpaqueObject(
b'\x01\x02\x04\x08\x10\x20\x40\x80',
enums.OpaqueDataType.NONE
)
e._data_session.add(invalid_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SECRET_DATA,
unique_identifiers=[str(invalid_key.unique_identifier)]
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"Object 1 is not a suitable type for key derivation. Please "
"specify a key or secret data.",
e._process_derive_key,
*args
)
def test_derive_key_non_derivable_base_key(self):
"""
Test that the right error is thrown when an object suitable for
key derivation but not marked as such is provided as the base key
with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
(
b'\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
),
[enums.CryptographicUsageMask.ENCRYPT]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SECRET_DATA,
unique_identifiers=[str(base_key.unique_identifier)]
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The DeriveKey bit must be set in the cryptographic usage mask "
"for object 1 for it to be used in key derivation.",
e._process_derive_key,
*args
)
def test_derive_key_alternate_derivation_data(self):
"""
Test that a DeriveKey request can be processed correctly by
specifying multiple base objects and no derivation data.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=176,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
base_data = pie_objects.SecretData(
value=(
b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7'
b'\xf8\xf9'
),
data_type=enums.SecretDataType.SEED,
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_data)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[
str(base_key.unique_identifier),
str(base_data.unique_identifier)
],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
salt=(
b'\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0a\x0b\x0c'
)
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
336
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
response_payload = e._process_derive_key(payload)
e._logger.info.assert_any_call("Processing operation: DeriveKey")
e._logger.info.assert_any_call(
"2 derivation objects specified with the DeriveKey request."
)
e._logger.info.assert_any_call(
"Object 1 will be used as the keying material for the derivation "
"process."
)
e._logger.info.assert_any_call(
"Object 2 will be used as the derivation data for the derivation "
"process."
)
e._logger.info.assert_any_call("Created a SymmetricKey with ID: 3")
self.assertEqual("3", response_payload.unique_identifier)
managed_object = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.SymmetricKey.unique_identifier == 3
).one()
self.assertEqual(
(
b'\x3c\xb2\x5f\x25\xfa\xac\xd5\x7a'
b'\x90\x43\x4f\x64\xd0\x36\x2f\x2a'
b'\x2d\x2d\x0a\x90\xcf\x1a\x5a\x4c'
b'\x5d\xb0\x2d\x56\xec\xc4\xc5\xbf'
b'\x34\x00\x72\x08\xd5\xb8\x87\x18'
b'\x58\x65'
),
managed_object.value
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
managed_object.cryptographic_algorithm
)
self.assertEqual(
336,
managed_object.cryptographic_length
)
self.assertIsNotNone(managed_object.initial_date)
def test_derive_key_unspecified_iv(self):
"""
"""
self.skip('')
def test_derive_key_missing_cryptographic_length(self):
"""
Test that the right error is thrown when the cryptographic length is
missing from a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic length must be provided in the template "
"attribute.",
e._process_derive_key,
*args
)
def test_derive_key_invalid_cryptographic_length(self):
"""
Test that the right error is thrown when an invalid cryptographic
length is provided with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
123
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic length must correspond to a valid number of "
"bytes \(i.e., it must be a multiple of 8\).",
e._process_derive_key,
*args
)
def test_derive_key_missing_cryptographic_algorithm(self):
"""
Test that the right error is thrown when the cryptographic algorithm
is missing from a DeriveKey request when deriving a symmetric key.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
256
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic algorithm must be provided in the template "
"attribute when deriving a symmetric key.",
e._process_derive_key,
*args
)
def test_derive_key_oversized_cryptographic_length(self):
"""
Test that the right error is thrown when an invalid cryptographic
length is provided with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._cryptography_engine = mock.MagicMock()
e._cryptography_engine.derive_key.return_value = b''
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
256
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.CryptographicFailure,
"The specified length exceeds the output of the derivation "
"method.",
e._process_derive_key,
*args
)
def test_locate(self):
"""
Test that a Locate request can be processed correctly.
@ -5620,7 +6252,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(10, len(result.operations))
self.assertEqual(11, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -5634,33 +6266,37 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
enums.Operation.DERIVE_KEY,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
enums.Operation.LOCATE,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
enums.Operation.GET,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
enums.Operation.GET_ATTRIBUTES,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.ACTIVATE,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[10].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -5698,7 +6334,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(11, len(result.operations))
self.assertEqual(12, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -5712,37 +6348,41 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
enums.Operation.DERIVE_KEY,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
enums.Operation.LOCATE,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
enums.Operation.GET,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
enums.Operation.GET_ATTRIBUTES,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.ACTIVATE,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
enums.Operation.QUERY,
result.operations[10].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[11].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -5780,7 +6420,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(13, len(result.operations))
self.assertEqual(14, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -5794,45 +6434,49 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
enums.Operation.DERIVE_KEY,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
enums.Operation.LOCATE,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
enums.Operation.GET,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
enums.Operation.GET_ATTRIBUTES,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.ACTIVATE,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
enums.Operation.QUERY,
result.operations[10].value
)
self.assertEqual(
enums.Operation.ENCRYPT,
enums.Operation.DISCOVER_VERSIONS,
result.operations[11].value
)
self.assertEqual(
enums.Operation.MAC,
enums.Operation.ENCRYPT,
result.operations[12].value
)
self.assertEqual(
enums.Operation.MAC,
result.operations[13].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(