performance-docs/scripts/marathon-health-check-testing/HealthCheckBencher.py

256 lines
9.0 KiB
Python
Executable File

#!/usr/bin/python
from argparse import ArgumentParser
from datetime import datetime
from hashlib import md5
from marathon import MarathonClient
from marathon.models.container import MarathonContainer
from marathon.models.container import MarathonContainerPortMapping
from marathon.models.container import MarathonDockerContainer
from marathon.models import MarathonApp
from marathon.models import MarathonHealthCheck
from Queue import Empty
from Queue import Queue
from random import random
import six
from threading import Thread
from time import sleep
from urllib2 import urlopen
MEM = 50
CPUS = 1
DISK = 50
class HealthCheckBencher(object):
def __init__(self, marathon_url, image, tasks):
self.concurrency = 20
self.docker_image = image
self.app_base_name = 'health-check-test-'
self.total_tasks_cout = int(tasks)
self.instances_per_app = 50
if tasks < self.instances_per_app:
self.instances_per_app = self.total_tasks_cout
self.app_count = 1
else:
self.app_count = self.total_tasks_cout/self.instances_per_app
self.heath_check_interval = 30
self.test_duration = 20
self.marathon_cluster = MarathonClient(marathon_url, timeout=240)
self.work_queue = Queue()
self.result_queue = Queue()
self.app_list_queue = Queue()
self.action_list = [self.start_collect,
'sleep={}'.format(self.test_duration),
self.get_stats]
def remove_apps(self):
apps = self.marathon_cluster.list_apps()
for app in apps:
if app.id.startswith("/"+self.app_base_name):
self.marathon_cluster.delete_app(app.id)
active = 0
while True:
apps = self.marathon_cluster.list_apps()
for app in apps:
if app.id.startswith(self.app_base_name):
active += 1
if active == 0:
break
def create_app(self, id):
port_mapping = MarathonContainerPortMapping(container_port=80,
protocol="tcp")
app_docker = MarathonDockerContainer(
image=self.docker_image,
network="BRIDGE",
force_pull_image=True,
port_mappings=[port_mapping])
app_container = MarathonContainer(docker=app_docker)
http_health_check = MarathonHealthCheck(
protocol="HTTP",
path="/status",
grace_period_seconds=300,
interval_seconds=self.heath_check_interval,
timeout_seconds=20,
max_consecutive_failures=0
)
app_suffix = str(md5(str(random())).hexdigest())
app_name = self.app_base_name + app_suffix
new_app = MarathonApp(cpus=CPUS, mem=MEM, disk=DISK,
container=app_container,
health_checks=[http_health_check],
instances=self.instances_per_app,
max_launch_delay_seconds=5)
print("Creating {}".format(app_name))
self.marathon_cluster.create_app(app_id=app_name, app=new_app)
self.app_list_queue.put(app_name)
return None
def wait_instances(self, app_name):
health_ok = 0
while health_ok < self.instances_per_app:
health_ok = 0
tasks = self.marathon_cluster.list_tasks(app_name)
for task in tasks:
if task.health_check_results:
health_ok += 1
def start_collect(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/start_collect'
res = urlopen(url)
if res.getcode() == 200:
print(task['id']+': collecter was started')
else:
print(task['id']+': failed to start collecter')
def stop_collect(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/stop_collect'
res = urlopen(url)
if res.getcode() == 200:
print(task['id']+': collecter was stopped')
else:
print(task['id']+': failed to stop collecter')
def clear_stats(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/clear_stats'
res = urlopen(url)
if res.getcode() == 200:
print(task['id']+': stats was dropped')
else:
print(task['id']+': stats was dropped')
def get_stats(self, task):
url = 'http://'+task['host']+':'+str(task['port'])+'/get_timestamps'
try:
res = urlopen(url)
except Exception:
print("URL req failed")
self.result_queue.put({'id': task['id'],
'status': 'Failed',
'data': []})
return
if res.getcode() == 200:
data = res.read()
timestamps = data.split(',')
self.result_queue.put({'id': task['id'],
'status': 'ok',
'data': timestamps})
elif res.getcode() == 202:
print("Collecting is not enabled")
self.result_queue.put({'id': task['id'],
'status': 'Collecting is not enabled',
'data': []})
else:
print("Unknown response code")
self.result_queue.put({'id': task['id'],
'status': 'Unknown response code',
'data': []})
def repeat(self, action):
while self.work_queue.empty() is False:
try:
iteration = self.work_queue.get_nowait()
except Empty:
continue
action(iteration)
self.work_queue.task_done()
def fill_queue(self, iterations):
for iteration in iterations:
self.work_queue.put(iteration)
def get_tasks(self):
res = []
tasks = self.marathon_cluster.list_tasks()
for task in tasks:
if not task.id.startswith('health-check-test-'):
continue
res.append({'id': str(task.id),
'host': str(task.host),
'port': str(task.ports[0])})
return res
def create_apps(self):
self.fill_queue(range(self.app_count))
for thread_num in range(self.concurrency):
if self.work_queue.empty() is True:
break
worker = Thread(target=self.repeat, args=(self.create_app,))
worker.start()
self.work_queue.join()
while self.app_list_queue.empty() is False:
try:
app_name = self.app_list_queue.get_nowait()
except Empty:
continue
self.work_queue.put(app_name)
for thread_num in range(self.concurrency):
if self.work_queue.empty() is True:
break
worker = Thread(target=self.repeat, args=(self.wait_instances,))
worker.start()
self.work_queue.join()
def start_test(self):
task_list = self.get_tasks()
for action in self.action_list:
if isinstance(action, six.text_type):
if action.startswith('sleep='):
amount = int(action.split('=')[1])
sleep(60*amount)
continue
self.fill_queue(task_list)
for thread_num in range(self.concurrency):
if self.work_queue.empty() is True:
break
worker = Thread(target=self.repeat, args=(action,))
worker.start()
self.work_queue.join()
def generate_report(self):
today = datetime.today()
file_prefix = "{:%Y-%m-%d_%H_%M_%S-}".format(today)
file_name = (file_prefix +
'health_check_result-' +
str(self.total_tasks_cout) +
'tasks.csv')
f = open(file_name, "w")
f.write("Task ID,Health check timestamp")
while self.result_queue.empty() is False:
try:
result = self.result_queue.get_nowait()
except Empty:
continue
for timestamp in result['data']:
f.write("\n%s,%s" % (result['id'], timestamp))
f.close()
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument("-m", "--marathon",
help="Marathon URL, on example "
"http://172.20.8.34:8080/virt-env-2/marathon",
required=True)
parser.add_argument("-t", "--tasks",
help="Total tasks count",
required=True)
parser.add_argument("-i", "--image",
help="Docker image path",
required=True)
args = parser.parse_args()
bencher = HealthCheckBencher(args.marathon, args.image, int(args.tasks))
bencher.create_apps()
bencher.start_test()
bencher.remove_apps()
bencher.generate_report()