Merge "Rework wait and wait_pass methods"
This commit is contained in:
commit
7a5f97b9a9
|
@ -16,6 +16,7 @@ from __future__ import absolute_import
|
|||
|
||||
import functools
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import time
|
||||
import warnings
|
||||
|
@ -79,74 +80,154 @@ def tcp_ping(host, port, timeout=None):
|
|||
return True
|
||||
|
||||
|
||||
def wait(predicate, interval=5, timeout=60, timeout_msg="Waiting timed out"):
|
||||
"""Wait until predicate will become True.
|
||||
class RunLimit(object):
|
||||
def __init__(self, timeout=60, timeout_msg='Timeout'):
|
||||
self.seconds = int(timeout)
|
||||
self.error_message = timeout_msg
|
||||
logger.debug("RunLimit.__init__(timeout={0}, timeout_msg='{1}'"
|
||||
.format(timeout, timeout_msg))
|
||||
|
||||
returns number of seconds that is left or 0 if timeout is None.
|
||||
def handle_timeout(self, signum, frame):
|
||||
logger.debug("RunLimit.handle_timeout reached!")
|
||||
raise error.TimeoutError(self.error_message.format(spent=self.seconds))
|
||||
|
||||
def __enter__(self):
|
||||
signal.signal(signal.SIGALRM, self.handle_timeout)
|
||||
signal.alarm(self.seconds)
|
||||
logger.debug("RunLimit.__enter__(seconds={0}".format(self.seconds))
|
||||
|
||||
def __exit__(self, exc_type, value, traceback):
|
||||
time_remained = signal.alarm(0)
|
||||
logger.debug("RunLimit.__exit__ , remained '{0}' sec"
|
||||
.format(time_remained))
|
||||
|
||||
|
||||
def _check_wait_args(predicate,
|
||||
predicate_args,
|
||||
predicate_kwargs,
|
||||
interval,
|
||||
timeout):
|
||||
|
||||
if not callable(predicate):
|
||||
raise TypeError("Not callable raising_predicate has been posted: '{0}'"
|
||||
.format(predicate))
|
||||
if not isinstance(predicate_args, (list, tuple)):
|
||||
raise TypeError("Incorrect predicate_args type for '{0}', should be "
|
||||
"list or tuple, got '{1}'"
|
||||
.format(predicate, type(predicate_args)))
|
||||
if not isinstance(predicate_kwargs, dict):
|
||||
raise TypeError("Incorrect predicate_kwargs type, should be dict, "
|
||||
"got {}".format(type(predicate_kwargs)))
|
||||
if interval <= 0:
|
||||
raise ValueError("For '{0}(*{1}, **{2})', waiting interval '{3}'sec is"
|
||||
" wrong".format(predicate,
|
||||
predicate_args,
|
||||
predicate_kwargs,
|
||||
interval))
|
||||
if timeout <= 0:
|
||||
raise ValueError("For '{0}(*{1}, **{2})', timeout '{3}'sec is "
|
||||
"wrong".format(predicate,
|
||||
predicate_args,
|
||||
predicate_kwargs,
|
||||
timeout))
|
||||
|
||||
|
||||
def wait(predicate, interval=5, timeout=60, timeout_msg="Waiting timed out",
|
||||
predicate_args=None, predicate_kwargs=None):
|
||||
"""Wait until predicate will become True.
|
||||
|
||||
Options:
|
||||
|
||||
interval - seconds between checks.
|
||||
|
||||
timeout - raise error.TimeoutError if predicate won't become True after
|
||||
this amount of seconds. 'None' disables timeout.
|
||||
|
||||
timeout_msg - text of the error.TimeoutError
|
||||
:param interval: - seconds between checks.
|
||||
:param timeout: - raise TimeoutError if predicate won't become True after
|
||||
this amount of seconds.
|
||||
:param timeout_msg: - text of the TimeoutError
|
||||
:param predicate_args: - positional arguments for given predicate wrapped
|
||||
in list or tuple
|
||||
:param predicate_kwargs: - dict with named arguments for the predicate
|
||||
|
||||
"""
|
||||
predicate_args = predicate_args or []
|
||||
predicate_kwargs = predicate_kwargs or {}
|
||||
_check_wait_args(predicate, predicate_args, predicate_kwargs,
|
||||
interval, timeout)
|
||||
msg = (
|
||||
"{msg}\nWaited for pass {cmd}: {spent} seconds."
|
||||
"".format(
|
||||
msg=timeout_msg,
|
||||
cmd=repr(predicate),
|
||||
spent="{spent:0.3f}"
|
||||
))
|
||||
|
||||
start_time = time.time()
|
||||
if not timeout:
|
||||
return predicate()
|
||||
while not predicate():
|
||||
if start_time + timeout < time.time():
|
||||
msg = (
|
||||
"{msg}\nWaited for pass {cmd}: {spent:0.3f} seconds."
|
||||
"".format(
|
||||
msg=timeout_msg,
|
||||
cmd=predicate.func_name,
|
||||
spent=time.time() - start_time
|
||||
))
|
||||
logger.debug(msg)
|
||||
raise error.TimeoutError(timeout_msg)
|
||||
with RunLimit(timeout, msg):
|
||||
while True:
|
||||
result = predicate(*predicate_args, **predicate_kwargs)
|
||||
if result:
|
||||
logger.debug("wait() completed with result='{0}'"
|
||||
.format(result))
|
||||
return result
|
||||
|
||||
seconds_to_sleep = max(
|
||||
0,
|
||||
min(interval, start_time + timeout - time.time()))
|
||||
time.sleep(seconds_to_sleep)
|
||||
if start_time + timeout < time.time():
|
||||
err_msg = msg.format(spent=time.time() - start_time)
|
||||
logger.error(err_msg)
|
||||
raise error.TimeoutError(err_msg)
|
||||
|
||||
return timeout + start_time - time.time()
|
||||
|
||||
|
||||
def wait_pass(
|
||||
raising_predicate,
|
||||
expected=Exception,
|
||||
interval=5, timeout=None,
|
||||
timeout_msg="Waiting timed out"
|
||||
):
|
||||
"""Wait for successful return from predicate or expected exception"""
|
||||
if not callable(raising_predicate):
|
||||
raise TypeError('Not callable raising_predicate has been posted')
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
return raising_predicate()
|
||||
except expected:
|
||||
if timeout and start_time + timeout < time.time():
|
||||
msg = (
|
||||
"{msg}\nWaited for pass {cmd}: {spent:0.3f} seconds."
|
||||
"".format(
|
||||
msg=timeout_msg,
|
||||
cmd=raising_predicate.func_name,
|
||||
spent=time.time() - start_time
|
||||
))
|
||||
logger.error(msg)
|
||||
raise
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def wait_pass(raising_predicate, expected=Exception,
|
||||
interval=5, timeout=60, timeout_msg="Waiting timed out",
|
||||
predicate_args=None, predicate_kwargs=None):
|
||||
"""Wait for successful return from predicate ignoring expected exception
|
||||
|
||||
Options:
|
||||
|
||||
:param interval: - seconds between checks.
|
||||
:param timeout: - raise TimeoutError if predicate still throwing expected
|
||||
exception after this amount of seconds.
|
||||
:param timeout_msg: - text of the TimeoutError
|
||||
:param predicate_args: - positional arguments for given predicate wrapped
|
||||
in list or tuple
|
||||
:param predicate_kwargs: - dict with named arguments for the predicate
|
||||
:param expected_exc: Exception that can be ignored while waiting (its
|
||||
possible to pass several using list/tuple
|
||||
|
||||
"""
|
||||
|
||||
predicate_args = predicate_args or []
|
||||
predicate_kwargs = predicate_kwargs or {}
|
||||
_check_wait_args(raising_predicate, predicate_args, predicate_kwargs,
|
||||
interval, timeout)
|
||||
msg = (
|
||||
"{msg}\nWaited for pass {cmd}: {spent} seconds."
|
||||
"".format(
|
||||
msg=timeout_msg,
|
||||
cmd=repr(raising_predicate),
|
||||
spent="{spent:0.3f}"
|
||||
))
|
||||
|
||||
start_time = time.time()
|
||||
with RunLimit(timeout, msg):
|
||||
while True:
|
||||
try:
|
||||
result = raising_predicate(*predicate_args, **predicate_kwargs)
|
||||
logger.debug("wait_pass() completed with result='{0}'"
|
||||
.format(result))
|
||||
return result
|
||||
except expected as e:
|
||||
if start_time + timeout < time.time():
|
||||
err_msg = msg.format(spent=time.time() - start_time)
|
||||
logger.error(err_msg)
|
||||
raise error.TimeoutError(err_msg)
|
||||
|
||||
logger.debug("Got expected exception {!r}, continue".format(e))
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def wait_tcp(host, port, timeout, timeout_msg="Waiting timed out"):
|
||||
is_port_active = functools.partial(tcp_ping, host=host, port=port)
|
||||
wait(is_port_active, timeout=timeout, timeout_msg=timeout_msg)
|
||||
wait(tcp_ping, timeout=timeout, timeout_msg=timeout_msg,
|
||||
predicate_kwargs={'host': host, 'port': port})
|
||||
|
||||
|
||||
def wait_ssh_cmd(
|
||||
|
|
|
@ -120,34 +120,49 @@ class TestHelpersHelpers(unittest.TestCase):
|
|||
ping.assert_called_once_with(host, port, timeout)
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch('time.time', autospec=True)
|
||||
@mock.patch('time.sleep', autospec=True)
|
||||
def test_wait(self, sleep, time):
|
||||
time.return_value = 1
|
||||
predicate = mock.Mock(return_value=True)
|
||||
def test__check_wait_args(self):
|
||||
helpers._check_wait_args(lambda: None, [], {}, 1, 1)
|
||||
|
||||
result = helpers.wait(predicate, interval=0, timeout=0)
|
||||
def test__check_wait_args_wrong_predicate(self):
|
||||
with self.assertRaises(TypeError):
|
||||
helpers._check_wait_args('predicate', [], {}, 1, 1)
|
||||
|
||||
def test__check_wait_args_wrong_predicate_args(self):
|
||||
with self.assertRaises(TypeError):
|
||||
helpers._check_wait_args(lambda: None, 12, {}, 1, 1)
|
||||
|
||||
def test__check_wait_args_wrong_predicate_kwargs(self):
|
||||
with self.assertRaises(TypeError):
|
||||
helpers._check_wait_args(lambda: None, [], {'one', 'two'}, 1, 1)
|
||||
|
||||
def test__check_wait_args_wrong_interval(self):
|
||||
with self.assertRaises(ValueError):
|
||||
helpers._check_wait_args(lambda: None, ['one'], {'two': 2}, -2, 1)
|
||||
|
||||
def test__check_wait_args_wrong_timeout(self):
|
||||
with self.assertRaises(ValueError):
|
||||
helpers._check_wait_args(lambda: None, (1, 2, 3), {}, 10, 0)
|
||||
|
||||
@mock.patch('time.sleep', autospec=True)
|
||||
def test_wait(self, sleep):
|
||||
|
||||
predicate = mock.Mock(return_value=True)
|
||||
result = helpers.wait(predicate, interval=1, timeout=1)
|
||||
|
||||
self.assertTrue(result)
|
||||
predicate.assert_called_once()
|
||||
time.assert_called_once()
|
||||
sleep.assert_not_called()
|
||||
|
||||
time.reset_mock()
|
||||
time.return_value = 1
|
||||
sleep.reset_mock()
|
||||
predicate.reset_mock()
|
||||
predicate.return_value = True
|
||||
|
||||
predicate.return_value = 2
|
||||
result = helpers.wait(predicate, interval=2, timeout=2)
|
||||
|
||||
self.assertEqual(result, 2)
|
||||
predicate.assert_called_once()
|
||||
sleep.assert_not_called()
|
||||
time.assert_has_calls([mock.call(), mock.call()])
|
||||
|
||||
time.reset_mock()
|
||||
time.return_value = 1
|
||||
sleep.reset_mock()
|
||||
predicate.reset_mock()
|
||||
predicate.return_value = False
|
||||
|
@ -155,45 +170,31 @@ class TestHelpersHelpers(unittest.TestCase):
|
|||
self.assertRaises(
|
||||
error.TimeoutError,
|
||||
helpers.wait,
|
||||
predicate, interval=2, timeout=-2)
|
||||
sleep.assert_not_called()
|
||||
time.assert_has_calls([mock.call(), mock.call()])
|
||||
predicate, interval=1, timeout=1)
|
||||
predicate.assert_called()
|
||||
|
||||
@mock.patch('time.time', autospec=True)
|
||||
@mock.patch('time.sleep', autospec=True)
|
||||
def test_wait_pass(self, sleep, time):
|
||||
def test_wait_pass(self, sleep):
|
||||
predicate = mock.Mock(return_value=True)
|
||||
|
||||
result = helpers.wait_pass(predicate)
|
||||
self.assertTrue(result)
|
||||
time.assert_called_once()
|
||||
sleep.assert_not_called()
|
||||
|
||||
time.reset_mock()
|
||||
time.return_value = 1
|
||||
sleep.reset_mock()
|
||||
predicate.reset_mock()
|
||||
predicate.side_effect = ValueError
|
||||
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
error.TimeoutError,
|
||||
helpers.wait_pass,
|
||||
predicate, timeout=-1)
|
||||
sleep.assert_not_called()
|
||||
time.assert_has_calls([mock.call(), mock.call()])
|
||||
predicate, timeout=1)
|
||||
|
||||
@mock.patch('devops.helpers.helpers.tcp_ping', autospec=True)
|
||||
@mock.patch('time.time', autospec=True)
|
||||
@mock.patch('time.sleep', autospec=True)
|
||||
def test_wait_tcp(self, sleep, time, ping):
|
||||
def test_wait_tcp(self, ping):
|
||||
host = '127.0.0.1'
|
||||
port = 65535
|
||||
timeout = 0
|
||||
timeout = 1
|
||||
|
||||
helpers.wait_tcp(host, port, timeout)
|
||||
|
||||
ping.assert_called_once_with(host=host, port=port)
|
||||
time.assert_called_once()
|
||||
sleep.assert_not_called()
|
||||
|
||||
@mock.patch('devops.helpers.ssh_client.SSHClient', autospec=True)
|
||||
@mock.patch('devops.helpers.helpers.wait')
|
||||
|
|
Loading…
Reference in New Issue