From b2aba64263a9335223be049eafd3a7aa7c458e40 Mon Sep 17 00:00:00 2001 From: dane-fichter Date: Tue, 28 Jun 2016 01:11:18 -0700 Subject: [PATCH] Add signature_utils module This change ports Nova's signature_utils module into the cursive library. Change-Id: Ic54dc204e41b3758bc2e8e1571d697931b371889 Partial-Bug: #1528349 --- cursive/exception.py | 52 +++ cursive/i18n.py | 36 +++ cursive/signature_utils.py | 339 ++++++++++++++++++++ cursive/tests/test_cursive.py | 28 -- cursive/tests/unit/__init__.py | 19 ++ cursive/tests/unit/test_signature_utils.py | 347 +++++++++++++++++++++ requirements.txt | 8 + tox.ini | 2 +- 8 files changed, 802 insertions(+), 29 deletions(-) create mode 100644 cursive/exception.py create mode 100644 cursive/i18n.py create mode 100644 cursive/signature_utils.py delete mode 100644 cursive/tests/test_cursive.py create mode 100644 cursive/tests/unit/__init__.py create mode 100644 cursive/tests/unit/test_signature_utils.py diff --git a/cursive/exception.py b/cursive/exception.py new file mode 100644 index 0000000..f9b7c1a --- /dev/null +++ b/cursive/exception.py @@ -0,0 +1,52 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Cursive base exception handling""" + +from cursive.i18n import _ + + +class CursiveException(Exception): + """Base Cursive Exception + + To correctly use this class, inherit from it and define + a 'msg_fmt' property. That msg_fmt will get printf'd + with the keyword arguments provided to the constructor. + + """ + msg_fmt = _("An unknown exception occurred.") + headers = {} + safe = False + + def __init__(self, message=None, **kwargs): + self.kwargs = kwargs + + if not message: + try: + message = self.msg_fmt % kwargs + + except Exception: + # at least get the core message out if something happened + message = self.msg_fmt + + self.message = message + super(CursiveException, self).__init__(message) + + def format_message(self): + # NOTE(dane-fichter): use the first argument to the python Exception + # object which should be our full CursiveException message + return self.args[0] + + +class SignatureVerificationError(CursiveException): + msg_fmt = _("Signature verification for the image " + "failed: %(reason)s.") diff --git a/cursive/i18n.py b/cursive/i18n.py new file mode 100644 index 0000000..6037320 --- /dev/null +++ b/cursive/i18n.py @@ -0,0 +1,36 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""oslo.i18n integration module. + +See http://docs.openstack.org/developer/oslo.i18n/usage.html . + +""" + +import oslo_i18n + +DOMAIN = 'cursive' + +_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN) + +# The primary translation function using the well-known name "_" +_ = _translators.primary + +# Translators for log levels. +# +# The abbreviated names are meant to reflect the usual use of a short +# name like '_'. The "L" is for "log" and the other letter comes from +# the level. +_LI = _translators.log_info +_LW = _translators.log_warning +_LE = _translators.log_error +_LC = _translators.log_critical diff --git a/cursive/signature_utils.py b/cursive/signature_utils.py new file mode 100644 index 0000000..1a56112 --- /dev/null +++ b/cursive/signature_utils.py @@ -0,0 +1,339 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Support signature verification.""" + +import binascii + +from castellan.common.exception import KeyManagerError +from castellan import key_manager +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import dsa +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import hashes +from cryptography import x509 +from oslo_log import log as logging +from oslo_serialization import base64 +from oslo_utils import encodeutils +from oslo_utils import timeutils + +from cursive import exception +from cursive.i18n import _, _LE + +LOG = logging.getLogger(__name__) + + +HASH_METHODS = { + 'SHA-224': hashes.SHA224(), + 'SHA-256': hashes.SHA256(), + 'SHA-384': hashes.SHA384(), + 'SHA-512': hashes.SHA512(), +} + +# Currently supported signature key types +# RSA Options +RSA_PSS = 'RSA-PSS' +# DSA Options +DSA = 'DSA' + +# ECC curves -- note that only those with key sizes >=384 are included +# Note also that some of these may not be supported by the cryptography backend +ECC_CURVES = ( + ec.SECT571K1(), + ec.SECT409K1(), + ec.SECT571R1(), + ec.SECT409R1(), + ec.SECP521R1(), + ec.SECP384R1(), +) + +# These are the currently supported certificate formats +X_509 = 'X.509' + +CERTIFICATE_FORMATS = { + X_509, +} + +# These are the currently supported MGF formats, used for RSA-PSS signatures +MASK_GEN_ALGORITHMS = { + 'MGF1': padding.MGF1, +} + + +class SignatureKeyType(object): + + _REGISTERED_TYPES = {} + + def __init__(self, name, public_key_type, create_verifier): + self.name = name + self.public_key_type = public_key_type + self.create_verifier = create_verifier + + @classmethod + def register(cls, name, public_key_type, create_verifier): + """Register a signature key type. + + :param name: the name of the signature key type + :param public_key_type: e.g. RSAPublicKey, DSAPublicKey, etc. + :param create_verifier: a function to create a verifier for this type + """ + cls._REGISTERED_TYPES[name] = cls(name, + public_key_type, + create_verifier) + + @classmethod + def lookup(cls, name): + """Look up the signature key type. + + :param name: the name of the signature key type + :returns: the SignatureKeyType object + :raises: SignatureVerificationError if signature key type is invalid + """ + if name not in cls._REGISTERED_TYPES: + raise exception.SignatureVerificationError( + reason=_('Invalid signature key type: %s') % name) + return cls._REGISTERED_TYPES[name] + + +# each key type will require its own verifier +def create_verifier_for_pss(signature, hash_method, public_key): + """Create the verifier to use when the key type is RSA-PSS. + + :param signature: the decoded signature to use + :param hash_method: the hash method to use, as a cryptography object + :param public_key: the public key to use, as a cryptography object + :raises: SignatureVerificationError if the RSA-PSS specific properties + are invalid + :returns: the verifier to use to verify the signature for RSA-PSS + """ + # default to MGF1 + mgf = padding.MGF1(hash_method) + + # default to max salt length + salt_length = padding.PSS.MAX_LENGTH + + # return the verifier + return public_key.verifier( + signature, + padding.PSS(mgf=mgf, salt_length=salt_length), + hash_method + ) + + +def create_verifier_for_ecc(signature, hash_method, public_key): + """Create the verifier to use when the key type is ECC_*. + + :param signature: the decoded signature to use + :param hash_method: the hash method to use, as a cryptography object + :param public_key: the public key to use, as a cryptography object + :returns: the verifier to use to verify the signature for ECC_*. + """ + # return the verifier + return public_key.verifier( + signature, + ec.ECDSA(hash_method) + ) + + +def create_verifier_for_dsa(signature, hash_method, public_key): + """Create the verifier to use when the key type is DSA + + :param signature: the decoded signature to use + :param hash_method: the hash method to use, as a cryptography object + :param public_key: the public key to use, as a cryptography object + :returns: the verifier to use to verify the signature for DSA + """ + # return the verifier + return public_key.verifier( + signature, + hash_method + ) + + +SignatureKeyType.register(RSA_PSS, rsa.RSAPublicKey, create_verifier_for_pss) +SignatureKeyType.register(DSA, dsa.DSAPublicKey, create_verifier_for_dsa) + +# Register the elliptic curves which are supported by the backend +for curve in ECC_CURVES: + if default_backend().elliptic_curve_supported(curve): + SignatureKeyType.register('ECC_' + curve.name.upper(), + ec.EllipticCurvePublicKey, + create_verifier_for_ecc) + + +def get_verifier(context, img_signature_certificate_uuid, + img_signature_hash_method, img_signature, + img_signature_key_type): + """Instantiate signature properties and use them to create a verifier. + + :param context: the user context for authentication + :param img_signature_certificate_uuid: + uuid of signing certificate stored in key manager + :param img_signature_hash_method: + string denoting hash method used to compute signature + :param img_signature: string of base64 encoding of signature + :param img_signature_key_type: + string denoting type of keypair used to compute signature + :returns: instance of + cryptography.hazmat.primitives.asymmetric.AsymmetricVerificationContext + :raises: SignatureVerificationError if we fail to build the verifier + """ + image_meta_props = {'img_signature_uuid': img_signature_certificate_uuid, + 'img_signature_hash_method': img_signature_hash_method, + 'img_signature': img_signature, + 'img_signature_key_type': img_signature_key_type} + for key in image_meta_props.keys(): + if image_meta_props[key] is None: + raise exception.SignatureVerificationError( + reason=_('Required image properties for signature verification' + ' do not exist. Cannot verify signature. Missing' + ' property: %s') % key) + + signature = get_signature(img_signature) + hash_method = get_hash_method(img_signature_hash_method) + signature_key_type = SignatureKeyType.lookup(img_signature_key_type) + public_key = get_public_key(context, + img_signature_certificate_uuid, + signature_key_type) + + # create the verifier based on the signature key type + verifier = signature_key_type.create_verifier(signature, + hash_method, + public_key) + if verifier: + return verifier + else: + # Error creating the verifier + raise exception.SignatureVerificationError( + reason=_('Error occurred while creating the verifier')) + + +def get_signature(signature_data): + """Decode the signature data and returns the signature. + + :param signature_data: the base64-encoded signature data + :returns: the decoded signature + :raises: SignatureVerificationError if the signature data is malformatted + """ + try: + signature = base64.decode_as_bytes(signature_data) + except (TypeError, binascii.Error): + raise exception.SignatureVerificationError( + reason=_('The signature data was not properly ' + 'encoded using base64')) + + return signature + + +def get_hash_method(hash_method_name): + """Verify the hash method name and create the hash method. + + :param hash_method_name: the name of the hash method to retrieve + :returns: the hash method, a cryptography object + :raises: SignatureVerificationError if the hash method name is invalid + """ + if hash_method_name not in HASH_METHODS: + raise exception.SignatureVerificationError( + reason=_('Invalid signature hash method: %s') % hash_method_name) + + return HASH_METHODS[hash_method_name] + + +def get_public_key(context, signature_certificate_uuid, signature_key_type): + """Create the public key object from a retrieved certificate. + + :param context: the user context for authentication + :param signature_certificate_uuid: the uuid to use to retrieve the + certificate + :param signature_key_type: a SignatureKeyType object + :returns: the public key cryptography object + :raises: SignatureVerificationError if public key format is invalid + """ + certificate = get_certificate(context, signature_certificate_uuid) + + # Note that this public key could either be + # RSAPublicKey, DSAPublicKey, or EllipticCurvePublicKey + public_key = certificate.public_key() + + # Confirm the type is of the type expected based on the signature key type + if not isinstance(public_key, signature_key_type.public_key_type): + raise exception.SignatureVerificationError( + reason=_('Invalid public key type for signature key type: %s') + % signature_key_type.name) + + return public_key + + +def get_certificate(context, signature_certificate_uuid): + """Create the certificate object from the retrieved certificate data. + + :param context: the user context for authentication + :param signature_certificate_uuid: the uuid to use to retrieve the + certificate + :returns: the certificate cryptography object + :raises: SignatureVerificationError if the retrieval fails or the format + is invalid + """ + keymgr_api = key_manager.API() + + try: + # The certificate retrieved here is a castellan certificate object + cert = keymgr_api.get(context, signature_certificate_uuid) + except KeyManagerError as e: + # The problem encountered may be backend-specific, since castellan + # can use different backends. Rather than importing all possible + # backends here, the generic "Exception" is used. + msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s") + % {'id': signature_certificate_uuid, + 'e': encodeutils.exception_to_unicode(e)}) + LOG.error(msg) + raise exception.SignatureVerificationError( + reason=_('Unable to retrieve certificate with ID: %s') + % signature_certificate_uuid) + + if cert.format not in CERTIFICATE_FORMATS: + raise exception.SignatureVerificationError( + reason=_('Invalid certificate format: %s') % cert.format) + + if cert.format == X_509: + # castellan always encodes certificates in DER format + cert_data = cert.get_encoded() + certificate = x509.load_der_x509_certificate(cert_data, + default_backend()) + + # verify the certificate + verify_certificate(certificate) + + return certificate + + +def verify_certificate(certificate): + """Verify that the certificate has not expired. + + :param certificate: the cryptography certificate object + :raises: SignatureVerificationError if the certificate valid time range + does not include now + """ + # Get now in UTC, since certificate returns times in UTC + now = timeutils.utcnow() + + # Confirm the certificate valid time range includes now + if now < certificate.not_valid_before: + raise exception.SignatureVerificationError( + reason=_('Certificate is not valid before: %s UTC') + % certificate.not_valid_before) + elif now > certificate.not_valid_after: + raise exception.SignatureVerificationError( + reason=_('Certificate is not valid after: %s UTC') + % certificate.not_valid_after) diff --git a/cursive/tests/test_cursive.py b/cursive/tests/test_cursive.py deleted file mode 100644 index 0786e7f..0000000 --- a/cursive/tests/test_cursive.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -test_cursive ----------------------------------- - -Tests for `cursive` module. -""" - -from cursive.tests import base - - -class TestCursive(base.TestCase): - - def test_something(self): - pass diff --git a/cursive/tests/unit/__init__.py b/cursive/tests/unit/__init__.py new file mode 100644 index 0000000..a6fbbdb --- /dev/null +++ b/cursive/tests/unit/__init__.py @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`cursive.tests.unit` -- Cursive Unittests +===================================================== + +.. automodule:: cursive.tests.unit + :platform: Unix +""" diff --git a/cursive/tests/unit/test_signature_utils.py b/cursive/tests/unit/test_signature_utils.py new file mode 100644 index 0000000..60b6020 --- /dev/null +++ b/cursive/tests/unit/test_signature_utils.py @@ -0,0 +1,347 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import datetime + +from castellan.common.exception import KeyManagerError +import cryptography.exceptions as crypto_exceptions +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import dsa +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import rsa +import mock +from oslo_utils import timeutils + +from cursive import exception +from cursive import signature_utils +from cursive.tests import base + +TEST_RSA_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3, + key_size=1024, + backend=default_backend()) + +# secp521r1 is assumed to be available on all supported platforms +TEST_ECC_PRIVATE_KEY = ec.generate_private_key(ec.SECP521R1(), + default_backend()) + +TEST_DSA_PRIVATE_KEY = dsa.generate_private_key(key_size=3072, + backend=default_backend()) + + +class FakeKeyManager(object): + + def __init__(self): + self.certs = {'invalid_format_cert': + FakeCastellanCertificate('A' * 256, 'BLAH'), + 'valid_format_cert': + FakeCastellanCertificate('A' * 256, 'X.509')} + + def get(self, context, cert_uuid): + cert = self.certs.get(cert_uuid) + + if cert is None: + raise KeyManagerError("No matching certificate found.") + + return cert + + +class FakeCastellanCertificate(object): + + def __init__(self, data, cert_format): + self.data = data + self.cert_format = cert_format + + @property + def format(self): + return self.cert_format + + def get_encoded(self): + return self.data + + +class FakeCryptoCertificate(object): + + def __init__(self, pub_key=TEST_RSA_PRIVATE_KEY.public_key(), + not_valid_before=(timeutils.utcnow() - + datetime.timedelta(hours=1)), + not_valid_after=(timeutils.utcnow() + + datetime.timedelta(hours=2))): + self.pub_key = pub_key + self.cert_not_valid_before = not_valid_before + self.cert_not_valid_after = not_valid_after + + def public_key(self): + return self.pub_key + + @property + def not_valid_before(self): + return self.cert_not_valid_before + + @property + def not_valid_after(self): + return self.cert_not_valid_after + + +class BadPublicKey(object): + + def verifier(self, signature, padding, hash_method): + return None + + +class TestSignatureUtils(base.TestCase): + """Test methods of signature_utils""" + + @mock.patch('cursive.signature_utils.get_public_key') + def test_verify_signature_PSS(self, mock_get_pub_key): + data = b'224626ae19824466f2a7f39ab7b80f7f' + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() + for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): + signer = TEST_RSA_PRIVATE_KEY.signer( + padding.PSS( + mgf=padding.MGF1(hash_alg), + salt_length=padding.PSS.MAX_LENGTH + ), + hash_alg + ) + signer.update(data) + signature = base64.b64encode(signer.finalize()) + img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' + verifier = signature_utils.get_verifier(None, img_sig_cert_uuid, + hash_name, signature, + signature_utils.RSA_PSS) + verifier.update(data) + verifier.verify() + + @mock.patch('cursive.signature_utils.get_public_key') + def test_verify_signature_ECC(self, mock_get_pub_key): + data = b'224626ae19824466f2a7f39ab7b80f7f' + # test every ECC curve + for curve in signature_utils.ECC_CURVES: + key_type_name = 'ECC_' + curve.name.upper() + try: + signature_utils.SignatureKeyType.lookup(key_type_name) + except exception.SignatureVerificationError: + import warnings + warnings.warn("ECC curve '%s' not supported" % curve.name) + continue + + # Create a private key to use + private_key = ec.generate_private_key(curve, + default_backend()) + mock_get_pub_key.return_value = private_key.public_key() + for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): + signer = private_key.signer( + ec.ECDSA(hash_alg) + ) + signer.update(data) + signature = base64.b64encode(signer.finalize()) + img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' + verifier = signature_utils.get_verifier(None, + img_sig_cert_uuid, + hash_name, signature, + key_type_name) + verifier.update(data) + verifier.verify() + + @mock.patch('cursive.signature_utils.get_public_key') + def test_verify_signature_DSA(self, mock_get_pub_key): + data = b'224626ae19824466f2a7f39ab7b80f7f' + mock_get_pub_key.return_value = TEST_DSA_PRIVATE_KEY.public_key() + for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): + signer = TEST_DSA_PRIVATE_KEY.signer( + hash_alg + ) + signer.update(data) + signature = base64.b64encode(signer.finalize()) + img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' + verifier = signature_utils.get_verifier(None, img_sig_cert_uuid, + hash_name, signature, + signature_utils.DSA) + verifier.update(data) + verifier.verify() + + @mock.patch('cursive.signature_utils.get_public_key') + def test_verify_signature_bad_signature(self, mock_get_pub_key): + data = b'224626ae19824466f2a7f39ab7b80f7f' + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() + img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' + verifier = signature_utils.get_verifier(None, img_sig_cert_uuid, + 'SHA-256', 'BLAH', + signature_utils.RSA_PSS) + verifier.update(data) + self.assertRaises(crypto_exceptions.InvalidSignature, + verifier.verify) + + def test_get_verifier_invalid_image_props(self): + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Required image properties for signature' + ' verification do not exist. Cannot verify' + ' signature. Missing property: .*', + signature_utils.get_verifier, + None, None, 'SHA-256', 'BLAH', + signature_utils.RSA_PSS) + + @mock.patch('cursive.signature_utils.get_public_key') + def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key): + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() + img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Invalid signature key type: .*', + signature_utils.get_verifier, + None, img_sig_cert_uuid, 'SHA-256', + 'BLAH', 'BLAH') + + @mock.patch('cursive.signature_utils.get_public_key') + def test_get_verifier_none(self, mock_get_pub_key): + mock_get_pub_key.return_value = BadPublicKey() + img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Error occurred while creating' + ' the verifier', + signature_utils.get_verifier, + None, img_sig_cert_uuid, 'SHA-256', + 'BLAH', signature_utils.RSA_PSS) + + def test_get_signature(self): + signature = b'A' * 256 + data = base64.b64encode(signature) + self.assertEqual(signature, + signature_utils.get_signature(data)) + + def test_get_signature_fail(self): + self.assertRaisesRegex(exception.SignatureVerificationError, + 'The signature data was not properly' + ' encoded using base64', + signature_utils.get_signature, '///') + + def test_get_hash_method(self): + hash_dict = signature_utils.HASH_METHODS + for hash_name in hash_dict.keys(): + hash_class = signature_utils.get_hash_method(hash_name).__class__ + self.assertIsInstance(hash_dict[hash_name], hash_class) + + def test_get_hash_method_fail(self): + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Invalid signature hash method: .*', + signature_utils.get_hash_method, 'SHA-2') + + def test_signature_key_type_lookup(self): + for sig_format in [signature_utils.RSA_PSS, signature_utils.DSA]: + sig_key_type = signature_utils.SignatureKeyType.lookup(sig_format) + self.assertIsInstance(sig_key_type, + signature_utils.SignatureKeyType) + self.assertEqual(sig_format, sig_key_type.name) + + def test_signature_key_type_lookup_fail(self): + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Invalid signature key type: .*', + signature_utils.SignatureKeyType.lookup, + 'RSB-PSS') + + @mock.patch('cursive.signature_utils.get_certificate') + def test_get_public_key_rsa(self, mock_get_cert): + fake_cert = FakeCryptoCertificate() + mock_get_cert.return_value = fake_cert + sig_key_type = signature_utils.SignatureKeyType.lookup( + signature_utils.RSA_PSS + ) + result_pub_key = signature_utils.get_public_key(None, None, + sig_key_type) + self.assertEqual(fake_cert.public_key(), result_pub_key) + + @mock.patch('cursive.signature_utils.get_certificate') + def test_get_public_key_ecc(self, mock_get_cert): + fake_cert = FakeCryptoCertificate(TEST_ECC_PRIVATE_KEY.public_key()) + mock_get_cert.return_value = fake_cert + sig_key_type = signature_utils.SignatureKeyType.lookup('ECC_SECP521R1') + result_pub_key = signature_utils.get_public_key(None, None, + sig_key_type) + self.assertEqual(fake_cert.public_key(), result_pub_key) + + @mock.patch('cursive.signature_utils.get_certificate') + def test_get_public_key_dsa(self, mock_get_cert): + fake_cert = FakeCryptoCertificate(TEST_DSA_PRIVATE_KEY.public_key()) + mock_get_cert.return_value = fake_cert + sig_key_type = signature_utils.SignatureKeyType.lookup( + signature_utils.DSA + ) + result_pub_key = signature_utils.get_public_key(None, None, + sig_key_type) + self.assertEqual(fake_cert.public_key(), result_pub_key) + + @mock.patch('cursive.signature_utils.get_certificate') + def test_get_public_key_invalid_key(self, mock_get_certificate): + bad_pub_key = 'A' * 256 + mock_get_certificate.return_value = FakeCryptoCertificate(bad_pub_key) + sig_key_type = signature_utils.SignatureKeyType.lookup( + signature_utils.RSA_PSS + ) + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Invalid public key type for ' + 'signature key type: .*', + signature_utils.get_public_key, None, + None, sig_key_type) + + @mock.patch('cryptography.x509.load_der_x509_certificate') + @mock.patch('castellan.key_manager.API', return_value=FakeKeyManager()) + def test_get_certificate(self, mock_key_manager_API, mock_load_cert): + cert_uuid = 'valid_format_cert' + x509_cert = FakeCryptoCertificate() + mock_load_cert.return_value = x509_cert + self.assertEqual(x509_cert, + signature_utils.get_certificate(None, cert_uuid)) + + @mock.patch('cryptography.x509.load_der_x509_certificate') + @mock.patch('castellan.key_manager.API', return_value=FakeKeyManager()) + def test_get_expired_certificate(self, mock_key_manager_API, + mock_load_cert): + cert_uuid = 'valid_format_cert' + x509_cert = FakeCryptoCertificate( + not_valid_after=timeutils.utcnow() - + datetime.timedelta(hours=1)) + mock_load_cert.return_value = x509_cert + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Certificate is not valid after: .*', + signature_utils.get_certificate, None, + cert_uuid) + + @mock.patch('cryptography.x509.load_der_x509_certificate') + @mock.patch('castellan.key_manager.API', return_value=FakeKeyManager()) + def test_get_not_yet_valid_certificate(self, mock_key_manager_API, + mock_load_cert): + cert_uuid = 'valid_format_cert' + x509_cert = FakeCryptoCertificate( + not_valid_before=timeutils.utcnow() + + datetime.timedelta(hours=1)) + mock_load_cert.return_value = x509_cert + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Certificate is not valid before: .*', + signature_utils.get_certificate, None, + cert_uuid) + + @mock.patch('castellan.key_manager.API', return_value=FakeKeyManager()) + def test_get_certificate_key_manager_fail(self, mock_key_manager_API): + bad_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0695' + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Unable to retrieve certificate with ID: .*', + signature_utils.get_certificate, None, + bad_cert_uuid) + + @mock.patch('castellan.key_manager.API', return_value=FakeKeyManager()) + def test_get_certificate_invalid_format(self, mock_API): + cert_uuid = 'invalid_format_cert' + self.assertRaisesRegex(exception.SignatureVerificationError, + 'Invalid certificate format: .*', + signature_utils.get_certificate, None, + cert_uuid) diff --git a/requirements.txt b/requirements.txt index 95d0fe8..70ddcd1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,11 @@ # process, which may cause wedges in the gate later. pbr>=1.6 # Apache-2.0 +lxml>=2.3 # BSD +cryptography!=1.3.0,>=1.0 # BSD/Apache-2.0 +netifaces>=0.10.4 # MIT +six>=1.9.0 # MIT +oslo.serialization>=1.10.0 # Apache-2.0 +oslo.utils>=3.16.0 # Apache-2.0 +oslo.i18n>=2.1.0 # Apache-2.0 +castellan>=0.4.0 # Apache-2.0 diff --git a/tox.ini b/tox.ini index 5894cef..4cfa4e6 100644 --- a/tox.ini +++ b/tox.ini @@ -59,6 +59,6 @@ commands = oslo_debug_helper {posargs} # E123, E125 skipped as they are invalid PEP-8. show-source = True -ignore = E123,E125 +ignore = E123,E125,H301 builtins = _ exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build