Allow synchronous executors to run in green compat. mode

Change-Id: Idcde3249064cba79f9c977aa97991b9de29cef3b
This commit is contained in:
Joshua Harlow 2015-06-06 12:18:08 -07:00 committed by Joshua Harlow
parent cba2ef8169
commit 5087ba9f6f
3 changed files with 34 additions and 13 deletions

View File

@ -14,6 +14,7 @@ Executors
.. autoclass:: futurist.SynchronousExecutor
:members:
:special-members: __init__
.. autoclass:: futurist.ThreadPoolExecutor
:members:

View File

@ -205,10 +205,27 @@ class SynchronousExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
def __init__(self):
def __init__(self, green=False):
"""Synchronous executor constructor.
:param green: when enabled this forces the usage of greened lock
classes and green futures (so that the internals of this
object operate correctly under eventlet)
:type green: bool
"""
if green and not _utils.EVENTLET_AVAILABLE:
raise RuntimeError('Eventlet is needed to use a green'
' synchronous executor')
self._shutoff = False
if green:
lock_cls = greenthreading.Lock
self._future_cls = GreenFuture
else:
lock_cls = threading.Lock
self._future_cls = Future
self._gatherer = _Gatherer(self._submit,
start_before_submit=True)
start_before_submit=True,
lock_cls=lock_cls)
@property
def alive(self):
@ -240,10 +257,10 @@ class SynchronousExecutor(_futures.Executor):
return self._gatherer.submit(fn, *args, **kwargs)
def _submit(self, fn, *args, **kwargs):
f = Future()
runner = _WorkItem(f, fn, args, kwargs)
fut = self._future_cls()
runner = _WorkItem(fut, fn, args, kwargs)
runner.run()
return f
return fut
class _GreenWorker(object):

View File

@ -40,18 +40,21 @@ def delayed(wait_secs):
class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
scenarios = [
('sync', {'executor_cls': futurist.SynchronousExecutor,
'restartable': True}),
'restartable': True, 'executor_kwargs': {}}),
('green_sync', {'executor_cls': futurist.SynchronousExecutor,
'restartable': True,
'executor_kwargs': {'green': True}}),
('green', {'executor_cls': futurist.GreenThreadPoolExecutor,
'restartable': False}),
'restartable': False, 'executor_kwargs': {}}),
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
'restartable': False}),
'restartable': False, 'executor_kwargs': {}}),
('process', {'executor_cls': futurist.ProcessPoolExecutor,
'restartable': False}),
'restartable': False, 'executor_kwargs': {}}),
]
def setUp(self):
super(TestExecutors, self).setUp()
self.executor = self.executor_cls()
self.executor = self.executor_cls(**self.executor_kwargs)
def tearDown(self):
super(TestExecutors, self).tearDown()
@ -79,7 +82,7 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
self.assertGreaterEqual(self.executor.statistics.runtime, 0.2)
def test_post_shutdown_raises(self):
executor = self.executor_cls()
executor = self.executor_cls(**self.executor_kwargs)
executor.shutdown()
self.assertRaises(RuntimeError, executor.submit, returns_one)
@ -87,7 +90,7 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
if not self.restartable:
raise testcase.TestSkipped("not restartable")
else:
executor = self.executor_cls()
executor = self.executor_cls(**self.executor_kwargs)
fut = executor.submit(returns_one)
self.assertEqual(1, fut.result())
executor.shutdown()
@ -103,7 +106,7 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
executor.shutdown()
def test_alive(self):
with self.executor_cls() as executor:
with self.executor_cls(**self.executor_kwargs) as executor:
self.assertTrue(executor.alive)
self.assertFalse(executor.alive)