Partition Director requires configuring Synergy through RESTful API

Partition Director, the INDIGO-DataCloud service, requires the setting
of the Synergy configuration (e.g. the list of projects allowed the
access the shared quota, their values, etc) through the RESTful API.
This implies some changes at configuration level of Synergy: some
parameters must be drop out from the synergy_scheduler.conf file and
the stored into a database. The use of the database for storing
configuration parameters will be useful even for configuring Synergy
in high availability mode.

Bug: #1690123
Change-Id: Id8f9c6b0e0a2804b43984f7353dc3fc0882cd651
Sem-Ver: feature
This commit is contained in:
Lisa Zangrando 2017-07-19 12:39:53 +02:00
parent 23e9382c7d
commit 94456be8fc
9 changed files with 860 additions and 527 deletions

View File

@ -7,21 +7,6 @@ autostart = True
# set the manager rate (minutes)
rate = 1
# set the list of projects accessing to the shared quota
# projects = prj_a, prj_b
#projects =
# set the projects share
# shares = prj_a=70, prj_b=30
#shares =
# set the default max time to live (minutes) for VM/Container
default_TTL = 2880
# set, for the specified projects, the max time to live (minutes) for VM/Container
# TTLs = prj_a=1440, prj_b=2880
#TTLs =
# set the max depth used by the backfilling strategy (default: 100)
# this allows Synergy to not check the whole queue when looking for VMs to start
backfill_depth = 100
@ -200,3 +185,30 @@ autostart = True
# set the manager rate (minutes)
rate = 5
[ProjectManager]
autostart = True
# set the manager rate (minutes)
rate = 60
# set the Synergy database connection:
db_connection = DIALECT+DRIVER://USER:PASSWORD@DB_HOST/synergy
# set the connection pool size (default: 10)
db_pool_size = 10
# set the number of seconds after which a connection is automatically
# recycled (default: 30)
db_pool_recycle = 30
# set the max overflow (default: 5)
db_max_overflow = 5
# set the default max time to live (minutes) for VM/Container
default_TTL = 2880
# set the default share value (default: 10)
default_share = 10

View File

@ -27,8 +27,10 @@ synergy.managers =
QuotaManager = synergy_scheduler_manager.quota_manager:QuotaManager
FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager
SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager
ProjectManager = synergy_scheduler_manager.project_manager:ProjectManager
synergy.commands =
project = synergy_scheduler_manager.client.command:ProjectCommand
quota = synergy_scheduler_manager.client.command:QuotaCommand
queue = synergy_scheduler_manager.client.command:QueueCommand
usage = synergy_scheduler_manager.client.command:UsageCommand

View File

@ -24,6 +24,189 @@ See the License for the specific language governing
permissions and limitations under the License."""
class ProjectCommand(ExecuteCommand):
def __init__(self):
super(ProjectCommand, self).__init__("ProjectCommand")
def configureParser(self, subparser):
prj_parser = subparser.add_parser('project')
prj_subparsers = prj_parser.add_subparsers(dest="command")
prj_subparsers.add_parser("list", add_help=True,
help="shows the projects list")
show_parser = prj_subparsers.add_parser("show", add_help=True,
help="shows the project info")
group = show_parser.add_mutually_exclusive_group(required=True)
group.add_argument("-i", "--id", metavar="<id>")
group.add_argument("-n", "--name", metavar="<name>")
group.add_argument("-a", "--all", action="store_true")
show_parser.add_argument("-r", "--share", action="store_true")
show_parser.add_argument("-t", "--ttl", action="store_true")
show_parser.add_argument("-p", "--p_quota", action="store_true")
show_parser.add_argument("-s", "--s_quota", action="store_true")
show_parser.add_argument("-q", "--queue", action="store_true")
show_parser.add_argument("-l", "--long", action="store_true")
show_parser.add_argument("-u", "--usage", action="store_true")
add_parser = prj_subparsers.add_parser("add", add_help=True,
help="adds a new project")
group = add_parser.add_mutually_exclusive_group(required=True)
group.add_argument("-i", "--id", metavar="<id>")
group.add_argument("-n", "--name", metavar="<name>")
add_parser.add_argument("-s", "--share", metavar="<share>")
add_parser.add_argument("-t", "--ttl", metavar="<TTL>")
remove_parser = prj_subparsers.add_parser("remove", add_help=True,
help="removes a project")
group = remove_parser.add_mutually_exclusive_group(required=True)
group.add_argument("-i", "--id", metavar="<id>")
group.add_argument("-n", "--name", metavar="<name>")
set_parser = prj_subparsers.add_parser("set", add_help=True,
help="sets the project values")
group = set_parser.add_mutually_exclusive_group(required=True)
group.add_argument("-i", "--id", metavar="<id>")
group.add_argument("-n", "--name", metavar="<name>")
set_parser.add_argument("-s", "--share", metavar="<share>")
set_parser.add_argument("-t", "--ttl", metavar="<TTL>")
def execute(self, synergy_url, args):
id = getattr(args, 'id', None)
name = getattr(args, 'name', None)
command = getattr(args, 'command', None)
headers = ["name"]
if command == "list":
cmd_args = {}
command = "GET_PROJECTS"
elif command == "show":
if args.all:
cmd_args = {}
command = "GET_PROJECTS"
else:
cmd_args = {"id": id, "name": name}
command = "GET_PROJECT"
if args.long:
headers.insert(0, "id")
if args.usage:
headers.append("usage")
if args.p_quota:
headers.append("private quota")
if args.s_quota:
headers.append("shared quota")
if args.queue:
headers.append("queue")
if args.share:
headers.append("share")
if args.ttl:
headers.append("TTL")
elif command == "remove":
cmd_args = {"id": id, "name": name}
command = "REMOVE_PROJECT"
else:
cmd_args = {"id": id, "name": name}
TTL = getattr(args, 'ttl', None)
share = getattr(args, 'share', None)
if TTL:
cmd_args["TTL"] = TTL
if share:
cmd_args["share"] = share
if command == "add":
command = "ADD_PROJECT"
headers.append("share")
headers.append("TTL")
elif command == "set":
command = "UPDATE_PROJECT"
if TTL:
headers.append("TTL")
if share:
headers.append("share")
result = super(ProjectCommand, self).execute(synergy_url,
"ProjectManager",
command,
args=cmd_args)
if isinstance(result, Project):
self.printProjects([result], headers)
else:
self.printProjects(result, headers)
def printProjects(self, projects, headers):
if not projects:
return
table = []
for project in projects:
row = []
for attribute in headers:
if attribute == "id":
row.append(project.getId())
if attribute == "name":
row.append(project.getName())
if attribute == "share":
share = project.getShare()
share_value = share.getValue()
share_norm = share.getNormalizedValue()
row.append("{:.2f}% | {:.2f}%".format(share_value,
share_norm * 100))
if attribute == "TTL":
row.append(project.getTTL())
if attribute == "private quota":
quota = project.getQuota()
private = "vcpus: {:.1f} of {:.1f} | ram: "\
"{:.1f} of {:.1f}".format(quota.getUsage("vcpus"),
quota.getSize("vcpus"),
quota.getUsage("memory"),
quota.getSize("memory"))
row.append(private)
if attribute == "shared quota":
quota = project.getQuota()
vcpus_size = quota.getSize("vcpus", private=False)
vcpus_usage = quota.getUsage("vcpus", private=False)
memory_size = quota.getSize("memory", private=False)
memory_usage = quota.getUsage("memory", private=False)
shared = "vcpus: {:.1f} of {:.1f} | "\
"ram: {:.1f} of {:.1f}".format(
vcpus_usage, vcpus_size,
memory_usage, memory_size)
row.append(shared)
if attribute == "usage":
data = project.getData()
usage = "vcpus: {:.2f}% | ram: {:.2f}%".format(
data["effective_vcpus"] * 100,
data["effective_memory"] * 100)
row.append(usage)
table.append(row)
print(tabulate(table, headers, tablefmt="fancy_grid"))
class QueueCommand(ExecuteCommand):
def __init__(self):

View File

@ -1,10 +1,11 @@
import logging
import threading
from datetime import datetime
from datetime import timedelta
from oslo_config import cfg
from synergy.common.manager import Manager
from synergy.exception import SynergyError
__author__ = "Lisa Zangrando"
__email__ = "lisa.zangrando[AT]pd.infn.it"
@ -47,16 +48,13 @@ class FairShareManager(Manager):
def setup(self):
if self.getManager("NovaManager") is None:
raise Exception("NovaManager not found!")
raise SynergyError("NovaManager not found!")
if self.getManager("QueueManager") is None:
raise Exception("QueueManager not found!")
if self.getManager("ProjectManager") is None:
raise SynergyError("ProjectManager not found!")
if self.getManager("QuotaManager") is None:
raise Exception("QuotaManager not found!")
if self.getManager("KeystoneManager") is None:
raise Exception("KeystoneManager not found!")
self.nova_manager = self.getManager("NovaManager")
self.project_manager = self.getManager("ProjectManager")
self.periods = CONF.FairShareManager.periods
self.period_length = CONF.FairShareManager.period_length
@ -65,68 +63,32 @@ class FairShareManager(Manager):
self.vcpus_weight = CONF.FairShareManager.vcpus_weight
self.age_weight = CONF.FairShareManager.age_weight
self.memory_weight = CONF.FairShareManager.memory_weight
self.projects = {}
self.workers = []
self.exit = False
self.nova_manager = self.getManager("NovaManager")
self.queue_manager = self.getManager("QueueManager")
self.quota_manager = self.getManager("QuotaManager")
self.keystone_manager = self.getManager("KeystoneManager")
self.fs_condition = threading.Condition()
if self.decay_weight < 0:
self.decay_weight = float(0)
elif self.decay_weight > 1:
self.decay_weight = float(1)
def execute(self, command, *args, **kargs):
if command == "ADD_PROJECT":
return self.addProject(*args, **kargs)
elif command == "GET_PROJECT":
return self.getProject(*args, **kargs)
elif command == "GET_PROJECTS":
return self.getProjects()
elif command == "REMOVE_PROJECT":
return self.removeProject(*args, **kargs)
elif command == "GET_PRIORITY":
result = {}
for prj_id, project in self.projects.items():
users = {}
for user_id, user in project["users"].items():
p = self.calculatePriority(user_id=user_id, prj_id=prj_id)
users[user["name"]] = p
result[project["name"]] = users
return result
elif command == "CALCULATE_PRIORITY":
return self.calculatePriority(*args, **kargs)
elif command == "CALCULATE_FAIRSHARE":
return self.calculateFairShare(*args, **kargs)
else:
raise Exception("command=%r not supported!" % command)
def task(self):
with self.fs_condition:
try:
self.checkUsers()
self.calculateFairShare()
except Exception as ex:
LOG.error(ex)
raise ex
finally:
self.fs_condition.notifyAll()
try:
self._calculateFairShare()
except SynergyError as ex:
LOG.error(ex)
raise ex
def destroy(self):
pass
def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0):
if prj_id not in self.projects:
raise Exception("project=%s not found!" % prj_id)
project = self.project_manager.getProject(id=prj_id)
if not project:
raise SynergyError("project=%s not found!" % prj_id)
user = project.getUser(id=user_id)
user = self.projects[prj_id].getUser(id=user_id)
if not user:
raise Exception("user=%s not found!" % user_id)
raise SynergyError("user=%s not found!" % user_id)
priority = user.getPriority()
fairshare_vcpus = priority.getFairShare("vcpus")
@ -146,117 +108,45 @@ class FairShareManager(Manager):
return int(priority)
def addProject(self, project):
if self.projects.get(project.getId(), None):
raise Exception("project %s already exists!" % (project.getId()))
def doOnEvent(self, event_type, *args, **kwargs):
if event_type == "USER_ADDED":
user = kwargs.get("user", None)
prj_share = project.getShare()
if prj_share.getValue() == 0:
prj_share.setValue(self.default_share)
if not user:
return
self.projects[project.getId()] = project
share = user.getShare()
def getProject(self, prj_id):
if prj_id not in self.projects:
raise Exception("project name=%r not found!" % prj_id)
if not share or share.getValue() == 0:
share.setValue(self.default_share)
return self.projects.get(prj_id, None)
elif event_type == "PROJECT_ADDED":
project = kwargs.get("project", None)
def getProjects(self):
return self.projects
if not project:
return
def removeProject(self, prj_id):
if prj_id in self.projects:
with self.fs_condition:
del self.projects[prj_id]
self.fs_condition.notifyAll()
share = project.getShare()
def checkUsers(self):
if not self.projects:
if not share or share.getValue() == 0:
share.setValue(self.default_share)
elif event_type == "PROJECT_REMOVED" or\
event_type == "PROJECT_UPDATED":
pass
else:
return
for project in self.projects.values():
k_users = self.keystone_manager.getUsers(prj_id=project.getId())
k_users_ids = [user.getId() for user in k_users]
p_users = project.getUsers()
p_users_ids = [user.getId() for user in p_users]
new_user_ids = list(set(k_users_ids) - set(p_users_ids))
deleted_user_ids = list(set(p_users_ids) - set(k_users_ids))
for id in deleted_user_ids:
LOG.info("deleting user %s" % id)
project.removeUser(id)
for user in k_users:
if user.getId() in new_user_ids:
LOG.info("found new user %s" % user.getName())
date = datetime.utcnow()
data = user.getData()
data["vcpus"] = float(0)
data["memory"] = float(0)
data["actual_memory"] = float(0)
data["actual_vcpus"] = float(0)
data["time_window_from_date"] = date
data["time_window_to_date"] = date
try:
project.addUser(user)
except Exception:
pass
def calculateFairShare(self):
if not self.projects:
return
self._calculateShares()
self._calculateFairShare()
def _calculateShares(self):
total_prj_share = float(0)
total_memory = float(0)
total_vcpus = float(0)
projects = self.project_manager.getProjects()
to_date = datetime.utcnow()
time_window_from_date = to_date
time_window_to_date = to_date
for prj_id, project in self.projects.items():
for user in project.getUsers():
data = user.getData()
data["vcpus"] = float(0)
data["memory"] = float(0)
for period in xrange(self.periods):
decay = self.decay_weight ** period
from_date = to_date - timedelta(days=(self.period_length))
time_window_from_date = from_date
for prj_id, project in self.projects.items():
usages = self.nova_manager.getProjectUsage(
prj_id, from_date, to_date)
for user_id, usage_rec in usages.items():
decay_vcpus = decay * usage_rec["vcpus"]
decay_memory = decay * usage_rec["memory"]
user = project.getUser(id=user_id)
if user:
data = user.getData()
data["vcpus"] += decay_vcpus
data["memory"] += decay_memory
total_vcpus += decay_vcpus
total_memory += decay_memory
to_date = from_date
for project in self.projects.values():
for project in projects.values():
prj_share = project.getShare()
if prj_share.getValue() == 0:
prj_share.setValue(self.default_share)
# check the share for each user and update the usage_record
sibling_share = float(0)
@ -278,15 +168,7 @@ class FairShareManager(Manager):
total_prj_share += prj_share.getValue()
for prj_id, project in self.projects.items():
prj_data = project.getData()
prj_data["actual_memory"] = float(0)
prj_data["actual_vcpus"] = float(0)
prj_data["effective_memory"] = float(0)
prj_data["effective_vcpus"] = float(0)
prj_data["time_window_from_date"] = time_window_from_date
prj_data["time_window_to_date"] = time_window_to_date
for project in projects.values():
prj_share = project.getShare()
prj_share.setSiblingValue(total_prj_share)
prj_share.setNormalizedValue(
@ -300,10 +182,75 @@ class FairShareManager(Manager):
usr_share.getValue() / usr_share.getSiblingValue() *
prj_share.getNormalizedValue())
def _calculateFairShare(self):
projects = self.project_manager.getProjects()
if not projects:
return
total_memory = float(0)
total_vcpus = float(0)
to_date = datetime.utcnow()
time_window_from_date = to_date
time_window_to_date = to_date
for prj_id, project in projects.items():
prj_data = project.getData()
prj_data["actual_vcpus"] = float(0)
prj_data["actual_memory"] = float(0)
prj_data["effective_vcpus"] = float(0)
prj_data["effective_memory"] = float(0)
prj_data["time_window_from_date"] = time_window_from_date
prj_data["time_window_to_date"] = time_window_to_date
for user in project.getUsers():
usr_data = user.getData()
usr_data["vcpus"] = float(0)
usr_data["memory"] = float(0)
usr_data["actual_vcpus"] = float(0)
usr_data["actual_memory"] = float(0)
usr_data["effective_vcpus"] = float(0)
usr_data["effective_memory"] = float(0)
usr_data["actual_rel_vcpus"] = float(0)
usr_data["actual_rel_memory"] = float(0)
usr_data["time_window_from_date"] = time_window_from_date
usr_data["time_window_to_date"] = time_window_to_date
for period in xrange(self.periods):
decay = self.decay_weight ** period
from_date = to_date - timedelta(days=(self.period_length))
time_window_from_date = from_date
for prj_id, project in projects.items():
usages = self.nova_manager.getProjectUsage(
prj_id, from_date, to_date)
for user_id, usage_rec in usages.items():
decay_vcpus = decay * usage_rec["vcpus"]
decay_memory = decay * usage_rec["memory"]
user = project.getUser(id=user_id)
if user:
data = user.getData()
data["vcpus"] += decay_vcpus
data["memory"] += decay_memory
total_vcpus += decay_vcpus
total_memory += decay_memory
to_date = from_date
for prj_id, project in projects.items():
prj_data = project.getData()
prj_data["time_window_to_date"] = time_window_to_date
for user in project.getUsers():
usr_data = user.getData()
usr_data["actual_memory"] = usr_data["memory"]
usr_data["actual_vcpus"] = usr_data["vcpus"]
usr_data["time_window_from_date"] = time_window_from_date
usr_data["time_window_to_date"] = time_window_to_date
if total_memory > 0:
@ -315,7 +262,7 @@ class FairShareManager(Manager):
prj_data["actual_memory"] += usr_data["actual_memory"]
prj_data["actual_vcpus"] += usr_data["actual_vcpus"]
for project in self.projects.values():
for project in projects.values():
prj_data = project.getData()
prj_data["effective_memory"] = prj_data["actual_memory"]
prj_data["effective_vcpus"] = prj_data["actual_vcpus"]
@ -327,10 +274,6 @@ class FairShareManager(Manager):
sibling_share = usr_share.getSiblingValue()
norm_share = usr_share.getNormalizedValue()
usr_data = user.getData()
usr_data["effective_vcpus"] = float(0)
usr_data["effective_memory"] = float(0)
usr_data["actual_rel_vcpus"] = float(0)
usr_data["actual_rel_memory"] = float(0)
if prj_data["actual_vcpus"] > 0:
usr_data["actual_rel_vcpus"] = usr_data["actual_vcpus"]
@ -361,5 +304,3 @@ class FairShareManager(Manager):
usr_priority.setValue(float(self.vcpus_weight) * f_vcpus +
float(self.memory_weight) * f_memory)
LOG.debug("fairshare project %s" % project.serialize())

View File

@ -0,0 +1,354 @@
import logging
from common.project import Project
from oslo_config import cfg
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from synergy.common.manager import Manager
from synergy.exception import SynergyError
__author__ = "Lisa Zangrando"
__email__ = "lisa.zangrando[AT]pd.infn.it"
__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
All Rights Reserved
Licensed under the Apache License, Version 2.0;
you may not use this file except in compliance with the
License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied.
See the License for the specific language governing
permissions and limitations under the License."""
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class ProjectManager(Manager):
def __init__(self):
super(ProjectManager, self).__init__(name="ProjectManager")
self.config_opts = [
cfg.IntOpt("default_TTL", default=1440, required=False),
cfg.FloatOpt("default_share", default=10.0, required=False),
cfg.StrOpt("db_connection", help="the DB url", required=True),
cfg.IntOpt('db_pool_size', default=10, required=False),
cfg.IntOpt('db_pool_recycle', default=30, required=False),
cfg.IntOpt('db_max_overflow', default=5, required=False)
]
self.projects = {}
def setup(self):
self.default_TTL = CONF.ProjectManager.default_TTL
self.default_share = CONF.ProjectManager.default_share
db_connection = CONF.ProjectManager.db_connection
pool_size = CONF.ProjectManager.db_pool_size
pool_recycle = CONF.ProjectManager.db_pool_recycle
max_overflow = CONF.ProjectManager.db_max_overflow
try:
self.db_engine = create_engine(db_connection,
pool_size=pool_size,
pool_recycle=pool_recycle,
max_overflow=max_overflow)
except SQLAlchemyError as ex:
LOG.error(ex)
raise ex
self.configured = False
self.keystone_manager = self.getManager("KeystoneManager")
self.createTable()
def task(self):
if not self.configured:
self.buildFromDB()
self.configured = True
def destroy(self):
pass
def execute(self, command, *args, **kargs):
if command == "GET_PROJECTS":
return self.projects.values()
prj_id = kargs.get("id", None)
prj_name = kargs.get("name", None)
project = self.getProject(prj_id, prj_name)
if command == "GET_PROJECT":
if project:
return project
else:
raise SynergyError("project not found!")
elif command == "ADD_PROJECT":
if project:
raise SynergyError("the project id=%s name=%s already exists!"
% (project.getId(), project.getName()))
TTL = kargs.get("TTL", None)
share = kargs.get("share", None)
return self._addProject(prj_id, prj_name, TTL, share)
elif command == "UPDATE_PROJECT":
if not project:
raise SynergyError("project not found!")
TTL = kargs.get("TTL", None)
share = kargs.get("share", None)
return self._updateProject(project, TTL, share)
elif command == "REMOVE_PROJECT":
if not project:
raise SynergyError("project not found!")
self._removeProject(project)
else:
raise SynergyError("command %r not supported!" % command)
def doOnEvent(self, event_type, *args, **kwargs):
if event_type == "identity.role_assignment.created":
usr_id = kwargs.get("user", None)
prj_id = kwargs.get("project", None)
project = self.getProject(id=prj_id)
if project and not project.getUser(id=usr_id):
user = self.keystone_manager.getUser(usr_id)
if user:
project.addUser(user)
self.notify(event_type="USER_ADDED", user=user)
elif event_type == "identity.role_assignment.deleted":
usr_id = kwargs.get("user", None)
prj_id = kwargs.get("project", None)
project = self.getProject(id=prj_id)
if project:
project.removeUser(usr_id)
self.notify(event_type="USER_REMOVED", user=user)
elif event_type == "identity.user.deleted":
user_id = kwargs.get("resource_info", None)
for project in self.projects.values():
try:
user = project.getUser(id=user_id)
if user:
project.removeUser(user_id)
self.notify(event_type="USER_DELETED", user=user)
except SynergyError as ex:
LOG.info(ex)
def _parseNumber(self, value, default=None):
if not value:
return default
try:
return int(value)
except SynergyError:
if default:
return default
raise SynergyError("%r is not a number!" % str(value))
def _addProject(self, prj_id, prj_name, TTL, share):
if prj_id:
project = self.keystone_manager.getProject(prj_id)
elif prj_name:
projects = self.keystone_manager.getProjects(name=prj_name)
if len(projects) > 1:
raise SynergyError("ambiguity: found %s projects having %r"
" as name" % (len(projects), prj_name))
if projects:
project = projects[0]
else:
raise SynergyError("missing project attributes")
if not project:
raise SynergyError("project not found!")
prj_TTL = self._parseNumber(TTL, default=self.default_TTL)
prj_share = self._parseNumber(share, 0)
project.setTTL(prj_TTL)
project.getShare().setValue(prj_share)
QUERY = "insert into project (id, name, share, TTL) " \
"values (%s, %s, %s, %s)"
connection = self.db_engine.connect()
trans = connection.begin()
try:
connection.execute(
QUERY, [project.getId(), project.getName(),
project.getShare().getValue(), project.getTTL()])
trans.commit()
except SQLAlchemyError as ex:
trans.rollback()
if "Duplicate entry" in ex.message:
raise SynergyError("the project id=%s name=%s already exists!"
% (project.getId(), project.getName()))
else:
raise(ex)
finally:
connection.close()
self.projects[project.getId()] = project
LOG.info("added project %s" % project.getName())
self.notify(event_type="PROJECT_ADDED", project=project)
return project
def _updateProject(self, project, TTL, share):
if not project:
return
TTL = self._parseNumber(TTL)
if TTL:
TTL = self._parseNumber(TTL)
if TTL <= 0:
raise SynergyError("wrong TTL value: %s <= 0" % TTL)
project.setTTL(TTL)
share = self._parseNumber(share)
if share:
if share <= 0:
raise SynergyError("wrong share value: %s <= 0" % share)
project.getShare().setValue(share)
connection = self.db_engine.connect()
trans = connection.begin()
try:
QUERY = "update project set share=%s, TTL=%s where id=%s"
connection.execute(QUERY, [project.getShare().getValue(),
project.getTTL(),
project.getId()])
trans.commit()
except SQLAlchemyError as ex:
trans.rollback()
raise SynergyError(ex.message)
finally:
connection.close()
self.notify(event_type="PROJECT_UPDATED", project=project)
def _removeProject(self, project):
if project.getId() not in self.projects.keys():
raise SynergyError("project %s not found!" % project.getId())
self.projects.pop(project.getId())
connection = self.db_engine.connect()
trans = connection.begin()
try:
QUERY = "delete from project where id=%s"
connection.execute(QUERY, [project.getId()])
trans.commit()
except SQLAlchemyError as ex:
trans.rollback()
raise SynergyError(ex.message)
finally:
connection.close()
LOG.info("removed project %s" % project.getName())
self.notify(event_type="PROJECT_REMOVED", project=project)
def getProject(self, id=None, name=None):
if not id and not name:
raise SynergyError("please define the project id or its name!")
project = None
if id:
project = self.projects.get(id, None)
elif name:
for prj in self.projects.values():
if name == prj.getName():
project = prj
break
return project
def getProjects(self):
return self.projects
def createTable(self):
TABLE = """CREATE TABLE IF NOT EXISTS project (`id` VARCHAR(64) \
NOT NULL PRIMARY KEY, name VARCHAR(64), share INT DEFAULT 0, TTL INT DEFAULT \
1440) ENGINE=InnoDB"""
connection = self.db_engine.connect()
try:
connection.execute(TABLE)
except SQLAlchemyError as ex:
raise SynergyError(ex.message)
except Exception as ex:
raise SynergyError(ex.message)
finally:
connection.close()
def buildFromDB(self):
connection = self.db_engine.connect()
try:
QUERY = "select id, name, share, TTL from project"
result = connection.execute(QUERY)
for row in result:
project = Project()
project.setId(row[0])
project.setName(row[1])
project.getShare().setValue(row[2])
project.setTTL(row[3])
project.setId(row[0])
project_id = project.getId()
try:
k_project = self.keystone_manager.getProject(project_id)
if not k_project:
self.removeProject(project)
continue
users = self.keystone_manager.getUsers(prj_id=project_id)
for user in users:
project.addUser(user)
self.projects[project.getId()] = project
self.notify(event_type="PROJECT_ADDED", project=project)
except SynergyError as ex:
LOG.info("the project %s seems not to exist anymore! "
"(reason=%s)" % (project.getName(), ex.message))
self.removeProject(project)
except SQLAlchemyError as ex:
raise SynergyError(ex.message)
finally:
connection.close()

View File

@ -4,6 +4,7 @@ import logging
from common.quota import SharedQuota
from oslo_config import cfg
from synergy.common.manager import Manager
from synergy.exception import SynergyError
__author__ = "Lisa Zangrando"
@ -39,152 +40,143 @@ class QuotaManager(Manager):
self.projects = {}
if self.getManager("NovaManager") is None:
raise Exception("NovaManager not found!")
raise SynergyError("NovaManager not found!")
if self.getManager("KeystoneManager") is None:
raise Exception("KeystoneManager not found!")
raise SynergyError("KeystoneManager not found!")
if self.getManager("ProjectManager") is None:
raise SynergyError("ProjectManager not found!")
self.nova_manager = self.getManager("NovaManager")
self.keystone_manager = self.getManager("KeystoneManager")
self.listener = None
self.project_manager = self.getManager("ProjectManager")
def destroy(self):
LOG.info("destroy invoked!")
SharedQuota.disable()
def execute(self, command, *args, **kargs):
if command == "show":
prj_id = kargs.get("project_id", None)
prj_name = kargs.get("project_name", None)
all_projects = kargs.get("all_projects", None)
if prj_id:
project = self.projects.get(prj_id, None)
if command == "GET_PRIVATE_QUOTA":
prj_id = kargs.get("id", None)
prj_name = kargs.get("name", None)
project = self.project_manager.getProject(prj_id, prj_name)
if project:
return project.getQuota()
else:
raise SynergyError("project not found!")
if project:
return project
raise Exception("project (id=%r) not found!" % prj_id)
elif prj_name:
for project in self.projects.values():
if prj_name == project.getName():
return project
raise Exception("project (name=%r) not found!" % prj_name)
elif all_projects:
return self.projects.values()
else:
return SharedQuota()
elif command == "GET_SHARED_QUOTA":
return SharedQuota()
elif command == "GET_PROJECTS":
return self.projects.values()
elif command == "GET_PROJECT":
return self.getProject(*args, **kargs)
else:
raise Exception("command=%r not supported!" % command)
raise SynergyError("command %r not supported!" % command)
def task(self):
try:
self.updateSharedQuota()
self.deleteExpiredServers()
except Exception as ex:
except SynergyError as ex:
LOG.error(ex)
def getProject(self, prj_id):
return self.projects.get(prj_id, None)
def doOnEvent(self, event_type, *args, **kwargs):
if event_type == "PROJECT_ADDED":
project = kwargs.get("project", None)
def getProjects(self):
return self.projects
if not project:
return
def addProject(self, project):
if self.projects.get(project.getId(), None):
raise Exception("project %r already exists!" % (project.getId()))
try:
quota = self.nova_manager.getQuota(project.getId())
if quota.getSize("vcpus") > 0 and \
quota.getSize("memory") > 0 and \
quota.getSize("instances") > 0:
self.nova_manager.updateQuota(quota, is_class=True)
quota.setSize("vcpus", -1)
quota.setSize("memory", -1)
quota.setSize("instances", -1)
self.nova_manager.updateQuota(quota)
class_quota = self.nova_manager.getQuota(
project.getId(), is_class=True)
quota = project.getQuota()
quota.setId(project.getId())
quota.setSize("vcpus", class_quota.getSize("vcpus"))
quota.setSize("memory", class_quota.getSize("memory"))
quota.setSize("instances", class_quota.getSize("instances"))
quota.setSize(
"vcpus", SharedQuota.getSize("vcpus"), private=False)
quota.setSize(
"memory", SharedQuota.getSize("memory"), private=False)
quota.setSize(
"instances", SharedQuota.getSize("instances"),
private=False)
servers = self.nova_manager.getProjectServers(project.getId())
for server in servers:
if server.getState() != "building":
try:
quota.allocate(server)
except SynergyError as ex:
fl = server.getFlavor()
vcpus_size = quota.getSize("vcpus") + fl.getVCPUs()
mem_size = quota.getSize("memory") + fl.getMemory()
quota.setSize("vcpus", vcpus_size)
quota.setSize("memory", mem_size)
self.nova_manager.updateQuota(quota, is_class=True)
LOG.warn("private quota autoresized (vcpus=%s, "
"memory=%s) for project %r (id=%s)"
% (quota.getSize("vcpus"),
quota.getSize("memory"),
project.getName(),
project.getId()))
quota.allocate(server)
self.projects[project.getId()] = project
except SynergyError as ex:
LOG.error(ex)
raise ex
elif event_type == "PROJECT_REMOVED":
project = kwargs.get("project", None)
if not project:
return
try:
quota = self.nova_manager.getQuota(project.getId())
if quota.getSize("vcpus") > 0 and \
quota.getSize("memory") > 0 and \
quota.getSize("instances") > 0:
if quota.getSize("vcpus") <= -1 and \
quota.getSize("memory") <= -1 and \
quota.getSize("instances") <= -1:
self.nova_manager.updateQuota(quota, is_class=True)
quota.setSize("vcpus", -1)
quota.setSize("memory", -1)
quota.setSize("instances", -1)
self.nova_manager.updateQuota(quota)
class_quota = self.nova_manager.getQuota(
project.getId(), is_class=True)
qc = self.nova_manager.getQuota(project.getId(), is_class=True)
self.nova_manager.updateQuota(qc)
quota = project.getQuota()
quota.setId(project.getId())
quota.setSize("vcpus", class_quota.getSize("vcpus"))
quota.setSize("memory", class_quota.getSize("memory"))
quota.setSize("instances", class_quota.getSize("instances"))
quota.setSize(
"vcpus", SharedQuota.getSize("vcpus"), private=False)
quota.setSize(
"memory", SharedQuota.getSize("memory"), private=False)
quota.setSize(
"instances", SharedQuota.getSize("instances"), private=False)
servers = self.nova_manager.getProjectServers(project.getId())
for server in servers:
if server.getState() != "building":
try:
quota.allocate(server)
except Exception as ex:
flavor = server.getFlavor()
vcpus_size = quota.getSize("vcpus") + flavor.getVCPUs()
mem_size = quota.getSize("memory") + flavor.getMemory()
quota.setSize("vcpus", vcpus_size)
quota.setSize("memory", mem_size)
self.nova_manager.updateQuota(quota, is_class=True)
LOG.warn("private quota autoresized (vcpus=%s, "
"memory=%s) for project %r (id=%s)"
% (quota.getSize("vcpus"),
quota.getSize("memory"),
project.getName(),
project.getId()))
quota.allocate(server)
self.projects[project.getId()] = project
except Exception as ex:
LOG.error(ex)
raise ex
def removeProject(self, project, destroy=False):
project = self.projects[project.getId()]
if project is None:
return
try:
if destroy:
quota = project.getQuota()
ids = []
ids.extend(quota.getServers("active", private=False))
ids.extend(quota.getServers("pending", private=False))
ids.extend(quota.getServers("error", private=False))
ids = []
ids.extend(quota.getServers("active", private=False))
ids.extend(quota.getServers("building", private=False))
ids.extend(quota.getServers("error", private=False))
try:
for server_id in ids:
self.nova_manager.deleteServer(server_id)
del self.projects[project.getId()]
except Exception as ex:
LOG.error(ex)
raise ex
except SynergyError as ex:
LOG.error(ex)
raise ex
def deleteExpiredServers(self):
for prj_id, project in self.getProjects().items():
for prj_id, project in self.project_manager.getProjects().items():
TTL = project.getTTL()
quota = project.getQuota()
@ -217,7 +209,7 @@ class QuotaManager(Manager):
% (uuid, TTL, state, prj_id))
self.nova_manager.deleteServer(server)
except Exception as ex:
except SynergyError as ex:
LOG.error(ex)
raise ex
@ -252,7 +244,7 @@ class QuotaManager(Manager):
domain = self.keystone_manager.getDomains(name="default")
if not domain:
raise Exception("domain 'default' not found!")
raise SynergyError("domain 'default' not found!")
domain = domain[0]
dom_id = domain.getId()
@ -260,7 +252,7 @@ class QuotaManager(Manager):
kprojects = self.keystone_manager.getProjects(domain_id=dom_id)
for kproject in kprojects:
project = self.getProject(kproject.getId())
project = self.project_manager.getProject(id=kproject.getId())
if project:
quota = self.nova_manager.getQuota(project.getId(),
@ -298,7 +290,7 @@ class QuotaManager(Manager):
enabled = False
if total_vcpus < static_vcpus:
if self.getProjects():
if self.project_manager.getProjects():
LOG.warn("shared quota: the total statically "
"allocated vcpus (%s) is greater than the "
"total amount of vcpus allowed (%s)"
@ -307,7 +299,7 @@ class QuotaManager(Manager):
shared_vcpus = total_vcpus - static_vcpus
if total_memory < static_memory:
if self.getProjects():
if self.project_manager.getProjects():
LOG.warn("shared quota: the total statically "
"allocated memory (%s) is greater than "
"the total amount of memory allowed (%s)"
@ -330,10 +322,10 @@ class QuotaManager(Manager):
SharedQuota.setSize("vcpus", 0)
SharedQuota.setSize("memory", 0)
for project in self.getProjects().values():
for project in self.project_manager.getProjects().values():
quota = project.getQuota()
quota.setSize("vcpus", shared_vcpus, private=False)
quota.setSize("memory", shared_memory, private=False)
except Exception as ex:
except SynergyError as ex:
LOG.error(ex)
raise ex

View File

@ -1,5 +1,4 @@
import logging
import re
import logging
from common.flavor import Flavor
from common.quota import SharedQuota
@ -7,6 +6,7 @@ from common.request import Request
from common.server import Server
from oslo_config import cfg
from synergy.common.manager import Manager
from synergy.exception import SynergyError
from threading import Thread
@ -153,7 +153,7 @@ class Notifications(object):
class Worker(Thread):
def __init__(self, name, queue, projects, nova_manager,
def __init__(self, name, queue, project_manager, nova_manager,
keystone_manager, backfill_depth=100):
super(Worker, self).__init__()
self.setDaemon(True)
@ -161,7 +161,7 @@ class Worker(Thread):
self.name = name
self.backfill_depth = backfill_depth
self.queue = queue
self.projects = projects
self.project_manager = project_manager
self.nova_manager = nova_manager
self.keystone_manager = keystone_manager
self.exit = False
@ -175,7 +175,7 @@ class Worker(Thread):
self.queue.close()
self.exit = True
except Exception as ex:
except SynergyError as ex:
LOG.error(ex)
raise ex
@ -223,14 +223,19 @@ class Worker(Thread):
# or server["OS-EXT-STS:task_state"] != "scheduling":
self.queue.deleteItem(queue_item)
continue
except Exception as ex:
except SynergyError as ex:
LOG.warn("the server %s is not anymore available!"
"(reason=%s)" % (server_id, ex))
self.queue.deleteItem(queue_item)
" (reason=%s)" % (server_id, ex))
self.queue.delete(queue_item)
continue
quota = self.projects[prj_id].getQuota()
project = self.project_manager.getProject(id=prj_id)
if not project:
raise SynergyError("project %r not found!" % prj_id)
quota = project.getQuota()
blocking = False
if server.isEphemeral() and not SharedQuota.isEnabled():
@ -246,7 +251,7 @@ class Worker(Thread):
context["auth_token"] = token.getId()
context["user_id"] = token.getUser().getId()
except Exception as ex:
except SynergyError as ex:
LOG.error("error on getting the token for server "
"%s (reason=%s)" % (server.getId(), ex))
raise ex
@ -258,7 +263,7 @@ class Worker(Thread):
"ta=shared)" % (server_id, user_id, prj_id))
found = True
except Exception as ex:
except SynergyError as ex:
LOG.error("error on building the server %s (reason=%s)"
% (server.getId(), ex))
@ -270,7 +275,7 @@ class Worker(Thread):
else:
queue_items.append(queue_item)
except Exception as ex:
except SynergyError as ex:
LOG.error("Exception has occured", exc_info=1)
LOG.error("Worker %s: %s" % (self.name, ex))
@ -287,35 +292,34 @@ class SchedulerManager(Manager):
self.config_opts = [
cfg.StrOpt("notification_topic", default="notifications"),
cfg.IntOpt("backfill_depth", default=100),
cfg.FloatOpt("default_TTL", default=10.0),
cfg.ListOpt("projects", default=[], help="the projects list"),
cfg.ListOpt("shares", default=[], help="the shares list"),
cfg.ListOpt("TTLs", default=[], help="the TTLs list")
]
self.workers = []
def setup(self):
if self.getManager("NovaManager") is None:
raise Exception("NovaManager not found!")
raise SynergyError("NovaManager not found!")
if self.getManager("QueueManager") is None:
raise Exception("QueueManager not found!")
raise SynergyError("QueueManager not found!")
if self.getManager("QuotaManager") is None:
raise Exception("QuotaManager not found!")
raise SynergyError("QuotaManager not found!")
if self.getManager("KeystoneManager") is None:
raise Exception("KeystoneManager not found!")
raise SynergyError("KeystoneManager not found!")
if self.getManager("FairShareManager") is None:
raise Exception("FairShareManager not found!")
raise SynergyError("FairShareManager not found!")
if self.getManager("ProjectManager") is None:
raise SynergyError("ProjectManager not found!")
self.nova_manager = self.getManager("NovaManager")
self.queue_manager = self.getManager("QueueManager")
self.quota_manager = self.getManager("QuotaManager")
self.keystone_manager = self.getManager("KeystoneManager")
self.fairshare_manager = self.getManager("FairShareManager")
self.default_TTL = float(CONF.SchedulerManager.default_TTL)
self.project_manager = self.getManager("ProjectManager")
self.backfill_depth = CONF.SchedulerManager.backfill_depth
self.notification_topic = CONF.SchedulerManager.notification_topic
self.projects = {}
@ -323,129 +327,18 @@ class SchedulerManager(Manager):
self.exit = False
self.configured = False
def parseAttribute(self, attribute):
if attribute is None:
return None
parsed_attribute = re.split('=', attribute)
if len(parsed_attribute) > 1:
if not parsed_attribute[-1].isdigit():
raise Exception("wrong value %r found in %r!"
% (parsed_attribute[-1], parsed_attribute))
if len(parsed_attribute) == 2:
prj_name = parsed_attribute[0]
value = float(parsed_attribute[1])
else:
raise Exception("wrong attribute definition: %r"
% parsed_attribute)
else:
raise Exception("wrong attribute definition: %r"
% parsed_attribute)
return (prj_name, value)
def execute(self, command, *args, **kargs):
if command == "show":
usr_id = kargs.get("user_id", None)
usr_name = kargs.get("user_name", None)
all_users = kargs.get("all_users", False)
all_projects = kargs.get("all_projects", False)
prj_id = kargs.get("project_id", None)
prj_name = kargs.get("project_name", None)
project = None
if all_projects:
return self.projects.values()
if (usr_id is not None or usr_name is not None or all_users) and \
prj_id is None and prj_name is None:
raise Exception("project id or name not defined!")
if prj_id:
project = self.projects.get(prj_id, None)
if not project:
raise Exception("project %s not found!" % prj_id)
elif prj_name:
for prj in self.projects.values():
if prj_name == prj.getName():
project = prj
break
if not project:
raise Exception("project %r not found!" % prj_name)
elif not all_users:
return self.projects.values()
if usr_id or usr_name:
return project.getUser(id=usr_id, name=usr_name)
elif all_users:
return project.getUsers()
else:
return project
else:
raise Exception("command=%r not supported!" % command)
raise SynergyError("command %r not supported!" % command)
def task(self):
if self.configured:
return
domain = self.keystone_manager.getDomains(name="default")
if not domain:
raise Exception("domain 'default' not found!")
domain = domain[0]
dom_id = domain.getId()
for project in self.keystone_manager.getProjects(domain_id=dom_id):
if project.getName() in CONF.SchedulerManager.projects:
CONF.SchedulerManager.projects.remove(project.getName())
project.setTTL(self.default_TTL)
self.projects[project.getName()] = project
else:
quota = self.nova_manager.getQuota(project.getId())
if quota.getSize("vcpus") <= -1 and \
quota.getSize("memory") <= -1 and \
quota.getSize("instances") <= -1:
qc = self.nova_manager.getQuota(project.getId(),
is_class=True)
self.nova_manager.updateQuota(qc)
if len(CONF.SchedulerManager.projects) > 0:
raise Exception("projects %s not found, please check the syn"
"ergy.conf" % CONF.SchedulerManager.projects)
self.quota_manager.updateSharedQuota()
for prj_ttl in CONF.SchedulerManager.TTLs:
prj_name, TTL = self.parseAttribute(prj_ttl)
self.projects[prj_name].setTTL(TTL)
for prj_share in CONF.SchedulerManager.shares:
prj_name, share_value = self.parseAttribute(prj_share)
p_share = self.projects[prj_name].getShare()
p_share.setValue(share_value)
for prj_name, project in self.projects.items():
del self.projects[prj_name]
self.projects[project.getId()] = project
self.quota_manager.addProject(project)
self.fairshare_manager.addProject(project)
self.quota_manager.updateSharedQuota()
self.fairshare_manager.checkUsers()
self.fairshare_manager.calculateFairShare()
try:
self.dynamic_queue = self.queue_manager.createQueue("DYNAMIC")
except Exception as ex:
except SynergyError as ex:
LOG.error("Exception has occured", exc_info=1)
LOG.error(ex)
@ -453,7 +346,7 @@ class SchedulerManager(Manager):
dynamic_worker = Worker("DYNAMIC",
self.dynamic_queue,
self.projects,
self.project_manager,
self.nova_manager,
self.keystone_manager,
self.backfill_depth)
@ -482,11 +375,10 @@ class SchedulerManager(Manager):
def processRequest(self, request):
server = request.getServer()
try:
if request.getProjectId() in self.projects:
self.nova_manager.setQuotaTypeServer(server)
project = self.project_manager.getProject(id=request.getProjectId())
project = self.projects[request.getProjectId()]
try:
if project:
quota = project.getQuota()
retry = request.getRetry()
num_attempts = 0
@ -565,6 +457,6 @@ class SchedulerManager(Manager):
priority))
else:
self.nova_manager.buildServer(request)
except Exception as ex:
except SynergyError as ex:
LOG.error("Exception has occured", exc_info=1)
LOG.error(ex)

View File

@ -17,9 +17,7 @@ from mock import patch
from synergy_scheduler_manager.common.project import Project
from synergy_scheduler_manager.common.user import User
from synergy_scheduler_manager.fairshare_manager import FairShareManager
from synergy_scheduler_manager.keystone_manager import KeystoneManager
from synergy_scheduler_manager.queue_manager import QueueManager
from synergy_scheduler_manager.quota_manager import QuotaManager
from synergy_scheduler_manager.project_manager import ProjectManager
from synergy_scheduler_manager.tests.unit import base
@ -27,92 +25,32 @@ class TestFairshareManager(base.TestCase):
def setUp(self):
super(TestFairshareManager, self).setUp()
self.fsmanager = FairShareManager()
self.fairshare_manager = FairShareManager()
self.project_manager = ProjectManager()
# NOTE(vincent): we cannot import NovaManager in our tests.
# NovaManager depends on the "nova" package (not novaclient), but it is
# not available on PyPI so the test runner will fail to install it.
nova_manager_mock = MagicMock()
self.fsmanager.managers = {
self.fairshare_manager.managers = {
'NovaManager': nova_manager_mock(),
'QueueManager': QueueManager(),
'QuotaManager': QuotaManager(),
'KeystoneManager': KeystoneManager()}
'ProjectManager': self.project_manager}
# Mock the configuration since it is initiliazed by synergy-service.
with patch('synergy_scheduler_manager.fairshare_manager.CONF'):
self.fsmanager.setup()
def test_add_project(self):
project = Project()
project.setId(1)
project.setName("test_project")
prj_share = project.getShare()
prj_share.setValue(5)
self.fsmanager.addProject(project)
self.assertEqual(1, self.fsmanager.projects[1].getId())
self.assertEqual("test_project", self.fsmanager.projects[1].getName())
self.assertEqual([], self.fsmanager.projects[1].getUsers())
self.assertEqual(5, self.fsmanager.projects[1].getShare().getValue())
def test_add_project_no_share(self):
project = Project()
project.setId(1)
project.setName("test_project")
self.fsmanager.addProject(project)
self.assertEqual(1, self.fsmanager.projects[1].getId())
self.assertEqual("test_project", self.fsmanager.projects[1].getName())
self.assertEqual([], self.fsmanager.projects[1].getUsers())
self.assertEqual(self.fsmanager.default_share,
self.fsmanager.projects[1].getShare().getValue())
def test_get_project(self):
project = Project()
project.setId(1)
project.setName("test_project")
self.fsmanager.addProject(project)
self.assertEqual(project, self.fsmanager.getProject(1))
def test_get_projects(self):
project1 = Project()
project1.setId(1)
project1.setName("test1")
self.fsmanager.addProject(project1)
project2 = Project()
project2.setId(2)
project2.setName("test2")
self.fsmanager.addProject(project2)
expected_projects = {
1: project1,
2: project2}
self.assertEqual(expected_projects, self.fsmanager.getProjects())
def test_remove_project(self):
project = Project()
project.setId(1)
project.setName("test_project")
self.fsmanager.addProject(project)
self.assertIn(1, self.fsmanager.projects)
self.fsmanager.removeProject(1)
self.assertNotIn(1, self.fsmanager.projects)
self.fairshare_manager.setup()
def test_calculate_priority_one_user(self):
# self.fsmanager.addProject(prj_id=1, prj_name="test")
# self.fairshare_manager.addProject(prj_id=1, prj_name="test")
project = Project()
project.setId(1)
project.setName("test_project")
# Define values used for computing the priority
age_weight = self.fsmanager.age_weight = 1.0
vcpus_weight = self.fsmanager.vcpus_weight = 2.0
memory_weight = self.fsmanager.memory_weight = 3.0
age_weight = self.fairshare_manager.age_weight = 1.0
vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0
memory_weight = self.fairshare_manager.memory_weight = 3.0
datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0)
datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0)
minutes = (datetime_stop - datetime_start).seconds / 60
@ -128,7 +66,7 @@ class TestFairshareManager(base.TestCase):
priority.setFairShare('memory', fairshare_ram)
project.addUser(user)
self.fsmanager.addProject(project)
self.project_manager.projects[project.getId()] = project
# Compute the expected priority given the previously defined values
expected_priority = int(age_weight * minutes +
@ -138,7 +76,8 @@ class TestFairshareManager(base.TestCase):
with patch("synergy_scheduler_manager.fairshare_manager.datetime") \
as datetime_mock:
datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop)
priority = self.fsmanager.calculatePriority(user_id=22, prj_id=1)
priority = self.fairshare_manager.calculatePriority(
user.getId(), project.getId())
self.assertEqual(expected_priority, priority)

View File

@ -19,6 +19,7 @@ from synergy_scheduler_manager.common.queue import QueueDB
from synergy_scheduler_manager.common.queue import QueueItem
from synergy_scheduler_manager.common.quota import SharedQuota
from synergy_scheduler_manager.common.server import Server
from synergy_scheduler_manager.project_manager import ProjectManager
from synergy_scheduler_manager.scheduler_manager import Notifications
from synergy_scheduler_manager.scheduler_manager import Worker
from synergy_scheduler_manager.tests.unit import base
@ -130,19 +131,36 @@ class TestWorker(base.TestCase):
self.keystone_manager_mock = MagicMock()
db_engine_mock = create_autospec(Engine)
project1 = Project()
project1.setId("1")
project1.setName("test1")
def my_side_effect(*args, **kwargs):
project1 = Project()
project1.setId(1)
project1.setName("test_project1")
project1.getShare().setValue(5)
project2 = Project()
project2.setId("2")
project2.setName("test2")
project2 = Project()
project2.setId(2)
project2.setName("test_project2")
project2.getShare().setValue(55)
if args[0] == 1:
return project1
elif args[0] == 2:
return project2
keystone_manager = MagicMock()
keystone_manager.getProject.side_effect = my_side_effect
self.project_manager = ProjectManager()
self.project_manager.db_engine = MagicMock()
self.project_manager.keystone_manager = keystone_manager
self.project_manager.default_TTL = 10
self.project_manager.default_share = 30
self.project_manager._addProject(1, "test_project1", 10, 50)
projects_list = {'1': project1, '2': project2}
self.worker = Worker(
name="test",
queue=QueueDB("testq", db_engine_mock),
projects=projects_list,
project_manager=self.project_manager,
nova_manager=self.nova_manager_mock,
keystone_manager=self.keystone_manager_mock)
@ -181,11 +199,11 @@ class TestWorker(base.TestCase):
nova_exec = self.nova_manager_mock.execute
nova_exec.side_effect = nova_exec_side_effect
project = self.project_manager.getProject(id=1)
# Mock quota allocation
quota_allocate_mock = create_autospec(
self.worker.projects['1'].getQuota().allocate)
quota_allocate_mock = create_autospec(project.getQuota().allocate)
quota_allocate_mock.return_value = True
self.worker.projects['1'].getQuota().allocate = quota_allocate_mock
project.getQuota().allocate = quota_allocate_mock
# Delete item from the queue
delete_item_mock = create_autospec(self.worker.queue.deleteItem)