Centralize job canceling

Canceling jobs is hard because they can be in various states like
waiting for nodes, waiting for an executor or running. We further must
release the semaphore when canceling a job. We currently do this in
multiple places which can lead to hard to find bugs especially around
semaphores. Thus move the job canceling code into the scheduler at one
place and only use this for canceling jobs.

Change-Id: Iccee137909c8e99cf4e2860fb300f3100fed8dbe
This commit is contained in:
Tobias Henkel 2019-03-02 22:18:22 +01:00
parent d298cb12e0
commit 3704095c79
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
2 changed files with 50 additions and 57 deletions

View File

@ -420,46 +420,14 @@ class PipelineManager(object):
def cancelJobs(self, item, prime=True):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
jobs_to_release = []
old_build_set = item.current_build_set
old_jobs = {job.name: job for job in item.getJobs()}
jobs_to_cancel = item.getJobs()
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for req in old_build_set.node_requests.values():
self.sched.nodepool.cancelRequest(req)
jobs_to_release.append(req.job)
old_build_set.node_requests = {}
canceled_jobs = set()
for build in old_build_set.getBuilds():
if build.result:
canceled_jobs.add(build.job.name)
continue
was_running = False
try:
was_running = self.sched.executor.cancel(build)
except Exception:
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
jobs_to_release.append(build.job)
if not was_running:
nodeset = build.build_set.getJobNodeSet(build.job.name)
self.sched.nodepool.returnNodeSet(nodeset, build)
build.result = 'CANCELED'
canceled = True
canceled_jobs.add(build.job.name)
for jobname, nodeset in list(old_build_set.nodesets.items()):
if jobname in canceled_jobs:
continue
self.sched.nodepool.returnNodeSet(nodeset)
jobs_to_release.append(old_jobs[jobname])
for job in jobs_to_release:
tenant = old_build_set.item.pipeline.tenant
tenant.semaphore_handler.release(
old_build_set.item, job)
for job in jobs_to_cancel:
self.sched.cancelJob(old_build_set, job)
for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %

View File

@ -831,35 +831,15 @@ class Scheduler(threading.Thread):
requests_to_cancel.append(
(item.current_build_set, request))
semaphores_to_release = []
for build in builds_to_cancel:
self.log.info(
"Canceling build %s during reconfiguration" % (build,))
try:
self.executor.cancel(build)
except Exception:
self.log.exception(
"Exception while canceling build %s "
"for change %s" % (build, build.build_set.item.change))
# In the unlikely case that a build is removed and
# later added back, make sure we clear out the nodeset
# so it gets requested again.
try:
build.build_set.removeJobNodeSet(build.job.name)
except Exception:
self.log.exception(
"Exception while removing nodeset from build %s "
"for change %s" % (build, build.build_set.item.change))
semaphores_to_release.append((build.build_set.item, build.job))
self.cancelJob(build.build_set, build.job, build=build)
for build_set, request in requests_to_cancel:
self.log.info(
"Canceling node request %s during reconfiguration",
request)
self.nodepool.cancelRequest(request)
build_set.removeJobNodeRequest(request.job.name)
semaphores_to_release.append((build_set.item, request.job))
for item, job in semaphores_to_release:
tenant.semaphore_handler.release(item, job)
self.cancelJob(build_set, request.job)
def _reconfigureTenant(self, tenant):
# This is called from _doReconfigureEvent while holding the
@ -1413,3 +1393,48 @@ class Scheduler(threading.Thread):
if change.isUpdateOf(dep):
other_change.refresh_deps = True
change.refresh_deps = True
def cancelJob(self, buildset, job, build=None):
item = buildset.item
try:
# Cancel node request if needed
req = buildset.node_requests.get(job)
if req:
self.nodepool.cancelRequest(req)
buildset.removeJobNodeRequest(job.name)
# Cancel build if needed
job_name = job.name
build = build or buildset.getBuild(job_name)
if build:
was_running = False
try:
was_running = self.executor.cancel(build)
except Exception:
self.log.exception(
"Exception while canceling build %s for change %s" % (
build, item.change))
# In the unlikely case that a build is removed and
# later added back, make sure we clear out the nodeset
# so it gets requested again.
try:
buildset.removeJobNodeSet(job.name)
except Exception:
self.log.exception(
"Exception while removing nodeset from build %s "
"for change %s" % (build, build.build_set.item.change))
if not was_running:
nodeset = buildset.getJobNodeSet(job.name)
if nodeset:
self.nodepool.returnNodeSet(nodeset, build)
build.result = 'CANCELED'
else:
nodeset = buildset.getJobNodeSet(job.name)
if nodeset:
self.nodepool.returnNodeSet(nodeset)
finally:
# Release the semaphore in any case
tenant = buildset.item.pipeline.tenant
tenant.semaphore_handler.release(item, job)