diff --git a/kuryr_tempest_plugin/tests/scenario/base.py b/kuryr_tempest_plugin/tests/scenario/base.py index a2a4bc15..b236bade 100644 --- a/kuryr_tempest_plugin/tests/scenario/base.py +++ b/kuryr_tempest_plugin/tests/scenario/base.py @@ -13,8 +13,10 @@ # limitations under the License. import six.moves +from functools import partial import json from multiprocessing import pool +import socket import time from oslo_log import log as logging @@ -348,25 +350,84 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): raise lib_exc.ServerFault() @classmethod - def wait_service_status(cls, service_ip, timeout_period): - session = requests.Session() + def _verify_connectivity(cls, dest_ip, timeout_period, protocol, port): + udp_client_sock = None + + def verify_tcp(dest_ip, port, session): + try: + session.get("http://{0}:{1}".format(dest_ip, port), + timeout=2) + except Exception: + return False + return True + + def verify_udp(dest_ip, port, udp_client_sock): + udp_client_sock.sendto("Hi Server, howRU?", (dest_ip, port)) + try: + udp_client_sock.recvfrom(512) + except socket.timeout: + return False + return True + + if protocol == "TCP": + session = requests.Session() + iter_func = partial(verify_tcp, session=session) + elif protocol == "UDP": + udp_client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_client_sock.settimeout(2.0) + iter_func = partial(verify_udp, udp_client_sock=udp_client_sock) + else: + LOG.warning("Unsupported protocol %s, returning", protocol) + return False + start = time.time() while time.time() - start < timeout_period: - try: - time.sleep(5) - session.get("http://{0}".format(service_ip), timeout=2) + time.sleep(5) + if iter_func(dest_ip, port): + return True + LOG.warning('No initial traffic is passing through.') + LOG.error("Can't connect to %s:%d", dest_ip, port) + return False - return - except Exception: - LOG.warning('No initial traffic is passing through.') - time.sleep(5) - LOG.error( - "Traffic didn't pass within the period of %s" % timeout_period) - raise lib_exc.ServerFault() + @classmethod + def wait_service_status(cls, service_ip, timeout_period, + protocol="TCP", port=80): + if cls._verify_connectivity(service_ip, timeout_period, + protocol, port): + LOG.info('Service responding...') + else: + LOG.error("Can't connect service's IP %s", service_ip) + raise lib_exc.ServerFault() + + @classmethod + def wait_ep_members_status(cls, ep_name, namespace, timeout_period): + num_of_be = 0 + ep = cls.k8s_client.CoreV1Api().read_namespaced_endpoints( + ep_name, namespace) + try: + subset = ep.subsets[0] + subset_ports = subset.ports[0] + for subset_address in subset.addresses: + num_of_be += 1 + LOG.info('Verifying connectivity for EP backend: %s:%d; ' + 'prot=%s', subset_address.ip, subset_ports.port, + subset_ports.protocol) + if cls._verify_connectivity(subset_address.ip, timeout_period, + subset_ports.protocol, + subset_ports.port): + LOG.info('EP member %s responding...', subset_address.ip) + else: + LOG.error("Can't connect to EP member %s", + subset_address.ip) + raise lib_exc.ServerFault() + except Exception: + return 0 + return num_of_be @classmethod def create_setup_for_service_test(cls, pod_num=2, spec_type="ClusterIP", - protocol="TCP", label=None, + protocol="TCP", port=80, + target_port=8080, label=None, namespace="default", get_ip=True, service_name=None): @@ -379,14 +440,21 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): cls.pod_num = pod_num service_name, service_obj = cls.create_service( pod_label=pod.metadata.labels, spec_type=spec_type, - protocol=protocol, namespace=namespace, service_name=service_name) + protocol=protocol, port=port, target_port=target_port, + namespace=namespace, service_name=service_name) if get_ip: cls.service_ip = cls.get_service_ip( service_name, spec_type=spec_type, namespace=namespace) cls.verify_lbaas_endpoints_configured(service_name, pod_num) cls.service_name = service_name - cls.wait_service_status( - cls.service_ip, CONF.kuryr_kubernetes.lb_build_timeout) + cls.wait_service_status(cls.service_ip, + CONF.kuryr_kubernetes.lb_build_timeout, + protocol, port) + if pod_num != cls.wait_ep_members_status( + cls.service_name, namespace, + CONF.kuryr_kubernetes.lb_build_timeout): + LOG.error("Actual EP backend num != pod_num") + raise lib_exc.ServerFault() cls.addClassResourceCleanup(cls.delete_service, service_name, namespace=namespace) @@ -462,20 +530,52 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): 'Number of exclusive responses is incorrect. ' 'Got %s.' % cmd_outputs) - def assert_backend_amount(self, url, amount, headers=None, - repetitions=100, threads=8, request_timeout=5): - def req(): + def assert_backend_amount(self, server_ip, amount, server_port=None, + protocol="TCP", headers=None, repetitions=100, + threads=8, request_timeout=5): + def req_tcp(): resp = requests.get(url, headers=headers) self.assertEqual(requests.codes.OK, resp.status_code, 'Non-successful request to {}'.format(url)) return resp + def req_udp(): + # FIXME(yboaron): Current Octavia implementation doesn't + # round-robin UDP pool as expected, to work-around that + # a new socket (new local UDP port) is allocated per request. + udp_client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_client_sock.settimeout(3.0) + udp_client_sock.sendto("Hi Server, howRU?", + (server_ip, server_port)) + try: + data, addr = udp_client_sock.recvfrom(1024) + except Exception: + # NOTE(yboaron): for UDP (unlike TCP) not getting reply from + # the server is a valid use case. + return None + return data + def pred(tester, responses): - unique_resps = set(resp.content for resp in responses) + if protocol == 'TCP': + unique_resps = set(resp.content for resp in responses) + else: + unique_resps = set(resp for resp in responses if resp + is not None) tester.assertEqual(amount, len(unique_resps), 'Incorrect amount of unique backends. ' 'Got {}'.format(unique_resps)) + if protocol == 'TCP': + url = 'http://{}'.format(server_ip) + req = req_tcp + elif protocol == "UDP": + self.assertIsNotNone(server_port, "server_port must be " + "provided for UDP protocol") + req = req_udp + else: + LOG.info("Unsupported protocol %s, returning", protocol) + return + self._run_threaded_and_assert(req, pred, repetitions=repetitions, threads=threads, fn_timeout=request_timeout) diff --git a/kuryr_tempest_plugin/tests/scenario/test_service.py b/kuryr_tempest_plugin/tests/scenario/test_service.py index dd5d800b..c2baa561 100644 --- a/kuryr_tempest_plugin/tests/scenario/test_service.py +++ b/kuryr_tempest_plugin/tests/scenario/test_service.py @@ -39,8 +39,7 @@ class TestServiceScenario(base.BaseKuryrScenarioTest): @decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfc1a1a9') def test_service_curl(self): LOG.info("Trying to curl the service IP %s" % self.service_ip) - self.assert_backend_amount('http://{}'.format(self.service_ip), - self.pod_num) + self.assert_backend_amount(self.service_ip, self.pod_num) @decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1a7a9') def test_pod_service_curl(self): @@ -71,8 +70,7 @@ class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest): def test_lb_service_http(self): LOG.info("Trying to curl the service IP %s" % self.service_ip) - self.assert_backend_amount('http://{}'.format(self.service_ip), - self.pod_num) + self.assert_backend_amount(self.service_ip, self.pod_num) # TODO(yboaron): Use multi threads for 'test_vm_service_http' test @decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1b5a9')