Retry mechanism improved and fixed

Bug: #1659780
Change-Id: Ifabc4f0ce576fc5c0348fe591d0ca92abe2f7b64
Sem-Ver: bugfix
This commit is contained in:
Lisa Zangrando 2017-01-27 11:23:34 +01:00
parent 80ef20c334
commit 89537e9f99
2 changed files with 106 additions and 139 deletions

View File

@ -54,6 +54,12 @@ class Server(SynergyObject):
def setState(self, state):
    """Store ``state`` under the "state" key of this object."""
    key = "state"
    self.set(key, state)
def getHost(self):
    """Return the value stored under the "host" key of this object."""
    host = self.get("host")
    return host
def setHost(self, host):
    """Store ``host`` under the "host" key of this object."""
    key = "host"
    self.set(key, host)
def getFlavor(self):
    """Return the value stored under the "flavor" key of this object."""
    flavor = self.get("flavor")
    return flavor

View File

@ -59,6 +59,9 @@ class Notifications(object):
server.setDeletedAt(server_info["deleted_at"])
server.setTerminatedAt(server_info["terminated_at"])
if "host" in server_info:
server.setHost(server_info["host"])
if "uuid" in server_info:
server.setId(server_info["uuid"])
elif "instance_id" in server_info:
@ -84,96 +87,66 @@ class Notifications(object):
return
state = payload["state"]
if state == "error":
LOG.info("Notification INFO: event_type=%s payload=%s"
% (event_type, payload))
if ((event_type == "compute.instance.delete.end" and
(state == "deleted" or state == "error" or state == "building")) or
(event_type == "compute.instance.update" and state == "error") or
(event_type == "scheduler.run_instance" and state == "error")):
server_info = None
event_types = ["compute.instance.create.end",
"compute.instance.delete.end",
"compute.instance.update",
"scheduler.run_instance"]
if event_type == "scheduler.run_instance":
server_info = payload["request_spec"]["instance_type"]
else:
server_info = payload
if event_type not in event_types:
return
if server_info["tenant_id"] not in self.projects:
return
server = self._makeServer(server_info)
flavor = server.getFlavor()
message = "N/A"
if "message" in server_info:
message = server_info["message"]
LOG.debug("Notification INFO (type=%s state=%s): vcpus=%s "
"memory=%s prj_id=%s server_id=%s (message=%s)"
% (event_type, server.getState(), flavor.getVCPUs(),
flavor.getMemory(), server.getProjectId(),
server.getId(), message))
quota = self.projects[server.getProjectId()].getQuota()
try:
quota.release(server)
except Exception as ex:
LOG.warn("Cannot release server id=%r: %s"
% (server.getId(), ex))
LOG.error("Exception has occured", exc_info=1)
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
state = payload["state"]
instance_id = payload["instance_id"]
LOG.debug("Notification WARN: event_type=%s state=%s instance_id=%s "
"payload=%s" % (event_type, state, instance_id, payload))
def error(self, ctxt, publisher_id, event_type, payload, metadata):
server = None
message = "N\A"
server_info = None
if event_type == "terminate_instance":
server_info = payload["args"]["instance"]
message = payload["exception"]["value"]
elif event_type == "compute.instance.create.error" or\
event_type == "compute.instance.update.error":
if event_type == "scheduler.run_instance":
server_info = payload["request_spec"]["instance_type"]
else:
server_info = payload
message = payload["message"]
server = self._makeServer(server_info)
if not server:
LOG.info("Notification ERROR: event_type=%s payload=%s"
% (event_type, payload))
return
server_id = server.getId()
host = server.getHost()
if server.getProjectId() not in self.projects:
return
flavor = server.getFlavor()
if event_type == "compute.instance.create.end" and \
state == "active":
LOG.info("the server %s is now active on host %s"
% (server_id, host))
else:
quota = self.projects[server.getProjectId()].getQuota()
LOG.debug("Notification ERROR (type=%s state=%s): vcpus=%s "
"memory=%s prj_id=%s server_id=%s (error=%s)"
% (event_type, server.getState(), flavor.getVCPUs(),
flavor.getMemory(), server.getProjectId(),
server.getId(), message))
if event_type == "compute.instance.delete.end" and \
state == "deleted":
LOG.info("the server %s has been deleted on host %s"
% (server_id, host))
try:
quota.release(server)
except Exception as ex:
LOG.warn("cannot release server %s "
"(reason=%s)" % (server_id, ex))
elif state == "error":
LOG.info("error occurred on server %s (host %s)"
% (server_id, host))
if not server.getTerminatedAt() and not server.getDeletedAt():
try:
self.nova_manager.deleteServer(server)
except Exception as ex:
LOG.error("Cannot delete server id=%r: %s"
% (server.getId(), ex))
if not server.getTerminatedAt() and not server.getDeletedAt():
try:
self.nova_manager.deleteServer(server)
except Exception as ex:
LOG.error("cannot delete server %s: %s"
% (server_id, ex))
quota = self.projects[server.getProjectId()].getQuota()
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug("Notification WARN: event_type=%s, payload=%s metadata=%s"
% (event_type, payload, metadata))
try:
quota.release(server)
except Exception as ex:
LOG.warn("Cannot release server id=%r: %s"
% (server.getId(), ex))
LOG.error("Exception has occured", exc_info=1)
def error(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug("Notification ERROR: event_type=%s, payload=%s metadata=%s"
% (event_type, payload, metadata))
class Worker(Thread):
@ -190,7 +163,7 @@ class Worker(Thread):
self.nova_manager = nova_manager
self.keystone_manager = keystone_manager
self.exit = False
LOG.info("Worker %r created!" % self.name)
LOG.info("Worker %s created!" % self.name)
def getName(self):
    """Return this object's ``name`` attribute."""
    current_name = self.name
    return current_name
@ -205,7 +178,7 @@ class Worker(Thread):
raise ex
def run(self):
LOG.info("Worker %r running!" % self.name)
LOG.info("Worker %s running!" % self.name)
queue_items = []
last_release_time = SharedQuota.getLastReleaseTime()
@ -234,7 +207,7 @@ class Worker(Thread):
try:
request = Request.fromDict(queue_item.getData())
user_id = request.getUserId()
prj_id = request.getProjectId()
context = request.getContext()
server = request.getServer()
@ -243,26 +216,27 @@ class Worker(Thread):
try:
s = self.nova_manager.getServer(server_id, detail=True)
if s.getState() != "building":
# or server["OS-EXT-STS:task_state"] != "scheduling":
self.queue.deleteItem(queue_item)
continue
except Exception as ex:
LOG.warn("Worker %s: the server %r is not anymore availa"
"ble ! [reason=%s]" % (self.name, server_id, ex))
# Exactly one argument per %s placeholder; a surplus argument would make
# the formatting raise TypeError before the queue item is removed.
LOG.warn("the server %s is not anymore available! "
         "(reason=%s)" % (server_id, ex))
self.queue.deleteItem(queue_item)
continue
quota = self.projects[prj_id].getQuota()
quota = self.projects[prj_id].getQuota()
computes = []
blocking = False
if server.isEphemeral() and not SharedQuota.isEnabled():
blocking = True
if quota.allocate(server, blocking=blocking):
found = False
try:
km = self.keystone_manager
trust = km.getTrust(context["trust_id"])
@ -271,33 +245,21 @@ class Worker(Thread):
context["auth_token"] = token.getId()
context["user_id"] = token.getUser().getId()
except Exception as ex:
LOG.error("Worker %r: error on getting the token "
"for server (id=%r) reason=%s"
# Exactly one argument per %s placeholder, so the original exception
# (re-raised just below) is not masked by a formatting TypeError.
LOG.error("error on getting the token for server "
          "%s (reason=%s)"
          % (server.getId(), ex))
raise ex
try:
computes = self.nova_manager.selectComputes(request)
self.nova_manager.buildServer(request)
LOG.info("building server %s (user_id=%s prj_id=%s quo"
"ta=shared)" % (server_id, user_id, prj_id))
found = True
except Exception as ex:
LOG.warn("Worker %s: compute not found for server %r!"
" [reason=%s]" % (self.name,
server.getId(), ex.message))
found = False
for compute in computes:
try:
self.nova_manager.buildServer(request, compute)
LOG.info("Worker %r: server (id=%r) "
"builded!" % (self.name, server.getId()))
found = True
break
except Exception as ex:
LOG.error("Worker %r: error on building the "
"server (id=%r) reason=%s"
% (self.name, server.getId(), ex))
# Exactly one argument per %s placeholder; a mismatch raises TypeError
# from the formatting itself instead of logging the build failure.
LOG.error("error on building the server %s (reason=%s)"
          % (server.getId(), ex))
if found:
self.queue.deleteItem(queue_item)
@ -309,11 +271,11 @@ class Worker(Thread):
except Exception as ex:
LOG.error("Exception has occured", exc_info=1)
LOG.error("Worker %r: %s" % (self.name, ex))
LOG.error("Worker %s: %s" % (self.name, ex))
self.queue.deleteItem(queue_item)
LOG.info("Worker %r destroyed!" % self.name)
LOG.info("Worker %s destroyed!" % self.name)
class SchedulerManager(Manager):
@ -404,7 +366,7 @@ class SchedulerManager(Manager):
project = self.projects.get(prj_id, None)
if not project:
raise Exception("project (id=%r) not found!" % prj_id)
raise Exception("project %s not found!" % prj_id)
elif prj_name:
for prj in self.projects.values():
if prj_name == prj.getName():
@ -412,7 +374,7 @@ class SchedulerManager(Manager):
break
if not project:
raise Exception("project (name=%r) not found!" % prj_name)
raise Exception("project %r not found!" % prj_name)
elif not all_users:
return self.projects.values()
@ -438,7 +400,7 @@ class SchedulerManager(Manager):
else:
domain = self.keystone_manager.getDomains(name="default")
if not domain:
raise Exception("domain 'default' not found!")
raise Exception("domain 'default' not found!")
domain = domain[0]
dom_id = domain.getId()
@ -543,43 +505,48 @@ class SchedulerManager(Manager):
project = self.projects[request.getProjectId()]
quota = project.getQuota()
retry = request.getRetry()
num_attempts = 0
reason = None
if retry:
num_attempts = retry.get("num_attempts", 0)
reason = retry.get("exc_reason", "n/a")
if 0 < num_attempts < 3:
self.nova_manager.buildServer(request)
LOG.info("retrying to build the server %s (user_id"
"=%s prj_id=%s, num_attempts=%s, reason=%s)"
% (request.getId(), request.getUserId(),
request.getProjectId(), num_attempts, reason))
return
if server.isPermanent():
if quota.allocate(server, blocking=False):
self.nova_manager.buildServer(request)
LOG.info("new request: id=%r user_id=%s prj_id=%s "
LOG.info("new request: id=%s user_id=%s prj_id=%s "
"quota=private" % (request.getId(),
request.getUserId(),
request.getProjectId()))
self.nova_manager.buildServer(request)
LOG.info("building server %s (user_id=%s prj_id=%s "
"quota=private)" % (server.getId(),
request.getUserId(),
request.getProjectId()))
else:
self.nova_manager.deleteServer(server)
LOG.info("request rejected (quota exceeded): "
"id=%r user_id=%s prj_id=%s "
"id=%s user_id=%s prj_id=%s "
"quota=private" % (request.getId(),
request.getUserId(),
request.getProjectId()))
else:
timestamp = request.getCreatedAt()
priority = 0
retry = request.getRetry()
if retry:
num_attempts = retry["num_attempts"]
if num_attempts:
quota.release(server)
priority = 99999999
LOG.info("released resource uuid %s num attempts"
"%s" % (request.getId(), num_attempts))
if priority == 0:
priority = self.fairshare_manager.calculatePriority(
user_id=request.getUserId(),
prj_id=request.getProjectId(),
timestamp=timestamp,
retry=0)
priority = self.fairshare_manager.calculatePriority(
user_id=request.getUserId(),
prj_id=request.getProjectId(),
timestamp=request.getCreatedAt(),
retry=num_attempts)
context = request.getContext()
@ -608,19 +575,13 @@ class SchedulerManager(Manager):
priority=priority,
data=request.toDict())
LOG.info("new request: id=%r user_id=%s prj_id=%s priority"
LOG.info("new request: id=%s user_id=%s prj_id=%s priority"
"=%s quota=shared" % (request.getId(),
request.getUserId(),
request.getProjectId(),
priority))
else:
self.nova_manager.buildServer(request)
self.nova_manager.setQuotaTypeServer(server)
LOG.info("new request: id=%r user_id=%s prj_id=%s "
"quota=private" % (request.getId(),
request.getUserId(),
request.getProjectId()))
except Exception as ex:
LOG.error("Exception has occured", exc_info=1)
LOG.error(ex)