Make log streaming to executor per-task

Spawning a single reader at the top isn't actually working. In cases
where there are multiple playbooks per host (literally every zuul v3 job,
given the pre playbooks for git repos), the stamp file was preventing
subsequent playbooks from spawning a daemon, but the daemon was only
persistent in the context of a single playbook.

We can't just spawn a new one per playbook without some modifications,
as otherwise the content that has already been streamed would get
streamed again.

Grab the finger streaming code, which accepts an argument as to what to
stream, and re-use it in zuul_console. Combine this with adding a unique
id to each task. That way each task on a host will log to a distinct
logfile, and each callback will stream only that task's log output.

This allows us to join the reader as well, so that we won't get
streaming overlap across tasks.

Change-Id: Ic5eb6c38af698f4ba8b4504aa69170834ec4036a
This commit is contained in:
Monty Taylor 2017-06-07 09:34:53 -05:00
parent 50e6b6b0b5
commit b115358259
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
3 changed files with 83 additions and 29 deletions

View File

@ -13,10 +13,12 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>. # along with Ansible. If not, see <http://www.gnu.org/licenses/>.
import os from __future__ import absolute_import
import multiprocessing import multiprocessing
import socket import socket
import time import time
import uuid
from ansible.plugins.callback import default from ansible.plugins.callback import default
@ -82,20 +84,25 @@ class CallbackModule(default.CallbackModule):
super(CallbackModule, self).__init__() super(CallbackModule, self).__init__()
self._task = None self._task = None
self._daemon_running = False self._daemon_running = False
self._daemon_stamp = 'daemon-stamp-%s'
self._host_dict = {} self._host_dict = {}
self._play = None
self._streamer = None
def _read_log(self, host, ip): def _read_log(self, host, ip, log_id):
self._display.display("[%s] starting to log" % host) self._display.vvv("[%s] Starting to log" % host)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True: while True:
try: try:
s.connect((ip, LOG_STREAM_PORT)) s.connect((ip, LOG_STREAM_PORT))
except Exception: except Exception:
self._display.display("[%s] Waiting on logger" % host) self._display.vvv("[%s] Waiting on logger" % host)
time.sleep(0.1) time.sleep(0.1)
continue continue
s.send(log_id + '\n')
for line in linesplit(s): for line in linesplit(s):
if "[Zuul] Task exit code" in line:
return
else:
self._display.display("[%s] %s " % (host, line.strip())) self._display.display("[%s] %s " % (host, line.strip()))
def v2_playbook_on_play_start(self, play): def v2_playbook_on_play_start(self, play):
@ -108,6 +115,8 @@ class CallbackModule(default.CallbackModule):
if self._play.strategy != 'free': if self._play.strategy != 'free':
self._print_task_banner(task) self._print_task_banner(task)
if task.action == 'command': if task.action == 'command':
log_id = uuid.uuid4().hex
task.args['zuul_log_id'] = log_id
play_vars = self._play._variable_manager._hostvars play_vars = self._play._variable_manager._hostvars
hosts = self._play.hosts hosts = self._play.hosts
@ -119,24 +128,26 @@ class CallbackModule(default.CallbackModule):
hosts = play_vars.keys() hosts = play_vars.keys()
for host in hosts: for host in hosts:
ip = play_vars[host]['ansible_host'] ip = play_vars[host].get(
daemon_stamp = self._daemon_stamp % host 'ansible_host', play_vars[host].get(
if not os.path.exists(daemon_stamp): 'ansible_inventory_host'))
self._host_dict[host] = ip self._host_dict[host] = ip
# Touch stamp file self._streamer = multiprocessing.Process(
open(daemon_stamp, 'w').close() target=self._read_log, args=(host, ip, log_id))
p = multiprocessing.Process( self._streamer.daemon = True
target=self._read_log, args=(host, ip)) self._streamer.start()
p.daemon = True
p.start()
def v2_runner_on_failed(self, result, ignore_errors=False): def v2_runner_on_failed(self, result, ignore_errors=False):
if self._streamer:
self._streamer.join()
if result._task.action in ('command', 'shell'): if result._task.action in ('command', 'shell'):
zuul_filter_result(result._result) zuul_filter_result(result._result)
super(CallbackModule, self).v2_runner_on_failed( super(CallbackModule, self).v2_runner_on_failed(
result, ignore_errors=ignore_errors) result, ignore_errors=ignore_errors)
def v2_runner_on_ok(self, result): def v2_runner_on_ok(self, result):
if self._streamer:
self._streamer.join()
if result._task.action in ('command', 'shell'): if result._task.action in ('command', 'shell'):
zuul_filter_result(result._result) zuul_filter_result(result._result)
else: else:

View File

@ -121,15 +121,18 @@ from ansible.module_utils.basic import get_exception
from ast import literal_eval from ast import literal_eval
LOG_STREAM_FILE = '/tmp/console.log' LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?') PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
# List to save stdout log lines in as we collect them # List to save stdout log lines in as we collect them
_log_lines = [] _log_lines = []
class Console(object): class Console(object):
def __init__(self, log_uuid):
self.logfile_name = LOG_STREAM_FILE.format(log_uuid=log_uuid)
def __enter__(self): def __enter__(self):
self.logfile = open(LOG_STREAM_FILE, 'a', 0) self.logfile = open(self.logfile_name, 'a', 0)
return self return self
def __exit__(self, etype, value, tb): def __exit__(self, etype, value, tb):
@ -145,9 +148,9 @@ class Console(object):
self.logfile.write(outln) self.logfile.write(outln)
def follow(fd): def follow(fd, log_uuid):
newline_warning = False newline_warning = False
with Console() as console: with Console(log_uuid) as console:
while True: while True:
line = fd.readline() line = fd.readline()
if not line: if not line:
@ -163,7 +166,7 @@ def follow(fd):
# Taken from ansible/module_utils/basic.py ... forking the method for now # Taken from ansible/module_utils/basic.py ... forking the method for now
# so that we can dive in and figure out how to make appropriate hook points # so that we can dive in and figure out how to make appropriate hook points
def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None): def zuul_run_command(self, args, zuul_log_id, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None):
''' '''
Execute a command, returns rc, stdout, and stderr. Execute a command, returns rc, stdout, and stderr.
@ -312,7 +315,7 @@ def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None
self.log('Executing: ' + running) self.log('Executing: ' + running)
# ZUUL: Replaced the excution loop with the zuul_runner run function # ZUUL: Replaced the excution loop with the zuul_runner run function
cmd = subprocess.Popen(args, **kwargs) cmd = subprocess.Popen(args, **kwargs)
t = threading.Thread(target=follow, args=(cmd.stdout,)) t = threading.Thread(target=follow, args=(cmd.stdout, zuul_log_id))
t.daemon = True t.daemon = True
t.start() t.start()
ret = cmd.wait() ret = cmd.wait()
@ -321,7 +324,7 @@ def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None
# likely stuck in readline() because it spawed a child that is # likely stuck in readline() because it spawed a child that is
# holding stdout or stderr open. # holding stdout or stderr open.
t.join(10) t.join(10)
with Console() as console: with Console(zuul_log_id) as console:
if t.isAlive(): if t.isAlive():
console.addLine("[Zuul] standard output/error still open " console.addLine("[Zuul] standard output/error still open "
"after child exited") "after child exited")
@ -397,6 +400,7 @@ def main():
removes = dict(type='path'), removes = dict(type='path'),
warn = dict(type='bool', default=True), warn = dict(type='bool', default=True),
environ = dict(type='dict', default=None), environ = dict(type='dict', default=None),
zuul_log_id = dict(type='str'),
) )
) )
@ -408,6 +412,7 @@ def main():
removes = module.params['removes'] removes = module.params['removes']
warn = module.params['warn'] warn = module.params['warn']
environ = module.params['environ'] environ = module.params['environ']
zuul_log_id = module.params['zuul_log_id']
if args.strip() == '': if args.strip() == '':
module.fail_json(rc=256, msg="no command given") module.fail_json(rc=256, msg="no command given")
@ -448,7 +453,7 @@ def main():
args = shlex.split(args) args = shlex.split(args)
startd = datetime.datetime.now() startd = datetime.datetime.now()
rc, out, err = zuul_run_command(module, args, executable=executable, use_unsafe_shell=shell, environ_update=environ) rc, out, err = zuul_run_command(module, args, zuul_log_id, executable=executable, use_unsafe_shell=shell, environ_update=environ)
endd = datetime.datetime.now() endd = datetime.datetime.now()
delta = endd - startd delta = endd - startd
@ -467,7 +472,8 @@ def main():
end = str(endd), end = str(endd),
delta = str(delta), delta = str(delta),
changed = True, changed = True,
warnings = warnings warnings = warnings,
zuul_log_id = zuul_log_id
) )
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -22,7 +22,7 @@ import socket
import threading import threading
import time import time
LOG_STREAM_FILE = '/tmp/console.log' LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
LOG_STREAM_PORT = 19885 LOG_STREAM_PORT = 19885
@ -63,6 +63,10 @@ class Console(object):
class Server(object): class Server(object):
MAX_REQUEST_LEN = 1024
REQUEST_TIMEOUT = 10
def __init__(self, path, port): def __init__(self, path, port):
self.path = path self.path = path
@ -85,9 +89,9 @@ class Server(object):
t.daemon = True t.daemon = True
t.start() t.start()
def chunkConsole(self, conn): def chunkConsole(self, conn, log_uuid):
try: try:
console = Console(self.path) console = Console(self.path.format(log_uuid=log_uuid))
except Exception: except Exception:
return return
while True: while True:
@ -132,7 +136,40 @@ class Server(object):
return True return True
console.size = st.st_size console.size = st.st_size
def get_command(self, conn):
    """Read the one-line request a client sends on ``conn``.

    Bytes are accumulated until they decode as UTF-8 and contain a
    newline; the text before the first newline is returned.

    :param conn: connected socket to read the request from.
    :returns: the first line of the request, without the newline.
    :raises Exception: if ``REQUEST_TIMEOUT`` seconds elapse before a
        complete line arrives, the peer closes the connection, poll
        reports a non-readable event, or ``MAX_REQUEST_LEN`` bytes or
        more have been received.
    """
    poll = select.poll()
    bitmask = (select.POLLIN | select.POLLERR |
               select.POLLHUP | select.POLLNVAL)
    poll.register(conn, bitmask)
    data = b''
    start = time.time()
    while True:
        elapsed = time.time() - start
        remaining = max(self.REQUEST_TIMEOUT - elapsed, 0)
        if not remaining:
            raise Exception("Timeout while waiting for input")
        # poll() expects milliseconds, while the deadline arithmetic
        # above is in seconds (time.time()); convert so we block for the
        # remaining time instead of spinning in ~10ms slices.
        for fd, event in poll.poll(remaining * 1000):
            if event & select.POLLIN:
                chunk = conn.recv(self.MAX_REQUEST_LEN)
                if not chunk:
                    # An empty read means the peer closed the socket;
                    # no newline can ever arrive, so stop right away
                    # rather than looping on EOF until the timeout.
                    raise Exception(
                        "Connection closed while waiting for input")
                data += chunk
            else:
                raise Exception("Received error event")
        if len(data) >= self.MAX_REQUEST_LEN:
            raise Exception("Request too long")
        try:
            text = data.decode('utf-8')
        except UnicodeDecodeError:
            # Possibly a multi-byte character split across reads;
            # keep reading and try again with more data.
            continue
        newline = text.find('\n')
        # A newline at index 0 (an empty first line) is not accepted
        # as a command; keep waiting, as the original did.
        if newline > 0:
            return text[:newline]
def handleOneConnection(self, conn): def handleOneConnection(self, conn):
log_uuid = self.get_command(conn)
# use path split to make sure the input isn't trying to be clever
# and construct some path like /tmp/console-/../../something
log_uuid = os.path.split(log_uuid.rstrip())[-1]
# FIXME: this won't notice disconnects until it tries to send # FIXME: this won't notice disconnects until it tries to send
console = None console = None
try: try:
@ -143,7 +180,7 @@ class Server(object):
except: except:
pass pass
while True: while True:
console = self.chunkConsole(conn) console = self.chunkConsole(conn, log_uuid)
if console: if console:
break break
time.sleep(0.5) time.sleep(0.5)