diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..76045da --- /dev/null +++ b/.coveragerc @@ -0,0 +1,7 @@ +[run] +branch = True +source = synergy_scheduler_manager +omit = synergy_scheduler_manager/openstack/* + +[report] +ignore_errors = True diff --git a/.testr.conf b/.testr.conf new file mode 100644 index 0000000..6d83b3c --- /dev/null +++ b/.testr.conf @@ -0,0 +1,7 @@ +[DEFAULT] +test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ + OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ + OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ + ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION +test_id_option=--load-list $IDFILE +test_list_option=--list diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst new file mode 100644 index 0000000..0866929 --- /dev/null +++ b/CONTRIBUTING.rst @@ -0,0 +1,17 @@ +If you would like to contribute to the development of OpenStack, you must +follow the steps in this page: + + http://docs.openstack.org/infra/manual/developers.html + +If you already have a good understanding of how the system works and your +OpenStack accounts are set up, you can skip to the development workflow +section of this documentation to learn how changes to OpenStack should be +submitted for review via the Gerrit tool: + + http://docs.openstack.org/infra/manual/developers.html#development-workflow + +Pull requests submitted through GitHub will be ignored. + +Bugs should be filed on Launchpad, not GitHub: + + https://bugs.launchpad.net/synergy-scheduler-manager diff --git a/HACKING.rst b/HACKING.rst new file mode 100644 index 0000000..56da37d --- /dev/null +++ b/HACKING.rst @@ -0,0 +1,4 @@ +synergy-scheduler-manager Style Commandments +=============================================== + +Read the OpenStack Style Commandments http://docs.openstack.org/developer/hacking/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..68c771a --- /dev/null +++ b/LICENSE @@ -0,0 +1,176 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..c978a52 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,6 @@ +include AUTHORS +include ChangeLog +exclude .gitignore +exclude .gitreview + +global-exclude *.pyc diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..c293f8e --- /dev/null +++ b/README.rst @@ -0,0 +1,26 @@ +------------------------------ + SYNERGY SCHEDULER MANAGER +------------------------------ + +The Scheduler Manager + +Synergy is a new extensible, general-purpose OpenStack management service. +Its capabilities are implemented by a collection of managers which are specific +and independent pluggable tasks executed periodically or interactively. The +managers can interact with each other in a loosely coupled way. +The Scheduler Manager provides advanced scheduling (fairshare) capability for +OpenStack. In particular, it aims to address the resource utilization issues +coming from the static allocation model inherent in the Cloud paradigm, by +adopting the dynamic partitioning strategy implemented by the advanced batch +schedulers. + + +* Free software: Apache license +* Documentation: http://docs.openstack.org/developer/synergy-scheduler-manager +* Source: http://git.openstack.org/cgit/openstack/synergy-scheduler-manager +* Bugs: http://bugs.launchpad.net/synergy-scheduler-manager + +Features +-------- + +* TODO diff --git a/babel.cfg b/babel.cfg new file mode 100644 index 0000000..15cd6cb --- /dev/null +++ b/babel.cfg @@ -0,0 +1,2 @@ +[python: **.py] + diff --git a/doc/source/conf.py b/doc/source/conf.py new file mode 100755 index 0000000..1f2d516 --- /dev/null +++ b/doc/source/conf.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys + +sys.path.insert(0, os.path.abspath('../..')) +# -- General configuration ---------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones. +extensions = [ + 'sphinx.ext.autodoc', + #'sphinx.ext.intersphinx', + 'oslosphinx' +] + +# autodoc generation is a bit aggressive and a nuisance when doing heavy +# text edit cycles. +# execute "export SPHINX_DEBUG=1" in your terminal to disable + +# The suffix of source filenames. +source_suffix = '.rst' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'synergy-scheduler-manager' + +# If true, '()' will be appended to :func: etc. cross-reference text. +add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +add_module_names = True + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# -- Options for HTML output -------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. Major themes that come with +# Sphinx are currently 'default' and 'sphinxdoc'. +# html_theme_path = ["."] +# html_theme = '_theme' +# html_static_path = ['static'] + +# Output file base name for HTML help builder. +htmlhelp_basename = '%sdoc' % project + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, author, documentclass +# [howto/manual]). +latex_documents = [ + ('index', + '%s.tex' % project, + u'%s Documentation' % project, + u'OpenStack Foundation', 'manual'), +] + +# Example configuration for intersphinx: refer to the Python standard library. +#intersphinx_mapping = {'http://docs.python.org/': None} diff --git a/doc/source/contributing.rst b/doc/source/contributing.rst new file mode 100644 index 0000000..1728a61 --- /dev/null +++ b/doc/source/contributing.rst @@ -0,0 +1,4 @@ +============ +Contributing +============ +.. include:: ../../CONTRIBUTING.rst diff --git a/doc/source/index.rst b/doc/source/index.rst new file mode 100644 index 0000000..1513578 --- /dev/null +++ b/doc/source/index.rst @@ -0,0 +1,25 @@ +.. synergy-scheduler-manager documentation master file, created by + sphinx-quickstart on Tue Jul 9 22:26:36 2013. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to synergy-scheduler-manager's documentation! +======================================================== + +Contents: + +.. toctree:: + :maxdepth: 2 + + readme + installation + usage + contributing + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + diff --git a/doc/source/installation.rst b/doc/source/installation.rst new file mode 100644 index 0000000..d0ef516 --- /dev/null +++ b/doc/source/installation.rst @@ -0,0 +1,12 @@ +============ +Installation +============ + +At the command line:: + + $ pip install synergy-scheduler-manager + +Or, if you have virtualenvwrapper installed:: + + $ mkvirtualenv synergy-scheduler-manager + $ pip install synergy-scheduler-manager diff --git a/doc/source/readme.rst b/doc/source/readme.rst new file mode 100644 index 0000000..a6210d3 --- /dev/null +++ b/doc/source/readme.rst @@ -0,0 +1 @@ +.. 
include:: ../../README.rst diff --git a/doc/source/usage.rst b/doc/source/usage.rst new file mode 100644 index 0000000..7111ba8 --- /dev/null +++ b/doc/source/usage.rst @@ -0,0 +1,5 @@ +======== +Usage +======== + +TODO diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e8ac661 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +# The order of packages is significant, because pip processes them in the order +# of appearance. Changing the order has an impact on the overall integration +# process, which may cause wedges in the gate later. + +pbr>=1.6 +synergy-service==0.2.0.dev4 +oslo.config<2.0.0 +oslo.messaging<2.0.0 +sqlalchemy==1.0.13 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..dd87780 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,53 @@ +[metadata] +name = synergy-scheduler-manager +version = 0.1 +summary = Provide advanced scheduling (fairshare) capability for OpenStack +description-file = + README.rst +author = Lisa Zangrando +author-email = lisa.zangrando@pd.infn.it +home-page = https://launchpad.net/synergy-scheduler-manager +classifier = + Environment :: OpenStack + Intended Audience :: Information Technology + Intended Audience :: System Administrators + License :: OSI Approved :: Apache Software License + Operating System :: POSIX :: Linux + Programming Language :: Python + Programming Language :: Python :: 2 + Programming Language :: Python :: 2.7 + +[files] +packages = + synergy_scheduler_manager + +[entry_points] +synergy.managers = + KeystoneManager = synergy_scheduler_manager.keystone_manager:KeystoneManager + NovaManager = synergy_scheduler_manager.nova_manager:NovaManager + QueueManager = synergy_scheduler_manager.queue_manager:QueueManager + QuotaManager = synergy_scheduler_manager.quota_manager:QuotaManager + FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager + SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager + +[build_sphinx] +source-dir = doc/source +build-dir = doc/build +all_files = 1 + +[upload_sphinx] +upload-dir = doc/build/html + +[compile_catalog] +directory = synergy_scheduler_manager/locale +domain = synergy_scheduler_manager + +[update_catalog] +domain = synergy_scheduler_manager +output_dir = synergy_scheduler_manager/locale +input_file = synergy_scheduler_manager/locale/synergy_scheduler_manager.pot + +[extract_messages] +keywords = _ gettext ngettext l_ lazy_gettext +mapping_file = babel.cfg +output_file = synergy_scheduler_manager/locale/synergy_scheduler_manager.pot diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..98b93eb --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT +import setuptools + +# In python < 2.7.4, a lazy loading of package `pbr` will break +# setuptools if some other modules registered functions in `atexit`. 
+# solution from: http://bugs.python.org/issue15881#msg170215 +try: + import multiprocessing # noqa +except ImportError: + pass + +setuptools.setup( + setup_requires=['pbr'], + pbr=True) diff --git a/synergy_scheduler_manager/__init__.py b/synergy_scheduler_manager/__init__.py new file mode 100644 index 0000000..b6223ca --- /dev/null +++ b/synergy_scheduler_manager/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pbr.version + + +__version__ = pbr.version.VersionInfo( + 'synergy_scheduler_manager').version_string() diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py new file mode 100644 index 0000000..4bf1c73 --- /dev/null +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -0,0 +1,376 @@ +import logging +import threading + +from datetime import datetime +from datetime import timedelta + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. 
+See the License for the specific language governing +permissions and limitations under the License.""" + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class FairShareManager(Manager): + + def __init__(self): + Manager.__init__(self, name="FairShareManager") + + self.config_opts = [ + cfg.IntOpt('periods', default=3), + cfg.IntOpt('period_length', default=7), + cfg.FloatOpt('default_share', default=10.0), + cfg.FloatOpt('decay_weight', default=0.5, help="the decay weight"), + cfg.IntOpt('age_weight', default=1000, help="the age weight"), + cfg.IntOpt('vcpus_weight', default=10000, help="the vcpus weight"), + cfg.IntOpt('memory_weight', default=7000, help="the memory weight") + ] + + def setup(self): + if self.getManager("NovaManager") is None: + raise Exception("NovaManager not found!") + + if self.getManager("QueueManager") is None: + raise Exception("QueueManager not found!") + + if self.getManager("QuotaManager") is None: + raise Exception("QuotaManager not found!") + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + self.periods = CONF.FairShareManager.periods + self.period_length = CONF.FairShareManager.period_length + self.default_share = float(CONF.FairShareManager.default_share) + self.decay_weight = CONF.FairShareManager.decay_weight + self.vcpus_weight = CONF.FairShareManager.vcpus_weight + self.age_weight = CONF.FairShareManager.age_weight + self.memory_weight = CONF.FairShareManager.memory_weight + self.projects = {} + self.workers = [] + self.exit = False + self.nova_manager = self.getManager("NovaManager") + self.queue_manager = self.getManager("QueueManager") + self.quota_manager = self.getManager("QuotaManager") + self.keystone_manager = self.getManager("KeystoneManager") + self.condition = threading.Condition() + + def execute(self, command, *args, **kargs): + if command == "ADD_PROJECT": + return self.addProject(*args, **kargs) + elif command == "GET_PROJECT": + return self.getProject(*args, **kargs) + elif command == "GET_PROJECTS": + return self.getProjects(*args, **kargs) + elif command == "REMOVE_PROJECT": + return self.removeProject(*args, **kargs) + elif command == "GET_PRIORITY": + result = {} + for prj_id, project in self.projects.items(): + users = {} + + for user_id, user in project["users"].items(): + p = self.calculatePriority(user_id=user_id, prj_id=prj_id) + users[user["name"]] = p + + result[project["name"]] = users + return result + elif command == "CALCULATE_PRIORITY": + return self.calculatePriority(*args, **kargs) + else: + raise Exception("command=%r not supported!" % command) + + def task(self): + with self.condition: + try: + self.calculateFairShare() + except Exception as ex: + LOG.error(ex) + raise ex + finally: + self.condition.notifyAll() + + def destroy(self): + pass + + def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0): + if prj_id not in self.projects: + raise Exception("project=%s not found!" % prj_id) + + if user_id not in self.projects[prj_id]["users"]: + raise Exception("user=%s not found!" 
% user_id) + + fair_share_cores = 0 + fair_share_ram = 0 + + with self.condition: + user = self.projects[prj_id]["users"].get(user_id) + fair_share_cores = user["fairshare_cores"] + fair_share_ram = user["fairshare_ram"] + + self.condition.notifyAll() + + if not timestamp: + timestamp = datetime.utcnow() + + now = datetime.utcnow() + + diff = (now - timestamp) + minutes = diff.seconds / 60 + priority = (float(self.age_weight) * minutes + + float(self.vcpus_weight) * fair_share_cores + + float(self.memory_weight) * fair_share_ram - + float(self.age_weight) * retry) + + return int(priority) + + def addProject(self, prj_id, prj_name, share=float(0)): + if prj_id not in self.projects: + if share == 0: + share = self.default_share + + with self.condition: + self.projects[prj_id] = {"id": prj_id, + "name": prj_name, + "type": "dynamic", + "users": {}, + "usage": {}, + "share": share} + self.condition.notifyAll() + + def getProject(self, prj_id): + if prj_id not in self.projects: + raise Exception("project name=%r not found!" % prj_id) + + return self.projects.get(prj_id) + + def getProjects(self): + return self.projects + + def removeProject(self, prj_id): + if prj_id in self.projects: + with self.condition: + del self.projects[prj_id] + self.condition.notifyAll() + + def calculateFairShare(self): + total_prj_share = float(0) + total_usage_ram = float(0) + total_usage_cores = float(0) + total_actual_usage_cores = float(0) + total_actual_usage_ram = float(0) + + users = self.keystone_manager.execute("GET_USERS") + + if not users: + LOG.error("cannot retrieve the users list from KeystoneManager") + return + + for user in users: + user_id = str(user["id"]) + user_name = str(user["name"]) + user_projects = self.keystone_manager.execute("GET_USER_PROJECTS", + id=user_id) + + for project in user_projects: + prj_id = str(project["id"]) + + if prj_id not in self.projects: + continue + + p_users = self.projects[prj_id]["users"] + + if user_id not in p_users: + p_users[user_id] = {"name": user_name, + "share": self.default_share, + "usage": {"ram": float(0), + "cores": float(0)}} + else: + p_users[user_id]["usage"]["ram"] = float(0) + p_users[user_id]["usage"]["cores"] = float(0) + + to_date = datetime.utcnow() + + for x in xrange(self.periods): + default_share = self.default_share + decay = self.decay_weight ** x + from_date = to_date - timedelta(days=(self.period_length)) + + usages = self.nova_manager.execute("GET_RESOURCE_USAGE", + prj_ids=self.projects.keys(), + from_date=from_date, + to_date=to_date) + + for prj_id, users in usages.items(): + project = self.projects[prj_id] + + for user_id, usage_record in users.items(): + if user_id not in project["users"]: + project["users"][user_id] = {"name": user_name, + "share": default_share, + "usage": {}} + + user_usage = project["users"][user_id]["usage"] + user_usage["ram"] += decay * usage_record["ram"] + user_usage["cores"] += decay * usage_record["cores"] + + total_usage_ram += user_usage["ram"] + total_usage_cores += user_usage["cores"] + + to_date = from_date + + for project in self.projects.values(): + if "share" not in project or project["share"] == 0: + project["share"] = self.default_share + + # check the share for each user and update the usage_record + users = project["users"] + prj_id = project["id"] + # prj_name = project["name"] + prj_share = project["share"] + sibling_share = float(0) + + for user_id, user in users.items(): + if "share" not in user or user["share"] == 0: + user["share"] = self.default_share + + if len(users) == 1: + 
user["share"] = prj_share + sibling_share = prj_share + else: + sibling_share += user["share"] + + project["sibling_share"] = sibling_share + total_prj_share += prj_share + + for prj_id, project in self.projects.items(): + sibling_share = project["sibling_share"] + prj_share = project["share"] + actual_usage_cores = float(0) + actual_usage_ram = float(0) + + users = project["users"] + + for user_id, user in users.items(): + # for each user the normalized share + # is calculated (0 <= user_norm_share <= 1) + user_share = user["share"] + user_usage = user["usage"] + user_usage["norm_ram"] = user_usage["ram"] + user_usage["norm_cores"] = user_usage["cores"] + + if prj_share > 0 and sibling_share > 0 and total_prj_share > 0: + user["norm_share"] = (user_share / sibling_share) * \ + (prj_share / total_prj_share) + else: + user["norm_share"] = user_share + + if total_usage_ram > 0: + user_usage["norm_ram"] /= total_usage_ram + + if total_usage_cores > 0: + user_usage["norm_cores"] /= total_usage_cores + + actual_usage_ram += user_usage["norm_ram"] + actual_usage_cores += user_usage["norm_cores"] + + project["usage"]["actual_ram"] = actual_usage_ram + project["usage"]["actual_cores"] = actual_usage_cores + + total_actual_usage_ram += actual_usage_ram + total_actual_usage_cores += actual_usage_cores + + for project in self.projects.values(): + actual_usage_ram = project["usage"]["actual_ram"] + actual_usage_cores = project["usage"]["actual_cores"] + prj_share = project["share"] + sibling_share = project["sibling_share"] + users = project["users"] + + # effect_prj_cores_usage = actual_usage_cores + + # ((total_actual_usage_cores - actual_usage_cores) * + # prj_share / total_prj_share) + + # effect_prj_cores_usage = actual_usage_ram + + # ((total_actual_usage_ram - actual_usage_ram) * + # prj_share / total_prj_share) + + effect_prj_ram_usage = actual_usage_ram + effect_prj_cores_usage = actual_usage_cores + + project["usage"]["effective_ram"] = effect_prj_ram_usage + project["usage"]["effective_cores"] = effect_prj_cores_usage + + for user in users.values(): + user["fairshare_ram"] = float(0) + user["fairshare_cores"] = float(0) + user_share = user["share"] + user_usage = user["usage"] + user_usage["effective_cores"] = float(0) + user_usage["effective_ram"] = float(0) + + if user_share > 0: + norm_share = user["norm_share"] + norm_usage_ram = user_usage["norm_ram"] + norm_usage_cores = user_usage["norm_cores"] + + effect_usage_ram = (norm_usage_ram + ( + (effect_prj_cores_usage - + norm_usage_ram) * + user_share / sibling_share)) + + effect_usage_cores = (norm_usage_cores + ( + (effect_prj_cores_usage - + norm_usage_cores) * + user_share / sibling_share)) + + user_usage["effective_ram"] = effect_usage_ram + user_usage["effective_rel_ram"] = float(0) + + user_usage["effective_cores"] = effect_usage_cores + user_usage["effective_rel_cores"] = float(0) + + if actual_usage_cores > 0: + user_usage["effective_rel_cores"] = norm_usage_cores + user_usage["effective_rel_cores"] /= actual_usage_cores + + if actual_usage_ram > 0: + user_usage["effect_rel_ram"] = norm_usage_ram + user_usage["effect_rel_ram"] /= actual_usage_ram + + # user["effect_usage_rel_cores"] = effect_usage_cores / + # effect_prj_cores_usage + # user["effect_usage_rel_ram"] = effect_usage_ram / + # effect_prj_cores_usage + + if norm_share > 0: + f_ram = 2 ** (-effect_usage_ram / norm_share) + user["fairshare_ram"] = f_ram + + f_cores = 2 ** (-effect_usage_cores / norm_share) + user["fairshare_cores"] = f_cores + + LOG.debug("fairshare 
project %s" % project) diff --git a/synergy_scheduler_manager/keystone_manager.py b/synergy_scheduler_manager/keystone_manager.py new file mode 100644 index 0000000..309d486 --- /dev/null +++ b/synergy_scheduler_manager/keystone_manager.py @@ -0,0 +1,672 @@ +import json +import logging +import os.path +import requests + +from datetime import datetime + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class Trust(object): + + def __init__(self, data): + data = data["trust"] + + self.id = data["id"] + self.impersonations = data["impersonation"] + self.roles_links = data["roles_links"] + self.trustor_user_id = data["trustor_user_id"] + self.trustee_user_id = data["trustee_user_id"] + self.links = data["links"] + self.roles = data["roles"] + self.remaining_uses = data["remaining_uses"] + self.expires_at = None + + if data["expires_at"] is not None: + self.expires_at = datetime.strptime(data["expires_at"], + "%Y-%m-%dT%H:%M:%S.%fZ") + self.project_id = data["project_id"] + + def getId(self): + return self.id + + def isImpersonations(self): + return self.impersonations + + def getRolesLinks(self): + return self.roles_links + + def getTrustorUserId(self): + return self.trustor_user_id + + def getTrusteeUserId(self): + return self.trustee_user_id + + def getlinks(self): + return self.links + + def getProjectId(self): + return self.project_id + + def getRoles(self): + return self.roles + + def getRemainingUses(self): + return self.remaining_uses + + def getExpiration(self): + return self.expires_at + + def isExpired(self): + if self.getExpiration() is None: + return False + + return self.getExpiration() < datetime.utcnow() + + +class Token(object): + + def __init__(self, token, data): + self.id = token + + data = data["token"] + self.roles = data["roles"] + self.catalog = data["catalog"] + self.issued_at = datetime.strptime(data["issued_at"], + "%Y-%m-%dT%H:%M:%S.%fZ") + self.expires_at = datetime.strptime(data["expires_at"], + "%Y-%m-%dT%H:%M:%S.%fZ") + self.project = data["project"] + self.user = data["user"] + self.extras = data["extras"] + + def getCatalog(self, service_name=None, interface="public"): + if service_name: + for service in self.catalog: + if service["name"] == service_name: + for endpoint in service["endpoints"]: + if endpoint["interface"] == interface: + return endpoint + return None + else: + return self.catalog + + def getExpiration(self): + return self.expires_at + + def getId(self): + return self.id + + def getExtras(self): + return self.extras + + def getProject(self): + return self.project + + def getRoles(self): + return self.roles + + def getUser(self): + return self.user + + def isAdmin(self): + if not self.roles: + return False + + for role in self.roles: 
+ if role["name"] == "admin": + return True + + return False + + def issuedAt(self): + return self.issued_at + + def isExpired(self): + return self.getExpiration() < datetime.utcnow() + + def save(self, filename): + # save to file + with open(filename, 'w') as f: + token = {} + token["catalog"] = self.catalog + token["extras"] = self.extras + token["user"] = self.user + token["project"] = self.project + token["roles"] = self.roles + token["roles"] = self.roles + token["issued_at"] = self.issued_at.isoformat() + token["expires_at"] = self.expires_at.isoformat() + + data = {"id": self.id, "token": token} + + json.dump(data, f) + + @classmethod + def load(cls, filename): + if not os.path.isfile(".auth_token"): + return None + + # load from file: + with open(filename, 'r') as f: + try: + data = json.load(f) + return Token(data["id"], data) + # if the file is empty the ValueError will be thrown + except ValueError as ex: + raise ex + + def isotime(self, at=None, subsecond=False): + """Stringify time in ISO 8601 format.""" + if not at: + at = datetime.utcnow() + + if not subsecond: + st = at.strftime('%Y-%m-%dT%H:%M:%S') + else: + st = at.strftime('%Y-%m-%dT%H:%M:%S.%f') + + if at.tzinfo: + tz = at.tzinfo.tzname(None) + else: + tz = 'UTC' + + st += ('Z' if tz == 'UTC' else tz) + return st + + """The trustor or grantor of a trust is the person who creates the trust. + The trustor is the one who contributes property to the trust. + The trustee is the person who manages the trust, and is usually appointed + by the trustor. The trustor is also often the trustee in living trusts. + """ + def trust(self, trustee_user, expires_at=None, + project_id=None, roles=None, impersonation=True): + if self.isExpired(): + raise Exception("token expired!") + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Token": self.getId()} + + if roles is None: + roles = self.getRoles() + + if project_id is None: + project_id = self.getProject().get("id") + + data = {} + data["trust"] = {"impersonation": impersonation, + "project_id": project_id, + "roles": roles, + "trustee_user_id": trustee_user, + "trustor_user_id": self.getUser().get("id")} + + if expires_at is not None: + data["trust"]["expires_at"] = self.isotime(expires_at, True) + + endpoint = self.getCatalog(service_name="keystone") + + if not endpoint: + raise Exception("keystone endpoint not found!") + + if "v2.0" in endpoint["url"]: + endpoint["url"] = endpoint["url"].replace("v2.0", "v3") + + response = requests.post(url=endpoint["url"] + "/OS-TRUST/trusts", + headers=headers, + data=json.dumps(data)) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if not response.text: + raise Exception("trust token failed!") + + return Trust(response.json()) + + +class KeystoneManager(Manager): + + def __init__(self): + Manager.__init__(self, name="KeystoneManager") + + self.config_opts = [ + cfg.StrOpt("auth_url", + help="the Keystone url (v3 only)", + required=True), + cfg.StrOpt("username", + help="the name of user with admin role", + required=True), + cfg.StrOpt("password", + help="the password of user with admin role", + required=True), + cfg.StrOpt("project_name", + help="the project to request authorization on", + required=True), + cfg.StrOpt("project_id", + help="the project id to request authorization on", + required=False), + cfg.IntOpt("timeout", + help="set the http connection timeout", + default=60, + required=False), + cfg.IntOpt("trust_expiration", + 
help="set the trust expiration", + default=24, + required=False) + ] + + def setup(self): + self.auth_url = CONF.KeystoneManager.auth_url + self.username = CONF.KeystoneManager.username + self.password = CONF.KeystoneManager.password + self.project_name = CONF.KeystoneManager.project_name + self.project_id = CONF.KeystoneManager.project_id + self.timeout = CONF.KeystoneManager.timeout + self.trust_expiration = CONF.KeystoneManager.trust_expiration + self.token = None + + self.authenticate() + + def task(self): + pass + + def destroy(self): + pass + + def execute(self, command, *args, **kargs): + if command == "GET_USERS": + return self.getUsers() + elif command == "GET_USER": + return self.getProject(*args, **kargs) + elif command == "GET_USER_PROJECTS": + return self.getUserProjects(*args, **kargs) + elif command == "GET_USER_ROLES": + return self.getUserRoles(*args, **kargs) + elif command == "GET_PROJECTS": + return self.getProjects() + elif command == "GET_PROJECT": + return self.getProject(*args, **kargs) + elif command == "GET_ROLES": + return self.getRoles() + elif command == "GET_ROLE": + return self.getRole(*args, **kargs) + elif command == "GET_TOKEN": + return self.getToken() + elif command == "DELETE_TOKEN": + return self.deleteToken(*args, **kargs) + elif command == "VALIDATE_TOKEN": + return self.validateToken(*args, **kargs) + elif command == "GET_ENDPOINTS": + return self.getEndpoints() + elif command == "GET_ENDPOINT": + return self.getEndpoints(*args, **kargs) + elif command == "GET_SERVICES": + return self.getServices() + elif command == "GET_SERVICE": + return self.getService(*args, **kargs) + elif command == "TRUST": + return self.trust(*args, **kargs) + else: + return None + + def doOnEvent(self, event_type, *args, **kargs): + if event_type == "GET_PROJECTS": + kargs["result"].extend(self.getProjects()) + + def authenticate(self): + if self.token is not None: + if self.token.isExpired(): + try: + self.deleteToken(self.token.getId()) + except requests.exceptions.HTTPError: + pass + else: + return + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient"} + + identity = {"methods": ["password"], + "password": {"user": {"name": self.username, + "domain": {"id": "default"}, + "password": self.password}}} + + data = {"auth": {}} + data["auth"]["identity"] = identity + + if self.project_name: + data["auth"]["scope"] = {"project": {"name": self.project_name, + "domain": {"id": "default"}}} + + if self.project_id: + data["auth"]["scope"] = {"project": {"id": self.project_id, + "domain": {"id": "default"}}} + + response = requests.post(url=self.auth_url + "/auth/tokens", + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if not response.text: + raise Exception("authentication failed!") + + # print(response.__dict__) + + token_subject = response.headers["X-Subject-Token"] + token_data = response.json() + + self.token = Token(token_subject, token_data) + + def getUser(self, id): + try: + response = self.getResource("users/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the user info (id=%r): %s" + % (id, response["error"]["message"])) + + if response: + response = response["user"] + + return response + + def getUsers(self): + try: + response = self.getResource("users", "GET") + except requests.exceptions.HTTPError as ex: + response = 
ex.response.json() + raise Exception("error on retrieving the users list: %s" + % response["error"]["message"]) + + if response: + response = response["users"] + + return response + + def getUserProjects(self, id): + try: + response = self.getResource("users/%s/projects" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the users's projects " + "(id=%r): %s" % (id, response["error"]["message"])) + + if response: + response = response["projects"] + + return response + + def getUserRoles(self, user_id, project_id): + try: + response = self.getResource("/projects/%s/users/%s/roles" + % (project_id, user_id), "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the user's roles (usrId=%r, " + "prjId=%r): %s" % (user_id, + project_id, + response["error"]["message"])) + + if response: + response = response["roles"] + + return response + + def getProject(self, id): + try: + response = self.getResource("/projects/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the project (id=%r, " + % (id, response["error"]["message"])) + + if response: + response = response["project"] + + return response + + def getProjects(self): + try: + response = self.getResource("/projects", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the projects list: %s" + % response["error"]["message"]) + + if response: + response = response["projects"] + + return response + + def getRole(self, id): + try: + response = self.getResource("/roles/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the role info (id=%r): %s" + % (id, response["error"]["message"])) + + if response: + response = response["role"] + + return response + + def getRoles(self): + try: + response = self.getResource("/roles", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the roles list: %s" + % response["error"]["message"]) + + if response: + response = response["roles"] + + return response + + def getToken(self): + self.authenticate() + return self.token + + def deleteToken(self, id): + if self.token is None: + return + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": self.token.getProject()["name"], + "X-Auth-Token": self.token.getId(), + "X-Subject-Token": id} + + response = requests.delete(url=self.auth_url + "/auth/tokens", + headers=headers, + timeout=self.timeout) + + self.token = None + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + def validateToken(self, id): + self.authenticate() + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": self.token.getProject()["name"], + "X-Auth-Token": self.token.getId(), + "X-Subject-Token": id} + + response = requests.get(url=self.auth_url + "/auth/tokens", + headers=headers, + timeout=self.timeout) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if not response.text: + raise Exception("token not found!") + + token_subject = response.headers["X-Subject-Token"] + token_data = response.json() + + return 
Token(token_subject, token_data) + + def getEndpoint(self, id=None, service_id=None): + if id: + try: + response = self.getResource("/endpoints/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the endpoint (id=%r): %s" + % (id, response["error"]["message"])) + if response: + response = response["endpoint"] + + return response + elif service_id: + try: + endpoints = self.getEndpoints() + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the endpoints list" + "(serviceId=%r): %s" + % response["error"]["message"]) + + if endpoints: + for endpoint in endpoints: + if endpoint["service_id"] == service_id: + return endpoint + + return None + + def getEndpoints(self): + try: + response = self.getResource("/endpoints", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the endpoints list: %s" + % response["error"]["message"]) + + if response: + response = response["endpoints"] + + return response + + def getService(self, id=None, name=None): + if id: + try: + response = self.getResource("/services/%s" % id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the service info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response: + response = response["service"] + return response + elif name: + services = self.getServices() + + if services: + for service in services: + if service["name"] == name: + return service + + return None + + def getServices(self): + try: + response = self.getResource("/services", "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the services list: %s" + % response["error"]["message"]) + + if response: + response = response["services"] + + return response + + def getResource(self, resource, method, data=None): + self.authenticate() + + url = self.auth_url + "/" + resource + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": self.token.getProject()["name"], + "X-Auth-Token": self.token.getId()} + + if method == "GET": + response = requests.get(url, + headers=headers, + params=data, + timeout=self.timeout) + elif method == "POST": + response = requests.post(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "PUT": + response = requests.put(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "HEAD": + response = requests.head(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "DELETE": + response = requests.delete(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + else: + raise Exception("wrong HTTP method: %s" % method) + + if response.status_code != requests.codes.ok: + response.raise_for_status() + + if response.text: + return response.json() + else: + return None diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py new file mode 100644 index 0000000..1c15565 --- /dev/null +++ b/synergy_scheduler_manager/nova_manager.py @@ -0,0 +1,1064 @@ +import ConfigParser +import eventlet +import json +import logging +import os.path +import requests + +from nova.baserpc import BaseAPI +from nova.compute.rpcapi import ComputeAPI +from 
nova.conductor.rpcapi import ComputeTaskAPI +from nova.conductor.rpcapi import ConductorAPI +from nova.objects import base as objects_base + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +try: + import oslo_messaging as oslo_msg +except ImportError: + import oslo.messaging as oslo_msg + +try: + from oslo_serialization import jsonutils +except ImportError: + from oslo.serialization import jsonutils + +try: + from oslo_versionedobjects import base as ovo_base +except ImportError: + from oslo.versionedobjects import base as ovo_base + +from sqlalchemy import create_engine +from sqlalchemy.exc import SQLAlchemyError + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +CONFIG = ConfigParser.SafeConfigParser() + + +class MessagingAPI(object): + + def __init__(self, transport_url): + LOG.debug("setting up the AMQP transport url: %s" % transport_url) + oslo_msg.set_transport_defaults(control_exchange="nova") + self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url) + + def getTarget(self, topic, exchange=None, namespace=None, + version=None, server=None): + return oslo_msg.Target(topic=topic, + exchange=exchange, + namespace=namespace, + version=version, + server=server) + + def getRPCClient(self, target, version_cap=None, serializer=None): + assert self.TRANSPORT is not None + + LOG.info("creating RPC client with target %s" % target) + return oslo_msg.RPCClient(self.TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + def getRPCServer(self, target, endpoints, serializer=None): + assert self.TRANSPORT is not None + + LOG.info("creating RPC server with target %s" % target) + return oslo_msg.get_rpc_server(self.TRANSPORT, + target, + endpoints, + executor="eventlet", + serializer=serializer) + + def getNotificationListener(self, targets, endpoints): + assert self.TRANSPORT is not None + + LOG.info("creating notification listener with target %s endpoints %s" + % (targets, endpoints)) + return oslo_msg.get_notification_listener(self.TRANSPORT, + targets, + endpoints, + allow_requeue=True, + executor="eventlet") + + +class NovaBaseRPCAPI(BaseAPI): + + def __init__(self, topic, msg): + self.target = msg.getTarget(topic=None, + namespace="baseapi", + version="1.1") + + target_synergy = msg.getTarget(topic=topic + "_synergy", + namespace="baseapi", + version="1.0") + + LOG.info(target_synergy) + self.client = msg.getRPCClient(target=target_synergy, version_cap=None) + + def ping(self, context, arg, timeout=None): + try: + cctxt = self.client.prepare(timeout=timeout) + + return cctxt.call(context, 'ping', arg=arg) + except Exception as ex: + LOG.error("NovaBaseRPCAPI ping! 
%s" % (ex)) + raise ex + + def get_backdoor_port(self, context, host): + cctxt = self.client.prepare(server=host, version='1.1') + return cctxt.call(context, 'get_backdoor_port') + + +class NovaComputeAPI(ComputeAPI): + + def __init__(self, topic, msg): + self.target = msg.getTarget(topic=topic, version="4.0") + self.client = msg.getRPCClient(target=msg.getTarget(topic=topic)) + + +class NovaConductorAPI(ConductorAPI): + + def __init__(self, topic, msg): + report_interval = cfg.IntOpt("report_interval", + default=10, + help="Seconds between nodes reporting " + "state to datastore") + CONF.register_opt(report_interval) + + self.target = msg.getTarget(topic=topic, version="3.0") + + self.client = msg.getRPCClient( + target=msg.getTarget(topic=topic + "_synergy", version="3.0"), + version_cap=None) + + def provider_fw_rule_get_all(self, context): + cctxt = self.client.prepare() + return cctxt.call(context, 'provider_fw_rule_get_all') + + # TODO(hanlind): This method can be removed once oslo.versionedobjects + # has been converted to use version_manifests in remotable_classmethod + # operations, which will use the new class action handler. + def object_class_action(self, context, objname, objmethod, objver, + args, kwargs): + versions = ovo_base.obj_tree_get_versions(objname) + return self.object_class_action_versions(context, + objname, + objmethod, + versions, + args, kwargs) + + def object_class_action_versions(self, context, objname, objmethod, + object_versions, args, kwargs): + cctxt = self.client.prepare() + return cctxt.call(context, 'object_class_action_versions', + objname=objname, objmethod=objmethod, + object_versions=object_versions, + args=args, kwargs=kwargs) + + def object_action(self, context, objinst, objmethod, args, kwargs): + cctxt = self.client.prepare() + return cctxt.call(context, 'object_action', objinst=objinst, + objmethod=objmethod, args=args, kwargs=kwargs) + + def object_backport_versions(self, context, objinst, object_versions): + cctxt = self.client.prepare() + return cctxt.call(context, 'object_backport_versions', objinst=objinst, + object_versions=object_versions) + + +class NovaConductorComputeAPI(ComputeTaskAPI): + + def __init__(self, topic, scheduler_manager, keystone_manager, msg): + self.topic = topic + self.scheduler_manager = scheduler_manager + self.keystone_manager = keystone_manager + self.messagingAPI = msg + serializer = objects_base.NovaObjectSerializer() + + self.target = self.messagingAPI.getTarget(topic=topic, + namespace="compute_task", + version="1.11") + + self.client = self.messagingAPI.getRPCClient( + target=oslo_msg.Target(topic=topic + "_synergy", + namespace="compute_task", + version="1.10"), + serializer=serializer) + + """ nova-conductor rpc operations """ + def build_instances(self, context, instances, image, filter_properties, + admin_password, injected_files, requested_networks, + security_groups, block_device_mapping=None, + legacy_bdm=True): + # token = self.keystone_manager.validateToken(context["auth_token"]) + + for instance in instances: + request = {'instance': instance, + 'image': image, + 'filter_properties': filter_properties, + 'admin_password': admin_password, + 'injected_files': injected_files, + 'requested_networks': requested_networks, + 'security_groups': security_groups, + 'block_device_mapping': block_device_mapping, + 'legacy_bdm': legacy_bdm, + 'context': context} + + self.scheduler_manager.execute("PROCESS_REQUEST", request) + + def build_instance(self, context, instance, image, filter_properties, + 
admin_password, injected_files, requested_networks, + security_groups, block_device_mapping=None, + legacy_bdm=True): + try: + # LOG.info(">>> filter_properties %s" % filter_properties) + # LOG.info(">>> context %s" % context) + + version = '1.10' + if not self.client.can_send_version(version): + version = '1.9' + LOG.info(filter_properties) + if 'instance_type' in filter_properties: + flavor = filter_properties['instance_type'] + # ################################# + # ####### objects_base WRONG!!! ### + flavor_p = objects_base.obj_to_primitive(flavor) + filter_properties = dict(filter_properties, + instance_type=flavor_p) + kw = {'instances': [instance], + 'image': image, + 'filter_properties': filter_properties, + 'admin_password': admin_password, + 'injected_files': injected_files, + 'requested_networks': requested_networks, + 'security_groups': security_groups} + + if not self.client.can_send_version(version): + version = '1.8' + kw['requested_networks'] = kw['requested_networks'].as_tuples() + if not self.client.can_send_version('1.7'): + version = '1.5' + bdm_p = objects_base.obj_to_primitive(block_device_mapping) + kw.update({'block_device_mapping': bdm_p, + 'legacy_bdm': legacy_bdm}) + + cctxt = self.client.prepare(version_cap=version) + cctxt.cast(context, 'build_instances', **kw) + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + raise ex + + def migrate_server(self, context, instance, scheduler_hint, live, rebuild, + flavor, block_migration, disk_over_commit, + reservations=None, clean_shutdown=True, + request_spec=None): + kw = {'instance': instance, 'scheduler_hint': scheduler_hint, + 'live': live, 'rebuild': rebuild, 'flavor': flavor, + 'block_migration': block_migration, + 'disk_over_commit': disk_over_commit, + 'reservations': reservations, + 'clean_shutdown': clean_shutdown, + 'request_spec': request_spec, + } + + version = '1.13' + if not self.client.can_send_version(version): + del kw['request_spec'] + version = '1.11' + if not self.client.can_send_version(version): + del kw['clean_shutdown'] + version = '1.10' + if not self.client.can_send_version(version): + kw['flavor'] = objects_base.obj_to_primitive(flavor) + version = '1.6' + if not self.client.can_send_version(version): + kw['instance'] = jsonutils.to_primitive( + objects_base.obj_to_primitive(instance)) + version = '1.4' + + cctxt = self.client.prepare(version=version) + return cctxt.call(context, 'migrate_server', **kw) + + def unshelve_instance(self, context, instance): + cctxt = self.client.prepare(version='1.3') + cctxt.cast(context, 'unshelve_instance', instance=instance) + + def rebuild_instance(self, context, instance, orig_image_ref, image_ref, + injected_files, new_pass, orig_sys_metadata, + bdms, recreate, on_shared_storage, + preserve_ephemeral=False, host=None): + cctxt = self.client.prepare(version='1.8') + cctxt.cast(context, + 'rebuild_instance', + instance=instance, new_pass=new_pass, + injected_files=injected_files, image_ref=image_ref, + orig_image_ref=orig_image_ref, + orig_sys_metadata=orig_sys_metadata, bdms=bdms, + recreate=recreate, on_shared_storage=on_shared_storage, + preserve_ephemeral=preserve_ephemeral, + host=host) + + +class NovaManager(Manager): + + def __init__(self): + Manager.__init__(self, name="NovaManager") + + self.config_opts = [ + cfg.StrOpt("nova_conf", + help="the nova.conf path", + default=None, + required=False), + cfg.StrOpt("amqp_backend", + help="the amqp backend tpye (e.g. 
rabbit, qpid)", + default=None, + required=False), + cfg.StrOpt("amqp_host", + help="the amqp host name", + default=None, + required=False), + cfg.IntOpt("amqp_port", + help="the amqp listening port", + default=None, + required=False), + cfg.StrOpt("amqp_user", + help="the amqp user", + default=None, + required=False), + cfg.StrOpt("amqp_password", + help="the amqp password", + default=None, + required=False), + cfg.StrOpt("amqp_virt_host", + help="the amqp virtual host", + default=None, + required=False), + cfg.StrOpt("conductor_topic", + help="the conductor topic", + default=None, + required=False), + cfg.StrOpt("compute_topic", + help="the compute topic", + default=None, + required=False), + cfg.StrOpt("scheduler_topic", + help="the scheduler topic", + default=None, + required=False), + cfg.StrOpt("db_connection", + help="the NOVA database connection", + default=None, + required=False), + cfg.StrOpt("host", + help="the host name", + default=None, + required=False), + cfg.IntOpt("timeout", + help="set the http connection timeout", + default=60, + required=False) + ] + + def setup(self): + eventlet.monkey_patch(os=False) + + self.timeout = CONF.NovaManager.timeout + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + if self.getManager("SchedulerManager") is None: + raise Exception("SchedulerManager not found!") + + self.keystone_manager = self.getManager("KeystoneManager") + self.scheduler_manager = self.getManager("SchedulerManager") + + host = "localhost" + conductor_topic = "conductor" + compute_topic = "compute" + scheduler_topic = "scheduler" + db_connection = None + amqp_backend = None + amqp_host = None + amqp_port = None + amqp_user = None + amqp_password = None + amqp_virt_host = None + + if CONF.NovaManager.nova_conf is not None: + if os.path.isfile(CONF.NovaManager.nova_conf): + CONFIG.read(CONF.NovaManager.nova_conf) + else: + raise Exception("nova configuration file not found at %s!" 
+                                % CONF.NovaManager.nova_conf)
+
+            host = self.getParameter("my_ip",
+                                     "DEFAULT",
+                                     default=host)
+
+            conductor_topic = self.getParameter("conductor_topic",
+                                                "DEFAULT",
+                                                default=conductor_topic)
+
+            compute_topic = self.getParameter("compute_topic",
+                                              "DEFAULT",
+                                              default=compute_topic)
+
+            scheduler_topic = self.getParameter("scheduler_topic",
+                                                "DEFAULT",
+                                                default=scheduler_topic)
+
+            db_connection = self.getParameter("connection",
+                                              "database",
+                                              fallback=True)
+
+            amqp_backend = self.getParameter("rpc_backend",
+                                             "DEFAULT",
+                                             fallback=True)
+
+            if amqp_backend == "rabbit":
+                amqp_host = self.getParameter("rabbit_host",
+                                              "oslo_messaging_rabbit",
+                                              "localhost",
+                                              fallback=False)
+
+                amqp_port = self.getParameter("rabbit_port",
+                                              "oslo_messaging_rabbit",
+                                              "5672",
+                                              fallback=False)
+
+                amqp_virt_host = self.getParameter("rabbit_virtual_host",
+                                                   "oslo_messaging_rabbit",
+                                                   "/",
+                                                   fallback=False)
+
+                amqp_user = self.getParameter("rabbit_userid",
+                                              "oslo_messaging_rabbit",
+                                              "guest",
+                                              fallback=False)
+
+                amqp_password = self.getParameter("rabbit_password",
+                                                  "oslo_messaging_rabbit",
+                                                  fallback=True)
+            elif amqp_backend == "qpid":
+                amqp_host = self.getParameter("qpid_hostname",
+                                              "oslo_messaging_qpid",
+                                              "localhost",
+                                              fallback=False)
+
+                amqp_port = self.getParameter("qpid_port",
+                                              "oslo_messaging_qpid",
+                                              "5672",
+                                              fallback=False)
+
+                amqp_user = self.getParameter("qpid_username",
+                                              "oslo_messaging_qpid",
+                                              fallback=True)
+
+                amqp_password = self.getParameter("qpid_password",
+                                                  "oslo_messaging_qpid",
+                                                  fallback=True)
+            else:
+                raise Exception("unsupported amqp backend found: %s!"
+                                % amqp_backend)
+        else:
+            amqp_backend = CONF.NovaManager.amqp_backend
+            amqp_host = CONF.NovaManager.amqp_host
+            amqp_port = CONF.NovaManager.amqp_port
+            amqp_user = CONF.NovaManager.amqp_user
+            amqp_password = CONF.NovaManager.amqp_password
+            amqp_virt_host = CONF.NovaManager.amqp_virt_host
+            db_connection = CONF.NovaManager.db_connection
+            host = amqp_host
+
+            amqp_backend = self.getParameter("amqp_backend",
+                                             "NovaManager",
+                                             fallback=True)
+
+            amqp_host = self.getParameter("amqp_host",
+                                          "NovaManager",
+                                          fallback=True)
+
+            amqp_port = self.getParameter("amqp_port",
+                                          "NovaManager",
+                                          default=5672)
+
+            amqp_user = self.getParameter("amqp_user",
+                                          "NovaManager",
+                                          fallback=True)
+
+            amqp_password = self.getParameter("amqp_password",
+                                              "NovaManager",
+                                              fallback=True)
+
+            amqp_virt_host = self.getParameter("amqp_virt_host",
+                                               "NovaManager",
+                                               default="/")
+
+            db_connection = self.getParameter("db_connection",
+                                              "NovaManager",
+                                              fallback=True)
+
+            host = self.getParameter("host",
+                                     "NovaManager",
+                                     default="localhost")
+
+            conductor_topic = self.getParameter("conductor_topic",
+                                                "NovaManager",
+                                                default=conductor_topic)
+
+            compute_topic = self.getParameter("compute_topic",
+                                              "NovaManager",
+                                              default=compute_topic)
+
+            scheduler_topic = self.getParameter("scheduler_topic",
+                                                "NovaManager",
+                                                default=scheduler_topic)
+
+        try:
+            LOG.debug("setting up the NOVA database connection: %s"
+                      % db_connection)
+
+            self.db_engine = create_engine(db_connection)
+
+            transport_url = "%s://%s:%s@%s:%s%s" % (amqp_backend,
+                                                    amqp_user,
+                                                    amqp_password,
+                                                    amqp_host,
+                                                    amqp_port,
+                                                    amqp_virt_host)
+
+            self.messagingAPI = MessagingAPI(transport_url)
+
+            self.novaBaseRPCAPI = NovaBaseRPCAPI(conductor_topic,
+                                                 self.messagingAPI)
+
+            self.novaConductorAPI = NovaConductorAPI(conductor_topic,
+                                                     self.messagingAPI)
+
+            self.novaConductorComputeAPI = NovaConductorComputeAPI(
+                conductor_topic,
+                self.scheduler_manager,
+
self.keystone_manager, + self.messagingAPI) + + self.conductor_rpc = self.messagingAPI.getRPCServer( + target=self.messagingAPI.getTarget(topic=conductor_topic, + server=host), + endpoints=[self.novaBaseRPCAPI, + self.novaConductorAPI, + self.novaConductorComputeAPI]) + + self.conductor_rpc.start() + + self.novaComputeAPI = NovaComputeAPI(compute_topic, + self.messagingAPI) + + self.compute_rpc = self.messagingAPI.getRPCServer( + target=self.messagingAPI.getTarget(topic=compute_topic, + server=host), + endpoints=[self.novaComputeAPI]) + + # self.rpcserver.start() + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error("NovaManager initialization failed! %s" % (ex)) + raise ex + + def execute(self, command, *args, **kargs): + if command == "GET_PARAMETER": + return self.getParameter(*args, **kargs) + elif command == "GET_FLAVORS": + return self.getFlavors(*args, **kargs) + elif command == "GET_FLAVOR": + return self.getFlavor(*args, **kargs) + elif command == "GET_SERVERS": + return self.getServers(*args, **kargs) + elif command == "GET_SERVER": + return self.getServer(*args, **kargs) + elif command == "DELETE_SERVER": + return self.deleteServer(*args, **kargs) + elif command == "START_SERVER": + return self.startServer(*args, **kargs) + elif command == "STOP_SERVER": + return self.stopServer(*args, **kargs) + elif command == "BUILD_SERVER": + return self.buildServer(*args, **kargs) + elif command == "GET_HYPERVISORS": + return self.getHypervisors() + elif command == "GET_HYPERVISOR": + return self.getHypervisor(*args, **kargs) + elif command == "GET_QUOTA": + return self.getQuota(*args, **kargs) + elif command == "UPDATE_QUOTA": + return self.updateQuota(*args, **kargs) + elif command == "GET_TARGET": + return self.getTarget(*args, **kargs) + elif command == "GET_RCP_CLIENT": + return self.getRPCClient() + elif command == "GET_RCP_SERVER": + return self.getRPCServer(*args, **kargs) + elif command == "GET_NOTIFICATION_LISTENER": + return self.getNotificationListener(*args, **kargs) + elif command == "GET_RESOURCE_USAGE": + return self.getResourceUsage(*args, **kargs) + elif command == "GET_PROJECT_USAGE": + return self.getProjectUsage(*args, **kargs) + elif command == "GET_EXPIRED_SERVERS": + return self.getExpiredServers(*args, **kargs) + else: + raise Exception("command=%r not supported!" 
% command) + + def task(self): + pass + + def destroy(self): + pass + + def getParameter(self, name, section="DEFAULT", + default=None, fallback=False): + if section != "NovaManager": + try: + return CONFIG.get(section, name) + except Exception: + if fallback is True: + raise Exception("No attribute %r found in [%s] section of " + "nova.conf" % (name, section)) + else: + LOG.info("No attribute %r found in [%s] section of " + "nova.conf, using default: %r" + % (name, section, default)) + + return default + else: + result = CONF.NovaManager.get(name, None) + + if result is not None: + return result + + if fallback is True: + raise Exception("No attribute %r found in [NovaManager] " + "section of synergy.conf" % name) + else: + LOG.info("No attribute %r found in in [NovaManager] of synergy" + ".conf, using default: %r" % (name, default)) + return default + + def getFlavors(self): + url = "flavors/detail" + + try: + response_data = self.getResource(url, method="GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the flavors list: %s" + % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["flavors"] + + return response_data + + def getFlavor(self, id): + try: + response_data = self.getResource("flavors/" + id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the flavor info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["flavor"] + + return response_data + + def getServers(self, detail=False, status=None): + params = {} + if status: + params["status"] = status + + url = "servers" + + if detail: + url = "servers/detail" + + response_data = self.getResource(url, "GET", params) + + if response_data: + response_data = response_data["servers"] + + return response_data + + def getServer(self, id): + try: + response_data = self.getResource("servers/" + id, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the server info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def buildServer(self, context, instance, image, filter_properties, + admin_password, injected_files, requested_networks, + security_groups, block_device_mapping=None, + legacy_bdm=True): + self.novaConductorComputeAPI.build_instance( + context=context, + instance=instance, + image=image, + filter_properties=filter_properties, + admin_password=admin_password, + injected_files=injected_files, + requested_networks=requested_networks, + security_groups=security_groups, + block_device_mapping=block_device_mapping, + legacy_bdm=legacy_bdm) + + def deleteServer(self, id): + # data = { "forceDelete": None } + # url = "servers/%s/action" % id + url = "servers/%s" % id + + try: + # response_data = self.getResource(url, "POST", data) + response_data = self.getResource(url, "DELETE") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on deleting the server (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def startServer(self, id): + data = {"os-start": None} + url = "servers/%s/action" % id + + try: + response_data = self.getResource(url, "POST", data) + except 
requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on starting the server info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def stopServer(self, id): + data = {"os-stop": None} + url = "servers/%s/action" % id + + try: + response_data = self.getResource(url, "POST", data) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on stopping the server info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["server"] + + return response_data + + def getHypervisors(self): + data = {"os-stop": None} + url = "os-hypervisors" + # /%s" % id + + try: + response_data = self.getResource(url, "GET", data) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the hypervisors list (id=%r)" + ": %s" % response["error"]["message"]) + + if response_data: + response_data = response_data["hypervisors"] + + return response_data + + def getHypervisor(self, id): + data = {"os-stop": None} + url = "os-hypervisors/%s" % id + + try: + response_data = self.getResource(url, "GET", data) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the hypervisor info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["hypervisor"] + + return response_data + + def getQuota(self, id=None, defaults=False): + if defaults: + try: + url = "os-quota-sets/defaults" + response_data = self.getResource(url, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the quota defaults" + ": %s" % response["error"]["message"]) + elif id is not None: + try: + url = "os-quota-sets/%s" % id + response_data = self.getResource(url, "GET") + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on retrieving the quota info (id=%r)" + ": %s" % (id, response["error"]["message"])) + else: + raise Exception("wrong arguments") + + if response_data: + response_data = response_data["quota_set"] + + return response_data + + def updateQuota(self, id, data): + url = "os-quota-sets/%s" % id + quota_set = {"quota_set": data} + + try: + response_data = self.getResource(url, "PUT", quota_set) + except requests.exceptions.HTTPError as ex: + response = ex.response.json() + raise Exception("error on updating the quota info (id=%r)" + ": %s" % (id, response["error"]["message"])) + + if response_data: + response_data = response_data["quota_set"] + + return response_data + + def getResource(self, resource, method, data=None): + self.keystone_manager.authenticate() + token = self.keystone_manager.getToken() + url = token.getCatalog("nova")["url"] + "/" + resource + + headers = {"Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "python-novaclient", + "X-Auth-Project-Id": token.getProject()["name"], + "X-Auth-Token": token.getId()} + + if method == "GET": + request = requests.get(url, headers=headers, + params=data, timeout=self.timeout) + elif method == "POST": + request = requests.post(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "PUT": + request = requests.put(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method 
== "HEAD": + request = requests.head(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + elif method == "DELETE": + request = requests.delete(url, + headers=headers, + data=json.dumps(data), + timeout=self.timeout) + else: + raise Exception("wrong HTTP method: %s" % method) + + if request.status_code != requests.codes.ok: + request.raise_for_status() + + if request.text: + return request.json() + else: + return None + + def getTarget(self, topic, exchange=None, namespace=None, + version=None, server=None): + return self.messagingAPI.getTarget(topic=topic, + namespace=namespace, + exchange=exchange, + version=version, + server=server) + + def getRPCClient(self, target, version_cap=None, serializer=None): + return self.messagingAPI.getRPCClient(target, + version_cap=version_cap, + serializer=serializer) + + def getRPCServer(self, target, endpoints, serializer=None): + return self.messagingAPI.getRPCServer(target, + endpoints, + serializer=serializer) + + def getNotificationListener(self, targets, endpoints): + return self.messagingAPI.getNotificationListener(targets, endpoints) + + def getResourceUsage(self, prj_ids, from_date, to_date): + # LOG.info("getUsage: fromDate=%s period_length=%s days" + # % (fromDate, period_length)) + # print("getUsage: fromDate=%s period_length=%s days" + # % (fromDate, period_length)) + + # period = str(period_length) + + resource_usage = {} + connection = self.db_engine.connect() + + try: + ids = "" + + if prj_ids is not None: + ids = " project_id IN (" + + for prj_id in prj_ids: + ids += "%r, " % str(prj_id) + + if "," in ids: + ids = ids[:-2] + + ids += ") and" + + QUERY = """select user_id, project_id, sum(TIMESTAMPDIFF(SECOND, \ +IF(launched_at<='%(from_date)s', '%(from_date)s', IFNULL(launched_at, \ +'%(from_date)s')), IF(terminated_at>='%(to_date)s', '%(to_date)s', \ +IFNULL(terminated_at, '%(to_date)s')))*memory_mb) as memory_usage, \ +sum(TIMESTAMPDIFF(SECOND, IF(launched_at<='%(from_date)s', '%(from_date)s', \ +IFNULL(launched_at, '%(from_date)s')), IF(terminated_at>='%(to_date)s', \ +'%(to_date)s', IFNULL(terminated_at, '%(to_date)s')))*vcpus) as vcpus_usage \ +from nova.instances where %(prj_ids)s launched_at is not NULL and \ +launched_at<='%(to_date)s' and (terminated_at>='%(from_date)s' or \ +terminated_at is NULL) group by user_id, project_id\ +""" % {"prj_ids": ids, "from_date": from_date, "to_date": to_date} + + result = connection.execute(QUERY) + + # LOG.info("QUERY %s\n" % QUERY) + # print("from_date %s\n" % from_date) + # print("to_date %s\n" % to_date) + + prj_id = 0 + user_id = 0 + project = None + + # for row in result.fetchall(): + for row in result: + # LOG.info("row=%s" % row) + user_id = row[0] + prj_id = row[1] + + if (prj_ids is not None and prj_id not in prj_ids): + LOG.warn("project not found: %s" % prj_id) + continue + + if prj_id not in resource_usage: + resource_usage[prj_id] = {} + + project = resource_usage.get(prj_id) + + project[user_id] = {"ram": float(row[2]), + "cores": float(row[3])} + + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + return resource_usage + + def getProjectUsage(self, prj_id): + connection = self.db_engine.connect() + usage = {"instances": [], "cores": 0, "ram": 0} + + try: + # retrieve the amount of resources in terms of cores + # and ram the specified project is consuming + QUERY = """select uuid, vcpus, memory_mb from nova.instances \ +where project_id='%(project_id)s' and deleted_at is NULL and (vm_state in \ +('error') or 
(vm_state in ('active') and terminated_at is NULL))\ +""" % {"project_id": prj_id} + + result = connection.execute(QUERY) + + for row in result.fetchall(): + usage["instances"].append(str(row[0])) + usage["cores"] += row[1] + usage["ram"] += row[2] + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + return usage + + def getExpiredServers(self, prj_id, instances, TTL): + uuids = [] + connection = self.db_engine.connect() + + try: + # retrieve all expired instances for the specified + # project and expiration time + ids = "'%s'" % "', '".join(instances) + + QUERY = """select uuid from nova.instances where project_id = \ +'%(project_id)s' and deleted_at is NULL and (vm_state in ('error') or \ +(uuid in (%(instances)s) and ((vm_state in ('active') and terminated_at is \ +NULL and timestampdiff(minute, launched_at, utc_timestamp()) >= \ +%(expiration)s) or (vm_state in ('building') and task_state in ('scheduling') \ +and created_at != updated_at and timestampdiff(minute, updated_at, \ +utc_timestamp()) >= 20))))""" % {"project_id": prj_id, + "instances": ids, + "expiration": TTL} + + # LOG.info(QUERY) + result = connection.execute(QUERY) + + for row in result.fetchall(): + uuids.append(row[0]) + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + return uuids diff --git a/synergy_scheduler_manager/queue_manager.py b/synergy_scheduler_manager/queue_manager.py new file mode 100644 index 0000000..7ae231d --- /dev/null +++ b/synergy_scheduler_manager/queue_manager.py @@ -0,0 +1,466 @@ +import heapq +import json +import logging +import threading + +from datetime import datetime + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from sqlalchemy import create_engine +from sqlalchemy.exc import SQLAlchemyError + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. 
+See the License for the specific language governing
+permissions and limitations under the License."""
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class QueueItem(object):
+
+    def __init__(self, id, user_id, prj_id, priority,
+                 retry_count, creation_time, last_update, data=None):
+        self.id = id
+        self.user_id = user_id
+        self.prj_id = prj_id
+        self.priority = priority
+        self.retry_count = retry_count
+        self.creation_time = creation_time
+        self.last_update = last_update
+        self.data = data
+
+    def getId(self):
+        return self.id
+
+    def setId(self, id):
+        self.id = id
+
+    def getUserId(self):
+        return self.user_id
+
+    def setUserId(self, user_id):
+        self.user_id = user_id
+
+    def getProjectId(self):
+        return self.prj_id
+
+    def setProjectId(self, prj_id):
+        self.prj_id = prj_id
+
+    def getPriority(self):
+        return self.priority
+
+    def setPriority(self, priority):
+        self.priority = priority
+
+    def getRetryCount(self):
+        return self.retry_count
+
+    def setRetryCount(self, retry_count):
+        self.retry_count = retry_count
+
+    def incRetryCount(self):
+        self.retry_count += 1
+
+    def getCreationTime(self):
+        return self.creation_time
+
+    def setCreationTime(self, creation_time):
+        self.creation_time = creation_time
+
+    def getLastUpdate(self):
+        return self.last_update
+
+    def setLastUpdate(self, last_update):
+        self.last_update = last_update
+
+    def getData(self):
+        return self.data
+
+    def setData(self, data):
+        self.data = data
+
+
+class PriorityQueue(object):
+
+    def __init__(self):
+        self.queue = []
+        self._index = 0
+
+    def put(self, priority, item):
+        heapq.heappush(self.queue, (-priority, self._index, item))
+        self._index += 1
+
+    def get(self):
+        return heapq.heappop(self.queue)[-1]
+
+    def size(self):
+        return len(self.queue)
+
+
+class Queue(object):
+
+    def __init__(self, name, db_engine, fairshare_manager=None):
+        self.name = name
+        self.db_engine = db_engine
+        self.fairshare_manager = fairshare_manager
+        self.is_closed = False
+        self.priority_updater = None
+        self.condition = threading.Condition()
+        self.pqueue = PriorityQueue()
+        self.createTable()
+        self.buildFromDB()
+
+    def getName(self):
+        return self.name
+
+    def getSize(self):
+        connection = self.db_engine.connect()
+
+        try:
+            QUERY = "select count(*) from `%s`" % self.name
+            result = connection.execute(QUERY)
+
+            row = result.fetchone()
+
+            return row[0]
+        except SQLAlchemyError as ex:
+            raise Exception(ex.message)
+        finally:
+            connection.close()
+
+    def createTable(self):
+        TABLE = """CREATE TABLE IF NOT EXISTS `%s` (`id` BIGINT NOT NULL \
+AUTO_INCREMENT PRIMARY KEY, `priority` INT DEFAULT 0, user_id CHAR(40) \
+NOT NULL, prj_id CHAR(40) NOT NULL, `retry_count` INT DEFAULT 0, \
+`creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `last_update` \
+TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
+
+        connection = self.db_engine.connect()
+
+        try:
+            connection.execute(TABLE)
+        except SQLAlchemyError as ex:
+            raise Exception(ex.message)
+        except Exception as ex:
+            raise Exception(ex.message)
+        finally:
+            connection.close()
+
+    def close(self):
+        if not self.is_closed:
+            self.is_closed = True
+
+            with self.condition:
+                self.condition.notifyAll()
+
+    def isClosed(self):
+        return self.is_closed
+
+    def buildFromDB(self):
+        connection = self.db_engine.connect()
+
+        try:
+            QUERY = "select id, user_id, prj_id, priority, retry_count, " \
+                    "creation_time, last_update from `%s`" % self.name
+            result = connection.execute(QUERY)
+
+            for row in result:
+                queue_item = QueueItem(row[0], row[1], row[2],
+
row[3], row[4], row[5], row[6]) + + self.pqueue.put(row[3], queue_item) + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + with self.condition: + self.condition.notifyAll() + + def insertItem(self, user_id, prj_id, priority, data): + with self.condition: + idRecord = -1 + QUERY = "insert into `%s` (user_id, prj_id, priority, " \ + "data) values" % self.name + QUERY += "(%s, %s, %s, %s)" + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + result = connection.execute(QUERY, + [user_id, prj_id, priority, + json.dumps(data)]) + + idRecord = result.lastrowid + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + raise Exception(ex.message) + finally: + connection.close() + + now = datetime.now() + queue_item = QueueItem(idRecord, user_id, prj_id, + priority, 0, now, now) + + self.pqueue.put(priority, queue_item) + + self.condition.notifyAll() + + def reinsertItem(self, queue_item): + with self.condition: + self.pqueue.put(queue_item.getPriority(), queue_item) + self.condition.notifyAll() + + def getItem(self): + item = None + queue_item = None + + with self.condition: + while (queue_item is None and not self.is_closed): + if self.pqueue.size() > 0: + queue_item = self.pqueue.get() + + # self.pqueue.task_done() + else: + self.condition.wait() + + if (not self.is_closed and queue_item is not None): + connection = self.db_engine.connect() + + try: + QUERY = """select user_id, prj_id, priority, \ +retry_count, creation_time, last_update, data from `%s`""" % self.name + QUERY += " where id=%s" + + result = connection.execute(QUERY, [queue_item.getId()]) + + row = result.fetchone() + + item = QueueItem(queue_item.getId(), row[0], row[1], + row[2], row[3], row[4], row[5], + json.loads(row[6])) + except SQLAlchemyError as ex: + raise Exception(ex.message) + finally: + connection.close() + + self.condition.notifyAll() + return item + + def deleteItem(self, queue_item): + if not queue_item: + return + + with self.condition: + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "delete from `%s`" % self.name + QUERY += " where id=%s" + + connection.execute(QUERY, [queue_item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise Exception(ex.message) + finally: + connection.close() + self.condition.notifyAll() + + def updateItem(self, queue_item): + if not queue_item: + return + + with self.condition: + connection = self.db_engine.connect() + trans = connection.begin() + + try: + queue_item.setLastUpdate(datetime.now()) + + QUERY = "update `%s`" % self.name + QUERY += " set priority=%s, retry_count=%s, " \ + "last_update=%s where id=%s" + + connection.execute(QUERY, [queue_item.getPriority(), + queue_item.getRetryCount(), + queue_item.getLastUpdate(), + queue_item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise Exception(ex.message) + finally: + connection.close() + + self.pqueue.put(queue_item.getPriority(), queue_item) + self.condition.notifyAll() + + def updatePriority(self): + if self.fairshare_manager is None: + # LOG.warn("priority_updater not found!!!") + return + + with self.condition: + # now = datetime.now() + queue_items = [] + + connection = self.db_engine.connect() + + while self.pqueue.size() > 0: + queue_item = self.pqueue.get() + priority = queue_item.getPriority() + + try: + priority = self.fairshare_manager.execute( + "CALCULATE_PRIORITY", + user_id=queue_item.getUserId(), + 
prj_id=queue_item.getProjectId(), + timestamp=queue_item.getCreationTime(), + retry=queue_item.getRetryCount()) + + queue_item.setPriority(priority) + except Exception as ex: + continue + finally: + queue_items.append(queue_item) + + trans = connection.begin() + + try: + queue_item.setLastUpdate(datetime.now()) + + QUERY = "update `%s`" % self.name + QUERY += " set priority=%s, last_update=%s where id=%s" + + connection.execute(QUERY, [queue_item.getPriority(), + queue_item.getLastUpdate(), + queue_item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + raise Exception(ex.message) + + connection.close() + + if len(queue_items) > 0: + for queue_item in queue_items: + self.pqueue.put(queue_item.getPriority(), queue_item) + + del queue_items + + self.condition.notifyAll() + + def toDict(self): + queue = {} + queue["name"] = self.name + queue["size"] = self.getSize() + # queue["size"] = self.pqueue.size() + + if self.is_closed: + queue["status"] = "OFF" + else: + queue["status"] = "ON" + + return queue + + +class QueueManager(Manager): + + def __init__(self): + Manager.__init__(self, name="QueueManager") + + self.config_opts = [ + cfg.StrOpt("db_connection", help="the DB url", required=True), + cfg.IntOpt('db_pool_size', default=10, required=False), + cfg.IntOpt('db_max_overflow', default=5, required=False) + ] + + def setup(self): + if self.getManager("FairShareManager") is None: + raise Exception("FairShareManager not found!") + + self.fairshare_manager = self.getManager("FairShareManager") + + self.queue_list = {} + db_connection = CONF.QueueManager.db_connection + pool_size = CONF.QueueManager.db_pool_size + max_overflow = CONF.QueueManager.db_max_overflow + + try: + self.db_engine = create_engine(db_connection, + pool_size=pool_size, + max_overflow=max_overflow) + except Exception as ex: + LOG.error(ex) + raise ex + + def execute(self, command, *args, **kargs): + if command == "CREATE_QUEUE": + return self.createQueue(*args, **kargs) + elif command == "DELETE_QUEUE": + return self.deleteQueue(*args, **kargs) + elif command == "GET_QUEUE": + return self.getQueue(*args, **kargs) + else: + raise Exception("command=%r not supported!" % command) + + def task(self): + for queue in self.queue_list.values(): + queue.updatePriority() + + def destroy(self): + for queue in self.queue_list.values(): + queue.close() + + def createQueue(self, name): + if name not in self.queue_list: + queue = Queue(name, self.db_engine, self.fairshare_manager) + self.queue_list[name] = queue + return queue + else: + raise Exception("the queue %r already exists!" % name) + + def deleteQueue(self, name): + if name not in self.queue_list: + raise Exception("queue %r not found!" % name) + + del self.queue_list[name] + + def getQueue(self, name): + if name not in self.queue_list: + raise Exception("queue %r not found!" 
% name) + + return self.queue_list[name] diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py new file mode 100644 index 0000000..f25248e --- /dev/null +++ b/synergy_scheduler_manager/quota_manager.py @@ -0,0 +1,427 @@ +import ConfigParser +import logging +import threading + +try: + from oslo_config import cfg +except ImportError: + from oslo.config import cfg + +from synergy.common.manager import Manager + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +CONF = cfg.CONF +CONFIG = ConfigParser.SafeConfigParser() +LOG = logging.getLogger(__name__) + + +class DynamicQuota(object): + + def __init__(self): + self.exit = False + self.projects = {} + self.ram = {"in_use": 0, "limit": 0} + self.cores = {"in_use": 0, "limit": 0} + self.condition = threading.Condition() + + def setSize(self, cores, ram): + self.ram["limit"] = ram + self.cores["limit"] = cores + + def getSize(self, cores, ram): + return {"cores": self.cores["limit"], "ram": self.ram["limit"]} + + def getProjects(self): + return self.projects + + def getProject(self, prj_id): + return self.projects.get(prj_id, None) + + def addProject(self, prj_id, prj_name, usage=None, TTL=0): + if prj_id not in self.projects: + with self.condition: + project = {"name": prj_name, + "cores": 0, + "ram": 0, + "instances": {"active": [], "pending": []}, + "TTL": 0} + + if usage is not None: + project["cores"] = usage["cores"] + project["ram"] = usage["ram"] + project["instances"]["active"].extend(usage["instances"]) + + self.ram["in_use"] += project["ram"] + self.cores["in_use"] += project["cores"] + + self.projects[prj_id] = project + self.condition.notifyAll() + + def removeProject(self, prj_id): + if prj_id in self.projects: + with self.condition: + project = self.projects[prj_id] + + self.ram["in_use"] -= project["ram"] + self.cores["in_use"] -= project["cores"] + + del self.projects[prj_id] + self.condition.notifyAll() + + return True + return False + + def close(self): + self.exit = True + + def allocate(self, instance_id, prj_id, cores, ram, blocking=True): + if prj_id not in self.projects: + return + + project = self.projects[prj_id] + + if project is None: + return + + found = False + + with self.condition: + if instance_id in project["instances"]["active"]: + found = True + elif instance_id in project["instances"]["pending"]: + found = True + else: + project["instances"]["pending"].append(instance_id) + + while (not self.exit and not found and + instance_id in project["instances"]["pending"]): + + LOG.debug("allocate instance_id=%s project=%s cores=%s " + "ram=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + + if (self.cores["limit"] - self.cores["in_use"] >= cores) and \ + (self.ram["limit"] - self.ram["in_use"] >= ram): + 
self.cores["in_use"] += cores + self.ram["in_use"] += ram + project["cores"] += cores + project["ram"] += ram + + found = True + project["instances"]["active"].append(instance_id) + project["instances"]["pending"].remove(instance_id) + + LOG.info("allocated instance_id=%s project=%s cores=%s ram" + "=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + elif blocking: + LOG.info("allocate wait!!!") + self.condition.wait() + + self.condition.notifyAll() + + return found + + def release(self, instance_id, prj_id, cores, ram): + if prj_id not in self.projects: + return + + project = self.projects[prj_id] + + LOG.debug("release instance_id=%s project=%s cores=%s " + "ram=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + + with self.condition: + if instance_id in project["instances"]["pending"]: + project["instances"]["pending"].remove(instance_id) + elif instance_id in instance_id in project["instances"]["active"]: + if self.cores["in_use"] - cores < 0: + self.cores["in_use"] = 0 + else: + self.cores["in_use"] -= cores + + if self.ram["in_use"] - ram < 0: + self.ram["in_use"] = 0 + else: + self.ram["in_use"] -= ram + + if project["cores"] - cores < 0: + project["cores"] = 0 + else: + project["cores"] -= cores + + if project["ram"] - ram < 0: + project["ram"] = 0 + else: + project["ram"] -= ram + + project["instances"]["active"].remove(instance_id) + + LOG.info("released instance_id=%s project=%s cores=%s " + "ram=%s [vcpu in use %s of %s; ram in use %s of %s]" + % (instance_id, + project["name"], + cores, + ram, + self.cores["in_use"], + self.cores["limit"], + self.ram["in_use"], + self.ram["limit"])) + else: + LOG.debug("release: instance '%s' not found!" % (instance_id)) + + self.condition.notifyAll() + + def toDict(self): + quota = {} + quota["ram"] = self.ram + quota["cores"] = self.cores + quota["projects"] = self.projects + + return quota + + +class QuotaManager(Manager): + + def __init__(self): + Manager.__init__(self, name="QuotaManager") + + def setup(self): + try: + self.dynamic_quota = DynamicQuota() + + if self.getManager("NovaManager") is None: + raise Exception("NovaManager not found!") + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + self.nova_manager = self.getManager("NovaManager") + self.keystone_manager = self.getManager("KeystoneManager") + self.listener = None + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + + def destroy(self): + LOG.info("destroy invoked!") + self.dynamic_quota.close() + + def execute(self, command, *args, **kargs): + if command == "ADD_PROJECT": + return self.addProject(*args, **kargs) + elif command == "GET_PROJECT": + return self.getProject(*args, **kargs) + elif command == "REMOVE_PROJECT": + return self.removeProject(*args, **kargs) + elif command == "GET_DYNAMIC_QUOTA": + return self.dynamic_quota + else: + raise Exception("command=%r not supported!" 
+                            % command)
+
+    def task(self):
+        try:
+            self.updateDynamicQuota()
+            self.deleteExpiredServices()
+        except Exception as ex:
+            LOG.error(ex)
+
+    def getProject(self, prj_id):
+        return self.dynamic_quota.getProject(prj_id)
+
+    def addProject(self, prj_id, prj_name):
+        try:
+            quota = {"cores": -1, "ram": -1, "instances": -1}
+            self.nova_manager.execute("UPDATE_QUOTA", prj_id, quota)
+
+            usage = self.nova_manager.execute("GET_PROJECT_USAGE", prj_id)
+            self.dynamic_quota.addProject(prj_id, prj_name, usage)
+
+            self.updateDynamicQuota()
+        except Exception as ex:
+            LOG.error(ex)
+            raise ex
+
+    def removeProject(self, prj_id, destroy=False):
+        project = self.dynamic_quota.getProject(prj_id)
+        if project is None:
+            return
+
+        try:
+            if destroy:
+                ids = []
+                ids.extend(project["instances"]["active"])
+                ids.extend(project["instances"]["pending"])
+
+                for instance_id in ids:
+                    self.nova_manager.execute("DELETE_SERVER", instance_id)
+
+            quota = self.nova_manager.execute("GET_QUOTA", defaults=True)
+            self.nova_manager.execute("UPDATE_QUOTA", prj_id, quota)
+
+            self.dynamic_quota.removeProject(prj_id)
+
+            self.updateDynamicQuota()
+        except Exception as ex:
+            LOG.error(ex)
+            raise ex
+
+    def deleteExpiredServices(self):
+        for prj_id, project in self.dynamic_quota.projects.items():
+            instance_ids = project["instances"]["active"]
+            TTL = project["TTL"]
+
+            if project["TTL"] == 0:
+                continue
+
+            try:
+                expired_ids = self.nova_manager.execute("GET_EXPIRED_SERVERS",
+                                                        prj_id=prj_id,
+                                                        instances=instance_ids,
+                                                        TTL=TTL)
+
+                for instance_id in expired_ids:
+                    self.nova_manager.execute("DELETE_SERVER", instance_id)
+            except Exception as ex:
+                LOG.error(ex)
+                raise ex
+
+    def updateDynamicQuota(self):
+        # calculate the total limit of cores and ram
+        total_ram = float(0)
+        total_cores = float(0)
+        static_ram = float(0)
+        static_cores = float(0)
+        dynamic_ram = float(0)
+        dynamic_cores = float(0)
+
+        try:
+            cpu_ratio = self.nova_manager.execute("GET_PARAMETER",
+                                                  name="cpu_allocation_ratio",
+                                                  default=float(16))
+
+            ram_ratio = self.nova_manager.execute("GET_PARAMETER",
+                                                  name="ram_allocation_ratio",
+                                                  default=float(16))
+
+            quota_default = self.nova_manager.execute("GET_QUOTA",
+                                                      defaults=True)
+
+            hypervisors = self.nova_manager.execute("GET_HYPERVISORS")
+
+            for hypervisor in hypervisors:
+                if hypervisor["status"] == "enabled" and \
+                   hypervisor["state"] == "up":
+                    info = self.nova_manager.execute("GET_HYPERVISOR",
+                                                     hypervisor["id"])
+
+                    total_ram += info["memory_mb"]
+                    total_cores += info["vcpus"]
+
+            total_ram *= float(ram_ratio)
+            total_cores *= float(cpu_ratio)
+
+            # LOG.info("total_ram=%s total_cores=%s"
+            #          % (total_ram, total_cores))
+
+            kprojects = self.keystone_manager.execute("GET_PROJECTS")
+
+            for project in kprojects:
+                prj_id = project["id"]
+                # prj_name = str(project["name"])
+
+                if self.dynamic_quota.getProject(prj_id) is None:
+                    quota = self.nova_manager.execute("GET_QUOTA", prj_id)
+
+                    if quota["cores"] == -1 and quota["ram"] == -1:
+                        quota["cores"] = quota_default["cores"]
+                        quota["ram"] = quota_default["ram"]
+
+                        try:
+                            self.nova_manager.execute("UPDATE_QUOTA",
+                                                      prj_id,
+                                                      quota_default)
+                        except Exception as ex:
+                            LOG.error(ex)
+
+                    static_cores += quota["cores"]
+                    static_ram += quota["ram"]
+
+            enabled = False
+
+            if total_cores < static_cores:
+                LOG.warn("dynamic quota: the total statically "
+                         "allocated cores (%s) is greater than the total "
+                         "amount of cores allowed (%s)"
+                         % (static_cores, total_cores))
+            else:
+                enabled = True
+                dynamic_cores = total_cores - static_cores
+
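+            # Illustrative example (the figures below are assumptions, not
+            # values from this patch): with 8 enabled hypervisors of 32
+            # vcpus each and cpu_allocation_ratio=16, total_cores is
+            # 8 * 32 * 16 = 4096; if the projects' static quotas sum to
+            # 1096 cores, the dynamic pool published via setSize() is
+            # 4096 - 1096 = 3000 cores (ram is computed the same way).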
+            if total_ram < static_ram:
+                enabled = False
+                LOG.warn("dynamic quota: the total statically "
+                         "allocated ram (%s) is greater than the total "
+                         "amount of ram allowed (%s)"
+                         % (static_ram, total_ram))
+            else:
+                # keep 'enabled' as decided by the cores check above
+                dynamic_ram = total_ram - static_ram
+
+            if enabled:
+                LOG.info("dynamic quota: cores=%s ram=%s"
+                         % (dynamic_cores, dynamic_ram))
+
+                self.dynamic_quota.setSize(dynamic_cores, dynamic_ram)
+
+            """
+            LOG.info("cpu_ratio=%s, ram_ratio=%s" % (cpu_ratio, ram_ratio))
+            LOG.info("total_cores=%s total_ram=%s" % (total_cores, total_ram))
+            LOG.info("static cores=%s ram=%s" % (static_cores, static_ram))
+            LOG.info("dynamic cores=%s ram=%s" % (dynamic_cores, dynamic_ram))
+            """
+            LOG.debug("dynamic quota %s" % self.dynamic_quota.toDict())
+        except Exception as ex:
+            LOG.error(ex)
+            raise ex
diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py
new file mode 100644
index 0000000..4e210ee
--- /dev/null
+++ b/synergy_scheduler_manager/scheduler_manager.py
@@ -0,0 +1,496 @@
+import logging
+import re
+import threading
+
+from datetime import datetime
+
+try:
+    from oslo_config import cfg
+except ImportError:
+    from oslo.config import cfg
+
+from synergy.common.manager import Manager
+
+__author__ = "Lisa Zangrando"
+__email__ = "lisa.zangrando[AT]pd.infn.it"
+__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
+All Rights Reserved
+
+Licensed under the Apache License, Version 2.0;
+you may not use this file except in compliance with the
+License. You may obtain a copy of the License at:
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+either express or implied.
+See the License for the specific language governing +permissions and limitations under the License.""" + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class Notifications(object): + + def __init__(self, dynamic_quota): + super(Notifications, self).__init__() + + self.dynamic_quota = dynamic_quota + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.debug("Notification INFO: event_type=%s payload=%s" + % (event_type, payload)) + + if payload is None or "state" not in payload: + return + + state = payload["state"] + instance_id = payload["instance_id"] + + if ((event_type == "compute.instance.delete.end" and + (state == "deleted" or state == "error" or state == "building")) or + (event_type == "compute.instance.update" and + (state == "deleted" or state == "error")) or + (event_type == "scheduler.run_instance" and state == "error")): + ram = 0 + cores = 0 + prj_id = None + instance_info = None + + if event_type == "scheduler.run_instance": + instance_info = payload["request_spec"]["instance_type"] + else: + instance_info = payload + + prj_id = instance_info["tenant_id"] + instance_id = instance_info["instance_id"] + ram = instance_info["memory_mb"] + cores = instance_info["vcpus"] + # disk = instance_info["root_gb"] + # node = instance_info["node"] + + LOG.debug("Notification INFO (type=%s state=%s): cores=%s ram=%s " + "prj_id=%s instance_id=%s" + % (event_type, state, cores, ram, prj_id, instance_id)) + + try: + self.dynamic_quota.release(instance_id, prj_id, cores, ram) + except Exception as ex: + LOG.warn("Notification INFO: %s" % ex) + + def warn(self, ctxt, publisher_id, event_type, payload, metadata): + # LOG.info("Notification WARN: event_type=%s payload=%s metadata=%s" + # % (event_type, payload, metadata)) + + state = payload["state"] + instance_id = payload["instance_id"] + LOG.info("Notification WARN: event_type=%s state=%s instance_id=%s" + % (event_type, state, instance_id)) + + def error(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info("Notification ERROR: event_type=%s payload=%s metadata=%s" + % (event_type, payload, metadata)) + + +class Worker(threading.Thread): + + def __init__(self, name, queue, quota, nova_manager): + super(Worker, self).__init__() + self.setDaemon(True) + + self.name = name + self.queue = queue + self.quota = quota + self.nova_manager = nova_manager + self.exit = False + LOG.info("Worker %r created!" % self.name) + + def getName(self): + return self.name + + def destroy(self): + try: + # if self.queue: + self.queue.close() + + self.exit = True + except Exception as ex: + LOG.error(ex) + raise ex + + def run(self): + LOG.info("Worker %r running!" 
% self.name) + + while not self.exit and not self.queue.isClosed(): + try: + queue_item = self.queue.getItem() + except Exception as ex: + LOG.error("Worker %r: %s" % (self.name, ex)) + # self.exit = True + # break + continue + + if queue_item is None: + continue + + try: + request = queue_item.getData() + + instance = request["instance"] + # user_id = instance["nova_object.data"]["user_id"] + prj_id = instance["nova_object.data"]["project_id"] + uuid = instance["nova_object.data"]["uuid"] + vcpus = instance["nova_object.data"]["vcpus"] + memory_mb = instance["nova_object.data"]["memory_mb"] + context = request["context"] + filter_properties = request["filter_properties"] + admin_password = request["admin_password"] + injected_files = request["injected_files"] + requested_networks = request["requested_networks"] + security_groups = request["security_groups"] + block_device_mapping = request["block_device_mapping"] + legacy_bdm = request["legacy_bdm"] + image = request["image"] + + try: + # vm_instance = self.novaConductorAPI.instance_get_by_uuid + # (context, instance_uuid=instance_uuid) + server = self.nova_manager.execute("GET_SERVER", + id=uuid) + except Exception as ex: + LOG.warn("Worker %s: server %r not found! reason=%s" + % (self.name, uuid, ex)) + self.queue.deleteItem(queue_item) + + self.quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + continue + + if server["OS-EXT-STS:vm_state"] != "building" or \ + server["OS-EXT-STS:task_state"] != "scheduling": + self.queue.deleteItem(queue_item) + + self.quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + continue + + # LOG.info(request_spec) + + # if (self.quota.reserve(instance_uuid, vcpus, memory_mb)): + # done = False + + if self.quota.allocate(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb, + blocking=True): + try: + self.nova_manager.execute( + "BUILD_SERVER", + context=context, + instance=instance, + image=image, + filter_properties=filter_properties, + admin_password=admin_password, + injected_files=injected_files, + requested_networks=requested_networks, + security_groups=security_groups, + block_device_mapping=block_device_mapping, + legacy_bdm=legacy_bdm) + + LOG.info("Worker %r: server (instance_id=%s) build OK" + % (self.name, uuid)) + except Exception as ex: + LOG.error("Worker %r: error on building the server " + "(instance_id=%s) reason=%s" + % (self.name, uuid, ex)) + + self.quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + + self.queue.deleteItem(queue_item) + except Exception as ex: + LOG.error("Worker '%s': %s" % (self.name, ex)) + # self.queue.reinsertItem(queue_item) + + continue + + # LOG.info("Worker done is %s" % done) + + # LOG.info(">>>> Worker '%s' queue.isClosed %s exit=%s" + # % (self.name, self.queue.isClosed(), self.exit)) + LOG.info("Worker '%s' destroyed!" 
% self.name) + + +class SchedulerManager(Manager): + + def __init__(self): + Manager.__init__(self, name="SchedulerManager") + + self.config_opts = [ + cfg.FloatOpt('default_TTL', default=10.0), + cfg.ListOpt("projects", default=[], help="the projects list"), + cfg.ListOpt("shares", default=[], help="the shares list"), + cfg.ListOpt("TTLs", default=[], help="the TTLs list"), + ] + + def setup(self): + if self.getManager("NovaManager") is None: + raise Exception("NovaManager not found!") + + if self.getManager("QueueManager") is None: + raise Exception("QueueManager not found!") + + if self.getManager("QuotaManager") is None: + raise Exception("QuotaManager not found!") + + if self.getManager("KeystoneManager") is None: + raise Exception("KeystoneManager not found!") + + if self.getManager("FairShareManager") is None: + raise Exception("FairShareManager not found!") + + self.nova_manager = self.getManager("NovaManager") + self.queue_manager = self.getManager("QueueManager") + self.quota_manager = self.getManager("QuotaManager") + self.keystone_manager = self.getManager("KeystoneManager") + self.fairshare_manager = self.getManager("FairShareManager") + self.default_TTL = float(CONF.SchedulerManager.default_TTL) + self.fairshare_manager = self.getManager("FairShareManager") + self.projects = {} + self.workers = [] + self.listener = None + self.exit = False + + try: + self.dynamic_quota = self.quota_manager.execute( + "GET_DYNAMIC_QUOTA") + + k_projects = self.keystone_manager.execute("GET_PROJECTS") + + for k_project in k_projects: + prj_id = str(k_project["id"]) + prj_name = str(k_project["name"]) + + if prj_name in CONF.SchedulerManager.projects: + CONF.SchedulerManager.projects.remove(prj_name) + + self.projects[prj_name] = {"id": prj_id, + "name": prj_name, + "type": "dynamic", + "share": float(0), + "TTL": self.default_TTL} + + if len(CONF.SchedulerManager.projects) > 0: + raise Exception("projects %s not found" + % CONF.SchedulerManager.projects) + + for prj_ttl in CONF.SchedulerManager.TTLs: + prj_name, TTL = self.parseAttribute(prj_ttl) + self.projects[prj_name]["TTL"] = TTL + + for prj_share in CONF.SchedulerManager.shares: + prj_name, share = self.parseAttribute(prj_share) + self.projects[prj_name]["share"] = share + + for project in self.projects.values(): + prj_id = project["id"] + prj_name = project["name"] + prj_share = project["share"] + + del self.projects[prj_name] + self.projects[prj_id] = project + + quota = {"cores": -1, "ram": -1, "instances": -1} + + self.nova_manager.execute("UPDATE_QUOTA", + id=prj_id, + data=quota) + + self.quota_manager.execute("ADD_PROJECT", + prj_id=prj_id, + prj_name=prj_name) + + self.fairshare_manager.execute("ADD_PROJECT", + prj_id=prj_id, + prj_name=prj_name, + share=prj_share) + try: + self.dynamic_queue = self.queue_manager.execute("CREATE_QUEUE", + name="DYNAMIC") + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + + self.dynamic_queue = self.queue_manager.execute("GET_QUEUE", + name="DYNAMIC") + + dynamic_worker = Worker(name="DYNAMIC", + queue=self.dynamic_queue, + quota=self.dynamic_quota, + nova_manager=self.nova_manager) + dynamic_worker.start() + + self.workers.append(dynamic_worker) + + print(self.projects) + print(self.dynamic_quota.toDict()) + + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + raise ex + + def parseAttribute(self, attribute): + if attribute is None: + return None + + prj_name = None + value = float(0) + + parsed_attribute = 
re.split('=', attribute) + + if len(parsed_attribute) > 1: + if not parsed_attribute[-1].isdigit(): + raise Exception("wrong value %r found in %r!" + % (parsed_attribute[-1], parsed_attribute)) + + if len(parsed_attribute) == 2: + prj_name = parsed_attribute[0] + value = float(parsed_attribute[1]) + else: + raise Exception("wrong attribute definition: %r" + % parsed_attribute) + else: + raise Exception("wrong attribute definition: %r" + % parsed_attribute) + + return (prj_name, value) + + def execute(self, command, *args, **kargs): + if command == "PROCESS_REQUEST": + return self.processRequest(*args, **kargs) + else: + raise Exception("command=%r not supported!" % command) + + def task(self): + if self.listener is None: + self.notifications = Notifications(self.dynamic_quota) + + target = self.nova_manager.execute("GET_TARGET", + topic='notifications', + exchange="nova") + self.listener = self.nova_manager.execute( + "GET_NOTIFICATION_LISTENER", + targets=[target], + endpoints=[self.notifications]) + + LOG.info("listener created") + + self.listener.start() + for prj_id, project in self.dynamic_quota.getProjects().items(): + instances = project["instances"]["active"] + TTL = self.projects[prj_id]["TTL"] + uuids = self.nova_manager.execute("GET_EXPIRED_SERVERS", + prj_id=prj_id, + instances=instances, + TTL=TTL) + + for uuid in uuids: + LOG.info("deleting the expired instance %r from project=%s" + % (uuid, prj_id)) + self.nova_manager.execute("DELETE_SERVER", id=uuid) + + def destroy(self): + if self.workers: + for queue_worker in self.workers: + queue_worker.destroy() + + def processRequest(self, request): + try: + filter_properties = request["filter_properties"] + instance = request["instance"] + user_id = instance["nova_object.data"]["user_id"] + prj_id = instance["nova_object.data"]["project_id"] + uuid = instance["nova_object.data"]["uuid"] + vcpus = instance["nova_object.data"]["vcpus"] + memory_mb = instance["nova_object.data"]["memory_mb"] + + if prj_id in self.projects: + # prj_name = self.projects[prj_id]["name"] + # metadata = instance["nova_object.data"]["metadata"] + timestamp = instance["nova_object.data"]["created_at"] + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ") + priority = 0 + + try: + if "retry" in filter_properties: + retry = filter_properties["retry"] + num_attempts = retry["num_attempts"] + + if num_attempts > 0: + self.dynamic_quota.release(instance_id=uuid, + prj_id=prj_id, + cores=vcpus, + ram=memory_mb) + priority = 999999999999 + LOG.info("released resource uuid %s " + "num_attempts %s" % (uuid, num_attempts)) + except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + LOG.error(ex) + + if priority == 0: + priority = self.fairshare_manager.execute( + "CALCULATE_PRIORITY", + user_id=user_id, + prj_id=prj_id, + timestamp=timestamp, + retry=0) + + self.dynamic_queue.insertItem(user_id, + prj_id, + priority=priority, + data=request) + + LOG.info("new request: instance_id=%s user_id=%s prj_id=%s " + "priority=%s type=dynamic" % (uuid, user_id, + prj_id, priority)) + + else: + context = request["context"] + admin_password = request["admin_password"] + injected_files = request["injected_files"] + requested_networks = request["requested_networks"] + security_groups = request["security_groups"] + block_device_mapping = request["block_device_mapping"] + legacy_bdm = request["legacy_bdm"] + image = request["image"] + + self.nova_manager.execute( + "BUILD_SERVER", + context=context, + instance=instance, + image=image, + 
+                    filter_properties=filter_properties,
+                    admin_password=admin_password,
+                    injected_files=injected_files,
+                    requested_networks=requested_networks,
+                    security_groups=security_groups,
+                    block_device_mapping=block_device_mapping,
+                    legacy_bdm=legacy_bdm)
+
+                LOG.info("new request: instance_id=%s user_id=%s "
+                         "prj_id=%s type=static" % (uuid, user_id, prj_id))
+        except Exception as ex:
+            LOG.error("Exception has occurred", exc_info=1)
+            LOG.error(ex)
diff --git a/synergy_scheduler_manager/tests/__init__.py b/synergy_scheduler_manager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/synergy_scheduler_manager/tests/base.py b/synergy_scheduler_manager/tests/base.py
new file mode 100644
index 0000000..bc2d9c8
--- /dev/null
+++ b/synergy_scheduler_manager/tests/base.py
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslotest import base
+
+
+class TestCase(base.BaseTestCase):
+
+    """Test case base class for all unit tests."""
diff --git a/synergy_scheduler_manager/tests/test_dynamic_quota.py b/synergy_scheduler_manager/tests/test_dynamic_quota.py
new file mode 100644
index 0000000..9f25dbd
--- /dev/null
+++ b/synergy_scheduler_manager/tests/test_dynamic_quota.py
@@ -0,0 +1,26 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from synergy_scheduler_manager.quota_manager import DynamicQuota
+from synergy_scheduler_manager.tests import base
+
+
+class TestDynamicQuota(base.TestCase):
+
+    def setUp(self):
+        super(TestDynamicQuota, self).setUp()
+        self.dyn_quota = DynamicQuota()
+
+    def test_add_project(self):
+        project_id = 1
+        self.dyn_quota.addProject(project_id, "test_project")
+        self.assertIn(project_id, self.dyn_quota.getProjects())
diff --git a/test-requirements.txt b/test-requirements.txt
new file mode 100644
index 0000000..21a7e3b
--- /dev/null
+++ b/test-requirements.txt
@@ -0,0 +1,14 @@
+# The order of packages is significant, because pip processes them in the order
+# of appearance. Changing the order has an impact on the overall integration
+# process, which may cause wedges in the gate later.
+
+hacking<0.11,>=0.10.0
+
+coverage>=3.6
+python-subunit>=0.0.18
+sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
+oslosphinx>=2.5.0 # Apache-2.0
+oslotest>=1.10.0 # Apache-2.0
+testrepository>=0.0.18
+testscenarios>=0.4
+testtools>=1.4.0
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..a1cec90
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,60 @@
+[tox]
+minversion = 2.0
+envlist = py27-constraints,pep8-constraints
+skipsdist = True
+
+[testenv]
+usedevelop = True
+install_command =
+                  constraints: {[testenv:common-constraints]install_command}
+                  pip install -U {opts} {packages}
+setenv =
+   VIRTUAL_ENV={envdir}
+deps = -r{toxinidir}/test-requirements.txt
+commands = python setup.py test --slowest --testr-args='{posargs}'
+
+[testenv:common-constraints]
+install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
+
+[testenv:pep8]
+commands = flake8 {posargs}
+
+[testenv:pep8-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = flake8 {posargs}
+
+[testenv:venv]
+commands = {posargs}
+
+[testenv:venv-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = {posargs}
+
+[testenv:cover]
+commands = python setup.py test --coverage --testr-args='{posargs}'
+
+[testenv:cover-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = python setup.py test --coverage --testr-args='{posargs}'
+
+[testenv:docs]
+commands = python setup.py build_sphinx
+
+[testenv:docs-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = python setup.py build_sphinx
+
+[testenv:debug]
+commands = oslo_debug_helper {posargs}
+
+[testenv:debug-constraints]
+install_command = {[testenv:common-constraints]install_command}
+commands = oslo_debug_helper {posargs}
+
+[flake8]
+# E123, E125 skipped as they are invalid PEP-8.
+
+show-source = True
+ignore = E123,E125
+builtins = _
+exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
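Configuration note: the SchedulerManager options added above (projects, shares, TTLs, default_TTL) are read through oslo.config, and each shares/TTLs entry is parsed by parseAttribute() as a "<project name>=<value>" pair; as written, the isdigit() check only accepts integer values. The fragment below is a minimal sketch of how such a configuration file might look, assuming the option group is named after the manager ([SchedulerManager]); the file name, project names and numbers are invented for illustration and are not part of this change.

    # hypothetical synergy.conf fragment (illustrative only)
    [SchedulerManager]
    # projects managed by the dynamic scheduler; they must exist in Keystone
    projects = prj_a, prj_b
    # fair-share weights, one "<project>=<integer>" pair per entry
    shares = prj_a=70, prj_b=30
    # per-project TTL; projects not listed here fall back to default_TTL
    TTLs = prj_a=30
    default_TTL = 10.0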