Merge "Add scripts for marathon health check testing"

This commit is contained in:
Jenkins 2016-04-01 13:04:18 +00:00 committed by Gerrit Code Review
commit 167dcc8480
4 changed files with 423 additions and 0 deletions

View File

@ -0,0 +1,9 @@
FROM python:2
RUN /bin/sh -c 'mkdir -p /usr/src/app'
ADD server.py /usr/src/app/server.py
WORKDIR /usr/src/app
CMD ["python", "./server.py"]

View File

@ -0,0 +1,254 @@
#!/usr/bin/python
from argparse import ArgumentParser
from datetime import datetime
from hashlib import md5
from marathon import MarathonClient
from marathon.models.container import MarathonContainer
from marathon.models.container import MarathonContainerPortMapping
from marathon.models.container import MarathonDockerContainer
from marathon.models import MarathonApp
from marathon.models import MarathonHealthCheck
from Queue import Empty
from Queue import Queue
from random import random
from threading import Thread
from time import sleep
from urllib2 import urlopen
MEM = 50
CPUS = 1
DISK = 50
class HealthCheckBencher(object):
def __init__(self, marathon_url, image, tasks):
self.concurrency = 20
self.docker_image = image
self.app_base_name = 'health-check-test-'
self.total_tasks_cout = int(tasks)
self.instances_per_app = 50
if tasks < self.instances_per_app:
self.instances_per_app = self.total_tasks_cout
self.app_count = 1
else:
self.app_count = self.total_tasks_cout/self.instances_per_app
self.heath_check_interval = 30
self.test_duration = 20
self.marathon_cluster = MarathonClient(marathon_url, timeout=240)
self.work_queue = Queue()
self.result_queue = Queue()
self.app_list_queue = Queue()
self.action_list = [self.start_collect,
'sleep={}'.format(self.test_duration),
self.get_stats]
def remove_apps(self):
apps = self.marathon_cluster.list_apps()
for app in apps:
if app.id.startswith("/"+self.app_base_name):
self.marathon_cluster.delete_app(app.id)
active = 0
while True:
apps = self.marathon_cluster.list_apps()
for app in apps:
if app.id.startswith(self.app_base_name):
active += 1
if active == 0:
break
def create_app(self, id):
port_mapping = MarathonContainerPortMapping(container_port=80,
protocol="tcp")
app_docker = MarathonDockerContainer(
image=self.docker_image,
network="BRIDGE",
force_pull_image=True,
port_mappings=[port_mapping])
app_container = MarathonContainer(docker=app_docker)
http_health_check = MarathonHealthCheck(
protocol="HTTP",
path="/status",
grace_period_seconds=300,
interval_seconds=self.heath_check_interval,
timeout_seconds=20,
max_consecutive_failures=0
)
app_suffix = str(md5(str(random())).hexdigest())
app_name = self.app_base_name + app_suffix
new_app = MarathonApp(cpus=CPUS, mem=MEM, disk=DISK,
container=app_container,
health_checks=[http_health_check],
instances=self.instances_per_app,
max_launch_delay_seconds=5)
print("Creating {}".format(app_name))
self.marathon_cluster.create_app(app_id=app_name, app=new_app)
self.app_list_queue.put(app_name)
return None
def wait_instances(self, app_name):
health_ok = 0
while health_ok < self.instances_per_app:
health_ok = 0
tasks = self.marathon_cluster.list_tasks(app_name)
for task in tasks:
if task.health_check_results:
health_ok += 1
def start_collect(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/start_collect'
res = urlopen(url)
if res.getcode() == 200:
print(task['id']+': collecter was started')
else:
print(task['id']+': failed to start collecter')
def stop_collect(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/stop_collect'
res = urlopen(url)
if res.getcode() == 200:
print(task['id']+': collecter was stopped')
else:
print(task['id']+': failed to stop collecter')
def clear_stats(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/clear_stats'
res = urlopen(url)
if res.getcode() == 200:
print(task['id']+': stats was dropped')
else:
print(task['id']+': stats was dropped')
def get_stats(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/get_timestamps'
try:
res = urlopen(url)
except Exception:
print("URL req failed")
self.result_queue.put({'id': task['id'],
'status': 'Failed',
'data': []})
return
if res.getcode() == 200:
data = res.read()
timestamps = data.split(',')
self.result_queue.put({'id': task['id'],
'status': 'ok',
'data': timestamps})
elif res.getcode() == 202:
print("Collecting is not enabled")
self.result_queue.put({'id': task['id'],
'status': 'Collecting is not enabled',
'data': []})
else:
print("Unknown response code")
self.result_queue.put({'id': task['id'],
'status': 'Unknown response code',
'data': []})
def repeat(self, action):
while self.work_queue.empty() is False:
try:
iteration = self.work_queue.get_nowait()
except Empty:
continue
action(iteration)
self.work_queue.task_done()
def fill_queue(self, iterations):
for iteration in iterations:
self.work_queue.put(iteration)
def get_tasks(self):
res = []
tasks = self.marathon_cluster.list_tasks()
for task in tasks:
if not task.id.startswith('health-check-test-'):
continue
res.append({'id': str(task.id),
'host': str(task.host),
'port': str(task.ports[0])})
return res
def create_apps(self):
self.fill_queue(range(self.app_count))
for thread_num in range(self.concurrency):
if self.work_queue.empty() is True:
break
worker = Thread(target=self.repeat, args=(self.create_app,))
worker.start()
self.work_queue.join()
while self.app_list_queue.empty() is False:
try:
app_name = self.app_list_queue.get_nowait()
except Empty:
continue
self.work_queue.put(app_name)
for thread_num in range(self.concurrency):
if self.work_queue.empty() is True:
break
worker = Thread(target=self.repeat, args=(self.wait_instances,))
worker.start()
self.work_queue.join()
def start_test(self):
task_list = self.get_tasks()
for action in self.action_list:
if isinstance(action, basestring):
if action.startswith('sleep='):
amount = int(action.split('=')[1])
sleep(60*amount)
continue
self.fill_queue(task_list)
for thread_num in range(self.concurrency):
if self.work_queue.empty() is True:
break
worker = Thread(target=self.repeat, args=(action,))
worker.start()
self.work_queue.join()
def generate_report(self):
today = datetime.today()
file_prefix = "{:%Y-%m-%d_%H_%M_%S-}".format(today)
file_name = (file_prefix +
'health_check_result-' +
str(self.total_tasks_cout) +
'tasks.csv')
f = open(file_name, "w")
f.write("Task ID,Health check timestamp")
while self.result_queue.empty() is False:
try:
result = self.result_queue.get_nowait()
except Empty:
continue
for timestamp in result['data']:
f.write("\n%s,%s" % (result['id'], timestamp))
f.close()
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument("-m", "--marathon",
help="Marathon URL, on example "
"http://172.20.8.34:8080/virt-env-2/marathon",
required=True)
parser.add_argument("-t", "--tasks",
help="Total tasks count",
required=True)
parser.add_argument("-i", "--image",
help="Docker image path",
required=True)
args = parser.parse_args()
bencher = HealthCheckBencher(args.marathon, args.image, int(args.tasks))
bencher.create_apps()
bencher.start_test()
bencher.remove_apps()
bencher.generate_report()

View File

@ -0,0 +1,52 @@
#!/usr/bin/python
from argparse import ArgumentParser
import numpy as np
def parse(file, percentile, interval):
data = {}
with open(file) as fp:
for line in fp:
record = line.rstrip().split(',')
try:
timestamp = float(record[1])
if record[0] not in data:
data[record[0]] = []
data[record[0]].append(timestamp)
except ValueError:
continue
deviations = []
for task in data:
data[task].sort()
last_timestamp = 0
for timestamp in data[task]:
if last_timestamp == 0:
last_timestamp = timestamp
continue
cur_interval = timestamp - last_timestamp
last_timestamp = timestamp
deviations.append(np.fabs(interval - cur_interval))
print("Total tasks: {}. Total health checks: {}".format(len(data.keys()),
len(deviations)))
print("min: {}. max: {}, average: {},"
" percentile: {}".format(np.min(deviations),
np.max(deviations),
np.average(deviations),
np.percentile(deviations, percentile)))
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument("-f", "--file",
help="File to be parsed",
required=True)
parser.add_argument("-i", "--interval",
help="Configured health check interval(sec)",
required=True)
parser.add_argument("-p", "--persentile",
help="Percentile value [0-100]. Default 95",
required=False, default=95.0)
args = parser.parse_args()
parse(args.file, float(args.persentile), int(args.interval))

View File

@ -0,0 +1,108 @@
#!/usr/bin/python
from BaseHTTPServer import BaseHTTPRequestHandler
import os
import time
VERSION = 1.0
class ServerStatus(object):
collecting = 0
status = 1
last_timestamp = 0
interval_arr = []
timestamp_arr = []
class GetHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/start':
ServerStatus.status = 1
self.send_response(200)
self.end_headers()
elif self.path == '/stop':
ServerStatus.status = 0
self.send_response(200)
self.end_headers()
elif self.path == '/start_collect':
ServerStatus.collecting = 1
self.send_response(200)
self.end_headers()
elif self.path == '/stop_collect':
ServerStatus.collecting = 0
self.send_response(200)
self.end_headers()
elif self.path == '/get_intervals':
self.send_response(200)
self.send_header("Context-Type", "text/plain")
self.end_headers()
tmp_str = ','.join(str(x) for x in ServerStatus.interval_arr)
self.wfile.write(tmp_str)
self.wfile.close()
elif self.path == '/version':
self.send_response(200)
self.send_header("Context-Type", "text/plain")
self.end_headers()
self.wfile.write(VERSION)
self.wfile.close()
elif self.path == '/get_stats':
if ServerStatus.collecting == 0:
self.send_response(202)
else:
self.send_response(200)
self.send_header("Context-Type", "text/plain")
self.end_headers()
timestamp_str = ','.join(str(x)
for x in ServerStatus.timestamp_arr)
intervals_str = ','.join(str(x) for x in ServerStatus.interval_arr)
self.wfile.write(timestamp_str+'\n'+intervals_str)
self.wfile.close()
elif self.path == '/get_timestamps':
if ServerStatus.collecting == 0:
self.send_response(202)
else:
self.send_response(200)
self.send_header("Context-Type", "text/plain")
self.end_headers()
tmp_str = ','.join(str(x) for x in ServerStatus.timestamp_arr)
self.wfile.write(tmp_str)
self.wfile.close()
elif self.path == '/is_collecter_start':
self.send_response(200)
self.send_header("Context-Type", "text/plain")
self.end_headers()
if ServerStatus.collecting == 1:
self.wfile.write("Yes")
else:
self.wfile.write("No")
self.wfile.close()
elif self.path == '/clear_stats':
ServerStatus.interval_arr = []
ServerStatus.timestamp_arr = []
ServerStatus.last_timestamp = 0
self.send_response(200)
self.end_headers()
elif self.path == '/status':
if ServerStatus.collecting == 1:
if ServerStatus.last_timestamp == 0:
ServerStatus.last_timestamp = time.time()
else:
current_time = time.time()
interval = round(current_time -
ServerStatus.last_timestamp, 3)
ServerStatus.interval_arr.append(interval)
ServerStatus.timestamp_arr.append(round(current_time, 3))
ServerStatus.last_timestamp = current_time
if ServerStatus.status == 0:
self.send_response(503)
else:
self.send_response(200)
self.end_headers()
return
if __name__ == '__main__':
port = int(os.getenv('SERVER_PORT', '80'))
from BaseHTTPServer import HTTPServer
server = HTTPServer(('0.0.0.0', port), GetHandler)
server.serve_forever()