Merge "Stop running commands with async"

This commit is contained in:
Jenkins 2016-10-19 13:54:48 +00:00 committed by Gerrit Code Review
commit 9555cafb98
4 changed files with 3 additions and 78 deletions

View File

@ -1,52 +0,0 @@
# Copyright 2016 IBM Corp.
#
# This file is part of Zuul
#
# This file is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <http://www.gnu.org/licenses/>.
import time
from ansible.executor.task_result import TaskResult
from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
def __init__(self, *args, **kw):
super(CallbackModule, self).__init__(*args, **kw)
self._elapsed_time = 0.0
self._task_start_time = None
self._play = None
def v2_playbook_on_play_start(self, play):
self._play = play
def playbook_on_task_start(self, name, is_conditional):
self._task_start_time = time.time()
def v2_on_any(self, *args, **kw):
result = None
if args and isinstance(args[0], TaskResult):
result = args[0]
if not result:
return
if self._task_start_time is not None:
task_time = time.time() - self._task_start_time
self._elapsed_time += task_time
if self._play and result._host:
manager = self._play.get_variable_manager()
facts = dict(elapsed_time=int(self._elapsed_time))
manager.set_nonpersistent_facts(result._host, facts)
self._task_start_time = None

View File

@ -34,7 +34,6 @@ import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
@ -213,19 +212,10 @@ class LaunchServer(object):
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
plugins_dir = os.path.join(ansible_dir, 'plugins')
self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
if not os.path.exists(self.callback_dir):
os.makedirs(self.callback_dir)
self.library_dir = os.path.join(ansible_dir, 'library')
if not os.path.exists(self.library_dir):
os.makedirs(self.library_dir)
callback_path = os.path.dirname(os.path.abspath(
zuul.ansible.plugins.callback_plugins.__file__))
for fn in os.listdir(callback_path):
shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
library_path = os.path.dirname(os.path.abspath(
zuul.ansible.library.__file__))
for fn in os.listdir(library_path):
@ -513,8 +503,7 @@ class LaunchServer(object):
args['description'], args['labels'],
self.hostname, self.zmq_send_queue,
self.termination_queue, self.keep_jobdir,
self.callback_dir, self.library_dir,
self.options)
self.library_dir, self.options)
self.node_workers[worker.name] = worker
worker.thread = threading.Thread(target=worker.run)
@ -594,8 +583,7 @@ class NodeWorker(object):
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue, keep_jobdir, callback_dir,
library_dir, options):
termination_queue, keep_jobdir, library_dir, options):
self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
@ -641,7 +629,6 @@ class NodeWorker(object):
self.username = config.get('launcher', 'username')
else:
self.username = 'zuul'
self.callback_dir = callback_dir
self.library_dir = library_dir
self.options = options
@ -1313,11 +1300,7 @@ class NodeWorker(object):
(executable, shell) = deal_with_shebang(builder['shell'])
task = dict(shell=shell)
task['name'] = ('command with {{ timeout | int - elapsed_time }} '
'second timeout')
task['when'] = '{{ elapsed_time < timeout | int }}'
task['async'] = '{{ timeout | int - elapsed_time }}'
task['poll'] = 5
task['name'] = 'command generated from JJB'
task['environment'] = parameters
task['args'] = dict(chdir=parameters['WORKSPACE'])
if executable:
@ -1370,19 +1353,15 @@ class NodeWorker(object):
inventory.write('\n')
timeout = None
timeout_var = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
build_timeout = wrapper.get('timeout')
if isinstance(build_timeout, dict):
timeout_var = build_timeout.get('timeout-var')
timeout = build_timeout.get('timeout')
if timeout is not None:
timeout = int(timeout) * 60
if not timeout:
timeout = ANSIBLE_DEFAULT_TIMEOUT
if timeout_var:
parameters[timeout_var] = str(timeout * 1000)
with open(jobdir.playbook, 'w') as playbook:
pre_tasks = []
@ -1428,7 +1407,6 @@ class NodeWorker(object):
error_block.append(task)
error_block.append(dict(fail=dict(msg='FAILURE')))
variables.append(dict(timeout=timeout))
play = dict(hosts='node', name='Job body', vars=variables,
pre_tasks=pre_tasks, tasks=tasks)
playbook.write(yaml.safe_dump([play], default_flow_style=False))
@ -1473,7 +1451,6 @@ class NodeWorker(object):
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
# TODO(mordred) This can be removed once we're using ansible 2.2
config.write('module_set_locale = False\n')