Re-enabling more pep8 tests and bringing code inline

Change-Id: Ib2a6b6c59a0f231a41cc7b865be615596287075c
This commit is contained in:
Tim Kelsey 2015-01-09 13:02:09 +00:00
parent 9eada1323e
commit 4b5e0bb7d1
14 changed files with 105 additions and 63 deletions

View File

@ -13,19 +13,19 @@
from cryptography.hazmat.backends.openssl import backend
import errors
import message_digest
import name
import errors
class X509CertificateError(errors.X509Error):
"""Specific error for X509 certificate operations"""
"""Specific error for X509 certificate operations."""
def __init__(self, what):
super(X509CertificateError, self).__init__(what)
class X509Extension(object):
"""An X509 V3 Certificate extension"""
"""An X509 V3 Certificate extension."""
def __init__(self, ext):
self._lib = backend._lib
self._ffi = backend._ffi
@ -35,14 +35,14 @@ class X509Extension(object):
return "%s %s" % (self.get_name(), self.get_value())
def get_name(self):
"""Get the extension name as a python string"""
"""Get the extension name as a python string."""
ext_obj = self._lib.X509_EXTENSION_get_object(self._ext)
ext_nid = self._lib.OBJ_obj2nid(ext_obj)
ext_name_str = self._lib.OBJ_nid2sn(ext_nid)
return self._ffi.string(ext_name_str)
def get_value(self):
"""Get the extension value as a python string"""
"""Get the extension value as a python string."""
bio = self._lib.BIO_new(self._lib.BIO_s_mem())
bio = self._ffi.gc(bio, self._lib.BIO_free)
self._lib.X509V3_EXT_print(bio, self._ext, 0, 0)
@ -53,7 +53,7 @@ class X509Extension(object):
class X509Certificate(object):
"""X509 certificate class"""
"""X509 certificate class."""
def __init__(self):
self._lib = backend._lib
self._ffi = backend._ffi
@ -77,7 +77,8 @@ class X509Certificate(object):
return asn1_utctime
def from_buffer(self, data):
"""Build this X509 object from a data buffer in memory
"""Build this X509 object from a data buffer in memory.
:param data: A data buffer
"""
bio = backend._bytes_to_bio(data.encode('ascii'))
@ -97,7 +98,8 @@ class X509Certificate(object):
self._certObj = certObj
def from_file(self, path):
"""Build this X509 certificate object from a data file on disk
"""Build this X509 certificate object from a data file on disk.
:param path: A data buffer
"""
data = None
@ -106,7 +108,8 @@ class X509Certificate(object):
self.from_buffer(data)
def save(self, path):
"""Save this X509 certificate object to a file on disk
"""Save this X509 certificate object to a file on disk.
:param path: Output file path
"""
bio = self._lib.BIO_new_file(path, "w")
@ -118,7 +121,8 @@ class X509Certificate(object):
"disk as PEM data.")
def set_version(self, v):
"""Set the version of this X509 certificate object
"""Set the version of this X509 certificate object.
:param v: The version
"""
ret = self._lib.X509_set_version(self._certObj, v)
@ -128,6 +132,7 @@ class X509Certificate(object):
def set_not_before(self, t):
"""Set the 'not before' date field.
:param t: a Python date-time object
"""
ansi1_utc = self._asn1_utctime(t)
@ -139,6 +144,7 @@ class X509Certificate(object):
def set_not_after(self, t):
"""Set the 'not after' date field.
:param t: a Python date-time object
"""
ansi1_utc = self._asn1_utctime(t)
@ -149,7 +155,8 @@ class X509Certificate(object):
"not after time.")
def set_pubkey(self, pkey):
"""Set the public key field
"""Set the public key field.
:param pkey: The public key, an EVP_PKEY ssl type
"""
ret = self._lib.X509_set_pubkey(self._certObj, pkey)
@ -158,7 +165,8 @@ class X509Certificate(object):
"pubkey.")
def get_subject(self):
"""Get the subject name field value
"""Get the subject name field value.
:return: An X509Name object instance
"""
val = self._lib.X509_get_subject_name(self._certObj)
@ -169,7 +177,8 @@ class X509Certificate(object):
return name.X509Name(val)
def set_subject(self, subject):
"""Set the subject name filed value
"""Set the subject name filed value.
:param subject: An X509Name object instance
"""
val = subject._name_obj
@ -179,7 +188,8 @@ class X509Certificate(object):
"subject.")
def set_issuer(self, issuer):
"""Set the issuer name field value
"""Set the issuer name field value.
:param issuer: An X509Name object instance
"""
val = issuer._name_obj
@ -189,7 +199,8 @@ class X509Certificate(object):
"issuer.")
def get_issuer(self):
"""Get the issuer name field value
"""Get the issuer name field value.
:return: An X509Name object instance
"""
val = self._lib.X509_get_issuer_name(self._certObj)
@ -216,7 +227,8 @@ class X509Certificate(object):
"serial number.")
def add_extension(self, ext, index):
"""Add an X509 V3 Certificate extension
"""Add an X509 V3 Certificate extension.
:param ext: An X509Extension instance
:param index: The index of the extension
"""
@ -227,6 +239,7 @@ class X509Certificate(object):
def sign(self, key, md='sha1'):
"""Sign the X509 certificate with a key using a message digest algorithm
:param key: The signing key, an EVP_PKEY OpenSSL object
:param md: The name of a message digest algorithm to use, it must be
valid and known to OpenSSL, possible values are
@ -244,11 +257,11 @@ class X509Certificate(object):
" certificate.")
def as_der(self):
"""Return this X509 certificate as DER encoded data"""
"""Return this X509 certificate as DER encoded data."""
buf = None
num = self._lib.i2d_X509(self._certObj, self._ffi.NULL)
if num != 0:
buf = self._ffi.new("unsigned char[]", num+1)
buf = self._ffi.new("unsigned char[]", num + 1)
buf_ptr = self._ffi.new("unsigned char**")
buf_ptr[0] = buf
num = self._lib.i2d_X509(self._certObj, buf_ptr)
@ -259,7 +272,8 @@ class X509Certificate(object):
return buf
def get_fingerprint(self, md='md5'):
"""Get the fingerprint of this X509 certifiacte
"""Get the fingerprint of this X509 certifiacte.
:param md: The message digest algorthim used to compute the fingerprint
:return: The fingerprint encoded as a hex string
"""

View File

@ -13,6 +13,6 @@
class X509Error(Exception):
"""Base exception for X509 errors"""
"""Base exception for X509 errors."""
def __init__(self, what):
super(X509Error, self).__init__(what)

View File

@ -49,7 +49,7 @@ class MessageDigest(object):
v = 0L
lx = len(x)
for i in range(lx):
v = v + ord(x[i]) * (256L ** (lx-i-1))
v = v + ord(x[i]) * (256L ** (lx - i - 1))
return v
def update(self, data):

View File

@ -17,7 +17,7 @@ import errors
class X509Name(object):
"""An X509 Name object"""
"""An X509 Name object."""
# NOTE(tkelsey): this is not exhaustive
nid = {'C': backend._lib.NID_countryName,
@ -42,7 +42,7 @@ class X509Name(object):
}
class Entry():
"""An X509 Name sub-entry object"""
"""An X509 Name sub-entry object."""
def __init__(self, obj):
self._lib = backend._lib
self._ffi = backend._ffi
@ -63,7 +63,8 @@ class X509Name(object):
raise errors.X509Error("Could not setup ASN1 string data.")
def get_name(self):
"""Get the name of this entry
"""Get the name of this entry.
:return: entry name as a python string
"""
asn1_obj = self._lib.X509_NAME_ENTRY_get_object(self._entry)
@ -75,7 +76,8 @@ class X509Name(object):
return self._ffi.string(buf)
def get_value(self):
"""Get the value of this entry
"""Get the value of this entry.
:return: entry value as a python string
"""
val = self._lib.X509_NAME_ENTRY_get_data(self._entry)
@ -116,11 +118,12 @@ class X509Name(object):
yield self[i]
def entry_count(self):
"""Get the number of entries in the name object"""
"""Get the number of entries in the name object."""
return self._lib.X509_NAME_entry_count(self._name_obj)
def get_entries_by_nid_name(self, nid_name):
"""Get a name entry corresponding to an NID name
"""Get a name entry corresponding to an NID name.
:param nid_name: an NID name, chosen from the X509Name.nid table
:return: An X509Name.Entry object
"""

View File

@ -11,12 +11,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from cryptography.hazmat.backends.openssl import backend
import errors
import certificate
import errors
import name
from cryptography.hazmat.backends.openssl import backend
class X509CsrError(errors.X509Error):
def __init__(self, what):
@ -24,7 +24,7 @@ class X509CsrError(errors.X509Error):
class X509Csr(object):
"""An X509 Certificate Signing Request"""
"""An X509 Certificate Signing Request."""
def __init__(self):
self._lib = backend._lib
self._ffi = backend._ffi
@ -40,6 +40,7 @@ class X509Csr(object):
def from_buffer(self, data, password=None):
"""Create this CSR from a buffer
:param data: The data buffer
:param password: decryption password, if needed
"""
@ -54,6 +55,7 @@ class X509Csr(object):
def from_file(self, path, password=None):
"""Create this CSR from a file on disk
:param path: Path to the file on disk
:param password: decryption password, if needed
"""
@ -64,6 +66,7 @@ class X509Csr(object):
def get_pubkey(self):
"""Get the public key from the CSR
:return: an OpenSSL EVP_PKEY object
"""
pkey = self._lib.X509_REQ_get_pubkey(self._csrObj)
@ -74,6 +77,7 @@ class X509Csr(object):
def get_subject(self):
"""Get the subject name field from the CSR
:return: an X509Name object
"""
subs = self._lib.X509_REQ_get_subject_name(self._csrObj)
@ -84,6 +88,7 @@ class X509Csr(object):
def get_extensions(self):
"""Get the list of all X509 V3 Extensions on this CSR
:return: a list of X509Extension objects
"""
# TODO(tkelsey): I assume the ext list copies data and this is safe

View File

@ -16,6 +16,7 @@ from cryptography.hazmat.backends.openssl import backend
def load_pem_private_key(key_data, passwd=None):
"""Load and return an OpenSSL EVP_PKEY public key object from a data buffer
:param key_data: The data buffer
:param passwd: Decryption password if neded (not used for now)
:return: an OpenSSL EVP_PKEY public key object

View File

@ -11,9 +11,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import paste
from paste import translogger
from pecan import make_app
import paste.translogger
from . import validators
import validators
class ConfigValidationException(Exception):
@ -30,20 +32,28 @@ def validate_config(conf):
for i, validators_list in enumerate(conf.validators):
name = validators_list.get("name")
if not name:
raise ConfigValidationException("Validator set %i is missing a name", i+1)
raise ConfigValidationException("Validator set %i is missing a "
"name", i + 1)
if not validators_list.get("steps"):
raise ConfigValidationException("Validator set <%s> is missing validation steps", name)
raise ConfigValidationException("Validator set <%s> is missing "
"validation steps", name)
for step in validators_list["steps"]:
if not isinstance(step, tuple):
raise ConfigValidationException("Validator set <%s> contains a step that's <%s> and not a tuple", name, step)
raise ConfigValidationException("Validator set <%s> contains "
"a step that's <%s> and not a "
"tuple", name, step)
if len(step) == 0:
raise ConfigValidationException("Validator set <%s> contains a step with no validator name", name)
raise ConfigValidationException("Validator set <%s> contains "
"a step with no validator name",
name)
if not hasattr(validators, step[0]):
raise ConfigValidationException("Validator set <%s> contains an unknown validator <%s>", name, step[0])
raise ConfigValidationException("Validator set <%s> contains "
"an unknown validator <%s>",
name, step[0])
def setup_app(config):

View File

@ -11,7 +11,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from .results import AuthDetails, AUTH_FAILED
from .results import AUTH_FAILED
from .results import AuthDetails
from pecan import conf
@ -24,8 +25,8 @@ if conf.auth.get('keystone'):
def validate(user, secret):
if conf.auth.get('static'):
if secret == conf.auth['static']['secret'] and \
user == conf.auth['static']['user']:
if (secret == conf.auth['static']['secret'] and
user == conf.auth['static']['user']):
return AuthDetails(username=conf.auth['static']['user'], groups=[])
if conf.auth.get('ldap'):

View File

@ -11,10 +11,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from .results import AuthDetails, AUTH_FAILED
from .results import AUTH_FAILED
from .results import AuthDetails
import json
import logging
from pecan import conf
import requests

View File

@ -13,7 +13,8 @@
from __future__ import absolute_import
from .results import AuthDetails, AUTH_FAILED
from .results import AUTH_FAILED
from .results import AuthDetails
import ldap
import ldap.filter

View File

@ -20,7 +20,9 @@ import os
import sys
import time
import uuid
from pecan import conf
from . import validators
logger = logging.getLogger(__name__)
@ -110,7 +112,7 @@ def sign(csr):
new_cert.set_version(0)
start_time = int(time.time())
end_time = start_time+(conf.ca['valid_hours']*60*60)
end_time = start_time + (conf.ca['valid_hours'] * 60 * 60)
new_cert.set_not_before(start_time)
new_cert.set_not_after(end_time)

View File

@ -11,8 +11,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from pecan import expose, request, response
from .. import auth, certificate_ops, validators
from pecan import expose
from pecan import request
from pecan import response
from .. import auth
from .. import certificate_ops
from .. import validators
import logging

View File

@ -11,9 +11,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
import logging
import netaddr
logger = logging.getLogger(__name__)
@ -48,7 +50,8 @@ def check_networks(domain, allowed_networks):
def common_name(csr=None, allowed_domains=[], allowed_networks=[], **kwargs):
"""
"""Check CN entire is a known domain.
Refuse requests for certificates if they contain multiple CN
entries, or the domain does not match the list of known suffixes
or network ranges.
@ -78,7 +81,8 @@ def common_name(csr=None, allowed_domains=[], allowed_networks=[], **kwargs):
def alternative_names(csr=None, allowed_domains=[], allowed_networks=[],
**kwargs):
"""
"""Check known domain alternative names.
Refuse requests for certificates if the domain does not match
the list of known suffixes, or network ranges.
"""
@ -99,7 +103,8 @@ def alternative_names(csr=None, allowed_domains=[], allowed_networks=[],
def server_group(auth_result=None, csr=None, group_prefixes={}, **kwargs):
"""
"""Check Team prefix.
Make sure that for server names containing a team prefix, the team is
verified against the groups the user is a member of.
"""
@ -115,9 +120,7 @@ def server_group(auth_result=None, csr=None, group_prefixes={}, **kwargs):
def extensions(csr=None, allowed_extensions=[], **kwargs):
"""
Ensure only accepted extensions are used
"""
"""Ensure only accepted extensions are used."""
exts = csr.get_extensions() or []
for ext in exts:
if ext.get_name() not in allowed_extensions:
@ -126,9 +129,7 @@ def extensions(csr=None, allowed_extensions=[], **kwargs):
def key_usage(csr=None, allowed_usage=None, **kwargs):
"""
Ensure only accepted key usages are specified
"""
"""Ensure only accepted key usages are specified."""
allowed = set(allowed_usage)
for ext in (csr.get_extensions() or []):
@ -140,9 +141,7 @@ def key_usage(csr=None, allowed_usage=None, **kwargs):
def ca_status(csr=None, ca_requested=False, **kwargs):
"""
Ensure the request has/hasn't got the CA flag
"""
"""Ensure the request has/hasn't got the CA flag."""
for ext in (csr.get_extensions() or []):
ext_name = ext.get_name()
@ -173,9 +172,7 @@ def ca_status(csr=None, ca_requested=False, **kwargs):
def source_cidrs(request=None, cidrs=None, **kwargs):
"""
Ensure that the request comes from a known source
"""
"""Ensure that the request comes from a known source."""
for cidr in cidrs:
try:
r = netaddr.IPNetwork(cidr)

View File

@ -27,8 +27,9 @@ commands = python setup.py testr --coverage --testr-args='{posargs}'
# F403 unable to detect undefined names
# H104 file contains nothing but comments
# H302 import only modules
# H301,H305,H405,H404,H306,E226,H904,H307,F401
show-source = True
ignore = E123,E125,H303,F403,H104,H302,E501,H301,H305,H405,H404,H306,E226,H904,H307,F401
ignore = E123,E125,H303,F403,H104,H302,E501,H307,H304
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build