Merge "Fix deduplication exceptions in pipeline processing"

This commit is contained in:
Zuul 2022-12-20 01:48:36 +00:00 committed by Gerrit Code Review
commit 35c68169bd
6 changed files with 43 additions and 15 deletions

View File

@ -3610,10 +3610,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.log.debug('No running builds to release')
return
self.log.debug("Releasing build %s (%s)" % (regex, len(builds)))
self.log.debug("Releasing build %s %s (%s)" % (
regex, change, len(builds)))
for build in builds:
if (not regex or re.match(regex, build.name) and
not change or build.change == change):
if ((not regex or re.match(regex, build.name)) and
(not change or build.change == change)):
self.log.debug("Releasing build %s" %
(build.parameters['zuul']['build']))
build.release()
@ -5158,6 +5159,11 @@ class ZuulTestCase(BaseTestCase):
self.assertIsNotNone(build.start_time)
self.assertIsNotNone(build.end_time)
def assertNoPipelineExceptions(self):
    """Assert that no pipeline recorded a processing exception.

    Walks every pipeline of every tenant on the first scheduler and
    checks that the test-only ``_exception_count`` counter is still
    zero.
    """
    scheduler = self.scheds.first.sched
    for tenant in scheduler.abide.tenants.values():
        for pipeline in tenant.layout.pipelines.values():
            self.assertEqual(0, pipeline._exception_count)
def assertFinalState(self):
self.log.debug("Assert final state")
# Make sure no jobs are running
@ -5184,6 +5190,7 @@ class ZuulTestCase(BaseTestCase):
for pipeline in tenant.layout.pipelines.values():
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
self.assertNoPipelineExceptions()
def shutdown(self):
self.log.debug("Shutting down after tests")

View File

@ -5,6 +5,7 @@ server=127.0.0.1
[scheduler]
tenant_config=main.yaml
relative_priority=true
[merger]
git_dir=/tmp/zuul-test/merger-git

View File

@ -118,6 +118,11 @@ class ExecutorClient(object):
# Store the NodeRequest ID in the job arguments, so we can look it up
# on the executor side to lock the nodes.
req_id = build.build_set.getJobNodeRequestID(job.name)
if isinstance(req_id, dict):
# This should never happen
raise Exception(
"Attempt to start build with deduplicated node request ID "
f"{req_id}")
if req_id:
params["noderequest_id"] = req_id

View File

@ -1647,7 +1647,7 @@ class PipelineManager(metaclass=ABCMeta):
if (item.live and not dequeued
and self.sched.globals.use_relative_priority):
priority = item.getNodePriority()
for request_id in item.current_build_set.node_requests.values():
for _, request_id in item.current_build_set.getNodeRequests():
node_request = self.sched.nodepool.zk_nodepool.getNodeRequest(
request_id, cached=True)
if not node_request:

View File

@ -473,6 +473,8 @@ class Pipeline(object):
self.window_decrease_factor = None
self.state = None
self.change_list = None
# Only used by the unit tests for assertions
self._exception_count = 0
@property
def queues(self):
@ -4429,8 +4431,18 @@ class BuildSet(zkobject.ZKObject):
with self.activeContext(self.item.pipeline.manager.current_context):
self.node_requests[job_name] = request_id
def getJobNodeRequestID(self, job_name):
    """Return the node request ID stored for *job_name*.

    Returns None when no request has been recorded for the job.
    """
    try:
        return self.node_requests[job_name]
    except KeyError:
        return None
def getJobNodeRequestID(self, job_name, ignore_deduplicate=False):
    """Return the node request ID stored for *job_name*.

    Deduplicated node requests are stored as dicts rather than plain
    IDs; when *ignore_deduplicate* is set, such entries are reported
    as None (i.e. treated as "no request"). Returns None when no
    request has been recorded for the job.
    """
    request = self.node_requests.get(job_name)
    if not (ignore_deduplicate and isinstance(request, dict)):
        return request
    return None
def getNodeRequests(self):
    """Yield (job_name, request_id) pairs for this build set.

    Deduplicated node requests are stored as dicts and are skipped,
    so callers only ever see real request IDs.
    """
    for name, request in self.node_requests.items():
        if not isinstance(request, dict):
            yield name, request
def removeJobNodeRequestID(self, job_name):
if job_name in self.node_requests:

View File

@ -689,8 +689,8 @@ class Scheduler(threading.Thread):
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
nrs = item.current_build_set.node_requests
for req_id in (nrs.values()):
nrs = item.current_build_set.getNodeRequests()
for _, req_id in nrs:
outstanding_requests.add(req_id)
leaked_requests = zk_requests - outstanding_requests
for req_id in leaked_requests:
@ -1632,7 +1632,7 @@ class Scheduler(threading.Thread):
item.removeBuild(build)
builds_to_cancel.append(build)
for request_job, request in \
item.current_build_set.node_requests.items():
item.current_build_set.getNodeRequests():
new_job = item.getJob(request_job)
if not new_job:
requests_to_cancel.append(
@ -1654,7 +1654,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
item.current_build_set.node_requests.items():
item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@ -1781,7 +1781,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
item.current_build_set.node_requests.items():
item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@ -2230,6 +2230,7 @@ class Scheduler(threading.Thread):
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
pipeline._exception_count += 1
pipeline.state.updateAttributes(
ctx, state=pipeline.STATE_ERROR)
# Continue processing other pipelines+tenants
@ -2825,7 +2826,8 @@ class Scheduler(threading.Thread):
# In case the build didn't show up on any executor, the node
# request does still exist, so we have to make sure it is
# removed from ZK.
request_id = build.build_set.getJobNodeRequestID(build.job.name)
request_id = build.build_set.getJobNodeRequestID(
build.job.name, ignore_deduplicate=True)
if request_id:
self.nodepool.deleteNodeRequest(
request_id, event_id=build.zuul_event_id)
@ -2926,9 +2928,10 @@ class Scheduler(threading.Thread):
# Cancel node request if needed
req_id = buildset.getJobNodeRequestID(job_name)
if req_id:
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
if req:
self.nodepool.cancelRequest(req)
if not isinstance(req_id, dict):
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
if req:
self.nodepool.cancelRequest(req)
buildset.removeJobNodeRequestID(job_name)
# Cancel build if needed