Update the Get request and response payloads

This change updates the Get payloads to reflect current coding
styles. It streamlines payload usage and makes it easier to create
and access payload fields. A new unit test suite dedicated to the
Get payloads is added. Surrounding code in the client/server and
associated utilities and unit tests are updated to reflect these
changes.
This commit is contained in:
Peter Hamilton 2017-07-20 12:36:42 -04:00
parent 758bc348f7
commit ba47813553
7 changed files with 2006 additions and 198 deletions

View File

@ -13,143 +13,483 @@
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.factories.secrets import SecretFactory
import six
from kmip.core import attributes
from kmip.core import enums
from kmip.core.enums import Tags
from kmip.core import objects
from kmip.core import primitives
from kmip.core import secrets
from kmip.core import utils
from kmip.core.objects import KeyWrappingSpecification
from kmip.core.primitives import Struct
from kmip.core.primitives import Enumeration
from kmip.core.utils import BytearrayStream
from kmip.core.factories import secrets as secret_factory
# 4.11
class GetRequestPayload(Struct):
class GetRequestPayload(primitives.Struct):
"""
A request payload for the Get operation.
# 9.1.3.2.2
class KeyCompressionType(Enumeration):
def __init__(self, value=None):
super(GetRequestPayload.KeyCompressionType, self).__init__(
enums.KeyCompressionType, value, Tags.KEY_COMPRESSION_TYPE)
# 9.1.3.2.3
class KeyFormatType(Enumeration):
def __init__(self, value=None):
super(GetRequestPayload.KeyFormatType, self).__init__(
enums.KeyFormatType, value, Tags.KEY_FORMAT_TYPE)
Attributes:
unique_identifier: The unique ID of the managed object to retrieve
from the server.
key_format_type: The format of the returned object, if it is a key.
key_compression_type: The compression method to be used for the
returned object, if it is an elliptic curve public key.
key_wrapping_specification: A collection of settings specifying how
the returned object should be cryptographically wrapped if it is
a key.
"""
def __init__(self,
unique_identifier=None,
key_format_type=None,
key_compression_type=None,
key_wrapping_specification=None):
super(GetRequestPayload, self).__init__(tag=enums.Tags.REQUEST_PAYLOAD)
"""
Construct a Get request payload struct.
Args:
unique_identifier (string): The ID of the managed object (e.g., a
symmetric key) to retrieve. Optional, defaults to None.
key_format_type (KeyFormatType): A KeyFormatType enumeration that
specifies the format in which the object should be returned.
Optional, defaults to None.
key_compression_type (KeyCompressionType): A KeyCompressionType
enumeration that specifies the compression method to be used
when returning elliptic curve public keys. Optional, defaults
to None.
key_wrapping_specification (KeyWrappingSpecification): A
KeyWrappingSpecification struct that specifies keys and other
information for wrapping the returned object. Optional,
defaults to None.
"""
super(GetRequestPayload, self).__init__(
enums.Tags.REQUEST_PAYLOAD
)
self._unique_identifier = None
self._key_format_type = None
self._key_compression_type = None
self._key_wrapping_specification = None
self.unique_identifier = unique_identifier
self.key_format_type = key_format_type
self.key_compression_type = key_compression_type
self.key_wrapping_specification = key_wrapping_specification
self.validate()
def read(self, istream):
super(GetRequestPayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
@property
def unique_identifier(self):
if self._unique_identifier:
return self._unique_identifier.value
else:
return None
if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream):
self.unique_identifier = attributes.UniqueIdentifier()
self.unique_identifier.read(tstream)
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._unique_identifier = None
elif isinstance(value, six.string_types):
self._unique_identifier = primitives.TextString(
value=value,
tag=enums.Tags.UNIQUE_IDENTIFIER
)
else:
raise TypeError("Unique identifier must be a string.")
if self.is_tag_next(Tags.KEY_FORMAT_TYPE, tstream):
self.key_format_type = GetRequestPayload.KeyFormatType()
self.key_format_type.read(tstream)
@property
def key_format_type(self):
if self._key_format_type:
return self._key_format_type.value
else:
return None
if self.is_tag_next(Tags.KEY_COMPRESSION_TYPE, tstream):
self.key_compression_type = GetRequestPayload.KeyCompressionType()
self.key_compression_type.read(tstream)
@key_format_type.setter
def key_format_type(self, value):
if value is None:
self._key_format_type = None
elif isinstance(value, enums.KeyFormatType):
self._key_format_type = primitives.Enumeration(
enums.KeyFormatType,
value=value,
tag=enums.Tags.KEY_FORMAT_TYPE
)
else:
raise TypeError(
"Key format type must be a KeyFormatType enumeration."
)
if self.is_tag_next(Tags.KEY_WRAPPING_SPECIFICATION, tstream):
self.key_wrapping_specification = KeyWrappingSpecification()
self.key_wrapping_specification.read(tstream)
@property
def key_compression_type(self):
if self._key_compression_type:
return self._key_compression_type.value
else:
return None
self.is_oversized(tstream)
self.validate()
@key_compression_type.setter
def key_compression_type(self, value):
if value is None:
self._key_compression_type = None
elif isinstance(value, enums.KeyCompressionType):
self._key_compression_type = primitives.Enumeration(
enums.KeyCompressionType,
value=value,
tag=enums.Tags.KEY_COMPRESSION_TYPE
)
else:
raise TypeError(
"Key compression type must be a KeyCompressionType "
"enumeration."
)
def write(self, ostream):
tstream = BytearrayStream()
@property
def key_wrapping_specification(self):
return self._key_wrapping_specification
# Write the contents of the request payload
if self.unique_identifier is not None:
self.unique_identifier.write(tstream)
if self.key_format_type is not None:
self.key_format_type.write(tstream)
if self.key_compression_type is not None:
self.key_compression_type.write(tstream)
if self.key_wrapping_specification is not None:
self.key_wrapping_specification.write(tstream)
@key_wrapping_specification.setter
def key_wrapping_specification(self, value):
if value is None:
self._key_wrapping_specification = None
elif isinstance(value, objects.KeyWrappingSpecification):
self._key_wrapping_specification = value
else:
raise TypeError(
"Key wrapping specification must be a "
"KeyWrappingSpecification struct."
)
# Write the length and value of the request payload
self.length = tstream.length()
super(GetRequestPayload, self).write(ostream)
ostream.write(tstream.buffer)
def read(self, input_stream):
"""
Read the data encoding the Get request payload and decode it into its
constituent parts.
def validate(self):
self.__validate()
Args:
input_stream (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
"""
super(GetRequestPayload, self).read(input_stream)
local_stream = utils.BytearrayStream(input_stream.read(self.length))
def __validate(self):
# TODO (peter-hamilton) Finish implementation
pass
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream):
self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER
)
self._unique_identifier.read(local_stream)
if self.is_tag_next(enums.Tags.KEY_FORMAT_TYPE, local_stream):
self._key_format_type = primitives.Enumeration(
enum=enums.KeyFormatType,
tag=enums.Tags.KEY_FORMAT_TYPE
)
self._key_format_type.read(local_stream)
if self.is_tag_next(enums.Tags.KEY_COMPRESSION_TYPE, local_stream):
self._key_compression_type = primitives.Enumeration(
enum=enums.KeyCompressionType,
tag=enums.Tags.KEY_COMPRESSION_TYPE
)
self._key_compression_type.read(local_stream)
if self.is_tag_next(
enums.Tags.KEY_WRAPPING_SPECIFICATION,
local_stream
):
self._key_wrapping_specification = \
objects.KeyWrappingSpecification()
self._key_wrapping_specification.read(local_stream)
self.is_oversized(local_stream)
def write(self, output_stream):
"""
Write the data encoding the Get request payload to a stream.
Args:
output_stream (stream): A data stream in which to encode object
data, supporting a write method; usually a BytearrayStream
object.
"""
local_stream = utils.BytearrayStream()
if self._unique_identifier is not None:
self._unique_identifier.write(local_stream)
if self._key_format_type is not None:
self._key_format_type.write(local_stream)
if self._key_compression_type is not None:
self._key_compression_type.write(local_stream)
if self._key_wrapping_specification is not None:
self._key_wrapping_specification.write(local_stream)
self.length = local_stream.length()
super(GetRequestPayload, self).write(output_stream)
output_stream.write(local_stream.buffer)
def __eq__(self, other):
if isinstance(other, GetRequestPayload):
if self.unique_identifier != other.unique_identifier:
return False
elif self.key_format_type != other.key_format_type:
return False
elif self.key_compression_type != other.key_compression_type:
return False
elif self.key_wrapping_specification != \
other.key_wrapping_specification:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, GetRequestPayload):
return not (self == other)
else:
return NotImplemented
def __repr__(self):
args = ", ".join([
"unique_identifier='{0}'".format(self.unique_identifier),
"key_format_type={0}".format(self.key_format_type),
"key_compression_type={0}".format(self.key_compression_type),
"key_wrapping_specification={0}".format(
repr(self.key_wrapping_specification)
)
])
return "GetRequestPayload({0})".format(args)
def __str__(self):
return str({
'unique_identifier': self.unique_identifier,
'key_format_type': self.key_format_type,
'key_compression_type': self.key_compression_type,
'key_wrapping_specification': self.key_wrapping_specification
})
class GetResponsePayload(Struct):
class GetResponsePayload(primitives.Struct):
"""
A response payload for the Get operation.
Attributes:
object_type: The type of the managed object being returned.
unique_identifier: The unique ID of the managed object being returned.
secret: The managed object being returned.
"""
def __init__(self,
object_type=None,
unique_identifier=None,
secret=None):
super(GetResponsePayload, self).__init__(tag=Tags.RESPONSE_PAYLOAD)
"""
Construct a Get response payload struct.
Args:
object_type (ObjectType): An ObjectType enumeration specifying the
type of managed object being returned. Optional, defaults to
None. Required for read/write.
unique_identifier (string): The ID of the managed object (e.g., a
symmetric key) being returned. Optional, defaults to None.
Required for read/write.
secret (various): The managed object struct being returned. Must
be one of the following:
Optional, defaults to None. Required for read/write.
"""
super(GetResponsePayload, self).__init__(
tag=enums.Tags.RESPONSE_PAYLOAD
)
self._object_type = None
self._unique_identifier = None
self._secret = None
self.object_type = object_type
self.unique_identifier = unique_identifier
self.secret = secret
self.secret_factory = SecretFactory()
self.validate()
def read(self, istream):
super(GetResponsePayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.secret_factory = secret_factory.SecretFactory()
self.object_type = attributes.ObjectType()
self.unique_identifier = attributes.UniqueIdentifier()
@property
def object_type(self):
if self._object_type:
return self._object_type.value
else:
return None
self.object_type.read(tstream)
self.unique_identifier.read(tstream)
@object_type.setter
def object_type(self, value):
if value is None:
self._object_type = None
elif isinstance(value, enums.ObjectType):
self._object_type = primitives.Enumeration(
enums.ObjectType,
value=value,
tag=enums.Tags.OBJECT_TYPE
)
else:
raise TypeError("Object type must be an ObjectType enumeration.")
secret_type = self.object_type.value
self.secret = self.secret_factory.create(secret_type)
self.secret.read(tstream)
@property
def unique_identifier(self):
if self._unique_identifier:
return self._unique_identifier.value
else:
return None
self.is_oversized(tstream)
self.validate()
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._unique_identifier = None
elif isinstance(value, six.string_types):
self._unique_identifier = primitives.TextString(
value=value,
tag=enums.Tags.UNIQUE_IDENTIFIER
)
else:
raise TypeError("Unique identifier must be a string.")
def write(self, ostream):
tstream = BytearrayStream()
@property
def secret(self):
return self._secret
self.object_type.write(tstream)
self.unique_identifier.write(tstream)
self.secret.write(tstream)
@secret.setter
def secret(self, value):
if value is None:
self._secret = None
elif isinstance(
value,
(
secrets.Certificate,
secrets.OpaqueObject,
secrets.PrivateKey,
secrets.PublicKey,
secrets.SecretData,
secrets.SplitKey,
secrets.SymmetricKey,
secrets.Template
)
):
self._secret = value
else:
raise TypeError(
"Secret must be one of the following structs: Certificate, "
"OpaqueObject, PrivateKey, PublicKey, SecretData, SplitKey, "
"SymmetricKey, Template"
)
# Write the length and value of the request payload
self.length = tstream.length()
super(GetResponsePayload, self).write(ostream)
ostream.write(tstream.buffer)
def read(self, input_stream):
"""
Read the data encoding the Get response payload and decode it
into its constituent parts.
def validate(self):
self.__validate()
Args:
input_stream (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
def __validate(self):
# TODO (peter-hamilton) Finish implementation.
pass
Raises:
ValueError: Raised if the object type, unique identifier, or
secret attributes are missing from the encoded payload.
"""
super(GetResponsePayload, self).read(input_stream)
local_stream = utils.BytearrayStream(input_stream.read(self.length))
if self.is_tag_next(enums.Tags.OBJECT_TYPE, local_stream):
self._object_type = primitives.Enumeration(
enum=enums.ObjectType,
tag=enums.Tags.OBJECT_TYPE
)
self._object_type.read(local_stream)
else:
raise ValueError(
"Parsed payload encoding is missing the object type field."
)
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream):
self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER
)
self._unique_identifier.read(local_stream)
else:
raise ValueError(
"Parsed payload encoding is missing the unique identifier "
"field."
)
self.secret = self.secret_factory.create(self.object_type)
if self.is_tag_next(self._secret.tag, local_stream):
self._secret.read(local_stream)
else:
raise ValueError(
"Parsed payload encoding is missing the secret field."
)
self.is_oversized(local_stream)
def write(self, output_stream):
"""
Write the data encoding the Get response payload to a stream.
Args:
output_stream (stream): A data stream in which to encode object
data, supporting a write method; usually a BytearrayStream
object.
Raises:
ValueError: Raised if the object type, unique identifier, or
secret attributes are missing from the payload struct.
"""
local_stream = utils.BytearrayStream()
if self.object_type:
self._object_type.write(local_stream)
else:
raise ValueError("Payload is missing the object type field.")
if self.unique_identifier:
self._unique_identifier.write(local_stream)
else:
raise ValueError(
"Payload is missing the unique identifier field."
)
if self.secret:
self._secret.write(local_stream)
else:
raise ValueError("Payload is missing the secret field.")
self.length = local_stream.length()
super(GetResponsePayload, self).write(output_stream)
output_stream.write(local_stream.buffer)
def __eq__(self, other):
if isinstance(other, GetResponsePayload):
if self.object_type != other.object_type:
return False
elif self.unique_identifier != other.unique_identifier:
return False
elif self.secret != other.secret:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, GetResponsePayload):
return not (self == other)
else:
return NotImplemented
def __repr__(self):
args = ", ".join([
"object_type={0}".format(self.object_type),
"unique_identifier='{0}'".format(self.unique_identifier),
"secret={0}".format(repr(self.secret))
])
return "GetResponsePayload({0})".format(args)
def __str__(self):
return str({
'object_type': self.object_type,
'unique_identifier': self.unique_identifier,
'secret': self.secret
})

View File

@ -92,9 +92,9 @@ if __name__ == '__main__':
if result.result_status.value == ResultStatus.SUCCESS:
logger.info('retrieved object type: {0}'.format(
result.object_type.value))
logger.info('retrieved UUID: {0}'.format(result.uuid.value))
logger.info('retrieved UUID: {0}'.format(result.uuid))
utils.log_secret(logger, result.object_type.value, result.secret)
utils.log_secret(logger, result.object_type, result.secret)
else:
logger.info('get() result reason: {0}'.format(
result.result_reason.value))

View File

@ -769,25 +769,19 @@ class KMIPProxy(KMIP):
credential=None):
operation = Operation(OperationEnum.GET)
uuid = None
kft = None
kct = None
kws = None
if unique_identifier is not None:
uuid = attr.UniqueIdentifier(unique_identifier)
if key_format_type is not None:
kft = get.GetRequestPayload.KeyFormatType(key_format_type.value)
if key_compression_type is not None:
kct = key_compression_type
kct = get.GetRequestPayload.KeyCompressionType(kct)
key_format_type = key_format_type.value
if key_wrapping_specification is not None:
kws = objects.KeyWrappingSpecification(key_wrapping_specification)
req_pl = get.GetRequestPayload(unique_identifier=uuid,
key_format_type=kft,
key_compression_type=kct,
key_wrapping_specification=kws)
req_pl = get.GetRequestPayload(
unique_identifier=unique_identifier,
key_format_type=key_format_type,
key_compression_type=key_compression_type,
key_wrapping_specification=kws
)
batch_item = messages.RequestBatchItem(operation=operation,
request_payload=req_pl)

View File

@ -1378,11 +1378,11 @@ class KmipEngine(object):
unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier.value
unique_identifier = payload.unique_identifier
key_format_type = None
if payload.key_format_type:
key_format_type = payload.key_format_type.value
key_format_type = payload.key_format_type
if payload.key_compression_type:
raise exceptions.KeyCompressionTypeNotSupported(
@ -1429,8 +1429,8 @@ class KmipEngine(object):
core_secret = self._build_core_object(managed_object)
response_payload = get.GetResponsePayload(
object_type=attributes.ObjectType(managed_object._object_type),
unique_identifier=attributes.UniqueIdentifier(unique_identifier),
object_type=managed_object._object_type,
unique_identifier=unique_identifier,
secret=core_secret
)

File diff suppressed because it is too large Load Diff

View File

@ -480,16 +480,16 @@ class TestRequestMessage(TestCase):
msg.format(get.GetRequestPayload,
type(request_payload)))
unique_identifier = request_payload.unique_identifier
msg = "Bad unique identifier type: expected {0}, received {1}"
self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
msg.format(attr.UniqueIdentifier,
type(unique_identifier)))
msg = "Bad unique identifier value: expected {0}, received {1}"
self.assertEqual('49a1ca88-6bea-4fb2-b450-7e58802c3038',
unique_identifier.value,
msg.format('49a1ca88-6bea-4fb2-b450-7e58802c3038',
unique_identifier.value))
# unique_identifier = request_payload.unique_identifier
# msg = "Bad unique identifier type: expected {0}, received {1}"
# self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
# msg.format(attr.UniqueIdentifier,
# type(unique_identifier)))
# msg = "Bad unique identifier value: expected {0}, received {1}"
self.assertEqual(
'49a1ca88-6bea-4fb2-b450-7e58802c3038',
request_payload.unique_identifier
)
def test_get_request_write(self):
prot_ver = contents.ProtocolVersion.create(1, 1)
@ -500,8 +500,10 @@ class TestRequestMessage(TestCase):
operation = contents.Operation(enums.Operation.GET)
uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038')
request_payload = get.GetRequestPayload(unique_identifier=uuid)
# uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038')
request_payload = get.GetRequestPayload(
unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038'
)
batch_item = messages.RequestBatchItem(operation=operation,
request_payload=request_payload)
request_message = messages.RequestMessage(request_header=req_header,
@ -1498,25 +1500,14 @@ class TestResponseMessage(TestCase):
self.msg.format('response payload', 'type',
exp_type, rcv_type))
object_type = response_payload.object_type
self.assertIsInstance(object_type, attr.ObjectType,
self.msg.format('object type', 'type',
attr.ObjectType,
type(object_type)))
self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, object_type.value,
self.msg.format('object type', 'value',
enums.ObjectType.SYMMETRIC_KEY,
object_type.value))
unique_identifier = response_payload.unique_identifier
value = '49a1ca88-6bea-4fb2-b450-7e58802c3038'
self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
self.msg.format('unique identifier', 'type',
attr.UniqueIdentifier,
type(unique_identifier)))
self.assertEqual(value, unique_identifier.value,
self.msg.format('unique identifier', 'value',
unique_identifier.value, value))
self.assertEqual(
enums.ObjectType.SYMMETRIC_KEY,
response_payload.object_type
)
self.assertEqual(
'49a1ca88-6bea-4fb2-b450-7e58802c3038',
response_payload.unique_identifier
)
secret = response_payload.secret
self.assertIsInstance(secret, SymmetricKey,
@ -1619,8 +1610,8 @@ class TestResponseMessage(TestCase):
secret = SymmetricKey(key_block)
resp_pl = get.GetResponsePayload(object_type=object_type,
unique_identifier=uniq_id,
resp_pl = get.GetResponsePayload(object_type=object_type.value,
unique_identifier=uniq_id.value,
secret=secret)
batch_item = messages.ResponseBatchItem(operation=operation,
result_status=result_status,

View File

@ -3679,9 +3679,7 @@ class TestKmipEngine(testtools.TestCase):
id_b = str(obj_b.unique_identifier)
# Test by specifying the ID of the object to get.
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
payload = get.GetRequestPayload(unique_identifier=id_a)
response_payload = e._process_get(payload)
e._data_session.commit()
@ -3692,9 +3690,9 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(
enums.ObjectType.OPAQUE_DATA,
response_payload.object_type.value
response_payload.object_type
)
self.assertEqual(str(id_a), response_payload.unique_identifier.value)
self.assertEqual(str(id_a), response_payload.unique_identifier)
self.assertIsInstance(response_payload.secret, secrets.OpaqueObject)
self.assertEqual(
enums.OpaqueDataType.NONE,
@ -3722,9 +3720,9 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(
enums.ObjectType.OPAQUE_DATA,
response_payload.object_type.value
response_payload.object_type
)
self.assertEqual(str(id_b), response_payload.unique_identifier.value)
self.assertEqual(str(id_b), response_payload.unique_identifier)
self.assertIsInstance(response_payload.secret, secrets.OpaqueObject)
self.assertEqual(
enums.OpaqueDataType.NONE,
@ -3749,11 +3747,8 @@ class TestKmipEngine(testtools.TestCase):
e._logger = mock.MagicMock()
# Test that specifying the key compression type generates an error.
payload = get.GetRequestPayload(
key_compression_type=get.GetRequestPayload.KeyCompressionType(
enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED
)
)
k = enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED
payload = get.GetRequestPayload(key_compression_type=k)
args = (payload, )
regex = "Key compression is not supported."
@ -3813,10 +3808,8 @@ class TestKmipEngine(testtools.TestCase):
# Test that a key can be retrieved with the right key format.
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a),
key_format_type=get.GetRequestPayload.KeyFormatType(
enums.KeyFormatType.RAW
)
unique_identifier=id_a,
key_format_type=enums.KeyFormatType.RAW
)
response_payload = e._process_get(payload)
@ -3850,10 +3843,8 @@ class TestKmipEngine(testtools.TestCase):
e._logger.reset_mock()
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a),
key_format_type=get.GetRequestPayload.KeyFormatType(
enums.KeyFormatType.OPAQUE
)
unique_identifier=id_a,
key_format_type=enums.KeyFormatType.OPAQUE
)
args = (payload, )
@ -3883,10 +3874,8 @@ class TestKmipEngine(testtools.TestCase):
id_b = str(obj_b.unique_identifier)
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_b),
key_format_type=get.GetRequestPayload.KeyFormatType(
enums.KeyFormatType.RAW
)
unique_identifier=id_b,
key_format_type=enums.KeyFormatType.RAW
)
args = (payload, )
@ -3921,9 +3910,7 @@ class TestKmipEngine(testtools.TestCase):
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
payload = get.GetRequestPayload(unique_identifier=id_a)
# Test by specifying the ID of the object to get.
args = [payload]
@ -5931,9 +5918,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.reset_mock()
# Retrieve the created key using Get and verify all fields set
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uid)
)
payload = get.GetRequestPayload(unique_identifier=uid)
response_payload = e._process_get(payload)
e._data_session.commit()
@ -5944,9 +5929,9 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(
enums.ObjectType.SYMMETRIC_KEY,
response_payload.object_type.value
response_payload.object_type
)
self.assertEqual(str(uid), response_payload.unique_identifier.value)
self.assertEqual(str(uid), response_payload.unique_identifier)
self.assertIsInstance(response_payload.secret, secrets.SymmetricKey)
key_block = response_payload.secret.key_block
@ -6070,9 +6055,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.reset_mock()
# Retrieve the created public key using Get and verify all fields set
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(public_id)
)
payload = get.GetRequestPayload(unique_identifier=public_id)
response_payload = e._process_get(payload)
e._data_session.commit()
@ -6083,12 +6066,9 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(
enums.ObjectType.PUBLIC_KEY,
response_payload.object_type.value
)
self.assertEqual(
str(public_id),
response_payload.unique_identifier.value
response_payload.object_type
)
self.assertEqual(str(public_id), response_payload.unique_identifier)
self.assertIsInstance(response_payload.secret, secrets.PublicKey)
key_block = response_payload.secret.key_block
@ -6108,9 +6088,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.reset_mock()
# Retrieve the created private key using Get and verify all fields set
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(private_id)
)
payload = get.GetRequestPayload(unique_identifier=private_id)
response_payload = e._process_get(payload)
e._data_session.commit()
@ -6121,12 +6099,9 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(
enums.ObjectType.PRIVATE_KEY,
response_payload.object_type.value
)
self.assertEqual(
str(private_id),
response_payload.unique_identifier.value
response_payload.object_type
)
self.assertEqual(str(private_id), response_payload.unique_identifier)
self.assertIsInstance(response_payload.secret, secrets.PrivateKey)
key_block = response_payload.secret.key_block
@ -6295,9 +6270,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.reset_mock()
# Retrieve the registered key using Get and verify all fields set
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uid)
)
payload = get.GetRequestPayload(unique_identifier=uid)
response_payload = e._process_get(payload)
e._data_session.commit()
@ -6308,9 +6281,9 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(
enums.ObjectType.SYMMETRIC_KEY,
response_payload.object_type.value
response_payload.object_type
)
self.assertEqual(str(uid), response_payload.unique_identifier.value)
self.assertEqual(str(uid), response_payload.unique_identifier)
self.assertIsInstance(response_payload.secret, secrets.SymmetricKey)
self.assertEqual(
key_bytes,