Add necessary objects/attributes and payloads for MAC operation

This commit is contained in:
Hao Shen 2017-01-30 09:59:14 -08:00
parent 51ec018b35
commit a2696b722f
12 changed files with 1041 additions and 13 deletions

View File

@ -263,6 +263,7 @@ class CryptographicParameters(Struct):
enums.BlockCipherMode, value, Tags.BLOCK_CIPHER_MODE)
class PaddingMethod(Enumeration):
def __init__(self, value=None):
super(CryptographicParameters.PaddingMethod, self).__init__(
enums.PaddingMethod, value, Tags.PADDING_METHOD)
@ -273,17 +274,29 @@ class CryptographicParameters(Struct):
super(CryptographicParameters.KeyRoleType, self).__init__(
enums.KeyRoleType, value, Tags.KEY_ROLE_TYPE)
class DigitalSignatureAlgorithm(Enumeration):
def __init__(self, value=None):
super(CryptographicParameters.DigitalSignatureAlgorithm,
self).__init__(enums.DigitalSignatureAlgorithm,
value, Tags.DIGITAL_SIGNATURE_ALGORITHM)
# TODO: Need to implement other fields of CryptographicParameters (3.6)
def __init__(self,
block_cipher_mode=None,
padding_method=None,
hashing_algorithm=None,
key_role_type=None):
key_role_type=None,
digital_signature_algorithm=None,
cryptographic_algorithm=None):
super(CryptographicParameters, self).__init__(
tag=Tags.CRYPTOGRAPHIC_PARAMETERS)
self.block_cipher_mode = block_cipher_mode
self.padding_method = padding_method
self.hashing_algorithm = hashing_algorithm
self.key_role_type = key_role_type
self.digital_signature_algorithm = digital_signature_algorithm
self.cryptographic_algorithm = cryptographic_algorithm
def read(self, istream):
super(CryptographicParameters, self).read(istream)
@ -305,6 +318,15 @@ class CryptographicParameters(Struct):
self.key_role_type = CryptographicParameters.KeyRoleType()
self.key_role_type.read(tstream)
if self.is_tag_next(Tags.DIGITAL_SIGNATURE_ALGORITHM, tstream):
self.digital_signature_algorithm = \
CryptographicParameters.DigitalSignatureAlgorithm()
self.digital_signature_algorithm.read(tstream)
if self.is_tag_next(Tags.CRYPTOGRAPHIC_ALGORITHM, tstream):
self.cryptographic_algorithm = CryptographicAlgorithm()
self.cryptographic_algorithm.read(tstream)
self.is_oversized(tstream)
self.validate()
@ -320,6 +342,10 @@ class CryptographicParameters(Struct):
self.hashing_algorithm.write(tstream)
if self.key_role_type is not None:
self.key_role_type.write(tstream)
if self.digital_signature_algorithm is not None:
self.digital_signature_algorithm.write(tstream)
if self.cryptographic_algorithm is not None:
self.cryptographic_algorithm.write(tstream)
# Write the length and value of the request payload
self.length = tstream.length()
@ -354,6 +380,23 @@ class CryptographicParameters(Struct):
msg += "; expected {0}, received {1}".format(
self.KeyRoleType, self.key_role_type)
raise TypeError(msg)
if self.digital_signature_algorithm is not None:
if not isinstance(
self.digital_signature_algorithm,
CryptographicParameters.DigitalSignatureAlgorithm
):
msg = "Invalid digital signature algorithm"
msg += "; expected {0}, received {1}".format(
CryptographicParameters.DigitalSignatureAlgorithm,
self.digital_signature_algorithm)
raise TypeError(msg)
if self.cryptographic_algorithm is not None:
if not isinstance(self.cryptographic_algorithm,
CryptographicAlgorithm):
msg = "Invalid cryptograhic algorithm"
msg += "; expected {0}, received {1}".format(
CryptographicAlgorithm, self.cryptographic_algorithm)
raise TypeError(msg)
def __eq__(self, other):
if isinstance(other, CryptographicParameters):
@ -363,6 +406,11 @@ class CryptographicParameters(Struct):
return False
elif self.hashing_algorithm != other.hashing_algorithm:
return False
elif self.digital_signature_algorithm \
!= other.digital_signature_algorithm:
return False
elif self.cryptographic_algorithm != other.cryptographic_algorithm:
return False
elif self.padding_method != other.padding_method:
return False
else:

View File

@ -140,6 +140,9 @@ class AttributeValueFactory(object):
padding_method = None
hashing_algorithm = None
key_role_type = None
digital_signature_algorithm = None
cryptographic_algorithm = None
# TODO: Need to implement other fields of CryptographicParameters (3.6)
if params is not None:
@ -162,11 +165,23 @@ class AttributeValueFactory(object):
hashing_algorithm = attributes.HashingAlgorithm(
params.get("hashing_algorithm"))
if 'digital_signature_algorithm' in params:
digital_signature_algorithm = \
attributes.CryptographicParameters. \
DigitalSignatureAlgorithm(
params.get("digital_signature_algorithm"))
if 'cryptographic_algorithm' in params:
cryptographic_algorithm = attributes.CryptographicAlgorithm(
params.get("cryptographic_algorithm"))
return attributes.CryptographicParameters(
block_cipher_mode=bcm,
padding_method=padding_method,
hashing_algorithm=hashing_algorithm,
key_role_type=key_role_type)
key_role_type=key_role_type,
digital_signature_algorithm=digital_signature_algorithm,
cryptographic_algorithm=cryptographic_algorithm)
def _create_cryptographic_usage_mask(self, flags):
mask = None

View File

@ -28,6 +28,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import mac
class RequestPayloadFactory(PayloadFactory):
@ -70,3 +71,6 @@ class RequestPayloadFactory(PayloadFactory):
def _create_revoke_payload(self):
return revoke.RevokeRequestPayload()
def _create_mac_payload(self):
return mac.MACRequestPayload()

View File

@ -28,6 +28,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import mac
class ResponsePayloadFactory(PayloadFactory):
@ -70,3 +71,6 @@ class ResponsePayloadFactory(PayloadFactory):
def _create_revoke_payload(self):
return revoke.RevokeResponsePayload()
def _create_mac_payload(self):
return mac.MACResponsePayload()

View File

@ -0,0 +1,207 @@
# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core import attributes
from kmip.core import enums
from kmip.core import exceptions
from kmip.core.enums import Tags
from kmip.core.objects import Data, MACData
from kmip.core.primitives import Struct
from kmip.core.utils import BytearrayStream
# 4.33
class MACRequestPayload(Struct):
def __init__(self,
unique_identifier=None,
cryptographic_parameters=None,
data=None):
super(MACRequestPayload, self).__init__(
tag=enums.Tags.REQUEST_PAYLOAD)
self._unique_identifier = None
self._cryptographic_parameters = None
self._data = None
self.unique_identifier = unique_identifier
self.cryptographic_parameters = cryptographic_parameters
self.data = data
@property
def unique_identifier(self):
return self._unique_identifier
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._unique_identifier = None
elif isinstance(value, attributes.UniqueIdentifier):
self._unique_identifier = value
else:
raise TypeError("unique identifier must be UniqueIdentifier type")
@property
def cryptographic_parameters(self):
return self._cryptographic_parameters
@cryptographic_parameters.setter
def cryptographic_parameters(self, value):
if value is None:
self._cryptographic_parameters = None
elif isinstance(value, attributes.CryptographicParameters):
self._cryptographic_parameters = value
else:
raise TypeError("cryptographic parameters must "
"be CryptographicParameters type")
@property
def data(self):
return self._data
@data.setter
def data(self, value):
if value is None:
self._data = None
elif isinstance(value, Data):
self._data = value
else:
raise TypeError("data must be Data type")
def read(self, istream):
super(MACRequestPayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream):
self.unique_identifier = attributes.UniqueIdentifier()
self.unique_identifier.read(tstream)
if self.is_tag_next(Tags.CRYPTOGRAPHIC_PARAMETERS, tstream):
self.cryptographic_parameters = \
attributes.CryptographicParameters()
self.cryptographic_parameters.read(tstream)
if self.is_tag_next(Tags.DATA, tstream):
self.data = Data()
self.data.read(tstream)
else:
raise exceptions.InvalidKmipEncoding(
"expected mac request data not found"
)
self.is_oversized(tstream)
def write(self, ostream):
tstream = BytearrayStream()
if self._unique_identifier is not None:
self._unique_identifier.write(tstream)
if self._cryptographic_parameters is not None:
self._cryptographic_parameters.write(tstream)
if self._data is not None:
self.data.write(tstream)
else:
raise exceptions.InvalidField(
"The mac request data is required"
)
self.length = tstream.length()
super(MACRequestPayload, self).write(ostream)
ostream.write(tstream.buffer)
class MACResponsePayload(Struct):
def __init__(self,
unique_identifier=None,
mac_data=None):
super(MACResponsePayload, self).__init__(
tag=enums.Tags.RESPONSE_PAYLOAD)
self._unique_identifier = None
self._mac_data = None
self.unique_identifier = unique_identifier
self.mac_data = mac_data
@property
def unique_identifier(self):
return self._unique_identifier
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._unique_identifier = None
elif isinstance(value, attributes.UniqueIdentifier):
self._unique_identifier = value
else:
raise TypeError("unique identifier must be UniqueIdentifier type")
@property
def mac_data(self):
return self._mac_data
@mac_data.setter
def mac_data(self, value):
if value is None:
self._mac_data = None
elif isinstance(value, MACData):
self._mac_data = value
else:
raise TypeError("mac_data must be MACData type")
def read(self, istream):
super(MACResponsePayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream):
self._unique_identifier = attributes.UniqueIdentifier()
self._unique_identifier.read(tstream)
else:
raise exceptions.InvalidKmipEncoding(
"expected mac response unique identifier not found"
)
if self.is_tag_next(Tags.MAC_DATA, tstream):
self._mac_data = MACData()
self._mac_data.read(tstream)
else:
raise exceptions.InvalidKmipEncoding(
"expected mac response mac data not found"
)
self.is_oversized(tstream)
def write(self, ostream):
tstream = BytearrayStream()
if self._unique_identifier is not None:
self._unique_identifier.write(tstream)
else:
raise exceptions.InvalidField(
"The mac response unique identifier is required"
)
if self._mac_data is not None:
self._mac_data.write(tstream)
else:
raise exceptions.InvalidField(
"The mac response mac data is required"
)
self.length = tstream.length()
super(MACResponsePayload, self).write(ostream)
ostream.write(tstream.buffer)

View File

@ -1216,6 +1216,20 @@ class ExtensionInformation(Struct):
extension_type=extension_type)
# 2.1.10
class Data(ByteString):
def __init__(self, value=None):
super(Data, self).__init__(value, Tags.DATA)
# 2.1.13
class MACData(ByteString):
def __init__(self, value=None):
super(MACData, self).__init__(value, Tags.MAC_DATA)
# 3.31, 9.1.3.2.19
class RevocationReasonCode(Enumeration):

View File

@ -32,6 +32,8 @@ from kmip.core.enums import BlockCipherMode
from kmip.core.enums import CertificateTypeEnum
from kmip.core.enums import HashingAlgorithm as HashingAlgorithmEnum
from kmip.core.enums import KeyRoleType
from kmip.core.enums import DigitalSignatureAlgorithm
from kmip.core.enums import CryptographicAlgorithm
from kmip.core.enums import PaddingMethod
from kmip.core.enums import NameType
@ -483,6 +485,7 @@ class TestApplicationData(TestCase):
class TestCryptographicParameters(TestCase):
"""
A test suite for the CryptographicParameters class
"""
@ -498,25 +501,33 @@ class TestCryptographicParameters(TestCase):
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK})
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.cp_none = self.factory.create_attribute_value(
AttributeType.CRYPTOGRAPHIC_PARAMETERS, {})
# Symmetric key object with Cryptographic Parameters
# Byte stream edited to add Key Role Type parameter
# Byte stream edited to add:
# Key Role Type parameter
# Digital Signature Algorithm parameter
# Cryptographic Algorithm parameter
# Based on the KMIP Spec 1.1 Test Cases document
# 11.1 page 255 on the pdf version
self.key_req_with_crypt_params = BytearrayStream((
b'\x42\x00\x2B\x01\x00\x00\x00\x40'
b'\x42\x00\x2B\x01\x00\x00\x00\x60'
b'\x42\x00\x11\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x5F\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\x00\x38\x05\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00'
b'\x42\x00\x83\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\xAE\x05\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x00'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0B\x00\x00\x00\x00'
))
def teardown(self):
super(TestDigestValue, self).tearDown()
super(TestCryptographicParameters, self).tearDown()
def test_write_crypto_params(self):
ostream = BytearrayStream()
@ -546,6 +557,17 @@ class TestCryptographicParameters(TestCase):
self.assertEqual(HashingAlgorithmEnum.SHA_1.value,
self.cp.hashing_algorithm.value.value)
self.assertEqual(Tags.DIGITAL_SIGNATURE_ALGORITHM.value,
self.cp.digital_signature_algorithm.tag.value)
self.assertEqual(DigitalSignatureAlgorithm.
SHA256_WITH_RSA_ENCRYPTION.value,
self.cp.digital_signature_algorithm.value.value)
self.assertEqual(Tags.CRYPTOGRAPHIC_ALGORITHM.value,
self.cp.cryptographic_algorithm.tag.value)
self.assertEqual(CryptographicAlgorithm.HMAC_SHA512.value,
self.cp.cryptographic_algorithm.value.value)
def test_bad_cipher_mode(self):
self.cp.block_cipher_mode = self.bad_enum_code
cp_valid = self.factory.create_attribute_value(
@ -553,7 +575,10 @@ class TestCryptographicParameters(TestCase):
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK})
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)
@ -564,7 +589,10 @@ class TestCryptographicParameters(TestCase):
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK})
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)
@ -575,7 +603,10 @@ class TestCryptographicParameters(TestCase):
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK})
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)
@ -586,6 +617,37 @@ class TestCryptographicParameters(TestCase):
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK})
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)
def test_bad_digital_signature_algorithm(self):
self.cp.digital_signature_algorithm = self.bad_enum_code
cp_valid = self.factory.create_attribute_value(
AttributeType.CRYPTOGRAPHIC_PARAMETERS,
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)
def test_bad_cryptographic_algorithm(self):
self.cp.cryptographic_algorithm = self.bad_enum_code
cp_valid = self.factory.create_attribute_value(
AttributeType.CRYPTOGRAPHIC_PARAMETERS,
{'block_cipher_mode': BlockCipherMode.CBC,
'padding_method': PaddingMethod.PKCS5,
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
'key_role_type': KeyRoleType.BDK,
'digital_signature_algorithm':
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)

View File

@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import mac
class TestRequestPayloadFactory(testtools.TestCase):
@ -228,7 +229,8 @@ class TestRequestPayloadFactory(testtools.TestCase):
)
def test_create_mac_payload(self):
self._test_not_implemented(self.factory.create, enums.Operation.MAC)
payload = self.factory.create(enums.Operation.MAC)
self._test_payload_type(payload, mac.MACRequestPayload)
def test_create_mac_verify_payload(self):
self._test_not_implemented(

View File

@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import mac
class TestResponsePayloadFactory(testtools.TestCase):
@ -226,7 +227,11 @@ class TestResponsePayloadFactory(testtools.TestCase):
)
def test_create_mac_payload(self):
self._test_not_implemented(self.factory.create, enums.Operation.MAC)
payload = self.factory.create(enums.Operation.MAC)
self._test_payload_type(
payload,
mac.MACResponsePayload
)
def test_create_mac_verify_payload(self):
self._test_not_implemented(

View File

@ -89,7 +89,11 @@ class TestAttributeValueFactory(testtools.TestCase):
'block_cipher_mode': enums.BlockCipherMode.NIST_KEY_WRAP,
'padding_method': enums.PaddingMethod.ANSI_X9_23,
'key_role_type': enums.KeyRoleType.KEK,
'hashing_algorithm': enums.HashingAlgorithm.SHA_512}
'hashing_algorithm': enums.HashingAlgorithm.SHA_512,
'digital_signature_algorithm':
enums.DigitalSignatureAlgorithm.ECDSA_WITH_SHA512,
'cryptographic_algorithm':
enums.CryptographicAlgorithm.HMAC_SHA512}
params = self.factory.create_attribute_value(
enums.AttributeType.CRYPTOGRAPHIC_PARAMETERS, value)
@ -110,6 +114,14 @@ class TestAttributeValueFactory(testtools.TestCase):
self.assertEqual(
attributes.HashingAlgorithm(enums.HashingAlgorithm.SHA_512),
params.hashing_algorithm)
self.assertEqual(
attributes.CryptographicParameters.DigitalSignatureAlgorithm(
enums.DigitalSignatureAlgorithm.ECDSA_WITH_SHA512),
params.digital_signature_algorithm)
self.assertEqual(
attributes.CryptographicAlgorithm(
enums.CryptographicAlgorithm.HMAC_SHA512),
params.cryptographic_algorithm)
def test_create_cryptographic_domain_parameters(self):
"""

View File

@ -0,0 +1,298 @@
# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core import attributes
from kmip.core import objects
from kmip.core import utils
from kmip.core import enums
from kmip.core import exceptions
from kmip.core.messages.payloads import mac
class TestMACRequestPayload(TestCase):
def setUp(self):
super(TestMACRequestPayload, self).setUp()
self.unique_identifier = attributes.UniqueIdentifier(value='1')
self.cryptographic_parameters = \
attributes.CryptographicParameters(
cryptographic_algorithm=attributes.CryptographicAlgorithm(
enums.CryptographicAlgorithm.HMAC_SHA512)
)
self.data = objects.Data(
value=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
b'\x0C\x0D\x0E\x0F')
)
self.encoding_full = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x40\x42\x00\x94\x07\x00\x00\x00\x01'
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00'
b'\x42\x00\xc2\x08\x00\x00\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'))
self.encoding_no_data = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x28\x42\x00\x94\x07\x00\x00\x00\x01'
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00'
))
def tearDown(self):
super(TestMACRequestPayload, self).tearDown()
def test_init_with_none(self):
mac.MACRequestPayload()
def test_init_valid(self):
"""
Test that the payload can be properly constructed and the attributes
cab be properly set and retrieved.
"""
payload = mac.MACRequestPayload(
self.unique_identifier,
self.cryptographic_parameters,
self.data)
self.assertEqual(payload.unique_identifier, self.unique_identifier)
self.assertEqual(payload.cryptographic_parameters,
self.cryptographic_parameters)
self.assertEqual(payload.data, self.data)
def test_init_with_invalid_unique_identifier(self):
kwargs = {'unique_identifier': 'invalid',
'cryptographic_parameters': None,
'data': None}
self.assertRaisesRegexp(
TypeError, "unique identifier must be UniqueIdentifier type",
mac.MACRequestPayload, **kwargs)
def test_init_with_invalid_cryptographic_parameters(self):
kwargs = {'unique_identifier': None,
'cryptographic_parameters': 'invalid',
'data': None}
self.assertRaisesRegexp(
TypeError,
"cryptographic parameters must be CryptographicParameters type",
mac.MACRequestPayload, **kwargs)
def test_init_with_invalid_data(self):
kwargs = {'unique_identifier': None,
'cryptographic_parameters': None,
'data': 'invalid'}
self.assertRaises(
TypeError, "data must be Data type",
mac.MACRequestPayload, **kwargs)
def test_read_valid(self):
stream = self.encoding_full
payload = mac.MACRequestPayload()
payload.read(stream)
self.assertEqual(self.unique_identifier, payload.unique_identifier)
self.assertEqual(self.cryptographic_parameters,
payload.cryptographic_parameters)
self.assertEqual(self.data, payload.data)
def test_read_no_data(self):
"""
Test that an InvalidKmipEncoding error gets raised when attempting to
read a mac request encoding with no data.
"""
payload = mac.MACRequestPayload()
args = (self.encoding_no_data,)
self.assertRaisesRegexp(
exceptions.InvalidKmipEncoding,
"expected mac request data not found",
payload.read,
*args
)
def test_write_valid(self):
expected = self.encoding_full
stream = utils.BytearrayStream()
payload = mac.MACRequestPayload(
self.unique_identifier,
self.cryptographic_parameters,
self.data)
payload.write(stream)
self.assertEqual(expected, stream)
def test_write_with_no_data(self):
"""
Test that an InvalidField error gets raised when attempting to
write a mac request with no data.
"""
stream = utils.BytearrayStream()
payload = mac.MACRequestPayload(
self.unique_identifier,
self.cryptographic_parameters,
None)
args = (stream,)
self.assertRaisesRegexp(
exceptions.InvalidField,
"The mac request data is required",
payload.write,
*args
)
class TestMACResponsePayload(TestCase):
def setUp(self):
super(TestMACResponsePayload, self).setUp()
self.unique_identifier = attributes.UniqueIdentifier(value='1')
self.mac_data = objects.MACData(value=(
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
)
)
self.encoding_full = utils.BytearrayStream((
b'\x42\x00\x7c\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x01'
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\xc6\x08\x00\x00\x00\x40'
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
))
self.encoding_no_unique_identifier = utils.BytearrayStream((
b'\x42\x00\x7c\x01\x00\x00\x00\x48\x42\x00\xc6\x08\x00\x00\x00\x40'
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
))
self.encoding_no_mac_data = utils.BytearrayStream((
b'\x42\x00\x7c\x01\x00\x00\x00\x10\x42\x00\x94\x07\x00\x00\x00\x01'
b'\x31\x00\x00\x00\x00\x00\x00\x00'
))
def tearDown(self):
super(TestMACResponsePayload, self).tearDown()
def test_init_with_none(self):
mac.MACResponsePayload()
def test_init_valid(self):
"""
Test that the payload can be properly constructed and the attributes
can be properly set and retrieved.
"""
payload = mac.MACResponsePayload(
self.unique_identifier,
self.mac_data)
self.assertEqual(payload.unique_identifier, self.unique_identifier)
self.assertEqual(payload.mac_data, self.mac_data)
def test_init_with_invalid_unique_identifier(self):
kwargs = {'unique_identifier': 'invalid',
'mac_data': None}
self.assertRaisesRegexp(
TypeError, "unique identifier must be UniqueIdentifier type",
mac.MACResponsePayload, **kwargs)
def test_init_with_invalid_mac_data(self):
kwargs = {'unique_identifier': None,
'mac_data': 'invalid'}
self.assertRaises(
TypeError, "data must be MACData type",
mac.MACResponsePayload, **kwargs)
def test_read_valid(self):
stream = self.encoding_full
payload = mac.MACResponsePayload()
payload.read(stream)
self.assertEqual(self.unique_identifier, payload.unique_identifier)
self.assertEqual(self.mac_data, payload.mac_data)
def test_read_no_unique_identifier(self):
"""
Test that an InvalidKmipEncoding error gets raised when attempting to
read a mac response encoding with no unique identifier.
"""
payload = mac.MACResponsePayload()
args = (self.encoding_no_unique_identifier,)
self.assertRaisesRegexp(
exceptions.InvalidKmipEncoding,
"expected mac response unique identifier not found",
payload.read,
*args
)
def test_read_no_mac_data(self):
"""
Test that an InvalidKmipEncoding error gets raised when attempting to
read a mac response encoding with no mac data.
"""
payload = mac.MACResponsePayload()
args = (self.encoding_no_mac_data,)
self.assertRaisesRegexp(
exceptions.InvalidKmipEncoding,
"expected mac response mac data not found",
payload.read,
*args
)
def test_write_valid(self):
expected = self.encoding_full
stream = utils.BytearrayStream()
payload = mac.MACResponsePayload(
self.unique_identifier,
self.mac_data)
payload.write(stream)
self.assertEqual(expected, stream)
def test_write_with_no_unique_identifier(self):
"""
Test that an InvalidField error gets raised when attempting to
write a mac response with no unique identifier.
"""
stream = utils.BytearrayStream()
payload = mac.MACResponsePayload(
None,
self.mac_data)
args = (stream,)
self.assertRaisesRegexp(
exceptions.InvalidField,
"The mac response unique identifier is required",
payload.write,
*args
)
def test_write_with_no_data(self):
"""
Test that an InvalidField error gets raised when attempting to
write a mac response with no mac data.
"""
stream = utils.BytearrayStream()
payload = mac.MACResponsePayload(
self.unique_identifier,
None)
args = (stream,)
self.assertRaisesRegexp(
exceptions.InvalidField,
"The mac response mac data is required",
payload.write,
*args
)

View File

@ -14,6 +14,7 @@
# under the License.
from testtools import TestCase
import binascii
from kmip.core.factories.keys import KeyFactory
from kmip.core.factories.secrets import SecretFactory
@ -47,6 +48,7 @@ from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import mac
from kmip.core.misc import KeyFormatType
from kmip.core.primitives import TextString
@ -155,6 +157,18 @@ class TestRequestMessage(TestCase):
b'\x42\x00\x0b\x01\x00\x00\x00\x20\x42\x00\x55\x07\x00\x00\x00\x04'
b'\x4b\x65\x79\x31\x00\x00\x00\x00\x42\x00\x54\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00')
self.mac = (
b'\x42\x00\x78\x01\x00\x00\x00\xa0\x42\x00\x77\x01\x00\x00\x00\x38'
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x0d\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x58'
b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x23\x00\x00\x00\x00'
b'\x42\x00\x79\x01\x00\x00\x00\x40\x42\x00\x94\x07\x00\x00\x00\x01'
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00'
b'\x42\x00\xc2\x08\x00\x00\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f')
def tearDown(self):
super(TestRequestMessage, self).tearDown()
@ -966,6 +980,164 @@ class TestRequestMessage(TestCase):
'Key1',
attribute_value.name_value.value))
def test_mac_request_read(self):
self.stream = BytearrayStream(self.mac)
request_message = messages.RequestMessage()
request_message.read(self.stream)
request_header = request_message.request_header
msg = "Bad request header type: expected {0}, received{0}"
self.assertIsInstance(request_header, messages.RequestHeader,
msg.format(messages.RequestHeader,
type(request_header)))
protocol_version = request_header.protocol_version
msg = "Bad protocol version type: expected {0}, received {1}"
self.assertIsInstance(protocol_version, contents.ProtocolVersion,
msg.format(contents.ProtocolVersion,
type(protocol_version)))
protocol_version_major = protocol_version.protocol_version_major
msg = "Bad protocol version major type: expected {0}, received {1}"
exp_type = contents.ProtocolVersion.ProtocolVersionMajor
rcv_type = type(protocol_version_major)
self.assertIsInstance(protocol_version_major, exp_type,
msg.format(exp_type, rcv_type))
msg = "Bad protocol version major value: expected {0}, received {1}"
self.assertEqual(1, protocol_version_major.value,
msg.format(1, protocol_version_major.value))
protocol_version_minor = protocol_version.protocol_version_minor
msg = "Bad protocol version minor type: expected {0}, received {1}"
exp_type = contents.ProtocolVersion.ProtocolVersionMinor
rcv_type = type(protocol_version_minor)
self.assertIsInstance(protocol_version_minor, exp_type,
msg.format(exp_type, rcv_type))
msg = "Bad protocol version minor value: expected {0}, received {1}"
self.assertEqual(2, protocol_version_minor.value,
msg.format(2, protocol_version_minor.value))
batch_count = request_header.batch_count
msg = "Bad batch count type: expected {0}, received {1}"
self.assertIsInstance(batch_count, contents.BatchCount,
msg.format(contents.BatchCount,
type(batch_count)))
msg = "Bad batch count value: expected {0}, received {1}"
self.assertEqual(1, batch_count.value,
msg.format(1, batch_count.value))
batch_items = request_message.batch_items
msg = "Bad batch items type: expected {0}, received {1}"
self.assertIsInstance(batch_items, list,
msg.format(list, type(batch_items)))
self.assertEquals(1, len(batch_items),
self.msg.format('batch items', 'length',
1, len(batch_items)))
batch_item = batch_items[0]
msg = "Bad batch item type: expected {0}, received {1}"
self.assertIsInstance(batch_item, messages.RequestBatchItem,
msg.format(messages.RequestBatchItem,
type(batch_item)))
operation = batch_item.operation
msg = "Bad operation type: expected {0}, received {1}"
self.assertIsInstance(operation, contents.Operation,
msg.format(contents.Operation,
type(operation)))
msg = "Bad operation value: expected {0}, received {1}"
self.assertEqual(enums.Operation.MAC, operation.value,
msg.format(enums.Operation.MAC,
operation.value))
request_payload = batch_item.request_payload
msg = "Bad request payload type: expected {0}, received {1}"
self.assertIsInstance(request_payload,
mac.MACRequestPayload,
msg.format(mac.MACRequestPayload,
type(request_payload)))
unique_identifier = request_payload.unique_identifier
msg = "Bad unique identifier type: expected {0}, received {1}"
self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
msg.format(attr.UniqueIdentifier,
type(unique_identifier)))
msg = "Bad unique identifier value: expected {0}, received {1}"
self.assertEqual('1', unique_identifier.value,
msg.format('1', unique_identifier.value))
parameters_attribute = request_payload.cryptographic_parameters
msg = "Bad cryptographic parameters type: expected {0}, received {1}"
self.assertIsInstance(parameters_attribute,
attr.CryptographicParameters,
msg.format(attr.CryptographicParameters,
type(parameters_attribute)))
cryptographic_algorithm = parameters_attribute.cryptographic_algorithm
msg = "Bad cryptographic algorithm type: expected {0}, received {1}"
self.assertIsInstance(cryptographic_algorithm,
attr.CryptographicAlgorithm,
msg.format(attr.CryptographicAlgorithm,
type(cryptographic_algorithm)))
msg = "Bad cryptographic algorithm value: expected {0}, received {1}"
self.assertEquals(cryptographic_algorithm.value,
enums.CryptographicAlgorithm.HMAC_SHA512,
msg.format(cryptographic_algorithm.value,
enums.CryptographicAlgorithm.HMAC_SHA512))
data = request_payload.data
msg = "Bad data type: expected {0}, received {1}"
self.assertIsInstance(data, objects.Data, msg.format(objects.Data,
type(data)))
exp_value = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
b'\x0C\x0D\x0E\x0F')
msg = "Bad data value: expected {0}, received {1}"
self.assertEqual(
exp_value, data.value,
msg.format(binascii.hexlify(exp_value),
binascii.hexlify(data.value))
)
def test_mac_request_write(self):
prot_ver = contents.ProtocolVersion.create(1, 2)
batch_count = contents.BatchCount(1)
req_header = messages.RequestHeader(protocol_version=prot_ver,
batch_count=batch_count)
operation = contents.Operation(enums.Operation.MAC)
uuid = attr.UniqueIdentifier('1')
data = objects.Data(
value=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
b'\x0C\x0D\x0E\x0F')
)
parameters_attribute = attr.CryptographicParameters(
cryptographic_algorithm=attr.
CryptographicAlgorithm(enums.CryptographicAlgorithm.HMAC_SHA512)
)
request_payload = mac.MACRequestPayload(
unique_identifier=uuid,
cryptographic_parameters=parameters_attribute,
data=data
)
batch_item = messages.RequestBatchItem(operation=operation,
request_payload=request_payload)
request_message = messages.RequestMessage(request_header=req_header,
batch_items=[batch_item])
request_message.write(self.stream)
result = self.stream.read()
len_exp = len(self.mac)
len_rcv = len(result)
self.assertEqual(len_exp, len_rcv,
self.msg.format('request message', 'write',
len_exp, len_rcv))
msg = "Bad request message write: encoding mismatch"
self.assertEqual(self.mac, result, msg)
class TestResponseMessage(TestCase):
@ -1050,6 +1222,22 @@ class TestResponseMessage(TestCase):
b'\x34\x39\x61\x31\x63\x61\x38\x38\x2d\x36\x62\x65\x61\x2d\x34\x66'
b'\x62\x32\x2d\x62\x34\x35\x30\x2d\x37\x65\x35\x38\x38\x30\x32\x63'
b'\x33\x30\x33\x38\x00\x00\x00\x00')
self.mac = (
b'\x42\x00\x7b\x01\x00\x00\x00\xd8\x42\x00\x7a\x01\x00\x00\x00\x48'
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x92\x09\x00\x00\x00\x08'
b'\x00\x00\x00\x00\x58\x8a\x3f\x23\x42\x00\x0d\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x80'
b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x23\x00\x00\x00\x00'
b'\x42\x00\x7f\x05\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\x7c\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x01'
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\xc6\x08\x00\x00\x00\x40'
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
)
self.invalid_message_response = (
b'\x42\x00\x7b\x01\x00\x00\x00\xb0\x42\x00\x7a\x01\x00\x00\x00\x48'
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
@ -1775,6 +1963,175 @@ class TestResponseMessage(TestCase):
msg = "Bad response message write: encoding mismatch"
self.assertEqual(self.locate, result, msg)
def test_mac_response_read(self):
self.stream = BytearrayStream(self.mac)
response_message = messages.ResponseMessage()
response_message.read(self.stream)
response_header = response_message.response_header
self.assertIsInstance(response_header, messages.ResponseHeader,
self.msg.format('response header', 'type',
messages.ResponseHeader,
type(response_header)))
protocol_version = response_header.protocol_version
self.assertIsInstance(protocol_version, contents.ProtocolVersion,
self.msg.format('response header', 'value',
contents.ProtocolVersion,
type(protocol_version)))
protocol_version_major = protocol_version.protocol_version_major
exp_type = contents.ProtocolVersion.ProtocolVersionMajor
rcv_type = type(protocol_version_major)
self.assertIsInstance(protocol_version_major, exp_type,
self.msg.format('protocol version major', 'type',
exp_type, rcv_type))
self.assertEqual(1, protocol_version_major.value,
self.msg.format('protocol version major', 'value',
1, protocol_version_major.value))
protocol_version_minor = protocol_version.protocol_version_minor
exp_type = contents.ProtocolVersion.ProtocolVersionMinor
rcv_type = type(protocol_version_minor)
self.assertIsInstance(protocol_version_minor, exp_type,
self.msg.format('protocol version minor', 'type',
exp_type, rcv_type))
self.assertEqual(2, protocol_version_minor.value,
self.msg.format('protocol version minor', 'value',
2, protocol_version_minor.value))
time_stamp = response_header.time_stamp
value = 0x588a3f23
self.assertIsInstance(time_stamp, contents.TimeStamp,
self.msg.format('time stamp', 'value',
contents.TimeStamp,
type(time_stamp)))
self.assertEqual(time_stamp.value, value,
self.msg.format('time stamp', 'value',
time_stamp.value, value))
batch_count = response_header.batch_count
self.assertIsInstance(batch_count, contents.BatchCount,
self.msg.format('batch count', 'type',
contents.BatchCount,
type(batch_count)))
self.assertEqual(1, batch_count.value,
self.msg.format('batch count', 'value', 1,
batch_count.value))
batch_items = response_message.batch_items
self.assertIsInstance(batch_items, list,
self.msg.format('batch items', 'type',
list, type(batch_items)))
for batch_item in batch_items:
self.assertIsInstance(batch_item, messages.ResponseBatchItem,
self.msg.format('batch item', 'type',
messages.ResponseBatchItem,
type(batch_item)))
operation = batch_item.operation
self.assertIsInstance(operation, contents.Operation,
self.msg.format('operation', 'type',
contents.Operation,
type(operation)))
self.assertEqual(enums.Operation.MAC, operation.value,
self.msg.format('operation', 'value',
enums.Operation.MAC,
operation.value))
result_status = batch_item.result_status
self.assertIsInstance(result_status, contents.ResultStatus,
self.msg.format('result status', 'type',
contents.ResultStatus,
type(result_status)))
self.assertEqual(enums.ResultStatus.SUCCESS, result_status.value,
self.msg.format('result status', 'value',
enums.ResultStatus.SUCCESS,
result_status.value))
response_payload = batch_item.response_payload
exp_type = mac.MACResponsePayload
rcv_type = type(response_payload)
self.assertIsInstance(response_payload, exp_type,
self.msg.format('response payload', 'type',
exp_type, rcv_type))
unique_identifier = response_payload.unique_identifier
value = '1'
self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
self.msg.format('unique identifier', 'type',
attr.UniqueIdentifier,
type(unique_identifier)))
self.assertEqual(value, unique_identifier.value,
self.msg.format('unique identifier', 'value',
unique_identifier.value, value))
mac_data = response_payload.mac_data
value = \
(b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb'
b'\x32\x9f'
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2'
b'\x2a\x5b'
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe'
b'\xc8\x23'
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51'
b'\xff\x7c')
self.assertIsInstance(mac_data, objects.MACData,
self.msg.format('secret', 'type',
objects.MACData,
type(mac_data)))
self.assertEqual(value, mac_data.value,
self.msg.format('mac data', 'value',
binascii.hexlify(mac_data.value),
binascii.hexlify(value)))
def test_mac_response_write(self):
prot_ver = contents.ProtocolVersion.create(1, 2)
# Fri Apr 27 10:12:23 CEST 2012
time_stamp = contents.TimeStamp(0x588a3f23)
batch_count = contents.BatchCount(1)
response_header = messages.ResponseHeader(protocol_version=prot_ver,
time_stamp=time_stamp,
batch_count=batch_count)
operation = contents.Operation(enums.Operation.MAC)
result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS)
uuid = '1'
uniq_id = attr.UniqueIdentifier(uuid)
value = \
(b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb'
b'\x32\x9f'
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2'
b'\x2a\x5b'
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe'
b'\xc8\x23'
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51'
b'\xff\x7c')
mac_data = objects.MACData(value)
resp_pl = mac.MACResponsePayload(unique_identifier=uniq_id,
mac_data=mac_data)
batch_item = messages.ResponseBatchItem(operation=operation,
result_status=result_status,
response_payload=resp_pl)
rm = messages.ResponseMessage(response_header=response_header,
batch_items=[batch_item])
rm.write(self.stream)
result = self.stream.read()
len_exp = len(self.mac)
len_rcv = len(result)
self.assertEqual(len_exp, len_rcv,
self.msg.format('get response message', 'write',
len_exp, len_rcv))
msg = "Bad response message write: encoding mismatch"
self.assertEqual(self.mac, result, msg)
def test_message_invalid_response_write(self):
# Batch item of 'INVALID MESSAGE' response
# has no 'operation' attribute