Use cursive for signature verification

This change removes the signature_utils module
from Nova and uses the cursive library, which
contains an identical module.

Change-Id: I8179282a9d19f829aca0b5bd2775d855b3364c86
Depends-On: I7e5797661fee258bc0270b5f109704b591633519
Implements: blueprint signature-code-cleanup
Partial-Bug: #1528349
This commit is contained in:
dane-fichter 2017-03-14 09:23:44 -07:00
parent c844482a09
commit d17e701ddb
12 changed files with 49 additions and 752 deletions

View File

@ -35,6 +35,7 @@ import time
import traceback
from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
import eventlet.event
from eventlet import greenthread
import eventlet.semaphore
@ -1990,7 +1991,7 @@ class ComputeManager(manager.Manager):
exception.ImageUnacceptable,
exception.InvalidDiskInfo,
exception.InvalidDiskFormat,
exception.SignatureVerificationError,
cursive_exception.SignatureVerificationError,
exception.VolumeEncryptionNotSupported,
exception.InvalidInput) as e:
self._notify_about_instance_usage(context, instance,

View File

@ -1634,11 +1634,6 @@ class ImageDownloadModuleConfigurationError(ImageDownloadModuleError):
msg_fmt = _("The module %(module)s is misconfigured: %(reason)s.")
class SignatureVerificationError(NovaException):
msg_fmt = _("Signature verification for the image "
"failed: %(reason)s.")
class ResourceMonitorError(NovaException):
msg_fmt = _("Error when creating resource monitor: %(monitor)s")

View File

@ -26,6 +26,8 @@ import sys
import time
import cryptography
from cursive import exception as cursive_exception
from cursive import signature_utils
import glanceclient
import glanceclient.exc
from glanceclient.v2 import schemas
@ -44,7 +46,6 @@ from nova.i18n import _LE, _LI, _LW
import nova.image.download as image_xfers
from nova import objects
from nova.objects import fields
from nova import signature_utils
LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
@ -314,12 +315,14 @@ class GlanceImageServiceV2(object):
'img_signature_key_type'
)
try:
verifier = signature_utils.get_verifier(context,
img_sig_cert_uuid,
img_sig_hash_method,
img_signature,
img_sig_key_type)
except exception.SignatureVerificationError:
verifier = signature_utils.get_verifier(
context=context,
img_signature_certificate_uuid=img_sig_cert_uuid,
img_signature_hash_method=img_sig_hash_method,
img_signature=img_signature,
img_signature_key_type=img_sig_key_type,
)
except cursive_exception.SignatureVerificationError:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Image signature verification failed '
'for image: %s'), image_id)

View File

@ -15,6 +15,7 @@
import os
import re
from cursive import signature_utils
from oslo_versionedobjects import fields
import six
@ -420,14 +421,14 @@ class HVType(BaseNovaEnum):
class ImageSignatureHashType(BaseNovaEnum):
# Represents the possible hash methods used for image signing
ALL = ('SHA-224', 'SHA-256', 'SHA-384', 'SHA-512')
ALL = tuple(sorted(signature_utils.HASH_METHODS.keys()))
class ImageSignatureKeyType(BaseNovaEnum):
# Represents the possible keypair types used for image signing
ALL = ('DSA', 'ECC_SECT571K1', 'ECC_SECT409K1', 'ECC_SECT571R1',
'ECC_SECT409R1', 'ECC_SECP521R1', 'ECC_SECP384R1', 'RSA-PSS'
)
ALL = tuple(
sorted(signature_utils.SignatureKeyType.REGISTERED_TYPES.keys())
)
class OSType(BaseNovaEnum):

View File

@ -166,7 +166,8 @@ class ImageMetaProps(base.NovaObject):
# Version 1.15: Added hw_rescue_bus and hw_rescue_device.
# Version 1.16: WatchdogActionField supports 'disabled' enum.
# Version 1.17: Add lan9118 as valid nic for hw_vif_model property for qemu
VERSION = '1.17'
# Version 1.18: Pull signature properties from cursive library
VERSION = '1.18'
def obj_make_compatible(self, primitive, target_version):
super(ImageMetaProps, self).obj_make_compatible(primitive,

View File

@ -1,342 +0,0 @@
# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Support signature verification."""
import binascii
from castellan.common.exception import KeyManagerError
from castellan import key_manager
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography import x509
from oslo_log import log as logging
from oslo_serialization import base64
from oslo_utils import encodeutils
from oslo_utils import timeutils
from nova import exception
from nova.i18n import _, _LE
LOG = logging.getLogger(__name__)
HASH_METHODS = {
'SHA-224': hashes.SHA224(),
'SHA-256': hashes.SHA256(),
'SHA-384': hashes.SHA384(),
'SHA-512': hashes.SHA512(),
}
# Currently supported signature key types
# RSA Options
RSA_PSS = 'RSA-PSS'
# DSA Options
DSA = 'DSA'
# ECC curves -- note that only those with key sizes >=384 are included
# Note also that some of these may not be supported by the cryptography backend
ECC_CURVES = (
ec.SECT571K1(),
ec.SECT409K1(),
ec.SECT571R1(),
ec.SECT409R1(),
ec.SECP521R1(),
ec.SECP384R1(),
)
# These are the currently supported certificate formats
X_509 = 'X.509'
CERTIFICATE_FORMATS = {
X_509,
}
# These are the currently supported MGF formats, used for RSA-PSS signatures
MASK_GEN_ALGORITHMS = {
'MGF1': padding.MGF1,
}
class SignatureKeyType(object):
_REGISTERED_TYPES = {}
def __init__(self, name, public_key_type, create_verifier):
self.name = name
self.public_key_type = public_key_type
self.create_verifier = create_verifier
@classmethod
def register(cls, name, public_key_type, create_verifier):
"""Register a signature key type.
:param name: the name of the signature key type
:param public_key_type: e.g. RSAPublicKey, DSAPublicKey, etc.
:param create_verifier: a function to create a verifier for this type
"""
cls._REGISTERED_TYPES[name] = cls(name,
public_key_type,
create_verifier)
@classmethod
def lookup(cls, name):
"""Look up the signature key type.
:param name: the name of the signature key type
:returns: the SignatureKeyType object
:raises: SignatureVerificationError if signature key type is invalid
"""
if name not in cls._REGISTERED_TYPES:
raise exception.SignatureVerificationError(
reason=_('Invalid signature key type: %s') % name)
return cls._REGISTERED_TYPES[name]
# each key type will require its own verifier
def create_verifier_for_pss(signature, hash_method, public_key):
"""Create the verifier to use when the key type is RSA-PSS.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:raises: SignatureVerificationError if the RSA-PSS specific properties
are invalid
:returns: the verifier to use to verify the signature for RSA-PSS
"""
# default to MGF1
mgf = padding.MGF1(hash_method)
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# return the verifier
return public_key.verifier(
signature,
padding.PSS(mgf=mgf, salt_length=salt_length),
hash_method
)
def create_verifier_for_ecc(signature, hash_method, public_key):
"""Create the verifier to use when the key type is ECC_*.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:returns: the verifier to use to verify the signature for ECC_*.
"""
# return the verifier
return public_key.verifier(
signature,
ec.ECDSA(hash_method)
)
def create_verifier_for_dsa(signature, hash_method, public_key):
"""Create the verifier to use when the key type is DSA
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:returns: the verifier to use to verify the signature for DSA
"""
# return the verifier
return public_key.verifier(
signature,
hash_method
)
SignatureKeyType.register(RSA_PSS, rsa.RSAPublicKey, create_verifier_for_pss)
SignatureKeyType.register(DSA, dsa.DSAPublicKey, create_verifier_for_dsa)
# Register the elliptic curves which are supported by the backend
for curve in ECC_CURVES:
if default_backend().elliptic_curve_supported(curve):
SignatureKeyType.register('ECC_' + curve.name.upper(),
ec.EllipticCurvePublicKey,
create_verifier_for_ecc)
def get_verifier(context, img_signature_certificate_uuid,
img_signature_hash_method, img_signature,
img_signature_key_type):
"""Instantiate signature properties and use them to create a verifier.
:param context: the user context for authentication
:param img_signature_certificate_uuid:
uuid of signing certificate stored in key manager
:param img_signature_hash_method:
string denoting hash method used to compute signature
:param img_signature: string of base64 encoding of signature
:param img_signature_key_type:
string denoting type of keypair used to compute signature
:returns: instance of
cryptography.hazmat.primitives.asymmetric.AsymmetricVerificationContext
:raises: SignatureVerificationError if we fail to build the verifier
"""
image_meta_props = {'img_signature_uuid': img_signature_certificate_uuid,
'img_signature_hash_method': img_signature_hash_method,
'img_signature': img_signature,
'img_signature_key_type': img_signature_key_type}
for key in image_meta_props.keys():
if image_meta_props[key] is None:
raise exception.SignatureVerificationError(
reason=_('Required image properties for signature verification'
' do not exist. Cannot verify signature. Missing'
' property: %s') % key)
signature = get_signature(img_signature)
hash_method = get_hash_method(img_signature_hash_method)
signature_key_type = SignatureKeyType.lookup(img_signature_key_type)
public_key = get_public_key(context,
img_signature_certificate_uuid,
signature_key_type)
# create the verifier based on the signature key type
verifier = signature_key_type.create_verifier(signature,
hash_method,
public_key)
if verifier:
return verifier
else:
# Error creating the verifier
raise exception.SignatureVerificationError(
reason=_('Error occurred while creating the verifier'))
def get_signature(signature_data):
"""Decode the signature data and returns the signature.
:param signature_data: the base64-encoded signature data
:returns: the decoded signature
:raises: SignatureVerificationError if the signature data is malformatted
"""
try:
signature = base64.decode_as_bytes(signature_data)
except (TypeError, binascii.Error):
raise exception.SignatureVerificationError(
reason=_('The signature data was not properly '
'encoded using base64'))
return signature
def get_hash_method(hash_method_name):
"""Verify the hash method name and create the hash method.
:param hash_method_name: the name of the hash method to retrieve
:returns: the hash method, a cryptography object
:raises: SignatureVerificationError if the hash method name is invalid
"""
if hash_method_name not in HASH_METHODS:
raise exception.SignatureVerificationError(
reason=_('Invalid signature hash method: %s') % hash_method_name)
return HASH_METHODS[hash_method_name]
def get_public_key(context, signature_certificate_uuid, signature_key_type):
"""Create the public key object from a retrieved certificate.
:param context: the user context for authentication
:param signature_certificate_uuid: the uuid to use to retrieve the
certificate
:param signature_key_type: a SignatureKeyType object
:returns: the public key cryptography object
:raises: SignatureVerificationError if public key format is invalid
"""
certificate = get_certificate(context, signature_certificate_uuid)
# Note that this public key could either be
# RSAPublicKey, DSAPublicKey, or EllipticCurvePublicKey
public_key = certificate.public_key()
# Confirm the type is of the type expected based on the signature key type
if not isinstance(public_key, signature_key_type.public_key_type):
raise exception.SignatureVerificationError(
reason=_('Invalid public key type for signature key type: %s')
% signature_key_type.name)
return public_key
def get_certificate(context, signature_certificate_uuid):
"""Create the certificate object from the retrieved certificate data.
:param context: the user context for authentication
:param signature_certificate_uuid: the uuid to use to retrieve the
certificate
:returns: the certificate cryptography object
:raises: SignatureVerificationError if the retrieval fails or the format
is invalid
"""
keymgr_api = key_manager.API()
try:
# The certificate retrieved here is a castellan certificate object
cert = keymgr_api.get(context, signature_certificate_uuid)
except KeyManagerError as e:
# The problem encountered may be backend-specific, since castellan
# can use different backends. Rather than importing all possible
# backends here, the generic "Exception" is used.
msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
% {'id': signature_certificate_uuid,
'e': encodeutils.exception_to_unicode(e)})
LOG.error(msg)
raise exception.SignatureVerificationError(
reason=_('Unable to retrieve certificate with ID: %s')
% signature_certificate_uuid)
if cert.format not in CERTIFICATE_FORMATS:
raise exception.SignatureVerificationError(
reason=_('Invalid certificate format: %s') % cert.format)
if cert.format == X_509:
# castellan always encodes certificates in DER format
cert_data = cert.get_encoded()
certificate = x509.load_der_x509_certificate(cert_data,
default_backend())
# verify the certificate
verify_certificate(certificate)
return certificate
def verify_certificate(certificate):
"""Verify that the certificate has not expired.
:param certificate: the cryptography certificate object
:raises: SignatureVerificationError if the certificate valid time range
does not include now
"""
# Get now in UTC, since certificate returns times in UTC
now = timeutils.utcnow()
# Confirm the certificate valid time range includes now
if now < certificate.not_valid_before:
raise exception.SignatureVerificationError(
reason=_('Certificate is not valid before: %s UTC')
% certificate.not_valid_before)
elif now > certificate.not_valid_after:
raise exception.SignatureVerificationError(
reason=_('Certificate is not valid after: %s UTC')
% certificate.not_valid_after)

View File

@ -17,6 +17,7 @@ import datetime
import time
from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
from eventlet import event as eventlet_event
import mock
import netaddr
@ -4129,7 +4130,7 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
def test_build_and_run_signature_verification_error(self):
self._test_build_and_run_spawn_exceptions(
exception.SignatureVerificationError(reason=""))
cursive_exception.SignatureVerificationError(reason=""))
def test_build_and_run_volume_encryption_not_supported(self):
self._test_build_and_run_spawn_exceptions(

View File

@ -18,6 +18,7 @@ import copy
import datetime
import cryptography
from cursive import exception as cursive_exception
import glanceclient.exc
from glanceclient.v1 import images
import glanceclient.v2.schemas as schemas
@ -815,7 +816,7 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('nova.signature_utils.get_verifier')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_with_signature_verification_v2(self,
mock_get_verifier,
mock_show,
@ -826,16 +827,19 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
res = service.download(context=None, image_id=None,
data=None, dst_path=None)
self.assertEqual(self.fake_img_data, res)
mock_get_verifier.assert_called_once_with(None,
uuids.img_sig_cert_uuid,
'SHA-224',
'signature', 'RSA-PSS')
mock_get_verifier.assert_called_once_with(
context=None,
img_signature_certificate_uuid=uuids.img_sig_cert_uuid,
img_signature_hash_method='SHA-224',
img_signature='signature',
img_signature_key_type='RSA-PSS'
)
mock_log.info.assert_called_once_with(mock.ANY, mock.ANY)
@mock.patch.object(six.moves.builtins, 'open')
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('nova.signature_utils.get_verifier')
@mock.patch('cursive.signature_utils.get_verifier')
@mock.patch('os.fsync')
def test_download_dst_path_signature_verification_v2(self,
mock_fsync,
@ -851,10 +855,13 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
mock_open.return_value = mock_dest
service.download(context=None, image_id=None,
data=None, dst_path=fake_path)
mock_get_verifier.assert_called_once_with(None,
uuids.img_sig_cert_uuid,
'SHA-224',
'signature', 'RSA-PSS')
mock_get_verifier.assert_called_once_with(
context=None,
img_signature_certificate_uuid=uuids.img_sig_cert_uuid,
img_signature_hash_method='SHA-224',
img_signature='signature',
img_signature_key_type='RSA-PSS'
)
mock_log.info.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(len(self.fake_img_data), mock_dest.write.call_count)
self.assertTrue(mock_dest.close.called)
@ -863,18 +870,17 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('nova.signature_utils.get_verifier')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_with_get_verifier_failure_v2(self,
mock_get_verifier,
mock_get,
mock_show,
mock_log):
service = glance.GlanceImageServiceV2(self.client)
mock_get_verifier.side_effect = exception.SignatureVerificationError(
reason='Signature verification '
'failed.'
)
mock_get.side_effect = cursive_exception.SignatureVerificationError(
reason='Signature verification failed.'
)
mock_show.return_value = self.fake_img_props
self.assertRaises(exception.SignatureVerificationError,
self.assertRaises(cursive_exception.SignatureVerificationError,
service.download,
context=None, image_id=None,
data=None, dst_path=None)
@ -882,7 +888,7 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('nova.signature_utils.get_verifier')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_with_invalid_signature_v2(self,
mock_get_verifier,
mock_show,
@ -903,7 +909,7 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
mock_log):
service = glance.GlanceImageServiceV2(self.client)
mock_show.return_value = {'properties': {}}
self.assertRaisesRegex(exception.SignatureVerificationError,
self.assertRaisesRegex(cursive_exception.SignatureVerificationError,
'Required image properties for signature '
'verification do not exist. Cannot verify '
'signature. Missing property: .*',
@ -912,7 +918,7 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
data=None, dst_path=None)
@mock.patch.object(six.moves.builtins, 'open')
@mock.patch('nova.signature_utils.get_verifier')
@mock.patch('cursive.signature_utils.get_verifier')
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('os.fsync')

View File

@ -23,7 +23,6 @@ import six
from nova import exception
from nova.network import model as network_model
from nova.objects import fields
from nova import signature_utils
from nova import test
from nova.tests.unit import fake_instance
from nova import utils
@ -332,25 +331,6 @@ class TestVMMode(TestField):
'invalid')
class TestImageSignatureTypes(TestField):
# Ensure that the object definition is updated
# in step with the signature_utils module
def setUp(self):
super(TestImageSignatureTypes, self).setUp()
self.hash_field = fields.ImageSignatureHashType()
self.key_type_field = fields.ImageSignatureKeyType()
def test_hashes(self):
for hash_name in list(signature_utils.HASH_METHODS.keys()):
self.assertIn(hash_name, self.hash_field.ALL)
def test_key_types(self):
key_type_dict = signature_utils.SignatureKeyType._REGISTERED_TYPES
key_types = list(key_type_dict.keys())
for key_type in key_types:
self.assertIn(key_type, self.key_type_field.ALL)
class TestResourceClass(TestString):
def setUp(self):
super(TestResourceClass, self).setUp()

View File

@ -1096,7 +1096,7 @@ object_data = {
'HVSpec': '1.2-de06bcec472a2f04966b855a49c46b41',
'IDEDeviceBus': '1.0-29d4c9f27ac44197f01b6ac1b7e16502',
'ImageMeta': '1.8-642d1b2eb3e880a367f37d72dd76162d',
'ImageMetaProps': '1.17-54f21e339d153fe30be6b8999dd83148',
'ImageMetaProps': '1.18-3e5975251f5843e817de68ac83274c27',
'Instance': '2.3-4f98ab23f4b0a25fabb1040c8f5edecc',
'InstanceAction': '1.1-f9f293e526b66fca0d05c3b3a2d13914',
'InstanceActionEvent': '1.1-e56a64fa4710e43ef7af2ad9d6028b33',

View File

@ -1,350 +0,0 @@
# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import datetime
from castellan.common.exception import KeyManagerError
import cryptography.exceptions as crypto_exceptions
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
import mock
from oslo_utils import timeutils
from nova import exception
from nova import signature_utils
from nova import test
TEST_RSA_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3,
key_size=1024,
backend=default_backend())
# secp521r1 is assumed to be available on all supported platforms
TEST_ECC_PRIVATE_KEY = ec.generate_private_key(ec.SECP521R1(),
default_backend())
TEST_DSA_PRIVATE_KEY = dsa.generate_private_key(key_size=3072,
backend=default_backend())
class FakeKeyManager(object):
def __init__(self):
self.certs = {'invalid_format_cert':
FakeCastellanCertificate('A' * 256, 'BLAH'),
'valid_format_cert':
FakeCastellanCertificate('A' * 256, 'X.509')}
def get(self, context, cert_uuid):
cert = self.certs.get(cert_uuid)
if cert is None:
raise KeyManagerError("No matching certificate found.")
return cert
class FakeCastellanCertificate(object):
def __init__(self, data, cert_format):
self.data = data
self.cert_format = cert_format
@property
def format(self):
return self.cert_format
def get_encoded(self):
return self.data
class FakeCryptoCertificate(object):
def __init__(self, pub_key=TEST_RSA_PRIVATE_KEY.public_key(),
not_valid_before=(timeutils.utcnow() -
datetime.timedelta(hours=1)),
not_valid_after=(timeutils.utcnow() +
datetime.timedelta(hours=2))):
self.pub_key = pub_key
self.cert_not_valid_before = not_valid_before
self.cert_not_valid_after = not_valid_after
def public_key(self):
return self.pub_key
@property
def not_valid_before(self):
return self.cert_not_valid_before
@property
def not_valid_after(self):
return self.cert_not_valid_after
class BadPublicKey(object):
def verifier(self, signature, padding, hash_method):
return None
class TestSignatureUtils(test.NoDBTestCase):
"""Test methods of signature_utils"""
@mock.patch('nova.signature_utils.get_public_key')
def test_verify_signature_PSS(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_RSA_PRIVATE_KEY.signer(
padding.PSS(
mgf=padding.MGF1(hash_alg),
salt_length=padding.PSS.MAX_LENGTH
),
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
hash_name, signature,
signature_utils.RSA_PSS)
verifier.update(data)
verifier.verify()
@mock.patch('nova.signature_utils.get_public_key')
def test_verify_signature_ECC(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
# test every ECC curve
for curve in signature_utils.ECC_CURVES:
key_type_name = 'ECC_' + curve.name.upper()
try:
signature_utils.SignatureKeyType.lookup(key_type_name)
except exception.SignatureVerificationError:
import warnings
warnings.warn("ECC curve '%s' not supported" % curve.name)
continue
# Create a private key to use
private_key = ec.generate_private_key(curve,
default_backend())
mock_get_pub_key.return_value = private_key.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = private_key.signer(
ec.ECDSA(hash_alg)
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None,
img_sig_cert_uuid,
hash_name, signature,
key_type_name)
verifier.update(data)
verifier.verify()
@mock.patch('nova.signature_utils.get_public_key')
def test_verify_signature_DSA(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_DSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_DSA_PRIVATE_KEY.signer(
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
hash_name, signature,
signature_utils.DSA)
verifier.update(data)
verifier.verify()
@mock.patch('nova.signature_utils.get_public_key')
def test_verify_signature_bad_signature(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
'SHA-256', 'BLAH',
signature_utils.RSA_PSS)
verifier.update(data)
self.assertRaises(crypto_exceptions.InvalidSignature,
verifier.verify)
def test_get_verifier_invalid_image_props(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'Required image properties for signature'
' verification do not exist. Cannot verify'
' signature. Missing property: .*',
signature_utils.get_verifier,
None, None, 'SHA-256', 'BLAH',
signature_utils.RSA_PSS)
@mock.patch('nova.signature_utils.get_public_key')
def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key):
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid signature key type: .*',
signature_utils.get_verifier,
None, img_sig_cert_uuid, 'SHA-256',
'BLAH', 'BLAH')
@mock.patch('nova.signature_utils.get_public_key')
def test_get_verifier_none(self, mock_get_pub_key):
mock_get_pub_key.return_value = BadPublicKey()
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
self.assertRaisesRegex(exception.SignatureVerificationError,
'Error occurred while creating'
' the verifier',
signature_utils.get_verifier,
None, img_sig_cert_uuid, 'SHA-256',
'BLAH', signature_utils.RSA_PSS)
def test_get_signature(self):
signature = b'A' * 256
data = base64.b64encode(signature)
self.assertEqual(signature,
signature_utils.get_signature(data))
def test_get_signature_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'The signature data was not properly'
' encoded using base64',
signature_utils.get_signature, '///')
def test_get_hash_method(self):
hash_dict = signature_utils.HASH_METHODS
for hash_name in hash_dict.keys():
hash_class = signature_utils.get_hash_method(hash_name).__class__
self.assertIsInstance(hash_dict[hash_name], hash_class)
def test_get_hash_method_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid signature hash method: .*',
signature_utils.get_hash_method, 'SHA-2')
def test_signature_key_type_lookup(self):
for sig_format in [signature_utils.RSA_PSS, signature_utils.DSA]:
sig_key_type = signature_utils.SignatureKeyType.lookup(sig_format)
self.assertIsInstance(sig_key_type,
signature_utils.SignatureKeyType)
self.assertEqual(sig_format, sig_key_type.name)
def test_signature_key_type_lookup_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid signature key type: .*',
signature_utils.SignatureKeyType.lookup,
'RSB-PSS')
@mock.patch('nova.signature_utils.get_certificate')
def test_get_public_key_rsa(self, mock_get_cert):
fake_cert = FakeCryptoCertificate()
mock_get_cert.return_value = fake_cert
sig_key_type = signature_utils.SignatureKeyType.lookup(
signature_utils.RSA_PSS
)
result_pub_key = signature_utils.get_public_key(None, None,
sig_key_type)
self.assertEqual(fake_cert.public_key(), result_pub_key)
@mock.patch('nova.signature_utils.get_certificate')
def test_get_public_key_ecc(self, mock_get_cert):
fake_cert = FakeCryptoCertificate(TEST_ECC_PRIVATE_KEY.public_key())
mock_get_cert.return_value = fake_cert
sig_key_type = signature_utils.SignatureKeyType.lookup('ECC_SECP521R1')
result_pub_key = signature_utils.get_public_key(None, None,
sig_key_type)
self.assertEqual(fake_cert.public_key(), result_pub_key)
@mock.patch('nova.signature_utils.get_certificate')
def test_get_public_key_dsa(self, mock_get_cert):
fake_cert = FakeCryptoCertificate(TEST_DSA_PRIVATE_KEY.public_key())
mock_get_cert.return_value = fake_cert
sig_key_type = signature_utils.SignatureKeyType.lookup(
signature_utils.DSA
)
result_pub_key = signature_utils.get_public_key(None, None,
sig_key_type)
self.assertEqual(fake_cert.public_key(), result_pub_key)
@mock.patch('nova.signature_utils.get_certificate')
def test_get_public_key_invalid_key(self, mock_get_certificate):
bad_pub_key = 'A' * 256
mock_get_certificate.return_value = FakeCryptoCertificate(bad_pub_key)
sig_key_type = signature_utils.SignatureKeyType.lookup(
signature_utils.RSA_PSS
)
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid public key type for '
'signature key type: .*',
signature_utils.get_public_key, None,
None, sig_key_type)
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate(self, mock_key_manager_API, mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate()
mock_load_cert.return_value = x509_cert
self.assertEqual(x509_cert,
signature_utils.get_certificate(None, cert_uuid))
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_expired_certificate(self, mock_key_manager_API,
mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(
not_valid_after=timeutils.utcnow() -
datetime.timedelta(hours=1))
mock_load_cert.return_value = x509_cert
self.assertRaisesRegex(exception.SignatureVerificationError,
'Certificate is not valid after: .*',
signature_utils.get_certificate, None,
cert_uuid)
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_not_yet_valid_certificate(self, mock_key_manager_API,
mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(
not_valid_before=timeutils.utcnow() +
datetime.timedelta(hours=1))
mock_load_cert.return_value = x509_cert
self.assertRaisesRegex(exception.SignatureVerificationError,
'Certificate is not valid before: .*',
signature_utils.get_certificate, None,
cert_uuid)
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate_key_manager_fail(self, mock_key_manager_API):
bad_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0695'
self.assertRaisesRegex(exception.SignatureVerificationError,
'Unable to retrieve certificate with ID: .*',
signature_utils.get_certificate, None,
bad_cert_uuid)
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate_invalid_format(self, mock_API):
cert_uuid = 'invalid_format_cert'
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid certificate format: .*',
signature_utils.get_certificate, None,
cert_uuid)

View File

@ -60,3 +60,4 @@ castellan>=0.4.0 # Apache-2.0
microversion-parse>=0.1.2 # Apache-2.0
os-xenapi>=0.1.1 # Apache-2.0
tooz>=1.47.0 # Apache-2.0
cursive>=0.1.2 # Apache-2.0