Merge "Executor docstring & attribute tweaks"

This commit is contained in:
Jenkins 2015-07-15 00:56:55 +00:00 committed by Gerrit Code Review
commit 8a9b5d44ae
4 changed files with 22 additions and 17 deletions

View File

@ -35,7 +35,6 @@ class FakeBlockingThread(object):
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which blocks the current thread.
The blocking executor's start() method functions as a request processing

View File

@ -25,7 +25,6 @@ LOG = logging.getLogger(__name__)
class EventletExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which integrates with eventlet.
This is an executor which polls for incoming messages from a greenthread

View File

@ -15,6 +15,7 @@
# under the License.
import collections
import functools
import threading
from concurrent import futures
@ -31,25 +32,30 @@ _pool_opts = [
class PooledExecutor(base.ExecutorBase):
"""A message executor which integrates with threads.
"""A message executor which integrates with some async executor.
A message process that polls for messages from a dispatching thread and
on reception of an incoming message places the message to be processed in
a thread pool to be executed at a later time.
This will create a message thread that polls for messages from a
dispatching thread and on reception of an incoming message places the
message to be processed into a async executor to be executed at a later
time.
"""
# NOTE(harlowja): if eventlet is being used and the thread module is monkey
# patched this should/is supposed to work the same as the eventlet based
# executor.
# NOTE(harlowja): Make it somewhat easy to change this via
# inheritance (since there does exist other executor types that could be
# used/tried here).
_executor_cls = futures.ThreadPoolExecutor
# These may be overridden by subclasses (and implemented using whatever
# objects make most sense for the provided async execution model).
_event_cls = threading.Event
_lock_cls = threading.Lock
# Pooling and dispatching (executor submission) will happen from a
# thread created from this class/function.
_thread_cls = threading.Thread
# This one **must** be overridden by a subclass.
_executor_cls = None
# Blocking function that should wait for all provided futures to finish.
_wait_for_all = functools.partial(futures.wait,
return_when=futures.ALL_COMPLETED)
def __init__(self, conf, listener, dispatcher):
super(PooledExecutor, self).__init__(conf, listener, dispatcher)
self.conf.register_opts(_pool_opts)
@ -108,5 +114,5 @@ class PooledExecutor(base.ExecutorBase):
incomplete_fs = list(self._incomplete)
self._incomplete.clear()
if incomplete_fs:
futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
self._wait_for_all(incomplete_fs)
self._executor = None

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import futurist
from oslo_messaging._executors import impl_pooledexecutor
@ -26,4 +26,5 @@ class ThreadExecutor(impl_pooledexecutor.PooledExecutor):
on reception of an incoming message places the message to be processed in
a thread pool to be executed at a later time.
"""
_executor_cls = futures.ThreadPoolExecutor
_executor_cls = futurist.ThreadPoolExecutor