diff --git a/synergy_scheduler_manager/common/server.py b/synergy_scheduler_manager/common/server.py index cf71488..301628a 100644 --- a/synergy_scheduler_manager/common/server.py +++ b/synergy_scheduler_manager/common/server.py @@ -54,6 +54,12 @@ class Server(SynergyObject): def setState(self, state): self.set("state", state) + def getHost(self): + return self.get("host") + + def setHost(self, host): + self.set("host", host) + def getFlavor(self): return self.get("flavor") diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index f8803be..bf83073 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -59,6 +59,9 @@ class Notifications(object): server.setDeletedAt(server_info["deleted_at"]) server.setTerminatedAt(server_info["terminated_at"]) + if "host" in server_info: + server.setHost(server_info["host"]) + if "uuid" in server_info: server.setId(server_info["uuid"]) elif "instance_id" in server_info: @@ -84,96 +87,66 @@ class Notifications(object): return state = payload["state"] + if state == "error": + LOG.info("Notification INFO: event_type=%s payload=%s" + % (event_type, payload)) - if ((event_type == "compute.instance.delete.end" and - (state == "deleted" or state == "error" or state == "building")) or - (event_type == "compute.instance.update" and state == "error") or - (event_type == "scheduler.run_instance" and state == "error")): - server_info = None + event_types = ["compute.instance.create.end", + "compute.instance.delete.end", + "compute.instance.update", + "scheduler.run_instance"] - if event_type == "scheduler.run_instance": - server_info = payload["request_spec"]["instance_type"] - else: - server_info = payload + if event_type not in event_types: + return - if server_info["tenant_id"] not in self.projects: - return - - server = self._makeServer(server_info) - flavor = server.getFlavor() - - message = "N/A" - if "message" in 
server_info: - message = server_info["message"] - - LOG.debug("Notification INFO (type=%s state=%s): vcpus=%s " - "memory=%s prj_id=%s server_id=%s (message=%s)" - % (event_type, server.getState(), flavor.getVCPUs(), - flavor.getMemory(), server.getProjectId(), - server.getId(), message)) - - quota = self.projects[server.getProjectId()].getQuota() - - try: - quota.release(server) - except Exception as ex: - LOG.warn("Cannot release server id=%r: %s" - % (server.getId(), ex)) - LOG.error("Exception has occured", exc_info=1) - - def warn(self, ctxt, publisher_id, event_type, payload, metadata): - state = payload["state"] - instance_id = payload["instance_id"] - LOG.debug("Notification WARN: event_type=%s state=%s instance_id=%s " - "payload=%s" % (event_type, state, instance_id, payload)) - - def error(self, ctxt, publisher_id, event_type, payload, metadata): - server = None - message = "N\A" server_info = None - if event_type == "terminate_instance": - server_info = payload["args"]["instance"] - message = payload["exception"]["value"] - - elif event_type == "compute.instance.create.error" or\ - event_type == "compute.instance.update.error": + if event_type == "scheduler.run_instance": + server_info = payload["request_spec"]["instance_type"] + else: server_info = payload - message = payload["message"] server = self._makeServer(server_info) - - if not server: - LOG.info("Notification ERROR: event_type=%s payload=%s" - % (event_type, payload)) - return + server_id = server.getId() + host = server.getHost() if server.getProjectId() not in self.projects: return - flavor = server.getFlavor() + if event_type == "compute.instance.create.end" and \ + state == "active": + LOG.info("the server %s is now active on host %s" + % (server_id, host)) + else: + quota = self.projects[server.getProjectId()].getQuota() - LOG.debug("Notification ERROR (type=%s state=%s): vcpus=%s " - "memory=%s prj_id=%s server_id=%s (error=%s)" - % (event_type, server.getState(), flavor.getVCPUs(), - 
flavor.getMemory(), server.getProjectId(), - server.getId(), message)) + if event_type == "compute.instance.delete.end" and \ + state == "deleted": + LOG.info("the server %s has been deleted on host %s" + % (server_id, host)) + try: + quota.release(server) + except Exception as ex: + LOG.warn("cannot release server %s " + "(reason=%s)" % (server_id, ex)) + elif state == "error": + LOG.info("error occurred on server %s (host %s)" + % (server_id, host)) - if not server.getTerminatedAt() and not server.getDeletedAt(): - try: - self.nova_manager.deleteServer(server) - except Exception as ex: - LOG.error("Cannot delete server id=%r: %s" - % (server.getId(), ex)) + if not server.getTerminatedAt() and not server.getDeletedAt(): + try: + self.nova_manager.deleteServer(server) + except Exception as ex: + LOG.error("cannot delete server %s: %s" + % (server_id, ex)) - quota = self.projects[server.getProjectId()].getQuota() + def warn(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.debug("Notification WARN: event_type=%s, payload=%s metadata=%s" + % (event_type, payload, metadata)) - try: - quota.release(server) - except Exception as ex: - LOG.warn("Cannot release server id=%r: %s" - % (server.getId(), ex)) - LOG.error("Exception has occured", exc_info=1) + def error(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.debug("Notification ERROR: event_type=%s, payload=%s metadata=%s" + % (event_type, payload, metadata)) class Worker(Thread): @@ -190,7 +163,7 @@ class Worker(Thread): self.nova_manager = nova_manager self.keystone_manager = keystone_manager self.exit = False - LOG.info("Worker %r created!" % self.name) + LOG.info("Worker %s created!" % self.name) def getName(self): return self.name @@ -205,7 +178,7 @@ class Worker(Thread): raise ex def run(self): - LOG.info("Worker %r running!" % self.name) + LOG.info("Worker %s running!" 
% self.name) queue_items = [] last_release_time = SharedQuota.getLastReleaseTime() @@ -234,7 +207,7 @@ class Worker(Thread): try: request = Request.fromDict(queue_item.getData()) - + user_id = request.getUserId() prj_id = request.getProjectId() context = request.getContext() server = request.getServer() @@ -243,26 +216,27 @@ class Worker(Thread): try: s = self.nova_manager.getServer(server_id, detail=True) + if s.getState() != "building": # or server["OS-EXT-STS:task_state"] != "scheduling": self.queue.deleteItem(queue_item) continue except Exception as ex: - LOG.warn("Worker %s: the server %r is not anymore availa" - "ble ! [reason=%s]" % (self.name, server_id, ex)) + LOG.warn("the server %s is not anymore available!" + "(reason=%s)" % (server_id, ex)) self.queue.deleteItem(queue_item) continue quota = self.projects[prj_id].getQuota() - quota = self.projects[prj_id].getQuota() - computes = [] blocking = False if server.isEphemeral() and not SharedQuota.isEnabled(): blocking = True if quota.allocate(server, blocking=blocking): + found = False + try: km = self.keystone_manager trust = km.getTrust(context["trust_id"]) @@ -271,33 +245,21 @@ class Worker(Thread): context["auth_token"] = token.getId() context["user_id"] = token.getUser().getId() except Exception as ex: - LOG.error("Worker %r: error on getting the token " - "for server (id=%r) reason=%s" - % (self.name, server.getId(), ex)) + LOG.error("error on getting the token for server " + "%s (reason=%s)" + % (server.getId(), ex)) raise ex try: - computes = self.nova_manager.selectComputes(request) + self.nova_manager.buildServer(request) + + LOG.info("building server %s (user_id=%s prj_id=%s quo" + "ta=shared)" % (server_id, user_id, prj_id)) + + found = True except Exception as ex: - LOG.warn("Worker %s: compute not found for server %r!" 
- " [reason=%s]" % (self.name, - server.getId(), ex.message)) - - found = False - - for compute in computes: - try: - self.nova_manager.buildServer(request, compute) - - LOG.info("Worker %r: server (id=%r) " - "builded!" % (self.name, server.getId())) - - found = True - break - except Exception as ex: - LOG.error("Worker %r: error on building the " - "server (id=%r) reason=%s" - % (self.name, server.getId(), ex)) + LOG.error("error on building the server %s (reason=%s)" + % (server.getId(), ex)) if found: self.queue.deleteItem(queue_item) @@ -309,11 +271,11 @@ class Worker(Thread): except Exception as ex: LOG.error("Exception has occured", exc_info=1) - LOG.error("Worker %r: %s" % (self.name, ex)) + LOG.error("Worker %s: %s" % (self.name, ex)) self.queue.deleteItem(queue_item) - LOG.info("Worker %r destroyed!" % self.name) + LOG.info("Worker %s destroyed!" % self.name) class SchedulerManager(Manager): @@ -404,7 +366,7 @@ class SchedulerManager(Manager): project = self.projects.get(prj_id, None) if not project: - raise Exception("project (id=%r) not found!" % prj_id) + raise Exception("project %s not found!" % prj_id) elif prj_name: for prj in self.projects.values(): if prj_name == prj.getName(): @@ -412,7 +374,7 @@ class SchedulerManager(Manager): break if not project: - raise Exception("project (name=%r) not found!" % prj_name) + raise Exception("project %r not found!" 
% prj_name) elif not all_users: return self.projects.values() @@ -438,7 +400,7 @@ class SchedulerManager(Manager): else: domain = self.keystone_manager.getDomains(name="default") if not domain: - raise Exception("domain 'default' not found!") + raise Exception("domain 'default' not found!") domain = domain[0] dom_id = domain.getId() @@ -543,43 +505,48 @@ class SchedulerManager(Manager): project = self.projects[request.getProjectId()] quota = project.getQuota() + retry = request.getRetry() + num_attempts = 0 + reason = None + + if retry: + num_attempts = retry.get("num_attempts", 0) + reason = retry.get("exc_reason", "n/a") + + if 0 < num_attempts < 3: + self.nova_manager.buildServer(request) + + LOG.info("retrying to build the server %s (user_id" + "=%s prj_id=%s, num_attempts=%s, reason=%s)" + % (request.getId(), request.getUserId(), + request.getProjectId(), num_attempts, reason)) + return if server.isPermanent(): if quota.allocate(server, blocking=False): - self.nova_manager.buildServer(request) - - LOG.info("new request: id=%r user_id=%s prj_id=%s " + LOG.info("new request: id=%s user_id=%s prj_id=%s " "quota=private" % (request.getId(), request.getUserId(), request.getProjectId())) + + self.nova_manager.buildServer(request) + LOG.info("building server %s (user_id=%s prj_id=%s " + "quota=private)" % (server.getId(), + request.getUserId(), + request.getProjectId())) else: self.nova_manager.deleteServer(server) LOG.info("request rejected (quota exceeded): " - "id=%r user_id=%s prj_id=%s " + "id=%s user_id=%s prj_id=%s " "quota=private" % (request.getId(), request.getUserId(), request.getProjectId())) else: - timestamp = request.getCreatedAt() - priority = 0 - retry = request.getRetry() - - if retry: - num_attempts = retry["num_attempts"] - - if num_attempts: - quota.release(server) - - priority = 99999999 - LOG.info("released resource uuid %s num attempts" - "%s" % (request.getId(), num_attempts)) - - if priority == 0: - priority = 
self.fairshare_manager.calculatePriority( - user_id=request.getUserId(), - prj_id=request.getProjectId(), - timestamp=timestamp, - retry=0) + priority = self.fairshare_manager.calculatePriority( + user_id=request.getUserId(), + prj_id=request.getProjectId(), + timestamp=request.getCreatedAt(), + retry=num_attempts) context = request.getContext() @@ -608,19 +575,13 @@ class SchedulerManager(Manager): priority=priority, data=request.toDict()) - LOG.info("new request: id=%r user_id=%s prj_id=%s priority" + LOG.info("new request: id=%s user_id=%s prj_id=%s priority" "=%s quota=shared" % (request.getId(), request.getUserId(), request.getProjectId(), priority)) else: self.nova_manager.buildServer(request) - - self.nova_manager.setQuotaTypeServer(server) - LOG.info("new request: id=%r user_id=%s prj_id=%s " - "quota=private" % (request.getId(), - request.getUserId(), - request.getProjectId())) except Exception as ex: LOG.error("Exception has occured", exc_info=1) LOG.error(ex)