Make SchedulerManager handle ERROR notifications.

SchedulerManager receives all VM status changes through AMQP, but it doesn't
handle the ERROR notifications (these were not used before the Liberty version).

- updated the unit test (test_scheduler_manager.TestNotifications.test_info_quota)

Change-Id: Ia1e4db66a743f2d42f01241505310cbed37b625e
Sem-Ver: bugfix
Closes-bug: #1648057
This commit is contained in:
Lisa Zangrando 2016-12-07 13:18:42 +01:00
parent a6cc985ad6
commit 90e6d14aab
3 changed files with 193 additions and 45 deletions

View File

@ -33,7 +33,10 @@ class Server(SynergyObject):
if not date: if not date:
return None return None
elif isinstance(date, basestring): elif isinstance(date, basestring):
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") try:
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
except Exception:
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
elif isinstance(date, datetime): elif isinstance(date, datetime):
return date return date
else: else:
@ -130,6 +133,12 @@ class Server(SynergyObject):
def setTerminatedAt(self, terminated_at): def setTerminatedAt(self, terminated_at):
self.set("terminated_at", self.__getDateTime(terminated_at)) self.set("terminated_at", self.__getDateTime(terminated_at))
def getDeletedAt(self):
    """Return the value stored under the "deleted_at" attribute."""
    deleted_at = self.get("deleted_at")
    return deleted_at
def setDeletedAt(self, deleted_at):
    """Store ``deleted_at`` under the "deleted_at" attribute.

    The value is first passed through ``__getDateTime`` and the result of
    that conversion is what gets stored.
    """
    converted = self.__getDateTime(deleted_at)
    self.set("deleted_at", converted)
def isEphemeral(self): def isEphemeral(self):
return self.get("type") == "ephemeral" return self.get("type") == "ephemeral"

View File

@ -34,10 +34,47 @@ LOG = logging.getLogger(__name__)
class Notifications(object): class Notifications(object):
def __init__(self, projects): def __init__(self, projects, nova_manager):
super(Notifications, self).__init__() super(Notifications, self).__init__()
self.projects = projects self.projects = projects
self.nova_manager = nova_manager
def _makeServer(self, server_info):
    """Build a Server (with its Flavor) from a notification info dict.

    :param server_info: mapping describing the instance. The keys
        "memory_mb", "vcpus", "root_gb", "user_id", "metadata",
        "deleted_at" and "terminated_at" are read unconditionally;
        "instance_type" and the id / project / state keys are optional.
    :returns: the populated Server, or None when server_info is empty.
    :raises KeyError: if one of the unconditionally read keys is missing.
    """
    if not server_info:
        return None

    flavor = Flavor()
    flavor.setMemory(server_info["memory_mb"])
    flavor.setVCPUs(server_info["vcpus"])
    flavor.setStorage(server_info["root_gb"])

    if "instance_type" in server_info:
        flavor.setName(server_info["instance_type"])

    server = Server()
    server.setFlavor(flavor)
    server.setUserId(server_info["user_id"])
    server.setMetadata(server_info["metadata"])
    server.setDeletedAt(server_info["deleted_at"])
    server.setTerminatedAt(server_info["terminated_at"])

    # Each attribute may arrive under one of two key names; the first
    # name that is present wins and the second is then ignored.
    alternatives = (
        (server.setId, ("uuid", "instance_id")),
        (server.setProjectId, ("project_id", "tenant_id")),
        (server.setState, ("vm_state", "state")),
    )

    for setter, candidate_keys in alternatives:
        for key in candidate_keys:
            if key in server_info:
                setter(server_info[key])
                break

    return server
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug("Notification INFO: event_type=%s payload=%s" LOG.debug("Notification INFO: event_type=%s payload=%s"
@ -52,41 +89,36 @@ class Notifications(object):
(state == "deleted" or state == "error" or state == "building")) or (state == "deleted" or state == "error" or state == "building")) or
(event_type == "compute.instance.update" and state == "error") or (event_type == "compute.instance.update" and state == "error") or
(event_type == "scheduler.run_instance" and state == "error")): (event_type == "scheduler.run_instance" and state == "error")):
instance_info = None server_info = None
if event_type == "scheduler.run_instance": if event_type == "scheduler.run_instance":
instance_info = payload["request_spec"]["instance_type"] server_info = payload["request_spec"]["instance_type"]
else: else:
instance_info = payload server_info = payload
if instance_info["tenant_id"] not in self.projects: if server_info["tenant_id"] not in self.projects:
return return
flavor = Flavor() server = self._makeServer(server_info)
flavor.setName(instance_info["instance_type"]) flavor = server.getFlavor()
flavor.setMemory(instance_info["memory_mb"])
flavor.setVCPUs(instance_info["vcpus"])
flavor.setStorage(instance_info["root_gb"])
server = Server() message = "N/A"
server.setFlavor(flavor) if "message" in server_info:
server.setId(instance_info["instance_id"]) message = server_info["message"]
server.setUserId(instance_info["user_id"])
server.setProjectId(instance_info["tenant_id"])
server.setMetadata(instance_info["metadata"])
LOG.debug("Notification INFO (type=%s state=%s): vcpus=%s " LOG.debug("Notification INFO (type=%s state=%s): vcpus=%s "
"memory=%s prj_id=%s server_id=%s" "memory=%s prj_id=%s server_id=%s (message=%s)"
% (event_type, state, flavor.getVCPUs(), % (event_type, server.getState(), flavor.getVCPUs(),
flavor.getMemory(), server.getProjectId(), flavor.getMemory(), server.getProjectId(),
server.getId())) server.getId(), message))
quota = self.projects[server.getProjectId()].getQuota() quota = self.projects[server.getProjectId()].getQuota()
try: try:
quota.release(server) quota.release(server)
except Exception as ex: except Exception as ex:
LOG.warn("Notification INFO: %s" % ex) LOG.warn("Cannot release server id=%r: %s"
% (server.getId(), ex))
LOG.error("Exception has occured", exc_info=1) LOG.error("Exception has occured", exc_info=1)
def warn(self, ctxt, publisher_id, event_type, payload, metadata): def warn(self, ctxt, publisher_id, event_type, payload, metadata):
@ -96,8 +128,52 @@ class Notifications(object):
"payload=%s" % (event_type, state, instance_id, payload)) "payload=%s" % (event_type, state, instance_id, payload))
def error(self, ctxt, publisher_id, event_type, payload, metadata):
    """Handle an ERROR notification for a server.

    For the event types "terminate_instance",
    "compute.instance.create.error" and "compute.instance.update.error"
    a Server is built from the payload. If that server belongs to one of
    the known projects, it is deleted through nova_manager (unless it
    already carries a terminated_at or deleted_at timestamp) and then
    released from the project's quota. Any other event type is only
    logged.

    :param ctxt: not used by this handler.
    :param publisher_id: not used by this handler.
    :param event_type: name of the notification event.
    :param payload: notification body; its layout depends on event_type.
    :param metadata: not used by this handler.
    """
    # Same default as the one used by info(); the previous "N\A" was a
    # backslash typo (and an invalid escape sequence on Python 3).
    message = "N/A"
    server_info = None

    if event_type == "terminate_instance":
        server_info = payload["args"]["instance"]
        message = payload["exception"]["value"]
    elif event_type in ("compute.instance.create.error",
                        "compute.instance.update.error"):
        server_info = payload
        # The message is only used for logging: as in info(), a missing
        # key must not prevent the quota from being released.
        message = payload.get("message", message)

    server = self._makeServer(server_info)

    if not server:
        # Unhandled event type (or empty instance info): just trace it.
        LOG.info("Notification ERROR: event_type=%s payload=%s"
                 % (event_type, payload))
        return

    if server.getProjectId() not in self.projects:
        return

    flavor = server.getFlavor()

    LOG.debug("Notification ERROR (type=%s state=%s): vcpus=%s "
              "memory=%s prj_id=%s server_id=%s (error=%s)"
              % (event_type, server.getState(), flavor.getVCPUs(),
                 flavor.getMemory(), server.getProjectId(),
                 server.getId(), message))

    # Only ask for deletion when the server has neither a termination
    # nor a deletion timestamp yet.
    if not server.getTerminatedAt() and not server.getDeletedAt():
        try:
            self.nova_manager.deleteServer(server)
        except Exception as ex:
            LOG.error("Cannot delete server id=%r: %s"
                      % (server.getId(), ex))

    quota = self.projects[server.getProjectId()].getQuota()

    try:
        quota.release(server)
    except Exception as ex:
        LOG.warn("Cannot release server id=%r: %s"
                 % (server.getId(), ex))
        LOG.error("Exception has occured", exc_info=1)
class Worker(Thread): class Worker(Thread):
@ -415,7 +491,8 @@ class SchedulerManager(Manager):
self.workers.append(dynamic_worker) self.workers.append(dynamic_worker)
self.notifications = Notifications(self.projects) self.notifications = Notifications(self.projects,
self.nova_manager)
target = self.nova_manager.getTarget(topic='notifications', target = self.nova_manager.getTarget(topic='notifications',
exchange="nova") exchange="nova")

View File

@ -13,40 +13,95 @@
from mock import create_autospec from mock import create_autospec
from mock import MagicMock from mock import MagicMock
from sqlalchemy.engine.base import Engine from sqlalchemy.engine.base import Engine
from synergy_scheduler_manager.common.flavor import Flavor
from synergy_scheduler_manager.common.project import Project from synergy_scheduler_manager.common.project import Project
from synergy_scheduler_manager.common.queue import QueueDB from synergy_scheduler_manager.common.queue import QueueDB
from synergy_scheduler_manager.common.queue import QueueItem from synergy_scheduler_manager.common.queue import QueueItem
from synergy_scheduler_manager.common.quota import SharedQuota
from synergy_scheduler_manager.common.server import Server
from synergy_scheduler_manager.scheduler_manager import Notifications from synergy_scheduler_manager.scheduler_manager import Notifications
from synergy_scheduler_manager.scheduler_manager import Worker from synergy_scheduler_manager.scheduler_manager import Worker
from synergy_scheduler_manager.tests.unit import base from synergy_scheduler_manager.tests.unit import base
class TestNotifications(base.TestCase): class TestNotifications(base.TestCase):
# TO COMPLETE
def test_info_quota(self): def test_info_quota(self):
SharedQuota.setSize("vcpus", 20)
SharedQuota.setSize("memory", 4096)
SharedQuota.enable()
project1 = Project() self.assertEqual(20, SharedQuota.getSize('vcpus'))
project1.setId(1) self.assertEqual(4096, SharedQuota.getSize('memory'))
project1.setName("test1")
project2 = Project() prj_a = Project()
project2.setId(2) prj_a.setId(1)
project2.setName("test2") prj_a.setName("prj_a")
prjDict = {1: project1, 2: project2} prj_b = Project()
prj_b.setId(2)
prj_b.setName("prj_b")
prjDict = {1: prj_a, 2: prj_b}
quota = prjDict[1].getQuota()
quota.setSize("vcpus", 10, private=True)
quota.setSize("memory", 2048, private=True)
self.assertEqual(10, quota.getSize('vcpus', private=True))
self.assertEqual(2048, quota.getSize('memory', private=True))
quota.setSize("vcpus",
SharedQuota.getSize('vcpus'),
private=False)
quota.setSize("memory",
SharedQuota.getSize('memory'),
private=False)
self.assertEqual(20, quota.getSize('vcpus', private=False))
self.assertEqual(4096, quota.getSize('memory', private=False))
flavor = Flavor()
flavor.setVCPUs(2)
flavor.setMemory(512)
server = Server()
server.setType("ephemeral")
server.setId("server_id")
server.setFlavor(flavor)
self.assertEqual(True, server.isEphemeral())
try:
allocated = quota.allocate(server, blocking=False)
except Exception as ex:
print(ex)
self.assertEqual(True, allocated)
self.assertEqual(0, quota.getUsage('vcpus', private=True))
self.assertEqual(0, quota.getUsage('memory', private=True))
self.assertEqual(2, quota.getUsage('vcpus', private=False))
self.assertEqual(512, quota.getUsage('memory', private=False))
self.assertEqual(2, SharedQuota.getUsage('vcpus'))
self.assertEqual(512, SharedQuota.getUsage('memory'))
ns = Notifications(prjDict, None)
ns = Notifications(prjDict)
payload = { payload = {
"state": "deleted", "state": "deleted",
"instance_type": "instance_type", "deleted_at": "2016-12-09T10:06:10.000000",
"user_id": "user_id", "terminated_at": "2016-12-09T10:06:10.025305",
"root_gb": "root_gb", "instance_type": "m1.tiny",
"metadata": "metadata", "user_id": "user",
"instance_id": 1, "root_gb": "1",
"tenant_id": 2, "metadata": {},
"memory_mb": 3, "instance_id": "server_id",
"vcpus": 4} "tenant_id": 1,
"memory_mb": 512,
"vcpus": 2}
ns.info(ctxt=None, ns.info(ctxt=None,
publisher_id=None, publisher_id=None,
@ -54,9 +109,16 @@ class TestNotifications(base.TestCase):
payload=payload, payload=payload,
metadata=None) metadata=None)
quota = ns.projects[2].getQuota() quota = prjDict[1].getQuota()
self.assertEqual(0, quota.getUsage("memory", private=False))
self.assertEqual(0, quota.getUsage("vcpus", private=False)) self.assertEqual(0, quota.getUsage("vcpus", private=True))
self.assertEqual(0, quota.getUsage("memory", private=True))
self.assertEqual(0, quota.getUsage('vcpus', private=False))
self.assertEqual(0, quota.getUsage('memory', private=False))
self.assertEqual(0, SharedQuota.getUsage('vcpus'))
self.assertEqual(0, SharedQuota.getUsage('memory'))
class TestWorker(base.TestCase): class TestWorker(base.TestCase):