Merge "Reset dependent jobs when aborting paused job"

This commit is contained in:
Zuul 2019-04-15 22:22:35 +00:00 committed by Gerrit Code Review
commit 02c757e5f7
4 changed files with 144 additions and 4 deletions

View File

@ -1372,6 +1372,7 @@ class FakeBuild(object):
self.name = self.parameters['job']
self.wait_condition = threading.Condition()
self.waiting = False
self.paused = False
self.aborted = False
self.requeue = False
self.created = time.time()
@ -1546,7 +1547,9 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
if self.executor_server._run_ansible:
# Call run on the fake build, ignoring its result, so that real
# ansible jobs can be held as well.
build.run()
if playbook.path:
build.run()
result = super(RecordingAnsibleJob, self).runAnsible(
cmd, timeout, playbook, ansible_version, wrapped)
else:
@ -1570,6 +1573,17 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
host_keys=[]))
return hosts
def pause(self):
    """Flag the fake build for this job as paused, then pause the job.

    The fake build is fetched from ``executor_server.job_builds`` by plain
    indexing on this job's unique id, so an entry is expected to exist.
    """
    self.executor_server.job_builds[self.job.unique].paused = True
    super().pause()
def resume(self):
    """Clear the paused flag on this job's fake build, then resume the job.

    The fake build is fetched with ``.get`` on this job's unique id, so a
    missing (or falsy) entry is simply skipped before delegating to the
    parent implementation.
    """
    registry = self.executor_server.job_builds
    fake_build = registry.get(self.job.unique)
    if fake_build:
        fake_build.paused = False
    super().resume()
class RecordingMergeClient(zuul.merger.client.MergeClient):
@ -3097,7 +3111,7 @@ class ZuulTestCase(BaseTestCase):
worker_build = self.executor_server.job_builds.get(
server_job.unique.decode('utf8'))
if worker_build:
if worker_build.isWaiting():
if worker_build.isWaiting() or worker_build.paused:
continue
else:
self.log.debug("%s is running" % worker_build)

View File

@ -4942,6 +4942,117 @@ class TestJobPause(AnsibleZuulTestCase):
self.assertEqual('compile1', history_compile1.name)
self.assertEqual('compile2', history_compile2.name)
def test_job_pause_retry(self):
"""
Tests that a paused job that gets lost due to an executor restart is
retried together with all child jobs.
This test will wait until compile1 is paused and then fails it. The
expectation is that all child jobs are retried even if they already
were successful.
compile1 --+
+--> test1-after-compile1
+--> test2-after-compile1
+--> compile2 --+
+--> test-after-compile2
test-good
test-fail
"""
self.wait_timeout = 120
# Hold every build so the test controls when each job proceeds.
self.executor_server.hold_jobs_in_build = True
# Output extra ansible info so that errors are easier to spot.
self.executor_server.verbose = True
self.executor_server.keep_jobdir = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Let the independent test-* jobs and compile1 run.
self.executor_server.release('test-.*')
self.executor_server.release('compile1')
self.waitUntilSettled()
# test-fail and test-good must be finished by now.
self.assertHistory([
dict(name='test-fail', result='FAILURE', changes='1,1'),
dict(name='test-good', result='SUCCESS', changes='1,1'),
], ordered=False)
# Furthermore, compile1 must be in the paused state and its three
# children in the queue. waitUntilSettled can return either directly
# after the job pause or after the child jobs are enqueued, so to make
# this deterministic we wait for the child jobs here.
for _ in iterate_timeout(30, 'waiting for child jobs'):
if len(self.builds) == 4:
break
self.waitUntilSettled()
# The first build in the list is expected to be the paused compile1.
compile1 = self.builds[0]
self.assertTrue(compile1.paused)
# Now release the compile2 sub tree so we can later check whether all
# children were restarted.
self.executor_server.release('compile2')
# Wait until a fifth build shows up (presumably test-after-compile2,
# which is released next).
for _ in iterate_timeout(30, 'waiting for child jobs'):
if len(self.builds) == 5:
break
self.waitUntilSettled()
self.executor_server.release('test-after-compile2')
self.waitUntilSettled()
# Release compile2 a second time so it can finish (presumably it is
# held again when it continues after its own pause - TODO confirm).
self.executor_server.release('compile2')
self.waitUntilSettled()
# The compile2 sub tree has now completed successfully once.
self.assertHistory([
dict(name='test-fail', result='FAILURE', changes='1,1'),
dict(name='test-good', result='SUCCESS', changes='1,1'),
dict(name='compile2', result='SUCCESS', changes='1,1'),
dict(name='test-after-compile2', result='SUCCESS', changes='1,1'),
], ordered=False)
# Stop the job worker of compile1 to simulate an executor restart.
for job_worker in self.executor_server.job_workers.values():
if job_worker.job.unique == compile1.unique:
job_worker.stop()
self.waitUntilSettled()
# compile1 and all of its still-running child jobs must be aborted.
self.assertHistory([
dict(name='test-fail', result='FAILURE', changes='1,1'),
dict(name='test-good', result='SUCCESS', changes='1,1'),
dict(name='compile2', result='SUCCESS', changes='1,1'),
dict(name='test-after-compile2', result='SUCCESS', changes='1,1'),
dict(name='compile1', result='ABORTED', changes='1,1'),
dict(name='test1-after-compile1', result='ABORTED', changes='1,1'),
dict(name='test2-after-compile1', result='ABORTED', changes='1,1'),
], ordered=False)
# Only the retried compile1 must be waiting now.
for _ in iterate_timeout(30, 'waiting for compile1 job'):
if len(self.builds) == 1:
break
# Stop holding builds and let everything run to completion.
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
# Every job in the compile1 tree must appear a second time: compile1
# and its direct test children as ABORTED followed by SUCCESS, and the
# already successful compile2 sub tree as a second SUCCESS.
self.assertHistory([
dict(name='test-fail', result='FAILURE', changes='1,1'),
dict(name='test-good', result='SUCCESS', changes='1,1'),
dict(name='compile2', result='SUCCESS', changes='1,1'),
dict(name='compile2', result='SUCCESS', changes='1,1'),
dict(name='test-after-compile2', result='SUCCESS', changes='1,1'),
dict(name='test-after-compile2', result='SUCCESS', changes='1,1'),
dict(name='compile1', result='ABORTED', changes='1,1'),
dict(name='compile1', result='SUCCESS', changes='1,1'),
dict(name='test1-after-compile1', result='ABORTED', changes='1,1'),
dict(name='test2-after-compile1', result='ABORTED', changes='1,1'),
dict(name='test1-after-compile1', result='SUCCESS', changes='1,1'),
dict(name='test2-after-compile1', result='SUCCESS', changes='1,1'),
], ordered=False)
def test_job_node_failure_resume(self):
self.wait_timeout = 120

View File

@ -1210,6 +1210,8 @@ class AnsibleJob(object):
pause = result_data.get('zuul', {}).get('pause')
if pause:
self.pause()
if self.aborted:
return 'ABORTED'
post_timeout = args['post_timeout']
unreachable = False

View File

@ -790,6 +790,15 @@ class PipelineManager(object):
self.sched.executor.resumeBuild(build)
build.paused = False
def _resetDependentBuilds(self, build_set, build):
    """Cancel and drop the builds of every job depending on ``build``.

    The job graph of the build set's item is asked for all jobs that
    depend (recursively) on the job of ``build``. Each of those jobs is
    cancelled through the scheduler, and any build the build set still
    holds for it is removed from the build set.

    :arg build_set: the build set whose dependent builds are reset.
    :arg build: the build whose job's dependents are looked up.
    """
    dependents = build_set.item.job_graph.getDependentJobsRecursively(
        build.job.name)
    for dependent_job in dependents:
        self.sched.cancelJob(build_set, dependent_job)
        # Use a separate local so the ``build`` argument is not rebound.
        stale_build = build_set.getBuild(dependent_job.name)
        if stale_build:
            build_set.removeBuild(stale_build)
def onBuildCompleted(self, build):
item = build.build_set.item
@ -800,8 +809,12 @@ class PipelineManager(object):
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))
if build.retry and build.build_set.getJobNodeSet(build.job.name):
build.build_set.removeJobNodeSet(build.job.name)
if build.retry:
if build.build_set.getJobNodeSet(build.job.name):
build.build_set.removeJobNodeSet(build.job.name)
# in case this was a paused build we need to retry all child jobs
self._resetDependentBuilds(build.build_set, build)
self._resumeBuilds(build.build_set)
return True