Merge "Add listener client authentication scenario tests"

This commit is contained in:
Zuul 2019-12-09 23:05:45 +00:00 committed by Gerrit Code Review
commit 7cd22d2598
6 changed files with 566 additions and 23 deletions

View File

@ -63,15 +63,15 @@ class BarbicanClientManager(object):
# Setup the barbican client
self.barbican = client.Client(session=id_session)
def store_secret(self, pkcs12_secret):
def store_secret(self, secret):
"""Store a secret in barbican.
:param pkcs12_secret: A pkcs12 secret.
:param secret: A pkcs12 secret.
:returns: The barbican secret_ref.
"""
p12_secret = self.barbican.secrets.create()
p12_secret.name = data_utils.rand_name("lb_member_barbican_pkcs12")
p12_secret.payload = pkcs12_secret
p12_secret.name = data_utils.rand_name("lb_member_barbican")
p12_secret.payload = secret
secret_ref = p12_secret.store()
LOG.debug('Secret {0} has ref {1}'.format(p12_secret.name, secret_ref))
return secret_ref

View File

@ -58,6 +58,13 @@ def generate_ca_cert_and_key():
).add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
).add_extension(
# KeyUsage(digital_signature, content_commitment, key_encipherment,
# data_encipherment, key_agreement, key_cert_sign, crl_sign,
# encipher_only, decipher_only)
x509.KeyUsage(True, False, False, False, False,
True, True, False, False),
critical=True,
).sign(ca_key, hashes.SHA256(), default_backend())
return ca_cert, ca_key
@ -104,11 +111,66 @@ def generate_server_cert_and_key(ca_cert, ca_key, server_uuid):
).add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
).add_extension(
# KeyUsage(digital_signature, content_commitment, key_encipherment,
# data_encipherment, key_agreement, key_cert_sign, crl_sign,
# encipher_only, decipher_only)
x509.KeyUsage(True, False, True, False, False,
False, False, False, False),
critical=True,
).sign(ca_key, hashes.SHA256(), default_backend())
return server_cert, server_key
def generate_client_cert_and_key(ca_cert, ca_key, client_uuid):
"""Creates a client cert and key for testing.
:param ca_cert: A cryptography CA certificate (x509) object.
:param ca_key: A cryptography CA key (x509) object.
:param client_uuid: A UUID identifying the client.
:returns: The cryptography server cert and key objects.
"""
client_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"),
x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)),
])
client_cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
ca_cert.subject
).public_key(
client_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=10)
).add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
).add_extension(
# KeyUsage(digital_signature, content_commitment, key_encipherment,
# data_encipherment, key_agreement, key_cert_sign, crl_sign,
# encipher_only, decipher_only)
x509.KeyUsage(True, True, True, False, False, False,
False, False, False),
critical=True,
).sign(ca_key, hashes.SHA256(), default_backend())
return client_cert, client_key
def generate_pkcs12_bundle(server_cert, server_key):
"""Creates a pkcs12 formated bundle.
@ -128,3 +190,28 @@ def generate_pkcs12_bundle(server_cert, server_key):
OpenSSL.crypto.PKey.from_cryptography_key(server_key))
pkcs12.set_certificate(OpenSSL.crypto.X509.from_cryptography(server_cert))
return pkcs12.export()
def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke):
"""Create a certificate revocation list with a revoked certificate.
:param ca_cert: A cryptography CA certificate (x509) object.
:param ca_key: A cryptography CA key (x509) object.
:param cert_to_revoke: A cryptography CA certificate (x509) object.
:returns: A signed certificate revocation list.
"""
crl_builder = x509.CertificateRevocationListBuilder()
crl_builder = crl_builder.issuer_name(ca_cert.subject)
crl_builder = crl_builder.last_update(datetime.datetime.today())
crl_builder = crl_builder.next_update(datetime.datetime.today() +
datetime.timedelta(1, 0, 0))
revoked_cert = x509.RevokedCertificateBuilder().serial_number(
cert_to_revoke.serial_number
).revocation_date(
datetime.datetime.today()
).build(default_backend())
crl_builder = crl_builder.add_revoked_certificate(revoked_cert)
return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(),
backend=default_backend())

View File

@ -20,6 +20,12 @@ AVAILABILITY_ZONE_PROFILE_ID = 'availability_zone_profile_id'
ADMIN_STATE_UP = 'admin_state_up'
BYTES_IN = 'bytes_in'
BYTES_OUT = 'bytes_out'
CLIENT_AUTHENTICATION = 'client_authentication'
CLIENT_AUTH_NONE = 'NONE'
CLIENT_AUTH_OPTIONAL = 'OPTIONAL'
CLIENT_AUTH_MANDATORY = 'MANDATORY'
CLIENT_CA_TLS_CONTAINER_REF = 'client_ca_tls_container_ref'
CLIENT_CRL_CONTAINER_REF = 'client_crl_container_ref'
CREATED_AT = 'created_at'
DESCRIPTION = 'description'
FLAVOR_DATA = 'flavor_data'

View File

@ -35,7 +35,10 @@ class ListenerClient(base_client.BaseLBaaSClient):
timeout_member_data=Unset, timeout_tcp_inspect=Unset,
insert_headers=Unset, default_pool_id=Unset,
default_tls_container_ref=Unset,
sni_container_refs=Unset, return_object_only=True):
sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset,
return_object_only=True):
"""Create a listener.
:param protocol: The protocol for the resource.
@ -70,6 +73,17 @@ class ListenerClient(base_client.BaseLBaaSClient):
secrets containing PKCS12 format
certificate/key bundles for TERMINATED_TLS
listeners.
:param client_authentication: The TLS client authentication mode. One
of the options NONE, OPTIONAL or
MANDATORY.
:param client_ca_tls_container_ref: The ref of the key manager service
secret containing a PEM format
client CA certificate bundle for
TERMINATED_HTTPS listeners.
:param client_crl_container_ref: The URI of the key manager service
secret containing a PEM format CA
revocation list file for
TERMINATED_HTTPS listeners.
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.
@ -190,7 +204,10 @@ class ListenerClient(base_client.BaseLBaaSClient):
timeout_member_data=Unset, timeout_tcp_inspect=Unset,
insert_headers=Unset, default_pool_id=Unset,
default_tls_container_ref=Unset,
sni_container_refs=Unset, return_object_only=True):
sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset,
return_object_only=True):
"""Update a listener.
:param listener_id: The listener ID to update.
@ -223,6 +240,17 @@ class ListenerClient(base_client.BaseLBaaSClient):
secrets containing PKCS12 format
certificate/key bundles for TERMINATED_TLS
listeners.
:param client_authentication: The TLS client authentication mode. One
of the options NONE, OPTIONAL or
MANDATORY.
:param client_ca_tls_container_ref: The ref of the key manager service
secret containing a PEM format
client CA certificate bundle for
TERMINATED_HTTPS listeners.
:param client_crl_container_ref: The URI of the key manager service
secret containing a PEM format CA
revocation list file for
TERMINATED_HTTPS listeners.
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.

View File

@ -13,7 +13,9 @@
# under the License.
import base64
import requests
import socket
import tempfile
from cryptography.hazmat.primitives import serialization
from OpenSSL.crypto import X509
@ -52,6 +54,25 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
raise cls.skipException('TLS with Barbican tests require the '
'barbican service.')
@classmethod
def _store_secret(cls, barbican_mgr, secret):
new_secret_ref = barbican_mgr.store_secret(secret)
cls.addClassResourceCleanup(barbican_mgr.delete_secret,
new_secret_ref)
# Set the barbican ACL if the Octavia API version doesn't do it
# automatically.
if not cls.mem_lb_client.is_version_supported(
cls.api_version, '2.1'):
user_list = cls.os_admin.users_v3_client.list_users(
name=CONF.load_balancer.octavia_svc_username)
msg = 'Only one user named "{0}" should exist, {1} found.'.format(
CONF.load_balancer.octavia_svc_username,
len(user_list['users']))
assert 1 == len(user_list['users']), msg
barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
return new_secret_ref
@classmethod
def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
new_cert, new_key = cert_utils.generate_server_cert_and_key(
@ -72,20 +93,8 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
new_secret_ref = barbican_mgr.store_secret(pkcs12)
cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
new_secret_ref = cls._store_secret(barbican_mgr, pkcs12)
# Set the barbican ACL if the Octavia API version doesn't do it
# automatically.
if not cls.mem_lb_client.is_version_supported(
cls.api_version, '2.1'):
user_list = cls.os_admin.users_v3_client.list_users(
name=CONF.load_balancer.octavia_svc_username)
msg = 'Only one user named "{0}" should exist, {1} found.'.format(
CONF.load_balancer.octavia_svc_username,
len(user_list['users']))
assert 1 == len(user_list['users']), msg
barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
return new_cert, new_key, new_secret_ref
@classmethod
@ -108,7 +117,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
# Load the secret into the barbican service under the
# os_roles_lb_member tenant
barbican_mgr = barbican_client_mgr.BarbicanClientManager(
cls.barbican_mgr = barbican_client_mgr.BarbicanClientManager(
cls.os_roles_lb_member)
# Create a server cert and key
@ -117,7 +126,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
server_cert, server_key, cls.server_secret_ref = (
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
ca_key, cls.server_uuid))
# Create the SNI1 cert and key
@ -125,7 +134,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
ca_key, cls.SNI1_uuid))
# Create the SNI2 cert and key
@ -133,9 +142,37 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
ca_key, cls.SNI2_uuid))
# Create the client authentication CA
cls.client_ca_cert, client_ca_key = (
cert_utils.generate_ca_cert_and_key())
cls.client_ca_cert_ref = cls._store_secret(
cls.barbican_mgr,
cls.client_ca_cert.public_bytes(serialization.Encoding.PEM))
# Create client cert and key
cls.client_cn = uuidutils.generate_uuid()
cls.client_cert, cls.client_key = (
cert_utils.generate_client_cert_and_key(
cls.client_ca_cert, client_ca_key, cls.client_cn))
# Create revoked client cert and key
cls.revoked_client_cn = uuidutils.generate_uuid()
cls.revoked_client_cert, cls.revoked_client_key = (
cert_utils.generate_client_cert_and_key(
cls.client_ca_cert, client_ca_key, cls.revoked_client_cn))
# Create certificate revocation list and revoke cert
cls.client_crl = cert_utils.generate_certificate_revocation_list(
cls.client_ca_cert, client_ca_key, cls.revoked_client_cert)
cls.client_crl_ref = cls._store_secret(
cls.barbican_mgr,
cls.client_crl.public_bytes(serialization.Encoding.PEM))
# Setup a load balancer for the tests to use
lb_name = data_utils.rand_name("lb_member_lb1-tls")
lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider,
@ -618,3 +655,384 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
sock.connect((self.lb_vip_address, 8443))
# Validate the certificate is signed by the ca_cert we created
sock.do_handshake()
@decorators.idempotent_id('af6bb7d2-acbb-4f6e-861f-39a2a3f02331')
def test_tls_client_auth_mandatory(self):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.8'):
raise self.skipException('TLS client authentication '
'is only available on Octavia API '
'version 2.8 or newer.')
LISTENER1_TCP_PORT = '443'
listener_name = data_utils.rand_name(
"lb_member_listener1-client-auth-mand")
listener_kwargs = {
const.NAME: listener_name,
const.PROTOCOL: const.TERMINATED_HTTPS,
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
const.LOADBALANCER_ID: self.lb_id,
const.DEFAULT_POOL_ID: self.pool_id,
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
}
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
self.addCleanup(
self.mem_listener_client.cleanup_listener,
self.listener_id,
lb_client=self.mem_lb_client, lb_id=self.lb_id)
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
self.lb_id, const.PROVISIONING_STATUS,
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
# Test that no client certificate fails to connect
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*certificate required.*",
requests.get,
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
timeout=12, verify=False)
# Test that a revoked client certificate fails to connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.revoked_client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.revoked_client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*revoked.*", requests.get,
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a valid client certificate can connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
response = requests.get(
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
self.assertEqual(200, response.status_code)
@decorators.idempotent_id('42d696bf-e7f5-44f0-9331-4a5e01d69ef3')
def test_tls_client_auth_optional(self):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.8'):
raise self.skipException('TLS client authentication '
'is only available on Octavia API '
'version 2.8 or newer.')
LISTENER1_TCP_PORT = '443'
listener_name = data_utils.rand_name(
"lb_member_listener1-client-auth-optional")
listener_kwargs = {
const.NAME: listener_name,
const.PROTOCOL: const.TERMINATED_HTTPS,
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
const.LOADBALANCER_ID: self.lb_id,
const.DEFAULT_POOL_ID: self.pool_id,
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_OPTIONAL,
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
}
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
self.addCleanup(
self.mem_listener_client.cleanup_listener,
self.listener_id,
lb_client=self.mem_lb_client, lb_id=self.lb_id)
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
self.lb_id, const.PROVISIONING_STATUS,
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
# Test that no client certificate connects
response = requests.get(
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
timeout=12, verify=False)
self.assertEqual(200, response.status_code)
# Test that a revoked client certificate fails to connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.revoked_client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.revoked_client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*revoked.*", requests.get,
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a valid client certificate can connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
response = requests.get(
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
self.assertEqual(200, response.status_code)
@decorators.idempotent_id('13271ce6-f9f7-4017-a017-c2fc390b9438')
def test_tls_multi_listener_client_auth(self):
"""Test client authentication in a multi-listener LB.
Validates that certificates and CRLs don't get cross configured
between multiple listeners on the same load balancer.
"""
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.8'):
raise self.skipException('TLS client authentication '
'is only available on Octavia API '
'version 2.8 or newer.')
# Create the client2 authentication CA
client2_ca_cert, client2_ca_key = (
cert_utils.generate_ca_cert_and_key())
client2_ca_cert_ref = self._store_secret(
self.barbican_mgr,
client2_ca_cert.public_bytes(serialization.Encoding.PEM))
# Create client2 cert and key
client2_cn = uuidutils.generate_uuid()
client2_cert, client2_key = (
cert_utils.generate_client_cert_and_key(
client2_ca_cert, client2_ca_key, client2_cn))
# Create revoked client2 cert and key
revoked_client2_cn = uuidutils.generate_uuid()
revoked_client2_cert, revoked_client2_key = (
cert_utils.generate_client_cert_and_key(
client2_ca_cert, client2_ca_key, revoked_client2_cn))
# Create certificate revocation list and revoke cert
client2_crl = cert_utils.generate_certificate_revocation_list(
client2_ca_cert, client2_ca_key, revoked_client2_cert)
client2_crl_ref = self._store_secret(
self.barbican_mgr,
client2_crl.public_bytes(serialization.Encoding.PEM))
LISTENER1_TCP_PORT = '443'
listener_name = data_utils.rand_name(
"lb_member_listener1-multi-list-client-auth")
listener_kwargs = {
const.NAME: listener_name,
const.PROTOCOL: const.TERMINATED_HTTPS,
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
const.LOADBALANCER_ID: self.lb_id,
const.DEFAULT_POOL_ID: self.pool_id,
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
}
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
self.addCleanup(
self.mem_listener_client.cleanup_listener,
self.listener_id,
lb_client=self.mem_lb_client, lb_id=self.lb_id)
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
self.lb_id, const.PROVISIONING_STATUS,
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
LISTENER2_TCP_PORT = '8443'
listener_name = data_utils.rand_name(
"lb_member_listener2-multi-list-client-auth")
listener_kwargs = {
const.NAME: listener_name,
const.PROTOCOL: const.TERMINATED_HTTPS,
const.PROTOCOL_PORT: LISTENER2_TCP_PORT,
const.LOADBALANCER_ID: self.lb_id,
const.DEFAULT_POOL_ID: self.pool_id,
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
const.CLIENT_CA_TLS_CONTAINER_REF: client2_ca_cert_ref,
const.CLIENT_CRL_CONTAINER_REF: client2_crl_ref,
}
listener2 = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener2_id = listener2[const.ID]
self.addCleanup(
self.mem_listener_client.cleanup_listener,
self.listener2_id,
lb_client=self.mem_lb_client, lb_id=self.lb_id)
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
self.lb_id, const.PROVISIONING_STATUS,
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
# Test that no client certificate fails to connect to listener1
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*certificate required.*",
requests.get,
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
timeout=12, verify=False)
# Test that no client certificate fails to connect to listener2
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*certificate required.*",
requests.get,
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER2_TCP_PORT),
timeout=12, verify=False)
# Test that a revoked client certificate fails to connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.revoked_client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.revoked_client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*revoked.*", requests.get,
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a revoked client2 certificate fails to connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(revoked_client2_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(revoked_client2_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*revoked.*", requests.get,
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER2_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a valid client certificate can connect to listener1
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
response = requests.get(
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
self.assertEqual(200, response.status_code)
# Test that a valid client2 certificate can connect to listener2
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(client2_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(client2_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
response = requests.get(
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER2_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
self.assertEqual(200, response.status_code)
# Test that a valid client1 certificate can not connect to listener2
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*decrypt error.*",
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER2_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a valid client2 certificate can not connect to listener1
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(client2_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(client2_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*decrypt error.*",
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a revoked client1 certificate can not connect to listener2
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.revoked_client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.revoked_client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*decrypt error.*",
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER2_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a revoked client2 certificate can not connect to listener1
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(revoked_client2_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(revoked_client2_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaisesRegex(
requests.exceptions.SSLError, ".*decrypt error.*",
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))

View File

@ -0,0 +1,4 @@
---
features:
- |
Adds scenario tests for listener client authentication.