#!/usr/bin/python
#
# Runs a tripleo-ci test-client
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#

import argparse
import json
import logging
import sys
import subprocess
import os
import tempfile
import textwrap
import threading
import time
import uuid

import gear

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('testenv-client')
logger.setLevel(logging.INFO)


class TestCallback(object):
    """Callable that waits for one job on a gear worker and runs a command.

    An instance is meant to be used as a thread target: calling it registers
    the function ``name`` on a ``gear.Worker``, blocks until a job arrives,
    writes the job arguments to a temporary file exposed through the
    ``TE_DATAFILE`` environment variable and runs ``command``.

    The outcome is published through ``rv``:

    * ``None`` -- the command has not completed (or never started),
    * ``2`` -- the job payload reported a failure, or the command could not
      be launched,
    * otherwise the exit status returned by ``subprocess.call``.
    """

    # Substrings of the job payload that mean no usable environment was
    # handed to us.
    _FAILURE_MARKERS = (
        "Couldn't retrieve env",
        "Failed creating OVB stack",
    )

    def __init__(self, servers, name, command):
        """Store the configuration and record the creation time.

        :param servers: comma separated ``host[:port]`` list, as accepted
                        by ``add_servers``
        :param name: name of the function to register on the worker
        :param command: argument list handed to ``subprocess.call``
        """
        self.servers = servers
        self.name = name
        self.command = command
        self.created = time.time()
        # Default the return value to None, this may end up being
        # used if the gearman worker goes down before the job finishes
        self.rv = None

    def __call__(self):
        """Wait for a single job, run the command and complete the job."""
        self.worker = gear.Worker('testenv-client-%s' % self.name)
        add_servers(self.worker, self.servers)
        self.worker.waitForServer()

        self.worker.registerFunction(self.name)

        try:
            job = self.worker.getJob()
        except gear.InterruptedError:
            # The wait was interrupted (main() calls stopWaitingForJobs());
            # rv stays None.
            return

        logger.info('Received job : %s', job.arguments.strip())
        time_waiting = time.time() - self.created
        if time_waiting > 90:
            logger.warning('%.1f seconds waiting for a worker.', time_waiting)

        if any(marker in job.arguments for marker in self._FAILURE_MARKERS):
            logger.error(job.arguments)
            self.rv = 2
            job.sendWorkComplete("")
            return

        logger.info('Running command "%s"', ' '.join(self.command))
        with tempfile.NamedTemporaryFile('w') as fp:
            fp.write(job.arguments)
            # Make sure the data is on disk before the command reads it.
            fp.flush()
            os.environ["TE_DATAFILE"] = fp.name
            try:
                self.rv = subprocess.call(self.command)
            except Exception:
                # Typically the command could not be launched at all.
                logger.exception("Error calling command")
                self.rv = 2

        job.sendWorkComplete("")


class TestEnvClient(gear.Client):
    """``gear.Client`` that signals an event once a job has ended.

    Completion, exception and failure packets are all forwarded to the
    parent class first and then set the same event, so ``wait`` returns for
    any of the three outcomes.
    """

    def __init__(self):
        super(TestEnvClient, self).__init__()
        self.event = threading.Event()

    def handleWorkComplete(self, packet):
        super(TestEnvClient, self).handleWorkComplete(packet)
        self.event.set()

    def handleWorkException(self, packet):
        super(TestEnvClient, self).handleWorkException(packet)
        self.event.set()

    def handleWorkFail(self, packet):
        super(TestEnvClient, self).handleWorkFail(packet)
        self.event.set()

    def wait(self, timeout=None):
        """Wait for notification of completion, error or failure.

        :param timeout: a timeout for the operation in seconds
        :type  timeout: float
        :returns: True if a notification was received, False on timeout
        """
        self.event.wait(timeout)
        return self.event.is_set()


def add_servers(client, servers):
    """Register every server of a comma separated list on ``client``.

    :param client: object exposing ``addServer(host, port)``
    :param servers: comma separated ``host[:port]`` entries; an entry
                    without a port uses 4730
    """
    for server in servers.split(','):
        # Split on the last ':' only, so the host part may itself contain
        # colons.
        server = server.rsplit(':', 1)
        if len(server) == 1:
            server.append('4730')
        client.addServer(server[0], int(server[1]))


def main(args=sys.argv[1:]):
    """Lock a test environment, run the given command and report its status.

    :param args: command line arguments (defaults to ``sys.argv[1:]``)
    :returns: the ``rv`` of the callback: the exit status of the command,
              2 if the environment could not be obtained or the command
              could not be launched. It is still None if the command never
              completed and killing the process group did not end this
              process.
    :raises KeyError: if the ``TOCI_JOBTYPE`` environment variable is unset
    """
    parser = argparse.ArgumentParser(
        description=(textwrap.dedent("""
            Starts up a gearman worker and then calls the job "lockenv" over
            gearman, then waits for the worker to be called, once the worker
            is called it will place the provided data in a datafile (indicated
            by the TE_DATAFILE environment variable) and run the "command"
            provided, the exit code will be the exit code of the command that
            was run. Essentially this allows a command to be run while the
            worker is holding a test environment in a locked state e.g.
            to simply output the data provided one could run the command:
            $ echo 'cat $TE_DATAFILE' | %s -- bash
            """ % sys.argv[0])),
        formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument('command', nargs="+",
                        help='A command to run once the test env is locked')
    parser.add_argument('--geard', '-b', default='127.0.0.1:4730',
                        help='A comma separated list of gearman brokers to '
                        'connect to.')
    parser.add_argument('--jobnum', '-n', default=uuid.uuid4().hex,
                        help='A unique identifier identifing this job.')
    parser.add_argument('--timeout', '-t', default='10800',
                        help='Set a timeout, after which the command will '
                        'be killed.')
    parser.add_argument('--envsize', default="2",
                        help='Number of baremetal nodes to request')
    parser.add_argument('--compute-envsize', default='0',
                        help='Number of compute baremetal nodes to request. '
                        'When this is set to a value > 0, the primary '
                        'nodes will be tagged with the controller '
                        'profile and the extra nodes with compute. The '
                        'compute nodes will be a smaller flavor in order '
                        'to use less resources.')
    parser.add_argument('--ucinstance',
                        help='uuid for the undercloud instance (where an '
                        'interface on the provisioning net is attached')
    parser.add_argument('--create-undercloud', action='store_true',
                        help='deploy the undercloud node.')
    parser.add_argument('--ssh-key', default='',
                        help='ssh key for the ovb nodes to be deployed.')
    parser.add_argument('--net-iso',
                        default="multi-nic",
                        choices=['none', 'multi-nic', 'public-bond'],
                        help='"none" requests an environment without network '
                        'isolation, "multi-nic" requests one with a '
                        'basic multiple nic configuration, and '
                        '"public-bond" requests one like "multi-nic" '
                        'but with two public nics for use with bonded '
                        'nic-configs.')
    parser.add_argument('--extra-nodes', default='0',
                        help='Number of extra undercloud-like nodes to '
                        'request')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Set to debug mode.')
    opts = parser.parse_args(args)
    if opts.debug:
        logger.setLevel(logging.DEBUG)

    # Read the environment before the worker thread is started: a missing
    # TOCI_JOBTYPE raises KeyError, and raising it after the (non-daemon)
    # thread is running would leave the process hanging on that thread.
    job_identifier = '%s: %s' % (os.environ.get('ZUUL_CHANGE', 'No change'),
                                 os.environ['TOCI_JOBTYPE'])

    callback_name = "callback_" + opts.jobnum
    cb = TestCallback(opts.geard, callback_name, opts.command)
    threading.Thread(target=cb).start()

    client = TestEnvClient()
    add_servers(client, opts.geard)
    client.waitForServer()

    job_params = {
        "callback_name": callback_name,
        "timeout": opts.timeout,
        "envsize": opts.envsize,
        "compute_envsize": opts.compute_envsize,
        "ucinstance": opts.ucinstance,
        "create_undercloud": "true" if opts.create_undercloud else "",
        "ssh_key": opts.ssh_key,
        "net_iso": opts.net_iso,
        "extra_nodes": opts.extra_nodes,
        "job_identifier": job_identifier,
    }
    job = gear.Job('lockenv', json.dumps(job_params))
    client.submitJob(job)

    # No timeout here as there will be a timeout on the jenkins jobs, which is
    # also passed to the testenv-worker, lets not second guess them.
    client.wait()
    if job.failure:
        # This signals an error with the gearman connection to the worker
        # we log it, but still return cb.rv the command may have succeeded
        logger.error("The gearman Job has failed")

    cb.worker.stopWaitingForJobs()

    # If the testenv worker releases the environment before our command
    # completes we kill this process and all its children, to immediately
    # stop the running job
    if cb.rv is None:
        logger.error("The command hasn't completed but the testenv worker has "
                     "released the environment. Killing all processes.")
        subprocess.call(["sudo", "kill", "-9", "-%d" % os.getpgrp()])

    # %s rather than %d: cb.rv is None if we are still alive after the kill.
    logger.debug("Exiting with status : %s", cb.rv)
    return cb.rv


if __name__ == '__main__':
    sys.exit(main())