Synergy should scale up the oldest user requests from the queue

The oldest requests should be processed quite first as soon as
the user priority increases. This commit fixes the issue.

BUG: #1717464
Change-Id: Iafc74531775d00aeb653cc92092b2bc7775f52d8
Sem-Ver: bugfix
This commit is contained in:
Lisa Zangrando 2017-09-15 10:41:29 +02:00
parent a45e054ec9
commit 56c8d155ac
9 changed files with 167 additions and 196 deletions

View File

@ -1,5 +1,6 @@
import heapq
import json
import Queue as queue
import threading
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
@ -95,13 +96,7 @@ class Queue(SynergyObject):
def __init__(self, name="default", type="PRIORITY", db_engine=None):
super(Queue, self).__init__()
if type == "FIFO":
self.queue = queue.Queue()
elif type == "LIFO":
self.queue = queue.LifoQueue()
elif type == "PRIORITY":
self.queue = queue.PriorityQueue()
else:
if type not in ["FIFO", "LIFO", "PRIORITY"]:
raise SynergyError("queue type %r not supported" % type)
self.set("type", type)
@ -109,11 +104,13 @@ class Queue(SynergyObject):
self.set("size", 0)
self.setName(name)
self.db_engine = db_engine
self._items = []
self.condition = threading.Condition()
self._createTable()
self._buildFromDB()
def _setSize(self, value):
def _incSize(self, value):
size = self.get("size")
self.set("size", size + value)
@ -127,12 +124,12 @@ class Queue(SynergyObject):
self.set("is_closed", is_closed)
def isEmpty(self):
return self.queue.empty()
return len(self._items) == 0
def close(self):
self.setClosed(True)
def enqueue(self, user, data, priority=0):
def enqueue(self, user, data):
if self.isClosed():
raise SynergyError("the queue is closed!")
@ -145,33 +142,47 @@ class Queue(SynergyObject):
item = QueueItem()
item.setUserId(user.getId())
item.setProjectId(user.getProjectId())
item.setPriority(priority)
item.setPriority(user.getPriority().getValue())
item.setData(data)
self._insertItemDB(item)
with self.condition:
if self.getType() == "FIFO":
self._items.append(item)
elif self.getType() == "LIFO":
self._items.insert(0, item)
elif self.getType() == "PRIORITY":
heapq.heappush(self._items, (-item.getPriority(),
item.getCreationTime(), item))
if self.getType() == "PRIORITY":
self.queue.put((-priority, item.getCreationTime(), item))
else:
self.queue.put(item)
self._setSize(1)
self._insertItemDB(item)
self._incSize(1)
self.condition.notifyAll()
def dequeue(self, block=True, timeout=None, delete=False):
if self.isClosed():
raise SynergyError("the queue is closed!")
if self.queue.empty() and not block:
return None
item = None
item = self.queue.get(block=block, timeout=timeout)
with self.condition:
while (item is None and not self.isClosed()):
if not self._items:
if block:
self.condition.wait(timeout)
if timeout:
break
else:
break
elif self.getType() == "PRIORITY":
item = heapq.heappop(self._items)[2]
else:
item = self._items.pop(0)
self.condition.notifyAll()
if not item:
return None
if self.getType() == "PRIORITY":
item = item[2]
self._getItemDataDB(item)
if delete:
@ -183,12 +194,44 @@ class Queue(SynergyObject):
if self.isClosed():
raise SynergyError("the queue is closed!")
if self.getType() == "PRIORITY":
self.queue.put((-item.getPriority(), item.getCreationTime(), item))
else:
self.queue.put(item)
with self.condition:
if self.getType() == "FIFO":
self._items.append(item)
elif self.getType() == "LIFO":
self._items.insert(0, item)
elif self.getType() == "PRIORITY":
heapq.heappush(self._items, (-item.getPriority(),
item.getCreationTime(), item))
self._updateItemDB(item)
self._updateItemDB(item)
self._incSize(1)
self.condition.notifyAll()
def updatePriority(self, user):
if self.isClosed():
raise SynergyError("the queue is closed!")
if self.getType() != "PRIORITY":
raise SynergyError("updatePriority() cannot be applied on this "
"queue type")
new_items = []
with self.condition:
while self._items:
item = heapq.heappop(self._items)[2]
if item.getUserId() == user.getId() and\
item.getProjectId() == user.getProjectId():
item.setPriority(user.getPriority().getValue())
new_items.append(item)
for item in new_items:
heapq.heappush(self._items, (-item.getPriority(),
item.getCreationTime(), item))
self._incSize(1)
self.condition.notifyAll()
def getType(self):
return self.get("type")
@ -230,15 +273,13 @@ class Queue(SynergyObject):
connection.execute(QUERY, [item.getId()])
trans.commit()
self._incSize(-1)
except SQLAlchemyError as ex:
trans.rollback()
raise SynergyError(ex.message)
finally:
connection.close()
self._setSize(-1)
self.queue.task_done()
def _createTable(self):
if not self.db_engine:
return
@ -282,7 +323,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL) ENGINE=InnoDB""" % self.getName()
item.setLastUpdate(row[6])
self.restore(item)
self._setSize(1)
self._incSize(1)
except SQLAlchemyError as ex:
raise SynergyError(ex.message)
finally:

View File

@ -1,5 +1,3 @@
import utils
from datetime import datetime
from flavor import Flavor
from server import Server
@ -109,37 +107,19 @@ class Request(object):
server.setProjectId(instance_data["project_id"])
server.setCreatedAt(instance_data["created_at"])
server.setMetadata(instance_data["metadata"])
server.setUserData(instance_data["user_data"])
server.setKeyName(instance_data["key_name"])
server.setType()
user_data = instance_data.get("user_data", None)
if user_data:
try:
data = utils.decodeBase64(user_data)
quota = utils.getConfigParameter(data, "quota", "synergy")
if not quota:
quota = utils.getConfigParameter(data, "quota")
metadata = instance_data.get("metadata", {})
if quota is None or quota == "private" or quota != "shared":
server.setType("permanent")
metadata["quota"] = "private"
elif quota == "shared":
server.setType("ephemeral")
metadata["quota"] = "shared"
except Exception:
server.setType("permanent")
metadata["quota"] = "private"
request.server = server
if "filter_properties" in request.data:
filter_properties = request.data["filter_properties"]
request.retry = filter_properties["retry"]
request.retry = filter_properties.get("retry", {})
else:
request_spec = request.data["request_specs"][0]
nova_object = request_spec["nova_object.data"]
request.retry = nova_object["retry"]
request.retry = nova_object.get("retry", {})
if not request.retry:
request.retry = {}

View File

@ -1,3 +1,5 @@
import logging
import re
import utils
from datetime import datetime
@ -23,6 +25,9 @@ See the License for the specific language governing
permissions and limitations under the License."""
LOG = logging.getLogger(__name__)
class Server(SynergyObject):
def __init__(self):
@ -45,8 +50,35 @@ class Server(SynergyObject):
def getType(self):
return self.get("type")
def setType(self, type):
self.set("type", type)
def setType(self, type=None):
if type:
self.set("type", type)
return
metadata = self.get("metadata")
userdata = self.get("userdata")
if "quota" in metadata:
if metadata["quota"] == "shared":
self.set("type", "ephemeral")
elif userdata:
try:
data = userdata.splitlines()
keyValRegEx = re.compile(r'^\s*(quota)\s*=\s*(shared)\s*$')
for row in data:
result = keyValRegEx.search(row)
if result:
self.set("type", "ephemeral")
break
except Exception as ex:
LOG.error(ex)
if self.isPermanent():
metadata["quota"] = "private"
else:
metadata["quota"] = "shared"
def getState(self):
return self.get("state")
@ -78,30 +110,11 @@ class Server(SynergyObject):
def setMetadata(self, metadata):
self.set("metadata", metadata)
if "quota" in metadata:
if metadata["quota"] == "shared":
self.setType("ephemeral")
else:
self.setType("permanent")
def getUserData(self):
return self.get("userdata")
def setUserData(self, userdata):
self.set("userdata", userdata)
if userdata:
try:
quota = utils.getConfigParameter(userdata, "quota", "synergy")
if quota is None or quota == "private":
self.setType("permanent")
elif quota == "shared":
self.setType("ephemeral")
else:
self.setType("permanent")
except Exception:
self.setType("permanent")
self.set("userdata", utils.decodeBase64(userdata))
def getUserId(self):
return self.get("user_id")

View File

@ -79,35 +79,6 @@ class FairShareManager(Manager):
def destroy(self):
pass
def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0):
project = self.project_manager.getProject(id=prj_id)
if not project:
raise SynergyError("project=%s not found!" % prj_id)
user = project.getUser(id=user_id)
if not user:
raise SynergyError("user=%s not found!" % user_id)
priority = user.getPriority()
fairshare_vcpus = priority.getFairShare("vcpus")
fairshare_memory = priority.getFairShare("memory")
if not timestamp:
timestamp = datetime.utcnow()
now = datetime.utcnow()
diff = (now - timestamp)
minutes = diff.seconds / 60
priority = (float(self.age_weight) * minutes +
float(self.vcpus_weight) * fairshare_vcpus +
float(self.memory_weight) * fairshare_memory -
float(self.age_weight) * retry)
return int(priority)
def doOnEvent(self, event_type, *args, **kwargs):
if event_type == "USER_ADDED":
user = kwargs.get("user", None)
@ -304,3 +275,5 @@ class FairShareManager(Manager):
usr_priority.setValue(float(self.vcpus_weight) * f_vcpus +
float(self.memory_weight) * f_memory)
self.notify(event_type="USER_PRIORITY_UPDATED", user=user)

View File

@ -1,4 +1,3 @@
import common.utils as utils
import eventlet
import hashlib
import hmac
@ -68,6 +67,7 @@ class ServerEventHandler(object):
server.setMetadata(server_info["metadata"])
server.setDeletedAt(server_info["deleted_at"])
server.setTerminatedAt(server_info["terminated_at"])
server.setType()
if "host" in server_info:
server.setHost(server_info["host"])
@ -548,6 +548,9 @@ class NovaManager(Manager):
server.setName(server_data["name"])
server.setKeyName(server_data["key_name"])
server.setMetadata(server_data["metadata"])
server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data",
None))
server.setType()
server.setState(server_data["OS-EXT-STS:vm_state"])
server.setUserId(server_data["user_id"])
server.setProjectId(server_data["tenant_id"])
@ -558,10 +561,6 @@ class NovaManager(Manager):
server.setTerminatedAt(
server_data.get("OS-SRV-USG:terminated_at", None))
if "user_data" in server_data:
user_data = server_data["user_data"]
server.setUserData(utils.decodeBase64(user_data))
if detail:
server.setFlavor(self.getFlavor(
server_data["flavor"]["id"]))
@ -587,6 +586,9 @@ class NovaManager(Manager):
server.setName(server_data["name"])
server.setKeyName(server_data["key_name"])
server.setMetadata(server_data["metadata"])
server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data",
None))
server.setType()
server.setState(server_data["OS-EXT-STS:vm_state"])
server.setUserId(server_data["user_id"])
server.setProjectId(server_data["tenant_id"])
@ -597,10 +599,6 @@ class NovaManager(Manager):
server.setTerminatedAt(
server_data.get("OS-SRV-USG:terminated_at", None))
if "user_data" in server_data:
user_data = server_data["user_data"]
server.setUserData(utils.decodeBase64(user_data))
if detail:
server.setFlavor(self.getFlavor(server_data["flavor"]["id"]))
@ -978,9 +976,9 @@ a.launched_at<='%(to_date)s' and (a.terminated_at>='%(from_date)s' or \
try:
# retrieve the amount of resources in terms of cores and memory
QUERY = """select a.uuid, a.vcpus, a.memory_mb, a.root_gb, \
a.vm_state from nova.instances as a WHERE a.project_id='%(project_id)s' \
and a.vm_state in ('active', 'building', 'error') and a.deleted_at is NULL \
and a.terminated_at is NULL""" % {"project_id": prj_id}
a.vm_state, a.user_data from nova.instances as a WHERE a.project_id=\
'%(project_id)s'and a.vm_state in ('active', 'building', 'error') and \
a.deleted_at is NULL and a.terminated_at is NULL""" % {"project_id": prj_id}
LOG.debug("getProjectServers query: %s" % QUERY)
@ -995,6 +993,7 @@ and a.terminated_at is NULL""" % {"project_id": prj_id}
server = Server()
server.setId(row[0])
server.setState(row[4])
server.setUserData(row[5])
server.setFlavor(flavor)
QUERY = """select `key`, value from nova.instance_metadata \
@ -1009,6 +1008,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
metadata[row[0]] = row[1]
server.setMetadata(metadata)
server.setType()
servers.append(server)
except SQLAlchemyError as ex:
@ -1031,7 +1031,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
ids = "uuid in ('%s') and " % "', '".join(server_ids)
QUERY = """select uuid, vcpus, memory_mb, root_gb, \
vm_state from nova.instances where project_id = \
vm_state, user_data from nova.instances where project_id = \
'%(project_id)s' and deleted_at is NULL and (vm_state='error' or \
(%(server_ids)s vm_state='active' and terminated_at is NULL \
and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\
@ -1050,6 +1050,7 @@ and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\
server = Server()
server.setId(row[0])
server.setState(row[4])
server.setUserData(row[5])
server.setFlavor(flavor)
QUERY = """select `key`, value from nova.instance_metadata \
@ -1064,6 +1065,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
metadata[row[0]] = row[1]
server.setMetadata(metadata)
server.setType()
servers.append(server)
except SQLAlchemyError as ex:

View File

@ -212,7 +212,7 @@ class QuotaManager(Manager):
% (uuid, TTL, state, prj_id))
self.nova_manager.deleteServer(server)
except SynergyError as ex:
except Exception as ex:
LOG.error(ex)
def updateSharedQuota(self):

View File

@ -67,29 +67,29 @@ class Worker(Thread):
last_release_time = SharedQuota.getLastReleaseTime()
while not self.exit and not self.queue.isClosed():
if last_release_time < SharedQuota.getLastReleaseTime():
last_release_time = SharedQuota.getLastReleaseTime()
try:
if last_release_time < SharedQuota.getLastReleaseTime():
last_release_time = SharedQuota.getLastReleaseTime()
while queue_items:
self.queue.restore(queue_items.pop(0))
while queue_items:
self.queue.restore(queue_items.pop(0))
if len(queue_items) >= self.backfill_depth:
SharedQuota.wait()
continue
queue_item = self.queue.dequeue(block=False)
if queue_item is None:
if self.queue.getSize():
if len(queue_items) >= self.backfill_depth:
SharedQuota.wait()
continue
else:
queue_item = self.queue.dequeue(block=True)
if queue_item is None:
continue
queue_item = self.queue.dequeue(block=False)
if queue_item is None:
if self.queue.getSize():
SharedQuota.wait()
continue
else:
queue_item = self.queue.dequeue(block=True)
if queue_item is None:
continue
try:
request = Request.fromDict(queue_item.getData())
user_id = request.getUserId()
prj_id = request.getProjectId()
@ -145,7 +145,7 @@ class Worker(Thread):
"ta=shared" % (server_id, user_id, prj_id))
found = True
except SynergyError as ex:
except Exception as ex:
LOG.error("error on building the server %s (reason=%s)"
% (server.getId(), ex))
@ -157,7 +157,7 @@ class Worker(Thread):
else:
queue_items.append(queue_item)
except SynergyError as ex:
except Exception as ex:
LOG.error("Exception has occured", exc_info=1)
LOG.error("Worker %s: %s" % (self.name, ex))
@ -204,6 +204,7 @@ class SchedulerManager(Manager):
self.backfill_depth = CONF.SchedulerManager.backfill_depth
self.exit = False
self.configured = False
self.queue = None
def execute(self, command, *args, **kargs):
raise SynergyError("command %r not supported!" % command)
@ -251,7 +252,9 @@ class SchedulerManager(Manager):
self._processServerEvent(server, event, state)
elif event_type == "SERVER_CREATE":
self._processServerCreate(kwargs["request"])
elif event_type == "PROJECT_ADDED":
if not self.configured:
return
@ -261,6 +264,10 @@ class SchedulerManager(Manager):
if self.queue and project:
project.setQueue(self.queue)
elif event_type == "USER_PRIORITY_UPDATED":
if self.queue:
self.queue.updatePriority(kwargs.get("user", None))
def _processServerEvent(self, server, event, state):
project = self.project_manager.getProject(id=server.getProjectId())
@ -279,7 +286,6 @@ class SchedulerManager(Manager):
self.nova_manager.setServerMetadata(server,
"expiration_time",
expiration)
else:
quota = project.getQuota()
@ -353,12 +359,6 @@ class SchedulerManager(Manager):
request.getUserId(),
request.getProjectId()))
else:
priority = self.fairshare_manager.calculatePriority(
user_id=request.getUserId(),
prj_id=request.getProjectId(),
timestamp=request.getCreatedAt(),
retry=num_attempts)
context = request.getContext()
km = self.keystone_manager
@ -381,8 +381,9 @@ class SchedulerManager(Manager):
context["trust_id"] = trust.getId()
user = project.getUser(id=request.getUserId())
priority = user.getPriority().getValue()
self.queue.enqueue(user, request.toDict(), priority)
self.queue.enqueue(user, request.toDict())
LOG.info("new request: id=%s user_id=%s prj_id=%s priority"
"=%s quota=shared" % (request.getId(),

View File

@ -10,12 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from mock import MagicMock
from mock import patch
from synergy_scheduler_manager.common.project import Project
from synergy_scheduler_manager.common.user import User
from synergy_scheduler_manager.fairshare_manager import FairShareManager
from synergy_scheduler_manager.project_manager import ProjectManager
from synergy_scheduler_manager.tests.unit import base
@ -41,46 +38,6 @@ class TestFairshareManager(base.TestCase):
with patch('synergy_scheduler_manager.fairshare_manager.CONF'):
self.fairshare_manager.setup()
def test_calculate_priority_one_user(self):
# self.fairshare_manager.addProject(prj_id=1, prj_name="test")
project = Project()
project.setId(1)
project.setName("test_project")
# Define values used for computing the priority
age_weight = self.fairshare_manager.age_weight = 1.0
vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0
memory_weight = self.fairshare_manager.memory_weight = 3.0
datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0)
datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0)
minutes = (datetime_stop - datetime_start).seconds / 60
fairshare_cores = 10
fairshare_ram = 50
# Add a user to the project
user = User()
user.setId(22)
user.setName("test_user")
priority = user.getPriority()
priority.setFairShare('vcpus', fairshare_cores)
priority.setFairShare('memory', fairshare_ram)
project.addUser(user)
self.project_manager.projects[project.getId()] = project
# Compute the expected priority given the previously defined values
expected_priority = int(age_weight * minutes +
vcpus_weight * fairshare_cores +
memory_weight * fairshare_ram)
with patch("synergy_scheduler_manager.fairshare_manager.datetime") \
as datetime_mock:
datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop)
priority = self.fairshare_manager.calculatePriority(
user.getId(), project.getId())
self.assertEqual(expected_priority, priority)
def test_calculate_fairshare(self):
# TODO(vincent)
pass

View File

@ -115,8 +115,9 @@ class TestQueue(base.TestCase):
user = User()
user.setId(2)
user.setProjectId(100)
user.getPriority().setValue(10)
self.queue.enqueue(user=user, data="mydata", priority=10)
self.queue.enqueue(user=user, data="mydata")
self.assertEqual(1, self.queue.getSize())
@ -125,8 +126,10 @@ class TestQueue(base.TestCase):
user = User()
user.setId(2)
user.setProjectId(100)
user.getPriority().setValue(10)
data = json.dumps("mydata")
self.queue.enqueue(user=user, data=data, priority=10)
self.queue.enqueue(user=user, data=data)
self.assertEqual(1, self.queue.getSize())
@ -148,9 +151,10 @@ class TestQueue(base.TestCase):
user = User()
user.setId(2)
user.setProjectId(100)
user.getPriority().setValue(10)
data = json.dumps("mydata")
self.queue.enqueue(user=user, data=data, priority=10)
self.queue.enqueue(user=user, data=data)
# Mock the DB
execute_mock = self.db_engine_mock.connect().execute