diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index 8cf2144..0000000 --- a/.coveragerc +++ /dev/null @@ -1,8 +0,0 @@ -[run] -branch = True -source = futurist -omit = futurist/tests/*,futurist/openstack/* - -[report] -ignore_errors = True -precision = 2 diff --git a/.gitignore b/.gitignore deleted file mode 100644 index af8f134..0000000 --- a/.gitignore +++ /dev/null @@ -1,54 +0,0 @@ -*.py[cod] - -# C extensions -*.so - -# Packages -*.egg -*.egg-info -dist -build -eggs -parts -bin -var -sdist -develop-eggs -.installed.cfg -lib -lib64 - -# Installer logs -pip-log.txt - -# Unit test / coverage reports -.coverage -cover -.tox -nosetests.xml -.testrepository -.venv - -# Translations -*.mo - -# Mr Developer -.mr.developer.cfg -.project -.pydevproject - -# Complexity -output/*.html -output/*/index.html - -# Sphinx -doc/build - -# pbr generates these -AUTHORS -ChangeLog - -# Editors -*~ -.*.swp -.*sw? diff --git a/.gitreview b/.gitreview deleted file mode 100644 index 0725f83..0000000 --- a/.gitreview +++ /dev/null @@ -1,4 +0,0 @@ -[gerrit] -host=review.openstack.org -port=29418 -project=openstack/futurist.git diff --git a/.mailmap b/.mailmap deleted file mode 100644 index 516ae6f..0000000 --- a/.mailmap +++ /dev/null @@ -1,3 +0,0 @@ -# Format is: -# -# diff --git a/.testr.conf b/.testr.conf deleted file mode 100644 index 6d83b3c..0000000 --- a/.testr.conf +++ /dev/null @@ -1,7 +0,0 @@ -[DEFAULT] -test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ - OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ - OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ - ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION -test_id_option=--load-list $IDFILE -test_list_option=--list diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst deleted file mode 100644 index fe08203..0000000 --- a/CONTRIBUTING.rst +++ /dev/null @@ -1,17 +0,0 @@ -If you would like to contribute to the development of OpenStack, you must -follow the steps in this page: - - http://docs.openstack.org/infra/manual/developers.html - -If you already have a good understanding of how the system works and your -OpenStack accounts are set up, you can skip to the development workflow -section of this documentation to learn how changes to OpenStack should be -submitted for review via the Gerrit tool: - - http://docs.openstack.org/infra/manual/developers.html#development-workflow - -Pull requests submitted through GitHub will be ignored. - -Bugs should be filed on Launchpad, not GitHub: - - https://bugs.launchpad.net/futurist diff --git a/HACKING.rst b/HACKING.rst deleted file mode 100644 index 30811f9..0000000 --- a/HACKING.rst +++ /dev/null @@ -1,4 +0,0 @@ -Futurist Style Commandments -=============================================== - -Read the OpenStack Style Commandments http://docs.openstack.org/developer/hacking/ diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 68c771a..0000000 --- a/LICENSE +++ /dev/null @@ -1,176 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index c978a52..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,6 +0,0 @@ -include AUTHORS -include ChangeLog -exclude .gitignore -exclude .gitreview - -global-exclude *.pyc diff --git a/README b/README new file mode 100644 index 0000000..8fcd2b2 --- /dev/null +++ b/README @@ -0,0 +1,14 @@ +This project is no longer maintained. + +The contents of this repository are still available in the Git +source code management system. To see the contents of this +repository before it reached its end of life, please check out the +previous commit with "git checkout HEAD^1". + +For ongoing work on maintaining OpenStack packages in the Debian +distribution, please see the Debian OpenStack packaging team at +https://wiki.debian.org/OpenStack/. + +For any further questions, please email +openstack-dev@lists.openstack.org or join #openstack-dev on +Freenode. diff --git a/README.rst b/README.rst deleted file mode 100644 index cc2fb8d..0000000 --- a/README.rst +++ /dev/null @@ -1,22 +0,0 @@ -======== -Futurist -======== - -.. image:: https://img.shields.io/pypi/v/futurist.svg - :target: https://pypi.python.org/pypi/futurist/ - :alt: Latest Version - -.. image:: https://img.shields.io/pypi/dm/futurist.svg - :target: https://pypi.python.org/pypi/futurist/ - :alt: Downloads - -Code from the future, delivered to you in the **now**. The goal of this library -would be to provide a well documented futures classes/utilities/additions that -allows for providing a level of transparency in how asynchronous work gets -executed. This library currently adds statistics gathering, an eventlet -executor, a synchronous executor etc. - -* Free software: Apache license -* Documentation: http://docs.openstack.org/developer/futurist -* Source: http://git.openstack.org/cgit/openstack/futurist -* Bugs: http://bugs.launchpad.net/futurist diff --git a/babel.cfg b/babel.cfg deleted file mode 100644 index 15cd6cb..0000000 --- a/babel.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[python: **.py] - diff --git a/debian/changelog b/debian/changelog deleted file mode 100644 index 6d5e01b..0000000 --- a/debian/changelog +++ /dev/null @@ -1,60 +0,0 @@ -python-futurist (0.13.0-3) UNRELEASED; urgency=medium - - * Standards-Version is 3.9.8 now (no change) - * d/rules: Changed UPSTREAM_GIT protocol to https - - -- Ondřej Nový Sat, 09 Apr 2016 19:26:50 +0200 - -python-futurist (0.13.0-2) unstable; urgency=medium - - * Uploading to unstable. - - -- Thomas Goirand Mon, 04 Apr 2016 09:25:07 +0000 - -python-futurist (0.13.0-1) experimental; urgency=medium - - [ Ondřej Nový ] - * Fixed VCS URLs (https). - - [ Thomas Goirand ] - * New upstream release. - * Fixed (build-)depends for this release. - * Standards-Version: 3.9.7 (no change). - * Disabled futurist.tests.test_executors.TestRejection.test_rejection which - is failing. - - -- Thomas Goirand Wed, 02 Mar 2016 13:06:31 +0000 - -python-futurist (0.9.0-1) experimental; urgency=medium - - * New upstream release. - * Fixed debian/copyright ordering. - - -- Thomas Goirand Sat, 16 Jan 2016 03:48:28 +0000 - -python-futurist (0.5.0-2) unstable; urgency=medium - - * Uploading to unstable. - - -- Thomas Goirand Fri, 16 Oct 2015 09:31:19 +0000 - -python-futurist (0.5.0-1) experimental; urgency=medium - - [ James Page ] - * d/rules: Drop use of dpkg-parsechangelog -S to ease backporting to - older Debian and Ubuntu releases. - - [ Corey Bryant ] - * New upstream release. - * Align dependencies with upstream. - - [ Thomas Goirand ] - * Drop versions already satisfied in Trusty. - - -- Thomas Goirand Mon, 05 Oct 2015 08:21:43 +0000 - -python-futurist (0.1.2-1) unstable; urgency=medium - - * Initial release. (Closes: #792500) - - -- Thomas Goirand Wed, 15 Jul 2015 14:47:49 +0200 diff --git a/debian/compat b/debian/compat deleted file mode 100644 index ec63514..0000000 --- a/debian/compat +++ /dev/null @@ -1 +0,0 @@ -9 diff --git a/debian/control b/debian/control deleted file mode 100644 index 3a616af..0000000 --- a/debian/control +++ /dev/null @@ -1,113 +0,0 @@ -Source: python-futurist -Section: python -Priority: optional -Maintainer: PKG OpenStack -Uploaders: Thomas Goirand , -Build-Depends: debhelper (>= 9), - dh-python, - python-all, - python-pbr (>= 1.8), - python-setuptools, - python-sphinx, - python3-all, - python3-pbr (>= 1.8), - python3-setuptools, -Build-Depends-Indep: python-concurrent.futures (>= 3.0), - python-contextlib2, - python-coverage, - python-eventlet (>= 0.18.4), - python-hacking (>= 0.10.0), - python-monotonic (>= 0.6), - python-oslosphinx (>= 2.5.0), - python-oslotest (>= 1.10.0), - python-six (>= 1.9.0), - python-testscenarios, - python-testtools (>= 1.4.0), - python3-contextlib2, - python3-eventlet (>= 0.18.4), - python3-monotonic (>= 0.6), - python3-oslotest (>= 1.10.0), - python3-six (>= 1.9.0), - python3-testscenarios, - python3-testtools (>= 1.4.0), - subunit (>= 1.1), - testrepository, -Standards-Version: 3.9.8 -Vcs-Browser: https://anonscm.debian.org/cgit/openstack/python-futurist.git/ -Vcs-Git: https://anonscm.debian.org/git/openstack/python-futurist.git -Homepage: http://www.openstack.org/ - -Package: python-futurist -Architecture: all -Depends: python-concurrent.futures (>= 3.0), - python-contextlib2, - python-monotonic (>= 0.6), - python-pbr (>= 1.8), - python-six (>= 1.9.0), - ${misc:Depends}, - ${python:Depends}, -Suggests: python-futurist-doc, -Description: useful additions to futures, from the future - Python 2.x - Code from the future, delivered to you in the now. For example: - * A futurist.GreenThreadPoolExecutor using eventlet green thread pools. It - provides a standard executor API/interface and it also gathers execution - statistics. - * A futurist.ProcessPoolExecutor derivative that gathers execution - statistics. - * A futurist.SynchronousExecutor that doesn’t run concurrently. It has the - same executor API/interface and it also gathers execution statistics. - * A futurist.ThreadPoolExecutor derivative that gathers execution statistics. - * A futurist.periodics.PeriodicWorker that can use the previously mentioned - executors to run asynchronous work periodically in parallel or - synchronously. - * etc. - . - This package contains the Python 2.x module. - -Package: python3-futurist -Architecture: all -Depends: python3-contextlib2, - python3-monotonic (>= 0.6), - python3-pbr (>= 1.8), - python3-six (>= 1.9.0), - ${misc:Depends}, - ${python3:Depends}, -Suggests: python-futurist-doc, -Description: useful additions to futures, from the future - Python 3.x - Code from the future, delivered to you in the now. For example: - * A futurist.GreenThreadPoolExecutor using eventlet green thread pools. It - provides a standard executor API/interface and it also gathers execution - statistics. - * A futurist.ProcessPoolExecutor derivative that gathers execution - statistics. - * A futurist.SynchronousExecutor that doesn’t run concurrently. It has the - same executor API/interface and it also gathers execution statistics. - * A futurist.ThreadPoolExecutor derivative that gathers execution statistics. - * A futurist.periodics.PeriodicWorker that can use the previously mentioned - executors to run asynchronous work periodically in parallel or - synchronously. - * etc. - . - This package contains the Python 3.x module. - -Package: python-futurist-doc -Section: doc -Architecture: all -Depends: ${misc:Depends}, - ${sphinxdoc:Depends}, -Description: useful additions to futures, from the future - doc - Code from the future, delivered to you in the now. For example: - * A futurist.GreenThreadPoolExecutor using eventlet green thread pools. It - provides a standard executor API/interface and it also gathers execution - statistics. - * A futurist.ProcessPoolExecutor derivative that gathers execution - statistics. - * A futurist.SynchronousExecutor that doesn’t run concurrently. It has the - same executor API/interface and it also gathers execution statistics. - * A futurist.ThreadPoolExecutor derivative that gathers execution statistics. - * A futurist.periodics.PeriodicWorker that can use the previously mentioned - executors to run asynchronous work periodically in parallel or - synchronously. - * etc. - . - This package contains the documentation. diff --git a/debian/copyright b/debian/copyright deleted file mode 100644 index dd1ee87..0000000 --- a/debian/copyright +++ /dev/null @@ -1,29 +0,0 @@ -Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ -Upstream-Name: futurist -Source: http://www.openstack.org/ - -Files: * -Copyright: (c) 2010-2016, OpenStack Foundation - (c) 2013-2015, Yahoo INC. - (c) 2013 Hewlett-Packard Development Company, L.P. -License: Apache-2.0 - -Files: debian/* -Copyright: (c) 2015-2016, Thomas Goirand -License: Apache-2.0 - -License: Apache-2.0 - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - . - http://www.apache.org/licenses/LICENSE-2.0 - . - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - . - On Debian-based systems the full text of the Apache version 2.0 license - can be found in /usr/share/common-licenses/Apache-2.0. diff --git a/debian/gbp.conf b/debian/gbp.conf deleted file mode 100644 index e388b0a..0000000 --- a/debian/gbp.conf +++ /dev/null @@ -1,9 +0,0 @@ -[DEFAULT] -upstream-branch = master -debian-branch = debian/newton -upstream-tag = %(version)s -compression = xz - -[buildpackage] -export-dir = ../build-area/ - diff --git a/debian/python-futurist-doc.doc-base b/debian/python-futurist-doc.doc-base deleted file mode 100644 index 556bf3a..0000000 --- a/debian/python-futurist-doc.doc-base +++ /dev/null @@ -1,9 +0,0 @@ -Document: python-futurist-doc -Title: futurist Documentation -Author: N/A -Abstract: Sphinx documentation for futurist -Section: Programming/Python - -Format: HTML -Index: /usr/share/doc/python-futurist-doc/html/index.html -Files: /usr/share/doc/python-futurist-doc/html/* diff --git a/debian/rules b/debian/rules deleted file mode 100755 index f0a3ff0..0000000 --- a/debian/rules +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/make -f - -PYTHONS:=$(shell pyversions -vr) -PYTHON3S:=$(shell py3versions -vr) - -export OSLO_PACKAGE_VERSION=$(shell dpkg-parsechangelog | sed -n -e 's/^Version: //p' | sed -e 's/^[[:digit:]]*://' -e 's/[-].*//') - -UPSTREAM_GIT := https://github.com/openstack/futurist.git --include /usr/share/openstack-pkg-tools/pkgos.make - -%: - dh $@ --buildsystem=python_distutils --with python2,python3,sphinxdoc - -override_dh_auto_install: - set -e ; for pyvers in $(PYTHONS); do \ - python$$pyvers setup.py install --install-layout=deb \ - --root $(CURDIR)/debian/python-futurist; \ - done - set -e ; for pyvers in $(PYTHON3S); do \ - python$$pyvers setup.py install --install-layout=deb \ - --root $(CURDIR)/debian/python3-futurist; \ - done - rm -rf $(CURDIR)/debian/python*-futurist/usr/lib/python*/dist-packages/*.pth - -override_dh_auto_test: -ifeq (,$(findstring nocheck, $(DEB_BUILD_OPTIONS))) - @echo "===> Running tests" - set -e ; set -x ; for i in 2.7 $(PYTHON3S) ; do \ - PYMAJOR=`echo $$i | cut -d'.' -f1` ; \ - echo "===> Testing with python$$i (python$$PYMAJOR)" ; \ - rm -rf .testrepository ; \ - testr-python$$PYMAJOR init ; \ - TEMP_REZ=`mktemp -t` ; \ - PYTHONPATH=$(CURDIR) PYTHON=python$$i testr-python$$PYMAJOR run --subunit 'futurist\.tests\.(?!.*test_executors\.TestRejection\.test_rejection.*)' | tee $$TEMP_REZ | subunit2pyunit ; \ - cat $$TEMP_REZ | subunit-filter -s --no-passthrough | subunit-stats ; \ - rm -f $$TEMP_REZ ; \ - testr-python$$PYMAJOR slowest ; \ - done -endif - -override_dh_sphinxdoc: -ifeq (,$(findstring nodocs, $(DEB_BUILD_OPTIONS))) - sphinx-build -b html doc/source debian/python-futurist-doc/usr/share/doc/python-futurist-doc/html - dh_sphinxdoc -O--buildsystem=python_distutils -endif - -override_dh_clean: - dh_clean -O--buildsystem=python_distutils - rm -rf build - -# Commands not to run -override_dh_installcatalogs: -override_dh_installemacsen override_dh_installifupdown: -override_dh_installinfo override_dh_installmenu override_dh_installmime: -override_dh_installmodules override_dh_installlogcheck: -override_dh_installpam override_dh_installppp override_dh_installudev override_dh_installwm: -override_dh_installxfonts override_dh_gconf override_dh_icons override_dh_perl override_dh_usrlocal: -override_dh_installcron override_dh_installdebconf: -override_dh_installlogrotate override_dh_installgsettings: diff --git a/debian/source/format b/debian/source/format deleted file mode 100644 index 163aaf8..0000000 --- a/debian/source/format +++ /dev/null @@ -1 +0,0 @@ -3.0 (quilt) diff --git a/debian/source/options b/debian/source/options deleted file mode 100644 index cb61fa5..0000000 --- a/debian/source/options +++ /dev/null @@ -1 +0,0 @@ -extend-diff-ignore = "^[^/]*[.]egg-info/" diff --git a/debian/watch b/debian/watch deleted file mode 100644 index 301a919..0000000 --- a/debian/watch +++ /dev/null @@ -1,3 +0,0 @@ -version=3 -opts="uversionmangle=s/\.(b|rc)/~$1/" \ -https://github.com/openstack/futurist/tags .*/(\d[\d\.]+)\.tar\.gz diff --git a/doc/source/api.rst b/doc/source/api.rst deleted file mode 100644 index 44535c9..0000000 --- a/doc/source/api.rst +++ /dev/null @@ -1,68 +0,0 @@ -=== -API -=== - ---------- -Executors ---------- - -.. autoclass:: futurist.GreenThreadPoolExecutor - :members: - :special-members: __init__ - -.. autoclass:: futurist.ProcessPoolExecutor - :members: - :special-members: __init__ - -.. autoclass:: futurist.SynchronousExecutor - :members: - :special-members: __init__ - -.. autoclass:: futurist.ThreadPoolExecutor - :members: - :special-members: __init__ - -------- -Futures -------- - -.. autoclass:: futurist.Future - :members: - -.. autoclass:: futurist.GreenFuture - :members: - ---------- -Periodics ---------- - -.. autoclass:: futurist.periodics.PeriodicWorker - :members: - :special-members: __init__, __len__ - -.. autofunction:: futurist.periodics.periodic - -.. autoclass:: futurist.periodics.Watcher - :members: - -------------- -Miscellaneous -------------- - -.. autoclass:: futurist.ExecutorStatistics - :members: - ----------- -Exceptions ----------- - -.. autoclass:: futurist.RejectedSubmission - :members: - -------- -Waiters -------- - -.. autofunction:: futurist.waiters.wait_for_any -.. autofunction:: futurist.waiters.wait_for_all -.. autoclass:: futurist.waiters.DoneAndNotDoneFutures diff --git a/doc/source/conf.py b/doc/source/conf.py deleted file mode 100755 index 33df93e..0000000 --- a/doc/source/conf.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -sys.path.insert(0, os.path.abspath('../..')) -# -- General configuration ---------------------------------------------------- - -# Add any Sphinx extension module names here, as strings. They can be -# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'oslosphinx' -] - -# autodoc generation is a bit aggressive and a nuisance when doing heavy -# text edit cycles. -# execute "export SPHINX_DEBUG=1" in your terminal to disable - -# The suffix of source filenames. -source_suffix = '.rst' - -# The master toctree document. -master_doc = 'index' - -# General information about the project. -project = u'futurist' -copyright = u'2013, OpenStack Foundation' - -# If true, '()' will be appended to :func: etc. cross-reference text. -add_function_parentheses = True - -# If true, the current module name will be prepended to all description -# unit titles (such as .. function::). -add_module_names = True - -# The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' - -# -- Options for HTML output -------------------------------------------------- - -# The theme to use for HTML and HTML Help pages. Major themes that come with -# Sphinx are currently 'default' and 'sphinxdoc'. -# html_theme_path = ["."] -# html_theme = '_theme' -# html_static_path = ['static'] - -# Output file base name for HTML help builder. -htmlhelp_basename = '%sdoc' % project - -# Grouping the document tree into LaTeX files. List of tuples -# (source start file, target name, title, author, documentclass -# [howto/manual]). -latex_documents = [ - ('index', - '%s.tex' % project, - u'%s Documentation' % project, - u'OpenStack Foundation', 'manual'), -] - -# Example configuration for intersphinx: refer to the Python standard library. -#intersphinx_mapping = {'http://docs.python.org/': None} diff --git a/doc/source/contributing.rst b/doc/source/contributing.rst deleted file mode 100644 index 1f5ca23..0000000 --- a/doc/source/contributing.rst +++ /dev/null @@ -1,5 +0,0 @@ -============ -Contributing -============ - -.. include:: ../../CONTRIBUTING.rst diff --git a/doc/source/examples.rst b/doc/source/examples.rst deleted file mode 100644 index 76350c5..0000000 --- a/doc/source/examples.rst +++ /dev/null @@ -1,258 +0,0 @@ -======== -Examples -======== - ------------------------------------------ -Creating and using a synchronous executor ------------------------------------------ - -.. testcode:: - - # NOTE: enable printing timestamp for additional data - - import sys - import futurist - import eventlet - - def delayed_func(): - print("started") - eventlet.sleep(3) - print("done") - - #print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - e = futurist.SynchronousExecutor() - fut = e.submit(delayed_func) - eventlet.sleep(1) - print("Hello") - eventlet.sleep(1) - e.shutdown() - #print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - -**Expected output:** - -.. testoutput:: - - started - done - Hello - ------------------------------------------------- -Creating and using a green thread-based executor ------------------------------------------------- - -.. testcode:: - - # NOTE: enable printing timestamp for additional data - - import sys - import futurist - import eventlet - - def delayed_func(): - print("started") - eventlet.sleep(3) - print("done") - - #print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - e = futurist.GreenThreadPoolExecutor() - fut = e.submit(delayed_func) - eventlet.sleep(1) - print("Hello") - eventlet.sleep(1) - e.shutdown() - #print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - -**Expected output:** - -.. testoutput:: - - started - Hello - done - - ------------------------------------------- -Creating and using a thread-based executor ------------------------------------------- - -.. testcode:: - - import time - - import futurist - - def delayed_func(): - time.sleep(0.1) - return "hello" - - e = futurist.ThreadPoolExecutor() - fut = e.submit(delayed_func) - print(fut.result()) - e.shutdown() - -**Expected output:** - -.. testoutput:: - - hello - -------------------------------------------- -Creating and using a process-based executor -------------------------------------------- - -:: - - import time - - import futurist - - def delayed_func(): - time.sleep(0.1) - return "hello" - - e = futurist.ProcessPoolExecutor() - fut = e.submit(delayed_func) - print(fut.result()) - e.shutdown() - -**Expected output:** - -:: - - hello - ---------------------------------------- -Running a set of functions periodically ---------------------------------------- - -.. testcode:: - - import futurist - from futurist import periodics - - import time - import threading - - - @periodics.periodic(1) - def every_one(started_at): - print("1: %s" % (time.time() - started_at)) - - - @periodics.periodic(2) - def every_two(started_at): - print("2: %s" % (time.time() - started_at)) - - - @periodics.periodic(4) - def every_four(started_at): - print("4: %s" % (time.time() - started_at)) - - - @periodics.periodic(6) - def every_six(started_at): - print("6: %s" % (time.time() - started_at)) - - - started_at = time.time() - callables = [ - # The function to run + any automatically provided positional and - # keyword arguments to provide to it everytime it is activated. - (every_one, (started_at,), {}), - (every_two, (started_at,), {}), - (every_four, (started_at,), {}), - (every_six, (started_at,), {}), - ] - w = periodics.PeriodicWorker(callables) - - # In this example we will run the periodic functions using a thread, it - # is also possible to just call the w.start() method directly if you do - # not mind blocking up the current program. - t = threading.Thread(target=w.start) - t.daemon = True - t.start() - - # Run for 10 seconds and then stop. - while (time.time() - started_at) <= 10: - time.sleep(0.1) - w.stop() - w.wait() - t.join() - -.. testoutput:: - :hide: - - ... - ------------------------------------------------------------ -Running a set of functions periodically (using an executor) ------------------------------------------------------------ - -.. testcode:: - - import futurist - from futurist import periodics - - import time - import threading - - - @periodics.periodic(1) - def every_one(started_at): - print("1: %s" % (time.time() - started_at)) - time.sleep(0.5) - - - @periodics.periodic(2) - def every_two(started_at): - print("2: %s" % (time.time() - started_at)) - time.sleep(1) - - - @periodics.periodic(4) - def every_four(started_at): - print("4: %s" % (time.time() - started_at)) - time.sleep(2) - - - @periodics.periodic(6) - def every_six(started_at): - print("6: %s" % (time.time() - started_at)) - time.sleep(3) - - - started_at = time.time() - callables = [ - # The function to run + any automatically provided positional and - # keyword arguments to provide to it everytime it is activated. - (every_one, (started_at,), {}), - (every_two, (started_at,), {}), - (every_four, (started_at,), {}), - (every_six, (started_at,), {}), - ] - - # To avoid getting blocked up by slow periodic functions we can also - # provide a executor pool to make sure that slow functions only block - # up a thread (or green thread), instead of blocking other periodic - # functions that need to be scheduled to run. - executor_factory = lambda: futurist.ThreadPoolExecutor(max_workers=2) - w = periodics.PeriodicWorker(callables, executor_factory=executor_factory) - - # In this example we will run the periodic functions using a thread, it - # is also possible to just call the w.start() method directly if you do - # not mind blocking up the current program. - t = threading.Thread(target=w.start) - t.daemon = True - t.start() - - # Run for 10 seconds and then stop. - while (time.time() - started_at) <= 10: - time.sleep(0.1) - w.stop() - w.wait() - t.join() - -.. testoutput:: - :hide: - - ... diff --git a/doc/source/features.rst b/doc/source/features.rst deleted file mode 100644 index 4e7b028..0000000 --- a/doc/source/features.rst +++ /dev/null @@ -1,34 +0,0 @@ -======== -Features -======== - -Async ------ - -* A :py:class:`.futurist.GreenThreadPoolExecutor` using `eventlet`_ green - thread pools. It provides a standard `executor`_ API/interface and it also - gathers execution statistics. It returns instances of - :py:class:`.futurist.GreenFuture` objects. -* A :py:class:`.futurist.ProcessPoolExecutor` derivative that gathers execution - statistics. It returns instances of :py:class:`.futurist.Future` objects. -* A :py:class:`.futurist.SynchronousExecutor` that **doesn't** run - concurrently. It has the same `executor`_ API/interface and it also - gathers execution statistics. It returns instances - of :py:class:`.futurist.Future` objects. -* A :py:class:`.futurist.ThreadPoolExecutor` derivative that gathers - execution statistics. It returns instances - of :py:class:`.futurist.Future` objects. - -Periodics ---------- - -* A :py:class:`.futurist.periodics.PeriodicWorker` that can use the previously - mentioned executors to run asynchronous work periodically in parallel - or synchronously. It does this by executing arbitary functions/methods - that have been decorated with the :py:func:`.futurist.periodics.periodic` - decorator according to a internally maintained schedule (which itself is - based on the `heap`_ algorithm). - -.. _heap: https://en.wikipedia.org/wiki/Heap_%28data_structure%29 -.. _eventlet: http://eventlet.net/ -.. _executor: https://docs.python.org/dev/library/concurrent.futures.html#executor-objects diff --git a/doc/source/history.rst b/doc/source/history.rst deleted file mode 100644 index db8340b..0000000 --- a/doc/source/history.rst +++ /dev/null @@ -1,2 +0,0 @@ -.. include:: ../../ChangeLog - diff --git a/doc/source/index.rst b/doc/source/index.rst deleted file mode 100644 index 577ed15..0000000 --- a/doc/source/index.rst +++ /dev/null @@ -1,29 +0,0 @@ -Welcome to futurist's documentation! -======================================================== - -Code from the future, delivered to you in the **now**. - -.. toctree:: - :maxdepth: 2 - - features - api - installation - examples - contributing - -History -======= - -.. toctree:: - :maxdepth: 2 - - history - -Indices and tables -================== - -* :ref:`genindex` -* :ref:`modindex` -* :ref:`search` - diff --git a/doc/source/installation.rst b/doc/source/installation.rst deleted file mode 100644 index fa02187..0000000 --- a/doc/source/installation.rst +++ /dev/null @@ -1,12 +0,0 @@ -============ -Installation -============ - -At the command line:: - - $ pip install futurist - -Or, if you have virtualenvwrapper installed:: - - $ mkvirtualenv futurist - $ pip install futurist diff --git a/futurist/__init__.py b/futurist/__init__.py deleted file mode 100644 index f7f874b..0000000 --- a/futurist/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# Promote accessible items to this module namespace (for easy access). - -from futurist._futures import Future # noqa -from futurist._futures import GreenFuture # noqa - -from futurist._futures import CancelledError # noqa - -from futurist._futures import GreenThreadPoolExecutor # noqa -from futurist._futures import ProcessPoolExecutor # noqa -from futurist._futures import SynchronousExecutor # noqa -from futurist._futures import ThreadPoolExecutor # noqa - -from futurist._futures import RejectedSubmission # noqa - -from futurist._futures import ExecutorStatistics # noqa diff --git a/futurist/_futures.py b/futurist/_futures.py deleted file mode 100644 index bc46813..0000000 --- a/futurist/_futures.py +++ /dev/null @@ -1,484 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import functools -import threading - -from concurrent import futures as _futures -from concurrent.futures import process as _process -from concurrent.futures import thread as _thread -import six - -from futurist import _green -from futurist import _utils - - -CancelledError = _futures.CancelledError - - -class RejectedSubmission(Exception): - """Exception raised when a submitted call is rejected (for some reason).""" - - -# NOTE(harlowja): Allows for simpler access to this type... -Future = _futures.Future - - -class _Threading(object): - - @staticmethod - def event_object(*args, **kwargs): - return threading.Event(*args, **kwargs) - - @staticmethod - def lock_object(*args, **kwargs): - return threading.Lock(*args, **kwargs) - - @staticmethod - def rlock_object(*args, **kwargs): - return threading.RLock(*args, **kwargs) - - @staticmethod - def condition_object(*args, **kwargs): - return threading.Condition(*args, **kwargs) - - -class _Gatherer(object): - def __init__(self, submit_func, lock_factory, start_before_submit=False): - self._submit_func = submit_func - self._stats_lock = lock_factory() - self._stats = ExecutorStatistics() - self._start_before_submit = start_before_submit - - @property - def statistics(self): - return self._stats - - def clear(self): - with self._stats_lock: - self._stats = ExecutorStatistics() - - def _capture_stats(self, started_at, fut): - """Capture statistics - - :param started_at: when the activity the future has performed - was started at - :param fut: future object - """ - # If time somehow goes backwards, make sure we cap it at 0.0 instead - # of having negative elapsed time... - elapsed = max(0.0, _utils.now() - started_at) - with self._stats_lock: - # Use a new collection and lock so that all mutations are seen as - # atomic and not overlapping and corrupting with other - # mutations (the clone ensures that others reading the current - # values will not see a mutated/corrupted one). Since futures may - # be completed by different threads we need to be extra careful to - # gather this data in a way that is thread-safe... - (failures, executed, runtime, cancelled) = (self._stats.failures, - self._stats.executed, - self._stats.runtime, - self._stats.cancelled) - if fut.cancelled(): - cancelled += 1 - else: - executed += 1 - if fut.exception() is not None: - failures += 1 - runtime += elapsed - self._stats = ExecutorStatistics(failures=failures, - executed=executed, - runtime=runtime, - cancelled=cancelled) - - def submit(self, fn, *args, **kwargs): - """Submit work to be executed and capture statistics.""" - if self._start_before_submit: - started_at = _utils.now() - fut = self._submit_func(fn, *args, **kwargs) - if not self._start_before_submit: - started_at = _utils.now() - fut.add_done_callback(functools.partial(self._capture_stats, - started_at)) - return fut - - -class ThreadPoolExecutor(_thread.ThreadPoolExecutor): - """Executor that uses a thread pool to execute calls asynchronously. - - It gathers statistics about the submissions executed for post-analysis... - - See: https://docs.python.org/dev/library/concurrent.futures.html - """ - - threading = _Threading() - - def __init__(self, max_workers=None, check_and_reject=None): - """Initializes a thread pool executor. - - :param max_workers: maximum number of workers that can be - simulatenously active at the same time, further - submitted work will be queued up when this limit - is reached. - :type max_workers: int - :param check_and_reject: a callback function that will be provided - two position arguments, the first argument - will be this executor instance, and the second - will be the number of currently queued work - items in this executors backlog; the callback - should raise a :py:class:`.RejectedSubmission` - exception if it wants to have this submission - rejected. - :type check_and_reject: callback - """ - if max_workers is None: - max_workers = _utils.get_optimal_thread_count() - super(ThreadPoolExecutor, self).__init__(max_workers=max_workers) - if self._max_workers <= 0: - raise ValueError("Max workers must be greater than zero") - # NOTE(harlowja): this replaces the parent classes non-reentrant lock - # with a reentrant lock so that we can correctly call into the check - # and reject lock, and that it will block correctly if another - # submit call is done during that... - self._shutdown_lock = threading.RLock() - self._check_and_reject = check_and_reject or (lambda e, waiting: None) - self._gatherer = _Gatherer( - # Since our submit will use this gatherer we have to reference - # the parent submit, bound to this instance (which is what we - # really want to use anyway). - super(ThreadPoolExecutor, self).submit, - self.threading.lock_object) - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutdown - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics).""" - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('Can not schedule new futures' - ' after being shutdown') - self._check_and_reject(self, self._work_queue.qsize()) - return self._gatherer.submit(fn, *args, **kwargs) - - -class ProcessPoolExecutor(_process.ProcessPoolExecutor): - """Executor that uses a process pool to execute calls asynchronously. - - It gathers statistics about the submissions executed for post-analysis... - - See: https://docs.python.org/dev/library/concurrent.futures.html - """ - - threading = _Threading() - - def __init__(self, max_workers=None): - if max_workers is None: - max_workers = _utils.get_optimal_thread_count() - super(ProcessPoolExecutor, self).__init__(max_workers=max_workers) - if self._max_workers <= 0: - raise ValueError("Max workers must be greater than zero") - self._gatherer = _Gatherer( - # Since our submit will use this gatherer we have to reference - # the parent submit, bound to this instance (which is what we - # really want to use anyway). - super(ProcessPoolExecutor, self).submit, - self.threading.lock_object) - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutdown_thread - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics).""" - return self._gatherer.submit(fn, *args, **kwargs) - - -class SynchronousExecutor(_futures.Executor): - """Executor that uses the caller to execute calls synchronously. - - This provides an interface to a caller that looks like an executor but - will execute the calls inside the caller thread instead of executing it - in a external process/thread for when this type of functionality is - useful to provide... - - It gathers statistics about the submissions executed for post-analysis... - """ - - threading = _Threading() - - def __init__(self, green=False, run_work_func=lambda work: work.run()): - """Synchronous executor constructor. - - :param green: when enabled this forces the usage of greened lock - classes and green futures (so that the internals of this - object operate correctly under eventlet) - :type green: bool - :param run_work_func: callable that takes a single work item and - runs it (typically in a blocking manner) - :param run_work_func: callable - """ - if green and not _utils.EVENTLET_AVAILABLE: - raise RuntimeError('Eventlet is needed to use a green' - ' synchronous executor') - if not six.callable(run_work_func): - raise ValueError("Run work parameter expected to be callable") - self._run_work_func = run_work_func - self._shutoff = False - if green: - self.threading = _green.threading - self._future_cls = GreenFuture - else: - self._future_cls = Future - self._run_work_func = run_work_func - self._gatherer = _Gatherer(self._submit, - self.threading.lock_object, - start_before_submit=True) - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutoff - - def shutdown(self, wait=True): - self._shutoff = True - - def restart(self): - """Restarts this executor (*iff* previously shutoff/shutdown). - - NOTE(harlowja): clears any previously gathered statistics. - """ - if self._shutoff: - self._shutoff = False - self._gatherer.clear() - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics).""" - if self._shutoff: - raise RuntimeError('Can not schedule new futures' - ' after being shutdown') - return self._gatherer.submit(fn, *args, **kwargs) - - def _submit(self, fn, *args, **kwargs): - fut = self._future_cls() - self._run_work_func(_utils.WorkItem(fut, fn, args, kwargs)) - return fut - - -class GreenFuture(Future): - __doc__ = Future.__doc__ - - def __init__(self): - super(GreenFuture, self).__init__() - if not _utils.EVENTLET_AVAILABLE: - raise RuntimeError('Eventlet is needed to use a green future') - # NOTE(harlowja): replace the built-in condition with a greenthread - # compatible one so that when getting the result of this future the - # functions will correctly yield to eventlet. If this is not done then - # waiting on the future never actually causes the greenthreads to run - # and thus you wait for infinity. - if not _green.is_monkey_patched('threading'): - self._condition = _green.threading.condition_object() - - -class GreenThreadPoolExecutor(_futures.Executor): - """Executor that uses a green thread pool to execute calls asynchronously. - - See: https://docs.python.org/dev/library/concurrent.futures.html - and http://eventlet.net/doc/modules/greenpool.html for information on - how this works. - - It gathers statistics about the submissions executed for post-analysis... - """ - - threading = _green.threading - - def __init__(self, max_workers=1000, check_and_reject=None): - """Initializes a green thread pool executor. - - :param max_workers: maximum number of workers that can be - simulatenously active at the same time, further - submitted work will be queued up when this limit - is reached. - :type max_workers: int - :param check_and_reject: a callback function that will be provided - two position arguments, the first argument - will be this executor instance, and the second - will be the number of currently queued work - items in this executors backlog; the callback - should raise a :py:class:`.RejectedSubmission` - exception if it wants to have this submission - rejected. - :type check_and_reject: callback - """ - if not _utils.EVENTLET_AVAILABLE: - raise RuntimeError('Eventlet is needed to use a green executor') - if max_workers <= 0: - raise ValueError("Max workers must be greater than zero") - self._max_workers = max_workers - self._pool = _green.Pool(self._max_workers) - self._delayed_work = _green.Queue() - self._check_and_reject = check_and_reject or (lambda e, waiting: None) - self._shutdown_lock = self.threading.lock_object() - self._shutdown = False - self._gatherer = _Gatherer(self._submit, - self.threading.lock_object) - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutdown - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics). - - :param args: non-keyworded arguments - :type args: list - :param kwargs: key-value arguments - :type kwargs: dictionary - """ - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('Can not schedule new futures' - ' after being shutdown') - self._check_and_reject(self, self._delayed_work.qsize()) - return self._gatherer.submit(fn, *args, **kwargs) - - def _submit(self, fn, *args, **kwargs): - f = GreenFuture() - work = _utils.WorkItem(f, fn, args, kwargs) - if not self._spin_up(work): - self._delayed_work.put(work) - return f - - def _spin_up(self, work): - """Spin up a greenworker if less than max_workers. - - :param work: work to be given to the greenworker - :returns: whether a green worker was spun up or not - :rtype: boolean - """ - alive = self._pool.running() + self._pool.waiting() - if alive < self._max_workers: - self._pool.spawn_n(_green.GreenWorker(work, self._delayed_work)) - return True - return False - - def shutdown(self, wait=True): - with self._shutdown_lock: - if not self._shutdown: - self._shutdown = True - shutoff = True - else: - shutoff = False - if wait and shutoff: - self._pool.waitall() - self._delayed_work.join() - - -class ExecutorStatistics(object): - """Holds *immutable* information about a executors executions.""" - - __slots__ = ['_failures', '_executed', '_runtime', '_cancelled'] - - _REPR_MSG_TPL = ("") - - def __init__(self, failures=0, executed=0, runtime=0.0, cancelled=0): - self._failures = failures - self._executed = executed - self._runtime = runtime - self._cancelled = cancelled - - @property - def failures(self): - """How many submissions ended up raising exceptions. - - :returns: how many submissions ended up raising exceptions - :rtype: number - """ - return self._failures - - @property - def executed(self): - """How many submissions were executed (failed or not). - - :returns: how many submissions were executed - :rtype: number - """ - return self._executed - - @property - def runtime(self): - """Total runtime of all submissions executed (failed or not). - - :returns: total runtime of all submissions executed - :rtype: number - """ - return self._runtime - - @property - def cancelled(self): - """How many submissions were cancelled before executing. - - :returns: how many submissions were cancelled before executing - :rtype: number - """ - return self._cancelled - - @property - def average_runtime(self): - """The average runtime of all submissions executed. - - :returns: average runtime of all submissions executed - :rtype: number - :raises: ZeroDivisionError when no executions have occurred. - """ - return self._runtime / self._executed - - def __repr__(self): - return self._REPR_MSG_TPL % ({ - 'ident': id(self), - 'failures': self._failures, - 'executed': self._executed, - 'runtime': self._runtime, - 'cancelled': self._cancelled, - }) diff --git a/futurist/_green.py b/futurist/_green.py deleted file mode 100644 index ef2c2d5..0000000 --- a/futurist/_green.py +++ /dev/null @@ -1,84 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from futurist import _utils - -try: - from eventlet import greenpool - from eventlet import patcher as greenpatcher - from eventlet import queue as greenqueue - - from eventlet.green import threading as greenthreading -except ImportError: - greenpatcher, greenpool, greenqueue, greenthreading = (None, None, - None, None) - - -if _utils.EVENTLET_AVAILABLE: - # Aliases that we use and only expose vs the whole of eventlet... - Pool = greenpool.GreenPool - Queue = greenqueue.Queue - is_monkey_patched = greenpatcher.is_monkey_patched - - class GreenThreading(object): - - @staticmethod - def event_object(*args, **kwargs): - return greenthreading.Event(*args, **kwargs) - - @staticmethod - def lock_object(*args, **kwargs): - return greenthreading.Lock(*args, **kwargs) - - @staticmethod - def rlock_object(*args, **kwargs): - return greenthreading.RLock(*args, **kwargs) - - @staticmethod - def condition_object(*args, **kwargs): - return greenthreading.Condition(*args, **kwargs) - - threading = GreenThreading() -else: - threading = None - Pool = None - Queue = None - is_monkey_patched = lambda mod: False - - -class GreenWorker(object): - def __init__(self, work, work_queue): - self.work = work - self.work_queue = work_queue - - def __call__(self): - # Run our main piece of work. - try: - self.work.run() - finally: - # Consume any delayed work before finishing (this is how we finish - # work that was to big for the pool size, but needs to be finished - # no matter). - while True: - try: - w = self.work_queue.get_nowait() - except greenqueue.Empty: - break - else: - try: - w.run() - finally: - self.work_queue.task_done() diff --git a/futurist/_utils.py b/futurist/_utils.py deleted file mode 100644 index 0b9fce5..0000000 --- a/futurist/_utils.py +++ /dev/null @@ -1,142 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import inspect -import multiprocessing -import sys -import threading -import traceback - -from monotonic import monotonic as now # noqa -import six - -try: - import eventlet as _eventlet # noqa - EVENTLET_AVAILABLE = True -except ImportError: - EVENTLET_AVAILABLE = False - - -class WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException: - exc_type, exc_value, exc_tb = sys.exc_info() - try: - if six.PY2: - self.future.set_exception_info(exc_value, exc_tb) - else: - self.future.set_exception(exc_value) - finally: - del(exc_type, exc_value, exc_tb) - else: - self.future.set_result(result) - - -class Failure(object): - """Object that captures a exception (and its associated information).""" - - def __init__(self, retain_tb): - exc_info = sys.exc_info() - if not any(exc_info): - raise RuntimeError("No active exception being handled, can" - " not create a failure which represents" - " nothing") - try: - if retain_tb: - self.exc_info = tuple(exc_info) - self.traceback = None - else: - self.exc_info = (exc_info[0], exc_info[1], None) - self.traceback = "".join(traceback.format_exception(*exc_info)) - finally: - del exc_info - - -def get_callback_name(cb): - """Tries to get a callbacks fully-qualified name. - - If no name can be produced ``repr(cb)`` is called and returned. - """ - segments = [] - try: - segments.append(cb.__qualname__) - except AttributeError: - try: - segments.append(cb.__name__) - if inspect.ismethod(cb): - try: - # This attribute doesn't exist on py3.x or newer, so - # we optionally ignore it... (on those versions of - # python `__qualname__` should have been found anyway). - segments.insert(0, cb.im_class.__name__) - except AttributeError: - pass - except AttributeError: - pass - if not segments: - return repr(cb) - else: - try: - # When running under sphinx it appears this can be none? - if cb.__module__: - segments.insert(0, cb.__module__) - except AttributeError: - pass - return ".".join(segments) - - -def get_optimal_thread_count(default=2): - """Try to guess optimal thread count for current system.""" - try: - return multiprocessing.cpu_count() + 1 - except NotImplementedError: - # NOTE(harlowja): apparently may raise so in this case we will - # just setup two threads since it's hard to know what else we - # should do in this situation. - return default - - -class Barrier(object): - """A class that ensures active <= 0 occur before unblocking.""" - - def __init__(self, cond_cls=threading.Condition): - self._active = 0 - self._cond = cond_cls() - - def incr(self): - with self._cond: - self._active += 1 - self._cond.notify_all() - - def decr(self): - with self._cond: - self._active -= 1 - self._cond.notify_all() - - def wait(self): - with self._cond: - while self._active > 0: - self._cond.wait() diff --git a/futurist/periodics.py b/futurist/periodics.py deleted file mode 100644 index 4762735..0000000 --- a/futurist/periodics.py +++ /dev/null @@ -1,827 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import fractions -import functools -import heapq -import inspect -import logging -import math -import threading - -# For: https://wiki.openstack.org/wiki/Security/Projects/Bandit -from random import SystemRandom as random - -from concurrent import futures -import six - -import futurist -from futurist import _utils as utils - -LOG = logging.getLogger(__name__) - -_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing', - '_periodic_run_immediately') - -# Constants that are used to determine what 'kind' the current callback -# is being ran as. -PERIODIC = 'periodic' -IMMEDIATE = 'immediate' - - -class Watcher(object): - """A **read-only** object representing a periodics callbacks activities.""" - - _REPR_MSG_TPL = ("") - - def __init__(self, metrics): - self._metrics = metrics - - def __repr__(self): - return self._REPR_MSG_TPL % dict(ident=id(self), **self._metrics) - - @property - def runs(self): - """How many times the periodic callback has been ran.""" - return self._metrics['runs'] - - @property - def successes(self): - """How many times the periodic callback ran successfully.""" - return self._metrics['successes'] - - @property - def failures(self): - """How many times the periodic callback ran unsuccessfully.""" - return self._metrics['failures'] - - @property - def elapsed(self): - """Total amount of time the periodic callback has ran for.""" - return self._metrics['elapsed'] - - @property - def elapsed_waiting(self): - """Total amount of time the periodic callback has waited to run for.""" - return self._metrics['elapsed_waiting'] - - @property - def average_elapsed_waiting(self): - """Avg. amount of time the periodic callback has waited to run for. - - This may raise a ``ZeroDivisionError`` if there has been no runs. - """ - return self._metrics['elapsed_waiting'] / self._metrics['runs'] - - @property - def average_elapsed(self): - """Avg. amount of time the periodic callback has ran for. - - This may raise a ``ZeroDivisionError`` if there has been no runs. - """ - return self._metrics['elapsed'] / self._metrics['runs'] - - -def _check_attrs(obj): - """Checks that a periodic function/method has all the expected attributes. - - This will return the expected attributes that were **not** found. - """ - missing_attrs = [] - for attr_name in _REQUIRED_ATTRS: - if not hasattr(obj, attr_name): - missing_attrs.append(attr_name) - return missing_attrs - - -def is_periodic(obj): - """Check whether an object is a valid periodic callable. - - :param obj: object to inspect - :type obj: anything - :return: True if obj is a periodic task, otherwise False - """ - return callable(obj) and not _check_attrs(obj) - - -def periodic(spacing, run_immediately=False, enabled=True): - """Tags a method/function as wanting/able to execute periodically. - - :param spacing: how often to run the decorated function (required) - :type spacing: float/int - :param run_immediately: option to specify whether to run - immediately or wait until the spacing provided has - elapsed before running for the first time - :type run_immediately: boolean - :param enabled: whether the task is enabled to run - :type enabled: boolean - """ - - if spacing <= 0: - raise ValueError("Periodicity/spacing must be greater than" - " zero instead of %s" % spacing) - - def wrapper(f): - f._is_periodic = enabled - f._periodic_spacing = spacing - f._periodic_run_immediately = run_immediately - - @six.wraps(f) - def decorator(*args, **kwargs): - return f(*args, **kwargs) - - return decorator - - return wrapper - - -def _add_jitter(max_percent_jitter): - """Wraps a existing strategy and adds jitter to it. - - 0% to 100% of the spacing value will be added to this value to ensure - callbacks do not synchronize. - """ - if max_percent_jitter > 1 or max_percent_jitter < 0: - raise ValueError("Invalid 'max_percent_jitter', must be greater or" - " equal to 0.0 and less than or equal to 1.0") - - def wrapper(func): - - @six.wraps(func) - def decorator(cb, metrics, now=None): - next_run = func(cb, metrics, now=now) - how_often = cb._periodic_spacing - jitter = how_often * (random.random() * max_percent_jitter) - return next_run + jitter - - decorator.__name__ += "_with_jitter" - return decorator - - return wrapper - - -def _last_finished_strategy(cb, started_at, finished_at, metrics): - # Determine when the callback should next run based on when it was - # last finished **only** given metrics about this information. - how_often = cb._periodic_spacing - return finished_at + how_often - - -def _last_started_strategy(cb, started_at, finished_at, metrics): - # Determine when the callback should next run based on when it was - # last started **only** given metrics about this information. - how_often = cb._periodic_spacing - return started_at + how_often - - -def _aligned_last_finished_strategy(cb, started_at, finished_at, metrics): - # Determine when the callback should next run based on when it was - # last finished **only** where the last finished time is first aligned to - # be a multiple of the expected spacing (so that no matter how long or - # how short the callback takes it is always ran on its next aligned - # to spacing time). - how_often = cb._periodic_spacing - aligned_finished_at = finished_at - math.fmod(finished_at, how_often) - return aligned_finished_at + how_often - - -def _now_plus_periodicity(cb, now): - how_often = cb._periodic_spacing - return how_often + now - - -class _Schedule(object): - """Internal heap-based structure that maintains the schedule/ordering. - - This stores a heap composed of the following ``(next_run, index)`` where - ``next_run`` is the next desired runtime for the callback that is stored - somewhere with the index provided. The index is saved so that if two - functions with the same ``next_run`` time are inserted, that the one with - the smaller index is preferred (it is also saved so that on pop we can - know what the index of the callback we should call is). - """ - - def __init__(self): - self._ordering = [] - - def push(self, next_run, index): - heapq.heappush(self._ordering, (next_run, index)) - - def __len__(self): - return len(self._ordering) - - def pop(self): - return heapq.heappop(self._ordering) - - -def _on_failure_log(log, cb, kind, spacing, exc_info, traceback=None): - cb_name = utils.get_callback_name(cb) - if all(exc_info) or not traceback: - log.error("Failed to call %s '%s' (it runs every %0.2f" - " seconds)", kind, cb_name, spacing, exc_info=exc_info) - else: - log.error("Failed to call %s '%s' (it runs every %0.2f" - " seconds):\n%s", kind, cb_name, spacing, traceback) - - -def _run_callback_retain(now_func, cb, *args, **kwargs): - # NOTE(harlowja): this needs to be a module level function so that the - # process pool execution can locate it (it can't be a lambda or method - # local function because it won't be able to find those). - failure = None - started_at = now_func() - try: - cb(*args, **kwargs) - except Exception: - # Until https://bugs.python.org/issue24451 is merged we have to - # capture and return the failure, so that we can have reliable - # timing information. - failure = utils.Failure(True) - finished_at = now_func() - return (started_at, finished_at, failure) - - -def _run_callback_no_retain(now_func, cb, *args, **kwargs): - # NOTE(harlowja): this needs to be a module level function so that the - # process pool execution can locate it (it can't be a lambda or method - # local function because it won't be able to find those). - failure = None - started_at = now_func() - try: - cb(*args, **kwargs) - except Exception: - # Until https://bugs.python.org/issue24451 is merged we have to - # capture and return the failure, so that we can have reliable - # timing information. - failure = utils.Failure(False) - finished_at = now_func() - return (started_at, finished_at, failure) - - -def _build(now_func, callables, next_run_scheduler): - schedule = _Schedule() - now = None - immediates = collections.deque() - for index, (cb, _cb_name, args, kwargs) in enumerate(callables): - if cb._periodic_run_immediately: - immediates.append(index) - else: - if now is None: - now = now_func() - next_run = next_run_scheduler(cb, now) - schedule.push(next_run, index) - return immediates, schedule - - -_SCHEDULE_RETRY_EXCEPTIONS = (RuntimeError, futurist.RejectedSubmission) - - -class ExecutorFactory(object): - """Base class for any executor factory.""" - - shutdown = True - """Whether the executor should be shut down on periodic worker stop.""" - - def __call__(self): - """Return the executor to be used.""" - raise NotImplementedError() - - -class ExistingExecutor(ExecutorFactory): - """An executor factory returning the existing object.""" - - def __init__(self, executor, shutdown=False): - self._executor = executor - self.shutdown = shutdown - - def __call__(self): - return self._executor - - -class PeriodicWorker(object): - """Calls a collection of callables periodically (sleeping as needed...). - - NOTE(harlowja): typically the :py:meth:`.start` method is executed in a - background thread so that the periodic callables are executed in - the background/asynchronously (using the defined periods to determine - when each is called). - """ - - #: Max amount of time to wait when running (forces a wakeup when elapsed). - MAX_LOOP_IDLE = 30 - - _NO_OP_ARGS = () - _NO_OP_KWARGS = {} - _INITIAL_METRICS = { - 'runs': 0, - 'elapsed': 0, - 'elapsed_waiting': 0, - 'failures': 0, - 'successes': 0, - } - - # When scheduling fails temporary, use a random delay between 0.9-1.1 sec. - _RESCHEDULE_DELAY = 0.9 - _RESCHEDULE_JITTER = 0.2 - - DEFAULT_JITTER = fractions.Fraction(5, 100) - """ - Default jitter percentage the built-in strategies (that have jitter - support) will use. - """ - - BUILT_IN_STRATEGIES = { - 'last_started': ( - _last_started_strategy, - _now_plus_periodicity, - ), - 'last_started_jitter': ( - _add_jitter(DEFAULT_JITTER)(_last_started_strategy), - _now_plus_periodicity, - ), - 'last_finished': ( - _last_finished_strategy, - _now_plus_periodicity, - ), - 'last_finished_jitter': ( - _add_jitter(DEFAULT_JITTER)(_last_finished_strategy), - _now_plus_periodicity, - ), - 'aligned_last_finished': ( - _aligned_last_finished_strategy, - _now_plus_periodicity, - ), - 'aligned_last_finished_jitter': ( - _add_jitter(DEFAULT_JITTER)(_aligned_last_finished_strategy), - _now_plus_periodicity, - ), - } - """ - Built in scheduling strategies (used to determine when next to run - a periodic callable). - - The first element is the strategy to use after the initial start - and the second element is the strategy to use for the initial start. - - These are made somewhat pluggable so that we can *easily* add-on - different types later (perhaps one that uses a cron-style syntax - for example). - """ - - @classmethod - def create(cls, objects, exclude_hidden=True, - log=None, executor_factory=None, - cond_cls=threading.Condition, event_cls=threading.Event, - schedule_strategy='last_started', now_func=utils.now, - on_failure=None, args=_NO_OP_ARGS, kwargs=_NO_OP_KWARGS): - """Automatically creates a worker by analyzing object(s) methods. - - Only picks up methods that have been tagged/decorated with - the :py:func:`.periodic` decorator (does not match against private - or protected methods unless explicitly requested to). - - :param objects: the objects to introspect for decorated members - :type objects: iterable - :param exclude_hidden: exclude hidden members (ones that start with - an underscore) - :type exclude_hidden: bool - :param log: logger to use when creating a new worker (defaults - to the module logger if none provided), it is currently - only used to report callback failures (if they occur) - :type log: logger - :param executor_factory: factory callable that can be used to generate - executor objects that will be used to - run the periodic callables (if none is - provided one will be created that uses - the :py:class:`~futurist.SynchronousExecutor` - class) - :type executor_factory: ExecutorFactory or any callable - :param cond_cls: callable object that can - produce ``threading.Condition`` - (or compatible/equivalent) objects - :type cond_cls: callable - :param event_cls: callable object that can produce ``threading.Event`` - (or compatible/equivalent) objects - :type event_cls: callable - :param schedule_strategy: string to select one of the built-in - strategies that can return the - next time a callable should run - :type schedule_strategy: string - :param now_func: callable that can return the current time offset - from some point (used in calculating elapsed times - and next times to run); preferably this is - monotonically increasing - :type now_func: callable - :param on_failure: callable that will be called whenever a periodic - function fails with an error, it will be provided - four positional arguments and one keyword - argument, the first positional argument being the - callable that failed, the second being the type - of activity under which it failed (IMMEDIATE or - PERIODIC), the third being the spacing that the - callable runs at and the fourth `exc_info` tuple - of the failure. The keyword argument 'traceback' - will also be provided that may be be a string - that caused the failure (this is required for - executors which run out of process, as those can not - transfer stack frames across process boundaries); if - no callable is provided then a default failure - logging function will be used instead, do note that - any user provided callable should not raise - exceptions on being called - :type on_failure: callable - :param args: positional arguments to be passed to all callables - :type args: tuple - :param kwargs: keyword arguments to be passed to all callables - :type kwargs: dict - """ - callables = [] - for obj in objects: - for (name, member) in inspect.getmembers(obj): - if name.startswith("_") and exclude_hidden: - continue - if six.callable(member): - missing_attrs = _check_attrs(member) - if not missing_attrs: - callables.append((member, args, kwargs)) - return cls(callables, log=log, executor_factory=executor_factory, - cond_cls=cond_cls, event_cls=event_cls, - schedule_strategy=schedule_strategy, now_func=now_func, - on_failure=on_failure) - - def __init__(self, callables, log=None, executor_factory=None, - cond_cls=threading.Condition, event_cls=threading.Event, - schedule_strategy='last_started', now_func=utils.now, - on_failure=None): - """Creates a new worker using the given periodic callables. - - :param callables: a iterable of tuple objects previously decorated - with the :py:func:`.periodic` decorator, each item - in the iterable is expected to be in the format - of ``(cb, args, kwargs)`` where ``cb`` is the - decorated function and ``args`` and ``kwargs`` are - any positional and keyword arguments to send into - the callback when it is activated (both ``args`` - and ``kwargs`` may be provided as none to avoid - using them) - :type callables: iterable - :param log: logger to use when creating a new worker (defaults - to the module logger if none provided), it is currently - only used to report callback failures (if they occur) - :type log: logger - :param executor_factory: factory callable that can be used to generate - executor objects that will be used to - run the periodic callables (if none is - provided one will be created that uses - the :py:class:`~futurist.SynchronousExecutor` - class) - :type executor_factory: ExecutorFactory or any callable - :param cond_cls: callable object that can - produce ``threading.Condition`` - (or compatible/equivalent) objects - :type cond_cls: callable - :param event_cls: callable object that can produce ``threading.Event`` - (or compatible/equivalent) objects - :type event_cls: callable - :param schedule_strategy: string to select one of the built-in - strategies that can return the - next time a callable should run - :type schedule_strategy: string - :param now_func: callable that can return the current time offset - from some point (used in calculating elapsed times - and next times to run); preferably this is - monotonically increasing - :type now_func: callable - :param on_failure: callable that will be called whenever a periodic - function fails with an error, it will be provided - four positional arguments and one keyword - argument, the first positional argument being the - callable that failed, the second being the type - of activity under which it failed (IMMEDIATE or - PERIODIC), the third being the spacing that the - callable runs at and the fourth `exc_info` tuple - of the failure. The keyword argument 'traceback' - will also be provided that may be be a string - that caused the failure (this is required for - executors which run out of process, as those can not - transfer stack frames across process boundaries); if - no callable is provided then a default failure - logging function will be used instead, do note that - any user provided callable should not raise - exceptions on being called - :type on_failure: callable - """ - self._tombstone = event_cls() - self._waiter = cond_cls() - self._dead = event_cls() - self._active = event_cls() - self._cond_cls = cond_cls - self._watchers = [] - self._callables = [] - for (cb, args, kwargs) in callables: - if not six.callable(cb): - raise ValueError("Periodic callback %r must be callable" % cb) - missing_attrs = _check_attrs(cb) - if missing_attrs: - raise ValueError("Periodic callback %r missing required" - " attributes %s" % (cb, missing_attrs)) - if cb._is_periodic: - # Ensure these aren't none and if so replace them with - # something more appropriate... - if args is None: - args = self._NO_OP_ARGS - if kwargs is None: - kwargs = self._NO_OP_KWARGS - cb_name = utils.get_callback_name(cb) - cb_metrics = self._INITIAL_METRICS.copy() - watcher = Watcher(cb_metrics) - self._callables.append((cb, cb_name, args, kwargs)) - self._watchers.append((cb_metrics, watcher)) - try: - strategy = self.BUILT_IN_STRATEGIES[schedule_strategy] - self._schedule_strategy = strategy[0] - self._initial_schedule_strategy = strategy[1] - except KeyError: - valid_strategies = sorted(self.BUILT_IN_STRATEGIES.keys()) - raise ValueError("Scheduling strategy '%s' must be one of" - " %s selectable strategies" - % (schedule_strategy, valid_strategies)) - self._immediates, self._schedule = _build( - now_func, self._callables, self._initial_schedule_strategy) - self._log = log or LOG - if executor_factory is None: - executor_factory = lambda: futurist.SynchronousExecutor() - self._on_failure = functools.partial(_on_failure_log, self._log) - self._executor_factory = executor_factory - self._now_func = now_func - - def __len__(self): - """How many callables are currently active.""" - return len(self._callables) - - def _run(self, executor, runner): - """Main worker run loop.""" - barrier = utils.Barrier(cond_cls=self._cond_cls) - - def _process_scheduled(): - # Figure out when we should run next (by selecting the - # minimum item from the heap, where the minimum should be - # the callable that needs to run next and has the lowest - # next desired run time). - with self._waiter: - while (not self._schedule and - not self._tombstone.is_set() and - not self._immediates): - self._waiter.wait(self.MAX_LOOP_IDLE) - if self._tombstone.is_set(): - # We were requested to stop, so stop. - return - if self._immediates: - # This will get processed in _process_immediates() - # in the next loop call. - return - submitted_at = now = self._now_func() - next_run, index = self._schedule.pop() - when_next = next_run - now - if when_next <= 0: - # Run & schedule its next execution. - cb, cb_name, args, kwargs = self._callables[index] - self._log.debug("Submitting periodic function '%s'", - cb_name) - try: - fut = executor.submit(runner, - self._now_func, - cb, *args, **kwargs) - except _SCHEDULE_RETRY_EXCEPTIONS as exc: - # Restart after a short delay - delay = (self._RESCHEDULE_DELAY + - random().random() * self._RESCHEDULE_JITTER) - self._log.error("Failed to submit periodic function " - "'%s', retrying after %.2f sec. " - "Error: %s", - cb_name, delay, exc) - self._schedule.push(self._now_func() + delay, - index) - else: - barrier.incr() - fut.add_done_callback(functools.partial(_on_done, - PERIODIC, - cb, cb_name, - index, - submitted_at)) - fut.add_done_callback(lambda _fut: barrier.decr()) - else: - # Gotta wait... - self._schedule.push(next_run, index) - when_next = min(when_next, self.MAX_LOOP_IDLE) - self._waiter.wait(when_next) - - def _process_immediates(): - try: - index = self._immediates.popleft() - except IndexError: - pass - else: - cb, cb_name, args, kwargs = self._callables[index] - submitted_at = self._now_func() - self._log.debug("Submitting immediate function '%s'", cb_name) - try: - fut = executor.submit(runner, self._now_func, - cb, *args, **kwargs) - except _SCHEDULE_RETRY_EXCEPTIONS as exc: - self._log.error("Failed to submit immediate function " - "'%s', retrying. Error: %s", cb_name, exc) - # Restart as soon as possible - self._immediates.append(index) - else: - barrier.incr() - fut.add_done_callback(functools.partial(_on_done, - IMMEDIATE, - cb, cb_name, - index, - submitted_at)) - fut.add_done_callback(lambda _fut: barrier.decr()) - - def _on_done(kind, cb, cb_name, index, submitted_at, fut): - started_at, finished_at, failure = fut.result() - cb_metrics, _watcher = self._watchers[index] - cb_metrics['runs'] += 1 - if failure is not None: - cb_metrics['failures'] += 1 - self._on_failure(cb, kind, cb._periodic_spacing, - failure.exc_info, traceback=failure.traceback) - else: - cb_metrics['successes'] += 1 - elapsed = max(0, finished_at - started_at) - elapsed_waiting = max(0, started_at - submitted_at) - cb_metrics['elapsed'] += elapsed - cb_metrics['elapsed_waiting'] += elapsed_waiting - next_run = self._schedule_strategy(cb, - started_at, finished_at, - cb_metrics) - with self._waiter: - self._schedule.push(next_run, index) - self._waiter.notify_all() - - try: - while not self._tombstone.is_set(): - _process_immediates() - _process_scheduled() - finally: - barrier.wait() - - def _on_finish(self): - # TODO(harlowja): this may be to verbose for people? - if not self._log.isEnabledFor(logging.DEBUG): - return - watcher_it = self.iter_watchers() - for index, watcher in enumerate(watcher_it): - cb, cb_name, _args, _kwargs = self._callables[index] - self._log.debug("Stopped running callback[%s] '%s' periodically:", - index, cb_name) - self._log.debug(" Periodicity = %ss", cb._periodic_spacing) - self._log.debug(" Runs = %s", watcher.runs) - self._log.debug(" Failures = %s", watcher.failures) - self._log.debug(" Successes = %s", watcher.successes) - try: - self._log.debug(" Average elapsed = %0.4fs", - watcher.average_elapsed) - self._log.debug(" Average elapsed waiting = %0.4fs", - watcher.average_elapsed_waiting) - except ZeroDivisionError: - pass - - def add(self, cb, *args, **kwargs): - """Adds a new periodic callback to the current worker. - - Returns a :py:class:`.Watcher` if added successfully or the value - ``None`` if not (or raises a ``ValueError`` if the callback is not - correctly formed and/or decorated). - - :param cb: a callable object/method/function previously decorated - with the :py:func:`.periodic` decorator - :type cb: callable - """ - if not six.callable(cb): - raise ValueError("Periodic callback %r must be callable" % cb) - missing_attrs = _check_attrs(cb) - if missing_attrs: - raise ValueError("Periodic callback %r missing required" - " attributes %s" % (cb, missing_attrs)) - if not cb._is_periodic: - return None - now = self._now_func() - with self._waiter: - cb_index = len(self._callables) - cb_name = utils.get_callback_name(cb) - cb_metrics = self._INITIAL_METRICS.copy() - watcher = Watcher(cb_metrics) - self._callables.append((cb, cb_name, args, kwargs)) - self._watchers.append((cb_metrics, watcher)) - if cb._periodic_run_immediately: - self._immediates.append(cb_index) - else: - next_run = self._initial_schedule_strategy(cb, now) - self._schedule.push(next_run, cb_index) - self._waiter.notify_all() - return watcher - - def start(self, allow_empty=False): - """Starts running (will not return until :py:meth:`.stop` is called). - - :param allow_empty: instead of running with no callbacks raise when - this worker has no contained callables (this can be - set to true and :py:meth:`.add` can be used to add - new callables on demand), note that when enabled - and no callbacks exist this will block and - sleep (until either stopped or callbacks are - added) - :type allow_empty: bool - """ - if not self._callables and not allow_empty: - raise RuntimeError("A periodic worker can not start" - " without any callables") - if self._active.is_set(): - raise RuntimeError("A periodic worker can not be started" - " twice") - executor = self._executor_factory() - # NOTE(harlowja): we compare with the futures process pool executor - # since its the base type of futurist ProcessPoolExecutor and it is - # possible for users to pass in there own custom executors, this one - # is known to not be able to retain tracebacks... - if isinstance(executor, futures.ProcessPoolExecutor): - # Pickling a traceback will not work, so do not try to do it... - # - # Avoids 'TypeError: can't pickle traceback objects' - runner = _run_callback_no_retain - else: - runner = _run_callback_retain - self._dead.clear() - self._active.set() - try: - self._run(executor, runner) - finally: - if getattr(self._executor_factory, 'shutdown', True): - executor.shutdown() - self._dead.set() - self._active.clear() - self._on_finish() - - def stop(self): - """Sets the tombstone (this stops any further executions).""" - with self._waiter: - self._tombstone.set() - self._waiter.notify_all() - - def iter_watchers(self): - """Iterator/generator over all the currently maintained watchers.""" - for _cb_metrics, watcher in self._watchers: - yield watcher - - def reset(self): - """Resets the workers internal state.""" - self._tombstone.clear() - self._dead.clear() - for cb_metrics, _watcher in self._watchers: - for k in list(six.iterkeys(cb_metrics)): - # NOTE(harlowja): mutate the original dictionaries keys - # so that the watcher (which references the same dictionary - # keys) is able to see those changes. - cb_metrics[k] = 0 - self._immediates, self._schedule = _build( - self._now_func, self._callables, self._initial_schedule_strategy) - - def wait(self, timeout=None): - """Waits for the :py:meth:`.start` method to gracefully exit. - - An optional timeout can be provided, which will cause the method to - return within the specified timeout. If the timeout is reached, the - returned value will be False. - - :param timeout: Maximum number of seconds that the :meth:`.wait` - method should block for - :type timeout: float/int - """ - self._dead.wait(timeout) - return self._dead.is_set() diff --git a/futurist/rejection.py b/futurist/rejection.py deleted file mode 100644 index 5bb5105..0000000 --- a/futurist/rejection.py +++ /dev/null @@ -1,32 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Executor rejection strategies.""" - -import futurist - - -def reject_when_reached(max_backlog): - """Returns a function that will raise when backlog goes past max size.""" - - def _rejector(executor, backlog): - if backlog >= max_backlog: - raise futurist.RejectedSubmission("Current backlog %s is not" - " allowed to go" - " beyond %s" % (backlog, - max_backlog)) - - return _rejector diff --git a/futurist/tests/__init__.py b/futurist/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/futurist/tests/base.py b/futurist/tests/base.py deleted file mode 100644 index 3054386..0000000 --- a/futurist/tests/base.py +++ /dev/null @@ -1,22 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2010-2011 OpenStack Foundation -# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslotest import base - - -class TestCase(base.BaseTestCase): - """Test case base class for all unit tests.""" diff --git a/futurist/tests/test_executors.py b/futurist/tests/test_executors.py deleted file mode 100644 index c1ee549..0000000 --- a/futurist/tests/test_executors.py +++ /dev/null @@ -1,168 +0,0 @@ -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -import testscenarios -from testtools import testcase - -import futurist -from futurist import rejection -from futurist.tests import base - - -# Module level functions need to be used since the process pool -# executor can not access instance or lambda level functions (since those -# are not pickleable). - -def returns_one(): - return 1 - - -def blows_up(): - raise RuntimeError("no worky") - - -def delayed(wait_secs): - time.sleep(wait_secs) - - -class TestExecutors(testscenarios.TestWithScenarios, base.TestCase): - scenarios = [ - ('sync', {'executor_cls': futurist.SynchronousExecutor, - 'restartable': True, 'executor_kwargs': {}}), - ('green_sync', {'executor_cls': futurist.SynchronousExecutor, - 'restartable': True, - 'executor_kwargs': {'green': True}}), - ('green', {'executor_cls': futurist.GreenThreadPoolExecutor, - 'restartable': False, 'executor_kwargs': {}}), - ('thread', {'executor_cls': futurist.ThreadPoolExecutor, - 'restartable': False, 'executor_kwargs': {}}), - ('process', {'executor_cls': futurist.ProcessPoolExecutor, - 'restartable': False, 'executor_kwargs': {}}), - ] - - def setUp(self): - super(TestExecutors, self).setUp() - self.executor = self.executor_cls(**self.executor_kwargs) - - def tearDown(self): - super(TestExecutors, self).tearDown() - self.executor.shutdown() - self.executor = None - - def test_run_one(self): - fut = self.executor.submit(returns_one) - self.assertEqual(1, fut.result()) - self.assertTrue(fut.done()) - - def test_blows_up(self): - fut = self.executor.submit(blows_up) - self.assertRaises(RuntimeError, fut.result) - self.assertIsInstance(fut.exception(), RuntimeError) - - def test_gather_stats(self): - self.executor.submit(blows_up) - self.executor.submit(delayed, 0.2) - self.executor.submit(returns_one) - self.executor.shutdown() - - self.assertEqual(3, self.executor.statistics.executed) - self.assertEqual(1, self.executor.statistics.failures) - self.assertGreaterEqual(self.executor.statistics.runtime, - # It appears that the the thread run loop - # may call this before 0.2 seconds (or 0.2 - # will not be represented as a float correctly) - # is really up so accommodate for that - # happening... - 0.199) - - def test_post_shutdown_raises(self): - executor = self.executor_cls(**self.executor_kwargs) - executor.shutdown() - self.assertRaises(RuntimeError, executor.submit, returns_one) - - def test_restartable(self): - if not self.restartable: - raise testcase.TestSkipped("not restartable") - else: - executor = self.executor_cls(**self.executor_kwargs) - fut = executor.submit(returns_one) - self.assertEqual(1, fut.result()) - executor.shutdown() - self.assertEqual(1, executor.statistics.executed) - - self.assertRaises(RuntimeError, executor.submit, returns_one) - - executor.restart() - self.assertEqual(0, executor.statistics.executed) - fut = executor.submit(returns_one) - self.assertEqual(1, fut.result()) - self.assertEqual(1, executor.statistics.executed) - executor.shutdown() - - def test_alive(self): - with self.executor_cls(**self.executor_kwargs) as executor: - self.assertTrue(executor.alive) - self.assertFalse(executor.alive) - - def test_done_callback(self): - happy_completed = [] - unhappy_completed = [] - - def on_done(fut): - if fut.exception(): - unhappy_completed.append(fut) - else: - happy_completed.append(fut) - - for i in range(0, 10): - if i % 2 == 0: - fut = self.executor.submit(returns_one) - else: - fut = self.executor.submit(blows_up) - fut.add_done_callback(on_done) - - self.executor.shutdown() - self.assertEqual(10, len(happy_completed) + len(unhappy_completed)) - self.assertEqual(5, len(unhappy_completed)) - self.assertEqual(5, len(happy_completed)) - - -_REJECTION = rejection.reject_when_reached(1) - - -class TestRejection(testscenarios.TestWithScenarios, base.TestCase): - scenarios = [ - ('green', {'executor_cls': futurist.GreenThreadPoolExecutor, - 'executor_kwargs': {'check_and_reject': _REJECTION, - 'max_workers': 1}}), - ('thread', {'executor_cls': futurist.ThreadPoolExecutor, - 'executor_kwargs': {'check_and_reject': _REJECTION, - 'max_workers': 1}}), - ] - - def setUp(self): - super(TestRejection, self).setUp() - self.executor = self.executor_cls(**self.executor_kwargs) - - def test_rejection(self): - self.addCleanup(self.executor.shutdown) - - # 1 worker + 1 item of backlog - for _i in range(2): - self.executor.submit(delayed, 0.5) - - self.assertRaises(futurist.RejectedSubmission, - self.executor.submit, returns_one) diff --git a/futurist/tests/test_periodics.py b/futurist/tests/test_periodics.py deleted file mode 100644 index a09e06a..0000000 --- a/futurist/tests/test_periodics.py +++ /dev/null @@ -1,385 +0,0 @@ -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib -import functools -import threading -import time - -import eventlet -from eventlet.green import threading as green_threading -import mock -import testscenarios - -import futurist -from futurist import periodics -from futurist.tests import base - - -@periodics.periodic(1) -def every_one_sec(cb): - cb() - - -@periodics.periodic(0.5) -def every_half_sec(cb): - cb() - - -@contextlib.contextmanager -def create_destroy_thread(run_what, *args, **kwargs): - t = threading.Thread(target=run_what, args=args, kwargs=kwargs) - t.daemon = True - t.start() - try: - yield - finally: - t.join() - - -@contextlib.contextmanager -def create_destroy_green_thread(run_what, *args, **kwargs): - t = eventlet.spawn(run_what, *args, **kwargs) - try: - yield - finally: - t.wait() - - -class TestPeriodicsStrategies(base.TestCase): - def test_invalids(self): - self.assertRaises(ValueError, - periodics.PeriodicWorker, [], - schedule_strategy='not_a_strategy') - - -class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase): - scenarios = [ - ('sync', {'executor_cls': futurist.SynchronousExecutor, - 'executor_kwargs': {}, - 'create_destroy': create_destroy_thread, - 'sleep': time.sleep, - 'event_cls': threading.Event, - 'worker_kwargs': {}}), - ('thread', {'executor_cls': futurist.ThreadPoolExecutor, - 'executor_kwargs': {'max_workers': 2}, - 'create_destroy': create_destroy_thread, - 'sleep': time.sleep, - 'event_cls': threading.Event, - 'worker_kwargs': {}}), - ('green', {'executor_cls': futurist.GreenThreadPoolExecutor, - 'executor_kwargs': {'max_workers': 10}, - 'sleep': eventlet.sleep, - 'event_cls': green_threading.Event, - 'create_destroy': create_destroy_green_thread, - 'worker_kwargs': {'cond_cls': green_threading.Condition, - 'event_cls': green_threading.Event}}), - ] - - def _test_strategy(self, schedule_strategy, nows, - last_now, expected_next): - nows = list(nows) - ev = self.event_cls() - - def now_func(): - if len(nows) == 1: - ev.set() - return last_now - return nows.pop() - - @periodics.periodic(2, run_immediately=False) - def slow_periodic(): - pass - - callables = [ - (slow_periodic, None, None), - ] - worker_kwargs = self.worker_kwargs.copy() - worker_kwargs['schedule_strategy'] = schedule_strategy - worker_kwargs['now_func'] = now_func - w = periodics.PeriodicWorker(callables, **worker_kwargs) - - with self.create_destroy(w.start): - ev.wait() - w.stop() - - schedule_order = w._schedule._ordering - self.assertEqual([(expected_next, 0)], schedule_order) - - def test_last_finished_strategy(self): - last_now = 3.2 - nows = [ - # Initial schedule building. - 0, - # Worker run loop fetch time (to see how long to wait). - 2, - # Function call start time. - 2, - # Function call end time. - 3, - # Stop. - -1, - ] - nows = list(reversed(nows)) - self._test_strategy('last_finished', nows, last_now, 5.0) - - def test_waiting_immediate_add_processed(self): - ran_at = [] - - @periodics.periodic(0.1, run_immediately=True) - def activated_periodic(): - ran_at.append(time.time()) - - w = periodics.PeriodicWorker([], **self.worker_kwargs) - with self.create_destroy(w.start, allow_empty=True): - # Give some time for the thread to start... - self.sleep(0.5) - w.add(activated_periodic) - while len(ran_at) == 0: - self.sleep(0.1) - w.stop() - - def test_double_start_fail(self): - w = periodics.PeriodicWorker([], **self.worker_kwargs) - with self.create_destroy(w.start, allow_empty=True): - # Give some time for the thread to start... - self.sleep(0.5) - # Now ensure we can't start it again... - self.assertRaises(RuntimeError, w.start) - w.stop() - - def test_last_started_strategy(self): - last_now = 3.2 - nows = [ - # Initial schedule building. - 0, - # Worker run loop fetch time (to see how long to wait). - 2, - # Function call start time. - 2, - # Function call end time. - 3, - # Stop. - -1, - ] - nows = list(reversed(nows)) - self._test_strategy('last_started', nows, last_now, 4.0) - - def test_aligned_strategy(self): - last_now = 5.5 - nows = [ - # Initial schedule building. - 0, - # Worker run loop fetch time (to see how long to wait). - 2, - # Function call start time. - 2, - # Function call end time. - 5, - # Stop. - -1, - ] - nows = list(reversed(nows)) - self._test_strategy('aligned_last_finished', nows, last_now, 6.0) - - def test_add_on_demand(self): - called = set() - - def cb(name): - called.add(name) - - callables = [] - for i in range(0, 10): - i_cb = functools.partial(cb, '%s_has_called' % i) - callables.append((every_half_sec, (i_cb,), {})) - - leftover_callables = list(callables) - w = periodics.PeriodicWorker([], **self.worker_kwargs) - with self.create_destroy(w.start, allow_empty=True): - # NOTE(harlowja): if this never happens, the test will fail - # eventually, with a timeout error..., probably can make it fail - # slightly faster in the future... - while len(called) != len(callables): - if leftover_callables: - cb, args, kwargs = leftover_callables.pop() - w.add(cb, *args, **kwargs) - self.sleep(0.1) - w.stop() - - def test_disabled(self): - - @periodics.periodic(0.5, enabled=False) - def no_add_me(): - pass - - @periodics.periodic(0.5) - def add_me(): - pass - - w = periodics.PeriodicWorker([], **self.worker_kwargs) - self.assertEqual(0, len(w)) - self.assertIsNone(w.add(no_add_me)) - self.assertEqual(0, len(w)) - - self.assertIsNotNone(w.add(add_me)) - self.assertEqual(1, len(w)) - - def test_is_periodic(self): - - @periodics.periodic(0.5, enabled=False) - def no_add_me(): - pass - - @periodics.periodic(0.5) - def add_me(): - pass - - self.assertTrue(periodics.is_periodic(add_me)) - self.assertTrue(periodics.is_periodic(no_add_me)) - self.assertFalse(periodics.is_periodic(self.test_is_periodic)) - self.assertFalse(periodics.is_periodic(42)) - - def test_watcher(self): - - def cb(): - pass - - callables = [ - (every_one_sec, (cb,), None), - (every_half_sec, (cb,), None), - ] - executor_factory = lambda: self.executor_cls(**self.executor_kwargs) - w = periodics.PeriodicWorker(callables, - executor_factory=executor_factory, - **self.worker_kwargs) - with self.create_destroy(w.start): - self.sleep(2.0) - w.stop() - - for watcher in w.iter_watchers(): - self.assertGreaterEqual(watcher.runs, 1) - - w.reset() - for watcher in w.iter_watchers(): - self.assertEqual(watcher.runs, 0) - self.assertEqual(watcher.successes, 0) - self.assertEqual(watcher.failures, 0) - self.assertEqual(watcher.elapsed, 0) - self.assertEqual(watcher.elapsed_waiting, 0) - - def test_worker(self): - called = [] - - def cb(): - called.append(1) - - callables = [ - (every_one_sec, (cb,), None), - (every_half_sec, (cb,), None), - ] - executor = self.executor_cls(**self.executor_kwargs) - executor_factory = lambda: executor - w = periodics.PeriodicWorker(callables, - executor_factory=executor_factory, - **self.worker_kwargs) - with self.create_destroy(w.start): - self.sleep(2.0) - w.stop() - - am_called = sum(called) - self.assertGreaterEqual(am_called, 4) - - self.assertFalse(executor.alive) - - def test_existing_executor(self): - called = [] - - def cb(): - called.append(1) - - callables = [ - (every_one_sec, (cb,), None), - (every_half_sec, (cb,), None), - ] - executor = self.executor_cls(**self.executor_kwargs) - executor_factory = periodics.ExistingExecutor(executor) - w = periodics.PeriodicWorker(callables, - executor_factory=executor_factory, - **self.worker_kwargs) - with self.create_destroy(w.start): - self.sleep(2.0) - w.stop() - - am_called = sum(called) - self.assertGreaterEqual(am_called, 4) - - self.assertTrue(executor.alive) - - def test_create_with_arguments(self): - m = mock.Mock() - - class Object(object): - @periodics.periodic(0.5) - def func1(self, *args, **kwargs): - m(*args, **kwargs) - - executor_factory = lambda: self.executor_cls(**self.executor_kwargs) - w = periodics.PeriodicWorker.create(objects=[Object()], - executor_factory=executor_factory, - args=('foo',), - kwargs={'bar': 'baz'}, - **self.worker_kwargs) - with self.create_destroy(w.start): - self.sleep(2.0) - w.stop() - - m.assert_called_with('foo', bar='baz') - - -class RejectingExecutor(futurist.GreenThreadPoolExecutor): - MAX_REJECTIONS_COUNT = 2 - - def _reject(self, *args): - if self._rejections_count < self.MAX_REJECTIONS_COUNT: - self._rejections_count += 1 - raise futurist.RejectedSubmission() - - def __init__(self): - self._rejections_count = 0 - super(RejectingExecutor, self).__init__(check_and_reject=self._reject) - - -class TestRetrySubmission(base.TestCase): - def test_retry_submission(self): - called = [] - - def cb(): - called.append(1) - - callables = [ - (every_one_sec, (cb,), None), - (every_half_sec, (cb,), None), - ] - w = periodics.PeriodicWorker(callables, - executor_factory=RejectingExecutor, - cond_cls=green_threading.Condition, - event_cls=green_threading.Event) - w._RESCHEDULE_DELAY = 0 - w._RESCHEDULE_JITTER = 0 - with create_destroy_green_thread(w.start): - eventlet.sleep(2.0) - w.stop() - - am_called = sum(called) - self.assertGreaterEqual(am_called, 4) diff --git a/futurist/tests/test_waiters.py b/futurist/tests/test_waiters.py deleted file mode 100644 index e69b3d3..0000000 --- a/futurist/tests/test_waiters.py +++ /dev/null @@ -1,89 +0,0 @@ -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -import eventlet -import testscenarios - -import futurist -from futurist.tests import base -from futurist import waiters - - -# Module level functions need to be used since the process pool -# executor can not access instance or lambda level functions (since those -# are not pickleable). - -def mini_delay(use_eventlet_sleep=False): - if use_eventlet_sleep: - eventlet.sleep(0.1) - else: - time.sleep(0.1) - return 1 - - -class TestWaiters(testscenarios.TestWithScenarios, base.TestCase): - scenarios = [ - ('sync', {'executor_cls': futurist.SynchronousExecutor, - 'executor_kwargs': {}, 'use_eventlet_sleep': False}), - ('green_sync', {'executor_cls': futurist.SynchronousExecutor, - 'executor_kwargs': {'green': True}, - 'use_eventlet_sleep': True}), - ('green', {'executor_cls': futurist.GreenThreadPoolExecutor, - 'executor_kwargs': {}, 'use_eventlet_sleep': True}), - ('thread', {'executor_cls': futurist.ThreadPoolExecutor, - 'executor_kwargs': {}, 'use_eventlet_sleep': False}), - ('process', {'executor_cls': futurist.ProcessPoolExecutor, - 'executor_kwargs': {}, 'use_eventlet_sleep': False}), - ] - - def setUp(self): - super(TestWaiters, self).setUp() - self.executor = self.executor_cls(**self.executor_kwargs) - - def tearDown(self): - super(TestWaiters, self).tearDown() - self.executor.shutdown() - self.executor = None - - def test_wait_for_any(self): - fs = [] - for _i in range(0, 10): - fs.append(self.executor.submit( - mini_delay, use_eventlet_sleep=self.use_eventlet_sleep)) - all_done_fs = [] - total_fs = len(fs) - while len(all_done_fs) != total_fs: - done, not_done = waiters.wait_for_any(fs) - all_done_fs.extend(done) - fs = not_done - self.assertEqual(total_fs, sum(f.result() for f in all_done_fs)) - - def test_wait_for_all(self): - fs = [] - for _i in range(0, 10): - fs.append(self.executor.submit( - mini_delay, use_eventlet_sleep=self.use_eventlet_sleep)) - done_fs, not_done_fs = waiters.wait_for_all(fs) - self.assertEqual(len(fs), sum(f.result() for f in done_fs)) - self.assertEqual(0, len(not_done_fs)) - - def test_no_mixed_wait_for_any(self): - fs = [futurist.GreenFuture(), futurist.Future()] - self.assertRaises(RuntimeError, waiters.wait_for_any, fs) - - def test_no_mixed_wait_for_all(self): - fs = [futurist.GreenFuture(), futurist.Future()] - self.assertRaises(RuntimeError, waiters.wait_for_all, fs) diff --git a/futurist/waiters.py b/futurist/waiters.py deleted file mode 100644 index 24aeebc..0000000 --- a/futurist/waiters.py +++ /dev/null @@ -1,214 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack - -import collections -import contextlib - -from concurrent import futures -from concurrent.futures import _base -import six - -import futurist -from futurist import _utils - -try: - from eventlet.green import threading as greenthreading -except ImportError: - greenthreading = None - - -#: Named tuple returned from ``wait_for*`` calls. -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') - -_DONE_STATES = frozenset([ - _base.CANCELLED_AND_NOTIFIED, - _base.FINISHED, -]) - - -@contextlib.contextmanager -def _acquire_and_release_futures(fs): - # Do this to ensure that we always get the futures in the same order (aka - # always acquire the conditions in the same order, no matter what; a way - # to avoid dead-lock). - fs = sorted(fs, key=id) - with ExitStack() as stack: - for fut in fs: - stack.enter_context(fut._condition) - yield - - -def _ensure_eventlet(func): - """Decorator that verifies we have the needed eventlet components.""" - - @six.wraps(func) - def wrapper(*args, **kwargs): - if not _utils.EVENTLET_AVAILABLE or greenthreading is None: - raise RuntimeError('Eventlet is needed to wait on green futures') - return func(*args, **kwargs) - - return wrapper - - -def _wait_for(fs, no_green_return_when, on_all_green_cb, - caller_name, timeout=None): - green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture)) - if not green_fs: - done, not_done = futures.wait(fs, timeout=timeout, - return_when=no_green_return_when) - return DoneAndNotDoneFutures(done, not_done) - else: - non_green_fs = len(fs) - green_fs - if non_green_fs: - raise RuntimeError("Can not wait on %s green futures and %s" - " non-green futures in the same" - " `%s` call" % (green_fs, non_green_fs, - caller_name)) - else: - return on_all_green_cb(fs, timeout=timeout) - - -def wait_for_all(fs, timeout=None): - """Wait for all of the futures to complete. - - Works correctly with both green and non-green futures (but not both - together, since this can't be guaranteed to avoid dead-lock due to how - the waiting implementations are different when green threads are being - used). - - Returns pair (done futures, not done futures). - """ - return _wait_for(fs, futures.ALL_COMPLETED, _wait_for_all_green, - 'wait_for_all', timeout=timeout) - - -def wait_for_any(fs, timeout=None): - """Wait for one (**any**) of the futures to complete. - - Works correctly with both green and non-green futures (but not both - together, since this can't be guaranteed to avoid dead-lock due to how - the waiting implementations are different when green threads are being - used). - - Returns pair (done futures, not done futures). - """ - return _wait_for(fs, futures.FIRST_COMPLETED, _wait_for_any_green, - 'wait_for_any', timeout=timeout) - - -class _AllGreenWaiter(object): - """Provides the event that ``_wait_for_all_green`` blocks on.""" - - def __init__(self, pending): - self.event = greenthreading.Event() - self.lock = greenthreading.Lock() - self.pending = pending - - def _decrement_pending(self): - with self.lock: - self.pending -= 1 - if self.pending <= 0: - self.event.set() - - def add_result(self, future): - self._decrement_pending() - - def add_exception(self, future): - self._decrement_pending() - - def add_cancelled(self, future): - self._decrement_pending() - - -class _AnyGreenWaiter(object): - """Provides the event that ``_wait_for_any_green`` blocks on.""" - - def __init__(self): - self.event = greenthreading.Event() - - def add_result(self, future): - self.event.set() - - def add_exception(self, future): - self.event.set() - - def add_cancelled(self, future): - self.event.set() - - -def _partition_futures(fs): - done = set() - not_done = set() - for f in fs: - if f._state in _DONE_STATES: - done.add(f) - else: - not_done.add(f) - return done, not_done - - -def _create_and_install_waiters(fs, waiter_cls, *args, **kwargs): - waiter = waiter_cls(*args, **kwargs) - for f in fs: - f._waiters.append(waiter) - return waiter - - -@_ensure_eventlet -def _wait_for_all_green(fs, timeout=None): - if not fs: - return DoneAndNotDoneFutures(set(), set()) - - with _acquire_and_release_futures(fs): - done, not_done = _partition_futures(fs) - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - waiter = _create_and_install_waiters(not_done, - _AllGreenWaiter, - len(not_done)) - waiter.event.wait(timeout) - for f in not_done: - f._waiters.remove(waiter) - - with _acquire_and_release_futures(fs): - done, not_done = _partition_futures(fs) - return DoneAndNotDoneFutures(done, not_done) - - -@_ensure_eventlet -def _wait_for_any_green(fs, timeout=None): - if not fs: - return DoneAndNotDoneFutures(set(), set()) - - with _acquire_and_release_futures(fs): - done, not_done = _partition_futures(fs) - if done: - return DoneAndNotDoneFutures(done, not_done) - waiter = _create_and_install_waiters(fs, _AnyGreenWaiter) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - with _acquire_and_release_futures(fs): - done, not_done = _partition_futures(fs) - return DoneAndNotDoneFutures(done, not_done) diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 5b22cfd..0000000 --- a/requirements.txt +++ /dev/null @@ -1,9 +0,0 @@ -# The order of packages is significant, because pip processes them in the order -# of appearance. Changing the order has an impact on the overall integration -# process, which may cause wedges in the gate later. - -pbr>=1.6 # Apache-2.0 -six>=1.9.0 # MIT -monotonic>=0.6 # Apache-2.0 -futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD -contextlib2>=0.4.0 # PSF License diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 5461f96..0000000 --- a/setup.cfg +++ /dev/null @@ -1,51 +0,0 @@ -[metadata] -name = futurist -summary = Useful additions to futures, from the future. -description-file = - README.rst -author = OpenStack -author-email = openstack-dev@lists.openstack.org -home-page = http://www.openstack.org/ -classifier = - Environment :: OpenStack - Intended Audience :: Information Technology - Intended Audience :: System Administrators - License :: OSI Approved :: Apache Software License - Operating System :: POSIX :: Linux - Programming Language :: Python - Programming Language :: Python :: 2 - Programming Language :: Python :: 2.7 - Programming Language :: Python :: 3 - Programming Language :: Python :: 3.4 - -[files] -packages = - futurist - -[build_sphinx] -source-dir = doc/source -build-dir = doc/build -all_files = 1 - -[upload_sphinx] -upload-dir = doc/build/html - -[compile_catalog] -directory = futurist/locale -domain = futurist - -[pbr] -warnerrors = True - -[wheel] -universal = 1 - -[update_catalog] -domain = futurist -output_dir = futurist/locale -input_file = futurist/locale/futurist.pot - -[extract_messages] -keywords = _ gettext ngettext l_ lazy_gettext -mapping_file = babel.cfg -output_file = futurist/locale/futurist.pot diff --git a/setup.py b/setup.py deleted file mode 100644 index 782bb21..0000000 --- a/setup.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT -import setuptools - -# In python < 2.7.4, a lazy loading of package `pbr` will break -# setuptools if some other modules registered functions in `atexit`. -# solution from: http://bugs.python.org/issue15881#msg170215 -try: - import multiprocessing # noqa -except ImportError: - pass - -setuptools.setup( - setup_requires=['pbr>=1.8'], - pbr=True) diff --git a/test-requirements.txt b/test-requirements.txt deleted file mode 100644 index 6e6d300..0000000 --- a/test-requirements.txt +++ /dev/null @@ -1,19 +0,0 @@ -# The order of packages is significant, because pip processes them in the order -# of appearance. Changing the order has an impact on the overall integration -# process, which may cause wedges in the gate later. - -hacking<0.11,>=0.10.0 - -# Used for making sure the eventlet executors work. -eventlet!=0.18.3,>=0.18.2 # MIT - -doc8 # Apache-2.0 -coverage>=3.6 # Apache-2.0 -discover # BSD -python-subunit>=0.0.18 # Apache-2.0/BSD -sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD -oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0 -oslotest>=1.10.0 # Apache-2.0 -testrepository>=0.0.18 # Apache-2.0/BSD -testscenarios>=0.4 # Apache-2.0/BSD -testtools>=1.4.0 # MIT diff --git a/tox.ini b/tox.ini deleted file mode 100644 index 1baa7e7..0000000 --- a/tox.ini +++ /dev/null @@ -1,41 +0,0 @@ -[tox] -minversion = 1.6 -envlist = py34,py27,pypy,pep8 -skipsdist = True - -[testenv] -usedevelop = True -install_command = pip install -U {opts} {packages} -setenv = - VIRTUAL_ENV={envdir} -deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt -commands = python setup.py test --slowest --testr-args='{posargs}' - -[testenv:pep8] -commands = flake8 - -[testenv:venv] -commands = {posargs} - -[testenv:cover] -commands = python setup.py test --coverage --testr-args='{posargs}' - -[testenv:py27] -commands = - python setup.py testr --slowest --testr-args='{posargs}' - sphinx-build -b doctest doc/source doc/build - doc8 --ignore-path "doc/source/history.rst" doc/source - -[testenv:docs] -commands = python setup.py build_sphinx - -[testenv:debug] -commands = oslo_debug_helper {posargs} - -[flake8] -# E123, E125 skipped as they are invalid PEP-8. - -show-source = True -ignore = E123,E125 -exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build