Use hash_md5 instead of deprecated hash. Fixed fd leak.

This commit is contained in:
Pino de Candia 2017-11-22 21:09:49 +00:00
parent 849152f16d
commit 6235f5e2b0
8 changed files with 128 additions and 22 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.pyc
*.swp
*.swo

View File

@ -36,10 +36,13 @@ class Logger(object):
def __init__(self):
self.logger = logging.getLogger('gunicorn.error')
def process_resource(self, req, resp, resource, params):
self.logger.debug('Received request {0} {1}'.format(req.method, req.relative_uri))
def process_response(self, req, resp, resource, params):
self.logger.debug(
'Request {0} {1} with body {2} produced'
'response with status {3} location {4} and body {5}'.format(
'Request {0} {1} with body {2} produced response '
'with status {3} location {4} and body {5}'.format(
req.method, req.relative_uri,
req.body if hasattr(req, 'body') else 'None',
resp.status, resp.location, resp.body))

View File

@ -48,7 +48,7 @@ def createUserCert(session, user_id, auth_id, pub):
auth = getAuthority(session, auth_id)
if auth is None:
raise falcon.HTTPNotFound(description='No Authority found with that ID')
fingerprint = sshpubkeys.SSHKey(pub).hash()
fingerprint = sshpubkeys.SSHKey(pub).hash_md5()
certRecord = session.query(UserCert).get([user_id, fingerprint])
if certRecord is not None:
raise falcon.HTTPConflict('This public key is already signed.')
@ -117,7 +117,7 @@ def createHostCert(session, token_id, host_id, pub):
raise falcon.HTTPNotFound(description='No Token found with that ID')
if token.host_id != host_id:
raise falcon.HTTPConflict(description='The token is not valid for this instance ID')
fingerprint = sshpubkeys.SSHKey(pub).hash()
fingerprint = sshpubkeys.SSHKey(pub).hash_md5()
if token.used:
if token.fingerprint_used != fingerprint:

View File

@ -7,6 +7,7 @@ from tatu.db.models import Base
def get_url():
return os.getenv("DATABASE_URL", "sqlite:///development.db")
#return os.getenv("DATABASE_URL", "sqlite:///:memory:")
class SQLAlchemySessionManager:
"""

0
tatu/ftests/__init__.py Normal file
View File

97
tatu/ftests/test_api.py Normal file
View File

@ -0,0 +1,97 @@
import json
import requests
import os
from tatu.utils import random_uuid
from Crypto.PublicKey import RSA
import sshpubkeys
import uuid
server = 'http://127.0.0.1:18321'
def vendordata_request(instance_id, project_id, hostname):
return {
'instance-id': instance_id,
'project-id': project_id,
'hostname': hostname
}
def host_request(token, host, pub_key):
return {
'token_id': token,
'host_id': host,
'key.pub': pub_key
}
def test_host_certificate_generation():
project_id = random_uuid()
response = requests.post(
server + '/authorities',
data=json.dumps({'auth_id': project_id})
)
assert response.status_code == 201
assert 'location' in response.headers
assert response.headers['location'] == '/authorities/' + project_id
response = requests.get(server + response.headers['location'])
assert response.status_code == 200
auth = json.loads(response.content)
assert 'auth_id' in auth
assert auth['auth_id'] == project_id
assert 'user_key.pub' in auth
assert 'host_key.pub' in auth
ca_user = auth['user_key.pub']
key = RSA.generate(2048)
pub_key = key.publickey().exportKey('OpenSSH')
fingerprint = sshpubkeys.SSHKey(pub_key).hash_md5()
for i in range(100):
instance_id = random_uuid()
hostname = 'host{}'.format(i)
# Simulate Nova's separate requests for each version of metadata API
vendordata = None
token = None
for j in range(3):
response = requests.post(
server + '/novavendordata',
data=json.dumps(vendordata_request(instance_id, project_id, hostname))
)
assert response.status_code == 201
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'hosttokens'
vendordata = json.loads(response.content)
assert 'token' in vendordata
tok = vendordata['token']
if token is None:
token = tok
else:
assert token == tok
assert token == location_path[-1]
assert 'auth_pub_key_user' in vendordata
assert vendordata['auth_pub_key_user'] == ca_user
assert 'principals' in vendordata
assert vendordata['principals'] == 'admin'
response = requests.post(
server + '/hostcerts',
data=json.dumps(host_request(token, instance_id, pub_key))
)
assert response.status_code == 201
assert 'location' in response.headers
location = response.headers['location']
location_path = location.split('/')
assert location_path[1] == 'hostcerts'
assert location_path[2] == instance_id
assert location_path[3] == fingerprint
response = requests.get(server + location)
assert response.status_code == 200
hostcert = json.loads(response.content)
assert 'host_id' in hostcert
assert hostcert['host_id'] == instance_id
assert 'fingerprint' in hostcert
assert hostcert['fingerprint']
assert 'auth_id' in hostcert
auth_id = str(uuid.UUID(hostcert['auth_id'], version=4))
assert auth_id == project_id
assert 'key-cert.pub' in hostcert

View File

@ -10,6 +10,7 @@ from tatu.db.models import Authority
from tatu.utils import random_uuid
from Crypto.PublicKey import RSA
import sshpubkeys
import time
@pytest.fixture
def db():
@ -25,12 +26,12 @@ token_id = ''
host_id = random_uuid()
host_key = RSA.generate(2048)
host_pub_key = host_key.publickey().exportKey('OpenSSH')
host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash()
host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash_md5()
user_id = random_uuid()
user_key = RSA.generate(2048)
user_pub_key = user_key.publickey().exportKey('OpenSSH')
user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash()
user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash_md5()
auth_id = random_uuid()
auth_user_pub_key = None
@ -47,10 +48,6 @@ def test_post_authority(client, auth_id=auth_id):
assert response.status == falcon.HTTP_CREATED
assert response.headers['location'] == '/authorities/' + auth_id
def test_stress_post_authority(client):
for i in range(10000):
test_post_authority(client, auth_id=random_uuid())
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_authority_duplicate(client):
body = {
@ -135,7 +132,7 @@ def test_post_user(client):
location = response.headers['location'].split('/')
assert location[1] == 'usercerts'
assert location[2] == body['user_id']
assert location[3] == sshpubkeys.SSHKey(body['key.pub']).hash()
assert location[3] == sshpubkeys.SSHKey(body['key.pub']).hash_md5()
@pytest.mark.dependency(depends=['test_post_user'])
def test_get_user(client):
@ -170,7 +167,7 @@ def test_post_second_cert_same_user(client):
location = response.headers['location'].split('/')
assert location[1] == 'usercerts'
assert location[2] == user_id
assert location[3] == sshpubkeys.SSHKey(pub_key).hash()
assert location[3] == sshpubkeys.SSHKey(pub_key).hash_md5()
def test_post_user_unknown_auth(client):
body = user_request(auth=random_uuid())
@ -288,15 +285,18 @@ def test_post_token_and_host(client):
assert location[2] == host_id
assert location[3] == host_fingerprint
def test_stress_post_token_same_host_id(client):
def test_stress_post_token_and_host(client):
my_auth_id = random_uuid()
test_post_authority(client, my_auth_id)
for i in range(10000):
# Generate a single RSA key pair and reuse it - it takes a few seconds.
key = RSA.generate(2048)
pub_key = key.publickey().exportKey('OpenSSH')
fingerprint = sshpubkeys.SSHKey(pub_key).hash_md5()
# Should do about 15 iterations/second, so only do 4 seconds worth.
start = time.time()
for i in range(60):
hid = random_uuid()
key = RSA.generate(2048)
pub_key = key.publickey().exportKey('OpenSSH')
fingerprint = sshpubkeys.SSHKey(key).hash()
token = token_request(auth=auth_id, host=hid)
token = token_request(auth=my_auth_id, host=hid)
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
@ -305,6 +305,7 @@ def test_stress_post_token_same_host_id(client):
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'hosttokens'
token_id = location_path[-1]
# Verify that it's a valid UUID
uuid.UUID(token_id, version=4)
host = host_request(token_id, host=hid, pub_key=pub_key)
@ -316,9 +317,9 @@ def test_stress_post_token_same_host_id(client):
assert 'location' in response.headers
location = response.headers['location'].split('/')
assert location[1] == 'hostcerts'
assert location[2] == host_id
assert location[2] == hid
assert location[3] == fingerprint
assert time.time() - start < 5
@pytest.mark.dependency(depends=['test_post_token_and_host'])
def test_post_token_same_host_id(client):

View File

@ -15,7 +15,8 @@ def generateCert(auth_key, entity_key, hostname=None, principals='root'):
cert_file = ''.join([dir, prefix, '-cert.pub'])
cert = ''
try:
os.open(ca_file, os.O_WRONLY | os.O_CREAT, 0o600)
fd = os.open(ca_file, os.O_WRONLY | os.O_CREAT, 0o600)
os.close(fd)
with open(ca_file, "w") as text_file:
text_file.write(auth_key)
with open(pub_file, "w", 0o644) as text_file:
@ -26,7 +27,7 @@ def generateCert(auth_key, entity_key, hostname=None, principals='root'):
args.extend(['-n', principals, pub_file])
else:
args.extend(['-h', pub_file])
print subprocess.check_output(args, stderr=subprocess.STDOUT)
subprocess.check_output(args, stderr=subprocess.STDOUT)
# Read the contents of the certificate file
cert = ''
with open(cert_file, 'r') as text_file: