From 5d07e0b8efda030601d052aca0ce18511141bbb1 Mon Sep 17 00:00:00 2001
From: Vincent Llorens
Date: Tue, 16 Feb 2016 15:53:49 +0100
Subject: [PATCH] import project from launchpad

This is a combination of 5 commits.
.
The first commit's message is:
README template
.
This is the 2nd commit message:
Synergy Scheduler Manager
.
This is the 3rd commit message:
add initial setup.py
.
This is the 4th commit message:
make the package installable

4 main changes:
- define entry points for the provided managers
- make this a real package by moving the code to the root of
  "synergy_scheduler_manager".
- add missing dependencies
- add constraint on oslo.* package versions

This is needed because we use "oslo.*" namespaces when importing, and these
are only available for oslo.config and oslo.messaging for versions < 2.0.0
.
This is the 5th commit message:
add cookiecutter template files

Change-Id: I39609f07e38cfe27dd844bb277528b4a23049f9d
---
 .coveragerc | 7 +
 .testr.conf | 7 +
 CONTRIBUTING.rst | 17 +
 HACKING.rst | 4 +
 LICENSE | 176 +++
 MANIFEST.in | 6 +
 README.rst | 26 +
 babel.cfg | 2 +
 doc/source/conf.py | 74 ++
 doc/source/contributing.rst | 4 +
 doc/source/index.rst | 25 +
 doc/source/installation.rst | 12 +
 doc/source/readme.rst | 1 +
 doc/source/usage.rst | 5 +
 requirements.txt | 9 +
 setup.cfg | 53 +
 setup.py | 27 +
 synergy_scheduler_manager/__init__.py | 19 +
 .../fairshare_manager.py | 376 ++
 synergy_scheduler_manager/keystone_manager.py | 672 +++++++++++
 synergy_scheduler_manager/nova_manager.py | 1064 +++++++++++++++++
 synergy_scheduler_manager/queue_manager.py | 466 ++++++++
 synergy_scheduler_manager/quota_manager.py | 427 +++++++
 .../scheduler_manager.py | 496 ++++++++
 synergy_scheduler_manager/tests/__init__.py | 0
 synergy_scheduler_manager/tests/base.py | 18 +
 .../tests/test_dynamic_quota.py | 26 +
 test-requirements.txt | 14 +
 tox.ini | 60 +
 29 files changed, 4093 insertions(+)
 create mode 100644 .coveragerc
 create mode 100644 .testr.conf
 create mode 100644 CONTRIBUTING.rst
 create mode 100644 HACKING.rst
 create mode 100644 LICENSE
 create mode 100644 MANIFEST.in
 create mode 100644 README.rst
 create mode 100644 babel.cfg
 create mode 100755 doc/source/conf.py
 create mode 100644 doc/source/contributing.rst
 create mode 100644 doc/source/index.rst
 create mode 100644 doc/source/installation.rst
 create mode 100644 doc/source/readme.rst
 create mode 100644 doc/source/usage.rst
 create mode 100644 requirements.txt
 create mode 100644 setup.cfg
 create mode 100644 setup.py
 create mode 100644 synergy_scheduler_manager/__init__.py
 create mode 100644 synergy_scheduler_manager/fairshare_manager.py
 create mode 100644 synergy_scheduler_manager/keystone_manager.py
 create mode 100644 synergy_scheduler_manager/nova_manager.py
 create mode 100644 synergy_scheduler_manager/queue_manager.py
 create mode 100644 synergy_scheduler_manager/quota_manager.py
 create mode 100644 synergy_scheduler_manager/scheduler_manager.py
 create mode 100644 synergy_scheduler_manager/tests/__init__.py
 create mode 100644 synergy_scheduler_manager/tests/base.py
 create mode 100644 synergy_scheduler_manager/tests/test_dynamic_quota.py
 create mode 100644 test-requirements.txt
 create mode 100644 tox.ini

diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..76045da
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,7 @@
+[run]
+branch = True
+source = synergy_scheduler_manager
+omit = synergy_scheduler_manager/openstack/*
+
+[report]
+ignore_errors = True
diff --git a/.testr.conf b/.testr.conf
new file mode 100644
index 0000000..6d83b3c
---
/dev/null +++ b/.testr.conf @@ -0,0 +1,7 @@ +[DEFAULT] +test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ + OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ + OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ + ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION +test_id_option=--load-list $IDFILE +test_list_option=--list diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst new file mode 100644 index 0000000..0866929 --- /dev/null +++ b/CONTRIBUTING.rst @@ -0,0 +1,17 @@ +If you would like to contribute to the development of OpenStack, you must +follow the steps in this page: + + http://docs.openstack.org/infra/manual/developers.html + +If you already have a good understanding of how the system works and your +OpenStack accounts are set up, you can skip to the development workflow +section of this documentation to learn how changes to OpenStack should be +submitted for review via the Gerrit tool: + + http://docs.openstack.org/infra/manual/developers.html#development-workflow + +Pull requests submitted through GitHub will be ignored. + +Bugs should be filed on Launchpad, not GitHub: + + https://bugs.launchpad.net/synergy-scheduler-manager diff --git a/HACKING.rst b/HACKING.rst new file mode 100644 index 0000000..56da37d --- /dev/null +++ b/HACKING.rst @@ -0,0 +1,4 @@ +synergy-scheduler-manager Style Commandments +=============================================== + +Read the OpenStack Style Commandments http://docs.openstack.org/developer/hacking/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..68c771a --- /dev/null +++ b/LICENSE @@ -0,0 +1,176 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..c978a52
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,6 @@
+include AUTHORS
+include ChangeLog
+exclude .gitignore
+exclude .gitreview
+
+global-exclude *.pyc
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..c293f8e
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,26 @@
+------------------------------
+ SYNERGY SCHEDULER MANAGER
+------------------------------
+
+The Scheduler Manager
+
+Synergy is a new, extensible, general-purpose OpenStack management service.
+Its capabilities are implemented by a collection of managers which are specific
+and independent pluggable tasks, executed periodically or interactively. The
+managers can interact with each other in a loosely coupled way.
+The Scheduler Manager provides advanced scheduling (fairshare) capability for
+OpenStack. In particular, it aims to address the resource utilization issues
+coming from the static allocation model inherent in the Cloud paradigm, by
+adopting the dynamic partitioning strategy implemented by advanced batch
+schedulers.
+
+
+* Free software: Apache license
+* Documentation: http://docs.openstack.org/developer/synergy-scheduler-manager
+* Source: http://git.openstack.org/cgit/openstack/synergy-scheduler-manager
+* Bugs: http://bugs.launchpad.net/synergy-scheduler-manager
+
+Features
+--------
+
+* TODO
diff --git a/babel.cfg b/babel.cfg
new file mode 100644
index 0000000..15cd6cb
--- /dev/null
+++ b/babel.cfg
@@ -0,0 +1,2 @@
+[python: **.py]
+
diff --git a/doc/source/conf.py b/doc/source/conf.py
new file mode 100755
index 0000000..1f2d516
--- /dev/null
+++ b/doc/source/conf.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys + +sys.path.insert(0, os.path.abspath('../..')) +# -- General configuration ---------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones. +extensions = [ + 'sphinx.ext.autodoc', + #'sphinx.ext.intersphinx', + 'oslosphinx' +] + +# autodoc generation is a bit aggressive and a nuisance when doing heavy +# text edit cycles. +# execute "export SPHINX_DEBUG=1" in your terminal to disable + +# The suffix of source filenames. +source_suffix = '.rst' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'synergy-scheduler-manager' + +# If true, '()' will be appended to :func: etc. cross-reference text. +add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +add_module_names = True + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# -- Options for HTML output -------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. Major themes that come with +# Sphinx are currently 'default' and 'sphinxdoc'. +# html_theme_path = ["."] +# html_theme = '_theme' +# html_static_path = ['static'] + +# Output file base name for HTML help builder. +htmlhelp_basename = '%sdoc' % project + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, author, documentclass +# [howto/manual]). +latex_documents = [ + ('index', + '%s.tex' % project, + u'%s Documentation' % project, + u'OpenStack Foundation', 'manual'), +] + +# Example configuration for intersphinx: refer to the Python standard library. +#intersphinx_mapping = {'http://docs.python.org/': None} diff --git a/doc/source/contributing.rst b/doc/source/contributing.rst new file mode 100644 index 0000000..1728a61 --- /dev/null +++ b/doc/source/contributing.rst @@ -0,0 +1,4 @@ +============ +Contributing +============ +.. include:: ../../CONTRIBUTING.rst diff --git a/doc/source/index.rst b/doc/source/index.rst new file mode 100644 index 0000000..1513578 --- /dev/null +++ b/doc/source/index.rst @@ -0,0 +1,25 @@ +.. synergy-scheduler-manager documentation master file, created by + sphinx-quickstart on Tue Jul 9 22:26:36 2013. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to synergy-scheduler-manager's documentation! +======================================================== + +Contents: + +.. toctree:: + :maxdepth: 2 + + readme + installation + usage + contributing + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + diff --git a/doc/source/installation.rst b/doc/source/installation.rst new file mode 100644 index 0000000..d0ef516 --- /dev/null +++ b/doc/source/installation.rst @@ -0,0 +1,12 @@ +============ +Installation +============ + +At the command line:: + + $ pip install synergy-scheduler-manager + +Or, if you have virtualenvwrapper installed:: + + $ mkvirtualenv synergy-scheduler-manager + $ pip install synergy-scheduler-manager diff --git a/doc/source/readme.rst b/doc/source/readme.rst new file mode 100644 index 0000000..a6210d3 --- /dev/null +++ b/doc/source/readme.rst @@ -0,0 +1 @@ +.. 
include:: ../../README.rst diff --git a/doc/source/usage.rst b/doc/source/usage.rst new file mode 100644 index 0000000..7111ba8 --- /dev/null +++ b/doc/source/usage.rst @@ -0,0 +1,5 @@ +======== +Usage +======== + +TODO diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e8ac661 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +# The order of packages is significant, because pip processes them in the order +# of appearance. Changing the order has an impact on the overall integration +# process, which may cause wedges in the gate later. + +pbr>=1.6 +synergy-service==0.2.0.dev4 +oslo.config<2.0.0 +oslo.messaging<2.0.0 +sqlalchemy==1.0.13 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..dd87780 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,53 @@ +[metadata] +name = synergy-scheduler-manager +version = 0.1 +summary = Provide advanced scheduling (fairshare) capability for OpenStack +description-file = + README.rst +author = Lisa Zangrando +author-email = lisa.zangrando@pd.infn.it +home-page = https://launchpad.net/synergy-scheduler-manager +classifier = + Environment :: OpenStack + Intended Audience :: Information Technology + Intended Audience :: System Administrators + License :: OSI Approved :: Apache Software License + Operating System :: POSIX :: Linux + Programming Language :: Python + Programming Language :: Python :: 2 + Programming Language :: Python :: 2.7 + +[files] +packages = + synergy_scheduler_manager + +[entry_points] +synergy.managers = + KeystoneManager = synergy_scheduler_manager.keystone_manager:KeystoneManager + NovaManager = synergy_scheduler_manager.nova_manager:NovaManager + QueueManager = synergy_scheduler_manager.queue_manager:QueueManager + QuotaManager = synergy_scheduler_manager.quota_manager:QuotaManager + FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager + SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager + +[build_sphinx] +source-dir = doc/source +build-dir = doc/build +all_files = 1 + +[upload_sphinx] +upload-dir = doc/build/html + +[compile_catalog] +directory = synergy_scheduler_manager/locale +domain = synergy_scheduler_manager + +[update_catalog] +domain = synergy_scheduler_manager +output_dir = synergy_scheduler_manager/locale +input_file = synergy_scheduler_manager/locale/synergy_scheduler_manager.pot + +[extract_messages] +keywords = _ gettext ngettext l_ lazy_gettext +mapping_file = babel.cfg +output_file = synergy_scheduler_manager/locale/synergy_scheduler_manager.pot diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..98b93eb --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT +import setuptools + +# In python < 2.7.4, a lazy loading of package `pbr` will break +# setuptools if some other modules registered functions in `atexit`. 
+# solution from: http://bugs.python.org/issue15881#msg170215 +try: + import multiprocessing # noqa +except ImportError: + pass + +setuptools.setup( + setup_requires=['pbr'], + pbr=True) diff --git a/synergy_scheduler_manager/__init__.py b/synergy_scheduler_manager/__init__.py new file mode 100644 index 0000000..b6223ca --- /dev/null +++ b/synergy_scheduler_manager/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pbr.version + + +__version__ = pbr.version.VersionInfo( + 'synergy_scheduler_manager').version_string() diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py new file mode 100644 index 0000000..4bf1c73 --- /dev/null +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -0,0 +1,376 @@ +import logging +import threading + +from datetime import datetime +from datetime import timedelta + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. 
+See the License for the specific language governing +permissions and limitations under the License.""" + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class FairShareManager(Manager): + + def __init__(self): + Manager.__init__(self, name="FairShareManager") + + self.config_opts = [ + cfg.IntOpt('periods', default=3), + cfg.IntOpt('period_length', default=7), + cfg.FloatOpt('default_share', default=10.0), + cfg.FloatOpt('decay_weight', default=0.5, help="the decay weight"), + cfg.IntOpt('age_weight', default=1000, help="the age weight"), + cfg.IntOpt('vcpus_weight', default=10000, help="the vcpus weight"), + cfg.IntOpt('memory_weight', default=7000, help="the memory weight") + ] + + def setup(self): + if self.getManager("NovaManager") is None: + raise Exception("NovaManager not found!") + + if self.getManager("QueueManager") is None: + raise Exception("QueueManager not found!") + + if self.getManager("QuotaManager") is None: + raise Exception("QuotaManager not found!") + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + self.periods = CONF.FairShareManager.periods + self.period_length = CONF.FairShareManager.period_length + self.default_share = float(CONF.FairShareManager.default_share) + self.decay_weight = CONF.FairShareManager.decay_weight + self.vcpus_weight = CONF.FairShareManager.vcpus_weight + self.age_weight = CONF.FairShareManager.age_weight + self.memory_weight = CONF.FairShareManager.memory_weight + self.projects = {} + self.workers = [] + self.exit = False + self.nova_manager = self.getManager("NovaManager") + self.queue_manager = self.getManager("QueueManager") + self.quota_manager = self.getManager("QuotaManager") + self.keystone_manager = self.getManager("KeystoneManager") + self.condition = threading.Condition() + + def execute(self, command, *args, **kargs): + if command == "ADD_PROJECT": + return self.addProject(*args, **kargs) + elif command == "GET_PROJECT": + return self.getProject(*args, **kargs) + elif command == "GET_PROJECTS": + return self.getProjects(*args, **kargs) + elif command == "REMOVE_PROJECT": + return self.removeProject(*args, **kargs) + elif command == "GET_PRIORITY": + result = {} + for prj_id, project in self.projects.items(): + users = {} + + for user_id, user in project["users"].items(): + p = self.calculatePriority(user_id=user_id, prj_id=prj_id) + users[user["name"]] = p + + result[project["name"]] = users + return result + elif command == "CALCULATE_PRIORITY": + return self.calculatePriority(*args, **kargs) + else: + raise Exception("command=%r not supported!" % command) + + def task(self): + with self.condition: + try: + self.calculateFairShare() + except Exception as ex: + LOG.error(ex) + raise ex + finally: + self.condition.notifyAll() + + def destroy(self): + pass + + def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0): + if prj_id not in self.projects: + raise Exception("project=%s not found!" % prj_id) + + if user_id not in self.projects[prj_id]["users"]: + raise Exception("user=%s not found!" 
% user_id) + + fair_share_cores = 0 + fair_share_ram = 0 + + with self.condition: + user = self.projects[prj_id]["users"].get(user_id) + fair_share_cores = user["fairshare_cores"] + fair_share_ram = user["fairshare_ram"] + + self.condition.notifyAll() + + if not timestamp: + timestamp = datetime.utcnow() + + now = datetime.utcnow() + + diff = (now - timestamp) + minutes = diff.seconds / 60 + priority = (float(self.age_weight) * minutes + + float(self.vcpus_weight) * fair_share_cores + + float(self.memory_weight) * fair_share_ram - + float(self.age_weight) * retry) + + return int(priority) + + def addProject(self, prj_id, prj_name, share=float(0)): + if prj_id not in self.projects: + if share == 0: + share = self.default_share + + with self.condition: + self.projects[prj_id] = {"id": prj_id, + "name": prj_name, + "type": "dynamic", + "users": {}, + "usage": {}, + "share": share} + self.condition.notifyAll() + + def getProject(self, prj_id): + if prj_id not in self.projects: + raise Exception("project name=%r not found!" % prj_id) + + return self.projects.get(prj_id) + + def getProjects(self): + return self.projects + + def removeProject(self, prj_id): + if prj_id in self.projects: + with self.condition: + del self.projects[prj_id] + self.condition.notifyAll() + + def calculateFairShare(self): + total_prj_share = float(0) + total_usage_ram = float(0) + total_usage_cores = float(0) + total_actual_usage_cores = float(0) + total_actual_usage_ram = float(0) + + users = self.keystone_manager.execute("GET_USERS") + + if not users: + LOG.error("cannot retrieve the users list from KeystoneManager") + return + + for user in users: + user_id = str(user["id"]) + user_name = str(user["name"]) + user_projects = self.keystone_manager.execute("GET_USER_PROJECTS", + id=user_id) + + for project in user_projects: + prj_id = str(project["id"]) + + if prj_id not in self.projects: + continue + + p_users = self.projects[prj_id]["users"] + + if user_id not in p_users: + p_users[user_id] = {"name": user_name, + "share": self.default_share, + "usage": {"ram": float(0), + "cores": float(0)}} + else: + p_users[user_id]["usage"]["ram"] = float(0) + p_users[user_id]["usage"]["cores"] = float(0) + + to_date = datetime.utcnow() + + for x in xrange(self.periods): + default_share = self.default_share + decay = self.decay_weight ** x + from_date = to_date - timedelta(days=(self.period_length)) + + usages = self.nova_manager.execute("GET_RESOURCE_USAGE", + prj_ids=self.projects.keys(), + from_date=from_date, + to_date=to_date) + + for prj_id, users in usages.items(): + project = self.projects[prj_id] + + for user_id, usage_record in users.items(): + if user_id not in project["users"]: + project["users"][user_id] = {"name": user_name, + "share": default_share, + "usage": {}} + + user_usage = project["users"][user_id]["usage"] + user_usage["ram"] += decay * usage_record["ram"] + user_usage["cores"] += decay * usage_record["cores"] + + total_usage_ram += user_usage["ram"] + total_usage_cores += user_usage["cores"] + + to_date = from_date + + for project in self.projects.values(): + if "share" not in project or project["share"] == 0: + project["share"] = self.default_share + + # check the share for each user and update the usage_record + users = project["users"] + prj_id = project["id"] + # prj_name = project["name"] + prj_share = project["share"] + sibling_share = float(0) + + for user_id, user in users.items(): + if "share" not in user or user["share"] == 0: + user["share"] = self.default_share + + if len(users) == 1: + 
user["share"] = prj_share + sibling_share = prj_share + else: + sibling_share += user["share"] + + project["sibling_share"] = sibling_share + total_prj_share += prj_share + + for prj_id, project in self.projects.items(): + sibling_share = project["sibling_share"] + prj_share = project["share"] + actual_usage_cores = float(0) + actual_usage_ram = float(0) + + users = project["users"] + + for user_id, user in users.items(): + # for each user the normalized share + # is calculated (0 <= user_norm_share <= 1) + user_share = user["share"] + user_usage = user["usage"] + user_usage["norm_ram"] = user_usage["ram"] + user_usage["norm_cores"] = user_usage["cores"] + + if prj_share > 0 and sibling_share > 0 and total_prj_share > 0: + user["norm_share"] = (user_share / sibling_share) * \ + (prj_share / total_prj_share) + else: + user["norm_share"] = user_share + + if total_usage_ram > 0: + user_usage["norm_ram"] /= total_usage_ram + + if total_usage_cores > 0: + user_usage["norm_cores"] /= total_usage_cores + + actual_usage_ram += user_usage["norm_ram"] + actual_usage_cores += user_usage["norm_cores"] + + project["usage"]["actual_ram"] = actual_usage_ram + project["usage"]["actual_cores"] = actual_usage_cores + + total_actual_usage_ram += actual_usage_ram + total_actual_usage_cores += actual_usage_cores + + for project in self.projects.values(): + actual_usage_ram = project["usage"]["actual_ram"] + actual_usage_cores = project["usage"]["actual_cores"] + prj_share = project["share"] + sibling_share = project["sibling_share"] + users = project["users"] + + # effect_prj_cores_usage = actual_usage_cores + + # ((total_actual_usage_cores - actual_usage_cores) * + # prj_share / total_prj_share) + + # effect_prj_cores_usage = actual_usage_ram + + # ((total_actual_usage_ram - actual_usage_ram) * + # prj_share / total_prj_share) + + effect_prj_ram_usage = actual_usage_ram + effect_prj_cores_usage = actual_usage_cores + + project["usage"]["effective_ram"] = effect_prj_ram_usage + project["usage"]["effective_cores"] = effect_prj_cores_usage + + for user in users.values(): + user["fairshare_ram"] = float(0) + user["fairshare_cores"] = float(0) + user_share = user["share"] + user_usage = user["usage"] + user_usage["effective_cores"] = float(0) + user_usage["effective_ram"] = float(0) + + if user_share > 0: + norm_share = user["norm_share"] + norm_usage_ram = user_usage["norm_ram"] + norm_usage_cores = user_usage["norm_cores"] + + effect_usage_ram = (norm_usage_ram + ( + (effect_prj_cores_usage - + norm_usage_ram) * + user_share / sibling_share)) + + effect_usage_cores = (norm_usage_cores + ( + (effect_prj_cores_usage - + norm_usage_cores) * + user_share / sibling_share)) + + user_usage["effective_ram"] = effect_usage_ram + user_usage["effective_rel_ram"] = float(0) + + user_usage["effective_cores"] = effect_usage_cores + user_usage["effective_rel_cores"] = float(0) + + if actual_usage_cores > 0: + user_usage["effective_rel_cores"] = norm_usage_cores + user_usage["effective_rel_cores"] /= actual_usage_cores + + if actual_usage_ram > 0: + user_usage["effect_rel_ram"] = norm_usage_ram + user_usage["effect_rel_ram"] /= actual_usage_ram + + # user["effect_usage_rel_cores"] = effect_usage_cores / + # effect_prj_cores_usage + # user["effect_usage_rel_ram"] = effect_usage_ram / + # effect_prj_cores_usage + + if norm_share > 0: + f_ram = 2 ** (-effect_usage_ram / norm_share) + user["fairshare_ram"] = f_ram + + f_cores = 2 ** (-effect_usage_cores / norm_share) + user["fairshare_cores"] = f_cores + + LOG.debug("fairshare 
project %s" % project) diff --git a/synergy_scheduler_manager/keystone_manager.py b/synergy_scheduler_manager/keystone_manager.py new file mode 100644 index 0000000..309d486 --- /dev/null +++ b/synergy_scheduler_manager/keystone_manager.py @@ -0,0 +1,672 @@ +import json +import logging +import os.path +import requests + +from datetime import datetime + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class Trust(object): + + def __init__(self, data): + data = data["trust"] + + self.id = data["id"] + self.impersonations = data["impersonation"] + self.roles_links = data["roles_links"] + self.trustor_user_id = data["trustor_user_id"] + self.trustee_user_id = data["trustee_user_id"] + self.links = data["links"] + self.roles = data["roles"] + self.remaining_uses = data["remaining_uses"] + self.expires_at = None + + if data["expires_at"] is not None: + self.expires_at = datetime.strptime(data["expires_at"], + "%Y-%m-%dT%H:%M:%S.%fZ") + self.project_id = data["project_id"] + + def getId(self): + return self.id + + def isImpersonations(self): + return self.impersonations + + def getRolesLinks(self): + return self.roles_links + + def getTrustorUserId(self): + return self.trustor_user_id + + def getTrusteeUserId(self): + return self.trustee_user_id + + def getlinks(self): + return self.links + + def getProjectId(self): + return self.project_id + + def getRoles(self): + return self.roles + + def getRemainingUses(self): + return self.remaining_uses + + def getExpiration(self): + return self.expires_at + + def isExpired(self): + if self.getExpiration() is None: + return False + + return self.getExpiration() < datetime.utcnow() + + +class Token(object): + + def __init__(self, token, data): + self.id = token + + data = data["token"] + self.roles = data["roles"] + self.catalog = data["catalog"] + self.issued_at = datetime.strptime(data["issued_at"], + "%Y-%m-%dT%H:%M:%S.%fZ") + self.expires_at = datetime.strptime(data["expires_at"], + "%Y-%m-%dT%H:%M:%S.%fZ") + self.project = data["project"] + self.user = data["user"] + self.extras = data["extras"] + + def getCatalog(self, service_name=None, interface="public"): + if service_name: + for service in self.catalog: + if service["name"] == service_name: + for endpoint in service["endpoints"]: + if endpoint["interface"] == interface: + return endpoint + return None + else: + return self.catalog + + def getExpiration(self): + return self.expires_at + + def getId(self): + return self.id + + def getExtras(self): + return self.extras + + def getProject(self): + return self.project + + def getRoles(self): + return self.roles + + def getUser(self): + return self.user + + def isAdmin(self): + if not self.roles: + return False + + for role in self.roles: 
+ if role["name"] == "admin": + return True + + return False + + def issuedAt(self): + return self.issued_at + + def isExpired(self): + return self.getExpiration() < datetime.utcnow() + + def save(self, filename): + # save to file + with open(filename, 'w') as f: + token = {} + token["catalog"] = self.catalog + token["extras"] = self.extras + token["user"] = self.user + token["project"] = self.project + token["roles"] = self.roles + token["roles"] = self.roles + token["issued_at"] = self.issued_at.isoformat() + token["expires_at"] = self.expires_at.isoformat() + + data = {"id": self.id, "token": token} + + json.dump(data, f) + + @classmethod + def load(cls, filename): + if not os.path.isfile(".auth_token"): + return None + + # load from file: + with open(filename, 'r') as f: + try: + data = json.load(f) + return Token(data["id"], data) + # if the file is empty the ValueError will be thrown + except ValueError as ex: + raise ex + + def isotime(self, at=None, subsecond=False): + """Stringify time in ISO 8601 format.""" + if not at: + at = datetime.utcnow() + + if not subsecond: + st = at.strftime('%Y-%m-%dT%H:%M:%S') + else: + st = at.strftime('%Y-%m-%dT%H:%M:%S.%f') + + if at.tzinfo: + tz = at.tzinfo.tzname(None) + else: + tz = 'UTC' + + st += ('Z' if tz == 'UTC' else tz) + return st + + """The trustor or grantor of a trust is the person who creates the trust. + The trustor is the one who contributes property to the trust. + The trustee is the person who manages the trust, and is usually appointed + by the trustor. The trustor is also often the trustee in living trusts. + """ + def trust(self, trustee_user, expires_at=None, + project_id=None, roles=None, impersonation=True): + if self.isExpired(): + raise Exception("token expired!") + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Token": self.getId()} + + if roles is None: + roles = self.getRoles() + + if project_id is None: + project_id = self.getProject().get("id") + + data = {} + data["trust"] = {"impersonation": impersonation, + "project_id": project_id, + "roles": roles, + "trustee_user_id": trustee_user, + "trustor_user_id": self.getUser().get("id")} + + if expires_at is not None: + data["trust"]["expires_at"] = self.isotime(expires_at, True) + + endpoint = self.getCatalog(service_name="keystone") + + if not endpoint: + raise Exception("keystone endpoint not found!") + + if "v2.0" in endpoint["url"]: + endpoint["url"] = endpoint["url"].replace("v2.0", "v3") + + response = requests.post(url=endpoint["url"] + "/OS-TRUST/trusts", + headers=headers, + data=json.dumps(data)) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if not response.text: + raise Exception("trust token failed!") + + return Trust(response.json()) + + +class KeystoneManager(Manager): + + def __init__(self): + Manager.__init__(self, name="KeystoneManager") + + self.config_opts = [ + cfg.StrOpt("auth_url", + help="the Keystone url (v3 only)", + required=True), + cfg.StrOpt("username", + help="the name of user with admin role", + required=True), + cfg.StrOpt("password", + help="the password of user with admin role", + required=True), + cfg.StrOpt("project_name", + help="the project to request authorization on", + required=True), + cfg.StrOpt("project_id", + help="the project id to request authorization on", + required=False), + cfg.IntOpt("timeout", + help="set the http connection timeout", + default=60, + required=False), + cfg.IntOpt("trust_expiration", + 
help="set the trust expiration", + default=24, + required=False) + ] + + def setup(self): + self.auth_url = CONF.KeystoneManager.auth_url + self.username = CONF.KeystoneManager.username + self.password = CONF.KeystoneManager.password + self.project_name = CONF.KeystoneManager.project_name + self.project_id = CONF.KeystoneManager.project_id + self.timeout = CONF.KeystoneManager.timeout + self.trust_expiration = CONF.KeystoneManager.trust_expiration + self.token = None + + self.authenticate() + + def task(self): + pass + + def destroy(self): + pass + + def execute(self, command, *args, **kargs): + if command == "GET_USERS": + return self.getUsers() + elif command == "GET_USER": + return self.getProject(*args, **kargs) + elif command == "GET_USER_PROJECTS": + return self.getUserProjects(*args, **kargs) + elif command == "GET_USER_ROLES": + return self.getUserRoles(*args, **kargs) + elif command == "GET_PROJECTS": + return self.getProjects() + elif command == "GET_PROJECT": + return self.getProject(*args, **kargs) + elif command == "GET_ROLES": + return self.getRoles() + elif command == "GET_ROLE": + return self.getRole(*args, **kargs) + elif command == "GET_TOKEN": + return self.getToken() + elif command == "DELETE_TOKEN": + return self.deleteToken(*args, **kargs) + elif command == "VALIDATE_TOKEN": + return self.validateToken(*args, **kargs) + elif command == "GET_ENDPOINTS": + return self.getEndpoints() + elif command == "GET_ENDPOINT": + return self.getEndpoints(*args, **kargs) + elif command == "GET_SERVICES": + return self.getServices() + elif command == "GET_SERVICE": + return self.getService(*args, **kargs) + elif command == "TRUST": + return self.trust(*args, **kargs) + else: + return None + + def doOnEvent(self, event_type, *args, **kargs): + if event_type == "GET_PROJECTS": + kargs["result"].extend(self.getProjects()) + + def authenticate(self): + if self.token is not None: + if self.token.isExpired(): + try: + self.deleteToken(self.token.getId()) + except requests.exceptions.HTTPError: + pass + else: + return + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient"} + + identity = {"methods": ["password"], + "password": {"user": {"name": self.username, + "domain": {"id": "default"}, + "password": self.password}}} + + data = {"auth": {}} + data["auth"]["identity"] = identity + + if self.project_name: + data["auth"]["scope"] = {"project": {"name": self.project_name, + "domain": {"id": "default"}}} + + if self.project_id: + data["auth"]["scope"] = {"project": {"id": self.project_id, + "domain": {"id": "default"}}} + + response = requests.post(url=self.auth_url + "/auth/tokens", + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if not response.text: + raise Exception("authentication failed!") + + # print(response.__dict__) + + token_subject = response.headers["X-Subject-Token"] + token_data = response.json() + + self.token = Token(token_subject, token_data) + + def getUser(self, id): + try: + response = self.getResource("users/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the user info (id=%r): %s" + % (id, response["error"]["message"])) + + if response: + response = response["user"] + + return response + + def getUsers(self): + try: + response = self.getResource("users", "GET") + except requests.exceptions.HTTPError as ex: + response = 
ex.response.json() + raise Exception("error on retrieving the users list: %s" + % response["error"]["message"]) + + if response: + response = response["users"] + + return response + + def getUserProjects(self, id): + try: + response = self.getResource("users/%s/projects" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the users's projects " + "(id=%r): %s" % (id, response["error"]["message"])) + + if response: + response = response["projects"] + + return response + + def getUserRoles(self, user_id, project_id): + try: + response = self.getResource("/projects/%s/users/%s/roles" + % (project_id, user_id), "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the user's roles (usrId=%r, " + "prjId=%r): %s" % (user_id, + project_id, + response["error"]["message"])) + + if response: + response = response["roles"] + + return response + + def getProject(self, id): + try: + response = self.getResource("/projects/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the project (id=%r, " + % (id, response["error"]["message"])) + + if response: + response = response["project"] + + return response + + def getProjects(self): + try: + response = self.getResource("/projects", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the projects list: %s" + % response["error"]["message"]) + + if response: + response = response["projects"] + + return response + + def getRole(self, id): + try: + response = self.getResource("/roles/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the role info (id=%r): %s" + % (id, response["error"]["message"])) + + if response: + response = response["role"] + + return response + + def getRoles(self): + try: + response = self.getResource("/roles", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the roles list: %s" + % response["error"]["message"]) + + if response: + response = response["roles"] + + return response + + def getToken(self): + self.authenticate() + return self.token + + def deleteToken(self, id): + if self.token is None: + return + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": self.token.getProject()["name"], + "X-Auth-Token": self.token.getId(), + "X-Subject-Token": id} + + response = requests.delete(url=self.auth_url + "/auth/tokens", + headers=headers, + timeout=self.timeout) + + self.token = None + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + def validateToken(self, id): + self.authenticate() + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": self.token.getProject()["name"], + "X-Auth-Token": self.token.getId(), + "X-Subject-Token": id} + + response = requests.get(url=self.auth_url + "/auth/tokens", + headers=headers, + timeout=self.timeout) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if not response.text: + raise Exception("token not found!") + + token_subject = response.headers["X-Subject-Token"] + token_data = response.json() + + return 
Token(token_subject, token_data) + + def getEndpoint(self, id=None, service_id=None): + if id: + try: + response = self.getResource("/endpoints/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the endpoint (id=%r): %s" + % (id, response["error"]["message"])) + if response: + response = response["endpoint"] + + return response + elif service_id: + try: + endpoints = self.getEndpoints() + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the endpoints list" + "(serviceId=%r): %s" + % response["error"]["message"]) + + if endpoints: + for endpoint in endpoints: + if endpoint["service_id"] == service_id: + return endpoint + + return None + + def getEndpoints(self): + try: + response = self.getResource("/endpoints", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the endpoints list: %s" + % response["error"]["message"]) + + if response: + response = response["endpoints"] + + return response + + def getService(self, id=None, name=None): + if id: + try: + response = self.getResource("/services/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the service info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response: + response = response["service"] + return response + elif name: + services = self.getServices() + + if services: + for service in services: + if service["name"] == name: + return service + + return None + + def getServices(self): + try: + response = self.getResource("/services", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the services list: %s" + % response["error"]["message"]) + + if response: + response = response["services"] + + return response + + def getResource(self, resource, method, data=None): + self.authenticate() + + url = self.auth_url + "/" + resource + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": self.token.getProject()["name"], + "X-Auth-Token": self.token.getId()} + + if method == "GET": + response = requests.get(url, + headers=headers, + params=data, + timeout=self.timeout) + elif method == "POST": + response = requests.post(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "PUT": + response = requests.put(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "HEAD": + response = requests.head(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "DELETE": + response = requests.delete(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + else: + raise Exception("wrong HTTP method: %s" % method) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if response.text: + return response.json() + else: + return None diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py new file mode 100644 index 0000000..1c15565 --- /dev/null +++ b/synergy_scheduler_manager/nova_manager.py @@ -0,0 +1,1064 @@ +import ConfigParser +import eventlet +import json +import logging +import os.path +import requests + +from nova.baserpc import BaseAPI +from nova.compute.rpcapi import ComputeAPI +from 
nova.conductor.rpcapi import ComputeTaskAPI +from nova.conductor.rpcapi import ConductorAPI +from nova.objects import base as objects_base + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +try: + import oslo_messaging as oslo_msg +except ImportError: + import oslo.messaging as oslo_msg + +try: + from oslo_serialization import jsonutils +except ImportError: + from oslo.serialization import jsonutils + +try: + from oslo_versionedobjects import base as ovo_base +except ImportError: + from oslo.versionedobjects import base as ovo_base + +from sqlalchemy import create_engine +from sqlalchemy.exc import SQLAlchemyError + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +CONFIG = ConfigParser.SafeConfigParser() + + +class MessagingAPI(object): + + def __init__(self, transport_url): + LOG.debug("setting up the AMQP transport url: %s" % transport_url) + oslo_msg.set_transport_defaults(control_exchange="nova") + self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url) + + def getTarget(self, topic, exchange=None, namespace=None, + version=None, server=None): + return oslo_msg.Target(topic=topic, + exchange=exchange, + namespace=namespace, + version=version, + server=server) + + def getRPCClient(self, target, version_cap=None, serializer=None): + assert self.TRANSPORT is not None + + LOG.info("creating RPC client with target %s" % target) + return oslo_msg.RPCClient(self.TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + def getRPCServer(self, target, endpoints, serializer=None): + assert self.TRANSPORT is not None + + LOG.info("creating RPC server with target %s" % target) + return oslo_msg.get_rpc_server(self.TRANSPORT, + target, + endpoints, + executor="eventlet", + serializer=serializer) + + def getNotificationListener(self, targets, endpoints): + assert self.TRANSPORT is not None + + LOG.info("creating notification listener with target %s endpoints %s" + % (targets, endpoints)) + return oslo_msg.get_notification_listener(self.TRANSPORT, + targets, + endpoints, + allow_requeue=True, + executor="eventlet") + + +class NovaBaseRPCAPI(BaseAPI): + + def __init__(self, topic, msg): + self.target = msg.getTarget(topic=None, + namespace="baseapi", + version="1.1") + + target_synergy = msg.getTarget(topic=topic + "_synergy", + namespace="baseapi", + version="1.0") + + LOG.info(target_synergy) + self.client = msg.getRPCClient(target=target_synergy, version_cap=None) + + def ping(self, context, arg, timeout=None): + try: + cctxt = self.client.prepare(timeout=timeout) + + return cctxt.call(context, 'ping', arg=arg) + except Exception as ex: + LOG.error("NovaBaseRPCAPI ping! 
%s" % (ex)) + raise ex + + def get_backdoor_port(self, context, host): + cctxt = self.client.prepare(server=host, version='1.1') + return cctxt.call(context, 'get_backdoor_port') + + +class NovaComputeAPI(ComputeAPI): + + def __init__(self, topic, msg): + self.target = msg.getTarget(topic=topic, version="4.0") + self.client = msg.getRPCClient(target=msg.getTarget(topic=topic)) + + +class NovaConductorAPI(ConductorAPI): + + def __init__(self, topic, msg): + report_interval = cfg.IntOpt("report_interval", + default=10, + help="Seconds between nodes reporting " + "state to datastore") + CONF.register_opt(report_interval) + + self.target = msg.getTarget(topic=topic, version="3.0") + + self.client = msg.getRPCClient( + target=msg.getTarget(topic=topic + "_synergy", version="3.0"), + version_cap=None) + + def provider_fw_rule_get_all(self, context): + cctxt = self.client.prepare() + return cctxt.call(context, 'provider_fw_rule_get_all') + + # TODO(hanlind): This method can be removed once oslo.versionedobjects + # has been converted to use version_manifests in remotable_classmethod + # operations, which will use the new class action handler. + def object_class_action(self, context, objname, objmethod, objver, + args, kwargs): + versions = ovo_base.obj_tree_get_versions(objname) + return self.object_class_action_versions(context, + objname, + objmethod, + versions, + args, kwargs) + + def object_class_action_versions(self, context, objname, objmethod, + object_versions, args, kwargs): + cctxt = self.client.prepare() + return cctxt.call(context, 'object_class_action_versions', + objname=objname, objmethod=objmethod, + object_versions=object_versions, + args=args, kwargs=kwargs) + + def object_action(self, context, objinst, objmethod, args, kwargs): + cctxt = self.client.prepare() + return cctxt.call(context, 'object_action', objinst=objinst, + objmethod=objmethod, args=args, kwargs=kwargs) + + def object_backport_versions(self, context, objinst, object_versions): + cctxt = self.client.prepare() + return cctxt.call(context, 'object_backport_versions', objinst=objinst, + object_versions=object_versions) + + +class NovaConductorComputeAPI(ComputeTaskAPI): + + def __init__(self, topic, scheduler_manager, keystone_manager, msg): + self.topic = topic + self.scheduler_manager = scheduler_manager + self.keystone_manager = keystone_manager + self.messagingAPI = msg + serializer = objects_base.NovaObjectSerializer() + + self.target = self.messagingAPI.getTarget(topic=topic, + namespace="compute_task", + version="1.11") + + self.client = self.messagingAPI.getRPCClient( + target=oslo_msg.Target(topic=topic + "_synergy", + namespace="compute_task", + version="1.10"), + serializer=serializer) + + """ nova-conductor rpc operations """ + def build_instances(self, context, instances, image, filter_properties, + admin_password, injected_files, requested_networks, + security_groups, block_device_mapping=None, + legacy_bdm=True): + # token = self.keystone_manager.validateToken(context["auth_token"]) + + for instance in instances: + request = {'instance': instance, + 'image': image, + 'filter_properties': filter_properties, + 'admin_password': admin_password, + 'injected_files': injected_files, + 'requested_networks': requested_networks, + 'security_groups': security_groups, + 'block_device_mapping': block_device_mapping, + 'legacy_bdm': legacy_bdm, + 'context': context} + + self.scheduler_manager.execute("PROCESS_REQUEST", request) + + def build_instance(self, context, instance, image, filter_properties, + 
admin_password, injected_files, requested_networks, + security_groups, block_device_mapping=None, + legacy_bdm=True): + try: + # LOG.info(">>> filter_properties %s" % filter_properties) + # LOG.info(">>> context %s" % context) + + version = '1.10' + if not self.client.can_send_version(version): + version = '1.9' + LOG.info(filter_properties) + if 'instance_type' in filter_properties: + flavor = filter_properties['instance_type'] + # ################################# + # ####### objects_base WRONG!!! ### + flavor_p = objects_base.obj_to_primitive(flavor) + filter_properties = dict(filter_properties, + instance_type=flavor_p) + kw = {'instances': [instance], + 'image': image, + 'filter_properties': filter_properties, + 'admin_password': admin_password, + 'injected_files': injected_files, + 'requested_networks': requested_networks, + 'security_groups': security_groups} + + if not self.client.can_send_version(version): + version = '1.8' + kw['requested_networks'] = kw['requested_networks'].as_tuples() + if not self.client.can_send_version('1.7'): + version = '1.5' + bdm_p = objects_base.obj_to_primitive(block_device_mapping) + kw.update({'block_device_mapping': bdm_p, + 'legacy_bdm': legacy_bdm}) + + cctxt = self.client.prepare(version_cap=version) + cctxt.cast(context, 'build_instances', **kw) + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + raise ex + + def migrate_server(self, context, instance, scheduler_hint, live, rebuild, + flavor, block_migration, disk_over_commit, + reservations=None, clean_shutdown=True, + request_spec=None): + kw = {'instance': instance, 'scheduler_hint': scheduler_hint, + 'live': live, 'rebuild': rebuild, 'flavor': flavor, + 'block_migration': block_migration, + 'disk_over_commit': disk_over_commit, + 'reservations': reservations, + 'clean_shutdown': clean_shutdown, + 'request_spec': request_spec, + } + + version = '1.13' + if not self.client.can_send_version(version): + del kw['request_spec'] + version = '1.11' + if not self.client.can_send_version(version): + del kw['clean_shutdown'] + version = '1.10' + if not self.client.can_send_version(version): + kw['flavor'] = objects_base.obj_to_primitive(flavor) + version = '1.6' + if not self.client.can_send_version(version): + kw['instance'] = jsonutils.to_primitive( + objects_base.obj_to_primitive(instance)) + version = '1.4' + + cctxt = self.client.prepare(version=version) + return cctxt.call(context, 'migrate_server', **kw) + + def unshelve_instance(self, context, instance): + cctxt = self.client.prepare(version='1.3') + cctxt.cast(context, 'unshelve_instance', instance=instance) + + def rebuild_instance(self, context, instance, orig_image_ref, image_ref, + injected_files, new_pass, orig_sys_metadata, + bdms, recreate, on_shared_storage, + preserve_ephemeral=False, host=None): + cctxt = self.client.prepare(version='1.8') + cctxt.cast(context, + 'rebuild_instance', + instance=instance, new_pass=new_pass, + injected_files=injected_files, image_ref=image_ref, + orig_image_ref=orig_image_ref, + orig_sys_metadata=orig_sys_metadata, bdms=bdms, + recreate=recreate, on_shared_storage=on_shared_storage, + preserve_ephemeral=preserve_ephemeral, + host=host) + + +class NovaManager(Manager): + + def __init__(self): + Manager.__init__(self, name="NovaManager") + + self.config_opts = [ + cfg.StrOpt("nova_conf", + help="the nova.conf path", + default=None, + required=False), + cfg.StrOpt("amqp_backend", + help="the amqp backend tpye (e.g. 
rabbit, qpid)", + default=None, + required=False), + cfg.StrOpt("amqp_host", + help="the amqp host name", + default=None, + required=False), + cfg.IntOpt("amqp_port", + help="the amqp listening port", + default=None, + required=False), + cfg.StrOpt("amqp_user", + help="the amqp user", + default=None, + required=False), + cfg.StrOpt("amqp_password", + help="the amqp password", + default=None, + required=False), + cfg.StrOpt("amqp_virt_host", + help="the amqp virtual host", + default=None, + required=False), + cfg.StrOpt("conductor_topic", + help="the conductor topic", + default=None, + required=False), + cfg.StrOpt("compute_topic", + help="the compute topic", + default=None, + required=False), + cfg.StrOpt("scheduler_topic", + help="the scheduler topic", + default=None, + required=False), + cfg.StrOpt("db_connection", + help="the NOVA database connection", + default=None, + required=False), + cfg.StrOpt("host", + help="the host name", + default=None, + required=False), + cfg.IntOpt("timeout", + help="set the http connection timeout", + default=60, + required=False) + ] + + def setup(self): + eventlet.monkey_patch(os=False) + + self.timeout = CONF.NovaManager.timeout + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + if self.getManager("SchedulerManager") is None: + raise Exception("SchedulerManager not found!") + + self.keystone_manager = self.getManager("KeystoneManager") + self.scheduler_manager = self.getManager("SchedulerManager") + + host = "localhost" + conductor_topic = "conductor" + compute_topic = "compute" + scheduler_topic = "scheduler" + db_connection = None + amqp_backend = None + amqp_host = None + amqp_port = None + amqp_user = None + amqp_password = None + amqp_virt_host = None + + if CONF.NovaManager.nova_conf is not None: + if os.path.isfile(CONF.NovaManager.nova_conf): + CONFIG.read(CONF.NovaManager.nova_conf) + else: + raise Exception("nova configuration file not found at %s!" 
+                                % CONF.NovaManager.nova_conf)
+
+            host = self.getParameter("my_ip",
+                                     "DEFAULT",
+                                     default=host)
+
+            conductor_topic = self.getParameter("conductor_topic",
+                                                "DEFAULT",
+                                                default=conductor_topic)
+
+            compute_topic = self.getParameter("compute_topic",
+                                              "DEFAULT",
+                                              default=compute_topic)
+
+            scheduler_topic = self.getParameter("scheduler_topic",
+                                                "DEFAULT",
+                                                default=scheduler_topic)
+
+            db_connection = self.getParameter("connection",
+                                              "database",
+                                              fallback=True)
+
+            amqp_backend = self.getParameter("rpc_backend",
+                                             "DEFAULT",
+                                             fallback=True)
+
+            if amqp_backend == "rabbit":
+                amqp_host = self.getParameter("rabbit_host",
+                                              "oslo_messaging_rabbit",
+                                              "localhost",
+                                              fallback=False)
+
+                amqp_port = self.getParameter("rabbit_port",
+                                              "oslo_messaging_rabbit",
+                                              "5672",
+                                              fallback=False)
+
+                amqp_virt_host = self.getParameter("rabbit_virtual_host",
+                                                   "oslo_messaging_rabbit",
+                                                   "/",
+                                                   fallback=False)
+
+                amqp_user = self.getParameter("rabbit_userid",
+                                              "oslo_messaging_rabbit",
+                                              "guest",
+                                              fallback=False)
+
+                amqp_password = self.getParameter("rabbit_password",
+                                                  "oslo_messaging_rabbit",
+                                                  fallback=True)
+            elif amqp_backend == "qpid":
+                amqp_host = self.getParameter("qpid_hostname",
+                                              "oslo_messaging_qpid",
+                                              "localhost",
+                                              fallback=False)
+
+                amqp_port = self.getParameter("qpid_port",
+                                              "oslo_messaging_qpid",
+                                              "5672",
+                                              fallback=False)
+
+                amqp_user = self.getParameter("qpid_username",
+                                              "oslo_messaging_qpid",
+                                              fallback=True)
+
+                amqp_password = self.getParameter("qpid_password",
+                                                  "oslo_messaging_qpid",
+                                                  fallback=True)
+            else:
+                raise Exception("unsupported amqp backend found: %s!"
+                                % amqp_backend)
+        else:
+            amqp_backend = CONF.NovaManager.amqp_backend
+            amqp_host = CONF.NovaManager.amqp_host
+            amqp_port = CONF.NovaManager.amqp_port
+            amqp_user = CONF.NovaManager.amqp_user
+            amqp_password = CONF.NovaManager.amqp_password
+            amqp_virt_host = CONF.NovaManager.amqp_virt_host
+            db_connection = CONF.NovaManager.db_connection
+            host = amqp_host
+
+            amqp_backend = self.getParameter("amqp_backend",
+                                             "NovaManager",
+                                             fallback=True)
+
+            amqp_host = self.getParameter("amqp_host",
+                                          "NovaManager",
+                                          fallback=True)
+
+            amqp_port = self.getParameter("amqp_port",
+                                          "NovaManager",
+                                          default=5672)
+
+            amqp_user = self.getParameter("amqp_user",
+                                          "NovaManager",
+                                          fallback=True)
+
+            amqp_password = self.getParameter("amqp_password",
+                                              "NovaManager",
+                                              fallback=True)
+
+            amqp_virt_host = self.getParameter("amqp_virt_host",
+                                               "NovaManager",
+                                               default="/")
+
+            db_connection = self.getParameter("db_connection",
+                                              "NovaManager",
+                                              fallback=True)
+
+            host = self.getParameter("host",
+                                     "NovaManager",
+                                     default="localhost")
+
+            conductor_topic = self.getParameter("conductor_topic",
+                                                "NovaManager",
+                                                default=conductor_topic)
+
+            compute_topic = self.getParameter("compute_topic",
+                                              "NovaManager",
+                                              default=compute_topic)
+
+            scheduler_topic = self.getParameter("scheduler_topic",
+                                                "NovaManager",
+                                                default=scheduler_topic)
+
+        try:
+            LOG.debug("setting up the NOVA database connection: %s"
+                      % db_connection)
+
+            self.db_engine = create_engine(db_connection)
+
+            transport_url = "%s://%s:%s@%s:%s%s" % (amqp_backend,
+                                                    amqp_user,
+                                                    amqp_password,
+                                                    amqp_host,
+                                                    amqp_port,
+                                                    amqp_virt_host)
+
+            self.messagingAPI = MessagingAPI(transport_url)
+
+            self.novaBaseRPCAPI = NovaBaseRPCAPI(conductor_topic,
+                                                 self.messagingAPI)
+
+            self.novaConductorAPI = NovaConductorAPI(conductor_topic,
+                                                     self.messagingAPI)
+
+            self.novaConductorComputeAPI = NovaConductorComputeAPI(
+                conductor_topic,
+                self.scheduler_manager,
+                
self.keystone_manager, + self.messagingAPI) + + self.conductor_rpc = self.messagingAPI.getRPCServer( + target=self.messagingAPI.getTarget(topic=conductor_topic, + server=host), + endpoints=[self.novaBaseRPCAPI, + self.novaConductorAPI, + self.novaConductorComputeAPI]) + + self.conductor_rpc.start() + + self.novaComputeAPI = NovaComputeAPI(compute_topic, + self.messagingAPI) + + self.compute_rpc = self.messagingAPI.getRPCServer( + target=self.messagingAPI.getTarget(topic=compute_topic, + server=host), + endpoints=[self.novaComputeAPI]) + + # self.rpcserver.start() + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error("NovaManager initialization failed! %s" % (ex)) + raise ex + + def execute(self, command, *args, **kargs): + if command == "GET_PARAMETER": + return self.getParameter(*args, **kargs) + elif command == "GET_FLAVORS": + return self.getFlavors(*args, **kargs) + elif command == "GET_FLAVOR": + return self.getFlavor(*args, **kargs) + elif command == "GET_SERVERS": + return self.getServers(*args, **kargs) + elif command == "GET_SERVER": + return self.getServer(*args, **kargs) + elif command == "DELETE_SERVER": + return self.deleteServer(*args, **kargs) + elif command == "START_SERVER": + return self.startServer(*args, **kargs) + elif command == "STOP_SERVER": + return self.stopServer(*args, **kargs) + elif command == "BUILD_SERVER": + return self.buildServer(*args, **kargs) + elif command == "GET_HYPERVISORS": + return self.getHypervisors() + elif command == "GET_HYPERVISOR": + return self.getHypervisor(*args, **kargs) + elif command == "GET_QUOTA": + return self.getQuota(*args, **kargs) + elif command == "UPDATE_QUOTA": + return self.updateQuota(*args, **kargs) + elif command == "GET_TARGET": + return self.getTarget(*args, **kargs) + elif command == "GET_RCP_CLIENT": + return self.getRPCClient() + elif command == "GET_RCP_SERVER": + return self.getRPCServer(*args, **kargs) + elif command == "GET_NOTIFICATION_LISTENER": + return self.getNotificationListener(*args, **kargs) + elif command == "GET_RESOURCE_USAGE": + return self.getResourceUsage(*args, **kargs) + elif command == "GET_PROJECT_USAGE": + return self.getProjectUsage(*args, **kargs) + elif command == "GET_EXPIRED_SERVERS": + return self.getExpiredServers(*args, **kargs) + else: + raise Exception("command=%r not supported!" 
% command) + + def task(self): + pass + + def destroy(self): + pass + + def getParameter(self, name, section="DEFAULT", + default=None, fallback=False): + if section != "NovaManager": + try: + return CONFIG.get(section, name) + except Exception: + if fallback is True: + raise Exception("No attribute %r found in [%s] section of " + "nova.conf" % (name, section)) + else: + LOG.info("No attribute %r found in [%s] section of " + "nova.conf, using default: %r" + % (name, section, default)) + + return default + else: + result = CONF.NovaManager.get(name, None) + + if result is not None: + return result + + if fallback is True: + raise Exception("No attribute %r found in [NovaManager] " + "section of synergy.conf" % name) + else: + LOG.info("No attribute %r found in in [NovaManager] of synergy" + ".conf, using default: %r" % (name, default)) + return default + + def getFlavors(self): + url = "flavors/detail" + + try: + response_data = self.getResource(url, method="GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the flavors list: %s" + % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["flavors"] + + return response_data + + def getFlavor(self, id): + try: + response_data = self.getResource("flavors/" + id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the flavor info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["flavor"] + + return response_data + + def getServers(self, detail=False, status=None): + params = {} + if status: + params["status"] = status + + url = "servers" + + if detail: + url = "servers/detail" + + response_data = self.getResource(url, "GET", params) + + if response_data: + response_data = response_data["servers"] + + return response_data + + def getServer(self, id): + try: + response_data = self.getResource("servers/" + id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the server info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def buildServer(self, context, instance, image, filter_properties, + admin_password, injected_files, requested_networks, + security_groups, block_device_mapping=None, + legacy_bdm=True): + self.novaConductorComputeAPI.build_instance( + context=context, + instance=instance, + image=image, + filter_properties=filter_properties, + admin_password=admin_password, + injected_files=injected_files, + requested_networks=requested_networks, + security_groups=security_groups, + block_device_mapping=block_device_mapping, + legacy_bdm=legacy_bdm) + + def deleteServer(self, id): + # data = { "forceDelete": None } + # url = "servers/%s/action" % id + url = "servers/%s" % id + + try: + # response_data = self.getResource(url, "POST", data) + response_data = self.getResource(url, "DELETE") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on deleting the server (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def startServer(self, id): + data = {"os-start": None} + url = "servers/%s/action" % id + + try: + response_data = self.getResource(url, "POST", data) + except 
requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on starting the server info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def stopServer(self, id): + data = {"os-stop": None} + url = "servers/%s/action" % id + + try: + response_data = self.getResource(url, "POST", data) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on stopping the server info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def getHypervisors(self): + data = {"os-stop": None} + url = "os-hypervisors" + # /%s" % id + + try: + response_data = self.getResource(url, "GET", data) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the hypervisors list (id=%r)" + ": %s" % response["error"]["message"]) + + if response_data: + response_data = response_data["hypervisors"] + + return response_data + + def getHypervisor(self, id): + data = {"os-stop": None} + url = "os-hypervisors/%s" % id + + try: + response_data = self.getResource(url, "GET", data) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the hypervisor info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["hypervisor"] + + return response_data + + def getQuota(self, id=None, defaults=False): + if defaults: + try: + url = "os-quota-sets/defaults" + response_data = self.getResource(url, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the quota defaults" + ": %s" % response["error"]["message"]) + elif id is not None: + try: + url = "os-quota-sets/%s" % id + response_data = self.getResource(url, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the quota info (id=%r)" + ": %s" % (id, response["error"]["message"])) + else: + raise Exception("wrong arguments") + + if response_data: + response_data = response_data["quota_set"] + + return response_data + + def updateQuota(self, id, data): + url = "os-quota-sets/%s" % id + quota_set = {"quota_set": data} + + try: + response_data = self.getResource(url, "PUT", quota_set) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on updating the quota info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["quota_set"] + + return response_data + + def getResource(self, resource, method, data=None): + self.keystone_manager.authenticate() + token = self.keystone_manager.getToken() + url = token.getCatalog("nova")["url"] + "/" + resource + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": token.getProject()["name"], + "X-Auth-Token": token.getId()} + + if method == "GET": + request = requests.get(url, headers=headers, + params=data, timeout=self.timeout) + elif method == "POST": + request = requests.post(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "PUT": + request = requests.put(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method 
== "HEAD": + request = requests.head(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "DELETE": + request = requests.delete(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + else: + raise Exception("wrong HTTP method: %s" % method) + + if request.status_code != requests.codes.ok: + request.raise_for_status() + + if request.text: + return request.json() + else: + return None + + def getTarget(self, topic, exchange=None, namespace=None, + version=None, server=None): + return self.messagingAPI.getTarget(topic=topic, + namespace=namespace, + exchange=exchange, + version=version, + server=server) + + def getRPCClient(self, target, version_cap=None, serializer=None): + return self.messagingAPI.getRPCClient(target, + version_cap=version_cap, + serializer=serializer) + + def getRPCServer(self, target, endpoints, serializer=None): + return self.messagingAPI.getRPCServer(target, + endpoints, + serializer=serializer) + + def getNotificationListener(self, targets, endpoints): + return self.messagingAPI.getNotificationListener(targets, endpoints) + + def getResourceUsage(self, prj_ids, from_date, to_date): + # LOG.info("getUsage: fromDate=%s period_length=%s days" + # % (fromDate, period_length)) + # print("getUsage: fromDate=%s period_length=%s days" + # % (fromDate, period_length)) + + # period = str(period_length) + + resource_usage = {} + connection = self.db_engine.connect() + + try: + ids = "" + + if prj_ids is not None: + ids = " project_id IN (" + + for prj_id in prj_ids: + ids += "%r, " % str(prj_id) + + if "," in ids: + ids = ids[:-2] + + ids += ") and" + + QUERY = """select user_id, project_id, sum(TIMESTAMPDIFF(SECOND, \ +IF(launched_at<='%(from_date)s', '%(from_date)s', IFNULL(launched_at, \ +'%(from_date)s')), IF(terminated_at>='%(to_date)s', '%(to_date)s', \ +IFNULL(terminated_at, '%(to_date)s')))*memory_mb) as memory_usage, \ +sum(TIMESTAMPDIFF(SECOND, IF(launched_at<='%(from_date)s', '%(from_date)s', \ +IFNULL(launched_at, '%(from_date)s')), IF(terminated_at>='%(to_date)s', \ +'%(to_date)s', IFNULL(terminated_at, '%(to_date)s')))*vcpus) as vcpus_usage \ +from nova.instances where %(prj_ids)s launched_at is not NULL and \ +launched_at<='%(to_date)s' and (terminated_at>='%(from_date)s' or \ +terminated_at is NULL) group by user_id, project_id\ +""" % {"prj_ids": ids, "from_date": from_date, "to_date": to_date} + + result = connection.execute(QUERY) + + # LOG.info("QUERY %s\n" % QUERY) + # print("from_date %s\n" % from_date) + # print("to_date %s\n" % to_date) + + prj_id = 0 + user_id = 0 + project = None + + # for row in result.fetchall(): + for row in result: + # LOG.info("row=%s" % row) + user_id = row[0] + prj_id = row[1] + + if (prj_ids is not None and prj_id not in prj_ids): + LOG.warn("project not found: %s" % prj_id) + continue + + if prj_id not in resource_usage: + resource_usage[prj_id] = {} + + project = resource_usage.get(prj_id) + + project[user_id] = {"ram": float(row[2]), + "cores": float(row[3])} + + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + return resource_usage + + def getProjectUsage(self, prj_id): + connection = self.db_engine.connect() + usage = {"instances": [], "cores": 0, "ram": 0} + + try: + # retrieve the amount of resources in terms of cores + # and ram the specified project is consuming + QUERY = """select uuid, vcpus, memory_mb from nova.instances \ +where project_id='%(project_id)s' and deleted_at is NULL and (vm_state in \ +('error') or 
(vm_state in ('active') and terminated_at is NULL))\ +""" % {"project_id": prj_id} + + result = connection.execute(QUERY) + + for row in result.fetchall(): + usage["instances"].append(str(row[0])) + usage["cores"] += row[1] + usage["ram"] += row[2] + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + return usage + + def getExpiredServers(self, prj_id, instances, TTL): + uuids = [] + connection = self.db_engine.connect() + + try: + # retrieve all expired instances for the specified + # project and expiration time + ids = "'%s'" % "', '".join(instances) + + QUERY = """select uuid from nova.instances where project_id = \ +'%(project_id)s' and deleted_at is NULL and (vm_state in ('error') or \ +(uuid in (%(instances)s) and ((vm_state in ('active') and terminated_at is \ +NULL and timestampdiff(minute, launched_at, utc_timestamp()) >= \ +%(expiration)s) or (vm_state in ('building') and task_state in ('scheduling') \ +and created_at != updated_at and timestampdiff(minute, updated_at, \ +utc_timestamp()) >= 20))))""" % {"project_id": prj_id, + "instances": ids, + "expiration": TTL} + + # LOG.info(QUERY) + result = connection.execute(QUERY) + + for row in result.fetchall(): + uuids.append(row[0]) + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + return uuids diff --git a/synergy_scheduler_manager/queue_manager.py b/synergy_scheduler_manager/queue_manager.py new file mode 100644 index 0000000..7ae231d --- /dev/null +++ b/synergy_scheduler_manager/queue_manager.py @@ -0,0 +1,466 @@ +import heapq +import json +import logging +import threading + +from datetime import datetime + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from sqlalchemy import create_engine +from sqlalchemy.exc import SQLAlchemyError + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. 
+See the License for the specific language governing
+permissions and limitations under the License."""
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class QueueItem(object):
+
+    def __init__(self, id, user_id, prj_id, priority,
+                 retry_count, creation_time, last_update, data=None):
+        self.id = id
+        self.user_id = user_id
+        self.prj_id = prj_id
+        self.priority = priority
+        self.retry_count = retry_count
+        self.creation_time = creation_time
+        self.last_update = last_update
+        self.data = data
+
+    def getId(self):
+        return self.id
+
+    def setId(self, id):
+        self.id = id
+
+    def getUserId(self):
+        return self.user_id
+
+    def setUserId(self, user_id):
+        self.user_id = user_id
+
+    def getProjectId(self):
+        return self.prj_id
+
+    def setProjectId(self, prj_id):
+        self.prj_id = prj_id
+
+    def getPriority(self):
+        return self.priority
+
+    def setPriority(self, priority):
+        self.priority = priority
+
+    def getRetryCount(self):
+        return self.retry_count
+
+    def setRetryCount(self, retry_count):
+        self.retry_count = retry_count
+
+    def incRetryCount(self):
+        self.retry_count += 1
+
+    def getCreationTime(self):
+        return self.creation_time
+
+    def setCreationTime(self, creation_time):
+        self.creation_time = creation_time
+
+    def getLastUpdate(self):
+        return self.last_update
+
+    def setLastUpdate(self, last_update):
+        self.last_update = last_update
+
+    def getData(self):
+        return self.data
+
+    def setData(self, data):
+        self.data = data
+
+
+class PriorityQueue(object):
+
+    def __init__(self):
+        self.queue = []
+        self._index = 0
+
+    def put(self, priority, item):
+        heapq.heappush(self.queue, (-priority, self._index, item))
+        self._index += 1
+
+    def get(self):
+        return heapq.heappop(self.queue)[-1]
+
+    def size(self):
+        return len(self.queue)
+
+
+class Queue(object):
+
+    def __init__(self, name, db_engine, fairshare_manager=None):
+        self.name = name
+        self.db_engine = db_engine
+        self.fairshare_manager = fairshare_manager
+        self.is_closed = False
+        self.priority_updater = None
+        self.condition = threading.Condition()
+        self.pqueue = PriorityQueue()
+        self.createTable()
+        self.buildFromDB()
+
+    def getName(self):
+        return self.name
+
+    def getSize(self):
+        connection = self.db_engine.connect()
+
+        try:
+            QUERY = "select count(*) from `%s`" % self.name
+            result = connection.execute(QUERY)
+
+            row = result.fetchone()
+
+            return row[0]
+        except SQLAlchemyError as ex:
+            raise Exception(ex.message)
+        finally:
+            connection.close()
+
+    def createTable(self):
+        TABLE = """CREATE TABLE IF NOT EXISTS `%s` (`id` BIGINT NOT NULL \
+AUTO_INCREMENT PRIMARY KEY, `priority` INT DEFAULT 0, user_id CHAR(40) \
+NOT NULL, prj_id CHAR(40) NOT NULL, `retry_count` INT DEFAULT 0, \
+`creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `last_update` \
+TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
+
+        connection = self.db_engine.connect()
+
+        try:
+            connection.execute(TABLE)
+        except SQLAlchemyError as ex:
+            raise Exception(ex.message)
+        except Exception as ex:
+            raise Exception(ex.message)
+        finally:
+            connection.close()
+
+    def close(self):
+        if not self.is_closed:
+            self.is_closed = True
+
+            with self.condition:
+                self.condition.notifyAll()
+
+    def isClosed(self):
+        return self.is_closed
+
+    def buildFromDB(self):
+        connection = self.db_engine.connect()
+
+        try:
+            QUERY = "select id, user_id, prj_id, priority, retry_count, " \
+                    "creation_time, last_update from `%s`" % self.name
+            result = connection.execute(QUERY)
+
+            for row in result:
+                queue_item = QueueItem(row[0], row[1], row[2],
+                                       
row[3], row[4], row[5], row[6]) + + self.pqueue.put(row[3], queue_item) + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + with self.condition: + self.condition.notifyAll() + + def insertItem(self, user_id, prj_id, priority, data): + with self.condition: + idRecord = -1 + QUERY = "insert into `%s` (user_id, prj_id, priority, " \ + "data) values" % self.name + QUERY += "(%s, %s, %s, %s)" + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + result = connection.execute(QUERY, + [user_id, prj_id, priority, + json.dumps(data)]) + + idRecord = result.lastrowid + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + raise Exception(ex.message) + finally: + connection.close() + + now = datetime.now() + queue_item = QueueItem(idRecord, user_id, prj_id, + priority, 0, now, now) + + self.pqueue.put(priority, queue_item) + + self.condition.notifyAll() + + def reinsertItem(self, queue_item): + with self.condition: + self.pqueue.put(queue_item.getPriority(), queue_item) + self.condition.notifyAll() + + def getItem(self): + item = None + queue_item = None + + with self.condition: + while (queue_item is None and not self.is_closed): + if self.pqueue.size() > 0: + queue_item = self.pqueue.get() + + # self.pqueue.task_done() + else: + self.condition.wait() + + if (not self.is_closed and queue_item is not None): + connection = self.db_engine.connect() + + try: + QUERY = """select user_id, prj_id, priority, \ +retry_count, creation_time, last_update, data from `%s`""" % self.name + QUERY += " where id=%s" + + result = connection.execute(QUERY, [queue_item.getId()]) + + row = result.fetchone() + + item = QueueItem(queue_item.getId(), row[0], row[1], + row[2], row[3], row[4], row[5], + json.loads(row[6])) + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + self.condition.notifyAll() + return item + + def deleteItem(self, queue_item): + if not queue_item: + return + + with self.condition: + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "delete from `%s`" % self.name + QUERY += " where id=%s" + + connection.execute(QUERY, [queue_item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise Exception(ex.message) + finally: + connection.close() + self.condition.notifyAll() + + def updateItem(self, queue_item): + if not queue_item: + return + + with self.condition: + connection = self.db_engine.connect() + trans = connection.begin() + + try: + queue_item.setLastUpdate(datetime.now()) + + QUERY = "update `%s`" % self.name + QUERY += " set priority=%s, retry_count=%s, " \ + "last_update=%s where id=%s" + + connection.execute(QUERY, [queue_item.getPriority(), + queue_item.getRetryCount(), + queue_item.getLastUpdate(), + queue_item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise Exception(ex.message) + finally: + connection.close() + + self.pqueue.put(queue_item.getPriority(), queue_item) + self.condition.notifyAll() + + def updatePriority(self): + if self.fairshare_manager is None: + # LOG.warn("priority_updater not found!!!") + return + + with self.condition: + # now = datetime.now() + queue_items = [] + + connection = self.db_engine.connect() + + while self.pqueue.size() > 0: + queue_item = self.pqueue.get() + priority = queue_item.getPriority() + + try: + priority = self.fairshare_manager.execute( + "CALCULATE_PRIORITY", + user_id=queue_item.getUserId(), + 
prj_id=queue_item.getProjectId(), + timestamp=queue_item.getCreationTime(), + retry=queue_item.getRetryCount()) + + queue_item.setPriority(priority) + except Exception as ex: + continue + finally: + queue_items.append(queue_item) + + trans = connection.begin() + + try: + queue_item.setLastUpdate(datetime.now()) + + QUERY = "update `%s`" % self.name + QUERY += " set priority=%s, last_update=%s where id=%s" + + connection.execute(QUERY, [queue_item.getPriority(), + queue_item.getLastUpdate(), + queue_item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + raise Exception(ex.message) + + connection.close() + + if len(queue_items) > 0: + for queue_item in queue_items: + self.pqueue.put(queue_item.getPriority(), queue_item) + + del queue_items + + self.condition.notifyAll() + + def toDict(self): + queue = {} + queue["name"] = self.name + queue["size"] = self.getSize() + # queue["size"] = self.pqueue.size() + + if self.is_closed: + queue["status"] = "OFF" + else: + queue["status"] = "ON" + + return queue + + +class QueueManager(Manager): + + def __init__(self): + Manager.__init__(self, name="QueueManager") + + self.config_opts = [ + cfg.StrOpt("db_connection", help="the DB url", required=True), + cfg.IntOpt('db_pool_size', default=10, required=False), + cfg.IntOpt('db_max_overflow', default=5, required=False) + ] + + def setup(self): + if self.getManager("FairShareManager") is None: + raise Exception("FairShareManager not found!") + + self.fairshare_manager = self.getManager("FairShareManager") + + self.queue_list = {} + db_connection = CONF.QueueManager.db_connection + pool_size = CONF.QueueManager.db_pool_size + max_overflow = CONF.QueueManager.db_max_overflow + + try: + self.db_engine = create_engine(db_connection, + pool_size=pool_size, + max_overflow=max_overflow) + except Exception as ex: + LOG.error(ex) + raise ex + + def execute(self, command, *args, **kargs): + if command == "CREATE_QUEUE": + return self.createQueue(*args, **kargs) + elif command == "DELETE_QUEUE": + return self.deleteQueue(*args, **kargs) + elif command == "GET_QUEUE": + return self.getQueue(*args, **kargs) + else: + raise Exception("command=%r not supported!" % command) + + def task(self): + for queue in self.queue_list.values(): + queue.updatePriority() + + def destroy(self): + for queue in self.queue_list.values(): + queue.close() + + def createQueue(self, name): + if name not in self.queue_list: + queue = Queue(name, self.db_engine, self.fairshare_manager) + self.queue_list[name] = queue + return queue + else: + raise Exception("the queue %r already exists!" % name) + + def deleteQueue(self, name): + if name not in self.queue_list: + raise Exception("queue %r not found!" % name) + + del self.queue_list[name] + + def getQueue(self, name): + if name not in self.queue_list: + raise Exception("queue %r not found!" 
% name) + + return self.queue_list[name] diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py new file mode 100644 index 0000000..f25248e --- /dev/null +++ b/synergy_scheduler_manager/quota_manager.py @@ -0,0 +1,427 @@ +import ConfigParser +import logging +import threading + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +CONF = cfg.CONF +CONFIG = ConfigParser.SafeConfigParser() +LOG = logging.getLogger(__name__) + + +class DynamicQuota(object): + + def __init__(self): + self.exit = False + self.projects = {} + self.ram = {"in_use": 0, "limit": 0} + self.cores = {"in_use": 0, "limit": 0} + self.condition = threading.Condition() + + def setSize(self, cores, ram): + self.ram["limit"] = ram + self.cores["limit"] = cores + + def getSize(self, cores, ram): + return {"cores": self.cores["limit"], "ram": self.ram["limit"]} + + def getProjects(self): + return self.projects + + def getProject(self, prj_id): + return self.projects.get(prj_id, None) + + def addProject(self, prj_id, prj_name, usage=None, TTL=0): + if prj_id not in self.projects: + with self.condition: + project = {"name": prj_name, + "cores": 0, + "ram": 0, + "instances": {"active": [], "pending": []}, + "TTL": 0} + + if usage is not None: + project["cores"] = usage["cores"] + project["ram"] = usage["ram"] + project["instances"]["active"].extend(usage["instances"]) + + self.ram["in_use"] += project["ram"] + self.cores["in_use"] += project["cores"] + + self.projects[prj_id] = project + self.condition.notifyAll() + + def removeProject(self, prj_id): + if prj_id in self.projects: + with self.condition: + project = self.projects[prj_id] + + self.ram["in_use"] -= project["ram"] + self.cores["in_use"] -= project["cores"] + + del self.projects[prj_id] + self.condition.notifyAll() + + return True + return False + + def close(self): + self.exit = True + + def allocate(self, instance_id, prj_id, cores, ram, blocking=True): + if prj_id not in self.projects: + return + + project = self.projects[prj_id] + + if project is None: + return + + found = False + + with self.condition: + if instance_id in project["instances"]["active"]: + found = True + elif instance_id in project["instances"]["pending"]: + found = True + else: + project["instances"]["pending"].append(instance_id) + + while (not self.exit and not found and + instance_id in project["instances"]["pending"]): + + LOG.debug("allocate instance_id=%s project=%s cores=%s " + "ram=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + + if (self.cores["limit"] - self.cores["in_use"] >= cores) and \ + (self.ram["limit"] - self.ram["in_use"] >= ram): + 
self.cores["in_use"] += cores + self.ram["in_use"] += ram + project["cores"] += cores + project["ram"] += ram + + found = True + project["instances"]["active"].append(instance_id) + project["instances"]["pending"].remove(instance_id) + + LOG.info("allocated instance_id=%s project=%s cores=%s ram" + "=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + elif blocking: + LOG.info("allocate wait!!!") + self.condition.wait() + + self.condition.notifyAll() + + return found + + def release(self, instance_id, prj_id, cores, ram): + if prj_id not in self.projects: + return + + project = self.projects[prj_id] + + LOG.debug("release instance_id=%s project=%s cores=%s " + "ram=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + + with self.condition: + if instance_id in project["instances"]["pending"]: + project["instances"]["pending"].remove(instance_id) + elif instance_id in instance_id in project["instances"]["active"]: + if self.cores["in_use"] - cores < 0: + self.cores["in_use"] = 0 + else: + self.cores["in_use"] -= cores + + if self.ram["in_use"] - ram < 0: + self.ram["in_use"] = 0 + else: + self.ram["in_use"] -= ram + + if project["cores"] - cores < 0: + project["cores"] = 0 + else: + project["cores"] -= cores + + if project["ram"] - ram < 0: + project["ram"] = 0 + else: + project["ram"] -= ram + + project["instances"]["active"].remove(instance_id) + + LOG.info("released instance_id=%s project=%s cores=%s " + "ram=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + else: + LOG.debug("release: instance '%s' not found!" % (instance_id)) + + self.condition.notifyAll() + + def toDict(self): + quota = {} + quota["ram"] = self.ram + quota["cores"] = self.cores + quota["projects"] = self.projects + + return quota + + +class QuotaManager(Manager): + + def __init__(self): + Manager.__init__(self, name="QuotaManager") + + def setup(self): + try: + self.dynamic_quota = DynamicQuota() + + if self.getManager("NovaManager") is None: + raise Exception("NovaManager not found!") + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + self.nova_manager = self.getManager("NovaManager") + self.keystone_manager = self.getManager("KeystoneManager") + self.listener = None + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + + def destroy(self): + LOG.info("destroy invoked!") + self.dynamic_quota.close() + + def execute(self, command, *args, **kargs): + if command == "ADD_PROJECT": + return self.addProject(*args, **kargs) + elif command == "GET_PROJECT": + return self.getProject(*args, **kargs) + elif command == "REMOVE_PROJECT": + return self.removeProject(*args, **kargs) + elif command == "GET_DYNAMIC_QUOTA": + return self.dynamic_quota + else: + raise Exception("command=%r not supported!" 
% command) + + def task(self): + try: + self.updateDynamicQuota() + self.deleteExpiredServices() + except Exception as ex: + LOG.error(ex) + + def getProject(self, prj_id): + return self.dynamic_quota.getProject(prj_id) + + def addProject(self, prj_id, prj_name): + try: + quota = {"cores": -1, "ram": -1, "instances": -1} + self.nova_manager.execute("UPDATE_QUOTA", prj_id, quota) + + usage = self.nova_manager.execute("GET_PROJECT_USAGE", prj_id) + self.dynamic_quota.addProject(prj_id, prj_name, usage) + + self.updateDynamicQuota() + except Exception as ex: + LOG.error(ex) + raise ex + + def removeProject(self, prj_id, destroy=False): + project = self.dynamic_quota.getProject(prj_id) + if project is None: + return + + try: + if destroy: + ids = [] + ids.extend(project["instances"]["active"]) + ids.extend(project["instances"]["pending"]) + + for instance_id in ids: + self.nova_manager.execute("DELETE_SERVER", instance_id) + + quota = self.nova_manager.execute("GET_QUOTA", defaults=True) + self.nova_manager.execute("UPDATE_QUOTA", prj_id, quota) + + self.dynamic_quota.removeProject(prj_id) + + self.updateDynamicQuota() + except Exception as ex: + LOG.error(ex) + raise ex + + def deleteExpiredServices(self): + for prj_id, project in self.dynamic_quota.projects.items(): + instance_ids = project["instances"]["active"] + TTL = project["TTL"] + + if project["TTL"] == 0: + continue + + try: + expired_ids = self.nova_manager.execute("GET_EXPIRED_SERVERS", + prj_id=prj_id, + instances=instance_ids, + expiration=TTL) + + for instance_id in expired_ids: + self.nova_manager.execute("DELETE_SERVER", instance_id) + except Exception as ex: + LOG.error(ex) + raise ex + + def updateDynamicQuota(self): + # calculate the the total limit per cores and ram + total_ram = float(0) + total_cores = float(0) + static_ram = float(0) + static_cores = float(0) + dynamic_ram = float(0) + dynamic_cores = float(0) + + try: + cpu_ratio = self.nova_manager.execute("GET_PARAMETER", + name="cpu_allocation_ratio", + default=float(16)) + + ram_ratio = self.nova_manager.execute("GET_PARAMETER", + name="ram_allocation_ratio", + default=float(16)) + + quota_default = self.nova_manager.execute("GET_QUOTA", + defaults=True) + + hypervisors = self.nova_manager.execute("GET_HYPERVISORS") + + for hypervisor in hypervisors: + if hypervisor["status"] == "enabled" and \ + hypervisor["state"] == "up": + info = self.nova_manager.execute("GET_HYPERVISOR", + hypervisor["id"]) + + total_ram += info["memory_mb"] + total_cores += info["vcpus"] + + total_ram *= float(ram_ratio) + total_cores *= float(cpu_ratio) + + # LOG.info("total_ram=%s total_cores=%s" + # % (total_ram, total_cores)) + + kprojects = self.keystone_manager.execute("GET_PROJECTS") + + for project in kprojects: + prj_id = project["id"] + # prj_name = str(project["name"]) + + if self.dynamic_quota.getProject(prj_id) is None: + quota = self.nova_manager.execute("GET_QUOTA", prj_id) + + if quota["cores"] == -1 and quota["ram"] == -1: + quota["cores"] = quota_default["cores"] + quota["ram"] = quota_default["ram"] + + try: + self.nova_manager.execute("UPDATE_QUOTA", + prj_id, + quota_default) + except Exception as ex: + LOG.error(ex) + + static_cores += quota["cores"] + static_ram += quota["ram"] + + enabled = False + + if total_cores < static_cores: + LOG.warn("dynamic quota: the total statically " + "allocated cores (%s) is greater than the total " + "amount of cores allowed (%s)" + % (static_cores, total_cores)) + else: + enabled = True + dynamic_cores = total_cores - static_cores + + 
if total_ram < static_ram: + enabled = False + LOG.warn("dynamic quota: the total statically " + "allocated ram (%s) is greater than the total " + "amount of ram allowed (%s)" + % (static_ram, total_ram)) + else: + enabled = True + dynamic_ram = total_ram - static_ram + + if enabled: + LOG.info("dynamic quota: cores=%s ram=%s" + % (dynamic_cores, dynamic_ram)) + + self.dynamic_quota.setSize(dynamic_cores, dynamic_ram) + + """ + LOG.info("cpu_ratio=%s, ram_ratio=%s" % (cpu_ratio, ram_ratio)) + LOG.info("total_cores=%s total_ram=%s" % (total_cores, total_ram)) + LOG.info("static cores=%s ram=%s" % (static_cores, static_ram)) + LOG.info("dynamic cores=%s ram=%s" % (dynamic_cores, dynamic_ram)) + """ + LOG.debug("dynamic quota %s" % self.dynamic_quota.toDict()) + except Exception as ex: + LOG.error(ex) + raise ex diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py new file mode 100644 index 0000000..4e210ee --- /dev/null +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -0,0 +1,496 @@ +import logging +import re +import threading + +from datetime import datetime + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. 
+See the License for the specific language governing +permissions and limitations under the License.""" + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class Notifications(object): + + def __init__(self, dynamic_quota): + super(Notifications, self).__init__() + + self.dynamic_quota = dynamic_quota + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.debug("Notification INFO: event_type=%s payload=%s" + % (event_type, payload)) + + if payload is None or "state" not in payload: + return + + state = payload["state"] + instance_id = payload["instance_id"] + + if ((event_type == "compute.instance.delete.end" and + (state == "deleted" or state == "error" or state == "building")) or + (event_type == "compute.instance.update" and + (state == "deleted" or state == "error")) or + (event_type == "scheduler.run_instance" and state == "error")): + ram = 0 + cores = 0 + prj_id = None + instance_info = None + + if event_type == "scheduler.run_instance": + instance_info = payload["request_spec"]["instance_type"] + else: + instance_info = payload + + prj_id = instance_info["tenant_id"] + instance_id = instance_info["instance_id"] + ram = instance_info["memory_mb"] + cores = instance_info["vcpus"] + # disk = instance_info["root_gb"] + # node = instance_info["node"] + + LOG.debug("Notification INFO (type=%s state=%s): cores=%s ram=%s " + "prj_id=%s instance_id=%s" + % (event_type, state, cores, ram, prj_id, instance_id)) + + try: + self.dynamic_quota.release(instance_id, prj_id, cores, ram) + except Exception as ex: + LOG.warn("Notification INFO: %s" % ex) + + def warn(self, ctxt, publisher_id, event_type, payload, metadata): + # LOG.info("Notification WARN: event_type=%s payload=%s metadata=%s" + # % (event_type, payload, metadata)) + + state = payload["state"] + instance_id = payload["instance_id"] + LOG.info("Notification WARN: event_type=%s state=%s instance_id=%s" + % (event_type, state, instance_id)) + + def error(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info("Notification ERROR: event_type=%s payload=%s metadata=%s" + % (event_type, payload, metadata)) + + +class Worker(threading.Thread): + + def __init__(self, name, queue, quota, nova_manager): + super(Worker, self).__init__() + self.setDaemon(True) + + self.name = name + self.queue = queue + self.quota = quota + self.nova_manager = nova_manager + self.exit = False + LOG.info("Worker %r created!" % self.name) + + def getName(self): + return self.name + + def destroy(self): + try: + # if self.queue: + self.queue.close() + + self.exit = True + except Exception as ex: + LOG.error(ex) + raise ex + + def run(self): + LOG.info("Worker %r running!" 
% self.name) + + while not self.exit and not self.queue.isClosed(): + try: + queue_item = self.queue.getItem() + except Exception as ex: + LOG.error("Worker %r: %s" % (self.name, ex)) + # self.exit = True + # break + continue + + if queue_item is None: + continue + + try: + request = queue_item.getData() + + instance = request["instance"] + # user_id = instance["nova_object.data"]["user_id"] + prj_id = instance["nova_object.data"]["project_id"] + uuid = instance["nova_object.data"]["uuid"] + vcpus = instance["nova_object.data"]["vcpus"] + memory_mb = instance["nova_object.data"]["memory_mb"] + context = request["context"] + filter_properties = request["filter_properties"] + admin_password = request["admin_password"] + injected_files = request["injected_files"] + requested_networks = request["requested_networks"] + security_groups = request["security_groups"] + block_device_mapping = request["block_device_mapping"] + legacy_bdm = request["legacy_bdm"] + image = request["image"] + + try: + # vm_instance = self.novaConductorAPI.instance_get_by_uuid + # (context, instance_uuid=instance_uuid) + server = self.nova_manager.execute("GET_SERVER", + id=uuid) + except Exception as ex: + LOG.warn("Worker %s: server %r not found! reason=%s" + % (self.name, uuid, ex)) + self.queue.deleteItem(queue_item) + + self.quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + continue + + if server["OS-EXT-STS:vm_state"] != "building" or \ + server["OS-EXT-STS:task_state"] != "scheduling": + self.queue.deleteItem(queue_item) + + self.quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + continue + + # LOG.info(request_spec) + + # if (self.quota.reserve(instance_uuid, vcpus, memory_mb)): + # done = False + + if self.quota.allocate(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb, + blocking=True): + try: + self.nova_manager.execute( + "BUILD_SERVER", + context=context, + instance=instance, + image=image, + filter_properties=filter_properties, + admin_password=admin_password, + injected_files=injected_files, + requested_networks=requested_networks, + security_groups=security_groups, + block_device_mapping=block_device_mapping, + legacy_bdm=legacy_bdm) + + LOG.info("Worker %r: server (instance_id=%s) build OK" + % (self.name, uuid)) + except Exception as ex: + LOG.error("Worker %r: error on building the server " + "(instance_id=%s) reason=%s" + % (self.name, uuid, ex)) + + self.quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + + self.queue.deleteItem(queue_item) + except Exception as ex: + LOG.error("Worker '%s': %s" % (self.name, ex)) + # self.queue.reinsertItem(queue_item) + + continue + + # LOG.info("Worker done is %s" % done) + + # LOG.info(">>>> Worker '%s' queue.isClosed %s exit=%s" + # % (self.name, self.queue.isClosed(), self.exit)) + LOG.info("Worker '%s' destroyed!" 
% self.name) + + +class SchedulerManager(Manager): + + def __init__(self): + Manager.__init__(self, name="SchedulerManager") + + self.config_opts = [ + cfg.FloatOpt('default_TTL', default=10.0), + cfg.ListOpt("projects", default=[], help="the projects list"), + cfg.ListOpt("shares", default=[], help="the shares list"), + cfg.ListOpt("TTLs", default=[], help="the TTLs list"), + ] + + def setup(self): + if self.getManager("NovaManager") is None: + raise Exception("NovaManager not found!") + + if self.getManager("QueueManager") is None: + raise Exception("QueueManager not found!") + + if self.getManager("QuotaManager") is None: + raise Exception("QuotaManager not found!") + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + if self.getManager("FairShareManager") is None: + raise Exception("FairShareManager not found!") + + self.nova_manager = self.getManager("NovaManager") + self.queue_manager = self.getManager("QueueManager") + self.quota_manager = self.getManager("QuotaManager") + self.keystone_manager = self.getManager("KeystoneManager") + self.fairshare_manager = self.getManager("FairShareManager") + self.default_TTL = float(CONF.SchedulerManager.default_TTL) + self.fairshare_manager = self.getManager("FairShareManager") + self.projects = {} + self.workers = [] + self.listener = None + self.exit = False + + try: + self.dynamic_quota = self.quota_manager.execute( + "GET_DYNAMIC_QUOTA") + + k_projects = self.keystone_manager.execute("GET_PROJECTS") + + for k_project in k_projects: + prj_id = str(k_project["id"]) + prj_name = str(k_project["name"]) + + if prj_name in CONF.SchedulerManager.projects: + CONF.SchedulerManager.projects.remove(prj_name) + + self.projects[prj_name] = {"id": prj_id, + "name": prj_name, + "type": "dynamic", + "share": float(0), + "TTL": self.default_TTL} + + if len(CONF.SchedulerManager.projects) > 0: + raise Exception("projects %s not found" + % CONF.SchedulerManager.projects) + + for prj_ttl in CONF.SchedulerManager.TTLs: + prj_name, TTL = self.parseAttribute(prj_ttl) + self.projects[prj_name]["TTL"] = TTL + + for prj_share in CONF.SchedulerManager.shares: + prj_name, share = self.parseAttribute(prj_share) + self.projects[prj_name]["share"] = share + + for project in self.projects.values(): + prj_id = project["id"] + prj_name = project["name"] + prj_share = project["share"] + + del self.projects[prj_name] + self.projects[prj_id] = project + + quota = {"cores": -1, "ram": -1, "instances": -1} + + self.nova_manager.execute("UPDATE_QUOTA", + id=prj_id, + data=quota) + + self.quota_manager.execute("ADD_PROJECT", + prj_id=prj_id, + prj_name=prj_name) + + self.fairshare_manager.execute("ADD_PROJECT", + prj_id=prj_id, + prj_name=prj_name, + share=prj_share) + try: + self.dynamic_queue = self.queue_manager.execute("CREATE_QUEUE", + name="DYNAMIC") + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + + self.dynamic_queue = self.queue_manager.execute("GET_QUEUE", + name="DYNAMIC") + + dynamic_worker = Worker(name="DYNAMIC", + queue=self.dynamic_queue, + quota=self.dynamic_quota, + nova_manager=self.nova_manager) + dynamic_worker.start() + + self.workers.append(dynamic_worker) + + print(self.projects) + print(self.dynamic_quota.toDict()) + + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + raise ex + + def parseAttribute(self, attribute): + if attribute is None: + return None + + prj_name = None + value = float(0) + + parsed_attribute = 
+
+        if len(parsed_attribute) > 1:
+            if not parsed_attribute[-1].isdigit():
+                raise Exception("wrong value %r found in %r!"
+                                % (parsed_attribute[-1], parsed_attribute))
+
+            if len(parsed_attribute) == 2:
+                prj_name = parsed_attribute[0]
+                value = float(parsed_attribute[1])
+            else:
+                raise Exception("wrong attribute definition: %r"
+                                % parsed_attribute)
+        else:
+            raise Exception("wrong attribute definition: %r"
+                            % parsed_attribute)
+
+        return (prj_name, value)
+
+    def execute(self, command, *args, **kargs):
+        if command == "PROCESS_REQUEST":
+            return self.processRequest(*args, **kargs)
+        else:
+            raise Exception("command=%r not supported!" % command)
+
+    def task(self):
+        if self.listener is None:
+            self.notifications = Notifications(self.dynamic_quota)
+
+            target = self.nova_manager.execute("GET_TARGET",
+                                               topic='notifications',
+                                               exchange="nova")
+            self.listener = self.nova_manager.execute(
+                "GET_NOTIFICATION_LISTENER",
+                targets=[target],
+                endpoints=[self.notifications])
+
+            LOG.info("listener created")
+
+            self.listener.start()
+
+        for prj_id, project in self.dynamic_quota.getProjects().items():
+            instances = project["instances"]["active"]
+            TTL = self.projects[prj_id]["TTL"]
+            uuids = self.nova_manager.execute("GET_EXPIRED_SERVERS",
+                                              prj_id=prj_id,
+                                              instances=instances,
+                                              TTL=TTL)
+
+            for uuid in uuids:
+                LOG.info("deleting the expired instance %r from project=%s"
+                         % (uuid, prj_id))
+                self.nova_manager.execute("DELETE_SERVER", id=uuid)
+
+    def destroy(self):
+        if self.workers:
+            for queue_worker in self.workers:
+                queue_worker.destroy()
+
+    def processRequest(self, request):
+        try:
+            filter_properties = request["filter_properties"]
+            instance = request["instance"]
+            user_id = instance["nova_object.data"]["user_id"]
+            prj_id = instance["nova_object.data"]["project_id"]
+            uuid = instance["nova_object.data"]["uuid"]
+            vcpus = instance["nova_object.data"]["vcpus"]
+            memory_mb = instance["nova_object.data"]["memory_mb"]
+
+            if prj_id in self.projects:
+                # prj_name = self.projects[prj_id]["name"]
+                # metadata = instance["nova_object.data"]["metadata"]
+                timestamp = instance["nova_object.data"]["created_at"]
+                timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
+                priority = 0
+
+                try:
+                    if "retry" in filter_properties:
+                        retry = filter_properties["retry"]
+                        num_attempts = retry["num_attempts"]
+
+                        if num_attempts > 0:
+                            self.dynamic_quota.release(instance_id=uuid,
+                                                       prj_id=prj_id,
+                                                       cores=vcpus,
+                                                       ram=memory_mb)
+                            priority = 999999999999
+                            LOG.info("released resource uuid %s "
+                                     "num_attempts %s" % (uuid, num_attempts))
+                except Exception as ex:
+                    LOG.error("Exception has occurred", exc_info=1)
+                    LOG.error(ex)
+
+                if priority == 0:
+                    priority = self.fairshare_manager.execute(
+                        "CALCULATE_PRIORITY",
+                        user_id=user_id,
+                        prj_id=prj_id,
+                        timestamp=timestamp,
+                        retry=0)
+
+                self.dynamic_queue.insertItem(user_id,
+                                              prj_id,
+                                              priority=priority,
+                                              data=request)
+
+                LOG.info("new request: instance_id=%s user_id=%s prj_id=%s "
+                         "priority=%s type=dynamic" % (uuid, user_id,
+                                                       prj_id, priority))
+
+            else:
+                context = request["context"]
+                admin_password = request["admin_password"]
+                injected_files = request["injected_files"]
+                requested_networks = request["requested_networks"]
+                security_groups = request["security_groups"]
+                block_device_mapping = request["block_device_mapping"]
+                legacy_bdm = request["legacy_bdm"]
+                image = request["image"]
+
+                self.nova_manager.execute(
+                    "BUILD_SERVER",
+                    context=context,
+                    instance=instance,
+                    image=image,
+                    filter_properties=filter_properties,
+                    admin_password=admin_password,
+                    injected_files=injected_files,
+                    requested_networks=requested_networks,
+                    security_groups=security_groups,
+                    block_device_mapping=block_device_mapping,
+                    legacy_bdm=legacy_bdm)
+
+                LOG.info("new request: instance_id=%s user_id=%s "
+                         "prj_id=%s type=static" % (uuid, user_id, prj_id))
+        except Exception as ex:
+            LOG.error("Exception has occurred", exc_info=1)
+            LOG.error(ex)
diff --git a/synergy_scheduler_manager/tests/__init__.py b/synergy_scheduler_manager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/synergy_scheduler_manager/tests/base.py b/synergy_scheduler_manager/tests/base.py
new file mode 100644
index 0000000..bc2d9c8
--- /dev/null
+++ b/synergy_scheduler_manager/tests/base.py
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslotest import base
+
+
+class TestCase(base.BaseTestCase):
+
+    """Test case base class for all unit tests."""
diff --git a/synergy_scheduler_manager/tests/test_dynamic_quota.py b/synergy_scheduler_manager/tests/test_dynamic_quota.py
new file mode 100644
index 0000000..9f25dbd
--- /dev/null
+++ b/synergy_scheduler_manager/tests/test_dynamic_quota.py
@@ -0,0 +1,26 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from synergy_scheduler_manager.quota_manager import DynamicQuota
+from synergy_scheduler_manager.tests import base
+
+
+class TestDynamicQuota(base.TestCase):
+
+    def setUp(self):
+        super(TestDynamicQuota, self).setUp()
+        self.dyn_quota = DynamicQuota()
+
+    def test_add_project(self):
+        project_id = 1
+        self.dyn_quota.addProject(project_id, "test_project")
+        self.assertIn(project_id, self.dyn_quota.getProjects())
diff --git a/test-requirements.txt b/test-requirements.txt
new file mode 100644
index 0000000..21a7e3b
--- /dev/null
+++ b/test-requirements.txt
@@ -0,0 +1,14 @@
+# The order of packages is significant, because pip processes them in the order
+# of appearance. Changing the order has an impact on the overall integration
+# process, which may cause wedges in the gate later.
+
+hacking<0.11,>=0.10.0
+
+coverage>=3.6
+python-subunit>=0.0.18
+sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
+oslosphinx>=2.5.0  # Apache-2.0
+oslotest>=1.10.0  # Apache-2.0
+testrepository>=0.0.18
+testscenarios>=0.4
+testtools>=1.4.0
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..a1cec90
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,60 @@
+[tox]
+minversion = 2.0
+envlist = py27-constraints,pep8-constraints
+skipsdist = True
+
+[testenv]
+usedevelop = True
+install_command =
+                  constraints: {[testenv:common-constraints]install_command}
+                  pip install -U {opts} {packages}
+setenv =
+   VIRTUAL_ENV={envdir}
+deps = -r{toxinidir}/test-requirements.txt
+commands = python setup.py test --slowest --testr-args='{posargs}'
+
+[testenv:common-constraints]
+install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
+
+[testenv:pep8]
+commands = flake8 {posargs}
+
+[testenv:pep8-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = flake8 {posargs}
+
+[testenv:venv]
+commands = {posargs}
+
+[testenv:venv-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = {posargs}
+
+[testenv:cover]
+commands = python setup.py test --coverage --testr-args='{posargs}'
+
+[testenv:cover-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = python setup.py test --coverage --testr-args='{posargs}'
+
+[testenv:docs]
+commands = python setup.py build_sphinx
+
+[testenv:docs-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = python setup.py build_sphinx
+
+[testenv:debug]
+commands = oslo_debug_helper {posargs}
+
+[testenv:debug-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = oslo_debug_helper {posargs}
+
+[flake8]
+# E123, E125 skipped as they are invalid PEP-8.
+
+show-source = True
+ignore = E123,E125
+builtins = _
+exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
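
As a quick local sanity check of the tox.ini added above, the new environments
should be runnable along these lines (a usage sketch, assuming tox >= 2.0 and a
Python 2.7 interpreter are available; anything after "--" is passed to testr
through --testr-args, so a single test module can be selected):

    tox -e py27-constraints    # unit tests, pinned by upper-constraints.txt
    tox -e pep8                # flake8 style checks
    tox -e cover               # unit tests with coverage collection
    tox -e py27-constraints -- synergy_scheduler_manager.tests.test_dynamic_quota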