Retire Packaging Deb project repos

This commit is part of a series to retire the Packaging Deb
project. Step 2 is to remove all content from the project
repos, replacing it with a README notification where to find
ongoing work, and how to recover the repo if needed at some
future point (as in
https://docs.openstack.org/infra/manual/drivers.html#retiring-a-project).

Change-Id: I9fce77de093a2922ef3cc9484cc2880cec812cba
This commit is contained in:
Tony Breeds 2017-09-12 15:59:04 -06:00
parent fc76675610
commit 8077f22f3a
47 changed files with 14 additions and 3698 deletions

View File

@ -1,8 +0,0 @@
[run]
branch = True
source = futurist
omit = futurist/tests/*,futurist/openstack/*
[report]
ignore_errors = True
precision = 2

54
.gitignore vendored
View File

@ -1,54 +0,0 @@
*.py[cod]
# C extensions
*.so
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
lib64
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
cover
.tox
nosetests.xml
.testrepository
.venv
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Complexity
output/*.html
output/*/index.html
# Sphinx
doc/build
# pbr generates these
AUTHORS
ChangeLog
# Editors
*~
.*.swp
.*sw?

View File

@ -1,4 +0,0 @@
[gerrit]
host=review.openstack.org
port=29418
project=openstack/futurist.git

View File

@ -1,3 +0,0 @@
# Format is:
# <preferred e-mail> <other e-mail 1>
# <preferred e-mail> <other e-mail 2>

View File

@ -1,7 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -1,17 +0,0 @@
If you would like to contribute to the development of OpenStack, you must
follow the steps in this page:
http://docs.openstack.org/infra/manual/developers.html
If you already have a good understanding of how the system works and your
OpenStack accounts are set up, you can skip to the development workflow
section of this documentation to learn how changes to OpenStack should be
submitted for review via the Gerrit tool:
http://docs.openstack.org/infra/manual/developers.html#development-workflow
Pull requests submitted through GitHub will be ignored.
Bugs should be filed on Launchpad, not GitHub:
https://bugs.launchpad.net/futurist

View File

@ -1,4 +0,0 @@
Futurist Style Commandments
===============================================
Read the OpenStack Style Commandments http://docs.openstack.org/developer/hacking/

176
LICENSE
View File

@ -1,176 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

View File

@ -1,6 +0,0 @@
include AUTHORS
include ChangeLog
exclude .gitignore
exclude .gitreview
global-exclude *.pyc

14
README Normal file
View File

@ -0,0 +1,14 @@
This project is no longer maintained.
The contents of this repository are still available in the Git
source code management system. To see the contents of this
repository before it reached its end of life, please check out the
previous commit with "git checkout HEAD^1".
For ongoing work on maintaining OpenStack packages in the Debian
distribution, please see the Debian OpenStack packaging team at
https://wiki.debian.org/OpenStack/.
For any further questions, please email
openstack-dev@lists.openstack.org or join #openstack-dev on
Freenode.

View File

@ -1,22 +0,0 @@
========
Futurist
========
.. image:: https://img.shields.io/pypi/v/futurist.svg
:target: https://pypi.python.org/pypi/futurist/
:alt: Latest Version
.. image:: https://img.shields.io/pypi/dm/futurist.svg
:target: https://pypi.python.org/pypi/futurist/
:alt: Downloads
Code from the future, delivered to you in the **now**. The goal of this library
would be to provide a well documented futures classes/utilities/additions that
allows for providing a level of transparency in how asynchronous work gets
executed. This library currently adds statistics gathering, an eventlet
executor, a synchronous executor etc.
* Free software: Apache license
* Documentation: http://docs.openstack.org/developer/futurist
* Source: http://git.openstack.org/cgit/openstack/futurist
* Bugs: http://bugs.launchpad.net/futurist

View File

@ -1,2 +0,0 @@
[python: **.py]

60
debian/changelog vendored
View File

@ -1,60 +0,0 @@
python-futurist (0.13.0-3) UNRELEASED; urgency=medium
* Standards-Version is 3.9.8 now (no change)
* d/rules: Changed UPSTREAM_GIT protocol to https
-- Ondřej Nový <novy@ondrej.org> Sat, 09 Apr 2016 19:26:50 +0200
python-futurist (0.13.0-2) unstable; urgency=medium
* Uploading to unstable.
-- Thomas Goirand <zigo@debian.org> Mon, 04 Apr 2016 09:25:07 +0000
python-futurist (0.13.0-1) experimental; urgency=medium
[ Ondřej Nový ]
* Fixed VCS URLs (https).
[ Thomas Goirand ]
* New upstream release.
* Fixed (build-)depends for this release.
* Standards-Version: 3.9.7 (no change).
* Disabled futurist.tests.test_executors.TestRejection.test_rejection which
is failing.
-- Thomas Goirand <zigo@debian.org> Wed, 02 Mar 2016 13:06:31 +0000
python-futurist (0.9.0-1) experimental; urgency=medium
* New upstream release.
* Fixed debian/copyright ordering.
-- Thomas Goirand <zigo@debian.org> Sat, 16 Jan 2016 03:48:28 +0000
python-futurist (0.5.0-2) unstable; urgency=medium
* Uploading to unstable.
-- Thomas Goirand <zigo@debian.org> Fri, 16 Oct 2015 09:31:19 +0000
python-futurist (0.5.0-1) experimental; urgency=medium
[ James Page ]
* d/rules: Drop use of dpkg-parsechangelog -S to ease backporting to
older Debian and Ubuntu releases.
[ Corey Bryant ]
* New upstream release.
* Align dependencies with upstream.
[ Thomas Goirand ]
* Drop versions already satisfied in Trusty.
-- Thomas Goirand <zigo@debian.org> Mon, 05 Oct 2015 08:21:43 +0000
python-futurist (0.1.2-1) unstable; urgency=medium
* Initial release. (Closes: #792500)
-- Thomas Goirand <zigo@debian.org> Wed, 15 Jul 2015 14:47:49 +0200

1
debian/compat vendored
View File

@ -1 +0,0 @@
9

113
debian/control vendored
View File

@ -1,113 +0,0 @@
Source: python-futurist
Section: python
Priority: optional
Maintainer: PKG OpenStack <openstack-devel@lists.alioth.debian.org>
Uploaders: Thomas Goirand <zigo@debian.org>,
Build-Depends: debhelper (>= 9),
dh-python,
python-all,
python-pbr (>= 1.8),
python-setuptools,
python-sphinx,
python3-all,
python3-pbr (>= 1.8),
python3-setuptools,
Build-Depends-Indep: python-concurrent.futures (>= 3.0),
python-contextlib2,
python-coverage,
python-eventlet (>= 0.18.4),
python-hacking (>= 0.10.0),
python-monotonic (>= 0.6),
python-oslosphinx (>= 2.5.0),
python-oslotest (>= 1.10.0),
python-six (>= 1.9.0),
python-testscenarios,
python-testtools (>= 1.4.0),
python3-contextlib2,
python3-eventlet (>= 0.18.4),
python3-monotonic (>= 0.6),
python3-oslotest (>= 1.10.0),
python3-six (>= 1.9.0),
python3-testscenarios,
python3-testtools (>= 1.4.0),
subunit (>= 1.1),
testrepository,
Standards-Version: 3.9.8
Vcs-Browser: https://anonscm.debian.org/cgit/openstack/python-futurist.git/
Vcs-Git: https://anonscm.debian.org/git/openstack/python-futurist.git
Homepage: http://www.openstack.org/
Package: python-futurist
Architecture: all
Depends: python-concurrent.futures (>= 3.0),
python-contextlib2,
python-monotonic (>= 0.6),
python-pbr (>= 1.8),
python-six (>= 1.9.0),
${misc:Depends},
${python:Depends},
Suggests: python-futurist-doc,
Description: useful additions to futures, from the future - Python 2.x
Code from the future, delivered to you in the now. For example:
* A futurist.GreenThreadPoolExecutor using eventlet green thread pools. It
provides a standard executor API/interface and it also gathers execution
statistics.
* A futurist.ProcessPoolExecutor derivative that gathers execution
statistics.
* A futurist.SynchronousExecutor that doesnt run concurrently. It has the
same executor API/interface and it also gathers execution statistics.
* A futurist.ThreadPoolExecutor derivative that gathers execution statistics.
* A futurist.periodics.PeriodicWorker that can use the previously mentioned
executors to run asynchronous work periodically in parallel or
synchronously.
* etc.
.
This package contains the Python 2.x module.
Package: python3-futurist
Architecture: all
Depends: python3-contextlib2,
python3-monotonic (>= 0.6),
python3-pbr (>= 1.8),
python3-six (>= 1.9.0),
${misc:Depends},
${python3:Depends},
Suggests: python-futurist-doc,
Description: useful additions to futures, from the future - Python 3.x
Code from the future, delivered to you in the now. For example:
* A futurist.GreenThreadPoolExecutor using eventlet green thread pools. It
provides a standard executor API/interface and it also gathers execution
statistics.
* A futurist.ProcessPoolExecutor derivative that gathers execution
statistics.
* A futurist.SynchronousExecutor that doesnt run concurrently. It has the
same executor API/interface and it also gathers execution statistics.
* A futurist.ThreadPoolExecutor derivative that gathers execution statistics.
* A futurist.periodics.PeriodicWorker that can use the previously mentioned
executors to run asynchronous work periodically in parallel or
synchronously.
* etc.
.
This package contains the Python 3.x module.
Package: python-futurist-doc
Section: doc
Architecture: all
Depends: ${misc:Depends},
${sphinxdoc:Depends},
Description: useful additions to futures, from the future - doc
Code from the future, delivered to you in the now. For example:
* A futurist.GreenThreadPoolExecutor using eventlet green thread pools. It
provides a standard executor API/interface and it also gathers execution
statistics.
* A futurist.ProcessPoolExecutor derivative that gathers execution
statistics.
* A futurist.SynchronousExecutor that doesnt run concurrently. It has the
same executor API/interface and it also gathers execution statistics.
* A futurist.ThreadPoolExecutor derivative that gathers execution statistics.
* A futurist.periodics.PeriodicWorker that can use the previously mentioned
executors to run asynchronous work periodically in parallel or
synchronously.
* etc.
.
This package contains the documentation.

29
debian/copyright vendored
View File

@ -1,29 +0,0 @@
Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: futurist
Source: http://www.openstack.org/
Files: *
Copyright: (c) 2010-2016, OpenStack Foundation <openstack-dev@lists.openstack.org>
(c) 2013-2015, Yahoo INC.
(c) 2013 Hewlett-Packard Development Company, L.P.
License: Apache-2.0
Files: debian/*
Copyright: (c) 2015-2016, Thomas Goirand <zigo@debian.org>
License: Apache-2.0
License: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
.
http://www.apache.org/licenses/LICENSE-2.0
.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
.
On Debian-based systems the full text of the Apache version 2.0 license
can be found in /usr/share/common-licenses/Apache-2.0.

9
debian/gbp.conf vendored
View File

@ -1,9 +0,0 @@
[DEFAULT]
upstream-branch = master
debian-branch = debian/newton
upstream-tag = %(version)s
compression = xz
[buildpackage]
export-dir = ../build-area/

View File

@ -1,9 +0,0 @@
Document: python-futurist-doc
Title: futurist Documentation
Author: N/A
Abstract: Sphinx documentation for futurist
Section: Programming/Python
Format: HTML
Index: /usr/share/doc/python-futurist-doc/html/index.html
Files: /usr/share/doc/python-futurist-doc/html/*

59
debian/rules vendored
View File

@ -1,59 +0,0 @@
#!/usr/bin/make -f
PYTHONS:=$(shell pyversions -vr)
PYTHON3S:=$(shell py3versions -vr)
export OSLO_PACKAGE_VERSION=$(shell dpkg-parsechangelog | sed -n -e 's/^Version: //p' | sed -e 's/^[[:digit:]]*://' -e 's/[-].*//')
UPSTREAM_GIT := https://github.com/openstack/futurist.git
-include /usr/share/openstack-pkg-tools/pkgos.make
%:
dh $@ --buildsystem=python_distutils --with python2,python3,sphinxdoc
override_dh_auto_install:
set -e ; for pyvers in $(PYTHONS); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python-futurist; \
done
set -e ; for pyvers in $(PYTHON3S); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python3-futurist; \
done
rm -rf $(CURDIR)/debian/python*-futurist/usr/lib/python*/dist-packages/*.pth
override_dh_auto_test:
ifeq (,$(findstring nocheck, $(DEB_BUILD_OPTIONS)))
@echo "===> Running tests"
set -e ; set -x ; for i in 2.7 $(PYTHON3S) ; do \
PYMAJOR=`echo $$i | cut -d'.' -f1` ; \
echo "===> Testing with python$$i (python$$PYMAJOR)" ; \
rm -rf .testrepository ; \
testr-python$$PYMAJOR init ; \
TEMP_REZ=`mktemp -t` ; \
PYTHONPATH=$(CURDIR) PYTHON=python$$i testr-python$$PYMAJOR run --subunit 'futurist\.tests\.(?!.*test_executors\.TestRejection\.test_rejection.*)' | tee $$TEMP_REZ | subunit2pyunit ; \
cat $$TEMP_REZ | subunit-filter -s --no-passthrough | subunit-stats ; \
rm -f $$TEMP_REZ ; \
testr-python$$PYMAJOR slowest ; \
done
endif
override_dh_sphinxdoc:
ifeq (,$(findstring nodocs, $(DEB_BUILD_OPTIONS)))
sphinx-build -b html doc/source debian/python-futurist-doc/usr/share/doc/python-futurist-doc/html
dh_sphinxdoc -O--buildsystem=python_distutils
endif
override_dh_clean:
dh_clean -O--buildsystem=python_distutils
rm -rf build
# Commands not to run
override_dh_installcatalogs:
override_dh_installemacsen override_dh_installifupdown:
override_dh_installinfo override_dh_installmenu override_dh_installmime:
override_dh_installmodules override_dh_installlogcheck:
override_dh_installpam override_dh_installppp override_dh_installudev override_dh_installwm:
override_dh_installxfonts override_dh_gconf override_dh_icons override_dh_perl override_dh_usrlocal:
override_dh_installcron override_dh_installdebconf:
override_dh_installlogrotate override_dh_installgsettings:

View File

@ -1 +0,0 @@
3.0 (quilt)

View File

@ -1 +0,0 @@
extend-diff-ignore = "^[^/]*[.]egg-info/"

3
debian/watch vendored
View File

@ -1,3 +0,0 @@
version=3
opts="uversionmangle=s/\.(b|rc)/~$1/" \
https://github.com/openstack/futurist/tags .*/(\d[\d\.]+)\.tar\.gz

View File

@ -1,68 +0,0 @@
===
API
===
---------
Executors
---------
.. autoclass:: futurist.GreenThreadPoolExecutor
:members:
:special-members: __init__
.. autoclass:: futurist.ProcessPoolExecutor
:members:
:special-members: __init__
.. autoclass:: futurist.SynchronousExecutor
:members:
:special-members: __init__
.. autoclass:: futurist.ThreadPoolExecutor
:members:
:special-members: __init__
-------
Futures
-------
.. autoclass:: futurist.Future
:members:
.. autoclass:: futurist.GreenFuture
:members:
---------
Periodics
---------
.. autoclass:: futurist.periodics.PeriodicWorker
:members:
:special-members: __init__, __len__
.. autofunction:: futurist.periodics.periodic
.. autoclass:: futurist.periodics.Watcher
:members:
-------------
Miscellaneous
-------------
.. autoclass:: futurist.ExecutorStatistics
:members:
----------
Exceptions
----------
.. autoclass:: futurist.RejectedSubmission
:members:
-------
Waiters
-------
.. autofunction:: futurist.waiters.wait_for_any
.. autofunction:: futurist.waiters.wait_for_all
.. autoclass:: futurist.waiters.DoneAndNotDoneFutures

View File

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.insert(0, os.path.abspath('../..'))
# -- General configuration ----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.doctest',
'oslosphinx'
]
# autodoc generation is a bit aggressive and a nuisance when doing heavy
# text edit cycles.
# execute "export SPHINX_DEBUG=1" in your terminal to disable
# The suffix of source filenames.
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'futurist'
copyright = u'2013, OpenStack Foundation'
# If true, '()' will be appended to :func: etc. cross-reference text.
add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
add_module_names = True
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output --------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
# html_theme_path = ["."]
# html_theme = '_theme'
# html_static_path = ['static']
# Output file base name for HTML help builder.
htmlhelp_basename = '%sdoc' % project
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
('index',
'%s.tex' % project,
u'%s Documentation' % project,
u'OpenStack Foundation', 'manual'),
]
# Example configuration for intersphinx: refer to the Python standard library.
#intersphinx_mapping = {'http://docs.python.org/': None}

View File

@ -1,5 +0,0 @@
============
Contributing
============
.. include:: ../../CONTRIBUTING.rst

View File

@ -1,258 +0,0 @@
========
Examples
========
-----------------------------------------
Creating and using a synchronous executor
-----------------------------------------
.. testcode::
# NOTE: enable printing timestamp for additional data
import sys
import futurist
import eventlet
def delayed_func():
print("started")
eventlet.sleep(3)
print("done")
#print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
e = futurist.SynchronousExecutor()
fut = e.submit(delayed_func)
eventlet.sleep(1)
print("Hello")
eventlet.sleep(1)
e.shutdown()
#print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
**Expected output:**
.. testoutput::
started
done
Hello
------------------------------------------------
Creating and using a green thread-based executor
------------------------------------------------
.. testcode::
# NOTE: enable printing timestamp for additional data
import sys
import futurist
import eventlet
def delayed_func():
print("started")
eventlet.sleep(3)
print("done")
#print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
e = futurist.GreenThreadPoolExecutor()
fut = e.submit(delayed_func)
eventlet.sleep(1)
print("Hello")
eventlet.sleep(1)
e.shutdown()
#print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
**Expected output:**
.. testoutput::
started
Hello
done
------------------------------------------
Creating and using a thread-based executor
------------------------------------------
.. testcode::
import time
import futurist
def delayed_func():
time.sleep(0.1)
return "hello"
e = futurist.ThreadPoolExecutor()
fut = e.submit(delayed_func)
print(fut.result())
e.shutdown()
**Expected output:**
.. testoutput::
hello
-------------------------------------------
Creating and using a process-based executor
-------------------------------------------
::
import time
import futurist
def delayed_func():
time.sleep(0.1)
return "hello"
e = futurist.ProcessPoolExecutor()
fut = e.submit(delayed_func)
print(fut.result())
e.shutdown()
**Expected output:**
::
hello
---------------------------------------
Running a set of functions periodically
---------------------------------------
.. testcode::
import futurist
from futurist import periodics
import time
import threading
@periodics.periodic(1)
def every_one(started_at):
print("1: %s" % (time.time() - started_at))
@periodics.periodic(2)
def every_two(started_at):
print("2: %s" % (time.time() - started_at))
@periodics.periodic(4)
def every_four(started_at):
print("4: %s" % (time.time() - started_at))
@periodics.periodic(6)
def every_six(started_at):
print("6: %s" % (time.time() - started_at))
started_at = time.time()
callables = [
# The function to run + any automatically provided positional and
# keyword arguments to provide to it everytime it is activated.
(every_one, (started_at,), {}),
(every_two, (started_at,), {}),
(every_four, (started_at,), {}),
(every_six, (started_at,), {}),
]
w = periodics.PeriodicWorker(callables)
# In this example we will run the periodic functions using a thread, it
# is also possible to just call the w.start() method directly if you do
# not mind blocking up the current program.
t = threading.Thread(target=w.start)
t.daemon = True
t.start()
# Run for 10 seconds and then stop.
while (time.time() - started_at) <= 10:
time.sleep(0.1)
w.stop()
w.wait()
t.join()
.. testoutput::
:hide:
...
-----------------------------------------------------------
Running a set of functions periodically (using an executor)
-----------------------------------------------------------
.. testcode::
import futurist
from futurist import periodics
import time
import threading
@periodics.periodic(1)
def every_one(started_at):
print("1: %s" % (time.time() - started_at))
time.sleep(0.5)
@periodics.periodic(2)
def every_two(started_at):
print("2: %s" % (time.time() - started_at))
time.sleep(1)
@periodics.periodic(4)
def every_four(started_at):
print("4: %s" % (time.time() - started_at))
time.sleep(2)
@periodics.periodic(6)
def every_six(started_at):
print("6: %s" % (time.time() - started_at))
time.sleep(3)
started_at = time.time()
callables = [
# The function to run + any automatically provided positional and
# keyword arguments to provide to it everytime it is activated.
(every_one, (started_at,), {}),
(every_two, (started_at,), {}),
(every_four, (started_at,), {}),
(every_six, (started_at,), {}),
]
# To avoid getting blocked up by slow periodic functions we can also
# provide a executor pool to make sure that slow functions only block
# up a thread (or green thread), instead of blocking other periodic
# functions that need to be scheduled to run.
executor_factory = lambda: futurist.ThreadPoolExecutor(max_workers=2)
w = periodics.PeriodicWorker(callables, executor_factory=executor_factory)
# In this example we will run the periodic functions using a thread, it
# is also possible to just call the w.start() method directly if you do
# not mind blocking up the current program.
t = threading.Thread(target=w.start)
t.daemon = True
t.start()
# Run for 10 seconds and then stop.
while (time.time() - started_at) <= 10:
time.sleep(0.1)
w.stop()
w.wait()
t.join()
.. testoutput::
:hide:
...

View File

@ -1,34 +0,0 @@
========
Features
========
Async
-----
* A :py:class:`.futurist.GreenThreadPoolExecutor` using `eventlet`_ green
thread pools. It provides a standard `executor`_ API/interface and it also
gathers execution statistics. It returns instances of
:py:class:`.futurist.GreenFuture` objects.
* A :py:class:`.futurist.ProcessPoolExecutor` derivative that gathers execution
statistics. It returns instances of :py:class:`.futurist.Future` objects.
* A :py:class:`.futurist.SynchronousExecutor` that **doesn't** run
concurrently. It has the same `executor`_ API/interface and it also
gathers execution statistics. It returns instances
of :py:class:`.futurist.Future` objects.
* A :py:class:`.futurist.ThreadPoolExecutor` derivative that gathers
execution statistics. It returns instances
of :py:class:`.futurist.Future` objects.
Periodics
---------
* A :py:class:`.futurist.periodics.PeriodicWorker` that can use the previously
mentioned executors to run asynchronous work periodically in parallel
or synchronously. It does this by executing arbitary functions/methods
that have been decorated with the :py:func:`.futurist.periodics.periodic`
decorator according to a internally maintained schedule (which itself is
based on the `heap`_ algorithm).
.. _heap: https://en.wikipedia.org/wiki/Heap_%28data_structure%29
.. _eventlet: http://eventlet.net/
.. _executor: https://docs.python.org/dev/library/concurrent.futures.html#executor-objects

View File

@ -1,2 +0,0 @@
.. include:: ../../ChangeLog

View File

@ -1,29 +0,0 @@
Welcome to futurist's documentation!
========================================================
Code from the future, delivered to you in the **now**.
.. toctree::
:maxdepth: 2
features
api
installation
examples
contributing
History
=======
.. toctree::
:maxdepth: 2
history
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@ -1,12 +0,0 @@
============
Installation
============
At the command line::
$ pip install futurist
Or, if you have virtualenvwrapper installed::
$ mkvirtualenv futurist
$ pip install futurist

View File

@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Promote accessible items to this module namespace (for easy access).
from futurist._futures import Future # noqa
from futurist._futures import GreenFuture # noqa
from futurist._futures import CancelledError # noqa
from futurist._futures import GreenThreadPoolExecutor # noqa
from futurist._futures import ProcessPoolExecutor # noqa
from futurist._futures import SynchronousExecutor # noqa
from futurist._futures import ThreadPoolExecutor # noqa
from futurist._futures import RejectedSubmission # noqa
from futurist._futures import ExecutorStatistics # noqa

View File

@ -1,484 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import threading
from concurrent import futures as _futures
from concurrent.futures import process as _process
from concurrent.futures import thread as _thread
import six
from futurist import _green
from futurist import _utils
CancelledError = _futures.CancelledError
class RejectedSubmission(Exception):
"""Exception raised when a submitted call is rejected (for some reason)."""
# NOTE(harlowja): Allows for simpler access to this type...
Future = _futures.Future
class _Threading(object):
@staticmethod
def event_object(*args, **kwargs):
return threading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return threading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return threading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return threading.Condition(*args, **kwargs)
class _Gatherer(object):
def __init__(self, submit_func, lock_factory, start_before_submit=False):
self._submit_func = submit_func
self._stats_lock = lock_factory()
self._stats = ExecutorStatistics()
self._start_before_submit = start_before_submit
@property
def statistics(self):
return self._stats
def clear(self):
with self._stats_lock:
self._stats = ExecutorStatistics()
def _capture_stats(self, started_at, fut):
"""Capture statistics
:param started_at: when the activity the future has performed
was started at
:param fut: future object
"""
# If time somehow goes backwards, make sure we cap it at 0.0 instead
# of having negative elapsed time...
elapsed = max(0.0, _utils.now() - started_at)
with self._stats_lock:
# Use a new collection and lock so that all mutations are seen as
# atomic and not overlapping and corrupting with other
# mutations (the clone ensures that others reading the current
# values will not see a mutated/corrupted one). Since futures may
# be completed by different threads we need to be extra careful to
# gather this data in a way that is thread-safe...
(failures, executed, runtime, cancelled) = (self._stats.failures,
self._stats.executed,
self._stats.runtime,
self._stats.cancelled)
if fut.cancelled():
cancelled += 1
else:
executed += 1
if fut.exception() is not None:
failures += 1
runtime += elapsed
self._stats = ExecutorStatistics(failures=failures,
executed=executed,
runtime=runtime,
cancelled=cancelled)
def submit(self, fn, *args, **kwargs):
"""Submit work to be executed and capture statistics."""
if self._start_before_submit:
started_at = _utils.now()
fut = self._submit_func(fn, *args, **kwargs)
if not self._start_before_submit:
started_at = _utils.now()
fut.add_done_callback(functools.partial(self._capture_stats,
started_at))
return fut
class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
"""Executor that uses a thread pool to execute calls asynchronously.
It gathers statistics about the submissions executed for post-analysis...
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
def __init__(self, max_workers=None, check_and_reject=None):
"""Initializes a thread pool executor.
:param max_workers: maximum number of workers that can be
simulatenously active at the same time, further
submitted work will be queued up when this limit
is reached.
:type max_workers: int
:param check_and_reject: a callback function that will be provided
two position arguments, the first argument
will be this executor instance, and the second
will be the number of currently queued work
items in this executors backlog; the callback
should raise a :py:class:`.RejectedSubmission`
exception if it wants to have this submission
rejected.
:type check_and_reject: callback
"""
if max_workers is None:
max_workers = _utils.get_optimal_thread_count()
super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
if self._max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
# NOTE(harlowja): this replaces the parent classes non-reentrant lock
# with a reentrant lock so that we can correctly call into the check
# and reject lock, and that it will block correctly if another
# submit call is done during that...
self._shutdown_lock = threading.RLock()
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._gatherer = _Gatherer(
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ThreadPoolExecutor, self).submit,
self.threading.lock_object)
@property
def statistics(self):
""":class:`.ExecutorStatistics` about the executors executions."""
return self._gatherer.statistics
@property
def alive(self):
"""Accessor to determine if the executor is alive/active."""
return not self._shutdown
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics)."""
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('Can not schedule new futures'
' after being shutdown')
self._check_and_reject(self, self._work_queue.qsize())
return self._gatherer.submit(fn, *args, **kwargs)
class ProcessPoolExecutor(_process.ProcessPoolExecutor):
"""Executor that uses a process pool to execute calls asynchronously.
It gathers statistics about the submissions executed for post-analysis...
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
def __init__(self, max_workers=None):
if max_workers is None:
max_workers = _utils.get_optimal_thread_count()
super(ProcessPoolExecutor, self).__init__(max_workers=max_workers)
if self._max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
self._gatherer = _Gatherer(
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ProcessPoolExecutor, self).submit,
self.threading.lock_object)
@property
def alive(self):
"""Accessor to determine if the executor is alive/active."""
return not self._shutdown_thread
@property
def statistics(self):
""":class:`.ExecutorStatistics` about the executors executions."""
return self._gatherer.statistics
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics)."""
return self._gatherer.submit(fn, *args, **kwargs)
class SynchronousExecutor(_futures.Executor):
"""Executor that uses the caller to execute calls synchronously.
This provides an interface to a caller that looks like an executor but
will execute the calls inside the caller thread instead of executing it
in a external process/thread for when this type of functionality is
useful to provide...
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _Threading()
def __init__(self, green=False, run_work_func=lambda work: work.run()):
"""Synchronous executor constructor.
:param green: when enabled this forces the usage of greened lock
classes and green futures (so that the internals of this
object operate correctly under eventlet)
:type green: bool
:param run_work_func: callable that takes a single work item and
runs it (typically in a blocking manner)
:param run_work_func: callable
"""
if green and not _utils.EVENTLET_AVAILABLE:
raise RuntimeError('Eventlet is needed to use a green'
' synchronous executor')
if not six.callable(run_work_func):
raise ValueError("Run work parameter expected to be callable")
self._run_work_func = run_work_func
self._shutoff = False
if green:
self.threading = _green.threading
self._future_cls = GreenFuture
else:
self._future_cls = Future
self._run_work_func = run_work_func
self._gatherer = _Gatherer(self._submit,
self.threading.lock_object,
start_before_submit=True)
@property
def alive(self):
"""Accessor to determine if the executor is alive/active."""
return not self._shutoff
def shutdown(self, wait=True):
self._shutoff = True
def restart(self):
"""Restarts this executor (*iff* previously shutoff/shutdown).
NOTE(harlowja): clears any previously gathered statistics.
"""
if self._shutoff:
self._shutoff = False
self._gatherer.clear()
@property
def statistics(self):
""":class:`.ExecutorStatistics` about the executors executions."""
return self._gatherer.statistics
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics)."""
if self._shutoff:
raise RuntimeError('Can not schedule new futures'
' after being shutdown')
return self._gatherer.submit(fn, *args, **kwargs)
def _submit(self, fn, *args, **kwargs):
fut = self._future_cls()
self._run_work_func(_utils.WorkItem(fut, fn, args, kwargs))
return fut
class GreenFuture(Future):
__doc__ = Future.__doc__
def __init__(self):
super(GreenFuture, self).__init__()
if not _utils.EVENTLET_AVAILABLE:
raise RuntimeError('Eventlet is needed to use a green future')
# NOTE(harlowja): replace the built-in condition with a greenthread
# compatible one so that when getting the result of this future the
# functions will correctly yield to eventlet. If this is not done then
# waiting on the future never actually causes the greenthreads to run
# and thus you wait for infinity.
if not _green.is_monkey_patched('threading'):
self._condition = _green.threading.condition_object()
class GreenThreadPoolExecutor(_futures.Executor):
"""Executor that uses a green thread pool to execute calls asynchronously.
See: https://docs.python.org/dev/library/concurrent.futures.html
and http://eventlet.net/doc/modules/greenpool.html for information on
how this works.
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _green.threading
def __init__(self, max_workers=1000, check_and_reject=None):
"""Initializes a green thread pool executor.
:param max_workers: maximum number of workers that can be
simulatenously active at the same time, further
submitted work will be queued up when this limit
is reached.
:type max_workers: int
:param check_and_reject: a callback function that will be provided
two position arguments, the first argument
will be this executor instance, and the second
will be the number of currently queued work
items in this executors backlog; the callback
should raise a :py:class:`.RejectedSubmission`
exception if it wants to have this submission
rejected.
:type check_and_reject: callback
"""
if not _utils.EVENTLET_AVAILABLE:
raise RuntimeError('Eventlet is needed to use a green executor')
if max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
self._max_workers = max_workers
self._pool = _green.Pool(self._max_workers)
self._delayed_work = _green.Queue()
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._shutdown_lock = self.threading.lock_object()
self._shutdown = False
self._gatherer = _Gatherer(self._submit,
self.threading.lock_object)
@property
def alive(self):
"""Accessor to determine if the executor is alive/active."""
return not self._shutdown
@property
def statistics(self):
""":class:`.ExecutorStatistics` about the executors executions."""
return self._gatherer.statistics
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics).
:param args: non-keyworded arguments
:type args: list
:param kwargs: key-value arguments
:type kwargs: dictionary
"""
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('Can not schedule new futures'
' after being shutdown')
self._check_and_reject(self, self._delayed_work.qsize())
return self._gatherer.submit(fn, *args, **kwargs)
def _submit(self, fn, *args, **kwargs):
f = GreenFuture()
work = _utils.WorkItem(f, fn, args, kwargs)
if not self._spin_up(work):
self._delayed_work.put(work)
return f
def _spin_up(self, work):
"""Spin up a greenworker if less than max_workers.
:param work: work to be given to the greenworker
:returns: whether a green worker was spun up or not
:rtype: boolean
"""
alive = self._pool.running() + self._pool.waiting()
if alive < self._max_workers:
self._pool.spawn_n(_green.GreenWorker(work, self._delayed_work))
return True
return False
def shutdown(self, wait=True):
with self._shutdown_lock:
if not self._shutdown:
self._shutdown = True
shutoff = True
else:
shutoff = False
if wait and shutoff:
self._pool.waitall()
self._delayed_work.join()
class ExecutorStatistics(object):
"""Holds *immutable* information about a executors executions."""
__slots__ = ['_failures', '_executed', '_runtime', '_cancelled']
_REPR_MSG_TPL = ("<ExecutorStatistics object at 0x%(ident)x"
" (failures=%(failures)s,"
" executed=%(executed)s, runtime=%(runtime)0.2f,"
" cancelled=%(cancelled)s)>")
def __init__(self, failures=0, executed=0, runtime=0.0, cancelled=0):
self._failures = failures
self._executed = executed
self._runtime = runtime
self._cancelled = cancelled
@property
def failures(self):
"""How many submissions ended up raising exceptions.
:returns: how many submissions ended up raising exceptions
:rtype: number
"""
return self._failures
@property
def executed(self):
"""How many submissions were executed (failed or not).
:returns: how many submissions were executed
:rtype: number
"""
return self._executed
@property
def runtime(self):
"""Total runtime of all submissions executed (failed or not).
:returns: total runtime of all submissions executed
:rtype: number
"""
return self._runtime
@property
def cancelled(self):
"""How many submissions were cancelled before executing.
:returns: how many submissions were cancelled before executing
:rtype: number
"""
return self._cancelled
@property
def average_runtime(self):
"""The average runtime of all submissions executed.
:returns: average runtime of all submissions executed
:rtype: number
:raises: ZeroDivisionError when no executions have occurred.
"""
return self._runtime / self._executed
def __repr__(self):
return self._REPR_MSG_TPL % ({
'ident': id(self),
'failures': self._failures,
'executed': self._executed,
'runtime': self._runtime,
'cancelled': self._cancelled,
})

View File

@ -1,84 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from futurist import _utils
try:
from eventlet import greenpool
from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue
from eventlet.green import threading as greenthreading
except ImportError:
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
None, None)
if _utils.EVENTLET_AVAILABLE:
# Aliases that we use and only expose vs the whole of eventlet...
Pool = greenpool.GreenPool
Queue = greenqueue.Queue
is_monkey_patched = greenpatcher.is_monkey_patched
class GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
threading = GreenThreading()
else:
threading = None
Pool = None
Queue = None
is_monkey_patched = lambda mod: False
class GreenWorker(object):
def __init__(self, work, work_queue):
self.work = work
self.work_queue = work_queue
def __call__(self):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
try:
w.run()
finally:
self.work_queue.task_done()

View File

@ -1,142 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import multiprocessing
import sys
import threading
import traceback
from monotonic import monotonic as now # noqa
import six
try:
import eventlet as _eventlet # noqa
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
class WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
del(exc_type, exc_value, exc_tb)
else:
self.future.set_result(result)
class Failure(object):
"""Object that captures a exception (and its associated information)."""
def __init__(self, retain_tb):
exc_info = sys.exc_info()
if not any(exc_info):
raise RuntimeError("No active exception being handled, can"
" not create a failure which represents"
" nothing")
try:
if retain_tb:
self.exc_info = tuple(exc_info)
self.traceback = None
else:
self.exc_info = (exc_info[0], exc_info[1], None)
self.traceback = "".join(traceback.format_exception(*exc_info))
finally:
del exc_info
def get_callback_name(cb):
"""Tries to get a callbacks fully-qualified name.
If no name can be produced ``repr(cb)`` is called and returned.
"""
segments = []
try:
segments.append(cb.__qualname__)
except AttributeError:
try:
segments.append(cb.__name__)
if inspect.ismethod(cb):
try:
# This attribute doesn't exist on py3.x or newer, so
# we optionally ignore it... (on those versions of
# python `__qualname__` should have been found anyway).
segments.insert(0, cb.im_class.__name__)
except AttributeError:
pass
except AttributeError:
pass
if not segments:
return repr(cb)
else:
try:
# When running under sphinx it appears this can be none?
if cb.__module__:
segments.insert(0, cb.__module__)
except AttributeError:
pass
return ".".join(segments)
def get_optimal_thread_count(default=2):
"""Try to guess optimal thread count for current system."""
try:
return multiprocessing.cpu_count() + 1
except NotImplementedError:
# NOTE(harlowja): apparently may raise so in this case we will
# just setup two threads since it's hard to know what else we
# should do in this situation.
return default
class Barrier(object):
"""A class that ensures active <= 0 occur before unblocking."""
def __init__(self, cond_cls=threading.Condition):
self._active = 0
self._cond = cond_cls()
def incr(self):
with self._cond:
self._active += 1
self._cond.notify_all()
def decr(self):
with self._cond:
self._active -= 1
self._cond.notify_all()
def wait(self):
with self._cond:
while self._active > 0:
self._cond.wait()

View File

@ -1,827 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import fractions
import functools
import heapq
import inspect
import logging
import math
import threading
# For: https://wiki.openstack.org/wiki/Security/Projects/Bandit
from random import SystemRandom as random
from concurrent import futures
import six
import futurist
from futurist import _utils as utils
LOG = logging.getLogger(__name__)
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
'_periodic_run_immediately')
# Constants that are used to determine what 'kind' the current callback
# is being ran as.
PERIODIC = 'periodic'
IMMEDIATE = 'immediate'
class Watcher(object):
"""A **read-only** object representing a periodics callbacks activities."""
_REPR_MSG_TPL = ("<Watcher object at 0x%(ident)x "
"("
"runs=%(runs)s,"
" successes=%(successes)s,"
" failures=%(failures)s,"
" elapsed=%(elapsed)0.2f,"
" elapsed_waiting=%(elapsed_waiting)0.2f"
")>")
def __init__(self, metrics):
self._metrics = metrics
def __repr__(self):
return self._REPR_MSG_TPL % dict(ident=id(self), **self._metrics)
@property
def runs(self):
"""How many times the periodic callback has been ran."""
return self._metrics['runs']
@property
def successes(self):
"""How many times the periodic callback ran successfully."""
return self._metrics['successes']
@property
def failures(self):
"""How many times the periodic callback ran unsuccessfully."""
return self._metrics['failures']
@property
def elapsed(self):
"""Total amount of time the periodic callback has ran for."""
return self._metrics['elapsed']
@property
def elapsed_waiting(self):
"""Total amount of time the periodic callback has waited to run for."""
return self._metrics['elapsed_waiting']
@property
def average_elapsed_waiting(self):
"""Avg. amount of time the periodic callback has waited to run for.
This may raise a ``ZeroDivisionError`` if there has been no runs.
"""
return self._metrics['elapsed_waiting'] / self._metrics['runs']
@property
def average_elapsed(self):
"""Avg. amount of time the periodic callback has ran for.
This may raise a ``ZeroDivisionError`` if there has been no runs.
"""
return self._metrics['elapsed'] / self._metrics['runs']
def _check_attrs(obj):
"""Checks that a periodic function/method has all the expected attributes.
This will return the expected attributes that were **not** found.
"""
missing_attrs = []
for attr_name in _REQUIRED_ATTRS:
if not hasattr(obj, attr_name):
missing_attrs.append(attr_name)
return missing_attrs
def is_periodic(obj):
"""Check whether an object is a valid periodic callable.
:param obj: object to inspect
:type obj: anything
:return: True if obj is a periodic task, otherwise False
"""
return callable(obj) and not _check_attrs(obj)
def periodic(spacing, run_immediately=False, enabled=True):
"""Tags a method/function as wanting/able to execute periodically.
:param spacing: how often to run the decorated function (required)
:type spacing: float/int
:param run_immediately: option to specify whether to run
immediately or wait until the spacing provided has
elapsed before running for the first time
:type run_immediately: boolean
:param enabled: whether the task is enabled to run
:type enabled: boolean
"""
if spacing <= 0:
raise ValueError("Periodicity/spacing must be greater than"
" zero instead of %s" % spacing)
def wrapper(f):
f._is_periodic = enabled
f._periodic_spacing = spacing
f._periodic_run_immediately = run_immediately
@six.wraps(f)
def decorator(*args, **kwargs):
return f(*args, **kwargs)
return decorator
return wrapper
def _add_jitter(max_percent_jitter):
"""Wraps a existing strategy and adds jitter to it.
0% to 100% of the spacing value will be added to this value to ensure
callbacks do not synchronize.
"""
if max_percent_jitter > 1 or max_percent_jitter < 0:
raise ValueError("Invalid 'max_percent_jitter', must be greater or"
" equal to 0.0 and less than or equal to 1.0")
def wrapper(func):
@six.wraps(func)
def decorator(cb, metrics, now=None):
next_run = func(cb, metrics, now=now)
how_often = cb._periodic_spacing
jitter = how_often * (random.random() * max_percent_jitter)
return next_run + jitter
decorator.__name__ += "_with_jitter"
return decorator
return wrapper
def _last_finished_strategy(cb, started_at, finished_at, metrics):
# Determine when the callback should next run based on when it was
# last finished **only** given metrics about this information.
how_often = cb._periodic_spacing
return finished_at + how_often
def _last_started_strategy(cb, started_at, finished_at, metrics):
# Determine when the callback should next run based on when it was
# last started **only** given metrics about this information.
how_often = cb._periodic_spacing
return started_at + how_often
def _aligned_last_finished_strategy(cb, started_at, finished_at, metrics):
# Determine when the callback should next run based on when it was
# last finished **only** where the last finished time is first aligned to
# be a multiple of the expected spacing (so that no matter how long or
# how short the callback takes it is always ran on its next aligned
# to spacing time).
how_often = cb._periodic_spacing
aligned_finished_at = finished_at - math.fmod(finished_at, how_often)
return aligned_finished_at + how_often
def _now_plus_periodicity(cb, now):
how_often = cb._periodic_spacing
return how_often + now
class _Schedule(object):
"""Internal heap-based structure that maintains the schedule/ordering.
This stores a heap composed of the following ``(next_run, index)`` where
``next_run`` is the next desired runtime for the callback that is stored
somewhere with the index provided. The index is saved so that if two
functions with the same ``next_run`` time are inserted, that the one with
the smaller index is preferred (it is also saved so that on pop we can
know what the index of the callback we should call is).
"""
def __init__(self):
self._ordering = []
def push(self, next_run, index):
heapq.heappush(self._ordering, (next_run, index))
def __len__(self):
return len(self._ordering)
def pop(self):
return heapq.heappop(self._ordering)
def _on_failure_log(log, cb, kind, spacing, exc_info, traceback=None):
cb_name = utils.get_callback_name(cb)
if all(exc_info) or not traceback:
log.error("Failed to call %s '%s' (it runs every %0.2f"
" seconds)", kind, cb_name, spacing, exc_info=exc_info)
else:
log.error("Failed to call %s '%s' (it runs every %0.2f"
" seconds):\n%s", kind, cb_name, spacing, traceback)
def _run_callback_retain(now_func, cb, *args, **kwargs):
# NOTE(harlowja): this needs to be a module level function so that the
# process pool execution can locate it (it can't be a lambda or method
# local function because it won't be able to find those).
failure = None
started_at = now_func()
try:
cb(*args, **kwargs)
except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable
# timing information.
failure = utils.Failure(True)
finished_at = now_func()
return (started_at, finished_at, failure)
def _run_callback_no_retain(now_func, cb, *args, **kwargs):
# NOTE(harlowja): this needs to be a module level function so that the
# process pool execution can locate it (it can't be a lambda or method
# local function because it won't be able to find those).
failure = None
started_at = now_func()
try:
cb(*args, **kwargs)
except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable
# timing information.
failure = utils.Failure(False)
finished_at = now_func()
return (started_at, finished_at, failure)
def _build(now_func, callables, next_run_scheduler):
schedule = _Schedule()
now = None
immediates = collections.deque()
for index, (cb, _cb_name, args, kwargs) in enumerate(callables):
if cb._periodic_run_immediately:
immediates.append(index)
else:
if now is None:
now = now_func()
next_run = next_run_scheduler(cb, now)
schedule.push(next_run, index)
return immediates, schedule
_SCHEDULE_RETRY_EXCEPTIONS = (RuntimeError, futurist.RejectedSubmission)
class ExecutorFactory(object):
"""Base class for any executor factory."""
shutdown = True
"""Whether the executor should be shut down on periodic worker stop."""
def __call__(self):
"""Return the executor to be used."""
raise NotImplementedError()
class ExistingExecutor(ExecutorFactory):
"""An executor factory returning the existing object."""
def __init__(self, executor, shutdown=False):
self._executor = executor
self.shutdown = shutdown
def __call__(self):
return self._executor
class PeriodicWorker(object):
"""Calls a collection of callables periodically (sleeping as needed...).
NOTE(harlowja): typically the :py:meth:`.start` method is executed in a
background thread so that the periodic callables are executed in
the background/asynchronously (using the defined periods to determine
when each is called).
"""
#: Max amount of time to wait when running (forces a wakeup when elapsed).
MAX_LOOP_IDLE = 30
_NO_OP_ARGS = ()
_NO_OP_KWARGS = {}
_INITIAL_METRICS = {
'runs': 0,
'elapsed': 0,
'elapsed_waiting': 0,
'failures': 0,
'successes': 0,
}
# When scheduling fails temporary, use a random delay between 0.9-1.1 sec.
_RESCHEDULE_DELAY = 0.9
_RESCHEDULE_JITTER = 0.2
DEFAULT_JITTER = fractions.Fraction(5, 100)
"""
Default jitter percentage the built-in strategies (that have jitter
support) will use.
"""
BUILT_IN_STRATEGIES = {
'last_started': (
_last_started_strategy,
_now_plus_periodicity,
),
'last_started_jitter': (
_add_jitter(DEFAULT_JITTER)(_last_started_strategy),
_now_plus_periodicity,
),
'last_finished': (
_last_finished_strategy,
_now_plus_periodicity,
),
'last_finished_jitter': (
_add_jitter(DEFAULT_JITTER)(_last_finished_strategy),
_now_plus_periodicity,
),
'aligned_last_finished': (
_aligned_last_finished_strategy,
_now_plus_periodicity,
),
'aligned_last_finished_jitter': (
_add_jitter(DEFAULT_JITTER)(_aligned_last_finished_strategy),
_now_plus_periodicity,
),
}
"""
Built in scheduling strategies (used to determine when next to run
a periodic callable).
The first element is the strategy to use after the initial start
and the second element is the strategy to use for the initial start.
These are made somewhat pluggable so that we can *easily* add-on
different types later (perhaps one that uses a cron-style syntax
for example).
"""
@classmethod
def create(cls, objects, exclude_hidden=True,
log=None, executor_factory=None,
cond_cls=threading.Condition, event_cls=threading.Event,
schedule_strategy='last_started', now_func=utils.now,
on_failure=None, args=_NO_OP_ARGS, kwargs=_NO_OP_KWARGS):
"""Automatically creates a worker by analyzing object(s) methods.
Only picks up methods that have been tagged/decorated with
the :py:func:`.periodic` decorator (does not match against private
or protected methods unless explicitly requested to).
:param objects: the objects to introspect for decorated members
:type objects: iterable
:param exclude_hidden: exclude hidden members (ones that start with
an underscore)
:type exclude_hidden: bool
:param log: logger to use when creating a new worker (defaults
to the module logger if none provided), it is currently
only used to report callback failures (if they occur)
:type log: logger
:param executor_factory: factory callable that can be used to generate
executor objects that will be used to
run the periodic callables (if none is
provided one will be created that uses
the :py:class:`~futurist.SynchronousExecutor`
class)
:type executor_factory: ExecutorFactory or any callable
:param cond_cls: callable object that can
produce ``threading.Condition``
(or compatible/equivalent) objects
:type cond_cls: callable
:param event_cls: callable object that can produce ``threading.Event``
(or compatible/equivalent) objects
:type event_cls: callable
:param schedule_strategy: string to select one of the built-in
strategies that can return the
next time a callable should run
:type schedule_strategy: string
:param now_func: callable that can return the current time offset
from some point (used in calculating elapsed times
and next times to run); preferably this is
monotonically increasing
:type now_func: callable
:param on_failure: callable that will be called whenever a periodic
function fails with an error, it will be provided
four positional arguments and one keyword
argument, the first positional argument being the
callable that failed, the second being the type
of activity under which it failed (IMMEDIATE or
PERIODIC), the third being the spacing that the
callable runs at and the fourth `exc_info` tuple
of the failure. The keyword argument 'traceback'
will also be provided that may be be a string
that caused the failure (this is required for
executors which run out of process, as those can not
transfer stack frames across process boundaries); if
no callable is provided then a default failure
logging function will be used instead, do note that
any user provided callable should not raise
exceptions on being called
:type on_failure: callable
:param args: positional arguments to be passed to all callables
:type args: tuple
:param kwargs: keyword arguments to be passed to all callables
:type kwargs: dict
"""
callables = []
for obj in objects:
for (name, member) in inspect.getmembers(obj):
if name.startswith("_") and exclude_hidden:
continue
if six.callable(member):
missing_attrs = _check_attrs(member)
if not missing_attrs:
callables.append((member, args, kwargs))
return cls(callables, log=log, executor_factory=executor_factory,
cond_cls=cond_cls, event_cls=event_cls,
schedule_strategy=schedule_strategy, now_func=now_func,
on_failure=on_failure)
def __init__(self, callables, log=None, executor_factory=None,
cond_cls=threading.Condition, event_cls=threading.Event,
schedule_strategy='last_started', now_func=utils.now,
on_failure=None):
"""Creates a new worker using the given periodic callables.
:param callables: a iterable of tuple objects previously decorated
with the :py:func:`.periodic` decorator, each item
in the iterable is expected to be in the format
of ``(cb, args, kwargs)`` where ``cb`` is the
decorated function and ``args`` and ``kwargs`` are
any positional and keyword arguments to send into
the callback when it is activated (both ``args``
and ``kwargs`` may be provided as none to avoid
using them)
:type callables: iterable
:param log: logger to use when creating a new worker (defaults
to the module logger if none provided), it is currently
only used to report callback failures (if they occur)
:type log: logger
:param executor_factory: factory callable that can be used to generate
executor objects that will be used to
run the periodic callables (if none is
provided one will be created that uses
the :py:class:`~futurist.SynchronousExecutor`
class)
:type executor_factory: ExecutorFactory or any callable
:param cond_cls: callable object that can
produce ``threading.Condition``
(or compatible/equivalent) objects
:type cond_cls: callable
:param event_cls: callable object that can produce ``threading.Event``
(or compatible/equivalent) objects
:type event_cls: callable
:param schedule_strategy: string to select one of the built-in
strategies that can return the
next time a callable should run
:type schedule_strategy: string
:param now_func: callable that can return the current time offset
from some point (used in calculating elapsed times
and next times to run); preferably this is
monotonically increasing
:type now_func: callable
:param on_failure: callable that will be called whenever a periodic
function fails with an error, it will be provided
four positional arguments and one keyword
argument, the first positional argument being the
callable that failed, the second being the type
of activity under which it failed (IMMEDIATE or
PERIODIC), the third being the spacing that the
callable runs at and the fourth `exc_info` tuple
of the failure. The keyword argument 'traceback'
will also be provided that may be be a string
that caused the failure (this is required for
executors which run out of process, as those can not
transfer stack frames across process boundaries); if
no callable is provided then a default failure
logging function will be used instead, do note that
any user provided callable should not raise
exceptions on being called
:type on_failure: callable
"""
self._tombstone = event_cls()
self._waiter = cond_cls()
self._dead = event_cls()
self._active = event_cls()
self._cond_cls = cond_cls
self._watchers = []
self._callables = []
for (cb, args, kwargs) in callables:
if not six.callable(cb):
raise ValueError("Periodic callback %r must be callable" % cb)
missing_attrs = _check_attrs(cb)
if missing_attrs:
raise ValueError("Periodic callback %r missing required"
" attributes %s" % (cb, missing_attrs))
if cb._is_periodic:
# Ensure these aren't none and if so replace them with
# something more appropriate...
if args is None:
args = self._NO_OP_ARGS
if kwargs is None:
kwargs = self._NO_OP_KWARGS
cb_name = utils.get_callback_name(cb)
cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics)
self._callables.append((cb, cb_name, args, kwargs))
self._watchers.append((cb_metrics, watcher))
try:
strategy = self.BUILT_IN_STRATEGIES[schedule_strategy]
self._schedule_strategy = strategy[0]
self._initial_schedule_strategy = strategy[1]
except KeyError:
valid_strategies = sorted(self.BUILT_IN_STRATEGIES.keys())
raise ValueError("Scheduling strategy '%s' must be one of"
" %s selectable strategies"
% (schedule_strategy, valid_strategies))
self._immediates, self._schedule = _build(
now_func, self._callables, self._initial_schedule_strategy)
self._log = log or LOG
if executor_factory is None:
executor_factory = lambda: futurist.SynchronousExecutor()
self._on_failure = functools.partial(_on_failure_log, self._log)
self._executor_factory = executor_factory
self._now_func = now_func
def __len__(self):
"""How many callables are currently active."""
return len(self._callables)
def _run(self, executor, runner):
"""Main worker run loop."""
barrier = utils.Barrier(cond_cls=self._cond_cls)
def _process_scheduled():
# Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest
# next desired run time).
with self._waiter:
while (not self._schedule and
not self._tombstone.is_set() and
not self._immediates):
self._waiter.wait(self.MAX_LOOP_IDLE)
if self._tombstone.is_set():
# We were requested to stop, so stop.
return
if self._immediates:
# This will get processed in _process_immediates()
# in the next loop call.
return
submitted_at = now = self._now_func()
next_run, index = self._schedule.pop()
when_next = next_run - now
if when_next <= 0:
# Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index]
self._log.debug("Submitting periodic function '%s'",
cb_name)
try:
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
# Restart after a short delay
delay = (self._RESCHEDULE_DELAY +
random().random() * self._RESCHEDULE_JITTER)
self._log.error("Failed to submit periodic function "
"'%s', retrying after %.2f sec. "
"Error: %s",
cb_name, delay, exc)
self._schedule.push(self._now_func() + delay,
index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
else:
# Gotta wait...
self._schedule.push(next_run, index)
when_next = min(when_next, self.MAX_LOOP_IDLE)
self._waiter.wait(when_next)
def _process_immediates():
try:
index = self._immediates.popleft()
except IndexError:
pass
else:
cb, cb_name, args, kwargs = self._callables[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'", cb_name)
try:
fut = executor.submit(runner, self._now_func,
cb, *args, **kwargs)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
self._log.error("Failed to submit immediate function "
"'%s', retrying. Error: %s", cb_name, exc)
# Restart as soon as possible
self._immediates.append(index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
started_at, finished_at, failure = fut.result()
cb_metrics, _watcher = self._watchers[index]
cb_metrics['runs'] += 1
if failure is not None:
cb_metrics['failures'] += 1
self._on_failure(cb, kind, cb._periodic_spacing,
failure.exc_info, traceback=failure.traceback)
else:
cb_metrics['successes'] += 1
elapsed = max(0, finished_at - started_at)
elapsed_waiting = max(0, started_at - submitted_at)
cb_metrics['elapsed'] += elapsed
cb_metrics['elapsed_waiting'] += elapsed_waiting
next_run = self._schedule_strategy(cb,
started_at, finished_at,
cb_metrics)
with self._waiter:
self._schedule.push(next_run, index)
self._waiter.notify_all()
try:
while not self._tombstone.is_set():
_process_immediates()
_process_scheduled()
finally:
barrier.wait()
def _on_finish(self):
# TODO(harlowja): this may be to verbose for people?
if not self._log.isEnabledFor(logging.DEBUG):
return
watcher_it = self.iter_watchers()
for index, watcher in enumerate(watcher_it):
cb, cb_name, _args, _kwargs = self._callables[index]
self._log.debug("Stopped running callback[%s] '%s' periodically:",
index, cb_name)
self._log.debug(" Periodicity = %ss", cb._periodic_spacing)
self._log.debug(" Runs = %s", watcher.runs)
self._log.debug(" Failures = %s", watcher.failures)
self._log.debug(" Successes = %s", watcher.successes)
try:
self._log.debug(" Average elapsed = %0.4fs",
watcher.average_elapsed)
self._log.debug(" Average elapsed waiting = %0.4fs",
watcher.average_elapsed_waiting)
except ZeroDivisionError:
pass
def add(self, cb, *args, **kwargs):
"""Adds a new periodic callback to the current worker.
Returns a :py:class:`.Watcher` if added successfully or the value
``None`` if not (or raises a ``ValueError`` if the callback is not
correctly formed and/or decorated).
:param cb: a callable object/method/function previously decorated
with the :py:func:`.periodic` decorator
:type cb: callable
"""
if not six.callable(cb):
raise ValueError("Periodic callback %r must be callable" % cb)
missing_attrs = _check_attrs(cb)
if missing_attrs:
raise ValueError("Periodic callback %r missing required"
" attributes %s" % (cb, missing_attrs))
if not cb._is_periodic:
return None
now = self._now_func()
with self._waiter:
cb_index = len(self._callables)
cb_name = utils.get_callback_name(cb)
cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics)
self._callables.append((cb, cb_name, args, kwargs))
self._watchers.append((cb_metrics, watcher))
if cb._periodic_run_immediately:
self._immediates.append(cb_index)
else:
next_run = self._initial_schedule_strategy(cb, now)
self._schedule.push(next_run, cb_index)
self._waiter.notify_all()
return watcher
def start(self, allow_empty=False):
"""Starts running (will not return until :py:meth:`.stop` is called).
:param allow_empty: instead of running with no callbacks raise when
this worker has no contained callables (this can be
set to true and :py:meth:`.add` can be used to add
new callables on demand), note that when enabled
and no callbacks exist this will block and
sleep (until either stopped or callbacks are
added)
:type allow_empty: bool
"""
if not self._callables and not allow_empty:
raise RuntimeError("A periodic worker can not start"
" without any callables")
if self._active.is_set():
raise RuntimeError("A periodic worker can not be started"
" twice")
executor = self._executor_factory()
# NOTE(harlowja): we compare with the futures process pool executor
# since its the base type of futurist ProcessPoolExecutor and it is
# possible for users to pass in there own custom executors, this one
# is known to not be able to retain tracebacks...
if isinstance(executor, futures.ProcessPoolExecutor):
# Pickling a traceback will not work, so do not try to do it...
#
# Avoids 'TypeError: can't pickle traceback objects'
runner = _run_callback_no_retain
else:
runner = _run_callback_retain
self._dead.clear()
self._active.set()
try:
self._run(executor, runner)
finally:
if getattr(self._executor_factory, 'shutdown', True):
executor.shutdown()
self._dead.set()
self._active.clear()
self._on_finish()
def stop(self):
"""Sets the tombstone (this stops any further executions)."""
with self._waiter:
self._tombstone.set()
self._waiter.notify_all()
def iter_watchers(self):
"""Iterator/generator over all the currently maintained watchers."""
for _cb_metrics, watcher in self._watchers:
yield watcher
def reset(self):
"""Resets the workers internal state."""
self._tombstone.clear()
self._dead.clear()
for cb_metrics, _watcher in self._watchers:
for k in list(six.iterkeys(cb_metrics)):
# NOTE(harlowja): mutate the original dictionaries keys
# so that the watcher (which references the same dictionary
# keys) is able to see those changes.
cb_metrics[k] = 0
self._immediates, self._schedule = _build(
self._now_func, self._callables, self._initial_schedule_strategy)
def wait(self, timeout=None):
"""Waits for the :py:meth:`.start` method to gracefully exit.
An optional timeout can be provided, which will cause the method to
return within the specified timeout. If the timeout is reached, the
returned value will be False.
:param timeout: Maximum number of seconds that the :meth:`.wait`
method should block for
:type timeout: float/int
"""
self._dead.wait(timeout)
return self._dead.is_set()

View File

@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Executor rejection strategies."""
import futurist
def reject_when_reached(max_backlog):
"""Returns a function that will raise when backlog goes past max size."""
def _rejector(executor, backlog):
if backlog >= max_backlog:
raise futurist.RejectedSubmission("Current backlog %s is not"
" allowed to go"
" beyond %s" % (backlog,
max_backlog))
return _rejector

View File

@ -1,22 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""

View File

@ -1,168 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import testscenarios
from testtools import testcase
import futurist
from futurist import rejection
from futurist.tests import base
# Module level functions need to be used since the process pool
# executor can not access instance or lambda level functions (since those
# are not pickleable).
def returns_one():
return 1
def blows_up():
raise RuntimeError("no worky")
def delayed(wait_secs):
time.sleep(wait_secs)
class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
scenarios = [
('sync', {'executor_cls': futurist.SynchronousExecutor,
'restartable': True, 'executor_kwargs': {}}),
('green_sync', {'executor_cls': futurist.SynchronousExecutor,
'restartable': True,
'executor_kwargs': {'green': True}}),
('green', {'executor_cls': futurist.GreenThreadPoolExecutor,
'restartable': False, 'executor_kwargs': {}}),
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
'restartable': False, 'executor_kwargs': {}}),
('process', {'executor_cls': futurist.ProcessPoolExecutor,
'restartable': False, 'executor_kwargs': {}}),
]
def setUp(self):
super(TestExecutors, self).setUp()
self.executor = self.executor_cls(**self.executor_kwargs)
def tearDown(self):
super(TestExecutors, self).tearDown()
self.executor.shutdown()
self.executor = None
def test_run_one(self):
fut = self.executor.submit(returns_one)
self.assertEqual(1, fut.result())
self.assertTrue(fut.done())
def test_blows_up(self):
fut = self.executor.submit(blows_up)
self.assertRaises(RuntimeError, fut.result)
self.assertIsInstance(fut.exception(), RuntimeError)
def test_gather_stats(self):
self.executor.submit(blows_up)
self.executor.submit(delayed, 0.2)
self.executor.submit(returns_one)
self.executor.shutdown()
self.assertEqual(3, self.executor.statistics.executed)
self.assertEqual(1, self.executor.statistics.failures)
self.assertGreaterEqual(self.executor.statistics.runtime,
# It appears that the the thread run loop
# may call this before 0.2 seconds (or 0.2
# will not be represented as a float correctly)
# is really up so accommodate for that
# happening...
0.199)
def test_post_shutdown_raises(self):
executor = self.executor_cls(**self.executor_kwargs)
executor.shutdown()
self.assertRaises(RuntimeError, executor.submit, returns_one)
def test_restartable(self):
if not self.restartable:
raise testcase.TestSkipped("not restartable")
else:
executor = self.executor_cls(**self.executor_kwargs)
fut = executor.submit(returns_one)
self.assertEqual(1, fut.result())
executor.shutdown()
self.assertEqual(1, executor.statistics.executed)
self.assertRaises(RuntimeError, executor.submit, returns_one)
executor.restart()
self.assertEqual(0, executor.statistics.executed)
fut = executor.submit(returns_one)
self.assertEqual(1, fut.result())
self.assertEqual(1, executor.statistics.executed)
executor.shutdown()
def test_alive(self):
with self.executor_cls(**self.executor_kwargs) as executor:
self.assertTrue(executor.alive)
self.assertFalse(executor.alive)
def test_done_callback(self):
happy_completed = []
unhappy_completed = []
def on_done(fut):
if fut.exception():
unhappy_completed.append(fut)
else:
happy_completed.append(fut)
for i in range(0, 10):
if i % 2 == 0:
fut = self.executor.submit(returns_one)
else:
fut = self.executor.submit(blows_up)
fut.add_done_callback(on_done)
self.executor.shutdown()
self.assertEqual(10, len(happy_completed) + len(unhappy_completed))
self.assertEqual(5, len(unhappy_completed))
self.assertEqual(5, len(happy_completed))
_REJECTION = rejection.reject_when_reached(1)
class TestRejection(testscenarios.TestWithScenarios, base.TestCase):
scenarios = [
('green', {'executor_cls': futurist.GreenThreadPoolExecutor,
'executor_kwargs': {'check_and_reject': _REJECTION,
'max_workers': 1}}),
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
'executor_kwargs': {'check_and_reject': _REJECTION,
'max_workers': 1}}),
]
def setUp(self):
super(TestRejection, self).setUp()
self.executor = self.executor_cls(**self.executor_kwargs)
def test_rejection(self):
self.addCleanup(self.executor.shutdown)
# 1 worker + 1 item of backlog
for _i in range(2):
self.executor.submit(delayed, 0.5)
self.assertRaises(futurist.RejectedSubmission,
self.executor.submit, returns_one)

View File

@ -1,385 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import functools
import threading
import time
import eventlet
from eventlet.green import threading as green_threading
import mock
import testscenarios
import futurist
from futurist import periodics
from futurist.tests import base
@periodics.periodic(1)
def every_one_sec(cb):
cb()
@periodics.periodic(0.5)
def every_half_sec(cb):
cb()
@contextlib.contextmanager
def create_destroy_thread(run_what, *args, **kwargs):
t = threading.Thread(target=run_what, args=args, kwargs=kwargs)
t.daemon = True
t.start()
try:
yield
finally:
t.join()
@contextlib.contextmanager
def create_destroy_green_thread(run_what, *args, **kwargs):
t = eventlet.spawn(run_what, *args, **kwargs)
try:
yield
finally:
t.wait()
class TestPeriodicsStrategies(base.TestCase):
def test_invalids(self):
self.assertRaises(ValueError,
periodics.PeriodicWorker, [],
schedule_strategy='not_a_strategy')
class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
scenarios = [
('sync', {'executor_cls': futurist.SynchronousExecutor,
'executor_kwargs': {},
'create_destroy': create_destroy_thread,
'sleep': time.sleep,
'event_cls': threading.Event,
'worker_kwargs': {}}),
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
'executor_kwargs': {'max_workers': 2},
'create_destroy': create_destroy_thread,
'sleep': time.sleep,
'event_cls': threading.Event,
'worker_kwargs': {}}),
('green', {'executor_cls': futurist.GreenThreadPoolExecutor,
'executor_kwargs': {'max_workers': 10},
'sleep': eventlet.sleep,
'event_cls': green_threading.Event,
'create_destroy': create_destroy_green_thread,
'worker_kwargs': {'cond_cls': green_threading.Condition,
'event_cls': green_threading.Event}}),
]
def _test_strategy(self, schedule_strategy, nows,
last_now, expected_next):
nows = list(nows)
ev = self.event_cls()
def now_func():
if len(nows) == 1:
ev.set()
return last_now
return nows.pop()
@periodics.periodic(2, run_immediately=False)
def slow_periodic():
pass
callables = [
(slow_periodic, None, None),
]
worker_kwargs = self.worker_kwargs.copy()
worker_kwargs['schedule_strategy'] = schedule_strategy
worker_kwargs['now_func'] = now_func
w = periodics.PeriodicWorker(callables, **worker_kwargs)
with self.create_destroy(w.start):
ev.wait()
w.stop()
schedule_order = w._schedule._ordering
self.assertEqual([(expected_next, 0)], schedule_order)
def test_last_finished_strategy(self):
last_now = 3.2
nows = [
# Initial schedule building.
0,
# Worker run loop fetch time (to see how long to wait).
2,
# Function call start time.
2,
# Function call end time.
3,
# Stop.
-1,
]
nows = list(reversed(nows))
self._test_strategy('last_finished', nows, last_now, 5.0)
def test_waiting_immediate_add_processed(self):
ran_at = []
@periodics.periodic(0.1, run_immediately=True)
def activated_periodic():
ran_at.append(time.time())
w = periodics.PeriodicWorker([], **self.worker_kwargs)
with self.create_destroy(w.start, allow_empty=True):
# Give some time for the thread to start...
self.sleep(0.5)
w.add(activated_periodic)
while len(ran_at) == 0:
self.sleep(0.1)
w.stop()
def test_double_start_fail(self):
w = periodics.PeriodicWorker([], **self.worker_kwargs)
with self.create_destroy(w.start, allow_empty=True):
# Give some time for the thread to start...
self.sleep(0.5)
# Now ensure we can't start it again...
self.assertRaises(RuntimeError, w.start)
w.stop()
def test_last_started_strategy(self):
last_now = 3.2
nows = [
# Initial schedule building.
0,
# Worker run loop fetch time (to see how long to wait).
2,
# Function call start time.
2,
# Function call end time.
3,
# Stop.
-1,
]
nows = list(reversed(nows))
self._test_strategy('last_started', nows, last_now, 4.0)
def test_aligned_strategy(self):
last_now = 5.5
nows = [
# Initial schedule building.
0,
# Worker run loop fetch time (to see how long to wait).
2,
# Function call start time.
2,
# Function call end time.
5,
# Stop.
-1,
]
nows = list(reversed(nows))
self._test_strategy('aligned_last_finished', nows, last_now, 6.0)
def test_add_on_demand(self):
called = set()
def cb(name):
called.add(name)
callables = []
for i in range(0, 10):
i_cb = functools.partial(cb, '%s_has_called' % i)
callables.append((every_half_sec, (i_cb,), {}))
leftover_callables = list(callables)
w = periodics.PeriodicWorker([], **self.worker_kwargs)
with self.create_destroy(w.start, allow_empty=True):
# NOTE(harlowja): if this never happens, the test will fail
# eventually, with a timeout error..., probably can make it fail
# slightly faster in the future...
while len(called) != len(callables):
if leftover_callables:
cb, args, kwargs = leftover_callables.pop()
w.add(cb, *args, **kwargs)
self.sleep(0.1)
w.stop()
def test_disabled(self):
@periodics.periodic(0.5, enabled=False)
def no_add_me():
pass
@periodics.periodic(0.5)
def add_me():
pass
w = periodics.PeriodicWorker([], **self.worker_kwargs)
self.assertEqual(0, len(w))
self.assertIsNone(w.add(no_add_me))
self.assertEqual(0, len(w))
self.assertIsNotNone(w.add(add_me))
self.assertEqual(1, len(w))
def test_is_periodic(self):
@periodics.periodic(0.5, enabled=False)
def no_add_me():
pass
@periodics.periodic(0.5)
def add_me():
pass
self.assertTrue(periodics.is_periodic(add_me))
self.assertTrue(periodics.is_periodic(no_add_me))
self.assertFalse(periodics.is_periodic(self.test_is_periodic))
self.assertFalse(periodics.is_periodic(42))
def test_watcher(self):
def cb():
pass
callables = [
(every_one_sec, (cb,), None),
(every_half_sec, (cb,), None),
]
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start):
self.sleep(2.0)
w.stop()
for watcher in w.iter_watchers():
self.assertGreaterEqual(watcher.runs, 1)
w.reset()
for watcher in w.iter_watchers():
self.assertEqual(watcher.runs, 0)
self.assertEqual(watcher.successes, 0)
self.assertEqual(watcher.failures, 0)
self.assertEqual(watcher.elapsed, 0)
self.assertEqual(watcher.elapsed_waiting, 0)
def test_worker(self):
called = []
def cb():
called.append(1)
callables = [
(every_one_sec, (cb,), None),
(every_half_sec, (cb,), None),
]
executor = self.executor_cls(**self.executor_kwargs)
executor_factory = lambda: executor
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start):
self.sleep(2.0)
w.stop()
am_called = sum(called)
self.assertGreaterEqual(am_called, 4)
self.assertFalse(executor.alive)
def test_existing_executor(self):
called = []
def cb():
called.append(1)
callables = [
(every_one_sec, (cb,), None),
(every_half_sec, (cb,), None),
]
executor = self.executor_cls(**self.executor_kwargs)
executor_factory = periodics.ExistingExecutor(executor)
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start):
self.sleep(2.0)
w.stop()
am_called = sum(called)
self.assertGreaterEqual(am_called, 4)
self.assertTrue(executor.alive)
def test_create_with_arguments(self):
m = mock.Mock()
class Object(object):
@periodics.periodic(0.5)
def func1(self, *args, **kwargs):
m(*args, **kwargs)
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
w = periodics.PeriodicWorker.create(objects=[Object()],
executor_factory=executor_factory,
args=('foo',),
kwargs={'bar': 'baz'},
**self.worker_kwargs)
with self.create_destroy(w.start):
self.sleep(2.0)
w.stop()
m.assert_called_with('foo', bar='baz')
class RejectingExecutor(futurist.GreenThreadPoolExecutor):
MAX_REJECTIONS_COUNT = 2
def _reject(self, *args):
if self._rejections_count < self.MAX_REJECTIONS_COUNT:
self._rejections_count += 1
raise futurist.RejectedSubmission()
def __init__(self):
self._rejections_count = 0
super(RejectingExecutor, self).__init__(check_and_reject=self._reject)
class TestRetrySubmission(base.TestCase):
def test_retry_submission(self):
called = []
def cb():
called.append(1)
callables = [
(every_one_sec, (cb,), None),
(every_half_sec, (cb,), None),
]
w = periodics.PeriodicWorker(callables,
executor_factory=RejectingExecutor,
cond_cls=green_threading.Condition,
event_cls=green_threading.Event)
w._RESCHEDULE_DELAY = 0
w._RESCHEDULE_JITTER = 0
with create_destroy_green_thread(w.start):
eventlet.sleep(2.0)
w.stop()
am_called = sum(called)
self.assertGreaterEqual(am_called, 4)

View File

@ -1,89 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import eventlet
import testscenarios
import futurist
from futurist.tests import base
from futurist import waiters
# Module level functions need to be used since the process pool
# executor can not access instance or lambda level functions (since those
# are not pickleable).
def mini_delay(use_eventlet_sleep=False):
if use_eventlet_sleep:
eventlet.sleep(0.1)
else:
time.sleep(0.1)
return 1
class TestWaiters(testscenarios.TestWithScenarios, base.TestCase):
scenarios = [
('sync', {'executor_cls': futurist.SynchronousExecutor,
'executor_kwargs': {}, 'use_eventlet_sleep': False}),
('green_sync', {'executor_cls': futurist.SynchronousExecutor,
'executor_kwargs': {'green': True},
'use_eventlet_sleep': True}),
('green', {'executor_cls': futurist.GreenThreadPoolExecutor,
'executor_kwargs': {}, 'use_eventlet_sleep': True}),
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
'executor_kwargs': {}, 'use_eventlet_sleep': False}),
('process', {'executor_cls': futurist.ProcessPoolExecutor,
'executor_kwargs': {}, 'use_eventlet_sleep': False}),
]
def setUp(self):
super(TestWaiters, self).setUp()
self.executor = self.executor_cls(**self.executor_kwargs)
def tearDown(self):
super(TestWaiters, self).tearDown()
self.executor.shutdown()
self.executor = None
def test_wait_for_any(self):
fs = []
for _i in range(0, 10):
fs.append(self.executor.submit(
mini_delay, use_eventlet_sleep=self.use_eventlet_sleep))
all_done_fs = []
total_fs = len(fs)
while len(all_done_fs) != total_fs:
done, not_done = waiters.wait_for_any(fs)
all_done_fs.extend(done)
fs = not_done
self.assertEqual(total_fs, sum(f.result() for f in all_done_fs))
def test_wait_for_all(self):
fs = []
for _i in range(0, 10):
fs.append(self.executor.submit(
mini_delay, use_eventlet_sleep=self.use_eventlet_sleep))
done_fs, not_done_fs = waiters.wait_for_all(fs)
self.assertEqual(len(fs), sum(f.result() for f in done_fs))
self.assertEqual(0, len(not_done_fs))
def test_no_mixed_wait_for_any(self):
fs = [futurist.GreenFuture(), futurist.Future()]
self.assertRaises(RuntimeError, waiters.wait_for_any, fs)
def test_no_mixed_wait_for_all(self):
fs = [futurist.GreenFuture(), futurist.Future()]
self.assertRaises(RuntimeError, waiters.wait_for_all, fs)

View File

@ -1,214 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
import collections
import contextlib
from concurrent import futures
from concurrent.futures import _base
import six
import futurist
from futurist import _utils
try:
from eventlet.green import threading as greenthreading
except ImportError:
greenthreading = None
#: Named tuple returned from ``wait_for*`` calls.
DoneAndNotDoneFutures = collections.namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
_DONE_STATES = frozenset([
_base.CANCELLED_AND_NOTIFIED,
_base.FINISHED,
])
@contextlib.contextmanager
def _acquire_and_release_futures(fs):
# Do this to ensure that we always get the futures in the same order (aka
# always acquire the conditions in the same order, no matter what; a way
# to avoid dead-lock).
fs = sorted(fs, key=id)
with ExitStack() as stack:
for fut in fs:
stack.enter_context(fut._condition)
yield
def _ensure_eventlet(func):
"""Decorator that verifies we have the needed eventlet components."""
@six.wraps(func)
def wrapper(*args, **kwargs):
if not _utils.EVENTLET_AVAILABLE or greenthreading is None:
raise RuntimeError('Eventlet is needed to wait on green futures')
return func(*args, **kwargs)
return wrapper
def _wait_for(fs, no_green_return_when, on_all_green_cb,
caller_name, timeout=None):
green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture))
if not green_fs:
done, not_done = futures.wait(fs, timeout=timeout,
return_when=no_green_return_when)
return DoneAndNotDoneFutures(done, not_done)
else:
non_green_fs = len(fs) - green_fs
if non_green_fs:
raise RuntimeError("Can not wait on %s green futures and %s"
" non-green futures in the same"
" `%s` call" % (green_fs, non_green_fs,
caller_name))
else:
return on_all_green_cb(fs, timeout=timeout)
def wait_for_all(fs, timeout=None):
"""Wait for all of the futures to complete.
Works correctly with both green and non-green futures (but not both
together, since this can't be guaranteed to avoid dead-lock due to how
the waiting implementations are different when green threads are being
used).
Returns pair (done futures, not done futures).
"""
return _wait_for(fs, futures.ALL_COMPLETED, _wait_for_all_green,
'wait_for_all', timeout=timeout)
def wait_for_any(fs, timeout=None):
"""Wait for one (**any**) of the futures to complete.
Works correctly with both green and non-green futures (but not both
together, since this can't be guaranteed to avoid dead-lock due to how
the waiting implementations are different when green threads are being
used).
Returns pair (done futures, not done futures).
"""
return _wait_for(fs, futures.FIRST_COMPLETED, _wait_for_any_green,
'wait_for_any', timeout=timeout)
class _AllGreenWaiter(object):
"""Provides the event that ``_wait_for_all_green`` blocks on."""
def __init__(self, pending):
self.event = greenthreading.Event()
self.lock = greenthreading.Lock()
self.pending = pending
def _decrement_pending(self):
with self.lock:
self.pending -= 1
if self.pending <= 0:
self.event.set()
def add_result(self, future):
self._decrement_pending()
def add_exception(self, future):
self._decrement_pending()
def add_cancelled(self, future):
self._decrement_pending()
class _AnyGreenWaiter(object):
"""Provides the event that ``_wait_for_any_green`` blocks on."""
def __init__(self):
self.event = greenthreading.Event()
def add_result(self, future):
self.event.set()
def add_exception(self, future):
self.event.set()
def add_cancelled(self, future):
self.event.set()
def _partition_futures(fs):
done = set()
not_done = set()
for f in fs:
if f._state in _DONE_STATES:
done.add(f)
else:
not_done.add(f)
return done, not_done
def _create_and_install_waiters(fs, waiter_cls, *args, **kwargs):
waiter = waiter_cls(*args, **kwargs)
for f in fs:
f._waiters.append(waiter)
return waiter
@_ensure_eventlet
def _wait_for_all_green(fs, timeout=None):
if not fs:
return DoneAndNotDoneFutures(set(), set())
with _acquire_and_release_futures(fs):
done, not_done = _partition_futures(fs)
if len(done) == len(fs):
return DoneAndNotDoneFutures(done, not_done)
waiter = _create_and_install_waiters(not_done,
_AllGreenWaiter,
len(not_done))
waiter.event.wait(timeout)
for f in not_done:
f._waiters.remove(waiter)
with _acquire_and_release_futures(fs):
done, not_done = _partition_futures(fs)
return DoneAndNotDoneFutures(done, not_done)
@_ensure_eventlet
def _wait_for_any_green(fs, timeout=None):
if not fs:
return DoneAndNotDoneFutures(set(), set())
with _acquire_and_release_futures(fs):
done, not_done = _partition_futures(fs)
if done:
return DoneAndNotDoneFutures(done, not_done)
waiter = _create_and_install_waiters(fs, _AnyGreenWaiter)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with _acquire_and_release_futures(fs):
done, not_done = _partition_futures(fs)
return DoneAndNotDoneFutures(done, not_done)

View File

@ -1,9 +0,0 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr>=1.6 # Apache-2.0
six>=1.9.0 # MIT
monotonic>=0.6 # Apache-2.0
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
contextlib2>=0.4.0 # PSF License

View File

@ -1,51 +0,0 @@
[metadata]
name = futurist
summary = Useful additions to futures, from the future.
description-file =
README.rst
author = OpenStack
author-email = openstack-dev@lists.openstack.org
home-page = http://www.openstack.org/
classifier =
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.4
[files]
packages =
futurist
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
all_files = 1
[upload_sphinx]
upload-dir = doc/build/html
[compile_catalog]
directory = futurist/locale
domain = futurist
[pbr]
warnerrors = True
[wheel]
universal = 1
[update_catalog]
domain = futurist
output_dir = futurist/locale
input_file = futurist/locale/futurist.pot
[extract_messages]
keywords = _ gettext ngettext l_ lazy_gettext
mapping_file = babel.cfg
output_file = futurist/locale/futurist.pot

View File

@ -1,29 +0,0 @@
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
# In python < 2.7.4, a lazy loading of package `pbr` will break
# setuptools if some other modules registered functions in `atexit`.
# solution from: http://bugs.python.org/issue15881#msg170215
try:
import multiprocessing # noqa
except ImportError:
pass
setuptools.setup(
setup_requires=['pbr>=1.8'],
pbr=True)

View File

@ -1,19 +0,0 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
hacking<0.11,>=0.10.0
# Used for making sure the eventlet executors work.
eventlet!=0.18.3,>=0.18.2 # MIT
doc8 # Apache-2.0
coverage>=3.6 # Apache-2.0
discover # BSD
python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=1.4.0 # MIT

41
tox.ini
View File

@ -1,41 +0,0 @@
[tox]
minversion = 1.6
envlist = py34,py27,pypy,pep8
skipsdist = True
[testenv]
usedevelop = True
install_command = pip install -U {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py test --slowest --testr-args='{posargs}'
[testenv:pep8]
commands = flake8
[testenv:venv]
commands = {posargs}
[testenv:cover]
commands = python setup.py test --coverage --testr-args='{posargs}'
[testenv:py27]
commands =
python setup.py testr --slowest --testr-args='{posargs}'
sphinx-build -b doctest doc/source doc/build
doc8 --ignore-path "doc/source/history.rst" doc/source
[testenv:docs]
commands = python setup.py build_sphinx
[testenv:debug]
commands = oslo_debug_helper {posargs}
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
show-source = True
ignore = E123,E125
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build