From afdf5b4a8cca33dbe746095d9442b958c5fa9a24 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Wed, 11 Jan 2017 13:12:52 -0800 Subject: [PATCH] Swap pycrypto* for pyca/cryptography pyOpenSSL is already a dependency and pyOpenSSL uses cryptography. This also reduces the complexity of the code significantly in several places (and removes the need to directly manipulate asn1). A future PR could remove pyOpenSSL entirely as all the cert behavior is supported directly by cryptography. --- setup.py | 2 +- src/saml2/cert.py | 37 ++++++------------- src/saml2/entity.py | 3 -- src/saml2/sigver.py | 86 ++++++++++++++++----------------------------- 4 files changed, 42 insertions(+), 86 deletions(-) diff --git a/setup.py b/setup.py index de96cae..b29c31c 100755 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ install_requires = [ 'paste', 'zope.interface', 'repoze.who', - 'pycryptodomex', + 'cryptography', 'pytz', 'pyOpenSSL', 'python-dateutil', diff --git a/src/saml2/cert.py b/src/saml2/cert.py index f71fd8e..f9f97a6 100644 --- a/src/saml2/cert.py +++ b/src/saml2/cert.py @@ -8,7 +8,11 @@ import six from OpenSSL import crypto from os.path import join from os import remove -from Cryptodome.Util import asn1 + +from cryptography.hazmat.backends import default_backend +from cryptography.x509 import load_pem_x509_certificate + +backend = default_backend() class WrongInput(Exception): pass @@ -194,9 +198,8 @@ class OpenSSLWrapper(object): f.close() def read_str_from_file(self, file, type="pem"): - f = open(file, 'rt') - str_data = f.read() - f.close() + with open(file, 'rb') as f: + str_data = f.read() if type == "pem": return str_data @@ -336,31 +339,13 @@ class OpenSSLWrapper(object): cert_algorithm = cert.get_signature_algorithm() if six.PY3: cert_algorithm = cert_algorithm.decode('ascii') + cert_str = cert_str.encode('ascii') - cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) - - der_seq = asn1.DerSequence() - der_seq.decode(cert_asn1) - - cert_certificate = der_seq[0] - #cert_signature_algorithm=der_seq[1] - cert_signature = der_seq[2] - - cert_signature_decoded = asn1.DerObject() - cert_signature_decoded.decode(cert_signature) - - signature_payload = cert_signature_decoded.payload - - sig_pay0 = signature_payload[0] - if ((isinstance(sig_pay0, int) and sig_pay0 != 0) or - (isinstance(sig_pay0, str) and sig_pay0 != '\x00')): - return (False, - "The certificate should not contain any unused bits.") - - signature = signature_payload[1:] + cert_crypto = load_pem_x509_certificate(cert_str, backend) try: - crypto.verify(ca_cert, signature, cert_certificate, + crypto.verify(ca_cert, cert_crypto.signature, + cert_crypto.tbs_certificate_bytes, cert_algorithm) return True, "Signed certificate is valid and correctly signed by CA certificate." except crypto.Error as e: diff --git a/src/saml2/entity.py b/src/saml2/entity.py index c6b287f..b24c621 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -7,9 +7,6 @@ import six from binascii import hexlify from hashlib import sha1 -# from Crypto.PublicKey import RSA -from Cryptodome.PublicKey import RSA - from saml2.metadata import ENDPOINTS from saml2.profile import paos, ecp from saml2.soap import parse_soap_enveloped_saml_artifact_resolve diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 095a79c..3d80fb7 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -19,25 +19,13 @@ from binascii import hexlify from future.backports.urllib.parse import urlencode -# from Crypto.PublicKey.RSA import importKey -# from Crypto.Signature import PKCS1_v1_5 -# from Crypto.Util.asn1 import DerSequence -# from Crypto.PublicKey import RSA -# from Crypto.Hash import SHA -# from Crypto.Hash import SHA224 -# from Crypto.Hash import SHA256 -# from Crypto.Hash import SHA384 -# from Crypto.Hash import SHA512 - -from Cryptodome.PublicKey.RSA import importKey -from Cryptodome.Signature import PKCS1_v1_5 -from Cryptodome.Util.asn1 import DerSequence -from Cryptodome.PublicKey import RSA -from Cryptodome.Hash import SHA -from Cryptodome.Hash import SHA224 -from Cryptodome.Hash import SHA256 -from Cryptodome.Hash import SHA384 -from Cryptodome.Hash import SHA512 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 +from cryptography.hazmat.primitives.serialization import load_pem_private_key +from cryptography.x509 import load_pem_x509_certificate from tempfile import NamedTemporaryFile from subprocess import Popen @@ -87,6 +75,8 @@ XMLTAG = "" PREFIX1 = "" PREFIX2 = '' +backend = default_backend() + class SigverError(SAMLError): pass @@ -406,18 +396,10 @@ def active_cert(key): """ try: cert_str = pem_format(key) - try: - certificate = importKey(cert_str) - not_before = to_time(str(certificate.get_not_before())) - not_after = to_time(str(certificate.get_not_after())) - assert not_before < utc_now() - assert not_after > utc_now() - return True - except: - cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str) - assert cert.has_expired() == 0 - assert not OpenSSLWrapper().certificate_not_valid_yet(cert) - return True + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str) + assert cert.has_expired() == 0 + assert not OpenSSLWrapper().certificate_not_valid_yet(cert) + return True except AssertionError: return False except AttributeError: @@ -555,19 +537,8 @@ def rsa_eq(key1, key2): def extract_rsa_key_from_x509_cert(pem): - # Convert from PEM to DER - der = ssl.PEM_cert_to_DER_cert(pem.decode('ascii')) - - # Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280) - cert = DerSequence() - cert.decode(der) - tbsCertificate = DerSequence() - tbsCertificate.decode(cert[0]) - subjectPublicKeyInfo = tbsCertificate[6] - - # Initialize RSA key - rsa_key = RSA.importKey(subjectPublicKeyInfo) - return rsa_key + cert = load_pem_x509_certificate(pem, backend) + return cert.public_key() def pem_format(key): @@ -576,7 +547,7 @@ def pem_format(key): def import_rsa_key_from_file(filename): - return RSA.importKey(read_file(filename, 'r')) + return load_pem_private_key(read_file(filename, 'rb'), None, backend) def parse_xmlsec_output(output): @@ -622,25 +593,28 @@ class RSASigner(Signer): if key is None: key = self.key - h = self.digest.new(msg) - signer = PKCS1_v1_5.new(key) - return signer.sign(h) + return key.sign(msg, PKCS1v15(), self.digest) def verify(self, msg, sig, key=None): if key is None: key = self.key - h = self.digest.new(msg) - verifier = PKCS1_v1_5.new(key) - return verifier.verify(h, sig) + try: + if isinstance(key, rsa.RSAPrivateKey): + key = key.public_key() + + key.verify(sig, msg, PKCS1v15(), self.digest) + return True + except InvalidSignature: + return False SIGNER_ALGS = { - SIG_RSA_SHA1: RSASigner(SHA), - SIG_RSA_SHA224: RSASigner(SHA224), - SIG_RSA_SHA256: RSASigner(SHA256), - SIG_RSA_SHA384: RSASigner(SHA384), - SIG_RSA_SHA512: RSASigner(SHA512), + SIG_RSA_SHA1: RSASigner(hashes.SHA1()), + SIG_RSA_SHA224: RSASigner(hashes.SHA224()), + SIG_RSA_SHA256: RSASigner(hashes.SHA256()), + SIG_RSA_SHA384: RSASigner(hashes.SHA384()), + SIG_RSA_SHA512: RSASigner(hashes.SHA512()), } REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"]