diff --git a/anchor/X509/certificate.py b/anchor/X509/certificate.py index d53d112..2844697 100644 --- a/anchor/X509/certificate.py +++ b/anchor/X509/certificate.py @@ -11,12 +11,12 @@ # License for the specific language governing permissions and limitations # under the License. -import time from cryptography.hazmat.backends.openssl import backend import errors import message_digest import name +import utils class X509CertificateError(errors.X509Error): @@ -69,30 +69,6 @@ class X509Certificate(object): if getattr(self, '_certObj', None): self._lib.X509_free(self._certObj) - def _asn1_utctime(self, t): - asn1_utctime = self._lib.ASN1_UTCTIME_set(self._ffi.NULL, t) - if asn1_utctime == self._ffi.NULL: - raise X509CertificateError("Could not create ASN1_UTCTIME " - "object") # pragma: no cover - - return asn1_utctime - - def _python_utctime(self, t): - bio = self._lib.BIO_new(self._lib.BIO_s_mem()) - bio = self._ffi.gc(bio, self._lib.BIO_free) - - val = self._lib.ASN1_UTCTIME_print(bio, t) - if val != 1: - raise X509CertificateError("Could not print" - " ASN1_UTCTIME") # pragma: no cover - size = 1024 - data = self._ffi.new("char[]", size) - self._lib.BIO_gets(bio, data, size) - data = self._ffi.string(data) - - val = time.strptime(data, "%b %d %H:%M:%S %Y %Z") - return time.mktime(val) # seconds since the epoch - def from_buffer(self, data): """Build this X509 object from a data buffer in memory. @@ -156,9 +132,9 @@ class X509Certificate(object): :param t: time in seconds since the epoch """ - ansi1_utc = self._asn1_utctime(t) - ret = self._lib.X509_set_notBefore(self._certObj, ansi1_utc) - self._lib.ASN1_UTCTIME_free(ansi1_utc) + asn1_time = utils.timestamp_to_asn1_time(t) + ret = self._lib.X509_set_notBefore(self._certObj, asn1_time) + self._lib.ASN1_TIME_free(asn1_time) if ret == 0: raise X509CertificateError("Could not set X509 certificate " "not before time.") # pragma: no cover @@ -166,17 +142,16 @@ class X509Certificate(object): def get_not_before(self): """Get the 'not before' date field as seconds since the epoch.""" not_before = self._lib.X509_get_notBefore(self._certObj) - not_before = self._ffi.cast("ASN1_UTCTIME*", not_before) - return self._python_utctime(not_before) + return utils.asn1_time_to_timestamp(not_before) def set_not_after(self, t): """Set the 'not after' date field. :param t: time in seconds since the epoch """ - ansi1_utc = self._asn1_utctime(t) - ret = self._lib.X509_set_notAfter(self._certObj, ansi1_utc) - self._lib.ASN1_UTCTIME_free(ansi1_utc) + asn1_time = utils.timestamp_to_asn1_time(t) + ret = self._lib.X509_set_notAfter(self._certObj, asn1_time) + self._lib.ASN1_TIME_free(asn1_time) if ret == 0: raise X509CertificateError("Could not set X509 certificate " "not after time.") # pragma: no cover @@ -184,8 +159,7 @@ class X509Certificate(object): def get_not_after(self): """Get the 'not after' date field as seconds since the epoch.""" not_after = self._lib.X509_get_notAfter(self._certObj) - not_after = self._ffi.cast("ASN1_UTCTIME*", not_after) - return self._python_utctime(not_after) + return utils.asn1_time_to_timestamp(not_after) def set_pubkey(self, pkey): """Set the public key field. diff --git a/anchor/X509/errors.py b/anchor/X509/errors.py index 61c677d..4d08544 100644 --- a/anchor/X509/errors.py +++ b/anchor/X509/errors.py @@ -16,3 +16,8 @@ class X509Error(Exception): """Base exception for X509 errors.""" def __init__(self, what): super(X509Error, self).__init__(what) + + +class ASN1TimeError(Exception): + """Base exception for ASN1-time related errors.""" + pass diff --git a/anchor/X509/utils.py b/anchor/X509/utils.py index 2de80b8..ef0db3d 100644 --- a/anchor/X509/utils.py +++ b/anchor/X509/utils.py @@ -11,7 +11,11 @@ # License for the specific language governing permissions and limitations # under the License. +import calendar +import datetime + from cryptography.hazmat.backends.openssl import backend +import errors def load_pem_private_key(key_data, passwd=None): @@ -36,3 +40,96 @@ def load_pem_private_key(key_data, passwd=None): evp_pkey = ffi.gc(evp_pkey, lib.EVP_PKEY_free) return evp_pkey + + +def create_timezone(minute_offset): + """Create a new timezone with a specified offset. + + Since tzinfo is just a base class, and tzinfo subclasses need a + no-arguments __init__(), we need to generate a new class dynamically. + + :param minute_offset: total timezone offset in minutes + """ + + class SpecificTZ(datetime.tzinfo): + def utcoffset(self, _dt): + return minute_offset + + def dst(self, _dt): + return None + + def tzname(self, _dt): + return None + + def __repr__(self): + sign = "+" if minute_offset > 0 else "-" + hh = minute_offset / 60 + mm = minute_offset % 60 + return "Timezone %s%02i%02i" % (sign, hh, mm) + + return SpecificTZ() + + +def asn1_time_to_timestamp(t): + """Convert from ASN1_TIME type to a UTC-based timestamp. + + :param t: ASN1_TIME to convert + """ + + gen_time = backend._lib.ASN1_TIME_to_generalizedtime(t, backend._ffi.NULL) + if gen_time == backend._ffi.NULL: + raise errors.ASN1TimeError("time conversion failure") + + try: + return asn1_generalizedtime_to_timestamp(gen_time) + finally: + backend._lib.ASN1_GENERALIZEDTIME_free(gen_time) + + +def asn1_generalizedtime_to_timestamp(gt): + """Convert from ASN1_GENERALIZEDTIME to UTC-based timestamp. + + :param gt: ASN1_GENERALIZEDTIME to convert + """ + + # ASN1_GENERALIZEDTIME is actually a string in known formats, + # so the conversion can be done in this code + string_time = backend._ffi.cast("ASN1_STRING*", gt) + string_data = backend._lib.ASN1_STRING_data(string_time) + res = backend._ffi.string(string_data) + + before_tz = res[:14] + tz_str = res[14:] + d = datetime.datetime.strptime(before_tz, "%Y%m%d%H%M%S") + if tz_str == 'Z': + # YYYYMMDDhhmmssZ + d.replace(tzinfo=create_timezone(0)) + else: + # YYYYMMDDhhmmss+hhmm + # YYYYMMDDhhmmss-hhmm + sign = -1 if tz_str[0] == '-' else 1 + hh = tz_str[1:3] + mm = tz_str[3:5] + minute_offset = sign * (int(mm) + int(hh) * 60) + d.replace(tzinfo=create_timezone(minute_offset)) + return calendar.timegm(d.timetuple()) + + +def timestamp_to_asn1_time(t): + """Convert from UTC-based timestamp to ASN1_TIME + + :param t: time in seconds since the epoch + """ + + d = datetime.datetime.utcfromtimestamp(t) + # use the ASN1_GENERALIZEDTIME format + time_str = d.strftime("%Y%m%d%H%M%SZ") + asn1_time = backend._lib.ASN1_STRING_type_new( + backend._lib.V_ASN1_GENERALIZEDTIME) + backend._lib.ASN1_STRING_set(asn1_time, time_str, len(time_str)) + asn1_gentime = backend._ffi.cast("ASN1_GENERALIZEDTIME*", asn1_time) + if backend._lib.ASN1_GENERALIZEDTIME_check(asn1_gentime) == 0: + raise errors.ASN1TimeError("timestamp not accepted by ASN1 check") + + # ASN1_GENERALIZEDTIME is a form of ASN1_TIME, so a pointer cast is valid + return backend._ffi.cast("ASN1_TIME*", asn1_time) diff --git a/requirements.txt b/requirements.txt index be9ab06..e975b50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -cryptography>=0.8 +cryptography>=0.9.1 pecan>=0.8.0 Paste netaddr>=0.7.12 diff --git a/tests/X509/test_x509_certificate.py b/tests/X509/test_x509_certificate.py index 934a870..fa469c1 100644 --- a/tests/X509/test_x509_certificate.py +++ b/tests/X509/test_x509_certificate.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import time import unittest import mock @@ -286,8 +285,7 @@ class TestX509Cert(unittest.TestCase): def test_set_not_before(self): self.cert.set_not_before(0) # seconds since epoch val = self.cert.get_not_before() - tst = time.mktime(time.gmtime(0)) - self.assertEqual(tst, val) + self.assertEqual(0, val) def test_get_not_after(self): val = self.cert.get_not_after() @@ -296,5 +294,4 @@ class TestX509Cert(unittest.TestCase): def test_set_not_after(self): self.cert.set_not_after(0) # seconds since epoch val = self.cert.get_not_after() - tst = time.mktime(time.gmtime(0)) - self.assertEqual(tst, val) + self.assertEqual(0, val)