Merge pull request #385 from sigmavirus24/move-to-cryptography

Swap pycrypto* for pyca/cryptography
This commit is contained in:
Roland Hedberg 2017-01-24 18:21:33 +01:00 committed by GitHub
commit a0c510af7a
4 changed files with 42 additions and 86 deletions

View File

@ -14,7 +14,7 @@ install_requires = [
'paste', 'paste',
'zope.interface', 'zope.interface',
'repoze.who', 'repoze.who',
'pycryptodomex', 'cryptography',
'pytz', 'pytz',
'pyOpenSSL', 'pyOpenSSL',
'python-dateutil', 'python-dateutil',

View File

@ -8,7 +8,11 @@ import six
from OpenSSL import crypto from OpenSSL import crypto
from os.path import join from os.path import join
from os import remove from os import remove
from Cryptodome.Util import asn1
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
backend = default_backend()
class WrongInput(Exception): class WrongInput(Exception):
pass pass
@ -194,9 +198,8 @@ class OpenSSLWrapper(object):
f.close() f.close()
def read_str_from_file(self, file, type="pem"): def read_str_from_file(self, file, type="pem"):
f = open(file, 'rt') with open(file, 'rb') as f:
str_data = f.read() str_data = f.read()
f.close()
if type == "pem": if type == "pem":
return str_data return str_data
@ -336,31 +339,13 @@ class OpenSSLWrapper(object):
cert_algorithm = cert.get_signature_algorithm() cert_algorithm = cert.get_signature_algorithm()
if six.PY3: if six.PY3:
cert_algorithm = cert_algorithm.decode('ascii') cert_algorithm = cert_algorithm.decode('ascii')
cert_str = cert_str.encode('ascii')
cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) cert_crypto = load_pem_x509_certificate(cert_str, backend)
der_seq = asn1.DerSequence()
der_seq.decode(cert_asn1)
cert_certificate = der_seq[0]
#cert_signature_algorithm=der_seq[1]
cert_signature = der_seq[2]
cert_signature_decoded = asn1.DerObject()
cert_signature_decoded.decode(cert_signature)
signature_payload = cert_signature_decoded.payload
sig_pay0 = signature_payload[0]
if ((isinstance(sig_pay0, int) and sig_pay0 != 0) or
(isinstance(sig_pay0, str) and sig_pay0 != '\x00')):
return (False,
"The certificate should not contain any unused bits.")
signature = signature_payload[1:]
try: try:
crypto.verify(ca_cert, signature, cert_certificate, crypto.verify(ca_cert, cert_crypto.signature,
cert_crypto.tbs_certificate_bytes,
cert_algorithm) cert_algorithm)
return True, "Signed certificate is valid and correctly signed by CA certificate." return True, "Signed certificate is valid and correctly signed by CA certificate."
except crypto.Error as e: except crypto.Error as e:

View File

@ -7,9 +7,6 @@ import six
from binascii import hexlify from binascii import hexlify
from hashlib import sha1 from hashlib import sha1
# from Crypto.PublicKey import RSA
from Cryptodome.PublicKey import RSA
from saml2.metadata import ENDPOINTS from saml2.metadata import ENDPOINTS
from saml2.profile import paos, ecp from saml2.profile import paos, ecp
from saml2.soap import parse_soap_enveloped_saml_artifact_resolve from saml2.soap import parse_soap_enveloped_saml_artifact_resolve

View File

@ -19,25 +19,13 @@ from binascii import hexlify
from future.backports.urllib.parse import urlencode from future.backports.urllib.parse import urlencode
# from Crypto.PublicKey.RSA import importKey from cryptography.exceptions import InvalidSignature
# from Crypto.Signature import PKCS1_v1_5 from cryptography.hazmat.backends import default_backend
# from Crypto.Util.asn1 import DerSequence from cryptography.hazmat.primitives import hashes
# from Crypto.PublicKey import RSA from cryptography.hazmat.primitives.asymmetric import rsa
# from Crypto.Hash import SHA from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
# from Crypto.Hash import SHA224 from cryptography.hazmat.primitives.serialization import load_pem_private_key
# from Crypto.Hash import SHA256 from cryptography.x509 import load_pem_x509_certificate
# from Crypto.Hash import SHA384
# from Crypto.Hash import SHA512
from Cryptodome.PublicKey.RSA import importKey
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.Util.asn1 import DerSequence
from Cryptodome.PublicKey import RSA
from Cryptodome.Hash import SHA
from Cryptodome.Hash import SHA224
from Cryptodome.Hash import SHA256
from Cryptodome.Hash import SHA384
from Cryptodome.Hash import SHA512
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from subprocess import Popen from subprocess import Popen
@ -87,6 +75,8 @@ XMLTAG = "<?xml version='1.0'?>"
PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>" PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>"
PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>' PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>'
backend = default_backend()
class SigverError(SAMLError): class SigverError(SAMLError):
pass pass
@ -406,14 +396,6 @@ def active_cert(key):
""" """
try: try:
cert_str = pem_format(key) cert_str = pem_format(key)
try:
certificate = importKey(cert_str)
not_before = to_time(str(certificate.get_not_before()))
not_after = to_time(str(certificate.get_not_after()))
assert not_before < utc_now()
assert not_after > utc_now()
return True
except:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str) cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
assert cert.has_expired() == 0 assert cert.has_expired() == 0
assert not OpenSSLWrapper().certificate_not_valid_yet(cert) assert not OpenSSLWrapper().certificate_not_valid_yet(cert)
@ -555,19 +537,8 @@ def rsa_eq(key1, key2):
def extract_rsa_key_from_x509_cert(pem): def extract_rsa_key_from_x509_cert(pem):
# Convert from PEM to DER cert = load_pem_x509_certificate(pem, backend)
der = ssl.PEM_cert_to_DER_cert(pem.decode('ascii')) return cert.public_key()
# Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
cert = DerSequence()
cert.decode(der)
tbsCertificate = DerSequence()
tbsCertificate.decode(cert[0])
subjectPublicKeyInfo = tbsCertificate[6]
# Initialize RSA key
rsa_key = RSA.importKey(subjectPublicKeyInfo)
return rsa_key
def pem_format(key): def pem_format(key):
@ -576,7 +547,7 @@ def pem_format(key):
def import_rsa_key_from_file(filename): def import_rsa_key_from_file(filename):
return RSA.importKey(read_file(filename, 'r')) return load_pem_private_key(read_file(filename, 'rb'), None, backend)
def parse_xmlsec_output(output): def parse_xmlsec_output(output):
@ -622,25 +593,28 @@ class RSASigner(Signer):
if key is None: if key is None:
key = self.key key = self.key
h = self.digest.new(msg) return key.sign(msg, PKCS1v15(), self.digest)
signer = PKCS1_v1_5.new(key)
return signer.sign(h)
def verify(self, msg, sig, key=None): def verify(self, msg, sig, key=None):
if key is None: if key is None:
key = self.key key = self.key
h = self.digest.new(msg) try:
verifier = PKCS1_v1_5.new(key) if isinstance(key, rsa.RSAPrivateKey):
return verifier.verify(h, sig) key = key.public_key()
key.verify(sig, msg, PKCS1v15(), self.digest)
return True
except InvalidSignature:
return False
SIGNER_ALGS = { SIGNER_ALGS = {
SIG_RSA_SHA1: RSASigner(SHA), SIG_RSA_SHA1: RSASigner(hashes.SHA1()),
SIG_RSA_SHA224: RSASigner(SHA224), SIG_RSA_SHA224: RSASigner(hashes.SHA224()),
SIG_RSA_SHA256: RSASigner(SHA256), SIG_RSA_SHA256: RSASigner(hashes.SHA256()),
SIG_RSA_SHA384: RSASigner(SHA384), SIG_RSA_SHA384: RSASigner(hashes.SHA384()),
SIG_RSA_SHA512: RSASigner(SHA512), SIG_RSA_SHA512: RSASigner(hashes.SHA512()),
} }
REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"] REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"]