Use and verify event and latch wait() return using timeouts

Instead of blocking the whole test suite when a latch is never
decremented to its desired value (or an event is never set), we
should wait with a reasonably high timeout. When those wait()
functions return, we then verify that the desired state was
actually reached and, if it was not, either raise an exception
or stop further testing.

Fixes bug 1363739

Change-Id: I8b40282ac2db9cabd48b0b65c8a2a49610d77c4f
This commit is contained in:
Joshua Harlow 2014-09-02 12:36:52 -07:00 committed by Joshua Harlow
parent 4370fd19a5
commit 7ca631356e
10 changed files with 81 additions and 52 deletions

View File

@ -13,7 +13,6 @@
# under the License.
import logging
import threading
import six
@ -23,6 +22,7 @@ from taskflow.listeners import logging as logging_listener
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import lock_utils
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
WAIT_TIMEOUT = 0.5
@ -64,7 +64,7 @@ class SingleThreadedConductor(base.Conductor):
self._wait_timeout = wait_timeout
else:
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
self._dead = threading.Event()
self._dead = threading_utils.Event()
@lock_utils.locked
def stop(self, timeout=None):
@ -81,8 +81,7 @@ class SingleThreadedConductor(base.Conductor):
be honored in the future) and False will be returned indicating this.
"""
self._wait_timeout.interrupt()
self._dead.wait(timeout)
return self._dead.is_set()
return self._dead.wait(timeout)
@property
def dispatching(self):

View File

@ -16,13 +16,13 @@
import logging
import socket
import threading
import kombu
import six
from taskflow.engines.worker_based import dispatcher
from taskflow.utils import misc
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@ -39,7 +39,7 @@ class Proxy(object):
self._topic = topic
self._exchange_name = exchange_name
self._on_wait = on_wait
self._running = threading.Event()
self._running = threading_utils.Event()
self._dispatcher = dispatcher.TypeDispatcher(type_handlers)
self._dispatcher.add_requeue_filter(
# NOTE(skudriashev): Process all incoming messages only if proxy is

View File

@ -30,6 +30,7 @@ from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as pu
from taskflow.utils import threading_utils
@contextlib.contextmanager
@ -88,14 +89,15 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t.start()
self.assertTrue(components.conductor.stop(0.5))
self.assertTrue(
components.conductor.stop(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
t.join()
def test_run(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
consumed_event = threading_utils.Event()
def on_consume(state, details):
consumed_event.set()
@ -110,9 +112,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
consumed_event.wait(1.0)
self.assertTrue(consumed_event.is_set())
self.assertTrue(components.conductor.stop(1.0))
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
@ -125,8 +126,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
def test_fail_run(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
consumed_event = threading_utils.Event()
def on_consume(state, details):
consumed_event.set()
@ -141,9 +141,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
consumed_event.wait(1.0)
self.assertTrue(consumed_event.is_set())
self.assertTrue(components.conductor.stop(1.0))
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence

View File

@ -25,8 +25,10 @@ from taskflow.openstack.common import uuidutils
from taskflow.persistence.backends import impl_dir
from taskflow import states
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils
FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
@ -52,8 +54,8 @@ def flush(client, path=None):
# before this context manager exits.
if not path:
path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
created = threading.Event()
deleted = threading.Event()
created = threading_utils.Event()
deleted = threading_utils.Event()
def on_created(data, stat):
if stat is not None:
@ -67,13 +69,19 @@ def flush(client, path=None):
watchers.DataWatch(client, path, func=on_created)
client.create(path, makepath=True)
created.wait()
if not created.wait(test_utils.WAIT_TIMEOUT):
raise RuntimeError("Could not receive creation of %s in"
" the alloted timeout of %s seconds"
% (path, test_utils.WAIT_TIMEOUT))
try:
yield
finally:
watchers.DataWatch(client, path, func=on_deleted)
client.delete(path, recursive=True)
deleted.wait()
if not deleted.wait(test_utils.WAIT_TIMEOUT):
raise RuntimeError("Could not receive deletion of %s in"
" the alloted timeout of %s seconds"
% (path, test_utils.WAIT_TIMEOUT))
class BoardTestMixin(object):
@ -119,11 +127,13 @@ class BoardTestMixin(object):
self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1)
def test_wait_arrival(self):
ev = threading.Event()
ev = threading_utils.Event()
jobs = []
def poster(wait_post=0.2):
ev.wait() # wait until the waiter is active
if not ev.wait(test_utils.WAIT_TIMEOUT):
raise RuntimeError("Waiter did not appear ready"
" in %s seconds" % test_utils.WAIT_TIMEOUT)
time.sleep(wait_post)
self.board.post('test', p_utils.temporary_log_book())

View File

@ -22,7 +22,9 @@ from concurrent import futures
import mock
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.utils import lock_utils
from taskflow.utils import threading_utils
# NOTE(harlowja): Sleep a little so time.time() can not be the same (which will
# cause false positives when our overlap detection code runs). If there are
@ -353,7 +355,7 @@ class ReadWriteLockTest(test.TestCase):
def test_double_reader_writer(self):
lock = lock_utils.ReaderWriterLock()
activated = collections.deque()
active = threading.Event()
active = threading_utils.Event()
def double_reader():
with lock.read_lock():
@ -369,7 +371,7 @@ class ReadWriteLockTest(test.TestCase):
reader = threading.Thread(target=double_reader)
reader.start()
active.wait()
self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT))
writer = threading.Thread(target=happy_writer)
writer.start()

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
from concurrent import futures
@ -24,15 +23,16 @@ from taskflow.engines.worker_based import executor
from taskflow.engines.worker_based import protocol as pr
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import threading_utils
class TestWorkerTaskExecutor(test.MockTestCase):
def setUp(self):
super(TestWorkerTaskExecutor, self).setUp()
self.task = utils.DummyTask()
self.task = test_utils.DummyTask()
self.task_uuid = 'task-uuid'
self.task_args = {'a': 'a'}
self.task_result = 'task-result'
@ -42,7 +42,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.executor_uuid = 'executor-uuid'
self.executor_exchange = 'executor-exchange'
self.executor_topic = 'test-topic1'
self.proxy_started_event = threading.Event()
self.proxy_started_event = threading_utils.Event()
# patch classes
self.proxy_mock, self.proxy_inst_mock = self.patchClass(
@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertEqual(len(ex._requests_cache), 0)
expected_calls = [
mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY),
mock.call.set_result(result=utils.FailureMatcher(failure))
mock.call.set_result(result=test_utils.FailureMatcher(failure))
]
self.assertEqual(expected_calls, self.request_inst_mock.mock_calls)
@ -303,7 +303,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
ex.start()
# make sure proxy thread started
self.proxy_started_event.wait()
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# stop executor
ex.stop()
@ -319,7 +319,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
ex.start()
# make sure proxy thread started
self.proxy_started_event.wait()
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# start executor again
ex.start()
@ -362,14 +362,14 @@ class TestWorkerTaskExecutor(test.MockTestCase):
ex.start()
# make sure thread started
self.proxy_started_event.wait()
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# restart executor
ex.stop()
ex.start()
# make sure thread started
self.proxy_started_event.wait()
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# stop executor
ex.stop()

View File

@ -23,15 +23,15 @@ from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.types import latch
from taskflow.utils import threading_utils
TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic')
BARRIER_WAIT_TIMEOUT = 1.0
POLLING_INTERVAL = 0.01
class TestMessagePump(test.TestCase):
def test_notify(self):
barrier = threading.Event()
barrier = threading_utils.Event()
on_notify = mock.MagicMock()
on_notify.side_effect = lambda *args, **kwargs: barrier.set()
@ -49,8 +49,7 @@ class TestMessagePump(test.TestCase):
p.wait()
p.publish(pr.Notify(), TEST_TOPIC)
barrier.wait(BARRIER_WAIT_TIMEOUT)
self.assertTrue(barrier.is_set())
self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT))
p.stop()
t.join()
@ -58,7 +57,7 @@ class TestMessagePump(test.TestCase):
on_notify.assert_called_with({}, mock.ANY)
def test_response(self):
barrier = threading.Event()
barrier = threading_utils.Event()
on_response = mock.MagicMock()
on_response.side_effect = lambda *args, **kwargs: barrier.set()
@ -77,7 +76,7 @@ class TestMessagePump(test.TestCase):
resp = pr.Response(pr.RUNNING)
p.publish(resp, TEST_TOPIC)
barrier.wait(BARRIER_WAIT_TIMEOUT)
self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT))
self.assertTrue(barrier.is_set())
p.stop()
t.join()
@ -126,7 +125,7 @@ class TestMessagePump(test.TestCase):
uuidutils.generate_uuid(),
pr.EXECUTE, [], None, None), TEST_TOPIC)
barrier.wait(BARRIER_WAIT_TIMEOUT)
self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT))
self.assertEqual(0, barrier.needed)
p.stop()
t.join()

View File

@ -16,7 +16,6 @@
import contextlib
import string
import threading
import six
@ -26,15 +25,18 @@ from taskflow import retry
from taskflow import task
from taskflow.utils import kazoo_utils
from taskflow.utils import misc
from taskflow.utils import threading_utils
ARGS_KEY = '__args__'
KWARGS_KEY = '__kwargs__'
ORDER_KEY = '__order__'
ZK_TEST_CONFIG = {
'timeout': 1.0,
'hosts': ["localhost:2181"],
}
# If latches/events take longer than this to become empty/set, something is
# usually wrong and should be debugged instead of deadlocking...
WAIT_TIMEOUT = 300
@contextlib.contextmanager
@ -342,16 +344,14 @@ class WaitForOneFromTask(SaveOrderTask):
self.wait_states = [wait_states]
else:
self.wait_states = wait_states
self.event = threading.Event()
self.event = threading_utils.Event()
def execute(self):
# NOTE(imelnikov): if test was not complete within
# 5 minutes, something is terribly wrong
self.event.wait(300)
if not self.event.is_set():
raise RuntimeError('Timeout occurred while waiting '
if not self.event.wait(WAIT_TIMEOUT):
raise RuntimeError('%s second timeout occurred while waiting '
'for %s to change state to %s'
% (self.wait_for, self.wait_states))
% (WAIT_TIMEOUT, self.wait_for,
self.wait_states))
return super(WaitForOneFromTask, self).execute()
def callback(self, state, details):

View File

@ -14,10 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo.utils import timeutils
from taskflow.utils import threading_utils
class Timeout(object):
"""An object which represents a timeout.
@ -29,7 +29,7 @@ class Timeout(object):
if timeout < 0:
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
self._timeout = timeout
self._event = threading.Event()
self._event = threading_utils.Event()
def interrupt(self):
self._event.set()

View File

@ -15,11 +15,31 @@
# under the License.
import multiprocessing
import sys
import threading
from six.moves import _thread
if sys.version_info[0:2] == (2, 6):
    # NOTE: On Python 2.6 ``Event.wait()`` always returns None, so a caller
    # can not tell a timeout apart from the event having been set. Python
    # 2.7 changed ``wait()`` to return the internal flag; since we actually
    # care about that return value we provide the same behavior on 2.6.
    #
    # TODO(harlowja): remove when we can drop 2.6 support.
    class Event(threading._Event):
        """Event whose ``wait()`` reports whether the event was set."""

        def wait(self, timeout=None):
            """Block until the event is set or ``timeout`` seconds elapse.

            :param timeout: maximum number of seconds to block, or None
                            to block until the event is set
            :returns: True if the event is set when the wait finishes,
                      False if the wait timed out first
            """
            # Use the public API instead of the parent's name-mangled
            # ``__cond``/``__flag`` attributes; those only resolve from a
            # subclass that happens to be named ``Event``.
            super(Event, self).wait(timeout)
            return self.is_set()
else:
    # Newer interpreters already return the flag from ``wait()``.
    Event = threading.Event
def get_ident():
"""Return the 'thread identifier' of the current thread."""
return _thread.get_ident()