diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py index 25f741a3..0fe1d818 100644 --- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py +++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py @@ -52,6 +52,42 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): raise cls.skipException('TLS with Barbican tests require the ' 'barbican service.') + @classmethod + def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name): + new_cert, new_key = cert_utils.generate_server_cert_and_key( + ca_cert, ca_key, name) + + LOG.debug('%s Cert: %s', name, new_cert.public_bytes( + serialization.Encoding.PEM)) + LOG.debug('%s private Key: %s', name, new_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption())) + new_public_key = new_key.public_key() + LOG.debug('%s public Key: %s', name, new_public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo)) + + # Create the pkcs12 bundle + pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key) + LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12)) + + new_secret_ref = barbican_mgr.store_secret(pkcs12) + cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref) + + # Set the barbican ACL if the Octavia API version doesn't do it + # automatically. + if not cls.mem_lb_client.is_version_supported( + cls.api_version, '2.1'): + user_list = cls.os_admin.users_v3_client.list_users( + name=CONF.load_balancer.octavia_svc_username) + msg = 'Only one user named "{0}" should exist, {1} found.'.format( + CONF.load_balancer.octavia_svc_username, + len(user_list['users'])) + assert 1 == len(user_list['users']), msg + barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id']) + return new_cert, new_key, new_secret_ref + @classmethod def resource_setup(cls): """Setup resources needed by the tests.""" @@ -70,45 +106,35 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)) - # Create a server cert and key - cls.server_uuid = uuidutils.generate_uuid() - server_cert, server_key = cert_utils.generate_server_cert_and_key( - cls.ca_cert, ca_key, cls.server_uuid) - - LOG.debug('Server Cert: %s' % server_cert.public_bytes( - serialization.Encoding.PEM)) - LOG.debug('Server private Key: %s' % server_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption())) - server_public_key = server_key.public_key() - LOG.debug('Server public Key: %s' % server_public_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo)) - - # Create the pkcs12 bundle - pkcs12 = cert_utils.generate_pkcs12_bundle(server_cert, server_key) - LOG.debug('Server PKCS12 bundle: %s' % base64.b64encode(pkcs12)) - # Load the secret into the barbican service under the # os_roles_lb_member tenant barbican_mgr = barbican_client_mgr.BarbicanClientManager( cls.os_roles_lb_member) - cls.secret_ref = barbican_mgr.store_secret(pkcs12) - cls.addClassResourceCleanup(barbican_mgr.delete_secret, cls.secret_ref) + # Create a server cert and key + # This will be used as the "default certificate" in SNI tests. + cls.server_uuid = uuidutils.generate_uuid() + LOG.debug('Server (default) UUID: %s' % cls.server_uuid) - # Set the barbican ACL if the Octavia API version doesn't do it - # automatically. - if not cls.mem_lb_client.is_version_supported( - cls.api_version, '2.1'): - user_list = cls.os_admin.users_v3_client.list_users( - name=CONF.load_balancer.octavia_svc_username) - msg = 'Only one user named "{0}" should exist, {1} found.'.format( - CONF.load_balancer.octavia_svc_username, - len(user_list['users'])) - assert 1 == len(user_list['users']), msg - barbican_mgr.add_acl(cls.secret_ref, user_list['users'][0]['id']) + server_cert, server_key, cls.server_secret_ref = ( + cls._generate_load_certificate(barbican_mgr, cls.ca_cert, + ca_key, cls.server_uuid)) + + # Create the SNI1 cert and key + cls.SNI1_uuid = uuidutils.generate_uuid() + LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid) + + SNI1_cert, SNI1_key, cls.SNI1_secret_ref = ( + cls._generate_load_certificate(barbican_mgr, cls.ca_cert, + ca_key, cls.SNI1_uuid)) + + # Create the SNI2 cert and key + cls.SNI2_uuid = uuidutils.generate_uuid() + LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid) + + SNI2_cert, SNI2_key, cls.SNI2_secret_ref = ( + cls._generate_load_certificate(barbican_mgr, cls.ca_cert, + ca_key, cls.SNI2_uuid)) # Setup a load balancer for the tests to use lb_name = data_utils.rand_name("lb_member_lb1-tls") @@ -224,7 +250,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): const.PROTOCOL_PORT: '443', const.LOADBALANCER_ID: self.lb_id, const.DEFAULT_POOL_ID: self.pool_id, - const.DEFAULT_TLS_CONTAINER_REF: self.secret_ref, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, } listener = self.mem_listener_client.create_listener(**listener_kwargs) self.listener_id = listener[const.ID] @@ -242,7 +268,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): # Test HTTPS listener load balancing. # Note: certificate validation tests will follow this test self.check_members_balanced(self.lb_vip_address, protocol='https', - verify=False) + verify=False, protocol_port=443) def _verify_cb(connection, x509, errno, errdepth, retcode): """Callback for certificate validation.""" @@ -250,9 +276,17 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): if errdepth != 0: return True if errno == 0: + received_cn = x509.get_subject().commonName + received_name = self._get_cert_name(received_cn) + expected_cn = '{}.example.com'.format(self.server_uuid) + msg = ('ERROR: Received certificate "{received_name}" with CN ' + '{received_cn} is not the expected certificate ' + '"default" with CN {expected_cn}.'.format( + received_name=received_name, + received_cn=received_cn, + expected_cn=expected_cn)) # Make sure the certificate is the one we generated - self.assertEqual('{}.example.com'.format(self.server_uuid), - x509.get_subject().commonName) + self.assertEqual(expected_cn, received_cn, message=msg) else: LOG.error('Certificate with CN: {0} failed validation with ' 'OpenSSL verify errno {1}'.format( @@ -270,3 +304,317 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): sock.connect((self.lb_vip_address, 443)) # Validate the certificate is signed by the ca_cert we created sock.do_handshake() + + @decorators.idempotent_id('08405802-4411-4454-b008-8607408f424a') + def test_basic_tls_SNI_traffic(self): + + listener_name = data_utils.rand_name("lb_member_listener1-tls-sni") + listener_kwargs = { + const.NAME: listener_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: '443', + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, + const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref, + self.SNI2_secret_ref], + } + listener = self.mem_listener_client.create_listener(**listener_kwargs) + self.listener_id = listener[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + # Test HTTPS listener load balancing. + # Note: certificate validation tests will follow this test + self.check_members_balanced(self.lb_vip_address, protocol='https', + verify=False, protocol_port=443) + + def _verify_server_cb(connection, x509, errno, errdepth, retcode): + return _verify_cb(connection, x509, errno, errdepth, retcode, + name=self.server_uuid) + + def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode): + return _verify_cb(connection, x509, errno, errdepth, retcode, + name=self.SNI1_uuid) + + def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode): + return _verify_cb(connection, x509, errno, errdepth, retcode, + name=self.SNI2_uuid) + + def _verify_cb(connection, x509, errno, errdepth, retcode, name): + """Callback for certificate validation.""" + # don't validate names of root certificates + if errdepth != 0: + return True + if errno == 0: + received_cn = x509.get_subject().commonName + received_name = self._get_cert_name(received_cn) + expected_cn = '{}.example.com'.format(name) + expected_name = self._get_cert_name(name) + msg = ('ERROR: Received certificate "{received_name}" with CN ' + '{received_cn} is not the expected certificate ' + '"{expected_name}" with CN {expected_cn}.'.format( + received_name=received_name, + received_cn=received_cn, + expected_name=expected_name, + expected_cn=expected_cn)) + # Make sure the certificate is the one we generated + self.assertEqual(expected_cn, received_cn, message=msg) + else: + LOG.error('Certificate with CN: {0} failed validation with ' + 'OpenSSL verify errno {1}'.format( + x509.get_subject().commonName, errno)) + return False + return True + + # Test that the default certificate is used with no SNI host request + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_server_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the default certificate is used with bogus SNI host request + context = SSL.Context(SSL.TLSv1_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_server_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name('bogus.example.com'.encode()) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the SNI1 certificate is used when SNI1 host is specified + context = SSL.Context(SSL.TLSv1_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_SNI1_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name( + '{}.example.com'.format(self.SNI1_uuid).encode()) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the SNI2 certificate is used when SNI2 host is specified + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_SNI2_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name( + '{}.example.com'.format(self.SNI2_uuid).encode()) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + def _get_cert_name(self, lookup_string): + if self.server_uuid in lookup_string: + return 'default' + elif self.SNI1_uuid in lookup_string: + return 'SNI1' + elif self.SNI2_uuid in lookup_string: + return 'SNI2' + else: + return 'Unknown' + + @decorators.idempotent_id('bfac9bf4-8cd0-4519-8d99-5ad0c75abf5c') + def test_basic_tls_SNI_multi_listener_traffic(self): + """Make sure certificates are only used on the correct listeners.""" + + listener_name = data_utils.rand_name("lb_member_listener1-tls-sni") + listener_kwargs = { + const.NAME: listener_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: '443', + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, + const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref], + } + listener = self.mem_listener_client.create_listener(**listener_kwargs) + self.listener_id = listener[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + # Test HTTPS listener load balancing. + # Note: certificate validation tests will follow this test + self.check_members_balanced(self.lb_vip_address, protocol='https', + verify=False, protocol_port=443) + + listener2_name = data_utils.rand_name("lb_member_listener2-tls-sni") + listener2_kwargs = { + const.NAME: listener2_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: '8443', + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.SNI2_secret_ref, + } + listener2 = self.mem_listener_client.create_listener( + **listener2_kwargs) + self.listener2_id = listener2[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener2_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + # Test HTTPS listener load balancing. + # Note: certificate validation tests will follow this test + self.check_members_balanced(self.lb_vip_address, protocol='https', + verify=False, protocol_port=8443) + + def _verify_server_cb(connection, x509, errno, errdepth, retcode): + return _verify_cb(connection, x509, errno, errdepth, retcode, + name=self.server_uuid) + + def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode): + return _verify_cb(connection, x509, errno, errdepth, retcode, + name=self.SNI1_uuid) + + def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode): + return _verify_cb(connection, x509, errno, errdepth, retcode, + name=self.SNI2_uuid) + + def _verify_cb(connection, x509, errno, errdepth, retcode, name): + """Callback for certificate validation.""" + # don't validate names of root certificates + if errdepth != 0: + return True + if errno == 0: + received_cn = x509.get_subject().commonName + received_name = self._get_cert_name(received_cn) + expected_cn = '{}.example.com'.format(name) + expected_name = self._get_cert_name(name) + msg = ('ERROR: Received certificate "{received_name}" with CN ' + '{received_cn} is not the expected certificate ' + '"{expected_name}" with CN {expected_cn}.'.format( + received_name=received_name, + received_cn=received_cn, + expected_name=expected_name, + expected_cn=expected_cn)) + # Make sure the certificate is the one we generated + self.assertEqual(expected_cn, received_cn, message=msg) + else: + LOG.error('Certificate with CN: {0} failed validation with ' + 'OpenSSL verify errno {1}'.format( + x509.get_subject().commonName, errno)) + return False + return True + + # Test that the default certificate is used with no SNI host request + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_server_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the SNI1 certificate is used when SNI1 host is specified + context = SSL.Context(SSL.TLSv1_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_SNI1_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name( + '{}.example.com'.format(self.SNI1_uuid).encode()) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the default certificate is used when SNI2 host is specified + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_server_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name( + '{}.example.com'.format(self.SNI2_uuid).encode()) + sock.connect((self.lb_vip_address, 443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the SNI2 certificate is used with no SNI host request + # on listener 2, SNI2 is the default cert for listener 2 + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_SNI2_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.connect((self.lb_vip_address, 8443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the SNI2 certificate is used with listener 1 host request + # on listener 2, SNI2 is the default cert for listener 2 + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_SNI2_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name( + '{}.example.com'.format(self.server_uuid).encode()) + sock.connect((self.lb_vip_address, 8443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() + + # Test that the SNI2 certificate is used with SNI1 host request + # on listener 2, SNI2 is the default cert for listener 2 + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + _verify_SNI2_cb) + ca_store = context.get_cert_store() + ca_store.add_cert(X509.from_cryptography(self.ca_cert)) + sock = socket.socket() + sock = SSL.Connection(context, sock) + sock.set_tlsext_host_name( + '{}.example.com'.format(self.SNI1_uuid).encode()) + sock.connect((self.lb_vip_address, 8443)) + # Validate the certificate is signed by the ca_cert we created + sock.do_handshake() diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py index c8f7954b..5033ade6 100644 --- a/octavia_tempest_plugin/tests/test_base.py +++ b/octavia_tempest_plugin/tests/test_base.py @@ -861,7 +861,7 @@ class LoadBalancerBaseTestWithCompute(LoadBalancerBaseTest): raise Exception() def check_members_balanced(self, vip_address, traffic_member_count=2, - protocol='http', verify=True): + protocol='http', verify=True, protocol_port=80): handler = requests if CONF.load_balancer.test_reuse_connection: handler = requests.Session() @@ -875,7 +875,8 @@ class LoadBalancerBaseTestWithCompute(LoadBalancerBaseTest): # Send a number requests to lb vip for i in range(20): try: - r = handler.get('{0}://{1}'.format(protocol, vip_address), + r = handler.get('{0}://{1}:{2}'.format(protocol, vip_address, + protocol_port), timeout=2, verify=verify) if r.content in response_counts: