Add support for CMC requests

Incoming CMC requests should be stripped of all wrappers, then the internal
pkcs10 request is processed as usual. No verification is done on the SignedData
wrapper, because there's no known certificate to trust.

Response is just the bare certificate for now.

Change-Id: I92c76df775e5f339ac2fae95582097e3afe138af
This commit is contained in:
Stanisław Pitucha 2016-01-15 17:32:09 +11:00 committed by Stanislaw Pitucha
parent a309748be9
commit 7f51b08ea3
11 changed files with 169 additions and 43 deletions

View File

@ -19,15 +19,15 @@ import io
from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import univ as asn1_univ
from pyasn1_modules import pem
from anchor.asn1 import rfc5280
from anchor.asn1 import rfc6402
from anchor import util
from anchor.X509 import errors
from anchor.X509 import extension
from anchor.X509 import name
from anchor.X509 import signature
from anchor.X509 import utils
from anchor.X509 import utils as x509_utils
OID_extensionRequest = asn1_univ.ObjectIdentifier('1.2.840.113549.1.9.14')
@ -47,34 +47,40 @@ class X509Csr(signature.SignatureMixin):
self._csr = csr
@staticmethod
def from_open_file(f):
def from_open_file(f, encoding='pem'):
if encoding == 'pem':
try:
der_content = util.extract_pem(f.read())
except Exception:
raise X509CsrError("Data not in PEM format")
elif encoding == 'der':
der_content = f.read()
else:
raise X509CsrError("Unknown encoding")
try:
der_content = pem.readPemFromFile(
f, startMarker='-----BEGIN CERTIFICATE REQUEST-----',
endMarker='-----END CERTIFICATE REQUEST-----')
csr = decoder.decode(der_content,
asn1Spec=rfc6402.CertificationRequest())[0]
return X509Csr(csr)
except Exception:
raise X509CsrError("Could not read X509 certificate from "
"PEM data.")
raise X509CsrError("Could not read X509 certificate from data.")
@staticmethod
def from_buffer(data):
def from_buffer(data, encoding='pem'):
"""Create this CSR from a buffer
:param data: The data buffer
"""
return X509Csr.from_open_file(io.StringIO(data))
return X509Csr.from_open_file(io.BytesIO(data), encoding)
@staticmethod
def from_file(path):
def from_file(path, encoding='pem'):
"""Create this CSR from a file on disk
:param path: Path to the file on disk
"""
with open(path, 'r') as f:
return X509Csr.from_open_file(f)
return X509Csr.from_open_file(f, encoding)
def get_pubkey(self):
"""Get the public key from the CSR
@ -207,7 +213,7 @@ class X509Csr(signature.SignatureMixin):
return self._get_signing_algorithm()
def _get_signature(self):
return utils.bin_to_bytes(self._csr['signature'])
return x509_utils.bin_to_bytes(self._csr['signature'])
def _get_signing_algorithm(self):
return self._csr['signatureAlgorithm']['algorithm']
@ -215,7 +221,7 @@ class X509Csr(signature.SignatureMixin):
def _get_public_key(self):
csr_info = self._csr['certificationRequestInfo']
key_info = csr_info['subjectPublicKeyInfo']
return utils.get_public_key_from_der(encoder.encode(key_info))
return x509_utils.get_public_key_from_der(encoder.encode(key_info))
def _get_bytes_to_sign(self):
return encoder.encode(self._csr['certificationRequestInfo'])

View File

@ -21,12 +21,14 @@ import uuid
import pecan
from webob import exc as http_status
from anchor import cmc
from anchor import jsonloader
from anchor import util
from anchor import validation
from anchor.X509 import certificate
from anchor.X509 import extension
from anchor.X509 import signing_request
from anchor.X509 import utils
from anchor.X509 import utils as x509_utils
logger = logging.getLogger(__name__)
@ -41,10 +43,10 @@ class SigningError(Exception):
pass
def parse_csr(csr, encoding):
def parse_csr(data, encoding):
"""Loads the user provided CSR into the backend X509 library.
:param csr: CSR as provided by the API user
:param data: CSR as provided by the API user
:param encoding: encoding for the CSR (must be PEM today)
:return: CSR object from backend X509 library or aborts
"""
@ -53,17 +55,27 @@ def parse_csr(csr, encoding):
logger.error("parse_csr failed: bad encoding ({})".format(encoding))
pecan.abort(400, "invalid CSR")
if csr is None:
if data is None:
logger.error("parse_csr failed: missing CSR")
pecan.abort(400, "invalid CSR")
# load the CSR into the backend X509 library
# get DER version
der = util.extract_pem(data.encode('ascii'))
if der is None:
logger.error("perse_csr failed: PEM contentents not found")
pecan.abort(400, "PEM contents not found")
# try to unpack the certificate from CMC wrappers
try:
out_req = signing_request.X509Csr.from_buffer(csr)
return out_req
except Exception as e:
logger.exception("Exception while parsing the CSR: %s", e)
pecan.abort(400, "CSR cannot be parsed")
csr = cmc.parse_request(der)
return signing_request.X509Csr(csr)
except cmc.CMCParsingError:
# it's not CMC data, that's fine, it's likely the CSR itself
try:
return signing_request.X509Csr.from_buffer(der, 'der')
except Exception as e:
logger.exception("Exception while parsing the CSR: %s", e)
pecan.abort(400, "CSR cannot be parsed")
def validate_csr(ra_name, auth_result, csr, request):
@ -208,7 +220,7 @@ def sign(csr, ca_conf):
raise SigningError("Cannot load the signing CA: %s" % (e,))
try:
key = utils.get_private_key_from_file(ca_conf['key_path'])
key = x509_utils.get_private_key_from_file(ca_conf['key_path'])
except Exception as e:
raise SigningError("Cannot load the signing CA key: %s" % (e,))

80
anchor/cmc.py Normal file
View File

@ -0,0 +1,80 @@
from anchor.asn1 import rfc5652
from anchor.asn1 import rfc6402
from pyasn1.codec.der import decoder
from pyasn1 import error
class CMCParsingError(Exception):
pass
class UnexpectedContentType(CMCParsingError):
def __init__(self, content_type):
self.content_type = content_type
def __str__(self):
return "Unexpected content type, got %s" % self.content_type
def _unwrap_signed_data(data):
# Since we don't have trust with anyone signing the requests, this
# signature is not relevant. The request itself is self-signed which
# stops accidents.
result = decoder.decode(data, rfc5652.SignedData())[0]
return _unwrap_generic(
result['encapContentInfo']['eContentType'],
result['encapContentInfo']['eContent'])
def _unwrap_content_info(data):
result = decoder.decode(data, rfc5652.ContentInfo())[0]
return _unwrap_generic(result['contentType'], result['content'])
def _unwrap_generic(content_type, data):
unwrapper = CONTENT_TYPES.get(content_type)
if unwrapper is None:
return (content_type, data)
return unwrapper(data)
def strip_wrappers(data):
# assume the outer wrapper is contentinfo
return _unwrap_content_info(data)
CONTENT_TYPES = {
rfc5652.id_ct_contentInfo: _unwrap_content_info,
rfc5652.id_signedData: _unwrap_signed_data,
}
def parse_request(data):
try:
content_type, data = strip_wrappers(data)
except error.PyAsn1Error:
raise CMCParsingError("Cannot find valid CMC wrapper")
if content_type != rfc6402.id_cct_PKIData:
raise UnexpectedContentType(content_type)
pd = decoder.decode(data, rfc6402.PKIData())[0]
if len(pd['reqSequence']) == 0:
raise CMCParsingError("No certificate requests")
if len(pd['reqSequence']) > 1:
raise CMCParsingError("Can't handle multiple certificates")
req = pd['reqSequence'][0]
if req.getName() != 'tcr':
raise CMCParsingError("Can handle only tagged cert requests")
return req['tcr']['certificationRequest']
if __name__ == "__main__":
import sys
with open(sys.argv[1], 'rb') as f:
data = f.read()
cert_req = parse_request(data)
print(cert_req.prettyPrint())

View File

@ -13,6 +13,7 @@
from __future__ import absolute_import
import base64
import hmac
import re
@ -74,3 +75,28 @@ def verify_domain(domain, allow_wildcards=False):
raise ValueError(
"domain <%s> contains invalid characters "
"(RFC1034/3.5)" % (domain,))
def extract_pem(data, use_markers=True):
"""Extract and unpack PEM data
Anything between the BEGIN and END lines will be unpacked using base64. The
specific BEGIN/END content name is ignored since it's not standard anyway.
"""
if not isinstance(data, bytes):
raise TypeError("data must be bytes")
lines = data.splitlines()
seen_start = not use_markers
b64_content = b""
for line in lines:
if line.startswith(b"-----END ") and line.endswith(b"-----"):
break
if seen_start:
b64_content += line
if line.startswith(b"-----BEGIN ") and line.endswith(b"-----"):
seen_start = True
if not b64_content:
return None
decoder = getattr(base64, 'decodebytes', base64.decodestring)
return decoder(b64_content)

View File

@ -49,7 +49,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase):
def setUp(self):
super(TestX509Csr, self).setUp()
self.csr = signing_request.X509Csr.from_buffer(TestX509Csr.csr_sample)
self.csr = signing_request.X509Csr.from_buffer(
TestX509Csr.csr_sample_bytes)
def tearDown(self):
pass
@ -87,7 +88,7 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase):
def test_read_from_file(self):
open_name = 'anchor.X509.signing_request.open'
f = io.StringIO(self.csr_sample)
f = io.BytesIO(self.csr_sample_bytes)
with mock.patch(open_name, create=True) as mock_open:
mock_open.return_value = f
csr = signing_request.X509Csr.from_file("some_path")
@ -98,8 +99,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase):
def test_bad_data_throws(self):
bad_data = (
u"some bad data is "
"EHRlc3RAYW5jaG9yLnRlc3QwTDANBgkqhkiG9w0BAQEFAAM7ADA4AjEA6m")
b"some bad data is "
b"EHRlc3RAYW5jaG9yLnRlc3QwTDANBgkqhkiG9w0BAQEFAAM7ADA4AjEA6m")
csr = signing_request.X509Csr()
self.assertRaises(x509_errors.X509Error,

View File

@ -79,7 +79,7 @@ class DefaultRequestMixin(object):
# CN=server1.example.com
# 2048 RSA, basicConstraints, keyUsage exts
csr_sample_cn = 'server1.example.com'
csr_sample = textwrap.dedent(u"""
csr_sample = textwrap.dedent("""
-----BEGIN CERTIFICATE REQUEST-----
MIIDDjCCAfYCAQAwgZwxCzAJBgNVBAYTAlVLMQ8wDQYDVQQIEwZOYXJuaWExEjAQ
BgNVBAcTCUZ1bmt5dG93bjEXMBUGA1UEChMOQW5jaG9yIFRlc3RpbmcxEDAOBgNV
@ -99,3 +99,4 @@ class DefaultRequestMixin(object):
DxpZNBHlkA6LWaRqAtWws3uvom7IjHGgSr7UITrOR5iO5Hrm85X7K0AT6Bu75RZL
+uYLLfj9Nb/iznREl9E3a/fN
-----END CERTIFICATE REQUEST-----""")
csr_sample_bytes = csr_sample.encode('ascii')

View File

@ -32,7 +32,7 @@ class TestFixupFunctionality(tests.DefaultConfigMixin,
super(TestFixupFunctionality, self).setUp()
jsonloader.conf.load_extensions()
self.csr = signing_request.X509Csr.from_buffer(
TestFixupFunctionality.csr_sample)
TestFixupFunctionality.csr_sample_bytes)
def test_with_noop(self):
"""Ensure single fixup is processed."""

View File

@ -33,7 +33,7 @@ class UnknownExtension(extension.X509Extension):
class SigningBackendExtensions(tests.DefaultConfigMixin,
tests.DefaultRequestMixin, unittest.TestCase):
def test_copy_good_extensions(self):
csr = signing_request.X509Csr.from_buffer(self.csr_sample)
csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes)
ext = extension.X509ExtensionSubjectAltName()
ext.add_dns_id("example.com")
csr.add_extension(ext)
@ -44,7 +44,7 @@ class SigningBackendExtensions(tests.DefaultConfigMixin,
extension.X509ExtensionSubjectAltName)))
def test_ignore_unknown_extensions(self):
csr = signing_request.X509Csr.from_buffer(self.csr_sample)
csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes)
ext = UnknownExtension()
csr.add_extension(ext)
@ -53,7 +53,7 @@ class SigningBackendExtensions(tests.DefaultConfigMixin,
self.assertEqual(2, len(cert.get_extensions()))
def test_fail_critical_unknown_extensions(self):
csr = signing_request.X509Csr.from_buffer(self.csr_sample)
csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes)
ext = UnknownExtension()
ext.set_critical(True)
csr.add_extension(ext)

View File

@ -29,7 +29,7 @@ class TestBaseValidators(tests.DefaultRequestMixin, unittest.TestCase):
def setUp(self):
super(TestBaseValidators, self).setUp()
self.csr = signing_request.X509Csr.from_buffer(
self.csr_sample)
self.csr_sample_bytes)
def tearDown(self):
super(TestBaseValidators, self).tearDown()

View File

@ -555,11 +555,11 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase):
)
def test_csr_signature(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
self.assertIsNone(custom.csr_signature(csr=csr))
def test_csr_signature_bad_sig(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
with mock.patch.object(x509_csr.X509Csr, '_get_signature',
return_value=(b'A'*49)):
with self.assertRaisesRegexp(errors.ValidationError,
@ -567,7 +567,7 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase):
custom.csr_signature(csr=csr)
def test_csr_signature_bad_algo(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
with mock.patch.object(x509_csr.X509Csr, '_get_signing_algorithm',
return_value=rfc2459.id_dsa_with_sha1):
with self.assertRaisesRegexp(errors.ValidationError,
@ -575,7 +575,7 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase):
custom.csr_signature(csr=csr)
def test_public_key_good_rsa(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
self.assertIsNone(custom.public_key(csr=csr,
allowed_keys={'RSA': 1024}))
@ -595,18 +595,18 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase):
dsa_key_der = base64.b64decode(dsa_key_pem)
spki = decoder.decode(dsa_key_der,
asn1Spec=rfc5280.SubjectPublicKeyInfo())[0]
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
csr._csr['certificationRequestInfo']['subjectPublicKeyInfo'] = spki
self.assertIsNone(custom.public_key(csr=csr,
allowed_keys={'DSA': 1024}))
def test_public_key_too_short(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
with self.assertRaises(errors.ValidationError):
custom.public_key(csr=csr, allowed_keys={'RSA': 99999999})
def test_public_key_wrong_algo(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes)
with self.assertRaises(errors.ValidationError):
custom.public_key(csr=csr, allowed_keys={'XXX': 0})

View File

@ -29,7 +29,7 @@ import tests
class TestStandardsValidator(tests.DefaultRequestMixin, unittest.TestCase):
def test_passing(self):
csr = signing_request.X509Csr.from_buffer(self.csr_sample)
csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes)
standards.standards_compliance(csr=csr)