From 35f8bc84b30156e1f4df3bc17d265c0f5c7a9e35 Mon Sep 17 00:00:00 2001 From: Xicheng Chang Date: Wed, 8 Apr 2015 17:47:17 -0700 Subject: [PATCH] Add rally's dockerfile for rally docker build. The "real" rally docker container will be built from the image' created by this file. This Dockerfile should be used for creating and uploading rally image with periodic jobs. Change-Id: Id0d0c6d911a1893fe15dcb2cbd7df70b1597c2b8 --- docker-build/rally/Dockerfile | 22 ++ docker-build/rally/check_health.py | 503 +++++++++++++++++++++++++++++ 2 files changed, 525 insertions(+) create mode 100644 docker-build/rally/Dockerfile create mode 100644 docker-build/rally/check_health.py diff --git a/docker-build/rally/Dockerfile b/docker-build/rally/Dockerfile new file mode 100644 index 0000000..fd1b00e --- /dev/null +++ b/docker-build/rally/Dockerfile @@ -0,0 +1,22 @@ +FROM ubuntu:trusty +RUN apt-get update && \ + apt-get -y install git python2.7 bash-completion python-dev libffi-dev \ + libxml2-dev libxslt1-dev libssl-dev python-pip libmysqlclient-dev && \ + pip install mysql-python +RUN mkdir -p /tmp/rally-src && \ + cd /tmp/rally-src && \ + git clone https://github.com/stackforge/rally.git +RUN mkdir -p /opt/rally/database && \ + mkdir -p /etc/rally && \ + mkdir -p /opt/compass/rally/deployment && \ + mkdir -p /opt/compass/rally/scenarios && \ + chmod 0755 /opt/compass && \ + chmod 0755 /opt/compass/rally && \ + chmod 0755 /opt/compass/rally/deployment && \ + chmod 0755 /opt/compass/rally/scenarios + +ADD check_health.py /opt/compass/check_health.py + +RUN git clone https://github.com/stackforge/rally.git +RUN cd rally && \ + ./install_rally.sh diff --git a/docker-build/rally/check_health.py b/docker-build/rally/check_health.py new file mode 100644 index 0000000..c7b6446 --- /dev/null +++ b/docker-build/rally/check_health.py @@ -0,0 +1,503 @@ +from multiprocessing import Pool +import argparse +import logging +import multiprocessing +import os +import simplejson as json +import site +import subprocess +import sys +import re + + +logging.basicConfig(filename='/var/log/check_health.log', + level=logging.INFO, + format='%(asctime)s;%(levelname)s;%(lineno)s;%(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + +# Activate virtual environment for Rally +logging.info("Start to activate Rally virtual environment......") +virtual_env = '/opt/rally' +activate_this = '/opt/rally/bin/activate_this.py' +execfile(activate_this, dict(__file__=activate_this)) +site.addsitedir(virtual_env) +if virtual_env not in sys.path: + sys.path.append(virtual_env) +logging.info("Activated virtual environment.") + + +from oslo_config import cfg +from rally import db +from rally.common import version +import requests + + +CONF = cfg.CONF +PIDFILE = '/tmp/compass_health_check.pid' +REQUEST_HEADER = {'content-type': 'application/json'} + + +def round_float(number, d=2): + return ("%." + str(d) + "f") % number + + +def get_task_name(task_json_path): + return os.path.basename(task_json_path).split('.')[0] + + +def get_report_name(task_name): + return task_name.replace('_', '-') + + +def is_process_running(): + if not os.path.isfile(PIDFILE): + return False + + file = open(PIDFILE, 'r') + pid = file.readline() + file.close() + + if os.path.exists('/proc/%s/cmd' % pid): + return True + else: + os.unlink(PIDFILE) + + return False + + +def clean_pidfile(): + if not is_process_running(): + return + os.unlink(PIDFILE) + + +class HealthException(Exception): + def __init__(self, err_msg, url=None): + super(HealthException, self).__init__(err_msg) + self.url = url + + +def error_handler(func_name, err_msg, url): + logging.error("%s raise excption: %s" % (func_name, err_msg)) + # Clean pidfile + clean_pidfile() + + # Send error back to Compass + payload = { + "report": {}, + "state": "error", + "error_message": err_msg + } + resp = requests.put( + url, data=json.dumps(payload), headers=REQUEST_HEADER + ) + logging.info("[error_handler] status_code: %s" % resp.status_code) + + +def error_handler_decorator(func): + def func_wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + + except HealthException as exc: + func_name = func.__name__ + err_msg = str(exc) + error_handler(func_name, err_msg, exc.url) + logging.error(exc) + + return func_wrapper + + +def run_task(args, **kwargs): + return HealthCheck.start_task(*args, **kwargs) + + +class HealthCheck(object): + + def __init__(self, compass_url, clustername): + self.url = compass_url + self.deployment_name = clustername + self.rally_secnarios_dir = '/opt/compass/rally/scenarios' + self.rally_deployment_dir = '/opt/compass/rally/deployment' + + def print_dict(self, input_dict): + print json.dumps(input_dict, indent=4) + + def init_rally_config(self): + CONF([], project='rally', version=version.version_string()) + + @error_handler_decorator + def exec_cli(self, command, max_reties=1): + max_reties = max_reties + output = None + err_msg = None + + while(max_reties > 0): + proc = subprocess.Popen( + command, shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + output, err_msg = proc.communicate() + if proc.returncode == 0: + break + else: + logging.error('[exec_cli]: %s' % err_msg) + proc.terminate() + max_reties -= 1 + + return proc.returncode, output, err_msg + + @error_handler_decorator + def create_deployment(self): + dpl_file_name = '.'.join((self.deployment_name, 'json')) + dpl_path = os.path.join(self.rally_deployment_dir, dpl_file_name) + logging.info('deployment config file path is %s' % dpl_path) + + if not os.path.isfile(dpl_path): + err_msg = 'Cannot find deployment config file for rally.' + raise HealthException(err_msg, self.url) + + deployments = db.deployment_list(name=self.deployment_name) + if deployments: + # Destroy the previous deployment + uuid = deployments[0].uuid + self.delete_deployment_and_tasks(uuid) + logging.info("Destroy previous deployment!") + + # Create deployment + command = 'rally deployment create --filename=%s --name=%s' \ + % (dpl_path, self.deployment_name) + + logging.info(command) + returncode, output, err_msg = self.exec_cli(command) + if returncode > 0: + # Send error message to Compass. Rally failed. + raise HealthException(err_msg, self.url) + + deployment = db.deployment_list(name=self.deployment_name)[0] + + return deployment.uuid + + @error_handler_decorator + def delete_deployment_and_tasks(self, deployment_uuid=None): + if not deployment_uuid: + deployments = db.deployment_list(name=self.deployment_name) + if not deployments: + return + + deployment_uuid = deployments[0].uuid + + self.cleanup_previous_tasks(deployment_uuid) + command = 'rally deployment destroy --deployment %s'\ + % self.deployment_name + + returncode, output, err_msg = self.exec_cli(command) + if returncode > 0: + raise HealthException(err_msg, self.url) + + logging.info("Destroyed the deployment '%s'" % self.deployment_name) + + def get_all_tasks_config(self): + tasks = [] + for dirpath, dirs, files in os.walk(self.rally_secnarios_dir): + for file in files: + if file.endswith('.json'): + tasks.append(os.path.join(dirpath, file)) + + logging.info("Get all tasks config are %s" % tasks) + return tasks + + def get_tasks_uuid_from_db(self, deployment_id): + tasks = db.task_list(deployment=deployment_id) + return [task.uuid for task in tasks] + + @error_handler_decorator + def start_task(self, task_json_path): + task_name = get_task_name(task_json_path) + print "Start task [%s]...." % task_name + + command = 'rally -v task start %s' % task_json_path + logging.info(command) + returncode, output, err = self.exec_cli(command) + + logging.info("task [%s] output is %s" % (task_name, output)) + print "Done task [%s]" % task_name + + print "Start to collect report......" + self.collect_and_send_report(task_name, output) + + print "Collecting report for task [%s] is done!" % task_name + + def collect_and_send_report(self, task_name, task_output): + """ + { + "results": { + "actions": { + "$action": { + "duration": { + "summary": { + "min (sec)": xx, + "max (sec)": xx, + "avg (sec)": xx, + "success": xx, + "errors": xx, + "total": xx + }, + "data": [xx,xx,xx] + } + } + }, + 'total_errors': x + }, + "category": "xxx", + "raw_output": {...} + } + """ + report_name = get_report_name(task_name) + report_url = '/'.join((self.url, report_name)) + match = re.search('\s?rally task results\s+([\da-f\-]+)\s?', task_output) + if not match: + raise HealthException('Unknown rally internel error!', report_url) + + task_uuid = match.group(1) + task_obj = db.task_get(task_uuid) + if task_obj['status'] == 'failed': + raise HealthException(task_obj['verification_log'], report_url) + + command = "rally task results %s" % task_uuid + logging.info("[collect_and_send_report] command is %s" % command) + + print "Start to collect report for task [%s]" % task_name + return_code, task_result, err = self.exec_cli(command) + if return_code > 0: + raise HealthException(err, report_url) + + output = json.loads(task_result)[0] + report = {'actions': {}} + actions = [] + + # Get the name of actions + actions = [] + if output['result']: + actions = output['result'][0]['atomic_actions'].keys() + + for result in output['result']: + if result['error']: + continue + actions = result['atomic_actions'].keys() + break + + if not actions: + actions.append(report_name) + + # Get and set report for each action + for action in actions: + report['actions'].setdefault(action, {'duration': {}}) + report['actions'][action]['duration'] \ + = self._get_action_dur_report(action, output) + + # Get and set errors if any + errors = self._get_total_errors(output) + report['total_errors'] = errors + + logging.info("task [%s] report is: %s" % (task_name, report)) + + final_report = {"results": report, "raw_output": output} + self.send_report(final_report, report_url) + + def _get_total_errors(self, output): + results = output['result'] + if not results: + return 1 + total_errors = 0 + + for result in results: + if result['error']: + total_errors += 1 + + return total_errors + + def _get_action_dur_report(self, action, output): + summary = { + 'min (sec)': 0, + 'avg (sec)': 0, + 'max (sec)': 0, + 'success': '0.0%', + 'errors': {}, + 'total': 0 + } + data = [] + errors = { + 'count': 0, + 'details': [] + } + min_dur = sys.maxint + max_dur = 0 + total_dur = 0 + no_action = 0 + + results = output['result'] + + for result in results: + atomic_actions = result['atomic_actions'] + + if atomic_actions and action not in atomic_actions: + no_action += 1 + data.append(0) + continue + + elif (atomic_actions and not atomic_actions[action] + or not atomic_actions and result['error']): + errors['count'] = errors['count'] + 1 + errors['details'].append(result['error']) + data.append(0) + continue + + duration = result['duration'] + if action in atomic_actions: + duration = atomic_actions[action] + + total_dur += duration + min_dur = [min_dur, duration][duration < min_dur] + max_dur = [max_dur, duration][duration > max_dur] + data.append(duration) + + error_count = errors['count'] + total_exec = output['key']['kw']['runner']['times'] + + if not results: + errors['count'] = total_exec + errors['details'] = ['Unknown error!'] + summary['errors'] = errors + + return { + 'summary': summary, + 'data': data + } + + if total_exec == error_count: + # All actions in this scenario are failed. + summary['min (sec)'] = 0 + summary['avg (sec)'] = 0 + else: + summary['min (sec)'] = round_float(min_dur) + summary['avg (sec)'] = round_float( + total_dur / (total_exec - error_count - no_action) + ) + + summary['max (sec)'] = round_float(max_dur) + summary['errors'] = errors + summary['success'] = round_float( + float( + total_exec - error_count - no_action + ) * 100 / float(len(results)), + 1 + ) + '%' + summary['total'] = total_exec + + return { + 'summary': summary, + 'data': data + } + + def create_reports(self, tasks): + reports_list = [] + for task in tasks: + temp = {} + temp['name'] = get_report_name(get_task_name(task)) + temp['category'] = os.path.basename(os.path.dirname(task)) + reports_list.append(temp) + + logging.info("tasks are %s" % reports_list) + + payload = {"report_list": reports_list} + resp = requests.post( + self.url, data=json.dumps(payload), headers=REQUEST_HEADER + ) + logging.info("[create reports] response code is %s" % resp.status_code) + + def send_report(self, report, report_url=None): + if not report_url: + logging.error("report_url is None!") + report_url = self.url + + payload = { + "report": report, + "state": "success" + } + total_errors = report['results']['total_errors'] + exec_num = report['raw_output']['key']['kw']['runner']['times'] + + if total_errors >= exec_num or total_errors == 0 and exec_num > 0: + payload['state'] = 'error' + payload['error_message'] = "Actions in this scenario are failed." + + elif total_errors: + payload['state'] = 'finished' + + resp = requests.put( + report_url, data=json.dumps(payload), headers=REQUEST_HEADER + ) + logging.info("Update report reponse is %s" % resp.text) + + def cleanup_previous_tasks(self, deployment_id): + tasks = self.get_tasks_uuid_from_db(deployment_id) + + for task_id in tasks: + db.task_delete(task_id) + + logging.info("Delete all tasks of deployment[ID: %s]" % deployment_id) + + def run(self): + tasks = self.get_all_tasks_config() + self.create_reports(tasks) + self.init_rally_config() + self.create_deployment() + + logging.info("Start to run tasks...") + process_num = 2 + try: + cpu_num = multiprocessing.cpu_count() + process_num = [process_num, cpu_num][process_num < cpu_num] + except Exception: + logging.info("cpu_count() has not been implemented!") + + logging.info("The number of processes will be %s." % process_num) + try: + pool = Pool(processes=process_num) + pool.map_async(run_task, zip([self]*len(tasks), tasks)) + pool.close() + pool.join() + except Exception as ex: + logging.info("processing pool get exception: '%s'" % ex) + + finally: + clean_pidfile() + + +def main(compass_url, deployment_name): + logging.info('compass_url is %s' % compass_url) + if is_process_running(): + logging.info("[%s] already exisits, exit!" % PIDFILE) + sys.exit() + else: + pid = str(os.getpid()) + file(PIDFILE, 'w').write(pid) + + checker = HealthCheck(compass_url, deployment_name) + checker.run() + logging.info("Health check is finished!") + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("--url", type=str, + help="The URL to send reports back") + parser.add_argument("--clustername", type=str, + help="The Cluster name") + args = parser.parse_args() + + compass_url = args.url + deployment_name = args.clustername + + main(compass_url, deployment_name)