Fix notBefore/notAfter handling in non-UTC time
Use the actual type returned from openssl - ASN1_TIME. Also, the test itself was incorrect. set_not_...() tests are fixed to test UTC-based values now and not convert to local timezone. Cryptography >= 0.9.1 needs to be used due to incompatible interface changes. Change-Id: I520e42e5a985f47e9fe8505f9023b8b1d05665b5
This commit is contained in:
parent
80c43520b7
commit
502c2b6c92
|
@ -11,12 +11,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
import errors
|
||||
import message_digest
|
||||
import name
|
||||
import utils
|
||||
|
||||
|
||||
class X509CertificateError(errors.X509Error):
|
||||
|
@ -69,30 +69,6 @@ class X509Certificate(object):
|
|||
if getattr(self, '_certObj', None):
|
||||
self._lib.X509_free(self._certObj)
|
||||
|
||||
def _asn1_utctime(self, t):
|
||||
asn1_utctime = self._lib.ASN1_UTCTIME_set(self._ffi.NULL, t)
|
||||
if asn1_utctime == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not create ASN1_UTCTIME "
|
||||
"object") # pragma: no cover
|
||||
|
||||
return asn1_utctime
|
||||
|
||||
def _python_utctime(self, t):
|
||||
bio = self._lib.BIO_new(self._lib.BIO_s_mem())
|
||||
bio = self._ffi.gc(bio, self._lib.BIO_free)
|
||||
|
||||
val = self._lib.ASN1_UTCTIME_print(bio, t)
|
||||
if val != 1:
|
||||
raise X509CertificateError("Could not print"
|
||||
" ASN1_UTCTIME") # pragma: no cover
|
||||
size = 1024
|
||||
data = self._ffi.new("char[]", size)
|
||||
self._lib.BIO_gets(bio, data, size)
|
||||
data = self._ffi.string(data)
|
||||
|
||||
val = time.strptime(data, "%b %d %H:%M:%S %Y %Z")
|
||||
return time.mktime(val) # seconds since the epoch
|
||||
|
||||
def from_buffer(self, data):
|
||||
"""Build this X509 object from a data buffer in memory.
|
||||
|
||||
|
@ -156,9 +132,9 @@ class X509Certificate(object):
|
|||
|
||||
:param t: time in seconds since the epoch
|
||||
"""
|
||||
ansi1_utc = self._asn1_utctime(t)
|
||||
ret = self._lib.X509_set_notBefore(self._certObj, ansi1_utc)
|
||||
self._lib.ASN1_UTCTIME_free(ansi1_utc)
|
||||
asn1_time = utils.timestamp_to_asn1_time(t)
|
||||
ret = self._lib.X509_set_notBefore(self._certObj, asn1_time)
|
||||
self._lib.ASN1_TIME_free(asn1_time)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"not before time.") # pragma: no cover
|
||||
|
@ -166,17 +142,16 @@ class X509Certificate(object):
|
|||
def get_not_before(self):
|
||||
"""Get the 'not before' date field as seconds since the epoch."""
|
||||
not_before = self._lib.X509_get_notBefore(self._certObj)
|
||||
not_before = self._ffi.cast("ASN1_UTCTIME*", not_before)
|
||||
return self._python_utctime(not_before)
|
||||
return utils.asn1_time_to_timestamp(not_before)
|
||||
|
||||
def set_not_after(self, t):
|
||||
"""Set the 'not after' date field.
|
||||
|
||||
:param t: time in seconds since the epoch
|
||||
"""
|
||||
ansi1_utc = self._asn1_utctime(t)
|
||||
ret = self._lib.X509_set_notAfter(self._certObj, ansi1_utc)
|
||||
self._lib.ASN1_UTCTIME_free(ansi1_utc)
|
||||
asn1_time = utils.timestamp_to_asn1_time(t)
|
||||
ret = self._lib.X509_set_notAfter(self._certObj, asn1_time)
|
||||
self._lib.ASN1_TIME_free(asn1_time)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"not after time.") # pragma: no cover
|
||||
|
@ -184,8 +159,7 @@ class X509Certificate(object):
|
|||
def get_not_after(self):
|
||||
"""Get the 'not after' date field as seconds since the epoch."""
|
||||
not_after = self._lib.X509_get_notAfter(self._certObj)
|
||||
not_after = self._ffi.cast("ASN1_UTCTIME*", not_after)
|
||||
return self._python_utctime(not_after)
|
||||
return utils.asn1_time_to_timestamp(not_after)
|
||||
|
||||
def set_pubkey(self, pkey):
|
||||
"""Set the public key field.
|
||||
|
|
|
@ -16,3 +16,8 @@ class X509Error(Exception):
|
|||
"""Base exception for X509 errors."""
|
||||
def __init__(self, what):
|
||||
super(X509Error, self).__init__(what)
|
||||
|
||||
|
||||
class ASN1TimeError(Exception):
|
||||
"""Base exception for ASN1-time related errors."""
|
||||
pass
|
||||
|
|
|
@ -11,7 +11,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import calendar
|
||||
import datetime
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
import errors
|
||||
|
||||
|
||||
def load_pem_private_key(key_data, passwd=None):
|
||||
|
@ -36,3 +40,96 @@ def load_pem_private_key(key_data, passwd=None):
|
|||
|
||||
evp_pkey = ffi.gc(evp_pkey, lib.EVP_PKEY_free)
|
||||
return evp_pkey
|
||||
|
||||
|
||||
def create_timezone(minute_offset):
|
||||
"""Create a new timezone with a specified offset.
|
||||
|
||||
Since tzinfo is just a base class, and tzinfo subclasses need a
|
||||
no-arguments __init__(), we need to generate a new class dynamically.
|
||||
|
||||
:param minute_offset: total timezone offset in minutes
|
||||
"""
|
||||
|
||||
class SpecificTZ(datetime.tzinfo):
|
||||
def utcoffset(self, _dt):
|
||||
return minute_offset
|
||||
|
||||
def dst(self, _dt):
|
||||
return None
|
||||
|
||||
def tzname(self, _dt):
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
sign = "+" if minute_offset > 0 else "-"
|
||||
hh = minute_offset / 60
|
||||
mm = minute_offset % 60
|
||||
return "Timezone %s%02i%02i" % (sign, hh, mm)
|
||||
|
||||
return SpecificTZ()
|
||||
|
||||
|
||||
def asn1_time_to_timestamp(t):
|
||||
"""Convert from ASN1_TIME type to a UTC-based timestamp.
|
||||
|
||||
:param t: ASN1_TIME to convert
|
||||
"""
|
||||
|
||||
gen_time = backend._lib.ASN1_TIME_to_generalizedtime(t, backend._ffi.NULL)
|
||||
if gen_time == backend._ffi.NULL:
|
||||
raise errors.ASN1TimeError("time conversion failure")
|
||||
|
||||
try:
|
||||
return asn1_generalizedtime_to_timestamp(gen_time)
|
||||
finally:
|
||||
backend._lib.ASN1_GENERALIZEDTIME_free(gen_time)
|
||||
|
||||
|
||||
def asn1_generalizedtime_to_timestamp(gt):
|
||||
"""Convert from ASN1_GENERALIZEDTIME to UTC-based timestamp.
|
||||
|
||||
:param gt: ASN1_GENERALIZEDTIME to convert
|
||||
"""
|
||||
|
||||
# ASN1_GENERALIZEDTIME is actually a string in known formats,
|
||||
# so the conversion can be done in this code
|
||||
string_time = backend._ffi.cast("ASN1_STRING*", gt)
|
||||
string_data = backend._lib.ASN1_STRING_data(string_time)
|
||||
res = backend._ffi.string(string_data)
|
||||
|
||||
before_tz = res[:14]
|
||||
tz_str = res[14:]
|
||||
d = datetime.datetime.strptime(before_tz, "%Y%m%d%H%M%S")
|
||||
if tz_str == 'Z':
|
||||
# YYYYMMDDhhmmssZ
|
||||
d.replace(tzinfo=create_timezone(0))
|
||||
else:
|
||||
# YYYYMMDDhhmmss+hhmm
|
||||
# YYYYMMDDhhmmss-hhmm
|
||||
sign = -1 if tz_str[0] == '-' else 1
|
||||
hh = tz_str[1:3]
|
||||
mm = tz_str[3:5]
|
||||
minute_offset = sign * (int(mm) + int(hh) * 60)
|
||||
d.replace(tzinfo=create_timezone(minute_offset))
|
||||
return calendar.timegm(d.timetuple())
|
||||
|
||||
|
||||
def timestamp_to_asn1_time(t):
|
||||
"""Convert from UTC-based timestamp to ASN1_TIME
|
||||
|
||||
:param t: time in seconds since the epoch
|
||||
"""
|
||||
|
||||
d = datetime.datetime.utcfromtimestamp(t)
|
||||
# use the ASN1_GENERALIZEDTIME format
|
||||
time_str = d.strftime("%Y%m%d%H%M%SZ")
|
||||
asn1_time = backend._lib.ASN1_STRING_type_new(
|
||||
backend._lib.V_ASN1_GENERALIZEDTIME)
|
||||
backend._lib.ASN1_STRING_set(asn1_time, time_str, len(time_str))
|
||||
asn1_gentime = backend._ffi.cast("ASN1_GENERALIZEDTIME*", asn1_time)
|
||||
if backend._lib.ASN1_GENERALIZEDTIME_check(asn1_gentime) == 0:
|
||||
raise errors.ASN1TimeError("timestamp not accepted by ASN1 check")
|
||||
|
||||
# ASN1_GENERALIZEDTIME is a form of ASN1_TIME, so a pointer cast is valid
|
||||
return backend._ffi.cast("ASN1_TIME*", asn1_time)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
cryptography>=0.8
|
||||
cryptography>=0.9.1
|
||||
pecan>=0.8.0
|
||||
Paste
|
||||
netaddr>=0.7.12
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
@ -286,8 +285,7 @@ class TestX509Cert(unittest.TestCase):
|
|||
def test_set_not_before(self):
|
||||
self.cert.set_not_before(0) # seconds since epoch
|
||||
val = self.cert.get_not_before()
|
||||
tst = time.mktime(time.gmtime(0))
|
||||
self.assertEqual(tst, val)
|
||||
self.assertEqual(0, val)
|
||||
|
||||
def test_get_not_after(self):
|
||||
val = self.cert.get_not_after()
|
||||
|
@ -296,5 +294,4 @@ class TestX509Cert(unittest.TestCase):
|
|||
def test_set_not_after(self):
|
||||
self.cert.set_not_after(0) # seconds since epoch
|
||||
val = self.cert.get_not_after()
|
||||
tst = time.mktime(time.gmtime(0))
|
||||
self.assertEqual(tst, val)
|
||||
self.assertEqual(0, val)
|
||||
|
|
Loading…
Reference in New Issue