Use ThreadPool and requests instead of subprocess

This way in less than a second we can check 100 times the service
and reduce the test flakiness.

In addition this patch also gets pod execution error
information, curl -Ss displays errors on stderr but the
streaming facility was swallowing them.

This patch adds the following:

* stderr for pod execution,
* status_code for pod execution,
* refactoring of service backend utility functions.

Change-Id: Ide6674c250cb2a7300ef8a77648f77e0d9c589bc
Signed-off-by: Antoni Segura Puimedon <celebdor@gmail.com>
This commit is contained in:
Antoni Segura Puimedon 2018-08-13 15:53:32 +02:00 committed by Yossi Boaron
parent f510c8411d
commit 177dab22a2
2 changed files with 78 additions and 43 deletions

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from multiprocessing import pool
import time
from oslo_log import log as logging
@ -129,11 +130,21 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
return kuryr_if['versioned_object.data']['id']
def exec_command_in_pod(self, pod_name, command, namespace="default"):
def exec_command_in_pod(self, pod_name, command, namespace="default",
stderr=False):
api = self.k8s_client.CoreV1Api()
return stream(api.connect_get_namespaced_pod_exec, pod_name, namespace,
command=command, stderr=False, stdin=False, stdout=True,
tty=False)
if stderr:
resp = stream(api.connect_get_namespaced_pod_exec, pod_name,
namespace, command=command, stderr=True,
stdin=False, stdout=True, tty=False,
_preload_content=False)
# Run until completion
resp.run_forever()
return resp.read_stdout(), resp.read_stderr()
else:
return stream(api.connect_get_namespaced_pod_exec, pod_name,
namespace, command=command, stderr=False,
stdin=False, stdout=True, tty=False)
def assign_fip_to_pod(self, pod_name, namespace="default"):
ext_net_id = CONF.network.public_network_id
@ -260,6 +271,7 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
labels={"app": label}, namespace=namespace)
cls.addClassResourceCleanup(cls.delete_pod, pod_name,
namespace=namespace)
cls.pod_num = pod_num
service_name, service_obj = cls.create_service(
pod_label=pod.metadata.labels, spec_type=spec_type,
protocol=protocol, namespace=namespace)
@ -322,6 +334,59 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
'Number of exclusive responses is incorrect. '
'Got %s.' % cmd_outputs)
def assert_backend_amount(self, url, amount, headers=None,
repetitions=100, threads=8, request_timeout=5):
def req():
resp = requests.get(url, headers=headers)
self.assertEqual(requests.codes.OK, resp.status_code,
'Non-successful request to {}'.format(url))
return resp
def pred(tester, responses):
unique_resps = set(resp.content for resp in responses)
tester.assertEqual(amount, len(unique_resps),
'Incorrect amount of unique backends. '
'Got {}'.format(unique_resps))
self._run_threaded_and_assert(req, pred, repetitions=repetitions,
threads=threads,
fn_timeout=request_timeout)
def assert_backend_amount_from_pod(self, url, amount, pod, repetitions=100,
threads=8, request_timeout=7):
def req():
stdout, stderr = self.exec_command_in_pod(
pod, ['/usr/bin/curl', '-Ss', '-w "\n%{http_code}"', url],
stderr=True)
# check if the curl command succeeded
if stderr:
LOG.error('Failed to curl the service at {}. '
'Err: {}'.format(url, stderr))
raise lib_exc.UnexpectedResponseCode()
delimiter = stdout.rfind('\n')
content = stdout[:delimiter]
status_code = int(stdout[delimiter + 1:].split('"')[0])
self.assertEqual(requests.codes.OK, status_code,
'Non-successful request to {}'.format(url))
return content
def pred(tester, responses):
unique_resps = set(resp for resp in responses)
tester.assertEqual(amount, len(unique_resps),
'Incorrect amount of unique backends. '
'Got {}'.format(unique_resps))
self._run_threaded_and_assert(req, pred, repetitions=repetitions,
threads=threads,
fn_timeout=request_timeout)
def _run_threaded_and_assert(self, fn, predicate, repetitions=100,
threads=8, fn_timeout=1):
tp = pool.ThreadPool(processes=threads)
results = [tp.apply_async(fn) for _ in range(repetitions)]
resps = [result.get(timeout=fn_timeout) for result in results]
predicate(self, resps)
@classmethod
def verify_lbaas_endpoints_configured(cls, ep_name, namespace='default'):
cls._verify_endpoints_annotation(

View File

@ -11,14 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import shlex
import subprocess
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from kuryr_tempest_plugin.tests.scenario import base
@ -44,35 +39,17 @@ class TestServiceScenario(base.BaseKuryrScenarioTest):
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfc1a1a9')
def test_service_curl(self):
LOG.info("Trying to curl the service IP %s" % self.service_ip)
cmd = "curl -Ss {dst_ip}".format(dst_ip=self.service_ip)
def curl():
try:
return subprocess.check_output(shlex.split(cmd))
except subprocess.CalledProcessError:
LOG.error("Checking output of curl to the service IP %s "
"failed" % self.service_ip)
raise lib_exc.UnexpectedResponseCode()
self._run_and_assert_fn(curl)
self.assert_backend_amount('http://{}'.format(self.service_ip),
self.pod_num)
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1a7a9')
def test_pod_service_curl(self):
pod_name, pod = self.create_pod()
self.addCleanup(self.delete_pod, pod_name)
cmd = [
"/bin/sh", "-c", "curl -Ss {dst_ip}".format(
dst_ip=self.service_ip)]
def curl():
output = self.exec_command_in_pod(pod_name, cmd)
# check if the curl command succeeded
if not output:
LOG.error("Curl the service IP %s failed" % self.service_ip)
raise lib_exc.UnexpectedResponseCode()
return output
self._run_and_assert_fn(curl)
self.assert_backend_amount_from_pod(
'http://{}'.format(self.service_ip),
self.pod_num,
pod_name)
class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest):
@ -94,17 +71,10 @@ class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest):
def test_lb_service_http(self):
LOG.info("Trying to curl the service IP %s" % self.service_ip)
cmd = "curl -Ss {dst_ip}".format(dst_ip=self.service_ip)
def curl():
try:
return subprocess.check_output(shlex.split(cmd))
except subprocess.CalledProcessError:
LOG.error("Checking output of curl to the service IP %s "
"failed" % self.service_ip)
raise lib_exc.UnexpectedResponseCode()
self._run_and_assert_fn(curl)
self.assert_backend_amount('http://{}'.format(self.service_ip),
self.pod_num)
# TODO(yboaron): Use multi threads for 'test_vm_service_http' test
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1b5a9')
def test_vm_service_http(self):
ssh_client, fip = self.create_vm_for_connectivity_test()