Add TLS SNI scenario tests
This patch adds scenario tests that exercise the SNI capabilities of the Octavia TLS offloading. Depends-On: https://review.opendev.org/690444 Change-Id: I4bbd103e34997dd6b1bb64cb5d69b5135c6e26ea
This commit is contained in:
parent
9c7fe1f877
commit
402de7d80c
|
@ -52,6 +52,42 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
|||
raise cls.skipException('TLS with Barbican tests require the '
|
||||
'barbican service.')
|
||||
|
||||
@classmethod
|
||||
def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
|
||||
new_cert, new_key = cert_utils.generate_server_cert_and_key(
|
||||
ca_cert, ca_key, name)
|
||||
|
||||
LOG.debug('%s Cert: %s', name, new_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
LOG.debug('%s private Key: %s', name, new_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()))
|
||||
new_public_key = new_key.public_key()
|
||||
LOG.debug('%s public Key: %s', name, new_public_key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo))
|
||||
|
||||
# Create the pkcs12 bundle
|
||||
pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
|
||||
LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
|
||||
|
||||
new_secret_ref = barbican_mgr.store_secret(pkcs12)
|
||||
cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
|
||||
|
||||
# Set the barbican ACL if the Octavia API version doesn't do it
|
||||
# automatically.
|
||||
if not cls.mem_lb_client.is_version_supported(
|
||||
cls.api_version, '2.1'):
|
||||
user_list = cls.os_admin.users_v3_client.list_users(
|
||||
name=CONF.load_balancer.octavia_svc_username)
|
||||
msg = 'Only one user named "{0}" should exist, {1} found.'.format(
|
||||
CONF.load_balancer.octavia_svc_username,
|
||||
len(user_list['users']))
|
||||
assert 1 == len(user_list['users']), msg
|
||||
barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
|
||||
return new_cert, new_key, new_secret_ref
|
||||
|
||||
@classmethod
|
||||
def resource_setup(cls):
|
||||
"""Setup resources needed by the tests."""
|
||||
|
@ -70,45 +106,35 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
|||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo))
|
||||
|
||||
# Create a server cert and key
|
||||
cls.server_uuid = uuidutils.generate_uuid()
|
||||
server_cert, server_key = cert_utils.generate_server_cert_and_key(
|
||||
cls.ca_cert, ca_key, cls.server_uuid)
|
||||
|
||||
LOG.debug('Server Cert: %s' % server_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
LOG.debug('Server private Key: %s' % server_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()))
|
||||
server_public_key = server_key.public_key()
|
||||
LOG.debug('Server public Key: %s' % server_public_key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo))
|
||||
|
||||
# Create the pkcs12 bundle
|
||||
pkcs12 = cert_utils.generate_pkcs12_bundle(server_cert, server_key)
|
||||
LOG.debug('Server PKCS12 bundle: %s' % base64.b64encode(pkcs12))
|
||||
|
||||
# Load the secret into the barbican service under the
|
||||
# os_roles_lb_member tenant
|
||||
barbican_mgr = barbican_client_mgr.BarbicanClientManager(
|
||||
cls.os_roles_lb_member)
|
||||
|
||||
cls.secret_ref = barbican_mgr.store_secret(pkcs12)
|
||||
cls.addClassResourceCleanup(barbican_mgr.delete_secret, cls.secret_ref)
|
||||
# Create a server cert and key
|
||||
# This will be used as the "default certificate" in SNI tests.
|
||||
cls.server_uuid = uuidutils.generate_uuid()
|
||||
LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
|
||||
|
||||
# Set the barbican ACL if the Octavia API version doesn't do it
|
||||
# automatically.
|
||||
if not cls.mem_lb_client.is_version_supported(
|
||||
cls.api_version, '2.1'):
|
||||
user_list = cls.os_admin.users_v3_client.list_users(
|
||||
name=CONF.load_balancer.octavia_svc_username)
|
||||
msg = 'Only one user named "{0}" should exist, {1} found.'.format(
|
||||
CONF.load_balancer.octavia_svc_username,
|
||||
len(user_list['users']))
|
||||
assert 1 == len(user_list['users']), msg
|
||||
barbican_mgr.add_acl(cls.secret_ref, user_list['users'][0]['id'])
|
||||
server_cert, server_key, cls.server_secret_ref = (
|
||||
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
|
||||
ca_key, cls.server_uuid))
|
||||
|
||||
# Create the SNI1 cert and key
|
||||
cls.SNI1_uuid = uuidutils.generate_uuid()
|
||||
LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
|
||||
|
||||
SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
|
||||
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
|
||||
ca_key, cls.SNI1_uuid))
|
||||
|
||||
# Create the SNI2 cert and key
|
||||
cls.SNI2_uuid = uuidutils.generate_uuid()
|
||||
LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
|
||||
|
||||
SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
|
||||
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
|
||||
ca_key, cls.SNI2_uuid))
|
||||
|
||||
# Setup a load balancer for the tests to use
|
||||
lb_name = data_utils.rand_name("lb_member_lb1-tls")
|
||||
|
@ -224,7 +250,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
|||
const.PROTOCOL_PORT: '443',
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.secret_ref,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
}
|
||||
listener = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener_id = listener[const.ID]
|
||||
|
@ -242,7 +268,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
|||
# Test HTTPS listener load balancing.
|
||||
# Note: certificate validation tests will follow this test
|
||||
self.check_members_balanced(self.lb_vip_address, protocol='https',
|
||||
verify=False)
|
||||
verify=False, protocol_port=443)
|
||||
|
||||
def _verify_cb(connection, x509, errno, errdepth, retcode):
|
||||
"""Callback for certificate validation."""
|
||||
|
@ -250,9 +276,17 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
|||
if errdepth != 0:
|
||||
return True
|
||||
if errno == 0:
|
||||
received_cn = x509.get_subject().commonName
|
||||
received_name = self._get_cert_name(received_cn)
|
||||
expected_cn = '{}.example.com'.format(self.server_uuid)
|
||||
msg = ('ERROR: Received certificate "{received_name}" with CN '
|
||||
'{received_cn} is not the expected certificate '
|
||||
'"default" with CN {expected_cn}.'.format(
|
||||
received_name=received_name,
|
||||
received_cn=received_cn,
|
||||
expected_cn=expected_cn))
|
||||
# Make sure the certificate is the one we generated
|
||||
self.assertEqual('{}.example.com'.format(self.server_uuid),
|
||||
x509.get_subject().commonName)
|
||||
self.assertEqual(expected_cn, received_cn, message=msg)
|
||||
else:
|
||||
LOG.error('Certificate with CN: {0} failed validation with '
|
||||
'OpenSSL verify errno {1}'.format(
|
||||
|
@ -270,3 +304,317 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
|||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
@decorators.idempotent_id('08405802-4411-4454-b008-8607408f424a')
|
||||
def test_basic_tls_SNI_traffic(self):
|
||||
|
||||
listener_name = data_utils.rand_name("lb_member_listener1-tls-sni")
|
||||
listener_kwargs = {
|
||||
const.NAME: listener_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: '443',
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref,
|
||||
self.SNI2_secret_ref],
|
||||
}
|
||||
listener = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener_id = listener[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
# Test HTTPS listener load balancing.
|
||||
# Note: certificate validation tests will follow this test
|
||||
self.check_members_balanced(self.lb_vip_address, protocol='https',
|
||||
verify=False, protocol_port=443)
|
||||
|
||||
def _verify_server_cb(connection, x509, errno, errdepth, retcode):
|
||||
return _verify_cb(connection, x509, errno, errdepth, retcode,
|
||||
name=self.server_uuid)
|
||||
|
||||
def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode):
|
||||
return _verify_cb(connection, x509, errno, errdepth, retcode,
|
||||
name=self.SNI1_uuid)
|
||||
|
||||
def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode):
|
||||
return _verify_cb(connection, x509, errno, errdepth, retcode,
|
||||
name=self.SNI2_uuid)
|
||||
|
||||
def _verify_cb(connection, x509, errno, errdepth, retcode, name):
|
||||
"""Callback for certificate validation."""
|
||||
# don't validate names of root certificates
|
||||
if errdepth != 0:
|
||||
return True
|
||||
if errno == 0:
|
||||
received_cn = x509.get_subject().commonName
|
||||
received_name = self._get_cert_name(received_cn)
|
||||
expected_cn = '{}.example.com'.format(name)
|
||||
expected_name = self._get_cert_name(name)
|
||||
msg = ('ERROR: Received certificate "{received_name}" with CN '
|
||||
'{received_cn} is not the expected certificate '
|
||||
'"{expected_name}" with CN {expected_cn}.'.format(
|
||||
received_name=received_name,
|
||||
received_cn=received_cn,
|
||||
expected_name=expected_name,
|
||||
expected_cn=expected_cn))
|
||||
# Make sure the certificate is the one we generated
|
||||
self.assertEqual(expected_cn, received_cn, message=msg)
|
||||
else:
|
||||
LOG.error('Certificate with CN: {0} failed validation with '
|
||||
'OpenSSL verify errno {1}'.format(
|
||||
x509.get_subject().commonName, errno))
|
||||
return False
|
||||
return True
|
||||
|
||||
# Test that the default certificate is used with no SNI host request
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_server_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the default certificate is used with bogus SNI host request
|
||||
context = SSL.Context(SSL.TLSv1_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_server_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name('bogus.example.com'.encode())
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the SNI1 certificate is used when SNI1 host is specified
|
||||
context = SSL.Context(SSL.TLSv1_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_SNI1_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name(
|
||||
'{}.example.com'.format(self.SNI1_uuid).encode())
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the SNI2 certificate is used when SNI2 host is specified
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_SNI2_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name(
|
||||
'{}.example.com'.format(self.SNI2_uuid).encode())
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
def _get_cert_name(self, lookup_string):
|
||||
if self.server_uuid in lookup_string:
|
||||
return 'default'
|
||||
elif self.SNI1_uuid in lookup_string:
|
||||
return 'SNI1'
|
||||
elif self.SNI2_uuid in lookup_string:
|
||||
return 'SNI2'
|
||||
else:
|
||||
return 'Unknown'
|
||||
|
||||
@decorators.idempotent_id('bfac9bf4-8cd0-4519-8d99-5ad0c75abf5c')
|
||||
def test_basic_tls_SNI_multi_listener_traffic(self):
|
||||
"""Make sure certificates are only used on the correct listeners."""
|
||||
|
||||
listener_name = data_utils.rand_name("lb_member_listener1-tls-sni")
|
||||
listener_kwargs = {
|
||||
const.NAME: listener_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: '443',
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref],
|
||||
}
|
||||
listener = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener_id = listener[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
# Test HTTPS listener load balancing.
|
||||
# Note: certificate validation tests will follow this test
|
||||
self.check_members_balanced(self.lb_vip_address, protocol='https',
|
||||
verify=False, protocol_port=443)
|
||||
|
||||
listener2_name = data_utils.rand_name("lb_member_listener2-tls-sni")
|
||||
listener2_kwargs = {
|
||||
const.NAME: listener2_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: '8443',
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.SNI2_secret_ref,
|
||||
}
|
||||
listener2 = self.mem_listener_client.create_listener(
|
||||
**listener2_kwargs)
|
||||
self.listener2_id = listener2[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener2_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
# Test HTTPS listener load balancing.
|
||||
# Note: certificate validation tests will follow this test
|
||||
self.check_members_balanced(self.lb_vip_address, protocol='https',
|
||||
verify=False, protocol_port=8443)
|
||||
|
||||
def _verify_server_cb(connection, x509, errno, errdepth, retcode):
|
||||
return _verify_cb(connection, x509, errno, errdepth, retcode,
|
||||
name=self.server_uuid)
|
||||
|
||||
def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode):
|
||||
return _verify_cb(connection, x509, errno, errdepth, retcode,
|
||||
name=self.SNI1_uuid)
|
||||
|
||||
def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode):
|
||||
return _verify_cb(connection, x509, errno, errdepth, retcode,
|
||||
name=self.SNI2_uuid)
|
||||
|
||||
def _verify_cb(connection, x509, errno, errdepth, retcode, name):
|
||||
"""Callback for certificate validation."""
|
||||
# don't validate names of root certificates
|
||||
if errdepth != 0:
|
||||
return True
|
||||
if errno == 0:
|
||||
received_cn = x509.get_subject().commonName
|
||||
received_name = self._get_cert_name(received_cn)
|
||||
expected_cn = '{}.example.com'.format(name)
|
||||
expected_name = self._get_cert_name(name)
|
||||
msg = ('ERROR: Received certificate "{received_name}" with CN '
|
||||
'{received_cn} is not the expected certificate '
|
||||
'"{expected_name}" with CN {expected_cn}.'.format(
|
||||
received_name=received_name,
|
||||
received_cn=received_cn,
|
||||
expected_name=expected_name,
|
||||
expected_cn=expected_cn))
|
||||
# Make sure the certificate is the one we generated
|
||||
self.assertEqual(expected_cn, received_cn, message=msg)
|
||||
else:
|
||||
LOG.error('Certificate with CN: {0} failed validation with '
|
||||
'OpenSSL verify errno {1}'.format(
|
||||
x509.get_subject().commonName, errno))
|
||||
return False
|
||||
return True
|
||||
|
||||
# Test that the default certificate is used with no SNI host request
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_server_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the SNI1 certificate is used when SNI1 host is specified
|
||||
context = SSL.Context(SSL.TLSv1_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_SNI1_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name(
|
||||
'{}.example.com'.format(self.SNI1_uuid).encode())
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the default certificate is used when SNI2 host is specified
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_server_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name(
|
||||
'{}.example.com'.format(self.SNI2_uuid).encode())
|
||||
sock.connect((self.lb_vip_address, 443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the SNI2 certificate is used with no SNI host request
|
||||
# on listener 2, SNI2 is the default cert for listener 2
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_SNI2_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.connect((self.lb_vip_address, 8443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the SNI2 certificate is used with listener 1 host request
|
||||
# on listener 2, SNI2 is the default cert for listener 2
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_SNI2_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name(
|
||||
'{}.example.com'.format(self.server_uuid).encode())
|
||||
sock.connect((self.lb_vip_address, 8443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
# Test that the SNI2 certificate is used with SNI1 host request
|
||||
# on listener 2, SNI2 is the default cert for listener 2
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||
_verify_SNI2_cb)
|
||||
ca_store = context.get_cert_store()
|
||||
ca_store.add_cert(X509.from_cryptography(self.ca_cert))
|
||||
sock = socket.socket()
|
||||
sock = SSL.Connection(context, sock)
|
||||
sock.set_tlsext_host_name(
|
||||
'{}.example.com'.format(self.SNI1_uuid).encode())
|
||||
sock.connect((self.lb_vip_address, 8443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
|
|
@ -861,7 +861,7 @@ class LoadBalancerBaseTestWithCompute(LoadBalancerBaseTest):
|
|||
raise Exception()
|
||||
|
||||
def check_members_balanced(self, vip_address, traffic_member_count=2,
|
||||
protocol='http', verify=True):
|
||||
protocol='http', verify=True, protocol_port=80):
|
||||
handler = requests
|
||||
if CONF.load_balancer.test_reuse_connection:
|
||||
handler = requests.Session()
|
||||
|
@ -875,7 +875,8 @@ class LoadBalancerBaseTestWithCompute(LoadBalancerBaseTest):
|
|||
# Send a number requests to lb vip
|
||||
for i in range(20):
|
||||
try:
|
||||
r = handler.get('{0}://{1}'.format(protocol, vip_address),
|
||||
r = handler.get('{0}://{1}:{2}'.format(protocol, vip_address,
|
||||
protocol_port),
|
||||
timeout=2, verify=verify)
|
||||
|
||||
if r.content in response_counts:
|
||||
|
|
Loading…
Reference in New Issue