Ansible launcher: several fixes
These were developed together in situ: * Fix tailing the console log * Change console log name from console.log to console.txt (for better auto content typing) * Expand JJB macros for builders and publishers * Use a two-stage SCP copy (worker -> controller; controller -> site); a one-stage copy is possible but will require installing a key on the remote site * Substitute parameters (eg $LOG_SERVER) into scp/ftp site paths * Better worker logging (use the worker name in the logger name) Change-Id: I98e5603f7a3210c1322640a66ecdeadb24ce74fe
This commit is contained in:
parent
1849e725ed
commit
b5aa68ffee
|
@ -109,12 +109,20 @@ class Server(object):
|
|||
|
||||
def followConsole(self, console, conn):
|
||||
while True:
|
||||
r = [console.file, conn]
|
||||
e = [console.file, conn]
|
||||
r, w, e = select.select(r, [], e)
|
||||
# As long as we have unread data, keep reading/sending
|
||||
while True:
|
||||
chunk = console.file.read(4096)
|
||||
if chunk:
|
||||
conn.send(chunk)
|
||||
else:
|
||||
break
|
||||
|
||||
if console.file in e:
|
||||
return True
|
||||
# At this point, we are waiting for more data to be written
|
||||
time.sleep(0.5)
|
||||
|
||||
# Check to see if the remote end has sent any data, if so,
|
||||
# discard
|
||||
r, w, e = select.select([conn], [], [conn], 0)
|
||||
if conn in e:
|
||||
return False
|
||||
if conn in r:
|
||||
|
@ -124,19 +132,15 @@ class Server(object):
|
|||
if not ret:
|
||||
return False
|
||||
|
||||
if console.file in r:
|
||||
line = console.file.readline()
|
||||
if line:
|
||||
conn.send(line)
|
||||
time.sleep(0.5)
|
||||
try:
|
||||
st = os.stat(console.path)
|
||||
if (st.st_ino != console.stat.st_ino or
|
||||
st.st_size < console.size):
|
||||
return True
|
||||
except Exception:
|
||||
# See if the file has been truncated
|
||||
try:
|
||||
st = os.stat(console.path)
|
||||
if (st.st_ino != console.stat.st_ino or
|
||||
st.st_size < console.size):
|
||||
return True
|
||||
console.size = st.st_size
|
||||
except Exception:
|
||||
return True
|
||||
console.size = st.st_size
|
||||
|
||||
def handleOneConnection(self, conn):
|
||||
# FIXME: this won't notice disconnects until it tries to send
|
||||
|
@ -166,14 +170,14 @@ class Server(object):
|
|||
|
||||
|
||||
def test():
|
||||
s = Server('/tmp/console.log', 8088)
|
||||
s = Server('/tmp/console.txt', 8088)
|
||||
s.run()
|
||||
|
||||
|
||||
def main():
|
||||
module = AnsibleModule(
|
||||
argument_spec=dict(
|
||||
path=dict(default='/tmp/console.log'),
|
||||
path=dict(default='/tmp/console.txt'),
|
||||
port=dict(default=8088, type='int'),
|
||||
)
|
||||
)
|
||||
|
|
|
@ -21,7 +21,7 @@ import subprocess
|
|||
|
||||
class Console(object):
|
||||
def __enter__(self):
|
||||
self.logfile = open('/tmp/console.log', 'w+')
|
||||
self.logfile = open('/tmp/console.txt', 'w+', 0)
|
||||
return self
|
||||
|
||||
def __exit__(self, etype, value, tb):
|
||||
|
|
|
@ -29,6 +29,7 @@ import uuid
|
|||
import gear
|
||||
import yaml
|
||||
import jenkins_jobs.builder
|
||||
import jenkins_jobs.formatter
|
||||
import zmq
|
||||
|
||||
import zuul.ansible.library
|
||||
|
@ -136,6 +137,7 @@ class LaunchServer(object):
|
|||
builder.parser.expandYaml()
|
||||
unseen = set(self.jobs.keys())
|
||||
for job in builder.parser.jobs:
|
||||
builder.expandMacros(job)
|
||||
self.jobs[job['name']] = job
|
||||
unseen.discard(job['name'])
|
||||
for name in unseen:
|
||||
|
@ -261,6 +263,10 @@ class LaunchServer(object):
|
|||
self.log.debug("Got termination event %s" % (item,))
|
||||
if item is None:
|
||||
continue
|
||||
worker = self.node_workers[item]
|
||||
self.log.debug("Joining %s" % (item,))
|
||||
worker.process.join()
|
||||
self.log.debug("Joined %s" % (item,))
|
||||
del self.node_workers[item]
|
||||
except Exception:
|
||||
self.log.exception("Exception while processing "
|
||||
|
@ -270,11 +276,10 @@ class LaunchServer(object):
|
|||
|
||||
|
||||
class NodeWorker(object):
|
||||
log = logging.getLogger("zuul.NodeWorker")
|
||||
|
||||
def __init__(self, config, jobs, builds, sites, name, host,
|
||||
description, labels, manager_name, zmq_send_queue,
|
||||
termination_queue):
|
||||
self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
|
||||
self.log.debug("Creating node worker %s" % (name,))
|
||||
self.config = config
|
||||
self.jobs = jobs
|
||||
|
@ -325,12 +330,15 @@ class NodeWorker(object):
|
|||
self.worker.addServer(server, port)
|
||||
self.log.debug("Waiting for server")
|
||||
self.worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
self.register()
|
||||
|
||||
self.gearman_thread = threading.Thread(target=self.runGearman)
|
||||
self.gearman_thread.daemon = True
|
||||
self.gearman_thread.start()
|
||||
|
||||
self.log.debug("Started")
|
||||
|
||||
while self._running or not self.queue.empty():
|
||||
try:
|
||||
self._runQueue()
|
||||
|
@ -559,7 +567,12 @@ class NodeWorker(object):
|
|||
return [('node', dict(
|
||||
ansible_host=self.host, ansible_user=self.username))]
|
||||
|
||||
def _makeSCPTask(self, publisher):
|
||||
def _substituteVariables(self, text, variables):
|
||||
def lookup(match):
|
||||
return variables.get(match.group(1), '')
|
||||
return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
|
||||
|
||||
def _makeSCPTask(self, jobdir, publisher, parameters):
|
||||
tasks = []
|
||||
for scpfile in publisher['scp']['files']:
|
||||
site = publisher['scp']['site']
|
||||
|
@ -567,35 +580,64 @@ class NodeWorker(object):
|
|||
raise Exception("Undefined SCP site: %s" % (site,))
|
||||
site = self.sites[site]
|
||||
if scpfile.get('copy-console'):
|
||||
src = '/tmp/console.log'
|
||||
src = '/tmp/console.txt'
|
||||
else:
|
||||
src = scpfile['source']
|
||||
dest = os.path.join(site['root'], scpfile['target'])
|
||||
src = self._substituteVariables(src, parameters)
|
||||
src = os.path.join(parameters['WORKSPACE'], src)
|
||||
scproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
|
||||
os.chmod(scproot, 0o755)
|
||||
syncargs = dict(src=src,
|
||||
dest=scproot,
|
||||
mode='pull')
|
||||
task = dict(synchronize=syncargs)
|
||||
if not scpfile.get('copy-after-failure'):
|
||||
task['when'] = 'success'
|
||||
tasks.append(task)
|
||||
|
||||
dest = scpfile['target']
|
||||
dest = self._substituteVariables(dest, parameters)
|
||||
dest = os.path.join(site['root'], dest)
|
||||
dest = os.path.normpath(dest)
|
||||
if not dest.startswith(site['root']):
|
||||
raise Exception("Target path %s is not below site root" %
|
||||
(dest,))
|
||||
syncargs = dict(src=src,
|
||||
dest=dest)
|
||||
task = dict(synchronize=syncargs,
|
||||
delegate_to=site['host'])
|
||||
local_args = [
|
||||
'command', '/usr/bin/rsync', '--delay-updates', '-F',
|
||||
'--compress', '-rt', '--safe-links', '--rsh',
|
||||
'"/usr/bin/ssh -i {private_key_file} -S none '
|
||||
'-o StrictHostKeyChecking=no"',
|
||||
'--out-format="<<CHANGED>>%i %n%L"',
|
||||
'"{source}/"', '"{user}@{host}:{dest}"'
|
||||
]
|
||||
local_action = ' '.join(local_args).format(
|
||||
source=scproot,
|
||||
dest=dest,
|
||||
private_key_file=self.private_key_file,
|
||||
host=site['host'],
|
||||
user=site['user'])
|
||||
task = dict(local_action=local_action)
|
||||
if not scpfile.get('copy-after-failure'):
|
||||
task['when'] = 'success'
|
||||
tasks.append(task)
|
||||
return tasks
|
||||
|
||||
def _makeFTPTask(self, jobdir, publisher):
|
||||
def _makeFTPTask(self, jobdir, publisher, parameters):
|
||||
tasks = []
|
||||
ftp = publisher['ftp']
|
||||
site = ftp['site']
|
||||
if site not in self.sites:
|
||||
raise Exception("Undefined FTP site: %s" % site)
|
||||
site = self.sites[site]
|
||||
|
||||
ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
|
||||
ftpcontent = os.path.join(ftproot, 'content')
|
||||
os.makedirs(ftpcontent)
|
||||
ftpscript = os.path.join(ftproot, 'script')
|
||||
syncargs = dict(src=ftp['source'],
|
||||
src = ftp['source']
|
||||
src = self._substituteVariables(src, parameters)
|
||||
src = os.path.join(parameters['WORKSPACE'], src)
|
||||
syncargs = dict(src=src,
|
||||
dest=ftpcontent)
|
||||
task = dict(synchronize=syncargs,
|
||||
when='success')
|
||||
|
@ -608,6 +650,7 @@ class NodeWorker(object):
|
|||
while ftpsource[-1] == '/':
|
||||
ftpsource = ftpsource[:-1]
|
||||
ftptarget = ftp['target']
|
||||
ftptarget = self._substituteVariables(ftptarget, parameters)
|
||||
ftptarget = os.path.join(site['root'], ftp['target'])
|
||||
ftptarget = os.path.normpath(ftptarget)
|
||||
if not ftptarget.startswith(site['root']):
|
||||
|
@ -627,7 +670,10 @@ class NodeWorker(object):
|
|||
script_fn = '%s.sh' % str(uuid.uuid4().hex)
|
||||
script_path = os.path.join(jobdir.script_root, script_fn)
|
||||
with open(script_path, 'w') as script:
|
||||
script.write(builder['shell'])
|
||||
data = builder['shell']
|
||||
if not data.startswith('#!'):
|
||||
data = '#!/bin/bash -x\n %s' % (data,)
|
||||
script.write(data)
|
||||
|
||||
remote_path = os.path.join('/tmp', script_fn)
|
||||
copy = dict(src=script_path,
|
||||
|
@ -681,10 +727,10 @@ class NodeWorker(object):
|
|||
with open(jobdir.playbook, 'w') as playbook:
|
||||
tasks = []
|
||||
|
||||
task = dict(file=dict(path='/tmp/console.log', state='absent'))
|
||||
task = dict(file=dict(path='/tmp/console.txt', state='absent'))
|
||||
tasks.append(task)
|
||||
|
||||
task = dict(zuul_console=dict(path='/tmp/console.log', port=8088))
|
||||
task = dict(zuul_console=dict(path='/tmp/console.txt', port=8088))
|
||||
tasks.append(task)
|
||||
|
||||
task = dict(file=dict(path=parameters['WORKSPACE'],
|
||||
|
@ -703,9 +749,11 @@ class NodeWorker(object):
|
|||
tasks = []
|
||||
for publisher in jjb_job.get('publishers', []):
|
||||
if 'scp' in publisher:
|
||||
tasks.extend(self._makeSCPTask(publisher))
|
||||
tasks.extend(self._makeSCPTask(jobdir, publisher,
|
||||
parameters))
|
||||
if 'ftp' in publisher:
|
||||
tasks.extend(self._makeFTPTask(jobdir, publisher))
|
||||
tasks.extend(self._makeFTPTask(jobdir, publisher,
|
||||
parameters))
|
||||
play = dict(hosts='node', name='Publishers',
|
||||
tasks=tasks)
|
||||
playbook.write(yaml.dump([play]))
|
||||
|
@ -747,13 +795,15 @@ class NodeWorker(object):
|
|||
def runAnsiblePostPlaybook(self, jobdir, success):
|
||||
proc = subprocess.Popen(
|
||||
['ansible-playbook', jobdir.post_playbook,
|
||||
'-e', 'success=%s' % success],
|
||||
'-e', 'success=%s' % success, '-v'],
|
||||
cwd=jobdir.ansible_root,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
preexec_fn=os.setsid,
|
||||
)
|
||||
(out, err) = proc.communicate()
|
||||
self.log.debug("Ansible post stdout:\n%s" % out)
|
||||
self.log.debug("Ansible post stderr:\n%s" % err)
|
||||
return proc.wait() == 0
|
||||
|
||||
|
||||
|
@ -761,3 +811,34 @@ class JJB(jenkins_jobs.builder.Builder):
|
|||
def __init__(self):
|
||||
self.global_config = None
|
||||
self._plugins_list = []
|
||||
|
||||
def expandComponent(self, component_type, component, template_data):
|
||||
component_list_type = component_type + 's'
|
||||
new_components = []
|
||||
if isinstance(component, dict):
|
||||
name, component_data = next(iter(component.items()))
|
||||
if template_data:
|
||||
component_data = jenkins_jobs.formatter.deep_format(
|
||||
component_data, template_data, True)
|
||||
else:
|
||||
name = component
|
||||
component_data = {}
|
||||
|
||||
new_component = self.parser.data[component_type].get(name)
|
||||
if new_component:
|
||||
for new_sub_component in new_component[component_list_type]:
|
||||
new_components.extend(
|
||||
self.expandComponent(component_type,
|
||||
new_sub_component, component_data))
|
||||
else:
|
||||
new_components.append({name: component_data})
|
||||
return new_components
|
||||
|
||||
def expandMacros(self, job):
|
||||
for component_type in ['builder', 'publisher']:
|
||||
component_list_type = component_type + 's'
|
||||
new_components = []
|
||||
for new_component in job.get(component_list_type, []):
|
||||
new_components.extend(self.expandComponent(component_type,
|
||||
new_component, {}))
|
||||
job[component_list_type] = new_components
|
||||
|
|
Loading…
Reference in New Issue