From 0a68329c50ba7c9963e6e4c50f12e518f2602783 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 21 Oct 2015 16:14:49 -0700 Subject: [PATCH] Allow for providing different run work sync functions It can be quite useful to replace the running of work function in the sync executor with a function that logs or with a function that shows whats running, or other so enable that replacement by having a new sync executor parameter that can be provided to give alternate functions that run work items. Change-Id: I8f4a806ed698370b4f5a300d74ee507cbe4eebc5 --- futurist/_futures.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/futurist/_futures.py b/futurist/_futures.py index 668e5f3..1617a46 100644 --- a/futurist/_futures.py +++ b/futurist/_futures.py @@ -20,6 +20,7 @@ import threading from concurrent import futures as _futures from concurrent.futures import process as _process from concurrent.futures import thread as _thread +import six from futurist import _green from futurist import _utils @@ -229,23 +230,30 @@ class SynchronousExecutor(_futures.Executor): threading = _Threading() - def __init__(self, green=False): + def __init__(self, green=False, run_work_func=lambda work: work.run()): """Synchronous executor constructor. :param green: when enabled this forces the usage of greened lock classes and green futures (so that the internals of this object operate correctly under eventlet) :type green: bool + :param run_work_func: callable that takes a single work item and + runs it (typically in a blocking manner) + :param run_work_func: callable """ if green and not _utils.EVENTLET_AVAILABLE: raise RuntimeError('Eventlet is needed to use a green' ' synchronous executor') + if not six.callable(run_work_func): + raise ValueError("Run work parameter expected to be callable") + self._run_work_func = run_work_func self._shutoff = False if green: self.threading = _green.threading self._future_cls = GreenFuture else: self._future_cls = Future + self._run_work_func = run_work_func self._gatherer = _Gatherer(self._submit, self.threading.lock_object, start_before_submit=True) @@ -281,8 +289,7 @@ class SynchronousExecutor(_futures.Executor): def _submit(self, fn, *args, **kwargs): fut = self._future_cls() - runner = _utils.WorkItem(fut, fn, args, kwargs) - runner.run() + self._run_work_func(_utils.WorkItem(fut, fn, args, kwargs)) return fut