Make log streaming to executor per-task

Spawning a single reader at the top isn't actually working. In cases
where there are multiple playbooks per host (literally every zuul v3 job,
given pre playbooks for git repos), the stamp file was preventing the
following playbooks from spawning a daemon, but the daemon was only
persistent in the context of a single playbook.

We can't just spawn a new one per playbook without some modifications,
as otherwise the existing already-streamed content would get streamed
again.

Grab the finger streaming code, which accepts an argument as to what to
stream, and re-use it in zuul_console. Combine this with adding a unique
id to each task. That way each task on a host will log to a distinct
logfile, and each callback will stream only that task's log output.

This allows us to join the reader as well, so that we won't get
streaming overlap across tasks.

Change-Id: Ic5eb6c38af698f4ba8b4504aa69170834ec4036a
This commit is contained in:
Monty Taylor 2017-06-07 09:34:53 -05:00
parent 50e6b6b0b5
commit b115358259
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
3 changed files with 83 additions and 29 deletions

View File

@ -13,10 +13,12 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
import os
from __future__ import absolute_import
import multiprocessing
import socket
import time
import uuid
from ansible.plugins.callback import default
@ -82,21 +84,26 @@ class CallbackModule(default.CallbackModule):
super(CallbackModule, self).__init__()
self._task = None
self._daemon_running = False
self._daemon_stamp = 'daemon-stamp-%s'
self._host_dict = {}
self._play = None
self._streamer = None
def _read_log(self, host, ip):
self._display.display("[%s] starting to log" % host)
def _read_log(self, host, ip, log_id):
self._display.vvv("[%s] Starting to log" % host)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
s.connect((ip, LOG_STREAM_PORT))
except Exception:
self._display.display("[%s] Waiting on logger" % host)
self._display.vvv("[%s] Waiting on logger" % host)
time.sleep(0.1)
continue
s.send(log_id + '\n')
for line in linesplit(s):
self._display.display("[%s] %s " % (host, line.strip()))
if "[Zuul] Task exit code" in line:
return
else:
self._display.display("[%s] %s " % (host, line.strip()))
def v2_playbook_on_play_start(self, play):
self._play = play
@ -108,6 +115,8 @@ class CallbackModule(default.CallbackModule):
if self._play.strategy != 'free':
self._print_task_banner(task)
if task.action == 'command':
log_id = uuid.uuid4().hex
task.args['zuul_log_id'] = log_id
play_vars = self._play._variable_manager._hostvars
hosts = self._play.hosts
@ -119,24 +128,26 @@ class CallbackModule(default.CallbackModule):
hosts = play_vars.keys()
for host in hosts:
ip = play_vars[host]['ansible_host']
daemon_stamp = self._daemon_stamp % host
if not os.path.exists(daemon_stamp):
self._host_dict[host] = ip
# Touch stamp file
open(daemon_stamp, 'w').close()
p = multiprocessing.Process(
target=self._read_log, args=(host, ip))
p.daemon = True
p.start()
ip = play_vars[host].get(
'ansible_host', play_vars[host].get(
'ansible_inventory_host'))
self._host_dict[host] = ip
self._streamer = multiprocessing.Process(
target=self._read_log, args=(host, ip, log_id))
self._streamer.daemon = True
self._streamer.start()
def v2_runner_on_failed(self, result, ignore_errors=False):
if self._streamer:
self._streamer.join()
if result._task.action in ('command', 'shell'):
zuul_filter_result(result._result)
super(CallbackModule, self).v2_runner_on_failed(
result, ignore_errors=ignore_errors)
def v2_runner_on_ok(self, result):
if self._streamer:
self._streamer.join()
if result._task.action in ('command', 'shell'):
zuul_filter_result(result._result)
else:

View File

@ -121,15 +121,18 @@ from ansible.module_utils.basic import get_exception
from ast import literal_eval
LOG_STREAM_FILE = '/tmp/console.log'
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
# List to save stdout log lines in as we collect them
_log_lines = []
class Console(object):
def __init__(self, log_uuid):
self.logfile_name = LOG_STREAM_FILE.format(log_uuid=log_uuid)
def __enter__(self):
self.logfile = open(LOG_STREAM_FILE, 'a', 0)
self.logfile = open(self.logfile_name, 'a', 0)
return self
def __exit__(self, etype, value, tb):
@ -145,9 +148,9 @@ class Console(object):
self.logfile.write(outln)
def follow(fd):
def follow(fd, log_uuid):
newline_warning = False
with Console() as console:
with Console(log_uuid) as console:
while True:
line = fd.readline()
if not line:
@ -163,7 +166,7 @@ def follow(fd):
# Taken from ansible/module_utils/basic.py ... forking the method for now
# so that we can dive in and figure out how to make appropriate hook points
def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None):
def zuul_run_command(self, args, zuul_log_id, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None):
'''
Execute a command, returns rc, stdout, and stderr.
@ -312,7 +315,7 @@ def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None
self.log('Executing: ' + running)
# ZUUL: Replaced the execution loop with the zuul_runner run function
cmd = subprocess.Popen(args, **kwargs)
t = threading.Thread(target=follow, args=(cmd.stdout,))
t = threading.Thread(target=follow, args=(cmd.stdout, zuul_log_id))
t.daemon = True
t.start()
ret = cmd.wait()
@ -321,7 +324,7 @@ def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None
# likely stuck in readline() because it spawned a child that is
# holding stdout or stderr open.
t.join(10)
with Console() as console:
with Console(zuul_log_id) as console:
if t.isAlive():
console.addLine("[Zuul] standard output/error still open "
"after child exited")
@ -397,6 +400,7 @@ def main():
removes = dict(type='path'),
warn = dict(type='bool', default=True),
environ = dict(type='dict', default=None),
zuul_log_id = dict(type='str'),
)
)
@ -408,6 +412,7 @@ def main():
removes = module.params['removes']
warn = module.params['warn']
environ = module.params['environ']
zuul_log_id = module.params['zuul_log_id']
if args.strip() == '':
module.fail_json(rc=256, msg="no command given")
@ -448,7 +453,7 @@ def main():
args = shlex.split(args)
startd = datetime.datetime.now()
rc, out, err = zuul_run_command(module, args, executable=executable, use_unsafe_shell=shell, environ_update=environ)
rc, out, err = zuul_run_command(module, args, zuul_log_id, executable=executable, use_unsafe_shell=shell, environ_update=environ)
endd = datetime.datetime.now()
delta = endd - startd
@ -467,7 +472,8 @@ def main():
end = str(endd),
delta = str(delta),
changed = True,
warnings = warnings
warnings = warnings,
zuul_log_id = zuul_log_id
)
if __name__ == '__main__':

View File

@ -22,7 +22,7 @@ import socket
import threading
import time
LOG_STREAM_FILE = '/tmp/console.log'
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
LOG_STREAM_PORT = 19885
@ -63,6 +63,10 @@ class Console(object):
class Server(object):
MAX_REQUEST_LEN = 1024
REQUEST_TIMEOUT = 10
def __init__(self, path, port):
self.path = path
@ -85,9 +89,9 @@ class Server(object):
t.daemon = True
t.start()
def chunkConsole(self, conn):
def chunkConsole(self, conn, log_uuid):
try:
console = Console(self.path)
console = Console(self.path.format(log_uuid=log_uuid))
except Exception:
return
while True:
@ -132,7 +136,40 @@ class Server(object):
return True
console.size = st.st_size
def get_command(self, conn):
    """Read the newline-terminated request line from a client socket.

    Polls ``conn`` until a complete line has arrived and returns the
    text preceding the first newline, decoded as UTF-8 (the newline
    itself is not included).

    Raises:
        Exception: if no complete request arrives within
            ``self.REQUEST_TIMEOUT`` seconds, if the peer closes the
            connection before sending one, if poll reports an
            error/hangup event, or once ``self.MAX_REQUEST_LEN`` or
            more bytes have been buffered.
    """
    poll = select.poll()
    bitmask = (select.POLLIN | select.POLLERR |
               select.POLLHUP | select.POLLNVAL)
    poll.register(conn, bitmask)

    # Named "data" rather than "buffer" so the Python 2 builtin of that
    # name is not shadowed.
    data = b''
    start = time.time()
    while True:
        remaining = self.REQUEST_TIMEOUT - (time.time() - start)
        if remaining <= 0:
            raise Exception("Timeout while waiting for input")
        # poll() takes its timeout in milliseconds, while REQUEST_TIMEOUT
        # is measured in seconds (it is compared against time.time()).
        for fd, event in poll.poll(remaining * 1000):
            if event & select.POLLIN:
                chunk = conn.recv(self.MAX_REQUEST_LEN)
                if not chunk:
                    # An empty read means the peer closed the socket;
                    # without this check the loop would spin on POLLIN
                    # until the timeout expired.
                    raise Exception(
                        "Connection closed before request was received")
                data += chunk
            else:
                raise Exception("Received error event")
        if len(data) >= self.MAX_REQUEST_LEN:
            raise Exception("Request too long")
        try:
            ret = data.decode('utf-8')
        except UnicodeDecodeError:
            # The bytes so far do not decode (e.g. a multi-byte character
            # split across reads); keep reading.
            continue
        x = ret.find('\n')
        # A newline at index 0 (empty request) is deliberately not
        # accepted; such a connection runs into the timeout instead.
        if x > 0:
            return ret[:x]
def handleOneConnection(self, conn):
log_uuid = self.get_command(conn)
# use path split to make sure the input isn't trying to be clever
# and construct some path like /tmp/console-/../../something
log_uuid = os.path.split(log_uuid.rstrip())[-1]
# FIXME: this won't notice disconnects until it tries to send
console = None
try:
@ -143,7 +180,7 @@ class Server(object):
except:
pass
while True:
console = self.chunkConsole(conn)
console = self.chunkConsole(conn, log_uuid)
if console:
break
time.sleep(0.5)