Changed config to use json not .py

- Co-authored by Bryan D. Payne <bdpayne@acm.org>

Change-Id: I89be207597331b3cd3debb7089e823b8c98077e6
Closes-Bug: 1398480
This commit is contained in:
Bryan D. Payne 2015-02-20 11:47:07 -08:00 committed by Doug Chivers
parent 1051ab49ae
commit 93e635668e
11 changed files with 259 additions and 116 deletions

View File

@ -13,6 +13,8 @@
import pecan
from anchor import jsonloader
# One time, on import, we want to safely build a list of the auth
# modules listed in the config that we should be using for validate.
@ -21,7 +23,7 @@ import pecan
# imcomplete or malformed.
AUTH_MODULES = []
try:
for auth_type in pecan.conf.to_dict().get('auth', {}).keys():
for auth_type in jsonloader.conf.auth.keys():
try:
module_name = "{}.{}".format(__name__, auth_type)
module = __import__(module_name, fromlist=[''])

View File

@ -14,10 +14,10 @@
import json
import logging
import pecan
import requests
from anchor.auth import results
from anchor import jsonloader
logger = logging.getLogger(__name__)
@ -35,7 +35,8 @@ def login(_, token):
"token": {
"id": token
}}}})
req = requests.post(pecan.conf.auth['keystone']['url'] + '/v3/auth/tokens',
req = requests.post(jsonloader.conf.auth['keystone']['url'] +
'/v3/auth/tokens',
headers={'Content-Type': 'application/json'},
data=data)
if req.status_code != 200:

View File

@ -13,9 +13,9 @@
import ldap
import ldap.filter
import pecan
from anchor.auth import results
from anchor import jsonloader
def user_get_groups(attributes):
@ -36,15 +36,16 @@ def login(user, secret):
:param secret: Secret/Passphrase
:returns: AuthDetails -- Class used for authentication information
"""
ldo = ldap.initialize("ldap://%s" % (pecan.conf.auth['ldap']['host'],))
ldo = ldap.initialize("ldap://%s" % (jsonloader.conf.auth['ldap']['host']))
ldo.set_option(ldap.OPT_REFERRALS, 0)
try:
ldo.simple_bind_s("%s@%s" % (user, pecan.conf.auth['ldap']['domain']),
ldo.simple_bind_s("%s@%s" % (user,
jsonloader.conf.auth['ldap']['domain']),
secret)
filter_str = ('(sAMAccountName=%s)' %
ldap.filter.escape_filter_chars(user))
ret = ldo.search_s(pecan.conf.auth['ldap']['base'],
ret = ldo.search_s(jsonloader.conf.auth['ldap']['base'],
ldap.SCOPE_SUBTREE,
filterstr=filter_str,
attrlist=['memberOf'])

View File

@ -13,9 +13,8 @@
import logging
import pecan
from anchor.auth import results
from anchor import jsonloader
from anchor import util
@ -44,31 +43,31 @@ def login(user, secret):
# expected values
try:
e_user = str(pecan.conf.auth['static']['user'])
e_pass = str(pecan.conf.auth['static']['secret'])
expected_user = str(jsonloader.conf.auth['static']['user'])
expected_secret = str(jsonloader.conf.auth['static']['secret'])
except (KeyError, TypeError):
logger.warn("auth conf missing static user or secret")
return None
# In python, len(<string>) is O(1)
# Short circuit this if lengths don't match
if len(user) != len(e_user):
if len(user) != len(expected_user):
logger.info("failed static auth: invalid username ({})".format(user))
return None
if len(secret) != len(e_pass):
if len(secret) != len(expected_secret):
logger.info("failed static auth: invalid password")
return None
# This technique is used to provide a constant time string compare
# between the user input and the expected values.
valid_user = util.constant_time_compare(user, e_user)
valid_pass = util.constant_time_compare(secret, e_pass)
valid_user = util.constant_time_compare(user, expected_user)
valid_secret = util.constant_time_compare(secret, expected_secret)
# This if statement results in a potential timing attack where the
# statement could return more quickly if valid_secret=False. We
# do not see an obvious solution to this problem, but also believe
# that leaking which input was valid isn't as big of a concern.
if valid_user and valid_pass:
return results.AuthDetails(username=e_user, groups=[])
if valid_user and valid_secret:
return results.AuthDetails(username=expected_user, groups=[])
logger.info("failed static auth for user {}".format(user))

View File

@ -19,6 +19,7 @@ import uuid
import pecan
from anchor import jsonloader
from anchor import validators
from anchor.X509 import certificate
from anchor.X509 import signing_request
@ -62,14 +63,14 @@ def parse_csr(csr, encoding):
def validate_csr(auth_result, csr, request):
args = {'auth_result': auth_result,
'csr': csr,
'conf': pecan.conf,
'conf': jsonloader.conf,
'request': request}
# It is ok if the config doesn't have any validators listed
# so we set the initial state to valid.
valid = True
for validator_set in pecan.conf.validators:
for validator_set in jsonloader.conf.validators:
logger.debug("Checking validators set <%s>",
validator_set.get("name"))
@ -119,14 +120,14 @@ def sign(csr):
try:
ca = certificate.X509Certificate()
ca.from_file(pecan.conf.ca["cert_path"])
ca.from_file(jsonloader.conf.ca["cert_path"])
except Exception as e:
logger.exception("Cannot load the signing CA: %s", e)
pecan.abort(500, "certificate signing error")
try:
key_data = None
with open(pecan.conf.ca["key_path"]) as f:
with open(jsonloader.conf.ca["key_path"]) as f:
key_data = f.read()
key = X509_utils.load_pem_private_key(key_data)
except Exception as e:
@ -137,7 +138,7 @@ def sign(csr):
new_cert.set_version(2)
start_time = int(time.time())
end_time = start_time + (pecan.conf.ca['valid_hours'] * 60 * 60)
end_time = start_time + (jsonloader.conf.ca['valid_hours'] * 60 * 60)
new_cert.set_not_before(start_time)
new_cert.set_not_after(end_time)
@ -157,11 +158,12 @@ def sign(csr):
logger.info("Signing certificate for <%s> with serial <%s>",
csr.get_subject(), serial)
new_cert.sign(key, pecan.conf.ca['signing_hash'])
new_cert.sign(key, jsonloader.conf.ca['signing_hash'])
path = os.path.join(
pecan.conf.ca['output_path'],
'%s.crt' % new_cert.get_fingerprint(pecan.conf.ca['signing_hash']))
jsonloader.conf.ca['output_path'],
'%s.crt' % new_cert.get_fingerprint(
jsonloader.conf.ca['signing_hash']))
logger.info("Saving certificate to: %s", path)
new_cert.save(path)

64
anchor/jsonloader.py Normal file
View File

@ -0,0 +1,64 @@
# -*- coding:utf-8 -*-
#
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import sys
logger = logging.getLogger(__name__)
class AnchorConf():
ca = None
_config = None
_logger = None
_settings = dict()
def __init__(self, logger, config_file):
'''Attempt to initialize a config dictionary from a yaml file.
Error out if loading the yaml file fails for any reason.
:param logger: Logger to be used in the case of errors
:param config_file: The Anchor yaml config file
:return: -
'''
self._logger = logger
try:
f = open(config_file, 'r')
except IOError:
logger.error("could not open config file: %s" % config_file)
sys.exit(2)
else:
# yaml parser does its own exception handling
self._config = json.load(f)
@property
def config(self):
'''Property to return the config dictionary
:return: Config dictionary
'''
return self._config
def __getattr__(self, name):
return self._config[name]
conf = AnchorConf(logger, "config.json")

153
config.json Executable file
View File

@ -0,0 +1,153 @@
{
"auth": {
"static": {
"secret": "simplepassword",
"user": "myusername"
}
},
"ca": {
"cert_path": "CA/root-ca.crt",
"key_path": "CA/root-ca-unwrapped.key",
"output_path": "certs",
"signing_hash": "sha1",
"valid_hours": 24
},
"logging": {
"formatters": {
"simple": {
"format": "%(asctime)s %(levelname)-5.5s [%(name)s][%(process)d/%(threadName)s] %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "simple",
"level": "DEBUG"
}
},
"loggers": {
"anchor": {
"level": "DEBUG"
},
"wsgi": {
"level": "INFO"
}
},
"root": {
"handlers": [
"console"
],
"level": "INFO"
}
},
"validators": [
{
"name": "default",
"steps": [
[
"common_name",
{
"allowed_domains": [
".example.com"
]
}
],
[
"alternative_names",
{
"allowed_domains": [
".example.com"
]
}
],
[
"server_group",
{
"group_prefixes": {
"bk": "Bock_Team",
"cs": "CS_Team",
"gl": "Glance_Team",
"mb": "MB_Team",
"nv": "Nova_Team",
"ops": "SysEng_Team",
"qu": "Neutron_Team",
"sw": "Swift_Team"
}
}
],
[
"extensions",
{
"allowed_extensions": [
"keyUsage",
"subjectAltName",
"basicConstraints",
"subjectKeyIdentifier"
]
}
],
[
"key_usage",
{
"allowed_usage": [
"Digital Signature",
"Key Encipherment",
"Non Repudiation",
"Certificate Sign",
"CRL Sign"
]
}
],
[
"ca_status",
{
"ca_requested": false
}
],
[
"source_cidrs",
{
"cidrs": [
"127.0.0.0/8"
]
}
]
]
},
{
"name": "ip",
"steps": [
[
"common_name",
{
"allowed_networks": [
"127/8"
]
}
],
[
"alternative_names",
{
"allowed_networks": [
"127/8"
]
}
],
[
"ca_status",
{
"ca_requested": false
}
],
[
"source_cidrs",
{
"cidrs": [
"127.0.0.0/8"
]
}
]
]
}
]
}

View File

@ -15,80 +15,3 @@ app = {
'__force_dict__': True
}
}
auth = {
'static': {
'user': 'woot',
'secret': 'woot',
},
# 'ldap': {
# 'host': "ldap.host.com",
# 'domain': "host.com",
# 'base': "CN=Users,DC=host,DC=com",
# },
# 'keystone': {
# 'url': 'https://keystone.example.com:35357',
# },
}
validators = [
{
"name": "common",
"steps": [
('common_name', {'allowed_domains': ['.example.com']}),
('alternative_names', {'allowed_domains': ['.example.com']}),
('server_group', {'group_prefixes': {
'nv': 'Nova_Team',
'sw': 'Swift_Team',
'bk': 'Bock_Team',
'gl': 'Glance_Team',
'cs': 'CS_Team',
'mb': 'MB_Team',
'ops': 'SysEng_Team',
'qu': 'Neutron_Team',
}}),
('extensions', {'allowed_extensions': ['keyUsage', 'subjectAltName', 'basicConstraints', 'subjectKeyIdentifier']}),
('key_usage', {'allowed_usage': ['Digital Signature', 'Key Encipherment', 'Non Repudiation', 'Certificate Sign', 'CRL Sign']}),
('ca_status', {'ca_requested': False}),
('source_cidrs', {'cidrs': ["127.0.0.0/8"]}),
]
},
{
"name": "ip",
"steps": [
('common_name', {'allowed_networks': ['127/8']}),
('alternative_names', {'allowed_networks': ['127/8']}),
('ca_status', {'ca_requested': False}),
('source_cidrs', {'cidrs': ["127.0.0.0/8"]}),
]
},
]
ca = {
'cert_path': "CA/root-ca.crt",
'key_path': "CA/root-ca-unwrapped.key",
'output_path': "certs",
'valid_hours': 24,
'signing_hash': "sha1",
}
logging = {
'root': {'level': 'INFO', 'handlers': ['console']},
'loggers': {
'anchor': {'level': 'DEBUG'},
'wsgi': {'level': 'INFO'},
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'simple'
}
},
'formatters': {
'simple': {
'format': ('%(asctime)s %(levelname)-5.5s [%(name)s]'
'[%(process)d/%(threadName)s] %(message)s')
}
}
}

View File

@ -14,17 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import textwrap
import unittest
import textwrap
from cryptography.hazmat.backends.openssl import backend
import mock
from anchor.X509 import errors as x509_errors
from anchor.X509 import signing_request
from cryptography.hazmat.backends.openssl import backend
class TestX509Csr(unittest.TestCase):
csr_data = textwrap.dedent("""

View File

@ -30,7 +30,7 @@ class AuthStaticTests(unittest.TestCase):
def test_validate_static(self):
"""Test all static user/pass authentication paths."""
config = 'pecan.conf.__values__'
config = "anchor.jsonloader.conf._config"
data = {'auth': {'static': {'secret': 'simplepassword',
'user': 'myusername'}}}
@ -53,7 +53,7 @@ class AuthStaticTests(unittest.TestCase):
def test_validate_static_malformed1(self):
"""Test static user/pass authentication with malformed config."""
config = 'pecan.conf.__values__'
config = "anchor.jsonloader.conf._config"
data = {'auth': {'static': {}}}
with mock.patch.dict(config, data):
@ -64,7 +64,7 @@ class AuthStaticTests(unittest.TestCase):
def test_validate_static_malformed2(self):
"""Test static user/pass authentication with malformed config."""
config = 'pecan.conf.__values__'
config = "anchor.jsonloader.conf._config"
data = {'auth': {}}
with mock.patch.dict(config, data):

View File

@ -82,7 +82,7 @@ class CertificateOpsTests(unittest.TestCase):
certificate_ops.parse_csr(self.csr, 'blah')
def test_parse_csr_fail2(self):
"""Test invalid CSR format (wrong type) for parse_csr."""
"""Test invalid CSR format (wrong type) for parse_csr."""
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.parse_csr(self.csr, True)
@ -99,7 +99,7 @@ class CertificateOpsTests(unittest.TestCase):
def test_validate_csr_success(self):
"""Test basic success path for validate_csr."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "pecan.conf.__values__"
config = "anchor.jsonloader.conf._config"
validators = [{'name': 'common',
'steps': [
('extensions', {'allowed_extensions': []})]}]
@ -110,7 +110,7 @@ class CertificateOpsTests(unittest.TestCase):
def test_validate_csr_fail1(self):
"""Test empty validator set for validate_csr."""
config = "pecan.conf.__values__"
config = "anchor.jsonloader.conf._config"
data = {'validators': []}
with mock.patch.dict(config, data):
@ -119,7 +119,7 @@ class CertificateOpsTests(unittest.TestCase):
def test_validate_csr_fail2(self):
"""Test invalid validator set (no tuples) for validate_csr."""
config = "pecan.conf.__values__"
config = "anchor.jsonloader.conf._config"
validators = [{'name': 'common', 'steps': [True]}]
data = {'validators': validators}
@ -129,7 +129,7 @@ class CertificateOpsTests(unittest.TestCase):
def test_validate_csr_fail3(self):
"""Test invalid validator set (tuple too long) for validate_csr."""
config = "pecan.conf.__values__"
config = "anchor.jsonloader.conf._config"
validators = [{'name': 'common', 'steps': [(1, 2, 3)]}]
data = {'validators': validators}
@ -139,7 +139,7 @@ class CertificateOpsTests(unittest.TestCase):
def test_validate_csr_fail4(self):
"""Test invalid validator set (bogus validator) for validate_csr."""
config = "pecan.conf.__values__"
config = "anchor.jsonloader.conf._config"
validators = [{'name': 'common', 'steps': [('no_such_method')]}]
data = {'validators': validators}
@ -150,7 +150,7 @@ class CertificateOpsTests(unittest.TestCase):
def test_validate_csr_fail5(self):
"""Test validate_csr with a validator that should fail."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "pecan.conf.__values__"
config = "anchor.jsonloader.conf._config"
validators = [{'name': 'common', 'steps': [('common_name')]}]
data = {'validators': validators}