diff --git a/kuryr_tempest_plugin/tests/scenario/base.py b/kuryr_tempest_plugin/tests/scenario/base.py index 4f9e16bb..ee67dd7f 100644 --- a/kuryr_tempest_plugin/tests/scenario/base.py +++ b/kuryr_tempest_plugin/tests/scenario/base.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +from multiprocessing import pool import time from oslo_log import log as logging @@ -129,11 +130,21 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): return kuryr_if['versioned_object.data']['id'] - def exec_command_in_pod(self, pod_name, command, namespace="default"): + def exec_command_in_pod(self, pod_name, command, namespace="default", + stderr=False): api = self.k8s_client.CoreV1Api() - return stream(api.connect_get_namespaced_pod_exec, pod_name, namespace, - command=command, stderr=False, stdin=False, stdout=True, - tty=False) + if stderr: + resp = stream(api.connect_get_namespaced_pod_exec, pod_name, + namespace, command=command, stderr=True, + stdin=False, stdout=True, tty=False, + _preload_content=False) + # Run until completion + resp.run_forever() + return resp.read_stdout(), resp.read_stderr() + else: + return stream(api.connect_get_namespaced_pod_exec, pod_name, + namespace, command=command, stderr=False, + stdin=False, stdout=True, tty=False) def assign_fip_to_pod(self, pod_name, namespace="default"): ext_net_id = CONF.network.public_network_id @@ -260,6 +271,7 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): labels={"app": label}, namespace=namespace) cls.addClassResourceCleanup(cls.delete_pod, pod_name, namespace=namespace) + cls.pod_num = pod_num service_name, service_obj = cls.create_service( pod_label=pod.metadata.labels, spec_type=spec_type, protocol=protocol, namespace=namespace) @@ -322,6 +334,59 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): 'Number of exclusive responses is incorrect. ' 'Got %s.' % cmd_outputs) + def assert_backend_amount(self, url, amount, headers=None, + repetitions=100, threads=8, request_timeout=5): + def req(): + resp = requests.get(url, headers=headers) + self.assertEqual(requests.codes.OK, resp.status_code, + 'Non-successful request to {}'.format(url)) + return resp + + def pred(tester, responses): + unique_resps = set(resp.content for resp in responses) + tester.assertEqual(amount, len(unique_resps), + 'Incorrect amount of unique backends. ' + 'Got {}'.format(unique_resps)) + + self._run_threaded_and_assert(req, pred, repetitions=repetitions, + threads=threads, + fn_timeout=request_timeout) + + def assert_backend_amount_from_pod(self, url, amount, pod, repetitions=100, + threads=8, request_timeout=7): + def req(): + stdout, stderr = self.exec_command_in_pod( + pod, ['/usr/bin/curl', '-Ss', '-w "\n%{http_code}"', url], + stderr=True) + # check if the curl command succeeded + if stderr: + LOG.error('Failed to curl the service at {}. ' + 'Err: {}'.format(url, stderr)) + raise lib_exc.UnexpectedResponseCode() + delimiter = stdout.rfind('\n') + content = stdout[:delimiter] + status_code = int(stdout[delimiter + 1:].split('"')[0]) + self.assertEqual(requests.codes.OK, status_code, + 'Non-successful request to {}'.format(url)) + return content + + def pred(tester, responses): + unique_resps = set(resp for resp in responses) + tester.assertEqual(amount, len(unique_resps), + 'Incorrect amount of unique backends. ' + 'Got {}'.format(unique_resps)) + + self._run_threaded_and_assert(req, pred, repetitions=repetitions, + threads=threads, + fn_timeout=request_timeout) + + def _run_threaded_and_assert(self, fn, predicate, repetitions=100, + threads=8, fn_timeout=1): + tp = pool.ThreadPool(processes=threads) + results = [tp.apply_async(fn) for _ in range(repetitions)] + resps = [result.get(timeout=fn_timeout) for result in results] + predicate(self, resps) + @classmethod def verify_lbaas_endpoints_configured(cls, ep_name, namespace='default'): cls._verify_endpoints_annotation( diff --git a/kuryr_tempest_plugin/tests/scenario/test_service.py b/kuryr_tempest_plugin/tests/scenario/test_service.py index c1349b39..dd5d800b 100644 --- a/kuryr_tempest_plugin/tests/scenario/test_service.py +++ b/kuryr_tempest_plugin/tests/scenario/test_service.py @@ -11,14 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import shlex -import subprocess - from oslo_log import log as logging from tempest import config from tempest.lib import decorators -from tempest.lib import exceptions as lib_exc from kuryr_tempest_plugin.tests.scenario import base @@ -44,35 +39,17 @@ class TestServiceScenario(base.BaseKuryrScenarioTest): @decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfc1a1a9') def test_service_curl(self): LOG.info("Trying to curl the service IP %s" % self.service_ip) - cmd = "curl -Ss {dst_ip}".format(dst_ip=self.service_ip) - - def curl(): - try: - return subprocess.check_output(shlex.split(cmd)) - except subprocess.CalledProcessError: - LOG.error("Checking output of curl to the service IP %s " - "failed" % self.service_ip) - raise lib_exc.UnexpectedResponseCode() - - self._run_and_assert_fn(curl) + self.assert_backend_amount('http://{}'.format(self.service_ip), + self.pod_num) @decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1a7a9') def test_pod_service_curl(self): pod_name, pod = self.create_pod() self.addCleanup(self.delete_pod, pod_name) - cmd = [ - "/bin/sh", "-c", "curl -Ss {dst_ip}".format( - dst_ip=self.service_ip)] - - def curl(): - output = self.exec_command_in_pod(pod_name, cmd) - # check if the curl command succeeded - if not output: - LOG.error("Curl the service IP %s failed" % self.service_ip) - raise lib_exc.UnexpectedResponseCode() - return output - - self._run_and_assert_fn(curl) + self.assert_backend_amount_from_pod( + 'http://{}'.format(self.service_ip), + self.pod_num, + pod_name) class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest): @@ -94,17 +71,10 @@ class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest): def test_lb_service_http(self): LOG.info("Trying to curl the service IP %s" % self.service_ip) - cmd = "curl -Ss {dst_ip}".format(dst_ip=self.service_ip) - - def curl(): - try: - return subprocess.check_output(shlex.split(cmd)) - except subprocess.CalledProcessError: - LOG.error("Checking output of curl to the service IP %s " - "failed" % self.service_ip) - raise lib_exc.UnexpectedResponseCode() - self._run_and_assert_fn(curl) + self.assert_backend_amount('http://{}'.format(self.service_ip), + self.pod_num) + # TODO(yboaron): Use multi threads for 'test_vm_service_http' test @decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1b5a9') def test_vm_service_http(self): ssh_client, fip = self.create_vm_for_connectivity_test()