Added first test.

This commit is contained in:
Pino de Candia 2017-10-24 17:58:57 +00:00
parent 759aedf52a
commit 93782dc2b8
7 changed files with 79 additions and 33 deletions

View File

@ -1,18 +1,18 @@
import falcon
import models
from db.persistence import SQLAlchemySessionManager
from tatu.db.persistence import SQLAlchemySessionManager
def create_app():
api = falcon.API(middleware=[SQLAlchemySessionManager()])
def create_app(sa):
api = falcon.API(middleware=[sa])
api.add_route('/authorities', models.Authorities())
api.add_route('/authorities/{uuid}', models.Authority())
api.add_route('/users/{uuid}/certs', models.UserCerts())
api.add_route('/users/{uuid}/certs/{fingerprint}', models.UserCert())
api.add_route('/hosts/{uuid}/certs', models.HostCerts())
api.add_route('/hosts/{uuid}/certs/{fingerprint}', models.HostCert())
api.add_route('/host_cert_token', models.Tokens())
api.add_route('/host_cert_tokens', models.Token())
return api
def get_app():
return create_app()
return create_app(SQLAlchemySessionManager())

View File

@ -37,3 +37,7 @@ class HostCert(object):
def on_post(self, req, resp):
resp.status = falcon.HTTP_400
class Token(object):
def on_post(self, req, resp):
resp.status = falcon.HTTP_400

View File

@ -4,6 +4,7 @@ import sshpubkeys
import uuid
import subprocess
import os
from tatu.utils import generateCert
Base = declarative_base()
@ -40,30 +41,6 @@ class UserCert(Base):
pubkey = sa.Column(sa.Text)
cert = sa.Column(sa.Text)
def generateCert(auth_key, entity_key, host_name=None):
# Temporarily write the authority private key and entity public key to /tmp
ca_file = '/tmp'.join(uuid.uuid4().hex)
pub_prefix = uuid.uuid4().hex
pub_file = ''.join('/tmp/', pub_prefix, '.pub')
with open(ca_file, "w") as text_file:
text_file.write(auth_key)
with open(pub_file, "w") as text_file:
text_file.write(entity_key)
# Call keygen
if host_name is None:
subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file], shell=True)
else:
subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file], shell=True)
# Read the contents of the certificate file
cert_file = ''.join('/tmp/', pub_prefix, '-cert.pub')
cert = ''
with open(cert_file, 'r') as text_file:
cert = text_file.read()
# Delete temporary files
for file in [ca_file, pub_file, cert_file]:
os.remove(file)
return cert
def createUserCert(session, id, auth_id, pub, priv):
with session:
user = User(id=id,
@ -88,10 +65,10 @@ class Token(Base):
default=generate_uuid)
hostname = sa.Column(sa.String(36))
instance_id = sa.Column(sa.String(36))
auth_id = sa.Column(sa.String(36), ForeignKey('authorities.id'))
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.id'))
used = sa.Column(sa.Boolean)
date_used = sa.Column(sa.Date)
fingerprint_used = sa.Column(sa.String(36), optional)
fingerprint_used = sa.Column(sa.String(36), default=False)
def createToken(session, instance_id, auth_id, hostname):
with session:
@ -127,7 +104,7 @@ def createHostCert(session, token_id, pub):
if auth is None:
raise falcon.HTTPNotFound("Unrecognized certificate authority")
host = HostCert(id=token.instance_id,
fingerprint=sshpubkeys.SSHKey(pub).hash()
fingerprint=sshpubkeys.SSHKey(pub).hash(),
token_id=token_id,
pubkey=pub,
cert=generateCert(auth.host_privkey, pub))

View File

@ -15,7 +15,7 @@ class SQLAlchemySessionManager:
def __init__(self):
self.engine = create_engine(get_url())
self.Session = scoped_session(sessionmaker(self._engine))
self.Session = scoped_session(sessionmaker(self.engine))
def process_resource(self, req, resp, resource, params):
resource.session = self.Session()

0
tatu/tests/__init__.py Normal file
View File

41
tatu/tests/test_app.py Normal file
View File

@ -0,0 +1,41 @@
# coding=utf-8
import json
import falcon
from falcon import testing
import msgpack
import pytest
import uuid
from tatu.api.app import create_app
from tatu.db.persistence import SQLAlchemySessionManager
from tatu.db.models import Authority
from Crypto.PublicKey import RSA
@pytest.fixture
def db():
return SQLAlchemySessionManager()
@pytest.fixture
def client(db):
api = create_app(db)
return testing.TestClient(api)
def test_post_authority(client, db):
auth_id = str(uuid.uuid4())
user_ca = RSA.generate(2048)
host_ca = RSA.generate(2048)
body = {
'íd': auth_id,
'user_privkey': user_ca.exportKey('PEM'),
'user_pubkey': user_ca.publickey().exportKey('OpenSSH'),
'host_privkey': host_ca.exportKey('PEM'),
'host_pubkey': host_ca.publickey().exportKey('OpenSSH')
}
response = client.simulate_post(
'/authorities',
body=json.dumps(body),
)
#with db.Session() as session:
session = db.Session()
auth = session.query(Authority).get(auth_id)
assert auth is not None

24
tatu/utils.py Normal file
View File

@ -0,0 +1,24 @@
def generateCert(auth_key, entity_key, host_name=None):
# Temporarily write the authority private key and entity public key to /tmp
ca_file = '/tmp'.join(uuid.uuid4().hex)
pub_prefix = uuid.uuid4().hex
pub_file = ''.join('/tmp/', pub_prefix, '.pub')
with open(ca_file, "w") as text_file:
text_file.write(auth_key)
with open(pub_file, "w") as text_file:
text_file.write(entity_key)
# Call keygen
if host_name is None:
subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file], shell=True)
else:
subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file], shell=True)
# Read the contents of the certificate file
cert_file = ''.join('/tmp/', pub_prefix, '-cert.pub')
cert = ''
with open(cert_file, 'r') as text_file:
cert = text_file.read()
# Delete temporary files
for file in [ca_file, pub_file, cert_file]:
os.remove(file)
return cert