diff --git a/futurist/tests/test_executors.py b/futurist/tests/test_executors.py index c1ee549..f354e76 100644 --- a/futurist/tests/test_executors.py +++ b/futurist/tests/test_executors.py @@ -12,8 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. +import threading import time +from eventlet.green import threading as green_threading import testscenarios from testtools import testcase @@ -140,29 +142,41 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase): self.assertEqual(5, len(happy_completed)) -_REJECTION = rejection.reject_when_reached(1) - - class TestRejection(testscenarios.TestWithScenarios, base.TestCase): + rejector = rejection.reject_when_reached(1) + scenarios = [ ('green', {'executor_cls': futurist.GreenThreadPoolExecutor, - 'executor_kwargs': {'check_and_reject': _REJECTION, - 'max_workers': 1}}), + 'executor_kwargs': {'check_and_reject': rejector, + 'max_workers': 1}, + 'event_cls': green_threading.Event}), ('thread', {'executor_cls': futurist.ThreadPoolExecutor, - 'executor_kwargs': {'check_and_reject': _REJECTION, - 'max_workers': 1}}), + 'executor_kwargs': {'check_and_reject': rejector, + 'max_workers': 1}, + 'event_cls': threading.Event}), ] def setUp(self): super(TestRejection, self).setUp() self.executor = self.executor_cls(**self.executor_kwargs) + self.addCleanup(self.executor.shutdown, wait=True) def test_rejection(self): - self.addCleanup(self.executor.shutdown) + ev = self.event_cls() + ev_thread_started = self.event_cls() + self.addCleanup(ev.set) + + def wait_until_set(check_delay): + ev_thread_started.set() + while not ev.is_set(): + ev.wait(check_delay) # 1 worker + 1 item of backlog - for _i in range(2): - self.executor.submit(delayed, 0.5) + self.executor.submit(wait_until_set, 0.1) + # ensure the above thread has started before doing anything + # else. + ev_thread_started.wait() + self.executor.submit(wait_until_set, 0.1) self.assertRaises(futurist.RejectedSubmission, self.executor.submit, returns_one)