futurist 0.9.0 release

meta:version: 0.9.0
 meta:series: mitaka
 meta:release-type: release
 -----BEGIN PGP SIGNATURE-----
 
 iEYEABECAAYFAlZ4WF4ACgkQgNg6eWEDv1kEegCfb0XC5YZbdhrv4GC20epIIfPw
 LlgAnjKijGc+xVm1kPBh5w4w8LLuIyCL
 =Ev5u
 -----END PGP SIGNATURE-----

Merge tag '0.9.0' into debian/mitaka

futurist 0.9.0 release

meta:version: 0.9.0
meta:series: mitaka
meta:release-type: release
This commit is contained in:
Thomas Goirand 2016-01-16 03:49:08 +00:00
commit 9c1c412850
17 changed files with 255 additions and 194 deletions

View File

@ -1,7 +1,8 @@
[run]
branch = True
source = futurist
omit = futurist/openstack/*
omit = futurist/tests/*,futurist/openstack/*
[report]
ignore-errors = True
ignore_errors = True
precision = 2

1
.gitignore vendored
View File

@ -23,6 +23,7 @@ pip-log.txt
# Unit test / coverage reports
.coverage
cover
.tox
nosetests.xml
.testrepository

View File

@ -1 +0,0 @@
.. This is a generated file! Do not edit.

View File

@ -10,7 +10,11 @@ Futurist
:target: https://pypi.python.org/pypi/futurist/
:alt: Downloads
Code from the future, delivered to you in the **now**.
Code from the future, delivered to you in the **now**. The goal of this library
would be to provide a well documented futures classes/utilities/additions that
allows for providing a level of transparency in how asynchronous work gets
executed. This library currently adds statistics gathering, an eventlet
executor, a synchronous executor etc.
* Free software: Apache license
* Documentation: http://docs.openstack.org/developer/futurist

View File

@ -15,7 +15,6 @@
# under the License.
import functools
import sys
import threading
from concurrent import futures as _futures
@ -23,16 +22,7 @@ from concurrent.futures import process as _process
from concurrent.futures import thread as _thread
import six
try:
from eventlet import greenpool
from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue
from eventlet.green import threading as greenthreading
except ImportError:
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
None, None)
from futurist import _green
from futurist import _utils
@ -227,56 +217,6 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
return self._gatherer.submit(fn, *args, **kwargs)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
del(exc_type, exc_value, exc_tb)
else:
self.future.set_result(result)
if _utils.EVENTLET_AVAILABLE:
class _GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
_green_threading = _GreenThreading()
else:
_green_threading = None
class SynchronousExecutor(_futures.Executor):
"""Executor that uses the caller to execute calls synchronously.
@ -290,23 +230,30 @@ class SynchronousExecutor(_futures.Executor):
threading = _Threading()
def __init__(self, green=False):
def __init__(self, green=False, run_work_func=lambda work: work.run()):
"""Synchronous executor constructor.
:param green: when enabled this forces the usage of greened lock
classes and green futures (so that the internals of this
object operate correctly under eventlet)
:type green: bool
:param run_work_func: callable that takes a single work item and
runs it (typically in a blocking manner)
:param run_work_func: callable
"""
if green and not _utils.EVENTLET_AVAILABLE:
raise RuntimeError('Eventlet is needed to use a green'
' synchronous executor')
if not six.callable(run_work_func):
raise ValueError("Run work parameter expected to be callable")
self._run_work_func = run_work_func
self._shutoff = False
if green:
self.threading = _green_threading
self.threading = _green.threading
self._future_cls = GreenFuture
else:
self._future_cls = Future
self._run_work_func = run_work_func
self._gatherer = _Gatherer(self._submit,
self.threading.lock_object,
start_before_submit=True)
@ -342,37 +289,10 @@ class SynchronousExecutor(_futures.Executor):
def _submit(self, fn, *args, **kwargs):
fut = self._future_cls()
runner = _WorkItem(fut, fn, args, kwargs)
runner.run()
self._run_work_func(_utils.WorkItem(fut, fn, args, kwargs))
return fut
class _GreenWorker(object):
def __init__(self, executor, work, work_queue):
self.executor = executor
self.work = work
self.work_queue = work_queue
def __call__(self):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
try:
w.run()
finally:
self.work_queue.task_done()
class GreenFuture(Future):
__doc__ = Future.__doc__
@ -385,8 +305,8 @@ class GreenFuture(Future):
# functions will correctly yield to eventlet. If this is not done then
# waiting on the future never actually causes the greenthreads to run
# and thus you wait for infinity.
if not greenpatcher.is_monkey_patched('threading'):
self._condition = greenthreading.Condition()
if not _green.is_monkey_patched('threading'):
self._condition = _green.threading.condition_object()
class GreenThreadPoolExecutor(_futures.Executor):
@ -399,7 +319,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _green_threading
threading = _green.threading
def __init__(self, max_workers=1000, check_and_reject=None):
"""Initializes a green thread pool executor.
@ -424,10 +344,10 @@ class GreenThreadPoolExecutor(_futures.Executor):
if max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
self._max_workers = max_workers
self._pool = greenpool.GreenPool(self._max_workers)
self._delayed_work = greenqueue.Queue()
self._pool = _green.Pool(self._max_workers)
self._delayed_work = _green.Queue()
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._shutdown_lock = greenthreading.Lock()
self._shutdown_lock = self.threading.lock_object()
self._shutdown = False
self._gatherer = _Gatherer(self._submit,
self.threading.lock_object)
@ -459,7 +379,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
def _submit(self, fn, *args, **kwargs):
f = GreenFuture()
work = _WorkItem(f, fn, args, kwargs)
work = _utils.WorkItem(f, fn, args, kwargs)
if not self._spin_up(work):
self._delayed_work.put(work)
return f
@ -473,7 +393,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
"""
alive = self._pool.running() + self._pool.waiting()
if alive < self._max_workers:
self._pool.spawn_n(_GreenWorker(self, work, self._delayed_work))
self._pool.spawn_n(_green.GreenWorker(work, self._delayed_work))
return True
return False
@ -494,8 +414,10 @@ class ExecutorStatistics(object):
__slots__ = ['_failures', '_executed', '_runtime', '_cancelled']
__repr_format = ("failures=%(failures)s, executed=%(executed)s, "
"runtime=%(runtime)s, cancelled=%(cancelled)s")
_REPR_MSG_TPL = ("<ExecutorStatistics object at 0x%(ident)x"
" (failures=%(failures)s,"
" executed=%(executed)s, runtime=%(runtime)0.2f,"
" cancelled=%(cancelled)s)>")
def __init__(self, failures=0, executed=0, runtime=0.0, cancelled=0):
self._failures = failures
@ -550,13 +472,10 @@ class ExecutorStatistics(object):
return self._runtime / self._executed
def __repr__(self):
r = self.__class__.__name__
r += "("
r += self.__repr_format % ({
return self._REPR_MSG_TPL % ({
'ident': id(self),
'failures': self._failures,
'executed': self._executed,
'runtime': self._runtime,
'cancelled': self._cancelled,
})
r += ")"
return r

84
futurist/_green.py Normal file
View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from futurist import _utils
try:
from eventlet import greenpool
from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue
from eventlet.green import threading as greenthreading
except ImportError:
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
None, None)
if _utils.EVENTLET_AVAILABLE:
# Aliases that we use and only expose vs the whole of eventlet...
Pool = greenpool.GreenPool
Queue = greenqueue.Queue
is_monkey_patched = greenpatcher.is_monkey_patched
class GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
threading = GreenThreading()
else:
threading = None
Pool = None
Queue = None
is_monkey_patched = lambda mod: False
class GreenWorker(object):
def __init__(self, work, work_queue):
self.work = work
self.work_queue = work_queue
def __call__(self):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
try:
w.run()
finally:
self.work_queue.task_done()

View File

@ -20,6 +20,7 @@ import sys
import traceback
from monotonic import monotonic as now # noqa
import six
try:
import eventlet as _eventlet # noqa
@ -28,6 +29,31 @@ except ImportError:
EVENTLET_AVAILABLE = False
class WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
del(exc_type, exc_value, exc_tb)
else:
self.future.set_result(result)
class Failure(object):
"""Object that captures a exception (and its associated information)."""
@ -81,14 +107,6 @@ def get_callback_name(cb):
return ".".join(segments)
def reverse_enumerate(items):
"""Yields (index, item) from given list/tuple in reverse order."""
idx = len(items)
while idx > 0:
yield (idx - 1, items[idx - 1])
idx -= 1
def get_optimal_thread_count(default=2):
"""Try to guess optimal thread count for current system."""
try:

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import fractions
import functools
import heapq
@ -45,9 +46,21 @@ IMMEDIATE = 'immediate'
class Watcher(object):
"""A **read-only** object representing a periodics callbacks activities."""
_REPR_MSG_TPL = ("<Watcher object at 0x%(ident)x "
"("
"runs=%(runs)s,"
" successes=%(successes)s,"
" failures=%(failures)s,"
" elapsed=%(elapsed)0.2f,"
" elapsed_waiting=%(elapsed_waiting)0.2f"
")>")
def __init__(self, metrics):
self._metrics = metrics
def __repr__(self):
return self._REPR_MSG_TPL % dict(ident=id(self), **self._metrics)
@property
def runs(self):
"""How many times the periodic callback has been ran."""
@ -102,7 +115,7 @@ def _check_attrs(obj):
return missing_attrs
def periodic(spacing, run_immediately=False):
def periodic(spacing, run_immediately=False, enabled=True):
"""Tags a method/function as wanting/able to execute periodically.
:param spacing: how often to run the decorated function (required)
@ -111,6 +124,8 @@ def periodic(spacing, run_immediately=False):
immediately or wait until the spacing provided has
elapsed before running for the first time
:type run_immediately: boolean
:param enabled: whether the task is enabled to run
:type enabled: boolean
"""
if spacing <= 0:
@ -118,7 +133,7 @@ def periodic(spacing, run_immediately=False):
" zero instead of %s" % spacing)
def wrapper(f):
f._is_periodic = True
f._is_periodic = enabled
f._periodic_spacing = spacing
f._periodic_run_immediately = run_immediately
@ -257,12 +272,8 @@ def _run_callback_no_retain(now_func, cb, *args, **kwargs):
def _build(now_func, callables, next_run_scheduler):
schedule = _Schedule()
now = None
immediates = []
# Reverse order is used since these are later popped off (and to
# ensure the popping order is first -> last we need to append them
# in the opposite ordering last -> first).
reverse_it = utils.reverse_enumerate(callables)
for index, (cb, _cb_name, args, kwargs) in reverse_it:
immediates = collections.deque()
for index, (cb, _cb_name, args, kwargs) in enumerate(callables):
if cb._periodic_run_immediately:
immediates.append(index)
else:
@ -531,6 +542,61 @@ class PeriodicWorker(object):
def _run(self, executor, runner):
"""Main worker run loop."""
def _process_scheduled():
# Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest
# next desired run time).
with self._waiter:
while (not self._schedule and
not self._tombstone.is_set() and
not self._immediates):
self._waiter.wait(self.MAX_LOOP_IDLE)
if self._tombstone.is_set():
# We were requested to stop, so stop.
return
if self._immediates:
# This will get processed in _process_immediates()
# in the next loop call.
return
submitted_at = now = self._now_func()
next_run, index = self._schedule.pop()
when_next = next_run - now
if when_next <= 0:
# Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index]
self._log.debug("Submitting periodic function '%s'",
cb_name)
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
submitted_at))
else:
# Gotta wait...
self._schedule.push(next_run, index)
when_next = min(when_next, self.MAX_LOOP_IDLE)
self._waiter.wait(when_next)
def _process_immediates():
try:
index = self._immediates.popleft()
except IndexError:
pass
else:
cb, cb_name, args, kwargs = self._callables[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'", cb_name)
fut = executor.submit(runner, self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index, submitted_at))
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
started_at, finished_at, failure = fut.result()
cb_metrics, _watcher = self._watchers[index]
@ -553,57 +619,8 @@ class PeriodicWorker(object):
self._waiter.notify_all()
while not self._tombstone.is_set():
if self._immediates:
# Run & schedule its next execution.
try:
index = self._immediates.pop()
except IndexError:
pass
else:
cb, cb_name, args, kwargs = self._callables[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'",
cb_name)
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index,
submitted_at))
else:
# Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest
# next desired run time).
with self._waiter:
while (not self._schedule and
not self._tombstone.is_set()):
self._waiter.wait(self.MAX_LOOP_IDLE)
if self._tombstone.is_set():
break
submitted_at = now = self._now_func()
next_run, index = self._schedule.pop()
when_next = next_run - now
if when_next <= 0:
# Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index]
self._log.debug("Submitting periodic function '%s'",
cb_name)
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
submitted_at))
else:
# Gotta wait...
self._schedule.push(next_run, index)
when_next = min(when_next, self.MAX_LOOP_IDLE)
self._waiter.wait(when_next)
_process_immediates()
_process_scheduled()
def _on_finish(self):
# TODO(harlowja): this may be to verbose for people?
@ -718,6 +735,9 @@ class PeriodicWorker(object):
self._dead.clear()
for cb_metrics, _watcher in self._watchers:
for k in list(six.iterkeys(cb_metrics)):
# NOTE(harlowja): mutate the original dictionaries keys
# so that the watcher (which references the same dictionary
# keys) is able to see those changes.
cb_metrics[k] = 0
self._immediates, self._schedule = _build(
self._now_func, self._callables, self._initial_schedule_strategy)

View File

@ -19,5 +19,4 @@ from oslotest import base
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""

View File

@ -79,7 +79,13 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
self.assertEqual(3, self.executor.statistics.executed)
self.assertEqual(1, self.executor.statistics.failures)
self.assertGreaterEqual(self.executor.statistics.runtime, 0.2)
self.assertGreaterEqual(self.executor.statistics.runtime,
# It appears that the the thread run loop
# may call this before 0.2 seconds (or 0.2
# will not be represented as a float correctly)
# is really up so accommodate for that
# happening...
0.199)
def test_post_shutdown_raises(self):
executor = self.executor_cls(**self.executor_kwargs)

View File

@ -133,6 +133,22 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
nows = list(reversed(nows))
self._test_strategy('last_finished', nows, last_now, 5.0)
def test_waiting_immediate_add_processed(self):
ran_at = []
@periodics.periodic(0.1, run_immediately=True)
def activated_periodic():
ran_at.append(time.time())
w = periodics.PeriodicWorker([], **self.worker_kwargs)
with self.create_destroy(w.start, allow_empty=True):
# Give some time for the thread to start...
self.sleep(0.5)
w.add(activated_periodic)
while len(ran_at) == 0:
self.sleep(0.1)
w.stop()
def test_double_start_fail(self):
w = periodics.PeriodicWorker([], **self.worker_kwargs)
with self.create_destroy(w.start, allow_empty=True):
@ -200,14 +216,12 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
self.sleep(0.1)
w.stop()
def test_not_added(self):
def test_disabled(self):
@periodics.periodic(0.5)
@periodics.periodic(0.5, enabled=False)
def no_add_me():
pass
no_add_me._is_periodic = False
@periodics.periodic(0.5)
def add_me():
pass

View File

@ -1,6 +0,0 @@
[DEFAULT]
# The list of modules to copy from oslo-incubator.git
# The base module to hold the copy of openstack.common
base=futurist

View File

@ -2,7 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr<2.0,>=1.6
pbr>=1.6
six>=1.9.0
monotonic>=0.3 # Apache-2.0
futures>=3.0;python_version=='2.7' or python_version=='2.6'

View File

@ -34,6 +34,9 @@ upload-dir = doc/build/html
directory = futurist/locale
domain = futurist
[pbr]
warnerrors = True
[wheel]
universal = 1

View File

@ -25,5 +25,5 @@ except ImportError:
pass
setuptools.setup(
setup_requires=['pbr>=1.3'],
setup_requires=['pbr>=1.8'],
pbr=True)

View File

@ -12,7 +12,7 @@ coverage>=3.6
discover
python-subunit>=0.0.18
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
oslosphinx>=2.5.0 # Apache-2.0
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
testrepository>=0.0.18
testscenarios>=0.4

View File

@ -25,7 +25,7 @@ commands = python setup.py test --coverage --testr-args='{posargs}'
commands =
python setup.py testr --slowest --testr-args='{posargs}'
sphinx-build -b doctest doc/source doc/build
doc8 doc/source
doc8 --ignore-path "doc/source/history.rst" doc/source
[testenv:docs]
commands = python setup.py build_sphinx
@ -38,5 +38,4 @@ commands = oslo_debug_helper {posargs}
show-source = True
ignore = E123,E125
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build