Do not use _display outside the main thread in zuul_stream

With the defaulh "linear" strategy (and likely others), Ansible will
send the on_task_start callback, and then fork a worker process to
execute that task.  Since we spawn a thread in the on_task_start
callback, we can end up emitting a log message in this method while
Ansible is forking.  If a forked process inherits a Python file object
(i.e., stdout) that is locked by a thread that doesn't exist in the
fork (i.e., this one), it can deadlock when trying to flush the file
object.  To minimize the chances of that happening, we should avoid
using _display outside the main thread.

The Python logging module is supposed to use internal locks which are
automatically aqcuired and released across a fork.  Assuming this is
(still) true and functioning correctly, we should be okay to issue
our Python logging module calls at any time.  If there is a fault
in this system, however, it could have a potential to cause a similar
problem.

If we can convince the Ansible maintainers to lock _display across
forks, we may be able to revert this change in the future.

Change-Id: Ifc6b835c151539e6209284728ccad467bef8be6f
This commit is contained in:
James E. Blair 2022-08-31 11:28:37 -07:00
parent 20e89b83cc
commit 36de4939a8
2 changed files with 20 additions and 18 deletions

View File

@ -31,11 +31,6 @@
mv job-output.txt job-output-success-19887.txt
mv job-output.json job-output-success-19887.json
- name: Check protocol version
assert:
that:
- "'[node1] Reports streaming version: 1' in _success_output.stdout"
# Streamer puts out a line like
# [node1] Starting to log 916b2084-4bbb-80e5-248e-000000000016-1-node1 for task TASK: Print binary data
# One of the tasks in job-output shows find: results;

View File

@ -121,6 +121,21 @@ class CallbackModule(default.CallbackModule):
self._logger = logging.getLogger('zuul.executor.ansible')
def _log(self, msg, ts=None, job=True, executor=False, debug=False):
# With the default "linear" strategy (and likely others),
# Ansible will send the on_task_start callback, and then fork
# a worker process to execute that task. Since we spawn a
# thread in the on_task_start callback, we can end up emitting
# a log message in this method while Ansible is forking. If a
# forked process inherits a Python file object (i.e., stdout)
# that is locked by a thread that doesn't exist in the fork
# (i.e., this one), it can deadlock when trying to flush the
# file object. To minimize the chances of that happening, we
# should avoid using _display outside the main thread.
# Therefore:
# Do not set executor=True from any calls from a thread
# spawned in this callback.
msg = msg.rstrip()
if job:
now = ts or datetime.datetime.now()
@ -143,10 +158,6 @@ class CallbackModule(default.CallbackModule):
s.settimeout(None)
return s
except socket.timeout:
self._log(
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, port), executor=True)
self._log_streamline(
"localhost",
"Timeout exception waiting for the logger. "
@ -155,16 +166,12 @@ class CallbackModule(default.CallbackModule):
return None
except Exception:
if logger_retries % 10 == 0:
self._log("[%s] Waiting on logger" % host,
executor=True, debug=True)
self._log("[%s] Waiting on logger" % host)
logger_retries += 1
time.sleep(0.1)
continue
def _read_log(self, host, ip, port, log_id, task_name, hosts):
self._log("[%s] Starting to log %s for task %s"
% (host, log_id, task_name), job=False, executor=True)
s = self._read_log_connect(host, ip, port)
if s is None:
# Can't connect; _read_log_connect() already logged an
@ -188,9 +195,6 @@ class CallbackModule(default.CallbackModule):
return
else:
self._zuul_console_version = int(buff)
self._log('[%s] Reports streaming version: %d' %
(host, self._zuul_console_version),
job=False, executor=True)
if self._zuul_console_version >= 1:
msg = 's:%s\n' % log_id
@ -349,6 +353,9 @@ class CallbackModule(default.CallbackModule):
log_id = "%s-%s-%s" % (
self._task._uuid, count, log_host)
self._log("[%s] Starting to log %s for task %s"
% (host, log_id, task_name),
job=False, executor=True)
streamer = threading.Thread(
target=self._read_log, args=(
host, ip, port, log_id, task_name, hosts))
@ -369,7 +376,7 @@ class CallbackModule(default.CallbackModule):
streamer.join(30)
if streamer.is_alive():
msg = "[Zuul] Log Stream did not terminate"
self._log(msg, job=True, executor=True)
self._log(msg)
self._streamers_stop = False
def _process_result_for_localhost(self, result, is_task=True):