Postprocess after a workload has executed

This change takes a results directory and finds all the
Rally/Shaker/PerfKit JSONs, then creates the Elasticsearch JSONs that can
be pushed to Elasticsearch.

This also includes a minor refactor of the Rally lib, changing how we push
results into Elasticsearch. It creates a generic function so metrics can
be created either from a file or from a task ID.

+ (sai) Fix way how we crawl for files
+ (sai) Exclude already postprocessed files
+ (sai) Fix filenaming of dumped postprocessed files
+ (sai) flake8

Co-Authored-By: Sai Sindhur Malleni <smalleni@redhat.com>
Change-Id: I5ca8877f26e889856c9773b51ba38f24562a80af
This commit is contained in:
Joe Talerico 2017-01-17 14:57:51 -05:00 committed by Sai Sindhur Malleni
parent ae06629d82
commit 423da4ff52
3 changed files with 152 additions and 45 deletions

View File

@ -41,6 +41,8 @@ def main():
parser.add_argument('workloads', nargs='*', help='Browbeat workload(s). Takes a space separated'
' list of workloads ({}) or \"all\"'.format(', '.join(_workload_opts)))
parser.add_argument('--debug', action='store_true', help='Enable Debug messages')
parser.add_argument('-p','--postprocess',
dest="path",help="Path to process, ie results/20170101/")
_cli_args = parser.parse_args()
_logger = logging.getLogger('browbeat')
@ -67,6 +69,8 @@ def main():
# Default to all workloads
if _cli_args.workloads == []:
_cli_args.workloads.append('all')
if _cli_args.path :
return tools.post_process(_cli_args)
if len(_cli_args.workloads) == 1 and 'all' in _cli_args.workloads:
_cli_args.workloads = _workload_opts

View File

@ -23,6 +23,7 @@ import shutil
import time
import Tools
import WorkloadBase
import json
class Rally(WorkloadBase.WorkloadBase):
@ -33,7 +34,8 @@ class Rally(WorkloadBase.WorkloadBase):
self.tools = Tools.Tools(self.config)
self.connmon = Connmon.Connmon(self.config)
self.grafana = Grafana.Grafana(self.config)
self.elastic = Elastic.Elastic(self.config, self.__class__.__name__.lower())
self.elastic = Elastic.Elastic(
self.config, self.__class__.__name__.lower())
self.error_count = 0
self.pass_count = 0
self.test_count = 0
@ -125,16 +127,44 @@ class Rally(WorkloadBase.WorkloadBase):
result['rally_metadata'] = meta
return result
def json_result(self, task_id, scenario_name, run, test_name, result_dir):
def file_to_json(self, filename, push_to_es=False):
    """Parse a rally JSON result file and dump per-document ES JSON files.

    Loads the rally JSON at *filename*, splits it into error and result
    documents via json_parse, merges metadata into each, and writes them
    next to the source file as ``<base>-error_index-es.json`` and
    ``<base>-result_index-es.json``.

    Args:
        filename (str): path to the rally JSON file to postprocess.
        push_to_es (bool): accepted for API compatibility; currently
            unused -- nothing is pushed to Elasticsearch here.

    Returns:
        tuple: (errors, results) lists extracted from the file.
    """
    self.logger.info("Loading rally JSON file {} JSON".format(filename))
    rally_json = self.elastic.load_json_file(filename)
    errors, results = self.json_parse(rally_json)
    # Strip the ".json" extension once so both dump targets share the same
    # base name. Previously the error file used the full basename, which
    # produced "foo.json-error_index-es.json" and did not match the
    # "-es" naming the crawler excludes on re-runs.
    dirname = os.path.dirname(filename)
    base = os.path.splitext(os.path.basename(filename))[0]
    for error in errors:
        error_result = self.elastic.combine_metadata(error)
        with open("{}/{}-error_index-es.json".format(dirname, base),
                  'w+') as error_file:
            json.dump(error_result, error_file)
    for result in results:
        result_doc = self.elastic.combine_metadata(result)
        with open("{}/{}-result_index-es.json".format(dirname, base),
                  'w+') as result_file:
            json.dump(result_doc, result_file)
    return errors, results
def json_parse(self, json_doc, metadata={}):
"""Function to extract data out of a json document
Args:
json_doc (json): json document to parse
metadata (dict): dict containing run specific metadata, ie rally UUID.
Returns:
errors (list) : errors contained within the json_doc
results (list) : results contained within the json_doc
"""
rally_data = {}
failure = False
self.logger.info("Loading Task_ID {} JSON".format(task_id))
rally_json = self.elastic.load_json(self.gen_scenario_json(task_id))
es_ts = datetime.datetime.utcnow()
if len(rally_json) < 1:
self.logger.error("Issue with Rally Results")
errors = []
results = []
if len(json_doc) < 1:
self.logger.error("Issue with JSON document")
return False
for metrics in rally_json[0]['result']:
es_ts = datetime.datetime.utcnow()
for metrics in json_doc[0]['result']:
for workload in metrics:
if type(metrics[workload]) is dict:
for value in metrics[workload]:
@ -146,47 +176,56 @@ class Rally(WorkloadBase.WorkloadBase):
iteration = 1
workload_name = value
if value.find('(') is not -1:
iteration = re.findall('\d+', value)
iteration = re.findall('\d+', value)[0]
workload_name = value.split('(')[0]
error = {'action': workload_name.strip(),
'browbeat_rerun': run,
'iteration': iteration,
'error_type': metrics['error'][0],
'error_msg': metrics['error'][1],
'result': task_id,
'timestamp': str(es_ts).replace(" ", "T"),
'rally_setup': rally_json[0]['key'],
'scenario': scenario_name,
'rally_setup': json_doc[0]['key']
}
error_result = self.elastic.combine_metadata(error)
index_status = self.elastic.index_result(error_result, test_name, result_dir,
workload, 'error')
if index_status is False:
failure = True
if len(metadata) > 0:
error.update(metadata)
errors.append(error)
for workload in rally_data:
if not type(rally_data[workload]) is dict:
iteration = 1
workload_name = workload
if workload.find('(') is not -1:
iteration = re.findall('\d+', workload)
iteration = re.findall('\d+', workload)[0]
workload_name = workload.split('(')[0]
rally_stats = {'result': task_id,
'action': workload_name.strip(),
'browbeat_rerun': run,
rally_stats = {'action': workload_name.strip(),
'iteration': iteration,
'timestamp': str(es_ts).replace(" ", "T"),
'grafana_url': [self.grafana.grafana_urls()],
'scenario': scenario_name,
'rally_setup': rally_json[0]['key'],
'rally_setup': json_doc[0]['key'],
'raw': rally_data[workload]}
result = self.elastic.combine_metadata(rally_stats)
index_status = self.elastic.index_result(result, test_name, result_dir, workload)
if index_status is False:
failure = True
if failure:
return False
else:
return True
if len(metadata) > 0:
rally_stats.update(metadata)
results.append(rally_stats)
return errors, results
def json_result(self, task_id, scenario_name, run, test_name, result_dir):
    """Index the rally results of a task into Elasticsearch.

    Loads the rally JSON for *task_id*, parses it into error and result
    documents tagged with run-specific metadata, and indexes every
    document under the 'rally' workload (errors under the 'error' type).

    Args:
        task_id (str): rally task UUID to load results for.
        scenario_name (str): browbeat scenario name for the metadata.
        run (int): browbeat rerun number for the metadata.
        test_name (str): test name passed through to index_result.
        result_dir (str): results directory passed through to index_result.

    Returns:
        bool: True only if every document indexed successfully.
    """
    self.logger.info("Loading Task_ID {} JSON".format(task_id))
    task_json = self.elastic.load_json(self.gen_scenario_json(task_id))
    run_metadata = {'scenario': scenario_name,
                    'browbeat_rerun': run,
                    'result': task_id}
    errors, results = self.json_parse(task_json, run_metadata)
    # Collect the status of every index call; one failure fails the run.
    statuses = []
    for error_doc in errors:
        combined = self.elastic.combine_metadata(error_doc)
        statuses.append(self.elastic.index_result(
            combined, test_name, result_dir, 'rally', 'error'))
    for result_doc in results:
        combined = self.elastic.combine_metadata(result_doc)
        statuses.append(self.elastic.index_result(
            combined, test_name, result_dir, 'rally'))
    return all(statuses)
def start_workloads(self):
"""Iterates through all rally scenarios in browbeat yaml config file"""
@ -239,7 +278,8 @@ class Rally(WorkloadBase.WorkloadBase):
del scenario['concurrency']
else:
concurrencies = def_concurrencies
concurrency_count_dict = collections.Counter(concurrencies)
concurrency_count_dict = collections.Counter(
concurrencies)
if 'times' not in scenario:
scenario['times'] = def_times
@ -260,7 +300,8 @@ class Rally(WorkloadBase.WorkloadBase):
self.logger.debug("Duplicate concurrency {} found,"
" setting test name"
" to {}".format(concurrency, test_name))
concurrency_count_dict[concurrency] -= 1
concurrency_count_dict[
concurrency] -= 1
if not result_dir:
self.logger.error(
@ -311,7 +352,8 @@ class Rally(WorkloadBase.WorkloadBase):
index_status = self.json_result(
task_id, scenario_name, run, test_name, result_dir)
self.get_time_dict(to_time, from_time,
benchmark['name'], new_test_name,
benchmark[
'name'], new_test_name,
workload, "pass", index_status)
else:
self.get_time_dict(to_time, from_time, benchmark[

View File

@ -17,9 +17,11 @@ import logging
import os
import subprocess
import yaml
import re
from pykwalify import core as pykwalify_core
from pykwalify import errors as pykwalify_errors
class Tools(object):
def __init__(self, config=None):
@ -59,32 +61,38 @@ class Tools(object):
# Create directory for results
def create_results_dir(self, results_dir, timestamp, service, scenario):
    """Create the nested results directory for one scenario run.

    Builds ``<results_dir>/<timestamp>/<service>/<scenario>`` and creates
    it (including parents) if it does not already exist.

    Args:
        results_dir (str): root results directory.
        timestamp (str): run timestamp component.
        service (str): workload/service component.
        scenario (str): scenario component.

    Returns:
        str: the directory path on success, or False if creation failed.
    """
    the_directory = "{}/{}/{}/{}".format(results_dir,
                                         timestamp, service, scenario)
    if not os.path.isdir(the_directory):
        try:
            os.makedirs(the_directory)
        except OSError as err:
            self.logger.error(
                "Error creating the results directory: {}".format(err))
            return False
    return the_directory
def _load_config(self, path):
def _load_config(self, path, validate=True):
    """Load the browbeat YAML configuration file.

    Args:
        path (str): path to the YAML configuration file.
        validate (bool): when True (default), validate the loaded config
            against the schema via validate_yaml; callers may pass False
            to skip schema validation.

    Returns:
        dict: the parsed configuration (also stored on self.config).
        Exits the process with status 1 if the file is missing.
    """
    try:
        stream = open(path, 'r')
    except IOError:
        self.logger.error(
            "Configuration file {} passed is missing".format(path))
        exit(1)
    # NOTE(review): yaml.load without an explicit Loader can construct
    # arbitrary objects; prefer yaml.safe_load if configs may be untrusted.
    config = yaml.load(stream)
    stream.close()
    self.config = config
    if validate:
        self.validate_yaml()
    return config
def validate_yaml(self):
self.logger.info("Validating the configuration file passed by the user")
self.logger.info(
"Validating the configuration file passed by the user")
stream = open("lib/validate.yaml", 'r')
schema = yaml.load(stream)
check = pykwalify_core.Core(source_data=self.config, schema_data=schema)
check = pykwalify_core.Core(
source_data=self.config, schema_data=schema)
try:
check.validate(raise_exception=True)
self.logger.info("Validation successful")
@ -110,12 +118,14 @@ class Tools(object):
meta = self.config['elasticsearch']['metadata_files']
for _meta in meta:
if not os.path.isfile(_meta['file']):
self.logger.error("Metadata file {} is not present".format(_meta['file']))
self.logger.error(
"Metadata file {} is not present".format(_meta['file']))
return False
return True
def gather_metadata(self):
os.putenv("ANSIBLE_SSH_ARGS"," -F {}".format(self.config['ansible']['ssh_config']))
os.putenv("ANSIBLE_SSH_ARGS",
" -F {}".format(self.config['ansible']['ssh_config']))
ansible_cmd = \
'ansible-playbook -i {} {}' \
.format(self.config['ansible']['hosts'], self.config['ansible']['metadata'])
@ -126,3 +136,54 @@ class Tools(object):
else:
self.logger.info("Metadata about cloud has been gathered")
return True
def post_process(self, cli):
    """Re-crawl a results directory and generate Elasticsearch JSON docs.

    Walks ``cli.path``, classifies each directory by workload
    (rally/shaker/perfkit), collects the raw result JSON files (skipping
    already-postprocessed ``*-es.json`` dumps), and converts the rally
    files into per-document ES JSON files. Shaker and PerfKit handling
    are stubs for now.

    Args:
        cli: parsed CLI arguments; ``cli.path`` is the directory to crawl.

    Returns:
        bool: True on success, False when the path is missing or empty.
    """
    workloads = {}
    workloads['shaker'] = re.compile("shaker")
    workloads['perfkit'] = re.compile("perfkit")
    # NOTE(review): an alternation of empty negative lookaheads matches
    # every path, so 'rally' currently captures all directories -- confirm
    # whether "(?!.*(perfkit|shaker))" was intended.
    workloads['rally'] = re.compile("(?!perfkit)|(?!shaker)")
    # Iterate through dir structure
    results = {}
    if os.path.isdir(cli.path):
        for dirname, dirnames, files in os.walk(cli.path):
            self.logger.info("Inspecting : %s" % dirname)
            results[dirname] = files
    else:
        self.logger.error("Path does not exist")
        return False

    # Capture per-workload results
    workload_results = {}
    # Named json_file (not "json") to avoid shadowing the module name.
    json_file = re.compile("\.json")
    if len(results) > 0:
        for path in results:
            for regex in workloads:
                if re.findall(workloads[regex], path):
                    if regex not in workload_results:
                        workload_results[regex] = []
                    for file in results[path]:
                        # Exclude files a previous postprocess run already
                        # generated -- both result and error index dumps.
                        if (re.findall(json_file, file) and
                                'result_index-es' not in file and
                                'error_index-es' not in file):
                            workload_results[regex].append(
                                "{}/{}".format(path, file))
    else:
        self.logger.error("Results are empty")
        return False

    # Iterate through each workload result, generate ES JSON.
    # String comparison uses '==' (not 'is'): identity of str literals is
    # an implementation detail and unreliable.
    if len(workload_results) > 0:
        for workload in workload_results:
            if workload == "rally":
                rally = Rally.Rally(self.config)
                for file in workload_results[workload]:
                    errors, results = rally.file_to_json(file)
            if workload == "shaker":
                # Stub for Shaker.
                continue
            if workload == "perfkit":
                # Stub for PerfKit.
                continue
    # Return after processing every workload so the shaker/perfkit
    # branches above stay reachable (previously an early return made
    # them dead code).
    return True