Support generation of intermediate CA certificates

Add support for 'intermediate' CA certificate requests, and
generate an intermediate CA certificate with the specified CN
and SANs.

This enables charms and applications that handle PKI on their
own, or have specific requirements around the generated
certificate fields, attributes and extensions.

The intermediate CA certificates are generated in two steps.
First, we use openssl to generate a CSR and the private key.
We then sign the request using the pki/root/sign-intermediate
endpoint.

This implies tls-certificates support for intermediate certificates,
see https://github.com/juju-solutions/interface-tls-certificates/pull/31
for more details.

We also extend the generate-certificate action to accept the certificate
type that will be generated (server, client, intermediate).

func-test-pr https://github.com/openstack-charmers/zaza-openstack-tests/pull/1156

Change-Id: I2065e9d627dca3a9f895e747ccf0498ff53bd570
This commit is contained in:
Angelos Kolaitis 2023-10-30 22:51:47 +02:00
parent 8d72e64d84
commit 0c2d57238d
No known key found for this signature in database
GPG Key ID: 66AC4122BBC46C5C
6 changed files with 152 additions and 33 deletions

View File

@ -207,6 +207,11 @@ generate-certificate:
default: 8760h
description: >-
Specifies the maximum Time To Live for generated certificates.
cert-type:
type: string
default: server
description: >-
Type of certificate to generate. Must be one of: server, client, intermediate
raft-state:
description: >-
Get the raft cluster state.

View File

@ -291,7 +291,7 @@ def generate_cert(*args):
sans_list = action_config.get('sans')
try:
new_crt = vault_pki.generate_certificate(
cert_type='server',
cert_type=action_config.get('cert-type'),
common_name=action_config.get('common-name'),
sans=list(sans_list.split()),
ttl=action_config.get('ttl'),

View File

@ -13,6 +13,10 @@ from . import vault
CHARM_PKI_MP = "charm-pki-local"
CHARM_PKI_ROLE = "local"
CHARM_PKI_ROLE_CLIENT = "local-client"
CHARM_PKI_ROLE_MAP = {
"server": CHARM_PKI_ROLE,
"client": CHARM_PKI_ROLE_CLIENT,
}
def configure_pki_backend(client, name, ttl=None, max_ttl=None):
@ -112,20 +116,13 @@ def generate_certificate(cert_type, common_name, sans, ttl, max_ttl):
:param request: Certificate request from the tls-certificates interface.
:type request: CertificateRequest
:returns: The newly created cert, issuing ca and key
:rtype: tuple
:rtype: Dict
"""
client = vault.get_local_client()
configure_pki_backend(client, CHARM_PKI_MP, ttl, max_ttl)
if not is_ca_ready(client, CHARM_PKI_MP, CHARM_PKI_ROLE):
raise vault.VaultNotReady("CA not ready")
role = None
if cert_type == 'server':
role = CHARM_PKI_ROLE
elif cert_type == 'client':
role = CHARM_PKI_ROLE_CLIENT
else:
raise vault.VaultInvalidRequest('Unsupported cert_type: '
'{}'.format(cert_type))
config = {}
if sans:
ip_sans, alt_names = sort_sans(sans)
@ -134,17 +131,44 @@ def generate_certificate(cert_type, common_name, sans, ttl, max_ttl):
if alt_names:
config['alt_names'] = ','.join(alt_names)
try:
response = client.secrets.pki.generate_certificate(
role,
common_name,
extra_params=config,
mount_point=CHARM_PKI_MP,
)
if not response['data']:
raise vault.VaultError(response.get('warnings', 'unknown error'))
if cert_type in ['server', 'client']:
response = client.secrets.pki.generate_certificate(
CHARM_PKI_ROLE_MAP[cert_type],
common_name,
extra_params=config,
mount_point=CHARM_PKI_MP,
)
if not response['data']:
raise vault.VaultError('generate_certificate failed. response '
'was: {}'.format(response))
return response['data']
elif cert_type == 'intermediate':
# NOTE(neoaggelos): Starting in Vault 1.10.x or newer, this
# can optionally use client.secrets.pki.generate_intermediate()
try:
private_key, csr = openssl_generate_key_and_csr(common_name)
except CalledProcessError as e:
raise vault.VaultError('failed to generate csr and key: '
'reason: {}'.format(str(e)))
response = client.secrets.pki.sign_intermediate(
csr=csr,
common_name=common_name,
extra_params={'ttl': ttl, **config},
mount_point=CHARM_PKI_MP,
)
if not response['data']:
raise vault.VaultError('sign_intermediate failed. response '
'was: {}'.format(response))
return {'private_key': private_key, **response['data']}
else:
raise vault.VaultInvalidRequest('Unsupported cert_type: '
'{}'.format(cert_type))
except hvac.exceptions.InvalidRequest as e:
raise vault.VaultInvalidRequest(str(e)) from e
return response['data']
def get_csr(ttl=None, common_name=None, locality=None,
@ -434,6 +458,26 @@ def is_cert_from_vault(cert, name=None):
return False
def openssl_generate_key_and_csr(common_name):
"""Generate a private key and CSR for a common name.
:param common_name: the certificate common name
:type common_name: str
:returns: the private key and the certificate signing request
:rtype: (str, str)
:raises subprocess.CalledProcessError: if openssl command fails
"""
private_key = check_output(['openssl', 'genrsa', '2048'])
with NamedTemporaryFile() as f:
f.write(private_key)
f.flush()
csr = check_output(['openssl', 'req', '-new', '-sha256', '-subj',
'/CN={}'.format(common_name), '-key', f.name])
return private_key.decode(), csr.decode()
def get_serial_number_from_cert(cert, name=None):
"""Extract the serial number from the cert, or return None.

View File

@ -39,6 +39,7 @@ target_deploy_status:
tests:
- zaza.openstack.charm_tests.vault.tests.VaultTest
- zaza.openstack.charm_tests.vault.tests.VaultCacheTest
- zaza.openstack.charm_tests.vault.tests.VaultIntermediateCATest
tests_options:
force_deploy:

View File

@ -34,6 +34,7 @@ class TestActions(unit_tests.test_utils.CharmTestCase):
def test_generate_cert(self, mock_vault_pki):
self.mock_hookenv.is_leader.return_value = True
self.mock_hookenv.action_get.return_value = {
'cert-type': 'server',
'sans': 'foobar 1.2.3.4',
'common-name': 'bazbuz',
'ttl': '5m',
@ -57,6 +58,7 @@ class TestActions(unit_tests.test_utils.CharmTestCase):
"""Test failure interacting with vault_pki"""
self.mock_hookenv.is_leader.return_value = True
self.mock_hookenv.action_get.return_value = {
'cert-type': 'server',
'sans': 'foobar',
'common-name': 'bazbuz',
'ttl': '5m',

View File

@ -115,28 +115,45 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_generate_certificate(self, get_local_client,
@patch.object(vault_pki, 'openssl_generate_key_and_csr')
def test_generate_certificate(self, openssl_generate_key_and_csr,
get_local_client,
configure_pki_backend,
is_ca_ready,
sort_sans):
client_mock = mock.MagicMock()
client_mock.secrets.pki.generate_certificate.return_value = {
'data': 'data'}
'data': {'private_key': 'key1', 'certificate': 'cert1'},
}
openssl_generate_key_and_csr.return_value = ('key2', 'csr')
client_mock.secrets.pki.sign_intermediate.return_value = {
'data': {'certificate': 'cert2'},
}
get_local_client.return_value = client_mock
is_ca_ready.return_value = True
sort_sans.side_effect = lambda l: (l[0], l[1])
vault_pki.generate_certificate('server',
'example.com',
([], []),
ttl='3456h', max_ttl='3456h')
vault_pki.generate_certificate('server',
'example.com',
(['ip1'], ['alt1']),
ttl='3456h', max_ttl='3456h')
vault_pki.generate_certificate('client',
'example.com',
(['ip1', 'ip2'], ['alt1', 'alt2']),
ttl='3456h', max_ttl='3456h')
c1 = vault_pki.generate_certificate('server',
'example.com',
([], []),
ttl='3456h', max_ttl='3456h')
c2 = vault_pki.generate_certificate('server',
'example.com',
(['ip1'], ['alt1']),
ttl='3456h', max_ttl='3456h')
c3 = vault_pki.generate_certificate('client',
'example.com',
(['ip1', 'ip2'], ['alt1', 'alt2']),
ttl='3456h', max_ttl='3456h')
c4 = vault_pki.generate_certificate('intermediate',
'example.com',
(['ip1', 'ip2'], ['alt1', 'alt2']),
ttl='3456h', max_ttl='3456h')
assert c1 == {'certificate': 'cert1', 'private_key': 'key1'}
assert c2 == {'certificate': 'cert1', 'private_key': 'key1'}
assert c3 == {'certificate': 'cert1', 'private_key': 'key1'}
assert c4 == {'certificate': 'cert2', 'private_key': 'key2'}
client_mock.secrets.pki.generate_certificate.assert_has_calls([
mock.call(
vault_pki.CHARM_PKI_ROLE, 'example.com',
@ -161,6 +178,16 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
),
])
client_mock.secrets.pki.sign_intermediate.assert_has_calls([
mock.call(
csr='csr',
common_name='example.com',
extra_params={'ttl': '3456h', 'ip_sans': 'ip1,ip2',
'alt_names': 'alt1,alt2'},
mount_point=vault_pki.CHARM_PKI_MP,
)
])
@patch.object(vault_pki, 'is_ca_ready')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
@ -203,6 +230,13 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
vault_pki.generate_certificate('server', 'example.com', [],
ttl='3456h', max_ttl='3456h')
client_mock.secrets.pki.sign_intermediate.side_effect = (
hvac.exceptions.InvalidRequest
)
with self.assertRaises(vault_pki.vault.VaultInvalidRequest):
vault_pki.generate_certificate('intermediate', 'example.com', [],
ttl='3456h', max_ttl='3456h')
@patch.object(vault_pki, 'configure_pki_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_csr(self, get_local_client, configure_pki_backend):
@ -611,6 +645,39 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
"General failure verifying cert: on noes",
level=vault_pki.hookenv.DEBUG)
@patch.object(vault_pki, 'check_output')
@patch.object(vault_pki, 'NamedTemporaryFile')
def test_openssl_generate_key_and_csr(
self,
mock_named_temporary_file,
mock_check_output,
):
mock_check_output.side_effect = (b'key', b'csr')
mock_f = MagicMock()
mock_f.name = "filename"
mock_named_temporary_file.return_value.__enter__.return_value = mock_f
assert vault_pki.openssl_generate_key_and_csr("name") == ("key", "csr")
assert mock_check_output.mock_calls == [
mock.call(["openssl", "genrsa", "2048"]),
mock.call(["openssl", "req", "-new", "-sha256",
"-subj", "/CN=name", "-key", "filename"])
]
@patch.object(vault_pki, 'check_output')
def test_openssl_generate_key_and_csr_subprocess_error(self,
mock_check_output):
def _raise(*args, **kwargs):
raise vault_pki.CalledProcessError(cmd="bang", returncode=1)
mock_check_output.side_effect = _raise
with self.assertRaises(vault_pki.CalledProcessError):
vault_pki.openssl_generate_key_and_csr("test")
@patch.object(vault_pki, 'check_output')
@patch.object(vault_pki, 'NamedTemporaryFile')
@patch.object(vault_pki.hookenv, 'log')