Merge "Ansible launcher: several fixes"

Jenkins 2016-05-25 21:30:31 +00:00 committed by Gerrit Code Review
commit 6298156d84
3 changed files with 122 additions and 37 deletions

View File

@@ -109,12 +109,20 @@ class Server(object):
def followConsole(self, console, conn):
while True:
r = [console.file, conn]
e = [console.file, conn]
r, w, e = select.select(r, [], e)
# As long as we have unread data, keep reading/sending
while True:
chunk = console.file.read(4096)
if chunk:
conn.send(chunk)
else:
break
if console.file in e:
return True
# At this point, we are waiting for more data to be written
time.sleep(0.5)
# Check to see if the remote end has sent any data, if so,
# discard
r, w, e = select.select([conn], [], [conn], 0)
if conn in e:
return False
if conn in r:
@@ -124,19 +132,15 @@ class Server(object):
if not ret:
return False
if console.file in r:
line = console.file.readline()
if line:
conn.send(line)
time.sleep(0.5)
try:
st = os.stat(console.path)
if (st.st_ino != console.stat.st_ino or
st.st_size < console.size):
return True
except Exception:
# See if the file has been truncated
try:
st = os.stat(console.path)
if (st.st_ino != console.stat.st_ino or
st.st_size < console.size):
return True
console.size = st.st_size
except Exception:
return True
console.size = st.st_size
def handleOneConnection(self, conn):
# FIXME: this won't notice disconnects until it tries to send
@@ -166,14 +170,14 @@ class Server(object):
def test():
s = Server('/tmp/console.log', 8088)
s = Server('/tmp/console.txt', 8088)
s.run()
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(default='/tmp/console.log'),
path=dict(default='/tmp/console.txt'),
port=dict(default=8088, type='int'),
)
)
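
A note on the streaming behaviour implied by the hunks above: the zuul_console module now pushes the console file over a plain TCP socket in 4096-byte chunks, returns True when the file is truncated or replaced (so the caller can reopen it), and treats EOF from the remote end as a disconnect. As a rough sketch only, inferred from this diff, a client could consume the stream like this (the host name is a made-up example):

import socket
import sys

def stream_console(host, port=8088):
    # Read raw log bytes until the server closes the connection, e.g.
    # because the job finished or the console file was replaced.
    conn = socket.create_connection((host, port))
    try:
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            sys.stdout.write(chunk.decode('utf-8', 'replace'))
    finally:
        conn.close()

stream_console('node42.example.org')  # hypothetical worker node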

View File

@@ -21,7 +21,7 @@ import subprocess
class Console(object):
def __enter__(self):
self.logfile = open('/tmp/console.log', 'w+')
self.logfile = open('/tmp/console.txt', 'w+', 0)
return self
def __exit__(self, etype, value, tb):
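
The console writer in this file now opens /tmp/console.txt with a buffer size of 0 (unbuffered, in Python 2 terms), presumably so that each line a job writes becomes visible to the zuul_console streamer immediately instead of sitting in the process buffer until a flush. A hypothetical writer, for illustration only:

# Buffer size 0 gives unbuffered writes on Python 2 file objects, so a
# tailing reader sees each line as soon as it is written.
logfile = open('/tmp/console.txt', 'w+', 0)
logfile.write('example log line\n')  # made-up content
logfile.close()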

View File

@@ -29,6 +29,7 @@ import uuid
import gear
import yaml
import jenkins_jobs.builder
import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
@@ -136,6 +137,7 @@ class LaunchServer(object):
builder.parser.expandYaml()
unseen = set(self.jobs.keys())
for job in builder.parser.jobs:
builder.expandMacros(job)
self.jobs[job['name']] = job
unseen.discard(job['name'])
for name in unseen:
@@ -261,6 +263,10 @@ class LaunchServer(object):
self.log.debug("Got termination event %s" % (item,))
if item is None:
continue
worker = self.node_workers[item]
self.log.debug("Joining %s" % (item,))
worker.process.join()
self.log.debug("Joined %s" % (item,))
del self.node_workers[item]
except Exception:
self.log.exception("Exception while processing "
@@ -270,11 +276,10 @@ class LaunchServer(object):
class NodeWorker(object):
log = logging.getLogger("zuul.NodeWorker")
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue):
self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
self.jobs = jobs
@@ -325,12 +330,15 @@ class NodeWorker(object):
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
self.gearman_thread = threading.Thread(target=self.runGearman)
self.gearman_thread.daemon = True
self.gearman_thread.start()
self.log.debug("Started")
while self._running or not self.queue.empty():
try:
self._runQueue()
@@ -559,7 +567,12 @@ class NodeWorker(object):
return [('node', dict(
ansible_host=self.host, ansible_user=self.username))]
def _makeSCPTask(self, publisher):
def _substituteVariables(self, text, variables):
def lookup(match):
return variables.get(match.group(1), '')
return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
def _makeSCPTask(self, jobdir, publisher, parameters):
tasks = []
for scpfile in publisher['scp']['files']:
site = publisher['scp']['site']
@@ -567,35 +580,64 @@ class NodeWorker(object):
raise Exception("Undefined SCP site: %s" % (site,))
site = self.sites[site]
if scpfile.get('copy-console'):
src = '/tmp/console.log'
src = '/tmp/console.txt'
else:
src = scpfile['source']
dest = os.path.join(site['root'], scpfile['target'])
src = self._substituteVariables(src, parameters)
src = os.path.join(parameters['WORKSPACE'], src)
scproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
os.chmod(scproot, 0o755)
syncargs = dict(src=src,
dest=scproot,
mode='pull')
task = dict(synchronize=syncargs)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
tasks.append(task)
dest = scpfile['target']
dest = self._substituteVariables(dest, parameters)
dest = os.path.join(site['root'], dest)
dest = os.path.normpath(dest)
if not dest.startswith(site['root']):
raise Exception("Target path %s is not below site root" %
(dest,))
syncargs = dict(src=src,
dest=dest)
task = dict(synchronize=syncargs,
delegate_to=site['host'])
local_args = [
'command', '/usr/bin/rsync', '--delay-updates', '-F',
'--compress', '-rt', '--safe-links', '--rsh',
'"/usr/bin/ssh -i {private_key_file} -S none '
'-o StrictHostKeyChecking=no"',
'--out-format="<<CHANGED>>%i %n%L"',
'"{source}/"', '"{user}@{host}:{dest}"'
]
local_action = ' '.join(local_args).format(
source=scproot,
dest=dest,
private_key_file=self.private_key_file,
host=site['host'],
user=site['user'])
task = dict(local_action=local_action)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
tasks.append(task)
return tasks
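
The _substituteVariables helper introduced above expands $PARAMETER references in publisher paths against the job parameters before they are joined with the workspace or the site root, and the normpath/startswith check keeps the expanded destination from escaping that root. A standalone sketch of the substitution (the LOG_PATH parameter name is an illustrative assumption, not taken from this change):

import re

def substitute_variables(text, variables):
    # Mirrors _substituteVariables: replace each $NAME token with the
    # matching job parameter, or '' when the parameter is undefined.
    def lookup(match):
        return variables.get(match.group(1), '')
    return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)

params = {'LOG_PATH': 'check/12345/1'}                    # made-up parameter
print(substitute_variables('logs/$LOG_PATH/', params))    # logs/check/12345/1/
print(substitute_variables('logs/$UNSET/', params))       # logs//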
def _makeFTPTask(self, jobdir, publisher):
def _makeFTPTask(self, jobdir, publisher, parameters):
tasks = []
ftp = publisher['ftp']
site = ftp['site']
if site not in self.sites:
raise Exception("Undefined FTP site: %s" % site)
site = self.sites[site]
ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
ftpcontent = os.path.join(ftproot, 'content')
os.makedirs(ftpcontent)
ftpscript = os.path.join(ftproot, 'script')
syncargs = dict(src=ftp['source'],
src = ftp['source']
src = self._substituteVariables(src, parameters)
src = os.path.join(parameters['WORKSPACE'], src)
syncargs = dict(src=src,
dest=ftpcontent)
task = dict(synchronize=syncargs,
when='success')
@@ -608,6 +650,7 @@ class NodeWorker(object):
while ftpsource[-1] == '/':
ftpsource = ftpsource[:-1]
ftptarget = ftp['target']
ftptarget = self._substituteVariables(ftptarget, parameters)
ftptarget = os.path.join(site['root'], ftp['target'])
ftptarget = os.path.normpath(ftptarget)
if not ftptarget.startswith(site['root']):
@@ -627,7 +670,10 @@ class NodeWorker(object):
script_fn = '%s.sh' % str(uuid.uuid4().hex)
script_path = os.path.join(jobdir.script_root, script_fn)
with open(script_path, 'w') as script:
script.write(builder['shell'])
data = builder['shell']
if not data.startswith('#!'):
data = '#!/bin/bash -x\n %s' % (data,)
script.write(data)
remote_path = os.path.join('/tmp', script_fn)
copy = dict(src=script_path,
@@ -681,10 +727,10 @@ class NodeWorker(object):
with open(jobdir.playbook, 'w') as playbook:
tasks = []
task = dict(file=dict(path='/tmp/console.log', state='absent'))
task = dict(file=dict(path='/tmp/console.txt', state='absent'))
tasks.append(task)
task = dict(zuul_console=dict(path='/tmp/console.log', port=8088))
task = dict(zuul_console=dict(path='/tmp/console.txt', port=8088))
tasks.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
@@ -703,9 +749,11 @@ class NodeWorker(object):
tasks = []
for publisher in jjb_job.get('publishers', []):
if 'scp' in publisher:
tasks.extend(self._makeSCPTask(publisher))
tasks.extend(self._makeSCPTask(jobdir, publisher,
parameters))
if 'ftp' in publisher:
tasks.extend(self._makeFTPTask(jobdir, publisher))
tasks.extend(self._makeFTPTask(jobdir, publisher,
parameters))
play = dict(hosts='node', name='Publishers',
tasks=tasks)
playbook.write(yaml.dump([play], default_flow_style=False))
@@ -747,13 +795,15 @@ class NodeWorker(object):
def runAnsiblePostPlaybook(self, jobdir, success):
proc = subprocess.Popen(
['ansible-playbook', jobdir.post_playbook,
'-e', 'success=%s' % success],
'-e', 'success=%s' % success, '-v'],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
)
(out, err) = proc.communicate()
self.log.debug("Ansible post stdout:\n%s" % out)
self.log.debug("Ansible post stderr:\n%s" % err)
return proc.wait() == 0
@@ -761,3 +811,34 @@ class JJB(jenkins_jobs.builder.Builder):
def __init__(self):
self.global_config = None
self._plugins_list = []
def expandComponent(self, component_type, component, template_data):
component_list_type = component_type + 's'
new_components = []
if isinstance(component, dict):
name, component_data = next(iter(component.items()))
if template_data:
component_data = jenkins_jobs.formatter.deep_format(
component_data, template_data, True)
else:
name = component
component_data = {}
new_component = self.parser.data[component_type].get(name)
if new_component:
for new_sub_component in new_component[component_list_type]:
new_components.extend(
self.expandComponent(component_type,
new_sub_component, component_data))
else:
new_components.append({name: component_data})
return new_components
def expandMacros(self, job):
for component_type in ['builder', 'publisher']:
component_list_type = component_type + 's'
new_components = []
for new_component in job.get(component_list_type, []):
new_components.extend(self.expandComponent(component_type,
new_component, {}))
job[component_list_type] = new_components
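
The JJB subclass at the end of this diff expands builder and publisher macros itself: any component whose name matches a macro in the parsed job data is recursively replaced by that macro's components, with jenkins_jobs.formatter.deep_format interpolating the caller's parameters into them. A standalone sketch of the recursion, with a plain dict standing in for self.parser.data, str.format standing in for deep_format, and a made-up run-tox macro:

macro_data = {
    'builder': {
        'run-tox': {'builders': [{'shell': 'tox -e {envlist}'}]},
    },
    'publisher': {},
}

def expand_component(component_type, component, template_data):
    component_list_type = component_type + 's'
    new_components = []
    if isinstance(component, dict):
        # {name: params} form, e.g. {'run-tox': {'envlist': 'py27'}}
        name, component_data = next(iter(component.items()))
        if template_data and isinstance(component_data, str):
            component_data = component_data.format(**template_data)
    else:
        # bare string form, e.g. 'run-tox'
        name = component
        component_data = {}
    macro = macro_data[component_type].get(name)
    if macro:
        for sub in macro[component_list_type]:
            new_components.extend(
                expand_component(component_type, sub, component_data))
    else:
        new_components.append({name: component_data})
    return new_components

print(expand_component('builder', {'run-tox': {'envlist': 'py27'}}, {}))
# -> [{'shell': 'tox -e py27'}]

expandMacros then replaces each job's builders/publishers lists with the expanded components, so the rest of the launcher only ever sees concrete entries such as shell, scp, or ftp.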