diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py index 0e3e0668c7..e0d1d6eea6 100644 --- a/zuul/ansible/library/zuul_console.py +++ b/zuul/ansible/library/zuul_console.py @@ -109,12 +109,20 @@ class Server(object): def followConsole(self, console, conn): while True: - r = [console.file, conn] - e = [console.file, conn] - r, w, e = select.select(r, [], e) + # As long as we have unread data, keep reading/sending + while True: + chunk = console.file.read(4096) + if chunk: + conn.send(chunk) + else: + break - if console.file in e: - return True + # At this point, we are waiting for more data to be written + time.sleep(0.5) + + # Check to see if the remote end has sent any data, if so, + # discard + r, w, e = select.select([conn], [], [conn], 0) if conn in e: return False if conn in r: @@ -124,19 +132,15 @@ class Server(object): if not ret: return False - if console.file in r: - line = console.file.readline() - if line: - conn.send(line) - time.sleep(0.5) - try: - st = os.stat(console.path) - if (st.st_ino != console.stat.st_ino or - st.st_size < console.size): - return True - except Exception: + # See if the file has been truncated + try: + st = os.stat(console.path) + if (st.st_ino != console.stat.st_ino or + st.st_size < console.size): return True - console.size = st.st_size + except Exception: + return True + console.size = st.st_size def handleOneConnection(self, conn): # FIXME: this won't notice disconnects until it tries to send @@ -166,14 +170,14 @@ class Server(object): def test(): - s = Server('/tmp/console.log', 8088) + s = Server('/tmp/console.txt', 8088) s.run() def main(): module = AnsibleModule( argument_spec=dict( - path=dict(default='/tmp/console.log'), + path=dict(default='/tmp/console.txt'), port=dict(default=8088, type='int'), ) ) diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py index 75542445ee..955469fd12 100644 --- a/zuul/ansible/library/zuul_runner.py +++ b/zuul/ansible/library/zuul_runner.py @@ -21,7 +21,7 @@ import subprocess class Console(object): def __enter__(self): - self.logfile = open('/tmp/console.log', 'w+') + self.logfile = open('/tmp/console.txt', 'w+', 0) return self def __exit__(self, etype, value, tb): diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py index 844b390619..75b491136e 100644 --- a/zuul/launcher/ansiblelaunchserver.py +++ b/zuul/launcher/ansiblelaunchserver.py @@ -29,6 +29,7 @@ import uuid import gear import yaml import jenkins_jobs.builder +import jenkins_jobs.formatter import zmq import zuul.ansible.library @@ -136,6 +137,7 @@ class LaunchServer(object): builder.parser.expandYaml() unseen = set(self.jobs.keys()) for job in builder.parser.jobs: + builder.expandMacros(job) self.jobs[job['name']] = job unseen.discard(job['name']) for name in unseen: @@ -261,6 +263,10 @@ class LaunchServer(object): self.log.debug("Got termination event %s" % (item,)) if item is None: continue + worker = self.node_workers[item] + self.log.debug("Joining %s" % (item,)) + worker.process.join() + self.log.debug("Joined %s" % (item,)) del self.node_workers[item] except Exception: self.log.exception("Exception while processing " @@ -270,11 +276,10 @@ class LaunchServer(object): class NodeWorker(object): - log = logging.getLogger("zuul.NodeWorker") - def __init__(self, config, jobs, builds, sites, name, host, description, labels, manager_name, zmq_send_queue, termination_queue): + self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,)) self.log.debug("Creating node worker %s" % (name,)) self.config = config self.jobs = jobs @@ -325,12 +330,15 @@ class NodeWorker(object): self.worker.addServer(server, port) self.log.debug("Waiting for server") self.worker.waitForServer() + self.log.debug("Registering") self.register() self.gearman_thread = threading.Thread(target=self.runGearman) self.gearman_thread.daemon = True self.gearman_thread.start() + self.log.debug("Started") + while self._running or not self.queue.empty(): try: self._runQueue() @@ -559,7 +567,12 @@ class NodeWorker(object): return [('node', dict( ansible_host=self.host, ansible_user=self.username))] - def _makeSCPTask(self, publisher): + def _substituteVariables(self, text, variables): + def lookup(match): + return variables.get(match.group(1), '') + return re.sub('\$([A-Za-z0-9_]+)', lookup, text) + + def _makeSCPTask(self, jobdir, publisher, parameters): tasks = [] for scpfile in publisher['scp']['files']: site = publisher['scp']['site'] @@ -567,35 +580,64 @@ class NodeWorker(object): raise Exception("Undefined SCP site: %s" % (site,)) site = self.sites[site] if scpfile.get('copy-console'): - src = '/tmp/console.log' + src = '/tmp/console.txt' else: src = scpfile['source'] - dest = os.path.join(site['root'], scpfile['target']) + src = self._substituteVariables(src, parameters) + src = os.path.join(parameters['WORKSPACE'], src) + scproot = tempfile.mkdtemp(dir=jobdir.ansible_root) + os.chmod(scproot, 0o755) + syncargs = dict(src=src, + dest=scproot, + mode='pull') + task = dict(synchronize=syncargs) + if not scpfile.get('copy-after-failure'): + task['when'] = 'success' + tasks.append(task) + + dest = scpfile['target'] + dest = self._substituteVariables(dest, parameters) + dest = os.path.join(site['root'], dest) dest = os.path.normpath(dest) if not dest.startswith(site['root']): raise Exception("Target path %s is not below site root" % (dest,)) - syncargs = dict(src=src, - dest=dest) - task = dict(synchronize=syncargs, - delegate_to=site['host']) + local_args = [ + 'command', '/usr/bin/rsync', '--delay-updates', '-F', + '--compress', '-rt', '--safe-links', '--rsh', + '"/usr/bin/ssh -i {private_key_file} -S none ' + '-o StrictHostKeyChecking=no"', + '--out-format="<>%i %n%L"', + '"{source}/"', '"{user}@{host}:{dest}"' + ] + local_action = ' '.join(local_args).format( + source=scproot, + dest=dest, + private_key_file=self.private_key_file, + host=site['host'], + user=site['user']) + task = dict(local_action=local_action) if not scpfile.get('copy-after-failure'): task['when'] = 'success' tasks.append(task) return tasks - def _makeFTPTask(self, jobdir, publisher): + def _makeFTPTask(self, jobdir, publisher, parameters): tasks = [] ftp = publisher['ftp'] site = ftp['site'] if site not in self.sites: raise Exception("Undefined FTP site: %s" % site) site = self.sites[site] + ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root) ftpcontent = os.path.join(ftproot, 'content') os.makedirs(ftpcontent) ftpscript = os.path.join(ftproot, 'script') - syncargs = dict(src=ftp['source'], + src = ftp['source'] + src = self._substituteVariables(src, parameters) + src = os.path.join(parameters['WORKSPACE'], src) + syncargs = dict(src=src, dest=ftpcontent) task = dict(synchronize=syncargs, when='success') @@ -608,6 +650,7 @@ class NodeWorker(object): while ftpsource[-1] == '/': ftpsource = ftpsource[:-1] ftptarget = ftp['target'] + ftptarget = self._substituteVariables(ftptarget, parameters) ftptarget = os.path.join(site['root'], ftp['target']) ftptarget = os.path.normpath(ftptarget) if not ftptarget.startswith(site['root']): @@ -627,7 +670,10 @@ class NodeWorker(object): script_fn = '%s.sh' % str(uuid.uuid4().hex) script_path = os.path.join(jobdir.script_root, script_fn) with open(script_path, 'w') as script: - script.write(builder['shell']) + data = builder['shell'] + if not data.startswith('#!'): + data = '#!/bin/bash -x\n %s' % (data,) + script.write(data) remote_path = os.path.join('/tmp', script_fn) copy = dict(src=script_path, @@ -681,10 +727,10 @@ class NodeWorker(object): with open(jobdir.playbook, 'w') as playbook: tasks = [] - task = dict(file=dict(path='/tmp/console.log', state='absent')) + task = dict(file=dict(path='/tmp/console.txt', state='absent')) tasks.append(task) - task = dict(zuul_console=dict(path='/tmp/console.log', port=8088)) + task = dict(zuul_console=dict(path='/tmp/console.txt', port=8088)) tasks.append(task) task = dict(file=dict(path=parameters['WORKSPACE'], @@ -703,9 +749,11 @@ class NodeWorker(object): tasks = [] for publisher in jjb_job.get('publishers', []): if 'scp' in publisher: - tasks.extend(self._makeSCPTask(publisher)) + tasks.extend(self._makeSCPTask(jobdir, publisher, + parameters)) if 'ftp' in publisher: - tasks.extend(self._makeFTPTask(jobdir, publisher)) + tasks.extend(self._makeFTPTask(jobdir, publisher, + parameters)) play = dict(hosts='node', name='Publishers', tasks=tasks) playbook.write(yaml.dump([play])) @@ -747,13 +795,15 @@ class NodeWorker(object): def runAnsiblePostPlaybook(self, jobdir, success): proc = subprocess.Popen( ['ansible-playbook', jobdir.post_playbook, - '-e', 'success=%s' % success], + '-e', 'success=%s' % success, '-v'], cwd=jobdir.ansible_root, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid, ) (out, err) = proc.communicate() + self.log.debug("Ansible post stdout:\n%s" % out) + self.log.debug("Ansible post stderr:\n%s" % err) return proc.wait() == 0 @@ -761,3 +811,34 @@ class JJB(jenkins_jobs.builder.Builder): def __init__(self): self.global_config = None self._plugins_list = [] + + def expandComponent(self, component_type, component, template_data): + component_list_type = component_type + 's' + new_components = [] + if isinstance(component, dict): + name, component_data = next(iter(component.items())) + if template_data: + component_data = jenkins_jobs.formatter.deep_format( + component_data, template_data, True) + else: + name = component + component_data = {} + + new_component = self.parser.data[component_type].get(name) + if new_component: + for new_sub_component in new_component[component_list_type]: + new_components.extend( + self.expandComponent(component_type, + new_sub_component, component_data)) + else: + new_components.append({name: component_data}) + return new_components + + def expandMacros(self, job): + for component_type in ['builder', 'publisher']: + component_list_type = component_type + 's' + new_components = [] + for new_component in job.get(component_list_type, []): + new_components.extend(self.expandComponent(component_type, + new_component, {})) + job[component_list_type] = new_components