From 3704095c7927568a1f32317337c3646a9d15769e Mon Sep 17 00:00:00 2001 From: Tobias Henkel Date: Sat, 2 Mar 2019 22:18:22 +0100 Subject: [PATCH] Centralize job canceling Canceling jobs is hard because they can be in various states like waiting for nodes, waiting for an executor or running. We further must release the semaphore when canceling a job. We currently do this in multiple places which can lead to hard-to-find bugs especially around semaphores. Thus move the job canceling code into the scheduler at one place and only use this for canceling jobs. Change-Id: Iccee137909c8e99cf4e2860fb300f3100fed8dbe --- zuul/manager/__init__.py | 38 ++-------------------- zuul/scheduler.py | 69 +++++++++++++++++++++++++++------------- 2 files changed, 50 insertions(+), 57 deletions(-) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index d54969110f..95922e7113 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -420,46 +420,15 @@ class PipelineManager(object): def cancelJobs(self, item, prime=True): self.log.debug("Cancel jobs for change %s" % item.change) canceled = False - jobs_to_release = [] old_build_set = item.current_build_set - old_jobs = {job.name: job for job in item.getJobs()} + jobs_to_cancel = item.getJobs() if prime and item.current_build_set.ref: item.resetAllBuilds() - for req in old_build_set.node_requests.values(): - self.sched.nodepool.cancelRequest(req) - jobs_to_release.append(req.job) - old_build_set.node_requests = {} - canceled_jobs = set() - for build in old_build_set.getBuilds(): - if build.result: - canceled_jobs.add(build.job.name) - continue - was_running = False - try: - was_running = self.sched.executor.cancel(build) - except Exception: - self.log.exception("Exception while canceling build %s " "for change %s" % (build, item.change)) - jobs_to_release.append(build.job) - if not was_running: - nodeset = build.build_set.getJobNodeSet(build.job.name) - self.sched.nodepool.returnNodeSet(nodeset, build) - 
build.result = 'CANCELED' - canceled = True - canceled_jobs.add(build.job.name) - for jobname, nodeset in list(old_build_set.nodesets.items()): - if jobname in canceled_jobs: - continue - self.sched.nodepool.returnNodeSet(nodeset) - jobs_to_release.append(old_jobs[jobname]) - - for job in jobs_to_release: - tenant = old_build_set.item.pipeline.tenant - tenant.semaphore_handler.release( - old_build_set.item, job) + for job in jobs_to_cancel: + self.sched.cancelJob(old_build_set, job) for item_behind in item.items_behind: self.log.debug("Canceling jobs for change %s, behind change %s" % diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 6f82568d42..31995c88c5 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -831,35 +831,15 @@ class Scheduler(threading.Thread): requests_to_cancel.append( (item.current_build_set, request)) - semaphores_to_release = [] for build in builds_to_cancel: self.log.info( "Canceling build %s during reconfiguration" % (build,)) - try: - self.executor.cancel(build) - except Exception: - self.log.exception( - "Exception while canceling build %s " - "for change %s" % (build, build.build_set.item.change)) - # In the unlikely case that a build is removed and - # later added back, make sure we clear out the nodeset - # so it gets requested again. 
- try: - build.build_set.removeJobNodeSet(build.job.name) - except Exception: - self.log.exception( - "Exception while removing nodeset from build %s " - "for change %s" % (build, build.build_set.item.change)) - semaphores_to_release.append((build.build_set.item, build.job)) + self.cancelJob(build.build_set, build.job, build=build) for build_set, request in requests_to_cancel: self.log.info( "Canceling node request %s during reconfiguration", request) - self.nodepool.cancelRequest(request) - build_set.removeJobNodeRequest(request.job.name) - semaphores_to_release.append((build_set.item, request.job)) - for item, job in semaphores_to_release: - tenant.semaphore_handler.release(item, job) + self.cancelJob(build_set, request.job) def _reconfigureTenant(self, tenant): # This is called from _doReconfigureEvent while holding the @@ -1413,3 +1393,48 @@ class Scheduler(threading.Thread): if change.isUpdateOf(dep): other_change.refresh_deps = True change.refresh_deps = True + + def cancelJob(self, buildset, job, build=None): + item = buildset.item + try: + # Cancel node request if needed + req = buildset.node_requests.get(job.name) + if req: + self.nodepool.cancelRequest(req) + buildset.removeJobNodeRequest(job.name) + + # Cancel build if needed + job_name = job.name + build = build or buildset.getBuild(job_name) + if build: + was_running = False + try: + was_running = self.executor.cancel(build) + except Exception: + self.log.exception( + "Exception while canceling build %s for change %s" % ( + build, item.change)) + + # In the unlikely case that a build is removed and + # later added back, make sure we clear out the nodeset + # so it gets requested again.
+ try: + buildset.removeJobNodeSet(job.name) + except Exception: + self.log.exception( + "Exception while removing nodeset from build %s " + "for change %s" % (build, build.build_set.item.change)) + + if not was_running: + nodeset = buildset.getJobNodeSet(job.name) + if nodeset: + self.nodepool.returnNodeSet(nodeset, build) + build.result = 'CANCELED' + else: + nodeset = buildset.getJobNodeSet(job.name) + if nodeset: + self.nodepool.returnNodeSet(nodeset) + finally: + # Release the semaphore in any case + tenant = buildset.item.pipeline.tenant + tenant.semaphore_handler.release(item, job)