diff --git a/anchor/X509/signature.py b/anchor/X509/signature.py index f74925a..6c14b4f 100644 --- a/anchor/X509/signature.py +++ b/anchor/X509/signature.py @@ -50,20 +50,6 @@ SIGNING_ALGORITHMS = { SIGNING_ALGORITHMS_INV = dict((v, k) for k, v in SIGNING_ALGORITHMS.items()) -SIGNER_CONSTRUCTION = { - sha224WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(), - hashes.SHA224())), - sha256WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(), - hashes.SHA256())), - sha384WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(), - hashes.SHA384())), - sha512WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(), - hashes.SHA512())), - id_dsa_with_sha224: (lambda key: key.signer(hashes.SHA224())), - id_dsa_with_sha256: (lambda key: key.signer(hashes.SHA256())), -} - - VERIFIER_CONSTRUCTION = { sha224WithRSAEncryption: (lambda key, signature: key.verifier( signature, padding.PKCS1v15(), hashes.SHA224())), @@ -96,17 +82,10 @@ class SignatureMixin(object): Both operations rely on the functions provided by the certificate and csr classes. """ - def sign(self, key, md="sha256"): + def sign(self, encryption, md, signer): """Sign the current object.""" md = md.upper() - if isinstance(key, rsa.RSAPrivateKey): - encryption = 'RSA' - elif isinstance(key, dsa.DSAPrivateKey): - encryption = 'DSA' - else: - raise errors.X509Error("Unknown key type: %s" % (key.__class__,)) - signature_type = SIGNING_ALGORITHMS.get((encryption, md)) if signature_type is None: raise errors.X509Error( @@ -120,9 +99,7 @@ class SignatureMixin(object): self._embed_signature_algorithm(algo_id) to_sign = self._get_bytes_to_sign() - signer = SIGNER_CONSTRUCTION[signature_type](key) - signer.update(to_sign) - signature = signer.finalize() + signature = signer(to_sign) self._embed_signature(algo_id, signature) diff --git a/anchor/app.py b/anchor/app.py index 399209f..17fd39f 100644 --- a/anchor/app.py +++ b/anchor/app.py @@ -15,7 +15,6 @@ from __future__ import absolute_import import logging import os -import stat import sys import paste @@ -23,57 +22,40 @@ from paste import translogger # noqa import pecan from anchor import audit +from anchor import errors from anchor import jsonloader logger = logging.getLogger(__name__) -class ConfigValidationException(Exception): - pass - - def config_check_domains(validator_set): for name, step in validator_set.items(): if 'allowed_domains' in step: for domain in step['allowed_domains']: if not domain.startswith('.'): - raise ConfigValidationException( + raise errors.ConfigValidationException( "Domain that does not start with " "a '.' <{}>".format(domain)) -def _check_file_permissions(path): - # checks that file is owner readable only - expected_permissions = (stat.S_IRUSR | stat.S_IFREG) # 0o100400 - st = os.stat(path) - if st.st_mode != expected_permissions: - raise ConfigValidationException("CA file: %s has incorrect " - "permissions set, expected " - "owner readable only" % path) - - -def _check_file_exists(path): - if not (os.path.isfile(path) and - os.access(path, os.R_OK)): - raise ConfigValidationException("could not read file: %s" % - path) - - def validate_config(conf): for old_name in ['auth', 'ca', 'validators']: if old_name in conf.config: - raise ConfigValidationException("The config seems to be for an " - "old version of Anchor. Please " - "check documentation.") + raise errors.ConfigValidationException( + "The config seems to be for an old version of Anchor. Please " + "check documentation.") if not conf.config.get('registration_authority'): - raise ConfigValidationException("No registration authorities present") + raise errors.ConfigValidationException( + "No registration authorities present") if not conf.config.get('signing_ca'): - raise ConfigValidationException("No signing CA configurations present") + raise errors.ConfigValidationException( + "No signing CA configurations present") if not conf.config.get('authentication'): - raise ConfigValidationException("No authentication methods present") + raise errors.ConfigValidationException( + "No authentication methods present") for name in conf.registration_authority.keys(): logger.info("Checking config for registration authority: %s", name) @@ -99,13 +81,13 @@ def validate_audit_config(conf): audit_conf = conf.audit if audit_conf.get('target', 'log') not in valid_targets: - raise ConfigValidationException( + raise errors.ConfigValidationException( "Audit target not known (expected one of %s)" % ( ", ".join(valid_targets),)) if audit_conf.get('target') == 'messaging': if audit_conf.get('url') is None: - raise ConfigValidationException("Audit url required") + raise errors.ConfigValidationException("Audit url required") def validate_authentication_config(name, conf): @@ -115,11 +97,11 @@ def validate_authentication_config(name, conf): default_secret = "simplepassword" if not auth_conf.get('backend'): - raise ConfigValidationException( + raise errors.ConfigValidationException( "Authentication method %s doesn't define backend" % name) if auth_conf['backend'] not in ('static', 'keystone', 'ldap'): - raise ConfigValidationException( + raise errors.ConfigValidationException( "Authentication backend % unknown" % (auth_conf['backend'],)) # Check for anchor being run with default user/secret @@ -132,51 +114,45 @@ def validate_authentication_config(name, conf): def validate_signing_ca_config(name, conf): ca_conf = conf.signing_ca[name] + backend_name = ca_conf.get('backend') + if not backend_name: + raise errors.ConfigValidationException( + "Backend type not defined for RA '%s'" % name) + sign_func = jsonloader.conf.get_signing_backend(backend_name) + if not sign_func: + raise errors.ConfigValidationException( + "Backend '%s' could not be found" % backend_name) - # mandatory CA settings - ca_config_requirements = ["cert_path", "key_path", "output_path", - "signing_hash", "valid_hours"] - - for requirement in ca_config_requirements: - if requirement not in ca_conf.keys(): - raise ConfigValidationException( - "CA config missing: %s (for signing CA %s)" % (requirement, - name)) - - # all are specified, check the CA certificate and key are readable with - # sane permissions - _check_file_exists(ca_conf['cert_path']) - _check_file_exists(ca_conf['key_path']) - - _check_file_permissions(ca_conf['key_path']) + if hasattr(sign_func, "_config_validator"): + sign_func._config_validator(name, ca_conf) def validate_registration_authority_config(ra_name, conf): ra_conf = conf.registration_authority[ra_name] auth_name = ra_conf.get('authentication') if not auth_name: - raise ConfigValidationException( + raise errors.ConfigValidationException( "No authentication configured for registration authority: %s" % ra_name) if not conf.authentication.get(auth_name): - raise ConfigValidationException( + raise errors.ConfigValidationException( "Authentication method %s configured for registration authority " "%s doesn't exist" % (auth_name, ra_name)) ca_name = ra_conf.get('signing_ca') if not ca_name: - raise ConfigValidationException( + raise errors.ConfigValidationException( "No signing CA configuration present for registration authority: " "%s" % ra_name) if not conf.signing_ca.get(ca_name): - raise ConfigValidationException( + raise errors.ConfigValidationException( "Signing CA %s configured for registration authority %s doesn't " "exist" % (ca_name, ra_name)) if not ra_conf.get("validators"): - raise ConfigValidationException( + raise errors.ConfigValidationException( "No validators configured for registration authority: %s" % ra_name) @@ -186,7 +162,7 @@ def validate_registration_authority_config(ra_name, conf): try: jsonloader.conf.get_validator(step) except KeyError: - raise ConfigValidationException( + raise errors.ConfigValidationException( "Unknown validator <{}> found (for registration " "authority {})".format(step, ra_name)) @@ -199,7 +175,7 @@ def validate_registration_authority_config(ra_name, conf): try: jsonloader.conf.get_fixup(step) except KeyError: - raise ConfigValidationException( + raise errors.ConfigValidationException( "Unknown fixup <{}> found (for registration " "authority {})".format(step, ra_name)) diff --git a/anchor/certificate_ops.py b/anchor/certificate_ops.py index e234b67..44b531c 100644 --- a/anchor/certificate_ops.py +++ b/anchor/certificate_ops.py @@ -15,8 +15,6 @@ from __future__ import absolute_import import logging import os -import time -import uuid import pecan from webob import exc as http_status @@ -26,9 +24,7 @@ from anchor import jsonloader from anchor import util from anchor import validation from anchor.X509 import certificate -from anchor.X509 import extension from anchor.X509 import signing_request -from anchor.X509 import utils as x509_utils logger = logging.getLogger(__name__) @@ -39,10 +35,6 @@ logger = logging.getLogger(__name__) VALID_ENCODINGS = ['pem'] -class SigningError(Exception): - pass - - def parse_csr(data, encoding): """Loads the user provided CSR into the backend X509 library. @@ -204,79 +196,3 @@ def fixup_csr(ra_name, csr, request): pecan.abort(500, "Could not finish all required modifications") return args['csr'] - - -def sign(csr, ca_conf): - """Generate an X.509 certificate and sign it. - - :param csr: X509 certificate signing request - :param ca_conf: signing CA configuration - :return: signed certificate in PEM format - """ - try: - ca = certificate.X509Certificate.from_file( - ca_conf['cert_path']) - except Exception as e: - raise SigningError("Cannot load the signing CA: %s" % (e,)) - - try: - key = x509_utils.get_private_key_from_file(ca_conf['key_path']) - except Exception as e: - raise SigningError("Cannot load the signing CA key: %s" % (e,)) - - new_cert = certificate.X509Certificate() - new_cert.set_version(2) - - start_time = int(time.time()) - end_time = start_time + (ca_conf['valid_hours'] * 60 * 60) - new_cert.set_not_before(start_time) - new_cert.set_not_after(end_time) - - new_cert.set_pubkey(pkey=csr.get_pubkey()) - new_cert.set_subject(csr.get_subject()) - new_cert.set_issuer(ca.get_subject()) - - serial = int(uuid.uuid4().hex, 16) - new_cert.set_serial_number(serial) - - exts = csr.get_extensions() - - ext_i = 0 - for ext in exts: - # this check is separate from standards validator - the signing backend - # may know about more/fewer extensions than we do - if ext.get_oid() not in extension.EXTENSION_CLASSES.keys(): - if ext.get_critical(): - logger.warning("CSR submitted with unknown extension oid %s, " - "refusing to sign", ext.get_oid()) - raise SigningError("Unknown critical extension %s" % ( - ext.get_oid(),)) - else: - logger.info("CSR submitted with non-critical unknown oid %s, " - "not including extension", (ext.get_oid(),)) - else: - logger.info("Adding certificate extension: %i %s", ext_i, str(ext)) - # authority id will be replaced with current signer - # this cannot be a fixup, because they don't get access to the CA - if isinstance(ext, extension.X509ExtensionAuthorityKeyId): - continue - - new_cert.add_extension(ext, ext_i) - ext_i += 1 - - ca_exts = ca.get_extensions(extension.X509ExtensionSubjectKeyId) - auth_key_id = extension.X509ExtensionAuthorityKeyId() - if ca_exts: - auth_key_id.set_key_id(ca_exts[0].get_key_id()) - else: - auth_key_id.set_key_id(ca.get_key_id()) - new_cert.add_extension(auth_key_id, ext_i) - - logger.info("Signing certificate for <%s> with serial <%s>", - csr.get_subject(), serial) - - new_cert.sign(key, ca_conf['signing_hash']) - - cert_pem = new_cert.as_pem() - - return cert_pem diff --git a/anchor/errors.py b/anchor/errors.py new file mode 100644 index 0000000..57f94d9 --- /dev/null +++ b/anchor/errors.py @@ -0,0 +1,2 @@ +class ConfigValidationException(Exception): + pass diff --git a/anchor/signers/__init__.py b/anchor/signers/__init__.py new file mode 100644 index 0000000..450896c --- /dev/null +++ b/anchor/signers/__init__.py @@ -0,0 +1,91 @@ +import logging +import time +import uuid + +from anchor.X509 import certificate +from anchor.X509 import extension + + +logger = logging.getLogger(__name__) + + +def config_validator(val): + def patcher(f): + setattr(f, "_config_validator", val) + return f + return patcher + + +class SigningError(Exception): + pass + + +def sign_generic(csr, ca_conf, encryption, signer): + """Generate an X.509 certificate and sign it. + + :param csr: X509 certificate signing request + :param ca_conf: signing CA configuration + :return: signed certificate in PEM format + """ + try: + ca = certificate.X509Certificate.from_file( + ca_conf['cert_path']) + except Exception as e: + raise SigningError("Cannot load the signing CA: %s" % (e,)) + + new_cert = certificate.X509Certificate() + new_cert.set_version(2) + + start_time = int(time.time()) + end_time = start_time + (ca_conf['valid_hours'] * 60 * 60) + new_cert.set_not_before(start_time) + new_cert.set_not_after(end_time) + + new_cert.set_pubkey(pkey=csr.get_pubkey()) + new_cert.set_subject(csr.get_subject()) + new_cert.set_issuer(ca.get_subject()) + + serial = int(uuid.uuid4().hex, 16) + new_cert.set_serial_number(serial) + + exts = csr.get_extensions() + + ext_i = 0 + for ext in exts: + # this check is separate from standards validator - the signing backend + # may know about more/fewer extensions than we do + if ext.get_oid() not in extension.EXTENSION_CLASSES.keys(): + if ext.get_critical(): + logger.warning("CSR submitted with unknown extension oid %s, " + "refusing to sign", ext.get_oid()) + raise SigningError("Unknown critical extension %s" % ( + ext.get_oid(),)) + else: + logger.info("CSR submitted with non-critical unknown oid %s, " + "not including extension", (ext.get_oid(),)) + else: + logger.info("Adding certificate extension: %i %s", ext_i, str(ext)) + # authority id will be replaced with current signer + # this cannot be a fixup, because they don't get access to the CA + if isinstance(ext, extension.X509ExtensionAuthorityKeyId): + continue + + new_cert.add_extension(ext, ext_i) + ext_i += 1 + + ca_exts = ca.get_extensions(extension.X509ExtensionSubjectKeyId) + auth_key_id = extension.X509ExtensionAuthorityKeyId() + if ca_exts: + auth_key_id.set_key_id(ca_exts[0].get_key_id()) + else: + auth_key_id.set_key_id(ca.get_key_id()) + new_cert.add_extension(auth_key_id, ext_i) + + logger.info("Signing certificate for <%s> with serial <%s>", + csr.get_subject(), serial) + + new_cert.sign(encryption, ca_conf['signing_hash'], signer) + + cert_pem = new_cert.as_pem() + + return cert_pem diff --git a/anchor/signers/cryptography_io.py b/anchor/signers/cryptography_io.py new file mode 100644 index 0000000..5ce0978 --- /dev/null +++ b/anchor/signers/cryptography_io.py @@ -0,0 +1,74 @@ +from cryptography.hazmat.primitives.asymmetric import dsa +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import hashes + +from anchor import errors +from anchor import signers +from anchor import util +from anchor.X509 import utils as x509_utils + + +SIGNER_CONSTRUCTION = { + ('RSA', 'SHA224'): (lambda key: key.signer(padding.PKCS1v15(), + hashes.SHA224())), + ('RSA', 'SHA256'): (lambda key: key.signer(padding.PKCS1v15(), + hashes.SHA256())), + ('RSA', 'SHA384'): (lambda key: key.signer(padding.PKCS1v15(), + hashes.SHA384())), + ('RSA', 'SHA512'): (lambda key: key.signer(padding.PKCS1v15(), + hashes.SHA512())), + ('DSA', 'SHA224'): (lambda key: key.signer(hashes.SHA224())), + ('DSA', 'SHA256'): (lambda key: key.signer(hashes.SHA256())), +} + + +def conf_validator(name, ca_conf): + # mandatory CA settings + ca_config_requirements = ["cert_path", "key_path", "output_path", + "signing_hash", "valid_hours"] + + for requirement in ca_config_requirements: + if requirement not in ca_conf.keys(): + raise errors.ConfigValidationException( + "CA config missing: %s (for signing CA %s)" % (requirement, + name)) + + # all are specified, check the CA certificate and key are readable with + # sane permissions + util.check_file_exists(ca_conf['cert_path']) + util.check_file_exists(ca_conf['key_path']) + + util.check_file_permissions(ca_conf['key_path']) + + +def make_signer(key, encryption, md): + signer = SIGNER_CONSTRUCTION.get((encryption, md.upper())) + if signer is None: + raise signers.SigningError( + "Unknown hash/encryption combination (%s/%s)" % (md, encryption)) + signer = signer(key) + + def cryptography_io_signer(to_sign): + signer.update(to_sign) + return signer.finalize() + + return cryptography_io_signer + + +@signers.config_validator(conf_validator) +def sign(csr, ca_conf): + try: + key = x509_utils.get_private_key_from_file(ca_conf['key_path']) + except Exception as e: + raise signers.SigningError("Cannot load the signing CA key: %s" % (e,)) + + if isinstance(key, rsa.RSAPrivateKey): + encryption = 'RSA' + elif isinstance(key, dsa.DSAPrivateKey): + encryption = 'DSA' + else: + raise signers.SigningError("Unknown key type: %s" % (key.__class__,)) + + signer = make_signer(key, encryption, ca_conf['signing_hash']) + return signers.sign_generic(csr, ca_conf, encryption, signer) diff --git a/anchor/util.py b/anchor/util.py index ca37431..35d6fc6 100644 --- a/anchor/util.py +++ b/anchor/util.py @@ -15,7 +15,11 @@ from __future__ import absolute_import import base64 import hmac +import os import re +import stat + +from anchor import errors def constant_time_compare(val1, val2): @@ -100,3 +104,20 @@ def extract_pem(data, use_markers=True): return None decoder = getattr(base64, 'decodebytes', base64.decodestring) return decoder(b64_content) + + +def check_file_permissions(path): + # checks that file is owner readable only + expected_permissions = (stat.S_IRUSR | stat.S_IFREG) # 0o100400 + st = os.stat(path) + if st.st_mode != expected_permissions: + raise errors.ConfigValidationException("CA file: %s has incorrect " + "permissions set, expected " + "owner readable only" % path) + + +def check_file_exists(path): + if not (os.path.isfile(path) and + os.access(path, os.R_OK)): + raise errors.ConfigValidationException("could not read file: %s" % + path) diff --git a/doc/source/signing_backends.rst b/doc/source/signing_backends.rst index 9162fc9..3cf130f 100644 --- a/doc/source/signing_backends.rst +++ b/doc/source/signing_backends.rst @@ -53,9 +53,11 @@ Backends are simple functions which need to take 2 parameters: the CSR in PEM format and the configuration block contents. Configuration can contain any keys required by the backend. -The return value must be a signed certificate in PEM format. The backend may -either throw a specific ``WebOb`` HTTP exception, or any other exception which -will result in a generic 500 response. +The return value must be a signed certificate in PEM format, however in most +cases it's enough to implement the actual hash signing part and rely on +``anchor.signer.sign_generic`` framework. The backend may either throw a +specific ``WebOb`` HTTP exception, or SigningError exception which will result +in a 500 response. For security, http exceptions from the signing backend should not expose any specific information about the reason for failure. Internal exceptions are @@ -66,3 +68,6 @@ are applied to the submitted CSR in Anchor, they will invalidate the signature. Unless the backend is intended to work only with validators, and not any fixup operations in the future, the signature field should be ignored and the request treated as already correct/verified. + +Configuration is verified using the function provided using the +``@signers.config_validator(f)`` decorator. diff --git a/setup.cfg b/setup.cfg index cfb80df..17df612 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,7 +28,7 @@ source-dir = doc/source [entry_points] anchor.signing_backends = - anchor = anchor.certificate_ops:sign + anchor = anchor.signers.cryptography_io:sign anchor.validators = check_domains = anchor.validators.custom:check_domains diff --git a/tests/X509/test_x509_certificate.py b/tests/X509/test_x509_certificate.py index 737217f..0e06333 100644 --- a/tests/X509/test_x509_certificate.py +++ b/tests/X509/test_x509_certificate.py @@ -25,7 +25,6 @@ from anchor.X509 import certificate from anchor.X509 import errors as x509_errors from anchor.X509 import extension from anchor.X509 import name as x509_name -from anchor.X509 import utils class TestX509Cert(unittest.TestCase): @@ -238,17 +237,6 @@ class TestX509Cert(unittest.TestCase): with self.assertRaises(x509_errors.X509Error): self.cert.get_fingerprint('no_such_hash') - def test_sign_bad_md(self): - key = utils.get_private_key_from_pem(self.key_rsa_data) - self.assertRaises(x509_errors.X509Error, - self.cert.sign, - key, "BAD") - - def test_sign_bad_key(self): - self.assertRaises(x509_errors.X509Error, - self.cert.sign, - "BAD") - def test_get_version(self): v = self.cert.get_version() self.assertEqual(v, 2) @@ -290,16 +278,6 @@ class TestX509Cert(unittest.TestCase): with self.assertRaises(x509_errors.X509Error): self.cert.add_extension("abcdef", 2) - def test_sign_unknown_key(self): - key = object() - with self.assertRaises(x509_errors.X509Error): - self.cert.sign(key, 'sha1') - - def test_sign_unknown_hash(self): - key = utils.get_private_key_from_pem(self.key_rsa_data) - with self.assertRaises(x509_errors.X509Error): - self.cert.sign(key, 'no_such_hash') - def test_verify_unknown_key(self): with self.assertRaises(x509_errors.X509Error): self.cert.verify("abc") diff --git a/tests/X509/test_x509_csr.py b/tests/X509/test_x509_csr.py index 4d7bdb5..76e91bf 100644 --- a/tests/X509/test_x509_csr.py +++ b/tests/X509/test_x509_csr.py @@ -21,6 +21,7 @@ import unittest import mock from pyasn1_modules import rfc2459 +from anchor.signers import cryptography_io from anchor.X509 import errors as x509_errors from anchor.X509 import extension from anchor.X509 import name as x509_name @@ -158,7 +159,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def test_sign(self): key = utils.get_private_key_from_pem(self.key_rsa_data) - self.csr.sign(key) + signer = cryptography_io.make_signer(key, 'RSA', 'SHA256') + self.csr.sign('RSA', 'SHA256', signer) # 10 bytes is definitely enough for non malicious case, right? self.assertEqual(b'\x16\xbd!\x9b\xfb\xfd\x10\xa1\xaf\x92', self.csr._get_signature()[:10]) diff --git a/tests/controllers/test_app.py b/tests/controllers/test_app.py index 71ea37b..72ffdc1 100644 --- a/tests/controllers/test_app.py +++ b/tests/controllers/test_app.py @@ -22,7 +22,9 @@ import unittest import mock from anchor import app +from anchor import errors from anchor import jsonloader +from anchor import util import tests @@ -39,8 +41,8 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): def test_self_test(self): self.assertTrue(True) - @mock.patch('anchor.app._check_file_exists') - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_exists') + @mock.patch('anchor.util.check_file_permissions') def test_config_check_domains_good(self, a, b): self.sample_conf_ra['default_ra']['validators'] = { "common_name": { @@ -54,8 +56,8 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): with mock.patch("os.stat", **config): self.assertEqual(app.validate_config(jsonloader.conf), None) - @mock.patch('anchor.app._check_file_exists') - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_exists') + @mock.patch('anchor.util.check_file_permissions') def test_config_check_domains_bad(self, a, b): self.sample_conf_ra['default_ra']['validators'] = { "common_name": { @@ -68,7 +70,7 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): config = {'return_value.st_mode': (stat.S_IRUSR | stat.S_IFREG)} with mock.patch("os.stat", **config): self.assertRaises( - app.ConfigValidationException, + errors.ConfigValidationException, app.validate_config, jsonloader.conf ) @@ -76,13 +78,13 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): def test_check_file_permissions_good(self): config = {'return_value.st_mode': (stat.S_IRUSR | stat.S_IFREG)} with mock.patch("os.stat", **config): - app._check_file_permissions("/mock/path") + util.check_file_permissions("/mock/path") def test_check_file_permissions_bad(self): config = {'return_value.st_mode': (stat.S_IWOTH | stat.S_IFREG)} with mock.patch("os.stat", **config): - self.assertRaises(app.ConfigValidationException, - app._check_file_permissions, "/mock/path") + self.assertRaises(errors.ConfigValidationException, + util.check_file_permissions, "/mock/path") def test_validate_old_config(self): config = json.dumps({ @@ -91,45 +93,45 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): "validators": {}, }) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "old version of Anchor", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') def test_validate_config_no_registration_authorities(self, mock_check_perm): del self.sample_conf['registration_authority'] config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "No registration authorities present", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') def test_validate_config_no_auth(self, mock_check_perm): del self.sample_conf['authentication'] config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "No authentication methods present", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') def test_validate_config_no_auth_backend(self, mock_check_perm): del self.sample_conf_auth['default_auth']['backend'] config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "Authentication method .* doesn't define " "backend", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') def test_validate_config_no_ra_auth(self, mock_check_perm): del self.sample_conf_ra['default_ra']['authentication'] config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "No authentication .* for .* default_ra", app.validate_config, jsonloader.conf) @@ -137,20 +139,20 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): del self.sample_conf['signing_ca'] config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "No signing CA configurations present", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') def test_validate_config_no_ra_ca(self, mock_check_perm): del self.sample_conf_ra['default_ra']['signing_ca'] config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "No signing CA .* for .* default_ra", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') def test_validate_config_ca_config_reqs(self, mock_check_perm): ca_config_requirements = ["cert_path", "key_path", "output_path", "signing_hash", "valid_hours"] @@ -162,7 +164,7 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): # with 'missing_req', perform validation. Each should raise in turn for req in ca_config_requirements: jsonloader.conf.load_str_data(config.replace(req, "missing_req")) - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "CA config missing: %s" % req, app.validate_config, jsonloader.conf) @@ -171,11 +173,11 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): config = json.dumps(self.sample_conf) jsonloader.conf.load_str_data(config) isfile.return_value = False - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "could not read file: tests/CA/root-ca.crt", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') @mock.patch('os.path.isfile') @mock.patch('os.access') @mock.patch('os.stat') @@ -187,11 +189,11 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): isfile.return_value = True access.return_value = True stat.return_value.st_mode = self.expected_key_permissions - self.assertRaisesRegexp(app.ConfigValidationException, + self.assertRaisesRegexp(errors.ConfigValidationException, "No validators configured", app.validate_config, jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') @mock.patch('os.path.isfile') @mock.patch('os.access') @mock.patch('os.stat') @@ -203,13 +205,13 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase): isfile.return_value = True access.return_value = True stat.return_value.st_mode = self.expected_key_permissions - with self.assertRaises(app.ConfigValidationException, + with self.assertRaises(errors.ConfigValidationException, msg="Unknown validator " "found (for registration authority " "default)"): app.validate_config(jsonloader.conf) - @mock.patch('anchor.app._check_file_permissions') + @mock.patch('anchor.util.check_file_permissions') @mock.patch('os.path.isfile') @mock.patch('os.access') @mock.patch('os.stat') diff --git a/tests/test_signing_backend.py b/tests/test_signing_backend.py index 281215d..fa2edd5 100644 --- a/tests/test_signing_backend.py +++ b/tests/test_signing_backend.py @@ -14,14 +14,17 @@ # License for the specific language governing permissions and limitations # under the License. +import textwrap import unittest from pyasn1.type import univ as asn1_univ -from anchor import certificate_ops +from anchor import signers +from anchor.signers import cryptography_io from anchor.X509 import certificate from anchor.X509 import extension from anchor.X509 import signing_request +from anchor.X509 import utils import tests @@ -38,7 +41,8 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, ext.add_dns_id("example.com") csr.add_extension(ext) - pem = certificate_ops.sign(csr, self.sample_conf_ca['default_ca']) + pem = signers.sign_generic(csr, self.sample_conf_ca['default_ca'], + 'RSA', lambda x: b"") cert = certificate.X509Certificate.from_buffer(pem) self.assertEqual(1, len(cert.get_extensions( extension.X509ExtensionSubjectAltName))) @@ -48,7 +52,8 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, ext = UnknownExtension() csr.add_extension(ext) - pem = certificate_ops.sign(csr, self.sample_conf_ca['default_ca']) + pem = signers.sign_generic(csr, self.sample_conf_ca['default_ca'], + 'RSA', lambda x: b"") cert = certificate.X509Certificate.from_buffer(pem) self.assertEqual(0, len(cert.get_extensions(UnknownExtension))) @@ -58,5 +63,35 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, ext.set_critical(True) csr.add_extension(ext) - with self.assertRaises(certificate_ops.SigningError): - certificate_ops.sign(csr, self.sample_conf_ca['default_ca']) + with self.assertRaises(signers.SigningError): + signers.sign_generic(csr, self.sample_conf_ca['default_ca'], + 'RSA', lambda x: b"") + + +class TestCryptographyBackend(tests.DefaultConfigMixin, + tests.DefaultRequestMixin, unittest.TestCase): + key_rsa_data = textwrap.dedent(""" + -----BEGIN RSA PRIVATE KEY----- + MIICXAIBAAKBgQCeeqg1Qeccv8hqj1BP9KEJX5QsFCxR62M8plPb5t4sLo8UYfZd + 6kFLcOP8xzwwvx/eFY6Sux52enQ197o8aMwyP77hMhZqtd8NCgLJMVlUbRhwLti0 + SkHFPic0wAg+esfXa6yhd5TxC+bti7MgV/ljA80XQxHH8xOjdOoGN0DHfQIDAQAB + AoGBAJ2ozJpe+7qgGJPaCz3f0izvBwtq7kR49fqqRZbo8HHnx7OxWVVI7LhOkKEy + 2/Bq0xsvOu1CdiXL4LynvIDIiQqLaeINzG48Rbk+0HadbXblt3nDkIWdYII6zHKI + W9ewX4KpHEPbrlEO9BjAlAcYsDIvFIMYpQhtQ+0R/gmZ99WJAkEAz5C2a6FIcMbE + o3aTc9ECq99zY7lxh+6aLpUdIeeHyb/QzfGDBdlbpBAkA6EcxSqp0aqH4xIQnYHa + 3P5ZCShqSwJBAMN1sb76xq94xkg2cxShPFPAE6xKRFyKqLgsBYVtulOdfOtOnjh9 + 1SK2XQQfBRIRdG4Q/gDoCP8XQHpJcWMk+FcCQDnuJqulaOVo5GrG5mJ1nCxCAh98 + G06X7lo/7dCPoRtSuMExvaK9RlFk29hTeAcjYCAPWzupyA9dtarmJg1jRT8CQCKf + gYnb8D/6+9yk0IPR/9ayCooVacCeyz48hgnZowzWs98WwQ4utAd/GED3obVOpDov + Bl9wus889i3zPoOac+cCQCZHredQcJGd4dlthbVtP2NhuPXz33JuETGR9pXtsDUZ + uX/nSq1oo9kUh/dPOz6aP5Ues1YVe3LExmExPBQfwIE= + -----END RSA PRIVATE KEY-----""").encode('ascii') + + def test_sign_bad_md(self): + key = utils.get_private_key_from_pem(self.key_rsa_data) + with self.assertRaises(signers.SigningError): + cryptography_io.make_signer(key, "BAD", "RSA") + + def test_sign_bad_key(self): + with self.assertRaises(signers.SigningError): + cryptography_io.make_signer("BAD", "sha256", "RSA")