Merge "Add scenario test for Prometheus over TLS"

This commit is contained in:
Zuul 2022-12-02 19:05:24 +00:00 committed by Gerrit Code Review
commit dddf6515ab
1 changed files with 75 additions and 0 deletions

View File

@ -28,6 +28,7 @@ from oslo_utils import uuidutils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
import testtools
from octavia_tempest_plugin.common import barbican_client_mgr
from octavia_tempest_plugin.common import cert_utils
@ -1539,3 +1540,77 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
self.check_members_balanced(self.lb_vip_address, protocol=const.HTTP,
protocol_port=85)
@decorators.idempotent_id('7d9dcae6-3e2c-4eae-9bfb-1ef0d00aa530')
@testtools.skipUnless(
CONF.loadbalancer_feature_enabled.prometheus_listener_enabled,
'PROMETHEUS listener tests are disabled in the tempest configuration.')
def test_tls_prometheus_client_auth_mandatory(self):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.25'):
raise self.skipException('Prometheus listeners are only available '
'on Octavia API version 2.25 or newer.')
LISTENER1_TCP_PORT = '9443'
listener_name = data_utils.rand_name(
"lb_member_listener1-prometheus-client-auth-mand")
listener_kwargs = {
const.NAME: listener_name,
const.PROTOCOL: const.PROMETHEUS,
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
const.LOADBALANCER_ID: self.lb_id,
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
}
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
self.addCleanup(
self.mem_listener_client.cleanup_listener,
self.listener_id,
lb_client=self.mem_lb_client, lb_id=self.lb_id)
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
self.lb_id, const.PROVISIONING_STATUS,
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
# Test that no client certificate fails to connect
self.assertRaises(
requests.exceptions.SSLError,
requests.get,
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
timeout=12, verify=False)
# Test that a revoked client certificate fails to connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.revoked_client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.revoked_client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
self.assertRaises(
requests.exceptions.SSLError, requests.get,
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
# Test that a valid client certificate can connect
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
cert_file.write(self.client_cert.public_bytes(
serialization.Encoding.PEM))
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
key_file.write(self.client_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()))
response = requests.get(
'https://{0}:{1}'.format(self.lb_vip_address,
LISTENER1_TCP_PORT),
timeout=12, verify=False, cert=(cert_file.name,
key_file.name))
self.assertEqual(200, response.status_code)