Revert "Revert "Permit aborting loopingcall while sleeping""
This reverts commit 5975da493b
.
Added code to support the case where unit tests are not being
monkey patched (example heat).
Change-Id: If715fbe21ac085e4f5c83cef0729dbca8dcb19ca
This commit is contained in:
parent
55e0fd8f71
commit
9091986228
|
@ -17,11 +17,13 @@
|
|||
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from eventlet import event
|
||||
from eventlet import greenthread
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import eventletutils
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import reflection
|
||||
from oslo_utils import timeutils
|
||||
|
@ -75,6 +77,53 @@ def _safe_wrapper(f, kind, func_name):
|
|||
return func
|
||||
|
||||
|
||||
def _Event():
|
||||
if eventletutils.is_monkey_patched('thread'):
|
||||
return _ThreadingEvent()
|
||||
else:
|
||||
return _GreenEvent()
|
||||
|
||||
|
||||
class _ThreadingEvent(object):
|
||||
def __init__(self):
|
||||
self._abort = threading.Event()
|
||||
|
||||
def is_running(self):
|
||||
return not self._abort.is_set()
|
||||
|
||||
def clear(self):
|
||||
self._abort.clear()
|
||||
|
||||
def wait(self, timeout):
|
||||
self._abort.wait(timeout)
|
||||
|
||||
def stop(self):
|
||||
self._abort.set()
|
||||
|
||||
def done(self):
|
||||
pass
|
||||
|
||||
|
||||
class _GreenEvent(object):
|
||||
def __init__(self):
|
||||
self._running = False
|
||||
|
||||
def is_running(self):
|
||||
return self._running
|
||||
|
||||
def clear(self):
|
||||
self._running = True
|
||||
|
||||
def wait(self, timeout):
|
||||
greenthread.sleep(timeout)
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def done(self):
|
||||
self._running = False
|
||||
|
||||
|
||||
class LoopingCallBase(object):
|
||||
_KIND = _("Unknown looping call")
|
||||
|
||||
|
@ -85,19 +134,26 @@ class LoopingCallBase(object):
|
|||
self.args = args
|
||||
self.kw = kw
|
||||
self.f = f
|
||||
self._running = False
|
||||
self._thread = None
|
||||
self.done = None
|
||||
self._event = _Event()
|
||||
|
||||
@property
|
||||
def _running(self):
|
||||
return self._event.is_running()
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self._event.stop()
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
||||
|
||||
def _on_done(self, gt, *args, **kwargs):
|
||||
self._thread = None
|
||||
self._running = False
|
||||
self._event.done()
|
||||
|
||||
def _sleep(self, timeout):
|
||||
self._event.wait(timeout)
|
||||
|
||||
def _start(self, idle_for, initial_delay=None, stop_on_exception=True):
|
||||
"""Start the looping
|
||||
|
@ -114,8 +170,8 @@ class LoopingCallBase(object):
|
|||
"""
|
||||
if self._thread is not None:
|
||||
raise RuntimeError(self._RUN_ONLY_ONE_MESSAGE)
|
||||
self._running = True
|
||||
self.done = event.Event()
|
||||
self._event.clear()
|
||||
self._thread = greenthread.spawn(
|
||||
self._run_loop, idle_for,
|
||||
initial_delay=initial_delay, stop_on_exception=stop_on_exception)
|
||||
|
@ -129,7 +185,7 @@ class LoopingCallBase(object):
|
|||
func = self.f if stop_on_exception else _safe_wrapper(self.f, kind,
|
||||
func_name)
|
||||
if initial_delay:
|
||||
greenthread.sleep(initial_delay)
|
||||
self._sleep(initial_delay)
|
||||
try:
|
||||
watch = timeutils.StopWatch()
|
||||
while self._running:
|
||||
|
@ -143,7 +199,7 @@ class LoopingCallBase(object):
|
|||
'for %(idle).02f seconds',
|
||||
{'func_name': func_name, 'idle': idle,
|
||||
'kind': kind})
|
||||
greenthread.sleep(idle)
|
||||
self._sleep(idle)
|
||||
except LoopingCallDone as e:
|
||||
self.done.send(e.retvalue)
|
||||
except Exception:
|
||||
|
|
|
@ -285,7 +285,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase):
|
|||
else:
|
||||
self.num_runs = self.num_runs - 1
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_timeout_task_without_return(self, sleep_mock):
|
||||
self.num_runs = 1
|
||||
timer = loopingcall.DynamicLoopingCall(
|
||||
|
@ -294,7 +294,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase):
|
|||
timer.start(periodic_interval_max=5).wait()
|
||||
sleep_mock.assert_has_calls([mock.call(5)])
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_interval_adjustment(self, sleep_mock):
|
||||
self.num_runs = 2
|
||||
|
||||
|
@ -303,7 +303,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase):
|
|||
|
||||
sleep_mock.assert_has_calls([mock.call(5), mock.call(1)])
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_initial_delay(self, sleep_mock):
|
||||
self.num_runs = 1
|
||||
|
||||
|
@ -315,7 +315,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase):
|
|||
|
||||
class TestBackOffLoopingCall(test_base.BaseTestCase):
|
||||
@mock.patch('random.SystemRandom.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_exponential_backoff(self, sleep_mock, random_mock):
|
||||
def false():
|
||||
return False
|
||||
|
@ -340,7 +340,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase):
|
|||
self.assertEqual(expected_times, sleep_mock.call_args_list)
|
||||
|
||||
@mock.patch('random.SystemRandom.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_exponential_backoff_negative_value(self, sleep_mock, random_mock):
|
||||
def false():
|
||||
return False
|
||||
|
@ -366,7 +366,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase):
|
|||
self.assertEqual(expected_times, sleep_mock.call_args_list)
|
||||
|
||||
@mock.patch('random.SystemRandom.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_no_backoff(self, sleep_mock, random_mock):
|
||||
random_mock.return_value = 1
|
||||
func = mock.Mock()
|
||||
|
@ -381,7 +381,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase):
|
|||
self.assertTrue(retvalue, 'return value')
|
||||
|
||||
@mock.patch('random.SystemRandom.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_no_sleep(self, sleep_mock, random_mock):
|
||||
# Any call that executes properly the first time shouldn't sleep
|
||||
random_mock.return_value = 1
|
||||
|
@ -394,7 +394,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase):
|
|||
self.assertTrue(retvalue, 'return value')
|
||||
|
||||
@mock.patch('random.SystemRandom.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep')
|
||||
def test_max_interval(self, sleep_mock, random_mock):
|
||||
def false():
|
||||
return False
|
||||
|
|
Loading…
Reference in New Issue