Merge "Centralize job canceling"

This commit is contained in:
Zuul 2019-04-15 21:52:18 +00:00 committed by Gerrit Code Review
commit dac1ad5d73
2 changed files with 50 additions and 57 deletions

View File

@ -420,46 +420,14 @@ class PipelineManager(object):
def cancelJobs(self, item, prime=True):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
jobs_to_release = []
old_build_set = item.current_build_set
old_jobs = {job.name: job for job in item.getJobs()}
jobs_to_cancel = item.getJobs()
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for req in old_build_set.node_requests.values():
self.sched.nodepool.cancelRequest(req)
jobs_to_release.append(req.job)
old_build_set.node_requests = {}
canceled_jobs = set()
for build in old_build_set.getBuilds():
if build.result:
canceled_jobs.add(build.job.name)
continue
was_running = False
try:
was_running = self.sched.executor.cancel(build)
except Exception:
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
jobs_to_release.append(build.job)
if not was_running:
nodeset = build.build_set.getJobNodeSet(build.job.name)
self.sched.nodepool.returnNodeSet(nodeset, build)
build.result = 'CANCELED'
canceled = True
canceled_jobs.add(build.job.name)
for jobname, nodeset in list(old_build_set.nodesets.items()):
if jobname in canceled_jobs:
continue
self.sched.nodepool.returnNodeSet(nodeset)
jobs_to_release.append(old_jobs[jobname])
for job in jobs_to_release:
tenant = old_build_set.item.pipeline.tenant
tenant.semaphore_handler.release(
old_build_set.item, job)
for job in jobs_to_cancel:
self.sched.cancelJob(old_build_set, job)
for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %

View File

@ -847,35 +847,15 @@ class Scheduler(threading.Thread):
requests_to_cancel.append(
(item.current_build_set, request))
semaphores_to_release = []
for build in builds_to_cancel:
self.log.info(
"Canceling build %s during reconfiguration" % (build,))
try:
self.executor.cancel(build)
except Exception:
self.log.exception(
"Exception while canceling build %s "
"for change %s" % (build, build.build_set.item.change))
# In the unlikely case that a build is removed and
# later added back, make sure we clear out the nodeset
# so it gets requested again.
try:
build.build_set.removeJobNodeSet(build.job.name)
except Exception:
self.log.exception(
"Exception while removing nodeset from build %s "
"for change %s" % (build, build.build_set.item.change))
semaphores_to_release.append((build.build_set.item, build.job))
self.cancelJob(build.build_set, build.job, build=build)
for build_set, request in requests_to_cancel:
self.log.info(
"Canceling node request %s during reconfiguration",
request)
self.nodepool.cancelRequest(request)
build_set.removeJobNodeRequest(request.job.name)
semaphores_to_release.append((build_set.item, request.job))
for item, job in semaphores_to_release:
tenant.semaphore_handler.release(item, job)
self.cancelJob(build_set, request.job)
def _reconfigureTenant(self, tenant):
# This is called from _doReconfigureEvent while holding the
@ -1436,3 +1416,48 @@ class Scheduler(threading.Thread):
if change.isUpdateOf(dep):
other_change.refresh_deps = True
change.refresh_deps = True
def cancelJob(self, buildset, job, build=None):
item = buildset.item
try:
# Cancel node request if needed
req = buildset.node_requests.get(job)
if req:
self.nodepool.cancelRequest(req)
buildset.removeJobNodeRequest(job.name)
# Cancel build if needed
job_name = job.name
build = build or buildset.getBuild(job_name)
if build:
was_running = False
try:
was_running = self.executor.cancel(build)
except Exception:
self.log.exception(
"Exception while canceling build %s for change %s" % (
build, item.change))
# In the unlikely case that a build is removed and
# later added back, make sure we clear out the nodeset
# so it gets requested again.
try:
buildset.removeJobNodeSet(job.name)
except Exception:
self.log.exception(
"Exception while removing nodeset from build %s "
"for change %s" % (build, build.build_set.item.change))
if not was_running:
nodeset = buildset.getJobNodeSet(job.name)
if nodeset:
self.nodepool.returnNodeSet(nodeset, build)
build.result = 'CANCELED'
else:
nodeset = buildset.getJobNodeSet(job.name)
if nodeset:
self.nodepool.returnNodeSet(nodeset)
finally:
# Release the semaphore in any case
tenant = buildset.item.pipeline.tenant
tenant.semaphore_handler.release(item, job)