Report early failure from Ansible task failures

We can have the Ansible callback plugin notify the executor, which
in turn notifies the scheduler, that a task has failed and therefore
that the job will fail.  This allows the scheduler to begin a gate
reset before the failing job has finished, potentially saving
significant developer and CPU time.

We take some extra precautions to avoid sending a pre-fail
notification when we think we might end up retrying the job (either
due to a failure in a pre-run playbook, or an unreachable host).  If
that does happen, a gate pipeline might end up flapping between two
different NNFI configurations (i.e., it may perform unnecessary gate
resets behind the change with the retrying job), but it should still
not produce an incorrect result.  Presumably the detections here
catch that case sufficiently early, but due to the nature of these
errors, we may need to observe it in production to be sure.

Change-Id: Ic40b8826f2d54e45fb6c4d2478761a89ef4549e4
James E. Blair 2023-05-16 16:11:11 -07:00
parent a8a302eb82
commit 1170b91bd8
17 changed files with 506 additions and 18 deletions
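
In outline, the new mechanism works like this: the Ansible callback
plugin writes a record to a dedicated result logger whenever a task
fails or a host becomes unreachable, those records surface as
RESULT-prefixed lines in the Ansible process output, and the executor
scans for them and reports the pre-failure to the scheduler.  A
minimal sketch of the executor-side scan (hypothetical names;
notify_scheduler stands in for the real ZooKeeper build-status
update):

    def scan_ansible_output(stdout_lines, allow_pre_fail,
                            notify_scheduler):
        # Watch raw Ansible output for early-failure result records.
        pre_failed = False
        for line in stdout_lines:
            if b'FATAL ERROR DURING FILE TRANSFER' in line:
                # This may really be an unreachable host, which would
                # be retried, so suppress the pre-fail notification.
                allow_pre_fail = False
            if not line.startswith(b'RESULT'):
                continue
            result = line[len(b'RESULT'):].strip()
            if result == b'unreachable':
                # Unreachable hosts lead to retries; never pre-fail.
                allow_pre_fail = False
            elif result == b'failure' and allow_pre_fail and not pre_failed:
                pre_failed = True
                notify_scheduler({'pre_fail': True})

The build keeps running to completion; the pre_fail flag only lets
the scheduler treat it as failed for pipeline-reordering purposes.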

View File

@@ -119,3 +119,9 @@ Version 13
:Description: Stores only the necessary event info as part of a queue item
              instead of the full trigger event.
              Affects schedulers.

Version 14
----------
:Prior Zuul version: 8.2.0
:Description: Adds the pre_fail attribute to builds.
              Affects schedulers.

View File

@@ -0,0 +1,6 @@
---
features:
  - |
    Zuul will now begin the process of aborting following jobs and
    re-ordering pipelines immediately after the first Ansible task
    fails.

View File

@@ -3342,7 +3342,7 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
        self.recordResult(self.result)

    def runAnsible(self, cmd, timeout, playbook, ansible_version,
-                  wrapped=True, cleanup=False):
+                  allow_pre_fail, wrapped=True, cleanup=False):
        build = self.executor_server.job_builds[self.build_request.uuid]

        if self.executor_server._run_ansible:
@@ -3353,7 +3353,8 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
            build.run()

            result = super(RecordingAnsibleJob, self).runAnsible(
-                cmd, timeout, playbook, ansible_version, wrapped, cleanup)
+                cmd, timeout, playbook, ansible_version, allow_pre_fail,
+                wrapped, cleanup)
        else:
            if playbook not in [self.jobdir.setup_playbook,
                                self.jobdir.freeze_playbook]:

View File

@@ -0,0 +1,13 @@
- hosts: localhost
  tasks:
    - file:
        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.failure_start.flag"
        state: touch
    # Do not finish until test creates the flag file
    - debug:
        msg: "Waiting for {{zuul._test.test_root}}/builds/{{zuul.build}}/failure_continue_flag"
    - wait_for:
        state: present
        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}/failure_continue_flag"
    - fail:
        msg: FAIL!

View File

@@ -0,0 +1,14 @@
- hosts: localhost
  tasks:
    - file:
        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.wait_start.flag"
        state: touch
    # Do not finish until test creates the flag file
    - debug:
        msg: "Waiting for {{zuul._test.test_root}}/builds/{{zuul.build}}/wait_continue_flag"
    - wait_for:
        state: present
        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}/wait_continue_flag"
    - file:
        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.wait_end.flag"
        state: touch

View File

@@ -0,0 +1,116 @@
- pipeline:
    name: check
    manager: independent
    post-review: true
    trigger:
      gerrit:
        - event: patchset-created
    success:
      gerrit:
        Verified: 1
    failure:
      gerrit:
        Verified: -1

- pipeline:
    name: gate
    manager: dependent
    success-message: Build succeeded (gate).
    trigger:
      gerrit:
        - event: comment-added
          approval:
            - Approved: 1
    success:
      gerrit:
        Verified: 2
        submit: true
    failure:
      gerrit:
        Verified: -2
    start:
      gerrit:
        Verified: 0
    precedence: high

- job:
    name: base
    parent: null

- job:
    name: early-failure
    files:
      - early-failure.txt
    run:
      - playbooks/failure.yaml
    post-run:
      - playbooks/wait.yaml

- job:
    name: pre-failure
    files:
      - pre-failure.txt
    pre-run:
      - playbooks/failure.yaml
    run:
      # This won't actually be run
      - playbooks/wait.yaml

- job:
    name: wait
    run:
      - playbooks/wait.yaml

- queue:
    name: shared

- queue:
    name: fail-fast

- project:
    name: org/project1
    queue: shared
    check:
      jobs:
        - early-failure
        - pre-failure
    gate:
      jobs:
        - early-failure
        - pre-failure

- project:
    name: org/project2
    queue: shared
    check:
      jobs:
        - wait
    gate:
      jobs:
        - wait

- project:
    name: org/project3
    queue: fail-fast
    check:
      fail-fast: true
      jobs:
        - early-failure
        - wait
    gate:
      fail-fast: true
      jobs:
        - early-failure
        - wait

- project:
    name: org/project4
    queue: fail-fast
    check:
      fail-fast: true
      jobs:
        - wait
    gate:
      fail-fast: true
      jobs:
        - wait

View File

@@ -0,0 +1 @@
test

View File

@@ -0,0 +1 @@
test

View File

@@ -0,0 +1 @@
test

View File

@@ -0,0 +1 @@
test

View File

@@ -0,0 +1,12 @@
- tenant:
    name: tenant-one
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project1
          - org/project2
          - org/project3
          - org/project4

View File

@@ -5762,6 +5762,278 @@ class TestDiskAccounting(AnsibleZuulTestCase):
            dict(name='dd-big-empty-file', result='ABORTED', changes='1,1')])


class TestEarlyFailure(AnsibleZuulTestCase):
    tenant_config_file = 'config/early-failure/main.yaml'

    def test_early_failure(self):
        file_dict = {'early-failure.txt': ''}
        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
                                           files=file_dict)
        A.addApproval('Code-Review', 2)
        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))

        self.log.debug("Wait for the first change to start its job")
        for _ in iterate_timeout(30, 'job A started'):
            if len(self.builds) == 1:
                break
        A_build = self.builds[0]
        start = os.path.join(self.jobdir_root, A_build.uuid +
                             '.failure_start.flag')
        for _ in iterate_timeout(30, 'job A running'):
            if os.path.exists(start):
                break

        self.log.debug("Add a second change which will test with the first")
        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
        B.addApproval('Code-Review', 2)
        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))

        self.log.debug("Wait for the second change to start its job")
        for _ in iterate_timeout(30, 'job B started'):
            if len(self.builds) == 2:
                break
        B_build = self.builds[1]
        start = os.path.join(self.jobdir_root, B_build.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job B running'):
            if os.path.exists(start):
                break

        self.log.debug("Continue the first job which will fail early")
        flag_path = os.path.join(self.jobdir_root, A_build.uuid,
                                 'failure_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.log.debug("Wait for the second job to be aborted "
                       "and restarted without the first change")
        for _ in iterate_timeout(30, 'job B restarted'):
            if len(self.builds) == 2:
                B_build2 = self.builds[1]
                if B_build2 != B_build:
                    break

        self.log.debug("Wait for the first job to be in its post-run playbook")
        start = os.path.join(self.jobdir_root, A_build.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job A post running'):
            if os.path.exists(start):
                break

        self.log.debug("Allow the first job to finish")
        flag_path = os.path.join(self.jobdir_root, A_build.uuid,
                                 'wait_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.log.debug("Wait for the first job to finish")
        for _ in iterate_timeout(30, 'job A complete'):
            if A_build not in self.builds:
                break

        self.log.debug("Allow the restarted second job to finish")
        flag_path = os.path.join(self.jobdir_root, B_build2.uuid,
                                 'wait_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.waitUntilSettled()

        self.assertHistory([
            dict(name='wait', result='ABORTED', changes='1,1 2,1'),
            dict(name='early-failure', result='FAILURE', changes='1,1'),
            dict(name='wait', result='SUCCESS', changes='2,1'),
        ], ordered=True)

    def test_pre_run_failure_retry(self):
        # Test that we don't set pre_fail when a pre-run playbook fails
        # (so we honor the retry logic and restart the job).
        file_dict = {'pre-failure.txt': ''}
        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
                                           files=file_dict)
        A.addApproval('Code-Review', 2)
        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))

        self.log.debug("Wait for the first change to start its job")
        for _ in iterate_timeout(30, 'job A started'):
            if len(self.builds) == 1:
                break
        A_build = self.builds[0]
        start = os.path.join(self.jobdir_root, A_build.uuid +
                             '.failure_start.flag')
        for _ in iterate_timeout(30, 'job A running'):
            if os.path.exists(start):
                break

        self.log.debug("Add a second change which will test with the first")
        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
        B.addApproval('Code-Review', 2)
        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))

        self.log.debug("Wait for the second change to start its job")
        for _ in iterate_timeout(30, 'job B started'):
            if len(self.builds) == 2:
                break
        B_build = self.builds[1]
        start = os.path.join(self.jobdir_root, B_build.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job B running'):
            if os.path.exists(start):
                break

        self.log.debug("Continue the first job which will fail early")
        flag_path = os.path.join(self.jobdir_root, A_build.uuid,
                                 'failure_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        # From here out, allow any pre-failure job to
        # continue until it has run three times
        self.log.debug("Wait for all jobs to finish")
        for _ in iterate_timeout(30, 'all jobs finished'):
            if len(self.builds) == 1 and len(self.history) == 4:
                break
            for b in self.builds[:]:
                if b.name == 'pre-failure':
                    try:
                        flag_path = os.path.join(self.jobdir_root, b.uuid,
                                                 'failure_continue_flag')
                        with open(flag_path, "w") as of:
                            of.write("continue")
                    except Exception:
                        self.log.debug("Unable to write flag path %s",
                                       flag_path)
        self.log.debug("Done")

        self.log.debug("Wait for the second job to be aborted "
                       "and restarted without the first change")
        for _ in iterate_timeout(30, 'job B restarted'):
            if len(self.builds) == 1 and self.builds[0].name == 'wait':
                B_build2 = self.builds[0]
                if B_build2 != B_build:
                    break

        self.log.debug("Wait for the second change to start its job")
        start = os.path.join(self.jobdir_root, B_build2.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job B running'):
            if os.path.exists(start):
                break

        self.log.debug("Allow the restarted second job to finish")
        flag_path = os.path.join(self.jobdir_root, B_build2.uuid,
                                 'wait_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.waitUntilSettled()

        self.assertHistory([
            dict(name='pre-failure', result=None, changes='1,1'),
            dict(name='pre-failure', result=None, changes='1,1'),
            dict(name='pre-failure', result=None, changes='1,1'),
            dict(name='wait', result='ABORTED', changes='1,1 2,1'),
            dict(name='wait', result='SUCCESS', changes='2,1'),
        ], ordered=True)

    def test_early_failure_fail_fast(self):
        file_dict = {'early-failure.txt': ''}
        A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A',
                                           files=file_dict)
        A.addApproval('Code-Review', 2)
        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))

        self.log.debug("Wait for the first change to start its job")
        for _ in iterate_timeout(30, 'job A started'):
            if len(self.builds) == 1:
                break
        A_build = self.builds[0]
        start = os.path.join(self.jobdir_root, A_build.uuid +
                             '.failure_start.flag')
        for _ in iterate_timeout(30, 'job A running'):
            if os.path.exists(start):
                break

        self.log.debug("Add a second change which will test with the first")
        B = self.fake_gerrit.addFakeChange('org/project4', 'master', 'B')
        B.addApproval('Code-Review', 2)
        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))

        self.log.debug("Wait for the second change to start its job")
        for _ in iterate_timeout(30, 'job B started'):
            if len(self.builds) == 3:
                break
        B_build = self.builds[2]
        start = os.path.join(self.jobdir_root, B_build.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job B running'):
            if os.path.exists(start):
                break

        self.log.debug("Continue the first job which will fail early")
        flag_path = os.path.join(self.jobdir_root, A_build.uuid,
                                 'failure_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.log.debug("Wait for the second job to be aborted "
                       "and restarted without the first change")
        for _ in iterate_timeout(30, 'job B restarted'):
            if len(self.builds) == 3:
                B_build2 = self.builds[2]
                if B_build2 != B_build:
                    break

        self.log.debug("Wait for the first job to be in its post-run playbook")
        start = os.path.join(self.jobdir_root, A_build.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job A post running'):
            if os.path.exists(start):
                break

        self.log.debug("Allow the first job to finish")
        flag_path = os.path.join(self.jobdir_root, A_build.uuid,
                                 'wait_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.log.debug("Wait for the first job to finish")
        for _ in iterate_timeout(30, 'job A complete'):
            if A_build not in self.builds:
                break

        self.log.debug("Wait for the second change to start its job")
        start = os.path.join(self.jobdir_root, B_build2.uuid +
                             '.wait_start.flag')
        for _ in iterate_timeout(30, 'job B running'):
            if os.path.exists(start):
                break

        self.log.debug("Allow the restarted second job to finish")
        flag_path = os.path.join(self.jobdir_root, B_build2.uuid,
                                 'wait_continue_flag')
        self.log.debug("Writing %s", flag_path)
        with open(flag_path, "w") as of:
            of.write("continue")

        self.waitUntilSettled()

        self.assertHistory([
            dict(name='wait', result='ABORTED', changes='1,1 2,1'),
            dict(name='early-failure', result='FAILURE', changes='1,1'),
            dict(name='wait', result='ABORTED', changes='1,1'),
            dict(name='wait', result='SUCCESS', changes='2,1'),
        ], ordered=True)


class TestMaxNodesPerJob(AnsibleZuulTestCase):
    tenant_config_file = 'config/multi-tenant/main.yaml'

View File

@@ -125,6 +125,8 @@ class CallbackModule(default.CallbackModule):
            logging_config.apply()

            self._logger = logging.getLogger('zuul.executor.ansible')
+            self._result_logger = logging.getLogger(
+                'zuul.executor.ansible.result')

    def _log(self, msg, ts=None, job=True, executor=False, debug=False):
        # With the default "linear" strategy (and likely others),
@@ -426,7 +428,7 @@ class CallbackModule(default.CallbackModule):
            hostname = self._get_hostname(result)
            self._log("%s | %s " % (hostname, line))

-    def v2_runner_on_failed(self, result, ignore_errors=False):
+    def _v2_runner_on_failed(self, result, ignore_errors=False):
        result_dict = dict(result._result)

        self._handle_exception(result_dict)
@@ -452,6 +454,17 @@ class CallbackModule(default.CallbackModule):
        if ignore_errors:
            self._log_message(result, "Ignoring Errors", status="ERROR")

+    def v2_runner_on_failed(self, result, ignore_errors=False):
+        ret = self._v2_runner_on_failed(result, ignore_errors)
+        if not ignore_errors:
+            self._result_logger.info("failure")
+        return ret
+
+    def v2_runner_on_unreachable(self, result):
+        ret = self._v2_runner_on_failed(result)
+        self._result_logger.info("unreachable")
+        return ret
+
    def v2_runner_on_skipped(self, result):
        if result._task.loop:
            self._items_done = False
@@ -786,5 +799,3 @@ class CallbackModule(default.CallbackModule):
                delegated_host=delegated_vars['ansible_host'])
        else:
            return result._host.get_name()
-
-    v2_runner_on_unreachable = v2_runner_on_failed
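
For the executor to see these records on the process output, the
zuul.executor.ansible.result logger presumably needs a handler whose
formatter prefixes each record with RESULT; that wiring lives in the
job logging configuration, which is not among the hunks shown here.
A hedged sketch of such a configuration:

    import logging
    import sys

    # Assumed shape of the result-stream handler; the real handler
    # and format string live in Zuul's job logging config (not shown).
    result_logger = logging.getLogger('zuul.executor.ansible.result')
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter('RESULT %(message)s'))
    result_logger.addHandler(handler)
    result_logger.setLevel(logging.INFO)

    result_logger.info('failure')  # surfaces as "RESULT failure"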

View File

@@ -2703,7 +2703,7 @@ class AnsibleJob(object):
            self.log.exception("Exception while killing ansible process:")

    def runAnsible(self, cmd, timeout, playbook, ansible_version,
-                  wrapped=True, cleanup=False):
+                  allow_pre_fail, wrapped=True, cleanup=False):
        config_file = playbook.ansible_config
        env_copy = {key: value
                    for key, value in os.environ.copy().items()
@@ -2823,10 +2823,20 @@
                    except Exception:
                        self.log.exception("Unable to list namespace pids")
                first = False
+            if b'FATAL ERROR DURING FILE TRANSFER' in line:
+                # This can end up being an unreachable host (see
+                # below), so don't pre-fail in this case.
+                allow_pre_fail = False
+            result_line = None
            if line.startswith(b'RESULT'):
-                # TODO(mordred) Process result commands if sent
-                continue
+                result_line = line[len('RESULT'):].strip()
+                if result_line == b'unreachable':
+                    self.log.info("Early unreachable host")
+                    allow_pre_fail = False
+                if allow_pre_fail and result_line == b'failure':
+                    self.log.info("Early failure in job")
+                    self.executor_server.updateBuildStatus(
+                        self.build_request, {'pre_fail': True})
            else:
                idx += 1
                if idx < BUFFER_LINES_FOR_SYNTAX:
@@ -2837,7 +2847,10 @@
            else:
                line = line[:1024].rstrip()
-                ansible_log.debug("Ansible output: %s" % (line,))
+            if result_line:
+                ansible_log.debug("Ansible result output: %s" % (line,))
+            else:
+                ansible_log.debug("Ansible output: %s" % (line,))
        self.log.debug("Ansible output terminated")
        try:
            cpu_times = self.proc.cpu_times()
@@ -2874,6 +2887,9 @@
        # creates the file job-output.unreachable in case there were
        # unreachable nodes. This can be removed once ansible returns a
        # distinct value for unreachable.
+        # TODO: Investigate whether the unreachable callback can be
+        # removed in favor of the ansible result log stream (see above
+        # in pre-fail)
        if ret == 3 or os.path.exists(self.jobdir.job_unreachable_file):
            # AnsibleHostUnreachable: We had a network issue connecting to
            # our zuul-worker.
@@ -2967,7 +2983,8 @@
        result, code = self.runAnsible(
            cmd=cmd, timeout=self.executor_server.setup_timeout,
-            playbook=playbook, ansible_version=ansible_version, wrapped=False)
+            playbook=playbook, ansible_version=ansible_version,
+            allow_pre_fail=False, wrapped=False)
        self.log.debug("Ansible complete, result %s code %s" % (
            self.RESULT_MAP[result], code))
        if self.executor_server.statsd:
@@ -3028,7 +3045,8 @@
        result, code = self.runAnsible(
            cmd=cmd, timeout=self.executor_server.setup_timeout,
-            playbook=playbook, ansible_version=ansible_version)
+            playbook=playbook, ansible_version=ansible_version,
+            allow_pre_fail=False)
        self.log.debug("Ansible freeze complete, result %s code %s" % (
            self.RESULT_MAP[result], code))
@@ -3157,6 +3175,7 @@
        if acquired_semaphores:
            result, code = self.runAnsible(
                cmd, timeout, playbook, ansible_version,
+                allow_pre_fail=phase in ('run', 'post'),
                cleanup=phase == 'cleanup')
            self.log.debug("Ansible complete, result %s code %s" % (
                self.RESULT_MAP[result], code))
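
Taken together, the runAnsible call sites encode a simple policy:
setup and freeze playbooks always pass allow_pre_fail=False, and for
job playbooks only the run and post-run phases may report an early
failure, since a pre-run failure triggers a retry rather than a job
failure.  Restated as a standalone helper (a sketch only; no such
function exists in Zuul):

    def may_pre_fail(phase):
        # Pre-run failures cause retries and cleanup results are not
        # reported, so only run and post-run output may pre-fail a job.
        return phase in ('run', 'post')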

View File

@@ -3924,6 +3924,7 @@
            span_info=None,
            # A list of build events like paused, resume, ...
            events=[],
+            pre_fail=False,
        )

    def serialize(self, context):
@@ -3938,6 +3939,7 @@
            "estimated_time": self.estimated_time,
            "canceled": self.canceled,
            "paused": self.paused,
+            "pre_fail": self.pre_fail,
            "retry": self.retry,
            "held": self.held,
            "zuul_event_id": self.zuul_event_id,
@@ -4009,6 +4011,11 @@
                # Load the object from ZK
                data['_' + job_data_key] = JobData.fromZK(
                    context, job_data['path'])
+
+        # MODEL_API < 14
+        if 'pre_fail' not in data:
+            data['pre_fail'] = False
+
        return data

    def getPath(self):
@@ -4059,7 +4066,9 @@
    @property
    def failed(self):
-        if self.result and self.result not in ['SUCCESS', 'SKIPPED']:
+        if self.pre_fail:
+            return True
+        if self.result and self.result not in ['SUCCESS', 'SKIPPED', 'RETRY']:
            return True
        return False
@@ -5162,8 +5171,7 @@
            if not job.voting:
                continue
            build = self.current_build_set.getBuild(job.name)
-            if (build and build.result and
-                    build.result not in ['SUCCESS', 'SKIPPED', 'RETRY']):
+            if (build and build.failed):
                return True
        return False
@@ -5988,6 +5996,7 @@
            'pipeline': build.pipeline.name if build else None,
            'canceled': build.canceled if build else None,
            'paused': build.paused if build else None,
+            'pre_fail': build.pre_fail if build else None,
            'retry': build.retry if build else None,
            'tries': self.current_build_set.getTries(job.name),
            'queued': job.queued,
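
Folding pre_fail into Build.failed means that anything asking whether
a queue item has a failing job, such as didAnyJobFail above, now
reacts as soon as the flag is set, before any result is recorded.  A
simplified sketch of the resulting semantics (a toy class, not the
real ZKObject):

    class Build:
        def __init__(self):
            self.pre_fail = False   # set early via the scheduler event
            self.result = None      # set only when the build completes

        @property
        def failed(self):
            if self.pre_fail:
                return True
            # RETRY no longer counts as failed: the job runs again.
            return bool(self.result) and self.result not in (
                'SUCCESS', 'SKIPPED', 'RETRY')

    build = Build()
    build.pre_fail = True
    assert build.failed  # failed early, before any result exists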

View File

@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
-MODEL_API = 13
+MODEL_API = 14

View File

@@ -2839,9 +2839,14 @@ class Scheduler(threading.Thread):
        if not build:
            return

        # Allow URL to be updated
+        args = {}
+        if 'url' in event.data:
+            args['url'] = event.data['url']
+        if (COMPONENT_REGISTRY.model_api >= 14):
+            if 'pre_fail' in event.data:
+                args['pre_fail'] = event.data['pre_fail']
        build.updateAttributes(pipeline.manager.current_context,
-                               url=event.data.get('url', build.url))
+                               **args)

    def _doBuildPausedEvent(self, event, pipeline):
        build = self._getBuildFromPipeline(event, pipeline)