Merge "Refactor the signing backends"
This commit is contained in:
commit
737d1efc57
|
@ -50,20 +50,6 @@ SIGNING_ALGORITHMS = {
|
|||
SIGNING_ALGORITHMS_INV = dict((v, k) for k, v in SIGNING_ALGORITHMS.items())
|
||||
|
||||
|
||||
SIGNER_CONSTRUCTION = {
|
||||
sha224WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA224())),
|
||||
sha256WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA256())),
|
||||
sha384WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA384())),
|
||||
sha512WithRSAEncryption: (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA512())),
|
||||
id_dsa_with_sha224: (lambda key: key.signer(hashes.SHA224())),
|
||||
id_dsa_with_sha256: (lambda key: key.signer(hashes.SHA256())),
|
||||
}
|
||||
|
||||
|
||||
VERIFIER_CONSTRUCTION = {
|
||||
sha224WithRSAEncryption: (lambda key, signature: key.verifier(
|
||||
signature, padding.PKCS1v15(), hashes.SHA224())),
|
||||
|
@ -96,17 +82,10 @@ class SignatureMixin(object):
|
|||
Both operations rely on the functions provided by the certificate and
|
||||
csr classes.
|
||||
"""
|
||||
def sign(self, key, md="sha256"):
|
||||
def sign(self, encryption, md, signer):
|
||||
"""Sign the current object."""
|
||||
md = md.upper()
|
||||
|
||||
if isinstance(key, rsa.RSAPrivateKey):
|
||||
encryption = 'RSA'
|
||||
elif isinstance(key, dsa.DSAPrivateKey):
|
||||
encryption = 'DSA'
|
||||
else:
|
||||
raise errors.X509Error("Unknown key type: %s" % (key.__class__,))
|
||||
|
||||
signature_type = SIGNING_ALGORITHMS.get((encryption, md))
|
||||
if signature_type is None:
|
||||
raise errors.X509Error(
|
||||
|
@ -120,9 +99,7 @@ class SignatureMixin(object):
|
|||
|
||||
self._embed_signature_algorithm(algo_id)
|
||||
to_sign = self._get_bytes_to_sign()
|
||||
signer = SIGNER_CONSTRUCTION[signature_type](key)
|
||||
signer.update(to_sign)
|
||||
signature = signer.finalize()
|
||||
signature = signer(to_sign)
|
||||
|
||||
self._embed_signature(algo_id, signature)
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@ from __future__ import absolute_import
|
|||
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
|
||||
import paste
|
||||
|
@ -23,57 +22,40 @@ from paste import translogger # noqa
|
|||
import pecan
|
||||
|
||||
from anchor import audit
|
||||
from anchor import errors
|
||||
from anchor import jsonloader
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfigValidationException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def config_check_domains(validator_set):
|
||||
for name, step in validator_set.items():
|
||||
if 'allowed_domains' in step:
|
||||
for domain in step['allowed_domains']:
|
||||
if not domain.startswith('.'):
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Domain that does not start with "
|
||||
"a '.' <{}>".format(domain))
|
||||
|
||||
|
||||
def _check_file_permissions(path):
|
||||
# checks that file is owner readable only
|
||||
expected_permissions = (stat.S_IRUSR | stat.S_IFREG) # 0o100400
|
||||
st = os.stat(path)
|
||||
if st.st_mode != expected_permissions:
|
||||
raise ConfigValidationException("CA file: %s has incorrect "
|
||||
"permissions set, expected "
|
||||
"owner readable only" % path)
|
||||
|
||||
|
||||
def _check_file_exists(path):
|
||||
if not (os.path.isfile(path) and
|
||||
os.access(path, os.R_OK)):
|
||||
raise ConfigValidationException("could not read file: %s" %
|
||||
path)
|
||||
|
||||
|
||||
def validate_config(conf):
|
||||
for old_name in ['auth', 'ca', 'validators']:
|
||||
if old_name in conf.config:
|
||||
raise ConfigValidationException("The config seems to be for an "
|
||||
"old version of Anchor. Please "
|
||||
raise errors.ConfigValidationException(
|
||||
"The config seems to be for an old version of Anchor. Please "
|
||||
"check documentation.")
|
||||
|
||||
if not conf.config.get('registration_authority'):
|
||||
raise ConfigValidationException("No registration authorities present")
|
||||
raise errors.ConfigValidationException(
|
||||
"No registration authorities present")
|
||||
|
||||
if not conf.config.get('signing_ca'):
|
||||
raise ConfigValidationException("No signing CA configurations present")
|
||||
raise errors.ConfigValidationException(
|
||||
"No signing CA configurations present")
|
||||
|
||||
if not conf.config.get('authentication'):
|
||||
raise ConfigValidationException("No authentication methods present")
|
||||
raise errors.ConfigValidationException(
|
||||
"No authentication methods present")
|
||||
|
||||
for name in conf.registration_authority.keys():
|
||||
logger.info("Checking config for registration authority: %s", name)
|
||||
|
@ -99,13 +81,13 @@ def validate_audit_config(conf):
|
|||
|
||||
audit_conf = conf.audit
|
||||
if audit_conf.get('target', 'log') not in valid_targets:
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Audit target not known (expected one of %s)" % (
|
||||
", ".join(valid_targets),))
|
||||
|
||||
if audit_conf.get('target') == 'messaging':
|
||||
if audit_conf.get('url') is None:
|
||||
raise ConfigValidationException("Audit url required")
|
||||
raise errors.ConfigValidationException("Audit url required")
|
||||
|
||||
|
||||
def validate_authentication_config(name, conf):
|
||||
|
@ -115,11 +97,11 @@ def validate_authentication_config(name, conf):
|
|||
default_secret = "simplepassword"
|
||||
|
||||
if not auth_conf.get('backend'):
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Authentication method %s doesn't define backend" % name)
|
||||
|
||||
if auth_conf['backend'] not in ('static', 'keystone', 'ldap'):
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Authentication backend % unknown" % (auth_conf['backend'],))
|
||||
|
||||
# Check for anchor being run with default user/secret
|
||||
|
@ -132,51 +114,45 @@ def validate_authentication_config(name, conf):
|
|||
|
||||
def validate_signing_ca_config(name, conf):
|
||||
ca_conf = conf.signing_ca[name]
|
||||
backend_name = ca_conf.get('backend')
|
||||
if not backend_name:
|
||||
raise errors.ConfigValidationException(
|
||||
"Backend type not defined for RA '%s'" % name)
|
||||
sign_func = jsonloader.conf.get_signing_backend(backend_name)
|
||||
if not sign_func:
|
||||
raise errors.ConfigValidationException(
|
||||
"Backend '%s' could not be found" % backend_name)
|
||||
|
||||
# mandatory CA settings
|
||||
ca_config_requirements = ["cert_path", "key_path", "output_path",
|
||||
"signing_hash", "valid_hours"]
|
||||
|
||||
for requirement in ca_config_requirements:
|
||||
if requirement not in ca_conf.keys():
|
||||
raise ConfigValidationException(
|
||||
"CA config missing: %s (for signing CA %s)" % (requirement,
|
||||
name))
|
||||
|
||||
# all are specified, check the CA certificate and key are readable with
|
||||
# sane permissions
|
||||
_check_file_exists(ca_conf['cert_path'])
|
||||
_check_file_exists(ca_conf['key_path'])
|
||||
|
||||
_check_file_permissions(ca_conf['key_path'])
|
||||
if hasattr(sign_func, "_config_validator"):
|
||||
sign_func._config_validator(name, ca_conf)
|
||||
|
||||
|
||||
def validate_registration_authority_config(ra_name, conf):
|
||||
ra_conf = conf.registration_authority[ra_name]
|
||||
auth_name = ra_conf.get('authentication')
|
||||
if not auth_name:
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"No authentication configured for registration authority: %s" %
|
||||
ra_name)
|
||||
|
||||
if not conf.authentication.get(auth_name):
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Authentication method %s configured for registration authority "
|
||||
"%s doesn't exist" % (auth_name, ra_name))
|
||||
|
||||
ca_name = ra_conf.get('signing_ca')
|
||||
if not ca_name:
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"No signing CA configuration present for registration authority: "
|
||||
"%s" % ra_name)
|
||||
|
||||
if not conf.signing_ca.get(ca_name):
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Signing CA %s configured for registration authority %s doesn't "
|
||||
"exist" % (ca_name, ra_name))
|
||||
|
||||
if not ra_conf.get("validators"):
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"No validators configured for registration authority: %s" %
|
||||
ra_name)
|
||||
|
||||
|
@ -186,7 +162,7 @@ def validate_registration_authority_config(ra_name, conf):
|
|||
try:
|
||||
jsonloader.conf.get_validator(step)
|
||||
except KeyError:
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Unknown validator <{}> found (for registration "
|
||||
"authority {})".format(step, ra_name))
|
||||
|
||||
|
@ -199,7 +175,7 @@ def validate_registration_authority_config(ra_name, conf):
|
|||
try:
|
||||
jsonloader.conf.get_fixup(step)
|
||||
except KeyError:
|
||||
raise ConfigValidationException(
|
||||
raise errors.ConfigValidationException(
|
||||
"Unknown fixup <{}> found (for registration "
|
||||
"authority {})".format(step, ra_name))
|
||||
|
||||
|
|
|
@ -15,8 +15,6 @@ from __future__ import absolute_import
|
|||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import pecan
|
||||
from webob import exc as http_status
|
||||
|
@ -26,9 +24,7 @@ from anchor import jsonloader
|
|||
from anchor import util
|
||||
from anchor import validation
|
||||
from anchor.X509 import certificate
|
||||
from anchor.X509 import extension
|
||||
from anchor.X509 import signing_request
|
||||
from anchor.X509 import utils as x509_utils
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -39,10 +35,6 @@ logger = logging.getLogger(__name__)
|
|||
VALID_ENCODINGS = ['pem']
|
||||
|
||||
|
||||
class SigningError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def parse_csr(data, encoding):
|
||||
"""Loads the user provided CSR into the backend X509 library.
|
||||
|
||||
|
@ -204,79 +196,3 @@ def fixup_csr(ra_name, csr, request):
|
|||
pecan.abort(500, "Could not finish all required modifications")
|
||||
|
||||
return args['csr']
|
||||
|
||||
|
||||
def sign(csr, ca_conf):
|
||||
"""Generate an X.509 certificate and sign it.
|
||||
|
||||
:param csr: X509 certificate signing request
|
||||
:param ca_conf: signing CA configuration
|
||||
:return: signed certificate in PEM format
|
||||
"""
|
||||
try:
|
||||
ca = certificate.X509Certificate.from_file(
|
||||
ca_conf['cert_path'])
|
||||
except Exception as e:
|
||||
raise SigningError("Cannot load the signing CA: %s" % (e,))
|
||||
|
||||
try:
|
||||
key = x509_utils.get_private_key_from_file(ca_conf['key_path'])
|
||||
except Exception as e:
|
||||
raise SigningError("Cannot load the signing CA key: %s" % (e,))
|
||||
|
||||
new_cert = certificate.X509Certificate()
|
||||
new_cert.set_version(2)
|
||||
|
||||
start_time = int(time.time())
|
||||
end_time = start_time + (ca_conf['valid_hours'] * 60 * 60)
|
||||
new_cert.set_not_before(start_time)
|
||||
new_cert.set_not_after(end_time)
|
||||
|
||||
new_cert.set_pubkey(pkey=csr.get_pubkey())
|
||||
new_cert.set_subject(csr.get_subject())
|
||||
new_cert.set_issuer(ca.get_subject())
|
||||
|
||||
serial = int(uuid.uuid4().hex, 16)
|
||||
new_cert.set_serial_number(serial)
|
||||
|
||||
exts = csr.get_extensions()
|
||||
|
||||
ext_i = 0
|
||||
for ext in exts:
|
||||
# this check is separate from standards validator - the signing backend
|
||||
# may know about more/fewer extensions than we do
|
||||
if ext.get_oid() not in extension.EXTENSION_CLASSES.keys():
|
||||
if ext.get_critical():
|
||||
logger.warning("CSR submitted with unknown extension oid %s, "
|
||||
"refusing to sign", ext.get_oid())
|
||||
raise SigningError("Unknown critical extension %s" % (
|
||||
ext.get_oid(),))
|
||||
else:
|
||||
logger.info("CSR submitted with non-critical unknown oid %s, "
|
||||
"not including extension", (ext.get_oid(),))
|
||||
else:
|
||||
logger.info("Adding certificate extension: %i %s", ext_i, str(ext))
|
||||
# authority id will be replaced with current signer
|
||||
# this cannot be a fixup, because they don't get access to the CA
|
||||
if isinstance(ext, extension.X509ExtensionAuthorityKeyId):
|
||||
continue
|
||||
|
||||
new_cert.add_extension(ext, ext_i)
|
||||
ext_i += 1
|
||||
|
||||
ca_exts = ca.get_extensions(extension.X509ExtensionSubjectKeyId)
|
||||
auth_key_id = extension.X509ExtensionAuthorityKeyId()
|
||||
if ca_exts:
|
||||
auth_key_id.set_key_id(ca_exts[0].get_key_id())
|
||||
else:
|
||||
auth_key_id.set_key_id(ca.get_key_id())
|
||||
new_cert.add_extension(auth_key_id, ext_i)
|
||||
|
||||
logger.info("Signing certificate for <%s> with serial <%s>",
|
||||
csr.get_subject(), serial)
|
||||
|
||||
new_cert.sign(key, ca_conf['signing_hash'])
|
||||
|
||||
cert_pem = new_cert.as_pem()
|
||||
|
||||
return cert_pem
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
class ConfigValidationException(Exception):
|
||||
pass
|
|
@ -0,0 +1,91 @@
|
|||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from anchor.X509 import certificate
|
||||
from anchor.X509 import extension
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def config_validator(val):
|
||||
def patcher(f):
|
||||
setattr(f, "_config_validator", val)
|
||||
return f
|
||||
return patcher
|
||||
|
||||
|
||||
class SigningError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def sign_generic(csr, ca_conf, encryption, signer):
|
||||
"""Generate an X.509 certificate and sign it.
|
||||
|
||||
:param csr: X509 certificate signing request
|
||||
:param ca_conf: signing CA configuration
|
||||
:return: signed certificate in PEM format
|
||||
"""
|
||||
try:
|
||||
ca = certificate.X509Certificate.from_file(
|
||||
ca_conf['cert_path'])
|
||||
except Exception as e:
|
||||
raise SigningError("Cannot load the signing CA: %s" % (e,))
|
||||
|
||||
new_cert = certificate.X509Certificate()
|
||||
new_cert.set_version(2)
|
||||
|
||||
start_time = int(time.time())
|
||||
end_time = start_time + (ca_conf['valid_hours'] * 60 * 60)
|
||||
new_cert.set_not_before(start_time)
|
||||
new_cert.set_not_after(end_time)
|
||||
|
||||
new_cert.set_pubkey(pkey=csr.get_pubkey())
|
||||
new_cert.set_subject(csr.get_subject())
|
||||
new_cert.set_issuer(ca.get_subject())
|
||||
|
||||
serial = int(uuid.uuid4().hex, 16)
|
||||
new_cert.set_serial_number(serial)
|
||||
|
||||
exts = csr.get_extensions()
|
||||
|
||||
ext_i = 0
|
||||
for ext in exts:
|
||||
# this check is separate from standards validator - the signing backend
|
||||
# may know about more/fewer extensions than we do
|
||||
if ext.get_oid() not in extension.EXTENSION_CLASSES.keys():
|
||||
if ext.get_critical():
|
||||
logger.warning("CSR submitted with unknown extension oid %s, "
|
||||
"refusing to sign", ext.get_oid())
|
||||
raise SigningError("Unknown critical extension %s" % (
|
||||
ext.get_oid(),))
|
||||
else:
|
||||
logger.info("CSR submitted with non-critical unknown oid %s, "
|
||||
"not including extension", (ext.get_oid(),))
|
||||
else:
|
||||
logger.info("Adding certificate extension: %i %s", ext_i, str(ext))
|
||||
# authority id will be replaced with current signer
|
||||
# this cannot be a fixup, because they don't get access to the CA
|
||||
if isinstance(ext, extension.X509ExtensionAuthorityKeyId):
|
||||
continue
|
||||
|
||||
new_cert.add_extension(ext, ext_i)
|
||||
ext_i += 1
|
||||
|
||||
ca_exts = ca.get_extensions(extension.X509ExtensionSubjectKeyId)
|
||||
auth_key_id = extension.X509ExtensionAuthorityKeyId()
|
||||
if ca_exts:
|
||||
auth_key_id.set_key_id(ca_exts[0].get_key_id())
|
||||
else:
|
||||
auth_key_id.set_key_id(ca.get_key_id())
|
||||
new_cert.add_extension(auth_key_id, ext_i)
|
||||
|
||||
logger.info("Signing certificate for <%s> with serial <%s>",
|
||||
csr.get_subject(), serial)
|
||||
|
||||
new_cert.sign(encryption, ca_conf['signing_hash'], signer)
|
||||
|
||||
cert_pem = new_cert.as_pem()
|
||||
|
||||
return cert_pem
|
|
@ -0,0 +1,74 @@
|
|||
from cryptography.hazmat.primitives.asymmetric import dsa
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
from anchor import errors
|
||||
from anchor import signers
|
||||
from anchor import util
|
||||
from anchor.X509 import utils as x509_utils
|
||||
|
||||
|
||||
SIGNER_CONSTRUCTION = {
|
||||
('RSA', 'SHA224'): (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA224())),
|
||||
('RSA', 'SHA256'): (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA256())),
|
||||
('RSA', 'SHA384'): (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA384())),
|
||||
('RSA', 'SHA512'): (lambda key: key.signer(padding.PKCS1v15(),
|
||||
hashes.SHA512())),
|
||||
('DSA', 'SHA224'): (lambda key: key.signer(hashes.SHA224())),
|
||||
('DSA', 'SHA256'): (lambda key: key.signer(hashes.SHA256())),
|
||||
}
|
||||
|
||||
|
||||
def conf_validator(name, ca_conf):
|
||||
# mandatory CA settings
|
||||
ca_config_requirements = ["cert_path", "key_path", "output_path",
|
||||
"signing_hash", "valid_hours"]
|
||||
|
||||
for requirement in ca_config_requirements:
|
||||
if requirement not in ca_conf.keys():
|
||||
raise errors.ConfigValidationException(
|
||||
"CA config missing: %s (for signing CA %s)" % (requirement,
|
||||
name))
|
||||
|
||||
# all are specified, check the CA certificate and key are readable with
|
||||
# sane permissions
|
||||
util.check_file_exists(ca_conf['cert_path'])
|
||||
util.check_file_exists(ca_conf['key_path'])
|
||||
|
||||
util.check_file_permissions(ca_conf['key_path'])
|
||||
|
||||
|
||||
def make_signer(key, encryption, md):
|
||||
signer = SIGNER_CONSTRUCTION.get((encryption, md.upper()))
|
||||
if signer is None:
|
||||
raise signers.SigningError(
|
||||
"Unknown hash/encryption combination (%s/%s)" % (md, encryption))
|
||||
signer = signer(key)
|
||||
|
||||
def cryptography_io_signer(to_sign):
|
||||
signer.update(to_sign)
|
||||
return signer.finalize()
|
||||
|
||||
return cryptography_io_signer
|
||||
|
||||
|
||||
@signers.config_validator(conf_validator)
|
||||
def sign(csr, ca_conf):
|
||||
try:
|
||||
key = x509_utils.get_private_key_from_file(ca_conf['key_path'])
|
||||
except Exception as e:
|
||||
raise signers.SigningError("Cannot load the signing CA key: %s" % (e,))
|
||||
|
||||
if isinstance(key, rsa.RSAPrivateKey):
|
||||
encryption = 'RSA'
|
||||
elif isinstance(key, dsa.DSAPrivateKey):
|
||||
encryption = 'DSA'
|
||||
else:
|
||||
raise signers.SigningError("Unknown key type: %s" % (key.__class__,))
|
||||
|
||||
signer = make_signer(key, encryption, ca_conf['signing_hash'])
|
||||
return signers.sign_generic(csr, ca_conf, encryption, signer)
|
|
@ -15,7 +15,11 @@ from __future__ import absolute_import
|
|||
|
||||
import base64
|
||||
import hmac
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
|
||||
from anchor import errors
|
||||
|
||||
|
||||
def constant_time_compare(val1, val2):
|
||||
|
@ -100,3 +104,20 @@ def extract_pem(data, use_markers=True):
|
|||
return None
|
||||
decoder = getattr(base64, 'decodebytes', base64.decodestring)
|
||||
return decoder(b64_content)
|
||||
|
||||
|
||||
def check_file_permissions(path):
|
||||
# checks that file is owner readable only
|
||||
expected_permissions = (stat.S_IRUSR | stat.S_IFREG) # 0o100400
|
||||
st = os.stat(path)
|
||||
if st.st_mode != expected_permissions:
|
||||
raise errors.ConfigValidationException("CA file: %s has incorrect "
|
||||
"permissions set, expected "
|
||||
"owner readable only" % path)
|
||||
|
||||
|
||||
def check_file_exists(path):
|
||||
if not (os.path.isfile(path) and
|
||||
os.access(path, os.R_OK)):
|
||||
raise errors.ConfigValidationException("could not read file: %s" %
|
||||
path)
|
||||
|
|
|
@ -53,9 +53,11 @@ Backends are simple functions which need to take 2 parameters: the CSR in PEM
|
|||
format and the configuration block contents. Configuration can contain any keys
|
||||
required by the backend.
|
||||
|
||||
The return value must be a signed certificate in PEM format. The backend may
|
||||
either throw a specific ``WebOb`` HTTP exception, or any other exception which
|
||||
will result in a generic 500 response.
|
||||
The return value must be a signed certificate in PEM format, however in most
|
||||
cases it's enough to implement the actual hash signing part and rely on
|
||||
``anchor.signer.sign_generic`` framework. The backend may either throw a
|
||||
specific ``WebOb`` HTTP exception, or SigningError exception which will result
|
||||
in a 500 response.
|
||||
|
||||
For security, http exceptions from the signing backend should not expose any
|
||||
specific information about the reason for failure. Internal exceptions are
|
||||
|
@ -66,3 +68,6 @@ are applied to the submitted CSR in Anchor, they will invalidate the signature.
|
|||
Unless the backend is intended to work only with validators, and not any fixup
|
||||
operations in the future, the signature field should be ignored and the request
|
||||
treated as already correct/verified.
|
||||
|
||||
Configuration is verified using the function provided using the
|
||||
``@signers.config_validator(f)`` decorator.
|
||||
|
|
|
@ -28,7 +28,7 @@ source-dir = doc/source
|
|||
|
||||
[entry_points]
|
||||
anchor.signing_backends =
|
||||
anchor = anchor.certificate_ops:sign
|
||||
anchor = anchor.signers.cryptography_io:sign
|
||||
|
||||
anchor.validators =
|
||||
check_domains = anchor.validators.custom:check_domains
|
||||
|
|
|
@ -26,7 +26,6 @@ from anchor.X509 import certificate
|
|||
from anchor.X509 import errors as x509_errors
|
||||
from anchor.X509 import extension
|
||||
from anchor.X509 import name as x509_name
|
||||
from anchor.X509 import utils
|
||||
|
||||
|
||||
class TestX509Cert(unittest.TestCase):
|
||||
|
@ -239,17 +238,6 @@ class TestX509Cert(unittest.TestCase):
|
|||
with self.assertRaises(x509_errors.X509Error):
|
||||
self.cert.get_fingerprint('no_such_hash')
|
||||
|
||||
def test_sign_bad_md(self):
|
||||
key = utils.get_private_key_from_pem(self.key_rsa_data)
|
||||
self.assertRaises(x509_errors.X509Error,
|
||||
self.cert.sign,
|
||||
key, "BAD")
|
||||
|
||||
def test_sign_bad_key(self):
|
||||
self.assertRaises(x509_errors.X509Error,
|
||||
self.cert.sign,
|
||||
"BAD")
|
||||
|
||||
def test_get_version(self):
|
||||
v = self.cert.get_version()
|
||||
self.assertEqual(v, 2)
|
||||
|
@ -291,16 +279,6 @@ class TestX509Cert(unittest.TestCase):
|
|||
with self.assertRaises(x509_errors.X509Error):
|
||||
self.cert.add_extension("abcdef", 2)
|
||||
|
||||
def test_sign_unknown_key(self):
|
||||
key = object()
|
||||
with self.assertRaises(x509_errors.X509Error):
|
||||
self.cert.sign(key, 'sha1')
|
||||
|
||||
def test_sign_unknown_hash(self):
|
||||
key = utils.get_private_key_from_pem(self.key_rsa_data)
|
||||
with self.assertRaises(x509_errors.X509Error):
|
||||
self.cert.sign(key, 'no_such_hash')
|
||||
|
||||
def test_verify_unknown_key(self):
|
||||
with self.assertRaises(x509_errors.X509Error):
|
||||
self.cert.verify("abc")
|
||||
|
|
|
@ -21,6 +21,7 @@ import unittest
|
|||
import mock
|
||||
from pyasn1_modules import rfc2459
|
||||
|
||||
from anchor.signers import cryptography_io
|
||||
from anchor.X509 import errors as x509_errors
|
||||
from anchor.X509 import extension
|
||||
from anchor.X509 import name as x509_name
|
||||
|
@ -188,7 +189,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase):
|
|||
|
||||
def test_sign(self):
|
||||
key = utils.get_private_key_from_pem(self.key_rsa_data)
|
||||
self.csr.sign(key)
|
||||
signer = cryptography_io.make_signer(key, 'RSA', 'SHA256')
|
||||
self.csr.sign('RSA', 'SHA256', signer)
|
||||
# 10 bytes is definitely enough for non malicious case, right?
|
||||
self.assertEqual(b'\x16\xbd!\x9b\xfb\xfd\x10\xa1\xaf\x92',
|
||||
self.csr._get_signature()[:10])
|
||||
|
|
|
@ -22,7 +22,9 @@ import unittest
|
|||
import mock
|
||||
|
||||
from anchor import app
|
||||
from anchor import errors
|
||||
from anchor import jsonloader
|
||||
from anchor import util
|
||||
import tests
|
||||
|
||||
|
||||
|
@ -39,8 +41,8 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
def test_self_test(self):
|
||||
self.assertTrue(True)
|
||||
|
||||
@mock.patch('anchor.app._check_file_exists')
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_exists')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_config_check_domains_good(self, a, b):
|
||||
self.sample_conf_ra['default_ra']['validators'] = {
|
||||
"common_name": {
|
||||
|
@ -54,8 +56,8 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
with mock.patch("os.stat", **config):
|
||||
self.assertEqual(app.validate_config(jsonloader.conf), None)
|
||||
|
||||
@mock.patch('anchor.app._check_file_exists')
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_exists')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_config_check_domains_bad(self, a, b):
|
||||
self.sample_conf_ra['default_ra']['validators'] = {
|
||||
"common_name": {
|
||||
|
@ -68,7 +70,7 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
config = {'return_value.st_mode': (stat.S_IRUSR | stat.S_IFREG)}
|
||||
with mock.patch("os.stat", **config):
|
||||
self.assertRaises(
|
||||
app.ConfigValidationException,
|
||||
errors.ConfigValidationException,
|
||||
app.validate_config,
|
||||
jsonloader.conf
|
||||
)
|
||||
|
@ -76,13 +78,13 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
def test_check_file_permissions_good(self):
|
||||
config = {'return_value.st_mode': (stat.S_IRUSR | stat.S_IFREG)}
|
||||
with mock.patch("os.stat", **config):
|
||||
app._check_file_permissions("/mock/path")
|
||||
util.check_file_permissions("/mock/path")
|
||||
|
||||
def test_check_file_permissions_bad(self):
|
||||
config = {'return_value.st_mode': (stat.S_IWOTH | stat.S_IFREG)}
|
||||
with mock.patch("os.stat", **config):
|
||||
self.assertRaises(app.ConfigValidationException,
|
||||
app._check_file_permissions, "/mock/path")
|
||||
self.assertRaises(errors.ConfigValidationException,
|
||||
util.check_file_permissions, "/mock/path")
|
||||
|
||||
def test_validate_old_config(self):
|
||||
config = json.dumps({
|
||||
|
@ -91,45 +93,45 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
"validators": {},
|
||||
})
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"old version of Anchor",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_validate_config_no_registration_authorities(self,
|
||||
mock_check_perm):
|
||||
del self.sample_conf['registration_authority']
|
||||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"No registration authorities present",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_validate_config_no_auth(self, mock_check_perm):
|
||||
del self.sample_conf['authentication']
|
||||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"No authentication methods present",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_validate_config_no_auth_backend(self, mock_check_perm):
|
||||
del self.sample_conf_auth['default_auth']['backend']
|
||||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"Authentication method .* doesn't define "
|
||||
"backend",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_validate_config_no_ra_auth(self, mock_check_perm):
|
||||
del self.sample_conf_ra['default_ra']['authentication']
|
||||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"No authentication .* for .* default_ra",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
|
@ -137,20 +139,20 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
del self.sample_conf['signing_ca']
|
||||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"No signing CA configurations present",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_validate_config_no_ra_ca(self, mock_check_perm):
|
||||
del self.sample_conf_ra['default_ra']['signing_ca']
|
||||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"No signing CA .* for .* default_ra",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
def test_validate_config_ca_config_reqs(self, mock_check_perm):
|
||||
ca_config_requirements = ["cert_path", "key_path", "output_path",
|
||||
"signing_hash", "valid_hours"]
|
||||
|
@ -162,7 +164,7 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
# with 'missing_req', perform validation. Each should raise in turn
|
||||
for req in ca_config_requirements:
|
||||
jsonloader.conf.load_str_data(config.replace(req, "missing_req"))
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"CA config missing: %s" % req,
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
|
@ -171,11 +173,11 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
config = json.dumps(self.sample_conf)
|
||||
jsonloader.conf.load_str_data(config)
|
||||
isfile.return_value = False
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"could not read file: tests/CA/root-ca.crt",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
@mock.patch('os.path.isfile')
|
||||
@mock.patch('os.access')
|
||||
@mock.patch('os.stat')
|
||||
|
@ -187,11 +189,11 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
isfile.return_value = True
|
||||
access.return_value = True
|
||||
stat.return_value.st_mode = self.expected_key_permissions
|
||||
self.assertRaisesRegexp(app.ConfigValidationException,
|
||||
self.assertRaisesRegexp(errors.ConfigValidationException,
|
||||
"No validators configured",
|
||||
app.validate_config, jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
@mock.patch('os.path.isfile')
|
||||
@mock.patch('os.access')
|
||||
@mock.patch('os.stat')
|
||||
|
@ -203,13 +205,13 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
|
|||
isfile.return_value = True
|
||||
access.return_value = True
|
||||
stat.return_value.st_mode = self.expected_key_permissions
|
||||
with self.assertRaises(app.ConfigValidationException,
|
||||
with self.assertRaises(errors.ConfigValidationException,
|
||||
msg="Unknown validator <unknown_validator> "
|
||||
"found (for registration authority "
|
||||
"default)"):
|
||||
app.validate_config(jsonloader.conf)
|
||||
|
||||
@mock.patch('anchor.app._check_file_permissions')
|
||||
@mock.patch('anchor.util.check_file_permissions')
|
||||
@mock.patch('os.path.isfile')
|
||||
@mock.patch('os.access')
|
||||
@mock.patch('os.stat')
|
||||
|
|
|
@ -14,14 +14,17 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import textwrap
|
||||
import unittest
|
||||
|
||||
from pyasn1.type import univ as asn1_univ
|
||||
|
||||
from anchor import certificate_ops
|
||||
from anchor import signers
|
||||
from anchor.signers import cryptography_io
|
||||
from anchor.X509 import certificate
|
||||
from anchor.X509 import extension
|
||||
from anchor.X509 import signing_request
|
||||
from anchor.X509 import utils
|
||||
import tests
|
||||
|
||||
|
||||
|
@ -38,7 +41,8 @@ class SigningBackendExtensions(tests.DefaultConfigMixin,
|
|||
ext.add_dns_id("example.com")
|
||||
csr.add_extension(ext)
|
||||
|
||||
pem = certificate_ops.sign(csr, self.sample_conf_ca['default_ca'])
|
||||
pem = signers.sign_generic(csr, self.sample_conf_ca['default_ca'],
|
||||
'RSA', lambda x: b"")
|
||||
cert = certificate.X509Certificate.from_buffer(pem)
|
||||
self.assertEqual(1, len(cert.get_extensions(
|
||||
extension.X509ExtensionSubjectAltName)))
|
||||
|
@ -48,7 +52,8 @@ class SigningBackendExtensions(tests.DefaultConfigMixin,
|
|||
ext = UnknownExtension()
|
||||
csr.add_extension(ext)
|
||||
|
||||
pem = certificate_ops.sign(csr, self.sample_conf_ca['default_ca'])
|
||||
pem = signers.sign_generic(csr, self.sample_conf_ca['default_ca'],
|
||||
'RSA', lambda x: b"")
|
||||
cert = certificate.X509Certificate.from_buffer(pem)
|
||||
self.assertEqual(0, len(cert.get_extensions(UnknownExtension)))
|
||||
|
||||
|
@ -58,5 +63,35 @@ class SigningBackendExtensions(tests.DefaultConfigMixin,
|
|||
ext.set_critical(True)
|
||||
csr.add_extension(ext)
|
||||
|
||||
with self.assertRaises(certificate_ops.SigningError):
|
||||
certificate_ops.sign(csr, self.sample_conf_ca['default_ca'])
|
||||
with self.assertRaises(signers.SigningError):
|
||||
signers.sign_generic(csr, self.sample_conf_ca['default_ca'],
|
||||
'RSA', lambda x: b"")
|
||||
|
||||
|
||||
class TestCryptographyBackend(tests.DefaultConfigMixin,
|
||||
tests.DefaultRequestMixin, unittest.TestCase):
|
||||
key_rsa_data = textwrap.dedent("""
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXAIBAAKBgQCeeqg1Qeccv8hqj1BP9KEJX5QsFCxR62M8plPb5t4sLo8UYfZd
|
||||
6kFLcOP8xzwwvx/eFY6Sux52enQ197o8aMwyP77hMhZqtd8NCgLJMVlUbRhwLti0
|
||||
SkHFPic0wAg+esfXa6yhd5TxC+bti7MgV/ljA80XQxHH8xOjdOoGN0DHfQIDAQAB
|
||||
AoGBAJ2ozJpe+7qgGJPaCz3f0izvBwtq7kR49fqqRZbo8HHnx7OxWVVI7LhOkKEy
|
||||
2/Bq0xsvOu1CdiXL4LynvIDIiQqLaeINzG48Rbk+0HadbXblt3nDkIWdYII6zHKI
|
||||
W9ewX4KpHEPbrlEO9BjAlAcYsDIvFIMYpQhtQ+0R/gmZ99WJAkEAz5C2a6FIcMbE
|
||||
o3aTc9ECq99zY7lxh+6aLpUdIeeHyb/QzfGDBdlbpBAkA6EcxSqp0aqH4xIQnYHa
|
||||
3P5ZCShqSwJBAMN1sb76xq94xkg2cxShPFPAE6xKRFyKqLgsBYVtulOdfOtOnjh9
|
||||
1SK2XQQfBRIRdG4Q/gDoCP8XQHpJcWMk+FcCQDnuJqulaOVo5GrG5mJ1nCxCAh98
|
||||
G06X7lo/7dCPoRtSuMExvaK9RlFk29hTeAcjYCAPWzupyA9dtarmJg1jRT8CQCKf
|
||||
gYnb8D/6+9yk0IPR/9ayCooVacCeyz48hgnZowzWs98WwQ4utAd/GED3obVOpDov
|
||||
Bl9wus889i3zPoOac+cCQCZHredQcJGd4dlthbVtP2NhuPXz33JuETGR9pXtsDUZ
|
||||
uX/nSq1oo9kUh/dPOz6aP5Ues1YVe3LExmExPBQfwIE=
|
||||
-----END RSA PRIVATE KEY-----""").encode('ascii')
|
||||
|
||||
def test_sign_bad_md(self):
|
||||
key = utils.get_private_key_from_pem(self.key_rsa_data)
|
||||
with self.assertRaises(signers.SigningError):
|
||||
cryptography_io.make_signer(key, "BAD", "RSA")
|
||||
|
||||
def test_sign_bad_key(self):
|
||||
with self.assertRaises(signers.SigningError):
|
||||
cryptography_io.make_signer("BAD", "sha256", "RSA")
|
||||
|
|
Loading…
Reference in New Issue