Add action to generate root CA

Allows Vault to act as the root CA, rather than an intermediary CA.

Adds an action to fetch the root CA certificate after the fact.
Cleans up interaction with the relation by using the Endpoint refactor.
Adds config option to enable automatic generation of the root CA.
Increases timeout checking for Vault health to 5 min, since we've seen 4
minute startup times in CI.

Change-Id: Ic4e481452a46cc0500437113640509b387912ddc
Partial-Bug: 1776976
Depends-On: https://github.com/juju-solutions/interface-tls-certificates/pull/6
Signed-off-by: Cory Johns <johnsca@gmail.com>
This commit is contained in:
Cory Johns 2018-09-24 15:31:15 -04:00
parent e621b4dec0
commit 98684d7482
11 changed files with 612 additions and 740 deletions

View File

@ -65,3 +65,60 @@ upload-signed-csr:
- pem
reissue-certificates:
description: Reissue certificates to all clients
generate-root-ca:
description: Generate a self-signed root CA
properties:
ttl:
type: string
default: '87598h'
description: >-
Specifies the Time To Live for the root CA certificate
allow-any-name:
type: boolean
default: True
description: >-
Specifies if clients can request certificates for any CN.
allowed-domains:
type: array
items:
type: string
default: []
description: >-
Restricted list of CNs for which the root CA may issue certificates.
If domains are provided, allow-any-name should be set to false.
allow-bare-domains:
type: boolean
default: False
description: >-
Specifies whether clients can request certificates exactly matching
the CNs in allowed-domains.
allow-subdomains:
type: boolean
default: False
description: >-
Specifies whether clients can request certificates for subdomains of
the CNs in allowed-domains, including wildcard subdomains.
allow-glob-domains:
type: boolean
default: True
description: >-
Specifies whether CNs in allowed-domains can contain glob patterns
(e.g., 'ftp*.example.com'), in which case clients will be able to
request certificates for any CN matching the glob pattern.
enforce-hostnames:
type: boolean
default: False
description: >-
Specifies if only valid host names are allowed
for CNs, DNS SANs, and the host part of email addresses.
max-ttl:
type: string
default: '696h'
description: >-
Specifies the maximum Time To Live for generated certificates.
get-root-ca:
description: Get the root CA certificate
disable-pki:
description: >-
Disable the PKI secrets backend. This is needed if you wish to switch the CA type
after being set up via either upload-signed-csr or generate-root-ca.

View File

@ -16,6 +16,7 @@
import base64
import os
import sys
from traceback import format_exc
# Load modules from $CHARM_DIR/lib
sys.path.append('lib')
@ -25,6 +26,7 @@ basic.bootstrap_charm_deps()
basic.init_config_states()
import charmhelpers.core.hookenv as hookenv
import charmhelpers.core.unitdata as unitdata
import charm.vault as vault
import charm.vault_pki as vault_pki
@ -81,10 +83,43 @@ def upload_signed_csr(*args):
enforce_hostnames=action_config.get('enforce-hostnames'),
allow_any_name=action_config.get('allow-any-name'),
max_ttl=action_config.get('max-ttl'))
set_flag('charm.vault.ca.ready')
def generate_root_ca(*args):
if not hookenv.is_leader():
hookenv.action_fail('Please run action on lead unit')
return
action_config = hookenv.action_get()
root_ca = vault_pki.generate_root_ca(
ttl=action_config['ttl'],
allow_any_name=action_config['allow-any-name'],
allowed_domains=action_config['allowed-domains'],
allow_bare_domains=action_config['allow-bare-domains'],
allow_subdomains=action_config['allow-subdomains'],
allow_glob_domains=action_config['allow-glob-domains'],
enforce_hostnames=action_config['enforce-hostnames'],
max_ttl=action_config['max-ttl'])
hookenv.leader_set({'root-ca': root_ca})
hookenv.action_set({'output': root_ca})
set_flag('charm.vault.ca.ready')
def get_root_ca(*args):
hookenv.action_set({'output': vault_pki.get_ca()})
def disable_pki(*args):
if not hookenv.is_leader():
hookenv.action_fail('Please run action on lead unit')
return
vault_pki.disable_pki_backend()
def reissue_certificates(*args):
charms.reactive.set_flag('certificates.reissue.requested')
charms.reactive.set_flag('certificates.reissue.global.requested')
# Actions to function mapping, to allow for illegal python action names that
# can map to a python function.
@ -94,6 +129,9 @@ ACTIONS = {
"get-csr": get_intermediate_csrs,
"upload-signed-csr": upload_signed_csr,
"reissue-certificates": reissue_certificates,
"generate-root-ca": generate_root_ca,
"get-root-ca": get_root_ca,
"disable-pki": disable_pki,
}
@ -106,10 +144,22 @@ def main(args):
else:
try:
action(args)
except Exception as e:
except vault.VaultError as e:
hookenv.action_fail(str(e))
except Exception:
exc = format_exc()
hookenv.log(exc, hookenv.ERROR)
hookenv.action_fail(exc.splitlines()[-1])
else:
charms.reactive.main()
# we were successful, so commit changes from the action
unitdata.kv().flush()
# try running handlers based on new state
try:
charms.reactive.main()
except Exception:
exc = format_exc()
hookenv.log(exc, hookenv.ERROR)
hookenv.action_fail(exc.splitlines()[-1])
if __name__ == "__main__":

1
src/actions/disable-pki Symbolic link
View File

@ -0,0 +1 @@
actions.py

View File

@ -0,0 +1 @@
actions.py

1
src/actions/get-root-ca Symbolic link
View File

@ -0,0 +1 @@
actions.py

View File

@ -64,3 +64,13 @@ options:
description: >-
FOR TESTING ONLY. Initialise vault after deployment and store the keys
locally.
auto-generate-root-ca-cert:
type: boolean
default: false
description: >-
Once unsealed, automatically generate a self-signed root CA rather
than waiting for an action to be called to either generate one or
process a signing request to act as an intermediary CA. Note that
this will use all default values for the root CA cert. If you want
to adjust those values, you should use the generate-root-ca action
instead.

View File

@ -83,8 +83,16 @@ VAULT_LOCALHOST_URL = "http://127.0.0.1:8220"
VAULT_HEALTH_URL = '{vault_addr}/v1/sys/health'
class VaultNotReady(Exception):
"""Exception raised for units in error state
class VaultError(Exception):
"""
Exception raised for Vault errors.
"""
pass
class VaultNotReady(VaultError):
"""
Exception raised for units in error state
"""
def __init__(self, reason):
@ -92,6 +100,13 @@ class VaultNotReady(Exception):
super(VaultNotReady, self).__init__(message)
class VaultInvalidRequest(VaultError):
"""
Exception raised if a cert request can't be fulfilled.
"""
pass
def binding_address(binding):
try:
return hookenv.network_get_primary_address(binding)
@ -203,7 +218,7 @@ def get_local_client():
return client
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=10),
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
stop=tenacity.stop_after_attempt(10),
reraise=True)
def get_vault_health():

View File

@ -1,9 +1,4 @@
import datetime
import json
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509.extensions import ExtensionNotFound
from cryptography.x509.oid import NameOID, ExtensionOID
import hvac
import charmhelpers.contrib.network.ip as ch_ip
import charmhelpers.core.hookenv as hookenv
@ -12,6 +7,7 @@ from . import vault
CHARM_PKI_MP = "charm-pki-local"
CHARM_PKI_ROLE = "local"
CHARM_PKI_ROLE_CLIENT = "local-client"
def configure_pki_backend(client, name, ttl=None):
@ -33,6 +29,14 @@ def configure_pki_backend(client, name, ttl=None):
config={'max-lease-ttl': ttl or '87600h'})
def disable_pki_backend():
"""Ensure a pki backend is disabled
"""
client = vault.get_local_client()
if vault.is_backend_mounted(client, CHARM_PKI_MP):
client.disable_secret_backend(CHARM_PKI_MP)
def is_ca_ready(client, name, role):
"""Check if CA is ready for use
@ -55,42 +59,53 @@ def get_chain(name=None):
def get_ca():
"""Check if CA is ready for use
"""Get the root CA certificate.
:returns: Whether CA is ready
:rtype: bool
:returns: Root CA certificate
:rtype: str
"""
return hookenv.leader_get('root-ca')
def get_server_certificate(cn, ip_sans=None, alt_names=None):
"""Create a certificate and key for the given cn inc sans if requested
def generate_certificate(cert_type, common_name, sans):
"""
Create a certificate and key for the given CN and SANs, if requested.
:param cn: Common name to use for certifcate
:type cn: string
:param ip_sans: List of IP address to create san records for
:type ip_sans: [str1,...]
:param alt_names: List of names to create san records for
:type alt_names: [str1,...]
:raises: vault.VaultNotReady
May raise VaultNotReady if called too early, or VaultInvalidRequest if
something is wrong with the request.
:param request: Certificate request from the tls-certificates interface.
:type request: CertificateRequest
:returns: The newly created cert, issuing ca and key
:rtype: tuple
"""
client = vault.get_local_client()
configure_pki_backend(client, CHARM_PKI_MP)
if is_ca_ready(client, CHARM_PKI_MP, CHARM_PKI_ROLE):
config = {
'common_name': cn}
if not is_ca_ready(client, CHARM_PKI_MP, CHARM_PKI_ROLE):
raise vault.VaultNotReady("CA not ready")
role = None
if cert_type == 'server':
role = CHARM_PKI_ROLE
elif cert_type == 'client':
role = CHARM_PKI_ROLE_CLIENT
else:
raise vault.VaultInvalidRequest('Unsupported cert_type: '
'{}'.format(cert_type))
config = {
'common_name': common_name,
}
if sans:
ip_sans, alt_names = sort_sans(sans)
if ip_sans:
config['ip_sans'] = ','.join(ip_sans)
if alt_names:
config['alt_names'] = ','.join(alt_names)
bundle = client.write(
'{}/issue/{}'.format(CHARM_PKI_MP, CHARM_PKI_ROLE),
**config)['data']
else:
raise vault.VaultNotReady("CA not ready")
return bundle
try:
response = client.write('{}/issue/{}'.format(CHARM_PKI_MP, role),
**config)
except hvac.exceptions.InvalidRequest as e:
raise vault.VaultInvalidRequest(str(e)) from e
return response['data']
def get_csr(ttl=None, country=None, province=None,
@ -116,8 +131,7 @@ def get_csr(ttl=None, country=None, province=None,
:rtype: string
"""
client = vault.get_local_client()
if not vault.is_backend_mounted(client, CHARM_PKI_MP):
configure_pki_backend(client, CHARM_PKI_MP)
configure_pki_backend(client, CHARM_PKI_MP)
config = {
'common_name': ("Vault Intermediate Certificate Authority "
"({})".format(CHARM_PKI_MP)),
@ -177,197 +191,110 @@ def upload_signed_csr(pem, allowed_domains, allow_subdomains=True,
allow_subdomains=allow_subdomains,
enforce_hostnames=enforce_hostnames,
allow_any_name=allow_any_name,
max_ttl=max_ttl)
max_ttl=max_ttl,
server_flag=True,
client_flag=True) # server certs can also be used as client certs
# Configure a role for using this PKI to issue server certs
client.write(
'{}/roles/{}'.format(CHARM_PKI_MP, CHARM_PKI_ROLE_CLIENT),
allowed_domains=allowed_domains,
allow_subdomains=allow_subdomains,
enforce_hostnames=enforce_hostnames,
allow_any_name=allow_any_name,
max_ttl=max_ttl,
server_flag=False, # client certs cannot be used as server certs
client_flag=True)
def generate_root_ca(ttl='87599h', allow_any_name=True, allowed_domains=None,
allow_bare_domains=False, allow_subdomains=False,
allow_glob_domains=True, enforce_hostnames=False,
max_ttl='87598h'):
"""Configure Vault to generate a self-signed root CA.
:param ttl: TTL of the root CA certificate
:type ttl: string
:param allow_any_name: Specifies if clients can request certs for any CN.
:type allow_any_name: bool
:param allow_any_name: List of CNs for which clients can request certs.
:type allowed_domains: list
:param allow_bare_domains: Specifies if clients can request certs for CNs
exactly matching those in allowed_domains.
:type allow_bare_domains: bool
:param allow_subdomains: Specifies if clients can request certificates with
CNs that are subdomains of those in
allowed_domains, including wildcard subdomains.
:type allow_subdomains: bool
:param allow_glob_domains: Specifies whether CNs in allowed-domains can
contain glob patterns (e.g.,
'ftp*.example.com'), in which case clients will
be able to request certificates for any CN
matching the glob pattern.
:type allow_glob_domains: bool
:param enforce_hostnames: Specifies if only valid host names are allowed
for CNs, DNS SANs, and the host part of email
addresses.
:type enforce_hostnames: bool
:param max_ttl: Specifies the maximum Time To Live for generated certs.
:type max_ttl: str
"""
client = vault.get_local_client()
configure_pki_backend(client, CHARM_PKI_MP)
if is_ca_ready(client, CHARM_PKI_MP, CHARM_PKI_ROLE):
raise vault.VaultError('PKI CA already configured')
config = {
'common_name': ("Vault Root Certificate Authority "
"({})".format(CHARM_PKI_MP)),
'ttl': ttl,
}
csr_info = client.write(
'{}/root/generate/internal'.format(CHARM_PKI_MP),
**config)
cert = csr_info['data']['certificate']
# Generated certificates can have the CRL location and the location of the
# issuing certificate encoded.
addr = vault.get_access_address()
client.write(
'{}/config/urls'.format(CHARM_PKI_MP),
issuing_certificates="{}/v1/{}/ca".format(addr, CHARM_PKI_MP),
crl_distribution_points="{}/v1/{}/crl".format(addr, CHARM_PKI_MP)
)
# Configure a role for using this PKI to issue server certs
client.write(
'{}/roles/{}'.format(CHARM_PKI_MP, CHARM_PKI_ROLE),
allow_any_name=allow_any_name,
allowed_domains=allowed_domains,
allow_bare_domains=allow_bare_domains,
allow_subdomains=allow_subdomains,
allow_glob_domains=allow_glob_domains,
enforce_hostnames=enforce_hostnames,
max_ttl=max_ttl,
server_flag=True,
client_flag=True) # server certs can also be used as client certs
# Configure a role for using this PKI to issue client-only certs
client.write(
'{}/roles/{}'.format(CHARM_PKI_MP, CHARM_PKI_ROLE_CLIENT),
allow_any_name=allow_any_name,
allowed_domains=allowed_domains,
allow_bare_domains=allow_bare_domains,
allow_subdomains=allow_subdomains,
allow_glob_domains=allow_glob_domains,
enforce_hostnames=enforce_hostnames,
max_ttl=max_ttl,
server_flag=False, # client certs cannot be used as server certs
client_flag=True)
return cert
def sort_sans(sans):
"""Split SANS into IP sans and name SANS
"""
Split SANs into IP SANs and name SANs
:param sans: List of SANS
:param sans: List of SANs
:type sans: list
:returns: List of IP sans and list of Name SANS
:returns: List of IP SANs and list of name SANs
:rtype: ([], [])
"""
ip_sans = {s for s in sans if ch_ip.is_ip(s)}
alt_names = set(sans).difference(ip_sans)
return sorted(list(ip_sans)), sorted(list(alt_names))
def get_vault_units():
"""Return all vault units related to this one
:returns: List of vault units
:rtype: []
"""
peer_rid = hookenv.relation_ids('cluster')[0]
vault_units = [hookenv.local_unit()]
vault_units.extend(hookenv.related_units(relid=peer_rid))
return vault_units
def get_matching_cert_from_relation(unit_name, cn, ip_sans, alt_names):
"""Scan vault units relation data for a cert that matches
Scan the relation data that each vault unit has sent to the clients
to find a cert that matchs the cn and sans. If one exists return it.
If mutliple are found then return the one with the lastest valid_to
date
:param unit_name: Return the unit_name to look for serts for.
:type unit_name: string
:param cn: Common name to use for certifcate
:type cn: string
:param ip_sans: List of IP address to create san records for
:type ip_sans: [str1,...]
:param alt_names: List of names to create san records for
:type alt_names: [str1,...]
:returns: Cert and key if found
:rtype: {}
"""
vault_units = get_vault_units()
rid = hookenv.relation_id('certificates', unit_name)
match = []
for vunit in vault_units:
sent_data = hookenv.relation_get(unit=vunit, rid=rid)
name = unit_name.replace('/', '_')
cert_name = '{}.server.cert'.format(name)
cert_key = '{}.server.key'.format(name)
candidate_cert = sent_data.get(cert_name)
if candidate_cert and cert_matches_request(candidate_cert, cn,
ip_sans, alt_names):
match.append({
'certificate': sent_data.get(cert_name),
'private_key': sent_data.get(cert_key)})
batch_request_raw = sent_data.get('processed_requests')
if batch_request_raw:
batch_request = json.loads(batch_request_raw)
for sent_cn in batch_request.keys():
if sent_cn == cn:
candidate_cert = batch_request[cn]['cert']
candidate_key = batch_request[cn]['key']
if cert_matches_request(candidate_cert, cn, ip_sans,
alt_names):
match.append({
'certificate': candidate_cert,
'private_key': candidate_key})
return select_newest(match)
def cert_matches_request(cert_pem, cn, ip_sans, alt_names):
"""Test if the cert matches the supplied attributes
If the cn is duplicated in either the cert or the supplied alt_names
it is removed before performing the check.
:param cert_pem: Certificate in pem format to check
:type cert_pem: string
:param cn: Common name to use for certifcate
:type cn: string
:param ip_sans: List of IP address to create san records for
:type ip_sans: [str1,...]
:param alt_names: List of names to create san records for
:type alt_names: [str1,...]
:returns: Whether cert matches criteria
:rtype: bool
"""
cert_data = certificate_information(cert_pem)
if cn == cert_data['cn']:
try:
cert_data['alt_names'].remove(cn)
except ValueError:
pass
try:
alt_names.remove(cn)
except ValueError:
pass
else:
return False
if sorted(cert_data['alt_names']) == sorted(alt_names) and \
sorted(cert_data['ip_sans']) == sorted(ip_sans):
return True
else:
return False
def certificate_information(cert_pem):
"""Extract cn, sans and expiration info from certificate
:param cert_pem: Certificate in pem format to check
:type cert_pem: string
:returns: Certificate information in a dictionary
:rtype: {}
"""
cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend())
bundle = {
'cn': cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value,
'not_valid_after': cert.not_valid_after}
try:
sans = cert.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
alt_names = sans.value.get_values_for_type(x509.DNSName)
ip_sans = sans.value.get_values_for_type(x509.IPAddress)
ip_sans = [str(ip) for ip in ip_sans]
except ExtensionNotFound:
alt_names = ip_sans = []
bundle['ip_sans'] = ip_sans
bundle['alt_names'] = alt_names
return bundle
def select_newest(certs):
"""Iterate over the certificate bundle and return the one with the latest
not_valid_after date
:returns: Certificate bundle
:rtype: {}
"""
latest = datetime.datetime.utcfromtimestamp(0)
candidate = None
for bundle in certs:
cert = x509.load_pem_x509_certificate(
bundle['certificate'].encode(),
default_backend())
not_valid_after = cert.not_valid_after
if not_valid_after > latest:
latest = not_valid_after
candidate = bundle
return candidate
def process_cert_request(cn, sans, unit_name, reissue_requested):
"""Return a certificate and key matching the requeest
Return a certificate and key matching the request. This may be an existing
certificate and key if one exists and reissue_requested is False.
:param cn: Common name to use for certifcate
:type cn: string
:param sans: List of SANS
:type sans: list
:param unit_name: Return the unit_name to look for serts for.
:type unit_name: string
:returns: Cert and key
:rtype: {}
"""
bundle = {}
ip_sans, alt_names = sort_sans(sans)
if not reissue_requested:
bundle = get_matching_cert_from_relation(
unit_name,
cn,
list(ip_sans),
list(alt_names))
hookenv.log(
"Found existing cert for {}, reusing".format(cn),
level=hookenv.DEBUG)
if not bundle:
hookenv.log(
"Requesting new cert for {}".format(cn),
level=hookenv.DEBUG)
# Create the server certificate based on the info in request.
bundle = get_server_certificate(
cn,
ip_sans=ip_sans,
alt_names=alt_names)
return bundle

View File

@ -2,6 +2,8 @@ import base64
import psycopg2
import subprocess
import tenacity
import yaml
from pathlib import Path
from charmhelpers.contrib.charmsupport.nrpe import (
@ -23,6 +25,7 @@ from charmhelpers.core.hookenv import (
application_version_set,
atexit,
local_unit,
leader_set,
)
from charmhelpers.core.host import (
@ -37,6 +40,8 @@ from charmhelpers.core.templating import (
render,
)
from charmhelpers.core import unitdata
from charms.reactive import (
hook,
is_state,
@ -634,67 +639,85 @@ def _assess_status():
)
@when('leadership.is_leader')
@when_any('certificates.server.cert.requested',
'certificates.reissue.requested')
def create_server_cert():
if not vault.vault_ready_for_clients():
log('Unable to process new secret backend requests,'
' deferring until vault is fully configured', level=DEBUG)
return
reissue_requested = is_flag_set('certificates.reissue.requested')
@when('leadership.is_leader',
'config.set.auto-generate-root-ca-cert')
@when_not('charm.vault.ca.ready')
def auto_generate_root_ca_cert():
actions_yaml = yaml.load(Path('actions.yaml').read_text())
props = actions_yaml['generate-root-ca']['properties']
action_config = {key: value['default'] for key, value in props.items()}
try:
root_ca = vault_pki.generate_root_ca(
ttl=action_config['ttl'],
allow_any_name=action_config['allow-any-name'],
allowed_domains=action_config['allowed-domains'],
allow_bare_domains=action_config['allow-bare-domains'],
allow_subdomains=action_config['allow-subdomains'],
allow_glob_domains=action_config['allow-glob-domains'],
enforce_hostnames=action_config['enforce-hostnames'],
max_ttl=action_config['max-ttl'])
leader_set({'root-ca': root_ca})
set_flag('charm.vault.ca.ready')
except vault.VaultError as e:
log("Skipping auto-generate root CA cert: {}".format(e))
@when('leadership.is_leader',
'charm.vault.ca.ready',
'certificates.available')
def publish_ca_info():
tls = endpoint_from_flag('certificates.available')
server_requests = tls.get_server_requests()
for unit_name, request in server_requests.items():
log(
'Processing certificate requests from {}'.format(unit_name),
level=DEBUG)
# Process request for a single certificate
cn = request.get('common_name')
sans = request.get('sans')
if cn and sans:
log(
'Processing single certificate requests for {}'.format(cn),
level=DEBUG)
try:
bundle = vault_pki.process_cert_request(
cn,
sans,
unit_name,
reissue_requested)
except vault.VaultNotReady:
# Cannot continue if vault is not ready
return
# Set the certificate and key for the unit on the relationship.
tls.set_server_cert(
unit_name,
bundle['certificate'],
bundle['private_key'])
# Process request for a batch of certificates
cert_requests = request.get('cert_requests')
if cert_requests:
log(
'Processing batch of requests from {}'.format(unit_name),
level=DEBUG)
for cn, crequest in cert_requests.items():
log('Processing requests for {}'.format(cn), level=DEBUG)
try:
bundle = vault_pki.process_cert_request(
cn,
crequest.get('sans'),
unit_name,
reissue_requested)
except vault.VaultNotReady:
# Cannot continue if vault is not ready
return
tls.add_server_cert(
unit_name,
cn,
bundle['certificate'],
bundle['private_key'])
tls.set_server_multicerts(unit_name)
tls.set_ca(vault_pki.get_ca())
chain = vault_pki.get_chain()
if chain:
tls.set_chain(chain)
tls.set_ca(vault_pki.get_ca())
chain = vault_pki.get_chain()
if chain:
tls.set_chain(chain)
@when('leadership.is_leader',
'charm.vault.ca.ready',
'certificates.available')
def publish_global_client_cert():
"""
This is for backwards compatibility with older tls-certificate clients
only. Obviously, it's not good security / design to have clients sharing
a certificate, but it seems that there are clients that depend on this
(though some, like etcd, only block on the flag that it triggers but don't
actually use the cert), so we have to set it for now.
"""
cert_created = is_flag_set('charm.vault.global-client-cert.created')
reissue_requested = is_flag_set('certificates.reissue.global.requested')
tls = endpoint_from_flag('certificates.available')
if not cert_created or reissue_requested:
bundle = vault_pki.generate_certificate('client',
'global-client',
[])
unitdata.kv().set('charm.vault.global-client-cert', bundle)
set_flag('charm.vault.global-client-cert.created')
clear_flag('certificates.reissue.global.requested')
else:
bundle = unitdata.kv().get('charm.vault.global-client-cert')
tls.set_client_cert(bundle['certificate'], bundle['private_key'])
@when('leadership.is_leader',
'charm.vault.ca.ready')
@when_any('certificates.certs.requested',
'certificates.reissue.requested')
def create_certs():
reissue_requested = is_flag_set('certificates.reissue.requested')
tls = endpoint_from_flag('certificates.certs.requested')
requests = tls.all_requests if reissue_requested else tls.new_requests
if reissue_requested:
log('Reissuing all certs')
for request in requests:
log('Processing certificate request from {} for {}'.format(
request.unit_name, request.common_name))
try:
bundle = vault_pki.generate_certificate(request.cert_type,
request.common_name,
request.sans)
request.set_cert(bundle['certificate'], bundle['private_key'])
except vault.VaultInvalidRequest as e:
log(str(e), level=ERROR)
continue # TODO: report failure back to client
clear_flag('certificates.reissue.requested')

View File

@ -1,8 +1,7 @@
import datetime
import json
import mock
from unittest.mock import patch
from cryptography.x509.extensions import ExtensionNotFound
import hvac
import lib.charm.vault_pki as vault_pki
import unit_tests.test_utils
@ -89,44 +88,88 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
leader_get.return_value = 'ROOTCA'
self.assertEqual(vault_pki.get_ca(), 'ROOTCA')
@patch.object(vault_pki, 'sort_sans')
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_server_certificate(self, get_local_client,
configure_pki_backend, is_ca_ready):
def test_generate_certificate(self, get_local_client,
configure_pki_backend,
is_ca_ready,
sort_sans):
client_mock = mock.MagicMock()
client_mock.write.return_value = {'data': 'data'}
get_local_client.return_value = client_mock
is_ca_ready.return_value = True
vault_pki.get_server_certificate('bob.example.com')
client_mock.write.assert_called_once_with(
'charm-pki-local/issue/local',
common_name='bob.example.com'
)
sort_sans.side_effect = lambda l: (l[0], l[1])
write_calls = [
mock.call(
'charm-pki-local/issue/local',
common_name='example.com',
),
mock.call(
'charm-pki-local/issue/local',
common_name='example.com',
ip_sans='ip1',
alt_names='alt1',
),
mock.call(
'charm-pki-local/issue/local-client',
common_name='example.com',
ip_sans='ip1,ip2',
alt_names='alt1,alt2',
),
]
vault_pki.generate_certificate('server',
'example.com',
([], []))
vault_pki.generate_certificate('server',
'example.com',
(['ip1'], ['alt1']))
vault_pki.generate_certificate('client',
'example.com',
(['ip1', 'ip2'], ['alt1', 'alt2']))
client_mock.write.assert_has_calls(write_calls)
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_server_certificate_sans(self, get_local_client,
configure_pki_backend,
is_ca_ready):
def test_generate_certificate_not_ready(self, get_local_client,
configure_pki_backend,
is_ca_ready):
client_mock = mock.MagicMock()
get_local_client.return_value = client_mock
is_ca_ready.return_value = False
with self.assertRaises(vault_pki.vault.VaultNotReady):
vault_pki.generate_certificate('server', 'exmaple.com', [])
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_generate_certificate_invalid_type(self, get_local_client,
configure_pki_backend,
is_ca_ready):
client_mock = mock.MagicMock()
get_local_client.return_value = client_mock
is_ca_ready.return_value = True
vault_pki.get_server_certificate(
'bob.example.com',
ip_sans=['10.10.10.10', '192.197.45.23'],
alt_names=['localunit', 'public.bob.example.com'])
client_mock.write.assert_called_once_with(
'charm-pki-local/issue/local',
alt_names='localunit,public.bob.example.com',
common_name='bob.example.com',
ip_sans='10.10.10.10,192.197.45.23'
)
with self.assertRaises(vault_pki.vault.VaultInvalidRequest):
vault_pki.generate_certificate('unknown', 'exmaple.com', [])
@patch.object(vault_pki.vault, 'is_backend_mounted')
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_csr(self, get_local_client, is_backend_mounted):
is_backend_mounted.return_value = True
def test_generate_certificate_invalid_request(self, get_local_client,
configure_pki_backend,
is_ca_ready):
client_mock = mock.MagicMock()
get_local_client.return_value = client_mock
is_ca_ready.return_value = True
client_mock.write.side_effect = hvac.exceptions.InvalidRequest
with self.assertRaises(vault_pki.vault.VaultInvalidRequest):
vault_pki.generate_certificate('server', 'exmaple.com', [])
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_csr(self, get_local_client, configure_pki_backend):
client_mock = mock.MagicMock()
get_local_client.return_value = client_mock
client_mock.write.return_value = {
@ -140,30 +183,8 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
ttl='87599h')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'is_backend_mounted')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_csr_config_backend(self, get_local_client, is_backend_mounted,
configure_pki_backend):
is_backend_mounted.return_value = False
client_mock = mock.MagicMock()
get_local_client.return_value = client_mock
client_mock.write.return_value = {
'data': {
'csr': 'somecert'}}
self.assertEqual(vault_pki.get_csr(), 'somecert')
client_mock.write.assert_called_once_with(
'charm-pki-local/intermediate/generate/internal',
common_name=('Vault Intermediate Certificate Authority'
' (charm-pki-local)'),
ttl='87599h')
configure_pki_backend.assert_called_once_with(
client_mock,
'charm-pki-local')
@patch.object(vault_pki.vault, 'is_backend_mounted')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_csr_explicit(self, get_local_client, is_backend_mounted):
is_backend_mounted.return_value = False
def test_get_csr_explicit(self, get_local_client, configure_pki_backend):
client_mock = mock.MagicMock()
get_local_client.return_value = client_mock
client_mock.write.return_value = {
@ -205,7 +226,18 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
allow_subdomains=True,
enforce_hostnames=False,
allow_any_name=True,
max_ttl='87598h')
max_ttl='87598h',
server_flag=True,
client_flag=True),
mock.call(
'charm-pki-local/roles/local-client',
allowed_domains='exmaple.com',
allow_subdomains=True,
enforce_hostnames=False,
allow_any_name=True,
max_ttl='87598h',
server_flag=False,
client_flag=True),
]
vault_pki.upload_signed_csr('MYPEM', 'exmaple.com')
client_mock._post.assert_called_once_with(
@ -232,7 +264,18 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
allow_subdomains=False,
enforce_hostnames=True,
allow_any_name=False,
max_ttl='42h')
max_ttl='42h',
server_flag=True,
client_flag=True),
mock.call(
'charm-pki-local/roles/local-client',
allowed_domains='exmaple.com',
allow_subdomains=False,
enforce_hostnames=True,
allow_any_name=False,
max_ttl='42h',
server_flag=False,
client_flag=True),
]
vault_pki.upload_signed_csr(
'MYPEM',
@ -246,6 +289,71 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
json={'certificate': 'MYPEM'})
client_mock.write.assert_has_calls(write_calls)
@patch.object(vault_pki.vault, 'get_access_address')
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_generate_root_ca(self,
get_local_client,
configure_pki_backend,
is_ca_ready,
get_access_address):
mock_client = get_local_client.return_value
mock_client.write.return_value = {'data': {'certificate': 'cert'}}
is_ca_ready.return_value = False
get_access_address.return_value = 'addr'
rv = vault_pki.generate_root_ca(ttl='0h',
allow_any_name=True,
allowed_domains='domains',
allow_bare_domains=True,
allow_subdomains=True,
allow_glob_domains=False,
enforce_hostnames=True,
max_ttl='0h')
self.assertEqual(rv, 'cert')
mock_client.write.assert_has_calls([
mock.call('charm-pki-local/root/generate/internal',
common_name='Vault Root Certificate Authority '
'(charm-pki-local)',
ttl='0h'),
mock.call('charm-pki-local/config/urls',
issuing_certificates='addr/v1/charm-pki-local/ca',
crl_distribution_points='addr/v1/charm-pki-local/crl'),
mock.call('charm-pki-local/roles/local',
allow_any_name=True,
allowed_domains='domains',
allow_bare_domains=True,
allow_subdomains=True,
allow_glob_domains=False,
enforce_hostnames=True,
max_ttl='0h',
server_flag=True,
client_flag=True),
mock.call('charm-pki-local/roles/local-client',
allow_any_name=True,
allowed_domains='domains',
allow_bare_domains=True,
allow_subdomains=True,
allow_glob_domains=False,
enforce_hostnames=True,
max_ttl='0h',
server_flag=False,
client_flag=True),
])
@patch.object(vault_pki.vault, 'get_access_address')
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_generate_root_ca_already_init(self,
get_local_client,
configure_pki_backend,
is_ca_ready,
get_access_address):
is_ca_ready.return_value = True
with self.assertRaises(vault_pki.vault.VaultError):
vault_pki.generate_root_ca()
def test_sort_sans(self):
self.assertEqual(
vault_pki.sort_sans([
@ -256,294 +364,3 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
'admin.local',
'public.local']),
(['10.0.0.10', '10.0.0.20'], ['admin.local', 'public.local']))
@patch.object(vault_pki.hookenv, 'related_units')
@patch.object(vault_pki.hookenv, 'relation_ids')
@patch.object(vault_pki.hookenv, 'local_unit')
def test_get_vault_units(self, local_unit, relation_ids, related_units):
local_unit.return_value = 'vault/3'
relation_ids.return_value = 'certificates:34'
related_units.return_value = ['vault/1', 'vault/5']
self.assertEqual(
vault_pki.get_vault_units(),
['vault/3', 'vault/1', 'vault/5'])
def _get_matching_cert_from_relation(self, vault_relation, cert_match,
func_args,
expected_bundle,
expected_newest_calls):
self.patch_object(vault_pki.hookenv, 'relation_get')
self.patch_object(vault_pki.hookenv, 'relation_id')
self.patch_object(vault_pki, 'select_newest')
self.patch_object(vault_pki, 'cert_matches_request')
self.patch_object(vault_pki, 'get_vault_units')
self.relation_get.side_effect = lambda unit, rid: vault_relation[unit]
self.cert_matches_request.side_effect = \
lambda w, x, y, z: cert_match[w]
self.get_vault_units.return_value = ['vault/3', 'vault/1', 'vault/5']
self.relation_id.return_value = 'certificates:23'
self.select_newest.side_effect = lambda x: x[0]
rget_calls = [
mock.call(unit='vault/3', rid='certificates:23'),
mock.call(unit='vault/1', rid='certificates:23'),
mock.call(unit='vault/5', rid='certificates:23')]
self.assertEqual(
vault_pki.get_matching_cert_from_relation(*func_args),
expected_bundle)
self.relation_get.assert_has_calls(rget_calls)
self.select_newest.assert_called_once_with(expected_newest_calls)
def test_get_matching_cert_from_relation(self):
_rinfo = {
'vault/1': {
'keystone_0.server.cert': 'V1CERT',
'keystone_0.server.key': 'V1KEY'},
'vault/3': {},
'vault/5': {},
}
_cmatch = {
'V1CERT': True
}
self._get_matching_cert_from_relation(
_rinfo,
_cmatch,
('keystone/0', 'ks.bob.com', ['10.0.0.23'], ['junit1.maas.local']),
{'private_key': 'V1KEY', 'certificate': 'V1CERT'},
[{'private_key': 'V1KEY', 'certificate': 'V1CERT'}])
def test_get_matching_cert_from_relation_batch_single(self):
_rinfo = {
'vault/1': {},
'vault/3': {
'processed_requests': json.dumps({
'ks.bob.com': {
'cert': 'V3CERT',
'key': 'V3KEY'}})},
'vault/5': {},
}
_cmatch = {
'V3CERT': True
}
self._get_matching_cert_from_relation(
_rinfo,
_cmatch,
('keystone/0', 'ks.bob.com', ['10.0.0.23'], ['junit1.maas.local']),
{'private_key': 'V3KEY', 'certificate': 'V3CERT'},
[{'private_key': 'V3KEY', 'certificate': 'V3CERT'}])
def test_get_matching_cert_from_relation_batch_multi_one_match(self):
_rinfo = {
'vault/1': {},
'vault/3': {
'processed_requests': json.dumps({
'ks.bob.com': {
'cert': 'V3CERT',
'key': 'V3KEY'}})},
'vault/5': {
'processed_requests': json.dumps({
'glance.bob.com': {
'cert': 'V5CERT',
'key': 'V5KEY'}})},
}
_cmatch = {
'V3CERT': True
}
self._get_matching_cert_from_relation(
_rinfo,
_cmatch,
('keystone/0', 'ks.bob.com', ['10.0.0.23'], ['junit1.maas.local']),
{'private_key': 'V3KEY', 'certificate': 'V3CERT'},
[{'private_key': 'V3KEY', 'certificate': 'V3CERT'}])
def test_get_matching_cert_from_relation_batch_multi_two_match(self):
_rinfo = {
'vault/1': {},
'vault/3': {
'processed_requests': json.dumps({
'ks.bob.com': {
'cert': 'V3CERT',
'key': 'V3KEY'}})},
'vault/5': {
'processed_requests': json.dumps({
'ks.bob.com': {
'cert': 'V5CERT',
'key': 'V5KEY'}})},
}
_cmatch = {
'V3CERT': True,
'V5CERT': True
}
self._get_matching_cert_from_relation(
_rinfo,
_cmatch,
('keystone/0', 'ks.bob.com', ['10.0.0.23'], ['junit1.maas.local']),
{'private_key': 'V3KEY', 'certificate': 'V3CERT'},
[
{'private_key': 'V3KEY', 'certificate': 'V3CERT'},
{'private_key': 'V5KEY', 'certificate': 'V5CERT'}])
def test_get_matching_cert_from_relation_batch_multi_sans_mismatch(self):
_rinfo = {
'vault/1': {},
'vault/3': {
'processed_requests': json.dumps({
'ks.bob.com': {
'cert': 'V3CERT',
'key': 'V3KEY'}})},
'vault/5': {
'processed_requests': json.dumps({
'ks.bob.com': {
'cert': 'V5CERT',
'key': 'V5KEY'}})},
}
_cmatch = {
'V3CERT': False,
'V5CERT': True
}
self._get_matching_cert_from_relation(
_rinfo,
_cmatch,
('keystone/0', 'ks.bob.com', ['10.0.0.23'], ['junit1.maas.local']),
{'private_key': 'V5KEY', 'certificate': 'V5CERT'},
[{'private_key': 'V5KEY', 'certificate': 'V5CERT'}])
@patch.object(vault_pki, 'certificate_information')
def test_cert_matches_request(self, certificate_information):
certificate_information.return_value = {
'cn': 'ks.bob.com',
'ip_sans': ['10.0.0.10'],
'alt_names': ['unit1.bob.com']}
self.assertTrue(
vault_pki.cert_matches_request(
'pem', 'ks.bob.com', ['10.0.0.10'], ['unit1.bob.com']))
@patch.object(vault_pki, 'certificate_information')
def test_cert_matches_request_mismatch_cn(self, certificate_information):
certificate_information.return_value = {
'cn': 'glance.bob.com',
'ip_sans': ['10.0.0.10'],
'alt_names': ['unit1.bob.com']}
self.assertFalse(
vault_pki.cert_matches_request(
'pem', 'ks.bob.com', ['10.0.0.10'], ['unit1.bob.com']))
@patch.object(vault_pki, 'certificate_information')
def test_cert_matches_request_mismatch_ipsan(self,
certificate_information):
certificate_information.return_value = {
'cn': 'glance.bob.com',
'ip_sans': ['10.0.0.10', '10.0.0.20'],
'alt_names': ['unit1.bob.com']}
self.assertFalse(
vault_pki.cert_matches_request(
'pem', 'ks.bob.com', ['10.0.0.10'], ['unit1.bob.com']))
@patch.object(vault_pki, 'certificate_information')
def test_cert_matches_request_cn_in_san(self, certificate_information):
certificate_information.return_value = {
'cn': 'ks.bob.com',
'ip_sans': ['10.0.0.10'],
'alt_names': ['ks.bob.com', 'unit1.bob.com']}
self.assertTrue(
vault_pki.cert_matches_request(
'pem', 'ks.bob.com', ['10.0.0.10'], ['unit1.bob.com']))
@patch.object(vault_pki.x509, 'load_pem_x509_certificate')
def test_certificate_information(self, load_pem_x509_certificate):
x509_mock = mock.MagicMock(not_valid_after="10 Mar 1976")
x509_name_mock = mock.MagicMock(value='ks.bob.com')
x509_mock.subject.get_attributes_for_oid.return_value = [
x509_name_mock]
x509_sans_mock = mock.MagicMock()
sans = [
['10.0.0.0.10'],
['sans1.bob.com']]
x509_sans_mock.value.get_values_for_type = lambda x: sans.pop()
x509_mock.extensions.get_extension_for_oid.return_value = \
x509_sans_mock
load_pem_x509_certificate.return_value = x509_mock
self.assertEqual(
vault_pki.certificate_information('pem'),
{
'cn': 'ks.bob.com',
'not_valid_after': '10 Mar 1976',
'ip_sans': ['10.0.0.0.10'],
'alt_names': ['sans1.bob.com']})
@patch.object(vault_pki.x509, 'load_pem_x509_certificate')
def test_certificate_information_no_sans(self, load_pem_x509_certificate):
x509_mock = mock.MagicMock(not_valid_after="10 Mar 1976")
x509_name_mock = mock.MagicMock(value='ks.bob.com')
x509_mock.subject.get_attributes_for_oid.return_value = [
x509_name_mock]
x509_mock.extensions.get_extension_for_oid.side_effect = \
ExtensionNotFound('msg', 'oid')
load_pem_x509_certificate.return_value = x509_mock
self.assertEqual(
vault_pki.certificate_information('pem'),
{
'cn': 'ks.bob.com',
'not_valid_after': '10 Mar 1976',
'ip_sans': [],
'alt_names': []})
@patch.object(vault_pki.x509, 'load_pem_x509_certificate')
def test_select_newest(self, load_pem_x509_certificate):
def _load_pem_x509(pem):
pem = pem.decode()
cmock1 = mock.MagicMock(
not_valid_after=datetime.datetime(2018, 5, 3))
cmock2 = mock.MagicMock(
not_valid_after=datetime.datetime(2018, 5, 4))
cmock3 = mock.MagicMock(
not_valid_after=datetime.datetime(2018, 5, 5))
certs = {
'cert1': cmock1,
'cert2': cmock2,
'cert3': cmock3}
return certs[pem]
load_pem_x509_certificate.side_effect = lambda x, y: _load_pem_x509(x)
certs = [
{'certificate': 'cert1'},
{'certificate': 'cert2'},
{'certificate': 'cert3'}]
self.assertEqual(
vault_pki.select_newest(certs),
{'certificate': 'cert3'})
@patch.object(vault_pki, 'get_matching_cert_from_relation')
@patch.object(vault_pki, 'get_server_certificate')
def test_process_cert_request(self, get_server_certificate,
get_matching_cert_from_relation):
get_matching_cert_from_relation.return_value = 'cached_bundle'
self.assertEqual(
vault_pki.process_cert_request(
'ks.bob.com',
['10.0.0.10', 'sans1.bob.com'],
'keystone_0',
False),
'cached_bundle')
get_matching_cert_from_relation.assert_called_once_with(
'keystone_0',
'ks.bob.com',
['10.0.0.10'],
['sans1.bob.com'])
get_server_certificate.assert_not_called()
@patch.object(vault_pki, 'get_matching_cert_from_relation')
@patch.object(vault_pki, 'get_server_certificate')
def test_process_cert_request_reissue(self, get_server_certificate,
get_matching_cert_from_relation):
get_server_certificate.return_value = 'new_bundle'
self.assertEqual(
vault_pki.process_cert_request(
'ks.bob.com',
['10.0.0.10', 'sans1.bob.com'],
'keystone_0',
True),
'new_bundle')
get_matching_cert_from_relation.assert_not_called()
get_server_certificate.assert_called_once_with(
'ks.bob.com',
ip_sans=['10.0.0.10'],
alt_names=['sans1.bob.com'])

View File

@ -70,7 +70,7 @@ class TestHandlers(unit_tests.test_utils.CharmTestCase):
'set_flag',
'clear_flag',
'is_container',
'endpoint_from_flag',
'unitdata',
]
self.patch_all()
self.is_container.return_value = False
@ -638,122 +638,92 @@ class TestHandlers(unit_tests.test_utils.CharmTestCase):
vault_ca='test-ca'
)
@mock.patch.object(handlers.vault_pki, 'get_ca')
@mock.patch.object(handlers.vault_pki, 'get_chain')
@mock.patch.object(handlers.vault_pki, 'process_cert_request')
@mock.patch.object(handlers, 'vault')
def test_create_server_cert(self, _vault, process_cert_request,
get_chain, get_ca):
tls_mock = mock.MagicMock()
tls_mock.get_server_requests.return_value = {
'keystone_0': {
'common_name': 'public.openstack.local',
'sans': ['10.0.0.10', 'admin.public.openstack.local']}
}
_vault.vault_ready_for_clients.return_value = True
process_cert_request.return_value = {
'certificate': 'CERT',
'private_key': 'KEY'}
get_ca.return_value = 'CA'
get_chain.return_value = 'CHAIN'
self.endpoint_from_flag.return_value = tls_mock
@mock.patch.object(handlers, 'vault_pki')
def test_publish_ca_info(self, vault_pki):
tls = self.endpoint_from_flag.return_value
vault_pki.get_ca.return_value = 'ca'
vault_pki.get_chain.return_value = 'chain'
handlers.publish_ca_info()
tls.set_ca.assert_called_with('ca')
tls.set_chain.assert_called_with('chain')
@mock.patch.object(handlers, 'vault_pki')
def test_publish_global_client_cert_already_gend(self, vault_pki):
tls = self.endpoint_from_flag.return_value
self.is_flag_set.side_effect = [True, False]
self.unitdata.kv().get.return_value = {'certificate': 'crt',
'private_key': 'key'}
handlers.publish_global_client_cert()
assert not vault_pki.generate_certificate.called
assert not self.set_flag.called
self.unitdata.kv().get.assert_called_with('charm.vault.'
'global-client-cert')
tls.set_client_cert.assert_called_with('crt', 'key')
@mock.patch.object(handlers, 'vault_pki')
def test_publish_global_client_cert_reissue(self, vault_pki):
tls = self.endpoint_from_flag.return_value
self.is_flag_set.side_effect = [True, True]
bundle = {'certificate': 'crt',
'private_key': 'key'}
vault_pki.generate_certificate.return_value = bundle
handlers.publish_global_client_cert()
vault_pki.generate_certificate.assert_called_with('client',
'global-client',
[])
self.unitdata.kv().set.assert_called_with('charm.vault.'
'global-client-cert',
bundle)
self.set_flag.assert_called_with('charm.vault.'
'global-client-cert.created')
tls.set_client_cert.assert_called_with('crt', 'key')
@mock.patch.object(handlers, 'vault_pki')
def test_publish_global_client_certe(self, vault_pki):
tls = self.endpoint_from_flag.return_value
self.is_flag_set.side_effect = [False, False]
bundle = {'certificate': 'crt',
'private_key': 'key'}
vault_pki.generate_certificate.return_value = bundle
handlers.publish_global_client_cert()
vault_pki.generate_certificate.assert_called_with('client',
'global-client',
[])
self.unitdata.kv().set.assert_called_with('charm.vault.'
'global-client-cert',
bundle)
self.set_flag.assert_called_with('charm.vault.'
'global-client-cert.created')
tls.set_client_cert.assert_called_with('crt', 'key')
@mock.patch.object(handlers, 'vault_pki')
def test_create_certs(self, vault_pki):
tls = self.endpoint_from_flag.return_value
self.is_flag_set.return_value = False
handlers.create_server_cert()
process_cert_request.assert_called_once_with(
'public.openstack.local',
['10.0.0.10', 'admin.public.openstack.local'],
'keystone_0',
False)
tls_mock.set_server_cert.assert_called_once_with(
'keystone_0',
'CERT',
'KEY')
tls_mock.set_ca.assert_called_once_with('CA')
tls_mock.set_chain.assert_called_once_with('CHAIN')
@mock.patch.object(handlers.vault_pki, 'get_ca')
@mock.patch.object(handlers.vault_pki, 'get_chain')
@mock.patch.object(handlers.vault_pki, 'process_cert_request')
@mock.patch.object(handlers, 'vault')
def test_create_server_cert_batch(self, _vault, process_cert_request,
get_chain, get_ca):
def _certs(cn, ip_sans, alt_names, reissue_requested=False):
data = {
'admin.openstack.local': {
'certificate': 'ADMINCERT',
'private_key': 'ADMINKEY'},
'public.openstack.local': {
'certificate': 'PUBLICCERT',
'private_key': 'PUBLICKEY'},
'internal.openstack.local': {
'certificate': 'INTCERT',
'private_key': 'INTKEY'}}
return data[cn]
tls_mock = mock.MagicMock()
tls_mock.get_server_requests.return_value = {
'keystone_0': {
'common_name': 'admin.openstack.local',
'sans': ['10.0.0.10', 'flump.openstack.local'],
'cert_requests': {
'public.openstack.local': {
'sans': ['10.10.0.10', 'unit_name.openstack.local']},
'internal.openstack.local': {
'sans': ['10.20.0.10']}}}}
_vault.vault_ready_for_clients.return_value = True
process_cert_request.side_effect = _certs
get_ca.return_value = 'CA'
get_chain.return_value = 'CHAIN'
create_calls = [
mock.call(
'admin.openstack.local',
['10.0.0.10', 'flump.openstack.local'],
'keystone_0',
False),
mock.call(
'public.openstack.local',
['10.10.0.10', 'unit_name.openstack.local'],
'keystone_0',
False),
mock.call(
'internal.openstack.local',
['10.20.0.10'],
'keystone_0',
False)]
add_server_calls = [
mock.call(
'keystone_0',
'public.openstack.local',
'PUBLICCERT',
'PUBLICKEY'),
mock.call(
'keystone_0',
'internal.openstack.local',
'INTCERT',
'INTKEY')
tls.new_requests = [mock.Mock(cert_type='cert_type1',
common_name='common_name1',
sans='sans1'),
mock.Mock(cert_type='invalid',
common_name='invalid',
sans='invalid'),
mock.Mock(cert_type='cert_type2',
common_name='common_name2',
sans='sans2')]
vault_pki.generate_certificate.side_effect = [
{'certificate': 'crt1', 'private_key': 'key1'},
handlers.vault.VaultInvalidRequest,
{'certificate': 'crt2', 'private_key': 'key2'},
]
self.endpoint_from_flag.return_value = tls_mock
self.is_flag_set.return_value = False
handlers.create_server_cert()
print(process_cert_request.call_args_list)
process_cert_request.assert_has_calls(
create_calls,
any_order=True)
tls_mock.set_server_cert.assert_called_once_with(
'keystone_0',
'ADMINCERT',
'ADMINKEY')
tls_mock.add_server_cert.assert_has_calls(
add_server_calls,
any_order=True)
tls_mock.set_ca.assert_called_once_with('CA')
tls_mock.set_chain.assert_called_once_with('CHAIN')
@mock.patch.object(handlers, 'vault')
def test_create_server_cert_vault_not_ready(self, _vault):
_vault.vault_ready_for_clients.return_value = False
tls_mock = mock.MagicMock()
self.endpoint_from_flag.return_value = tls_mock
handlers.create_server_cert()
self.assertFalse(tls_mock.get_server_requests.called)
handlers.create_certs()
vault_pki.generate_certificate.assert_has_calls([
mock.call('cert_type1', 'common_name1', 'sans1'),
mock.call('invalid', 'invalid', 'invalid'),
mock.call('cert_type2', 'common_name2', 'sans2'),
])
tls.new_requests[0].set_cert.assert_has_calls([
mock.call('crt1', 'key1'),
])
assert not tls.new_requests[1].called
tls.new_requests[2].set_cert.assert_has_calls([
mock.call('crt2', 'key2'),
])