diff --git a/scripts/marathon-health-check-testing/Dockerfile b/scripts/marathon-health-check-testing/Dockerfile new file mode 100644 index 0000000..aa9ad2d --- /dev/null +++ b/scripts/marathon-health-check-testing/Dockerfile @@ -0,0 +1,9 @@ +FROM python:2 + +RUN /bin/sh -c 'mkdir -p /usr/src/app' + +ADD server.py /usr/src/app/server.py + +WORKDIR /usr/src/app + +CMD ["python", "./server.py"] diff --git a/scripts/marathon-health-check-testing/HealthCheckBencher.py b/scripts/marathon-health-check-testing/HealthCheckBencher.py new file mode 100755 index 0000000..3fabe4d --- /dev/null +++ b/scripts/marathon-health-check-testing/HealthCheckBencher.py @@ -0,0 +1,254 @@ +#!/usr/bin/python + + +from argparse import ArgumentParser +from datetime import datetime +from hashlib import md5 +from marathon import MarathonClient +from marathon.models.container import MarathonContainer +from marathon.models.container import MarathonContainerPortMapping +from marathon.models.container import MarathonDockerContainer +from marathon.models import MarathonApp +from marathon.models import MarathonHealthCheck +from Queue import Empty +from Queue import Queue +from random import random +from threading import Thread +from time import sleep +from urllib2 import urlopen + +MEM = 50 +CPUS = 1 +DISK = 50 + + +class HealthCheckBencher(object): + def __init__(self, marathon_url, image, tasks): + self.concurrency = 20 + self.docker_image = image + self.app_base_name = 'health-check-test-' + self.total_tasks_cout = int(tasks) + self.instances_per_app = 50 + if tasks < self.instances_per_app: + self.instances_per_app = self.total_tasks_cout + self.app_count = 1 + else: + self.app_count = self.total_tasks_cout/self.instances_per_app + self.heath_check_interval = 30 + self.test_duration = 20 + self.marathon_cluster = MarathonClient(marathon_url, timeout=240) + self.work_queue = Queue() + self.result_queue = Queue() + self.app_list_queue = Queue() + self.action_list = [self.start_collect, + 'sleep={}'.format(self.test_duration), + self.get_stats] + + def remove_apps(self): + apps = self.marathon_cluster.list_apps() + for app in apps: + if app.id.startswith("/"+self.app_base_name): + self.marathon_cluster.delete_app(app.id) + active = 0 + while True: + apps = self.marathon_cluster.list_apps() + for app in apps: + if app.id.startswith(self.app_base_name): + active += 1 + if active == 0: + break + + def create_app(self, id): + port_mapping = MarathonContainerPortMapping(container_port=80, + protocol="tcp") + app_docker = MarathonDockerContainer( + image=self.docker_image, + network="BRIDGE", + force_pull_image=True, + port_mappings=[port_mapping]) + app_container = MarathonContainer(docker=app_docker) + http_health_check = MarathonHealthCheck( + protocol="HTTP", + path="/status", + grace_period_seconds=300, + interval_seconds=self.heath_check_interval, + timeout_seconds=20, + max_consecutive_failures=0 + ) + + app_suffix = str(md5(str(random())).hexdigest()) + app_name = self.app_base_name + app_suffix + new_app = MarathonApp(cpus=CPUS, mem=MEM, disk=DISK, + container=app_container, + health_checks=[http_health_check], + instances=self.instances_per_app, + max_launch_delay_seconds=5) + print("Creating {}".format(app_name)) + self.marathon_cluster.create_app(app_id=app_name, app=new_app) + self.app_list_queue.put(app_name) + return None + + def wait_instances(self, app_name): + health_ok = 0 + while health_ok < self.instances_per_app: + health_ok = 0 + tasks = self.marathon_cluster.list_tasks(app_name) + for task in tasks: + if task.health_check_results: + health_ok += 1 + + def start_collect(self, task): + url = 'http://'+task['host']+':'+str(task['port'])+'/start_collect' + res = urlopen(url) + if res.getcode() == 200: + print(task['id']+': collecter was started') + else: + print(task['id']+': failed to start collecter') + + def stop_collect(self, task): + url = 'http://'+task['host']+':'+str(task['port'])+'/stop_collect' + res = urlopen(url) + if res.getcode() == 200: + print(task['id']+': collecter was stopped') + else: + print(task['id']+': failed to stop collecter') + + def clear_stats(self, task): + url = 'http://'+task['host']+':'+str(task['port'])+'/clear_stats' + res = urlopen(url) + if res.getcode() == 200: + print(task['id']+': stats was dropped') + else: + print(task['id']+': stats was dropped') + + def get_stats(self, task): + url = 'http://'+task['host']+':'+str(task['port'])+'/get_timestamps' + try: + res = urlopen(url) + except Exception: + print("URL req failed") + self.result_queue.put({'id': task['id'], + 'status': 'Failed', + 'data': []}) + return + if res.getcode() == 200: + data = res.read() + timestamps = data.split(',') + self.result_queue.put({'id': task['id'], + 'status': 'ok', + 'data': timestamps}) + elif res.getcode() == 202: + print("Collecting is not enabled") + self.result_queue.put({'id': task['id'], + 'status': 'Collecting is not enabled', + 'data': []}) + else: + print("Unknown response code") + self.result_queue.put({'id': task['id'], + 'status': 'Unknown response code', + 'data': []}) + + def repeat(self, action): + while self.work_queue.empty() is False: + try: + iteration = self.work_queue.get_nowait() + except Empty: + continue + action(iteration) + self.work_queue.task_done() + + def fill_queue(self, iterations): + for iteration in iterations: + self.work_queue.put(iteration) + + def get_tasks(self): + res = [] + tasks = self.marathon_cluster.list_tasks() + for task in tasks: + if not task.id.startswith('health-check-test-'): + continue + res.append({'id': str(task.id), + 'host': str(task.host), + 'port': str(task.ports[0])}) + return res + + def create_apps(self): + self.fill_queue(range(self.app_count)) + for thread_num in range(self.concurrency): + if self.work_queue.empty() is True: + break + worker = Thread(target=self.repeat, args=(self.create_app,)) + worker.start() + self.work_queue.join() + + while self.app_list_queue.empty() is False: + try: + app_name = self.app_list_queue.get_nowait() + except Empty: + continue + self.work_queue.put(app_name) + + for thread_num in range(self.concurrency): + if self.work_queue.empty() is True: + break + worker = Thread(target=self.repeat, args=(self.wait_instances,)) + worker.start() + self.work_queue.join() + + def start_test(self): + task_list = self.get_tasks() + for action in self.action_list: + if isinstance(action, basestring): + if action.startswith('sleep='): + amount = int(action.split('=')[1]) + sleep(60*amount) + continue + self.fill_queue(task_list) + for thread_num in range(self.concurrency): + if self.work_queue.empty() is True: + break + worker = Thread(target=self.repeat, args=(action,)) + worker.start() + self.work_queue.join() + + def generate_report(self): + today = datetime.today() + file_prefix = "{:%Y-%m-%d_%H_%M_%S-}".format(today) + file_name = (file_prefix + + 'health_check_result-' + + str(self.total_tasks_cout) + + 'tasks.csv') + + f = open(file_name, "w") + f.write("Task ID,Health check timestamp") + + while self.result_queue.empty() is False: + try: + result = self.result_queue.get_nowait() + except Empty: + continue + for timestamp in result['data']: + f.write("\n%s,%s" % (result['id'], timestamp)) + + f.close() + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument("-m", "--marathon", + help="Marathon URL, on example " + "http://172.20.8.34:8080/virt-env-2/marathon", + required=True) + parser.add_argument("-t", "--tasks", + help="Total tasks count", + required=True) + parser.add_argument("-i", "--image", + help="Docker image path", + required=True) + args = parser.parse_args() + + bencher = HealthCheckBencher(args.marathon, args.image, int(args.tasks)) + + bencher.create_apps() + bencher.start_test() + bencher.remove_apps() + bencher.generate_report() diff --git a/scripts/marathon-health-check-testing/ParseRawResults.py b/scripts/marathon-health-check-testing/ParseRawResults.py new file mode 100755 index 0000000..b18fd37 --- /dev/null +++ b/scripts/marathon-health-check-testing/ParseRawResults.py @@ -0,0 +1,52 @@ +#!/usr/bin/python +from argparse import ArgumentParser +import numpy as np + + +def parse(file, percentile, interval): + data = {} + with open(file) as fp: + for line in fp: + record = line.rstrip().split(',') + try: + timestamp = float(record[1]) + if record[0] not in data: + data[record[0]] = [] + data[record[0]].append(timestamp) + except ValueError: + continue + + deviations = [] + for task in data: + data[task].sort() + last_timestamp = 0 + for timestamp in data[task]: + if last_timestamp == 0: + last_timestamp = timestamp + continue + cur_interval = timestamp - last_timestamp + last_timestamp = timestamp + deviations.append(np.fabs(interval - cur_interval)) + + print("Total tasks: {}. Total health checks: {}".format(len(data.keys()), + len(deviations))) + print("min: {}. max: {}, average: {}," + " percentile: {}".format(np.min(deviations), + np.max(deviations), + np.average(deviations), + np.percentile(deviations, percentile))) + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument("-f", "--file", + help="File to be parsed", + required=True) + parser.add_argument("-i", "--interval", + help="Configured health check interval(sec)", + required=True) + parser.add_argument("-p", "--persentile", + help="Percentile value [0-100]. Default 95", + required=False, default=95.0) + args = parser.parse_args() + + parse(args.file, float(args.persentile), int(args.interval)) diff --git a/scripts/marathon-health-check-testing/server.py b/scripts/marathon-health-check-testing/server.py new file mode 100755 index 0000000..b8af07b --- /dev/null +++ b/scripts/marathon-health-check-testing/server.py @@ -0,0 +1,108 @@ +#!/usr/bin/python +from BaseHTTPServer import BaseHTTPRequestHandler +import os +import time + +VERSION = 1.0 + + +class ServerStatus(object): + collecting = 0 + status = 1 + last_timestamp = 0 + interval_arr = [] + timestamp_arr = [] + + +class GetHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == '/start': + ServerStatus.status = 1 + self.send_response(200) + self.end_headers() + elif self.path == '/stop': + ServerStatus.status = 0 + self.send_response(200) + self.end_headers() + elif self.path == '/start_collect': + ServerStatus.collecting = 1 + self.send_response(200) + self.end_headers() + elif self.path == '/stop_collect': + ServerStatus.collecting = 0 + self.send_response(200) + self.end_headers() + elif self.path == '/get_intervals': + self.send_response(200) + self.send_header("Context-Type", "text/plain") + self.end_headers() + tmp_str = ','.join(str(x) for x in ServerStatus.interval_arr) + self.wfile.write(tmp_str) + self.wfile.close() + elif self.path == '/version': + self.send_response(200) + self.send_header("Context-Type", "text/plain") + self.end_headers() + self.wfile.write(VERSION) + self.wfile.close() + elif self.path == '/get_stats': + if ServerStatus.collecting == 0: + self.send_response(202) + else: + self.send_response(200) + self.send_header("Context-Type", "text/plain") + self.end_headers() + timestamp_str = ','.join(str(x) + for x in ServerStatus.timestamp_arr) + intervals_str = ','.join(str(x) for x in ServerStatus.interval_arr) + self.wfile.write(timestamp_str+'\n'+intervals_str) + self.wfile.close() + elif self.path == '/get_timestamps': + if ServerStatus.collecting == 0: + self.send_response(202) + else: + self.send_response(200) + self.send_header("Context-Type", "text/plain") + self.end_headers() + tmp_str = ','.join(str(x) for x in ServerStatus.timestamp_arr) + self.wfile.write(tmp_str) + self.wfile.close() + elif self.path == '/is_collecter_start': + self.send_response(200) + self.send_header("Context-Type", "text/plain") + self.end_headers() + if ServerStatus.collecting == 1: + self.wfile.write("Yes") + else: + self.wfile.write("No") + self.wfile.close() + elif self.path == '/clear_stats': + ServerStatus.interval_arr = [] + ServerStatus.timestamp_arr = [] + ServerStatus.last_timestamp = 0 + self.send_response(200) + self.end_headers() + elif self.path == '/status': + if ServerStatus.collecting == 1: + if ServerStatus.last_timestamp == 0: + ServerStatus.last_timestamp = time.time() + else: + current_time = time.time() + interval = round(current_time - + ServerStatus.last_timestamp, 3) + ServerStatus.interval_arr.append(interval) + ServerStatus.timestamp_arr.append(round(current_time, 3)) + ServerStatus.last_timestamp = current_time + if ServerStatus.status == 0: + self.send_response(503) + else: + self.send_response(200) + self.end_headers() + + return + +if __name__ == '__main__': + port = int(os.getenv('SERVER_PORT', '80')) + from BaseHTTPServer import HTTPServer + server = HTTPServer(('0.0.0.0', port), GetHandler) + server.serve_forever()