Use ThreadPool and requests instead of subprocess
This way in less than a second we can check 100 times the service and reduce the test flakiness. In addition this patch also gets pod execution error information, curl -Ss displays errors on stderr but the streaming facility was swallowing them. This patch adds the following: * stderr for pod execution, * status_code for pod execution, * refactoring of service backend utility functions. Change-Id: Ide6674c250cb2a7300ef8a77648f77e0d9c589bc Signed-off-by: Antoni Segura Puimedon <celebdor@gmail.com>
This commit is contained in:
parent
f510c8411d
commit
177dab22a2
|
@ -12,6 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
from multiprocessing import pool
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
@ -129,11 +130,21 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
|
|||
|
||||
return kuryr_if['versioned_object.data']['id']
|
||||
|
||||
def exec_command_in_pod(self, pod_name, command, namespace="default"):
|
||||
def exec_command_in_pod(self, pod_name, command, namespace="default",
|
||||
stderr=False):
|
||||
api = self.k8s_client.CoreV1Api()
|
||||
return stream(api.connect_get_namespaced_pod_exec, pod_name, namespace,
|
||||
command=command, stderr=False, stdin=False, stdout=True,
|
||||
tty=False)
|
||||
if stderr:
|
||||
resp = stream(api.connect_get_namespaced_pod_exec, pod_name,
|
||||
namespace, command=command, stderr=True,
|
||||
stdin=False, stdout=True, tty=False,
|
||||
_preload_content=False)
|
||||
# Run until completion
|
||||
resp.run_forever()
|
||||
return resp.read_stdout(), resp.read_stderr()
|
||||
else:
|
||||
return stream(api.connect_get_namespaced_pod_exec, pod_name,
|
||||
namespace, command=command, stderr=False,
|
||||
stdin=False, stdout=True, tty=False)
|
||||
|
||||
def assign_fip_to_pod(self, pod_name, namespace="default"):
|
||||
ext_net_id = CONF.network.public_network_id
|
||||
|
@ -260,6 +271,7 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
|
|||
labels={"app": label}, namespace=namespace)
|
||||
cls.addClassResourceCleanup(cls.delete_pod, pod_name,
|
||||
namespace=namespace)
|
||||
cls.pod_num = pod_num
|
||||
service_name, service_obj = cls.create_service(
|
||||
pod_label=pod.metadata.labels, spec_type=spec_type,
|
||||
protocol=protocol, namespace=namespace)
|
||||
|
@ -322,6 +334,59 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
|
|||
'Number of exclusive responses is incorrect. '
|
||||
'Got %s.' % cmd_outputs)
|
||||
|
||||
def assert_backend_amount(self, url, amount, headers=None,
|
||||
repetitions=100, threads=8, request_timeout=5):
|
||||
def req():
|
||||
resp = requests.get(url, headers=headers)
|
||||
self.assertEqual(requests.codes.OK, resp.status_code,
|
||||
'Non-successful request to {}'.format(url))
|
||||
return resp
|
||||
|
||||
def pred(tester, responses):
|
||||
unique_resps = set(resp.content for resp in responses)
|
||||
tester.assertEqual(amount, len(unique_resps),
|
||||
'Incorrect amount of unique backends. '
|
||||
'Got {}'.format(unique_resps))
|
||||
|
||||
self._run_threaded_and_assert(req, pred, repetitions=repetitions,
|
||||
threads=threads,
|
||||
fn_timeout=request_timeout)
|
||||
|
||||
def assert_backend_amount_from_pod(self, url, amount, pod, repetitions=100,
|
||||
threads=8, request_timeout=7):
|
||||
def req():
|
||||
stdout, stderr = self.exec_command_in_pod(
|
||||
pod, ['/usr/bin/curl', '-Ss', '-w "\n%{http_code}"', url],
|
||||
stderr=True)
|
||||
# check if the curl command succeeded
|
||||
if stderr:
|
||||
LOG.error('Failed to curl the service at {}. '
|
||||
'Err: {}'.format(url, stderr))
|
||||
raise lib_exc.UnexpectedResponseCode()
|
||||
delimiter = stdout.rfind('\n')
|
||||
content = stdout[:delimiter]
|
||||
status_code = int(stdout[delimiter + 1:].split('"')[0])
|
||||
self.assertEqual(requests.codes.OK, status_code,
|
||||
'Non-successful request to {}'.format(url))
|
||||
return content
|
||||
|
||||
def pred(tester, responses):
|
||||
unique_resps = set(resp for resp in responses)
|
||||
tester.assertEqual(amount, len(unique_resps),
|
||||
'Incorrect amount of unique backends. '
|
||||
'Got {}'.format(unique_resps))
|
||||
|
||||
self._run_threaded_and_assert(req, pred, repetitions=repetitions,
|
||||
threads=threads,
|
||||
fn_timeout=request_timeout)
|
||||
|
||||
def _run_threaded_and_assert(self, fn, predicate, repetitions=100,
|
||||
threads=8, fn_timeout=1):
|
||||
tp = pool.ThreadPool(processes=threads)
|
||||
results = [tp.apply_async(fn) for _ in range(repetitions)]
|
||||
resps = [result.get(timeout=fn_timeout) for result in results]
|
||||
predicate(self, resps)
|
||||
|
||||
@classmethod
|
||||
def verify_lbaas_endpoints_configured(cls, ep_name, namespace='default'):
|
||||
cls._verify_endpoints_annotation(
|
||||
|
|
|
@ -11,14 +11,9 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import shlex
|
||||
import subprocess
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tempest import config
|
||||
from tempest.lib import decorators
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
|
||||
from kuryr_tempest_plugin.tests.scenario import base
|
||||
|
||||
|
@ -44,35 +39,17 @@ class TestServiceScenario(base.BaseKuryrScenarioTest):
|
|||
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfc1a1a9')
|
||||
def test_service_curl(self):
|
||||
LOG.info("Trying to curl the service IP %s" % self.service_ip)
|
||||
cmd = "curl -Ss {dst_ip}".format(dst_ip=self.service_ip)
|
||||
|
||||
def curl():
|
||||
try:
|
||||
return subprocess.check_output(shlex.split(cmd))
|
||||
except subprocess.CalledProcessError:
|
||||
LOG.error("Checking output of curl to the service IP %s "
|
||||
"failed" % self.service_ip)
|
||||
raise lib_exc.UnexpectedResponseCode()
|
||||
|
||||
self._run_and_assert_fn(curl)
|
||||
self.assert_backend_amount('http://{}'.format(self.service_ip),
|
||||
self.pod_num)
|
||||
|
||||
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1a7a9')
|
||||
def test_pod_service_curl(self):
|
||||
pod_name, pod = self.create_pod()
|
||||
self.addCleanup(self.delete_pod, pod_name)
|
||||
cmd = [
|
||||
"/bin/sh", "-c", "curl -Ss {dst_ip}".format(
|
||||
dst_ip=self.service_ip)]
|
||||
|
||||
def curl():
|
||||
output = self.exec_command_in_pod(pod_name, cmd)
|
||||
# check if the curl command succeeded
|
||||
if not output:
|
||||
LOG.error("Curl the service IP %s failed" % self.service_ip)
|
||||
raise lib_exc.UnexpectedResponseCode()
|
||||
return output
|
||||
|
||||
self._run_and_assert_fn(curl)
|
||||
self.assert_backend_amount_from_pod(
|
||||
'http://{}'.format(self.service_ip),
|
||||
self.pod_num,
|
||||
pod_name)
|
||||
|
||||
|
||||
class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest):
|
||||
|
@ -94,17 +71,10 @@ class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest):
|
|||
def test_lb_service_http(self):
|
||||
|
||||
LOG.info("Trying to curl the service IP %s" % self.service_ip)
|
||||
cmd = "curl -Ss {dst_ip}".format(dst_ip=self.service_ip)
|
||||
|
||||
def curl():
|
||||
try:
|
||||
return subprocess.check_output(shlex.split(cmd))
|
||||
except subprocess.CalledProcessError:
|
||||
LOG.error("Checking output of curl to the service IP %s "
|
||||
"failed" % self.service_ip)
|
||||
raise lib_exc.UnexpectedResponseCode()
|
||||
self._run_and_assert_fn(curl)
|
||||
self.assert_backend_amount('http://{}'.format(self.service_ip),
|
||||
self.pod_num)
|
||||
|
||||
# TODO(yboaron): Use multi threads for 'test_vm_service_http' test
|
||||
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1b5a9')
|
||||
def test_vm_service_http(self):
|
||||
ssh_client, fip = self.create_vm_for_connectivity_test()
|
||||
|
|
Loading…
Reference in New Issue