
524 lines
18 KiB

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import functools
import netaddr
from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import constraint as asn1_constraint
from pyasn1.type import namedtype as asn1_namedtype
from pyasn1.type import tag as asn1_tag
from pyasn1.type import univ as asn1_univ
from anchor.asn1 import rfc5280
from anchor.X509 import errors
from anchor.X509 import utils
# Extended key usage identifiers which the rfc5280 module does not provide.
# Each one is derived by appending one arc to an identifier rfc5280 defines.
id_kp_OCSPSigning = asn1_univ.ObjectIdentifier(
    rfc5280.id_kp.asTuple() + (9,)
)
anyExtendedKeyUsage = asn1_univ.ObjectIdentifier(
    rfc5280.id_ce_extKeyUsage.asTuple() + (0,)
)
# Long, human readable names of the extended key usages (names matching
# openssl), keyed by the usage object identifier.
EXT_KEY_USAGE_NAMES = {
    rfc5280.id_kp_serverAuth: "TLS Web Server Authentication",
    rfc5280.id_kp_clientAuth: "TLS Web Client Authentication",
    rfc5280.id_kp_codeSigning: "Code Signing",
    rfc5280.id_kp_emailProtection: "E-mail Protection",
    rfc5280.id_kp_timeStamping: "Time Stamping",
    id_kp_OCSPSigning: "OCSP Signing",
    anyExtendedKeyUsage: "Any Extended Key Usage",
}
# Reverse lookup: long name -> object identifier.
EXT_KEY_USAGE_NAMES_INV = dict((v, k) for k, v in EXT_KEY_USAGE_NAMES.items())
# Short (identifier style) names of the extended key usages, keyed by the
# usage object identifier.
EXT_KEY_USAGE_SHORT_NAMES = {
    rfc5280.id_kp_serverAuth: "serverAuth",
    rfc5280.id_kp_clientAuth: "clientAuth",
    rfc5280.id_kp_codeSigning: "codeSigning",
    rfc5280.id_kp_emailProtection: "emailProtection",
    rfc5280.id_kp_timeStamping: "timeStamping",
    id_kp_OCSPSigning: "ocspSigning",
    anyExtendedKeyUsage: "anyExtendedKeyUsage",
}
# Reverse lookup: short name -> object identifier.
EXT_KEY_USAGE_SHORT_NAMES_INV = dict((v, k) for k, v in
                                     EXT_KEY_USAGE_SHORT_NAMES.items())
# Display names of the known certificate extensions, keyed by extension OID.
# Used by X509Extension.get_name(); unknown OIDs are not listed here.
EXTENSION_NAMES = {
    rfc5280.id_ce_policyConstraints: 'policyConstraints',
    rfc5280.id_ce_basicConstraints: 'basicConstraints',
    rfc5280.id_ce_subjectDirectoryAttributes: 'subjectDirectoryAttributes',
    rfc5280.id_ce_deltaCRLIndicator: 'deltaCRLIndicator',
    rfc5280.id_ce_cRLDistributionPoints: 'cRLDistributionPoints',
    rfc5280.id_ce_issuingDistributionPoint: 'issuingDistributionPoint',
    rfc5280.id_ce_nameConstraints: 'nameConstraints',
    rfc5280.id_ce_certificatePolicies: 'certificatePolicies',
    rfc5280.id_ce_policyMappings: 'policyMappings',
    rfc5280.id_ce_privateKeyUsagePeriod: 'privateKeyUsagePeriod',
    rfc5280.id_ce_keyUsage: 'keyUsage',
    rfc5280.id_ce_authorityKeyIdentifier: 'authorityKeyIdentifier',
    rfc5280.id_ce_subjectKeyIdentifier: 'subjectKeyIdentifier',
    rfc5280.id_ce_certificateIssuer: 'certificateIssuer',
    rfc5280.id_ce_subjectAltName: 'subjectAltName',
    rfc5280.id_ce_issuerAltName: 'issuerAltName',
}
"Digital Signature": "digitalSignature",
"Non Repudiation": "nonRepudiation",
"Key Encipherment": "keyEncipherment",
"Data Encipherment": "dataEncipherment",
"Key Agreement": "keyAgreement",
"Certificate Sign": "keyCertSign",
"CRL Sign": "cRLSign",
"Encipher Only": "encipherOnly",
"Decipher Only": "decipherOnly",
def uses_ext_value(f):
    """Wrapper allowing reading of extension value.

    Because the value is normally saved in a (double) serialised way, it's
    not easily accessible to the member methods. This is made easier by
    unpacking the extension value into an extra argument.

    The wrapped method is called with the result of ``self._get_value()``
    as the ``ext_value`` keyword argument (replacing anything the caller
    passed under that name). Its return value is passed through unchanged.
    """
    # keep the name and docstring of the wrapped method
    @functools.wraps(f)
    def ext_value_filled(self, *args, **kwargs):
        kwargs['ext_value'] = self._get_value()
        return f(self, *args, **kwargs)
    return ext_value_filled
def modifies_ext_value(f):
    """Wrapper allowing modification of extension value.

    Because the value is normally saved in a (double) serialised way, it's
    not easily accessible to the member methods. This is made easier by
    unpacking the extension value into an extra argument.

    New value needs to be returned from the method. It is stored back with
    ``self._set_value()``; the wrapper itself returns None.
    """
    # keep the name and docstring of the wrapped method
    @functools.wraps(f)
    def ext_value_filled(self, *args, **kwargs):
        value = self._get_value()
        kwargs['ext_value'] = value
        # since some elements like NamedValue are pure value types, there is
        # no interface to modify them and new versions have to be returned
        value = f(self, *args, **kwargs)
        # without this the modification would be silently dropped
        self._set_value(value)
    return ext_value_filled
class BasicConstraints(asn1_univ.Sequence):
    """Custom BasicConstraint implementation until pyasn1_modules is fixed.

    Sequence of a ``cA`` boolean defaulting to False and a
    ``pathLenConstraint`` integer restricted to the range 0..64.
    """
    # NOTE(review): the 'pathLenConstraint' component was incomplete in the
    # reviewed text; it is declared as an optional Integer following
    # RFC 5280 (4.2.1.9) - confirm against the upstream definition.
    componentType = asn1_namedtype.NamedTypes(
        asn1_namedtype.DefaultedNamedType('cA', asn1_univ.Boolean(False)),
        asn1_namedtype.OptionalNamedType(
            'pathLenConstraint', asn1_univ.Integer().subtype(
                subtypeSpec=asn1_constraint.ValueRangeConstraint(0, 64)))
    )
class NameConstraints(asn1_univ.Sequence):
    """Custom NameConstraints implementation until pyasn1_modules is fixed.

    Sequence of two context tagged components, ``permittedSubtrees`` (tag 0)
    and ``excludedSubtrees`` (tag 1).
    """
    # NOTE(review): only the tag lines of both components were present in
    # the reviewed text. Names, optionality, the GeneralSubtrees type and
    # the implicit tagging follow RFC 5280 (4.2.1.10) - confirm that
    # rfc5280.GeneralSubtrees is the type used upstream.
    componentType = asn1_namedtype.NamedTypes(
        asn1_namedtype.OptionalNamedType(
            'permittedSubtrees',
            rfc5280.GeneralSubtrees().subtype(
                implicitTag=asn1_tag.Tag(asn1_tag.tagClassContext,
                                         asn1_tag.tagFormatConstructed, 0))),
        asn1_namedtype.OptionalNamedType(
            'excludedSubtrees',
            rfc5280.GeneralSubtrees().subtype(
                implicitTag=asn1_tag.Tag(asn1_tag.tagClassContext,
                                         asn1_tag.tagFormatConstructed, 1)))
    )
class X509Extension(object):
    """Abstraction for the pyasn1 Extension structures.

    An X509 V3 Certificate extension.

    The object should normally be constructed using `construct_extension`,
    which will choose the right extension type based on the id.

    Each extension has an immutable oid and a spec of the internal value.
    Unknown extension types can be still represented by the
    X509Extension object and copied/serialised without understanding the
    value details. The value will not be displayed properly in the logs
    in the case.
    """

    # OID stored in 'extnID' when a new extension is created from scratch
    _oid = None
    # pyasn1 type of the decoded value; None in the generic class
    spec = None

    def __init__(self, ext=None):
        """Wrap an existing extension or create a new, default one.

        :param ext: rfc5280.Extension to wrap, or None to create a new
                    extension holding the default value of the subclass
        :raises errors.X509Error: if `ext` is not an rfc5280.Extension, or
                    if a new extension is requested from a class with no spec
        """
        if ext is None:
            if self.spec is None:
                raise errors.X509Error("cannot create generic extension")
            self._ext = rfc5280.Extension()
            self._ext['extnID'] = self._oid
            # a new extension starts with the default value of its type
            self._set_value(self._get_default_value())
        else:
            if not isinstance(ext, rfc5280.Extension):
                raise errors.X509Error("extension has incorrect type")
            self._ext = ext

    @classmethod
    def _get_default_value(cls):
        """Return the value used to initialise a new extension."""
        # if there are any non-optional fields, this needs to be defined in
        # the class
        return cls.spec()

    def __str__(self):
        return "%s: %s" % (self.get_name(), self.get_value_as_str())

    def get_value_as_str(self):
        """Return the printable value; the generic class cannot decode it."""
        return "<unknown>"

    def get_oid(self):
        """Return the 'extnID' of the wrapped extension."""
        return self._ext['extnID']

    def get_name(self):
        """Get the extension name as a python string.

        If the oid is not listed in EXTENSION_NAMES, the oid itself is
        returned instead.
        """
        oid = self.get_oid()
        return EXTENSION_NAMES.get(oid, oid)

    def get_critical(self):
        """Return the 'critical' field of the wrapped extension."""
        return self._ext['critical']

    def set_critical(self, critical):
        """Set the 'critical' field of the wrapped extension."""
        self._ext['critical'] = critical

    def _get_value(self):
        """Decode 'extnValue' into an object of the `spec` type."""
        # decode() returns a (value, remaining substrate) tuple
        return decoder.decode(self._ext['extnValue'].asOctets(),
                              asn1Spec=self.spec())[0]

    def _set_value(self, value):
        """Encode `value` and store it as 'extnValue'.

        :raises errors.X509Error: if `value` is not an instance of `spec`
        """
        if not isinstance(value, self.spec):
            raise errors.X509Error("extension value has incorrect type")
        self._ext['extnValue'] = encoder.encode(value)

    def as_der(self):
        """Return the encoded form of the whole extension."""
        return encoder.encode(self._ext)

    def as_asn1(self):
        """Return the wrapped rfc5280.Extension object."""
        return self._ext
class X509ExtensionBasicConstraints(X509Extension):
    """Wrapper for the basicConstraints extension.

    The `ext_value` argument of every accessor is filled in by the
    `uses_ext_value` / `modifies_ext_value` decorators; callers do not
    pass it.
    """
    spec = BasicConstraints
    _oid = rfc5280.id_ce_basicConstraints

    @uses_ext_value
    def get_ca(self, ext_value=None):
        """Return the 'cA' field as a python bool."""
        return bool(ext_value['cA'])

    @modifies_ext_value
    def set_ca(self, ca, ext_value=None):
        """Set the 'cA' field."""
        ext_value['cA'] = ca
        return ext_value

    @uses_ext_value
    def get_path_len_constraint(self, ext_value=None):
        """Return the 'pathLenConstraint' field."""
        return ext_value['pathLenConstraint']

    @modifies_ext_value
    def set_path_len_constraint(self, length, ext_value=None):
        """Set the 'pathLenConstraint' field."""
        ext_value['pathLenConstraint'] = length
        return ext_value

    def __str__(self):
        return "basicConstraints: CA: %s, pathLen: %s" % (
            str(self.get_ca()).upper(), self.get_path_len_constraint())
class X509ExtensionKeyUsage(X509Extension):
    """Wrapper for the keyUsage extension (a bit string of usage flags).

    Usages can be named by the field name (e.g. 'keyCertSign') or by the
    long name listed in LONG_KEY_USAGE_NAMES. The `ext_value` argument of
    the accessors is filled in by the decorators.
    """
    spec = rfc5280.KeyUsage
    _oid = rfc5280.id_ce_keyUsage

    # usage name -> bit position, and the reverse mapping
    fields = dict(spec.namedValues.namedValues)
    inv_fields = dict((v, k) for k, v in spec.namedValues.namedValues)

    @classmethod
    def _get_default_value(cls):
        """Return an empty bit string."""
        # if there are any non-optional fields, this needs to be defined in
        # the class
        return cls.spec("''B")

    @uses_ext_value
    def get_usage(self, usage, ext_value=None):
        """Return True if the given usage bit is set."""
        usage = LONG_KEY_USAGE_NAMES.get(usage, usage)
        pos = self.fields[usage]
        # bits beyond the end of the stored string are unset
        if pos >= len(ext_value):
            return False
        return bool(ext_value[pos])

    @uses_ext_value
    def get_all_usages(self, ext_value=None):
        """Return the field names of all the usage bits which are set."""
        return [self.inv_fields[i] for i, enabled in enumerate(ext_value)
                if enabled]

    @modifies_ext_value
    def set_usage(self, usage, state, ext_value=None):
        """Set (truthy `state`) or clear (falsy `state`) the usage bit."""
        usage = LONG_KEY_USAGE_NAMES.get(usage, usage)
        pos = self.fields[usage]
        values = list(ext_value)

        if state:
            # grow the bit string until the requested position exists
            while pos >= len(values):
                values.append(0)
            values[pos] = 1
        elif pos < len(values):
            # a position past the end is already unset
            values[pos] = 0

        # bit strings are value types, a new one has to be built
        bits = ''.join(str(x) for x in values)
        return self.spec("'%s'B" % bits)

    def __str__(self):
        return "keyUsage: " + ", ".join(self.get_all_usages())
class X509ExtensionSubjectAltName(X509Extension):
    """Wrapper for the subjectAltName extension.

    Only the 'dNSName' and 'iPAddress' entries are understood. The
    `ext_value` argument of the accessors is filled in by the decorators.
    """
    spec = rfc5280.SubjectAltName
    _oid = rfc5280.id_ce_subjectAltName

    @uses_ext_value
    def get_dns_ids(self, ext_value=None):
        """Return the decoded strings of all the 'dNSName' entries."""
        dns_ids = []
        for name in ext_value:
            if name.getName() != 'dNSName':
                continue
            component = name.getComponent()
            dns_id = component.asOctets().decode(component.encoding)
            dns_ids.append(dns_id)
        return dns_ids

    @uses_ext_value
    def get_ips(self, ext_value=None):
        """Return the addresses of all the 'iPAddress' entries."""
        ips = []
        for name in ext_value:
            if name.getName() != 'iPAddress':
                continue
            # NOTE(review): the conversion call was not present in the
            # reviewed text; utils.asn1_to_netaddr is assumed to be the
            # inverse of utils.netaddr_to_asn1 used in add_ip - confirm.
            ips.append(utils.asn1_to_netaddr(name.getComponent()))
        return ips

    @uses_ext_value
    def has_unknown_entries(self, ext_value=None):
        """Return True if any entry is neither 'dNSName' nor 'iPAddress'."""
        for name in ext_value:
            if name.getName() not in ('dNSName', 'iPAddress'):
                return True
        return False

    @modifies_ext_value
    def add_dns_id(self, dns_id, validate=True, ext_value=None):
        """Append a new 'dNSName' entry.

        NOTE(review): `validate` is part of the interface but no validation
        step was present in the reviewed text, so it is not acted on here -
        confirm against the upstream implementation.
        """
        new_pos = len(ext_value)
        # assigning None creates the new, empty entry which is filled below
        ext_value[new_pos] = None
        ext_value[new_pos]['dNSName'] = dns_id
        return ext_value

    @modifies_ext_value
    def add_ip(self, ip, ext_value=None):
        """Append a new 'iPAddress' entry.

        :param ip: netaddr.IPAddress to add
        :raises errors.X509Error: if `ip` is not a netaddr.IPAddress
        """
        if not isinstance(ip, netaddr.IPAddress):
            raise errors.X509Error("not a real ip address provided")

        new_pos = len(ext_value)
        # assigning None creates the new, empty entry which is filled below
        ext_value[new_pos] = None
        ext_value[new_pos]['iPAddress'] = utils.netaddr_to_asn1(ip)
        return ext_value

    def __str__(self, ext_value=None):
        # ext_value is unused; the getters decode the value on their own
        entries = ["DNS:%s" % (x,) for x in self.get_dns_ids()]
        entries += ["IP:%s" % (x,) for x in self.get_ips()]
        return "subjectAltName: " + ", ".join(entries)
class X509ExtensionNameConstraints(X509Extension):
    """Wrapper for the nameConstraints extension.

    Gives indexed access to the permitted and excluded subtrees. The
    `ext_value` argument of the public accessors is filled in by the
    decorators; the private helpers take the decoded value explicitly.
    """
    spec = NameConstraints
    _oid = rfc5280.id_ce_nameConstraints

    def _get_permitted(self, ext_value):
        """Return the permitted subtrees, or an empty list if unset."""
        return ext_value['permittedSubtrees'] or []

    def _get_excluded(self, ext_value):
        """Return the excluded subtrees, or an empty list if unset."""
        return ext_value['excludedSubtrees'] or []

    @uses_ext_value
    def get_permitted_length(self, ext_value=None):
        """Return the number of permitted subtrees."""
        return len(self._get_permitted(ext_value))

    @uses_ext_value
    def get_permitted_name(self, n, ext_value=None):
        """Return (name type, name value) of the n-th permitted subtree."""
        name = self._get_permitted(ext_value)[n]['base']
        return (name.getName(), name.getComponent())

    @uses_ext_value
    def get_permitted_range(self, n, ext_value=None):
        """Return (minimum, maximum) of the n-th permitted subtree."""
        entry = self._get_permitted(ext_value)[n]
        return (entry['minimum'], entry['maximum'])

    @uses_ext_value
    def get_excluded_length(self, ext_value=None):
        """Return the number of excluded subtrees."""
        return len(self._get_excluded(ext_value))

    @uses_ext_value
    def get_excluded_name(self, n, ext_value=None):
        """Return (name type, name value) of the n-th excluded subtree."""
        name = self._get_excluded(ext_value)[n]['base']
        return (name.getName(), name.getComponent())

    @uses_ext_value
    def get_excluded_range(self, n, ext_value=None):
        """Return (minimum, maximum) of the n-th excluded subtree."""
        entry = self._get_excluded(ext_value)[n]
        return (entry['minimum'], entry['maximum'])

    def _add_to_tree(self, ext_value, tree_name, position, name_type, name):
        """Store a new subtree entry at `position` of the named tree.

        Modifies `ext_value` in place.
        """
        # assigning None to a component creates it empty, ready to be filled
        if ext_value[tree_name] is None:
            ext_value[tree_name] = None
        ext_value[tree_name][position] = None
        ext_value[tree_name][position]['base'] = None
        ext_value[tree_name][position]['base'][name_type] = name
        ext_value[tree_name][position]['minimum'] = 0
        # maximum should be missing (RFC5280/4.2.1.10)

    @modifies_ext_value
    def add_permitted(self, name_type, name, ext_value=None):
        """Append a permitted subtree with the given name."""
        last = self.get_permitted_length()
        self._add_to_tree(ext_value, 'permittedSubtrees', last,
                          name_type, name)
        return ext_value

    @modifies_ext_value
    def add_excluded(self, name_type, name, ext_value=None):
        """Append an excluded subtree with the given name."""
        last = self.get_excluded_length()
        self._add_to_tree(ext_value, 'excludedSubtrees', last, name_type, name)
        return ext_value
class X509ExtensionExtendedKeyUsage(X509Extension):
    """Wrapper for the extKeyUsage extension (a list of usage OIDs).

    Only the usages listed in EXT_KEY_USAGE_NAMES can be queried or
    changed. The `ext_value` argument of the accessors is filled in by the
    decorators.
    """
    spec = rfc5280.ExtKeyUsageSyntax
    _oid = rfc5280.id_ce_extKeyUsage

    _valid = list(EXT_KEY_USAGE_NAMES.keys())

    @uses_ext_value
    def get_all_usages(self, ext_value=None):
        """Return the list of all the usages stored in the extension."""
        return [usage for usage in ext_value]

    @uses_ext_value
    def get_usage(self, usage, ext_value=None):
        """Return True if the usage is present.

        :raises ValueError: if the usage is not a known one
        """
        if usage not in self._valid:
            raise ValueError("usage not valid")
        return (usage in ext_value)

    @modifies_ext_value
    def set_usage(self, usage, state, ext_value=None):
        """Add (truthy `state`) or remove (falsy `state`) the usage.

        :raises ValueError: if the usage is not a known one
        """
        if usage not in self._valid:
            raise ValueError("usage not valid")

        if state:
            if usage not in ext_value:
                ext_value[len(ext_value)] = usage
            return ext_value

        if usage not in ext_value:
            return ext_value

        # Entries cannot be removed in place: writing the kept entries over
        # the old ones would leave a stale entry at the end. Build a new
        # value holding only the usages which are kept.
        remaining = self.spec()
        kept = [x for x in ext_value if x != usage]
        for i, x in enumerate(kept):
            remaining[i] = x
        return remaining

    @uses_ext_value
    def __str__(self, ext_value=None):
        # usages without a known name are shown by their identifier
        usages = [EXT_KEY_USAGE_NAMES.get(u, str(u)) for u in ext_value]
        return "extKeyUsage: " + ", ".join(usages)
class X509ExtensionAuthorityKeyId(X509Extension):
    """Wrapper for the authorityKeyIdentifier extension.

    Each setter replaces the whole value, so setting the key id drops the
    serial and the other way round. The `ext_value` argument of the
    accessors is filled in by the decorators.
    """
    spec = rfc5280.AuthorityKeyIdentifier
    _oid = rfc5280.id_ce_authorityKeyIdentifier

    @uses_ext_value
    def get_key_id(self, ext_value=None):
        """Return the key identifier octets, or None if it is not set."""
        ki = ext_value['keyIdentifier']
        if ki:
            return ki.asOctets()
        return None

    @uses_ext_value
    def get_serial(self, ext_value=None):
        """Return the 'authorityCertSerialNumber' field."""
        return ext_value['authorityCertSerialNumber']

    @modifies_ext_value
    def set_key_id(self, key, ext_value=None):
        """Replace the value with one holding only the key identifier."""
        # new extension, pyasn1 cannot remove values
        new_ext = self.spec()
        new_ext['keyIdentifier'] = key
        return new_ext

    @modifies_ext_value
    def set_serial(self, serial, ext_value=None):
        """Replace the value with one holding only the serial number."""
        # new extension, pyasn1 cannot remove values
        new_ext = self.spec()
        new_ext['authorityCertSerialNumber'] = int(serial)
        return new_ext
class X509ExtensionSubjectKeyId(X509Extension):
    """Wrapper for the subjectKeyIdentifier extension.

    The `ext_value` argument of the accessors is filled in by the
    decorators.
    """
    spec = rfc5280.SubjectKeyIdentifier
    _oid = rfc5280.id_ce_subjectKeyIdentifier

    @classmethod
    def _get_default_value(cls):
        """Return an empty key identifier."""
        return cls.spec(b"")

    @uses_ext_value
    def get_key_id(self, ext_value=None):
        """Return the key identifier octets."""
        return ext_value.asOctets()

    @modifies_ext_value
    def set_key_id(self, key, ext_value=None):
        """Replace the key identifier with `key`."""
        # the identifier is a value type, a new one has to be built
        return self.spec(key)
# Wrapper class to use for each known extension OID. Extensions with an OID
# not listed here are handled by the generic X509Extension.
EXTENSION_CLASSES = {
    rfc5280.id_ce_basicConstraints: X509ExtensionBasicConstraints,
    rfc5280.id_ce_keyUsage: X509ExtensionKeyUsage,
    rfc5280.id_ce_extKeyUsage: X509ExtensionExtendedKeyUsage,
    rfc5280.id_ce_subjectAltName: X509ExtensionSubjectAltName,
    rfc5280.id_ce_nameConstraints: X509ExtensionNameConstraints,
    rfc5280.id_ce_authorityKeyIdentifier: X509ExtensionAuthorityKeyId,
    rfc5280.id_ce_subjectKeyIdentifier: X509ExtensionSubjectKeyId,
}
def construct_extension(ext):
    """Construct an extension object of the right type.

    While X509Extension can provide basic access to the extension elements,
    it cannot parse details of extensions. This function detects which type
    should be used based on the extension id.
    If the type is unknown, generic X509Extension is used instead.

    :param ext: rfc5280.Extension to wrap
    :returns: instance of the class registered for the extension id in
              EXTENSION_CLASSES, or of X509Extension
    :raises errors.X509Error: if `ext` is not an rfc5280.Extension
    """
    if not isinstance(ext, rfc5280.Extension):
        raise errors.X509Error("extension has incorrect type")
    ext_class = EXTENSION_CLASSES.get(ext['extnID'], X509Extension)
    return ext_class(ext)