# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections import deque import datetime import json import logging import os import re import sys import time import uuid import elasticsearch from elasticsearch import helpers browbeat_uuid = uuid.uuid4() class Elastic(object): def __init__( self, config, workload, tool="browbeat", cache_size=50, max_cache_time=10): self.config = config self.cache = deque() self.max_cache_size = cache_size self.last_upload = datetime.datetime.utcnow() self.max_cache_age = datetime.timedelta(minutes=max_cache_time) self.logger = logging.getLogger('browbeat.elastic') self.es = elasticsearch.Elasticsearch([ {'host': self.config['elasticsearch']['host'], 'port': self.config['elasticsearch']['port']}], send_get_body_as='POST' ) life = self.config['elasticsearch']['life'] self.workload = workload today = datetime.datetime.today() self.index = "{}-{}-{}-{}".format(tool, workload, life, today.strftime('%Y.%m')) def __del__(self): self.flush_cache() def load_json(self, result): json_data = None self.logger.info("Loading JSON") json_data = json.loads(result) return json_data def load_json_file(self, result): json_data = None self.logger.info("Loading JSON file : {}".format(result)) try: with open(result) as jdata: json_data = json.load(jdata) except (IOError, OSError): self.logger.error("Error loading JSON file : {}".format(result)) return False return json_data def combine_metadata(self, result): if (self.config['elasticsearch']['metadata_files'] is not None and len(self.config['elasticsearch']['metadata_files']) > 0): meta = self.config['elasticsearch']['metadata_files'] for _meta in meta: try: with open(_meta['file']) as jdata: result[_meta['name']] = json.load(jdata) except Exception: self.logger.error( "Error loading Metadata file : {}".format( _meta['file'])) self.logger.error( "Please make sure the metadata file exists and" " is valid JSON or run the playbook ansible/gather/site.yml" " before running the Browbeat test Suite") sys.exit(1) return result # Used to transform the cache dict into an elastic insertable iterable def cache_insertable_iterable(self): output = deque() for item in self.cache: es_item = {} es_item['_id'] = item['_id'] es_item['_source'] = item['result'] es_item['_index'] = self.index output.append(es_item) return output def flush_cache(self): if len(self.cache) == 0: return True retry = 2 for i in range(retry): try: to_upload = helpers.parallel_bulk( self.es, self.cache_insertable_iterable()) counter = 0 num_items = len(self.cache) for item in to_upload: self.logger.debug( "{} of {} Elastic objects uploaded".format( num_items, counter)) counter = counter + 1 output = "Pushed {} items to Elasticsearch to index {}".format( num_items, self.index) output += " and browbeat UUID {}".format(str(browbeat_uuid)) self.logger.info(output) self.cache = deque() self.last_upload = datetime.datetime.utcnow() return True except Exception as Err: self.logger.error( "Error pushing data to Elasticsearch, going to retry" " in 10 seconds") self.logger.error("Exception: {}".format(Err)) time.sleep(10) if i == (retry - 1): self.logger.error( "Pushing Data to Elasticsearch failed in spite of retry," " dumping JSON for {} cached items".format( len( self.cache))) for item in self.cache: filename = item['test_name'] + '-' + item['identifier'] filename += '-elastic' + '.' + 'json' elastic_file = os.path.join(item['result_dir'], filename) with open(elastic_file, 'w') as result_file: json.dump(item['result'], result_file, indent=4, sort_keys=True) self.logger.info( "Saved Elasticsearch consumable result JSON to {}". format( elastic_file)) self.cache = deque() self.last_upload = datetime.datetime.utcnow() return False def get_software_metadata(self, index, role, browbeat_uuid): nodes = {} results = self.query_uuid(index, browbeat_uuid) pattern = re.compile(".*{}.*".format(role)) if results: for result in results: for metadata in result['_source']['software-metadata']: for service in metadata: if 'node_name' in metadata[service]: if pattern.match(metadata[service]['node_name']): if metadata[service]['node_name'] not in nodes: nodes[metadata[service]['node_name']] = metadata return nodes else: self.logger.error("UUID {} wasn't found".format(browbeat_uuid)) return False """ summarize_results this function will iterate through all the data points, combining the iteration and rerun data points into a single 95%tile. """ def summarize_results(self, data, combined): import numpy summary = {} if combined: if len(data) > 1: for result in data: if result['browbeat_uuid'] not in summary: summary[result['browbeat_uuid']] = {} if result['scenario'] not in summary[result['browbeat_uuid']]: summary[result['browbeat_uuid']][result['scenario']] = {} if result['action'] not in summary[ result['browbeat_uuid']][ result['scenario']]: summary[result['browbeat_uuid']][ result['scenario']][result['action']] = [] summary[result['browbeat_uuid']][result['scenario']][ result['action']].append(result['performance']) else: if len(data) > 1: for result in data: if result['browbeat_uuid'] not in summary: summary[result['browbeat_uuid']] = {} if result['scenario'] not in summary[result['browbeat_uuid']]: summary[result['browbeat_uuid']][result['scenario']] = {} if result['time'] not in summary[result['browbeat_uuid']][result['scenario']] : summary[result['browbeat_uuid']][result['scenario']][result['time']] = {} if result['concurrency'] not in summary[result['browbeat_uuid']][ result['scenario']][result['time']] : summary[result['browbeat_uuid']][result['scenario']][result['time']][ result['concurrency']] = {} if result['action'] not in summary[ result['browbeat_uuid']][ result['scenario']][ result['time']][ result['concurrency']]: summary[result['browbeat_uuid']][ result['scenario']][result['time']][result['concurrency']][ result['action']] = [] summary[result['browbeat_uuid']][result['scenario']][result['time']][ result['concurrency']][ result['action']].append(result['performance']) if len(summary) > 0 and combined : for uuids in summary: for scenario in summary[uuids]: for action in summary[uuids][scenario]: summary[uuids][scenario][action] = numpy.percentile( summary[uuids][scenario][action], 95) elif len(summary) > 0 and not combined : for uuids in summary: for scenario in summary[uuids]: for times in summary[uuids][scenario]: for concurrency in summary[uuids][scenario][times]: for action in summary[uuids][scenario][times][concurrency]: summary[uuids][scenario][times][ concurrency][action] = numpy.percentile( summary[uuids][scenario][times][concurrency][action], 95) else: return False return summary """ """ def compare_rally_results(self, data, uuids, combined, metadata=None): import numpy missing = [] if len(data) < 2: self.logger.error("Not enough data to compare") return False if (uuids[0] not in data) or (uuids[1] not in data): self.logger.error("Not able to find UUID in data set") return False if combined: errors = {} errors[uuids[0]] = {} errors[uuids[1]] = {} for error in self.get_errors("browbeat-rally-*", uuids[0]): action = error['_source']['action'] scenario = error['_source']['scenario'] if scenario not in errors[uuids[0]]: errors[uuids[0]][scenario] = {} if action not in errors[uuids[0]][scenario]: errors[uuids[0]][scenario][action] = [] errors[uuids[0]][scenario][action].append(error['_source'][ 'result']) for error in self.get_errors("browbeat-rally-*", uuids[1]): action = error['_source']['action'] scenario = error['_source']['scenario'] if scenario not in errors[uuids[1]]: errors[uuids[1]][scenario] = {} if action not in errors[uuids[1]][scenario]: errors[uuids[1]][scenario][action] = [] errors[uuids[1]][scenario][action].append(error[ '_source']['result']) print("+{}+".format("-" * (33 + 44 + 20 + 30 + 35))) err_uuid1 = " errors-%s " % uuids[0][-8:], err_uuid2 = " errors-%s " % uuids[1][-8:], print("{0:34}|{1:40}|{2:20}|{3:20}|{4:10}|{5:10}|{6:13}".format( " Scenario ", " Action ", " %s " % err_uuid1, " %s " % err_uuid2, " %s " % uuids[0][-8:], " %s " % uuids[1][-8:], "% Difference")) print("+{}+".format("-" * (33 + 44 + 20 + 30 + 35))) for scenario in data[uuids[0]]: if scenario not in data[uuids[1]]: missing.append(scenario) continue else: for action in data[uuids[0]][scenario]: if action not in data[uuids[1]][scenario]: continue dset = [data[uuids[0]][scenario][action], data[uuids[1]][scenario][action]] err0 = 0 if uuids[0] in errors : if scenario in errors[uuids[0]] : if action in errors[uuids[0]][scenario]: err0 = len(errors[uuids[0]][scenario][action]) err1 = 0 if uuids[1] in errors : if scenario in errors[uuids[1]] : if action in errors[uuids[1]][scenario]: err1 = len(errors[uuids[1]][scenario][action]) perf0 = data[uuids[0]][scenario][action] perf1 = data[uuids[1]][scenario][action] diff = numpy.diff(dset)[0] / numpy.abs(dset[:-1])[0] * 100 output = "{0:34}|{1:40}|{2:20}".format( scenario, action, err0) output += "|{0:20}|{1:10.3f}|{2:10.3f}|{3:13.3f}".format( err1, perf0, perf1, diff) print(output) print("+{}+".format("-" * (33 + 44 + 20 + 30 + 35))) else: errors = {} errors[uuids[0]] = {} errors[uuids[1]] = {} for error in self.get_errors("browbeat-rally-*", uuids[0]): concurrency = error['_source']['rally_setup']['kw']['runner']['concurrency'] times = error['_source']['rally_setup']['kw']['runner']['times'] action = error['_source']['action'] scenario = error['_source']['scenario'] if scenario not in errors[uuids[0]]: errors[uuids[0]][scenario] = {} if times not in errors[uuids[0]][scenario]: errors[uuids[0]][scenario][times] = {} if concurrency not in errors[uuids[0]][scenario][times]: errors[uuids[0]][scenario][times][concurrency] = {} if action not in errors[uuids[0]][scenario][times][concurrency]: errors[uuids[0]][scenario][times][concurrency][action] = [] errors[uuids[0]][scenario][times][concurrency][action].append(error['_source'][ 'result']) for error in self.get_errors("browbeat-rally-*", uuids[1]): concurrency = error['_source']['rally_setup']['kw']['runner']['concurrency'] times = error['_source']['rally_setup']['kw']['runner']['times'] action = error['_source']['action'] scenario = error['_source']['scenario'] if scenario not in errors[uuids[1]]: errors[uuids[1]][scenario] = {} if times not in errors[uuids[1]][scenario]: errors[uuids[1]][scenario][times] = {} if concurrency not in errors[uuids[1]][scenario][times]: errors[uuids[1]][scenario][times][concurrency] = {} if action not in errors[uuids[1]][scenario][times][concurrency]: errors[uuids[1]][scenario][times][concurrency][action] = [] errors[uuids[1]][scenario][times][concurrency][action].append(error[ '_source']['result']) print("+{}+".format("-" * (33 + 44 + 15 + 15 + 50 + 10 + 26))) print("{0:30}|{1:35}|{2:18}|{3:19}|{4:25}|{5:20}|{6:10}|{7:10}|{8:13}".format( " Scenario ", " Action ", " times ", " concurrency ", " errors-%s " % uuids[0][-8:], " errors-%s " % uuids[1][-8:], " %s " % uuids[0][-8:], " %s " % uuids[1][-8:], " % Difference ")) print("+{}+".format("-" * (33 + 44 + 15 + 15 + 50 + 10 + 26))) for scenario in data[uuids[0]]: if scenario not in data[uuids[1]]: missing.append(scenario) continue else: for times in data[uuids[0]][scenario]: if times not in data[uuids[1]][scenario]: continue for concurrency in data[uuids[0]][scenario][times]: if concurrency not in data[uuids[1]][scenario][times]: # Print somehow continue else: for action in data[uuids[0]][scenario][times][concurrency]: if action not in data[uuids[1]][scenario][times][concurrency]: # Print somehow continue else: dset = [data[uuids[0]][scenario][times][ concurrency][action], data[uuids[1]][scenario][times][ concurrency][action]] perf0 = data[uuids[0]][scenario][times][ concurrency][action] perf1 = data[uuids[1]][scenario][times][ concurrency][action] err0 = 0 err1 = 0 if uuids[0] in errors: if scenario in errors[uuids[0]] : if times in errors[uuids[0]][scenario] : if concurrency in errors[uuids[0]][ scenario][times] : if action in errors[uuids[0]][ scenario][times][concurrency] : err0 = len(errors[uuids[0]][ scenario][times][concurrency][ action]) if uuids[1] in errors: if scenario in errors[uuids[1]] : if times in errors[uuids[1]][scenario] : if concurrency in errors[uuids[1]][ scenario][times] : if action in errors[uuids[1]][ scenario][times][concurrency] : err1 = len(errors[uuids[1]][ scenario][times][concurrency][ action]) diff = numpy.diff(dset)[0] / numpy.abs(dset[:-1])[0] * 100 output = "{0:30}|{1:35}|{2:18}|{3:19}" output += "|{4:25}|{5:20}" output += "|{6:10.3f}|{7:10.3f}|{8:13.3f}" print(output.format(scenario, action, times, concurrency, err0, err1, perf0, perf1, diff)) print("+{}+".format("-" * (33 + 44 + 15 + 15 + 50 + 10 + 26))) if metadata: print("+{}+".format("-" * (40 + 20 + 20 + 33))) print("{0:40} | {1:20} | {2:20} | {3:20}".format("UUID", "Version", "Build", "Number of runs")) print("+{}+".format("-" * (40 + 20 + 20 + 33))) for uuids in metadata: print("{0:40} | {1:20} | {2:20} | {3:20}".format(uuids, metadata[uuids][ 'version'], metadata[uuids][ 'build'], metadata[uuids]['rerun'])) print("+{}+".format("-" * (40 + 20 + 20 + 33))) if len(missing) > 0: print("+-------------------------------------+") print("Missing Scenarios to compare results:") print("+-------------------------------------+") for scenario in missing: print(" - {}".format(scenario)) """ returns a list of dicts that contain 95%tile performance data. """ def get_result_data(self, index, browbeat_uuid): import numpy results = [] data = [] metadata = {} if len(browbeat_uuid) < 1 : self.logger.error("No uuids to calculate values") return [], {} for uuids in browbeat_uuid: results.append(self.query_uuid(index, uuids)) for result in results: for value in result: if value['_source']['browbeat_uuid'] not in metadata: metadata[value['_source']['browbeat_uuid']] = {} if 'version' in value['_source']: metadata[ value['_source']['browbeat_uuid']] = { 'version': value['_source']['version']['osp_series'], 'build': value['_source']['version']['build'], 'rerun': value['_source']['browbeat_config']['browbeat']['rerun']} data.append({ 'browbeat_uuid': value['_source']['browbeat_uuid'], 'scenario': value['_source']['scenario'], 'action': value['_source']['action'], 'time': value['_source']['rally_setup']['kw']['runner']['times'], 'concurrency': value['_source']['rally_setup']['kw']['runner'][ 'concurrency'], 'iteration': value['_source']['iteration'], 'rerun': value['_source']['browbeat_rerun'], 'performance': numpy.percentile(value['_source']['raw'], 95) }) if len(data) < 1: return False else: return data, metadata def get_version_metadata(self, index, browbeat_uuid): version = {} results = self.query_uuid(index, browbeat_uuid) if results: for result in results: version = result['_source']['version'] return version else: self.logger.error("UUID {} wasn't found".format(browbeat_uuid)) """ Currently this function will only compare two uuids. I (rook) am not convinced it is worth the effort to engineer anything > 2. """ def compare_metadata(self, index, role, uuids): meta = [] for browbeat_uuid in uuids: self.logger.info( "Querying Elastic : index [{}] : role [{}] : browbeat_uuid [{}] ".format( index, role, browbeat_uuid)) software_metadata = self.get_software_metadata( index, role, browbeat_uuid) if software_metadata: meta.append(software_metadata) else: return False ignore = [ "connection", "admin_url", "my_ip", "local_ip", "bind_host", "rabbit_hosts", "auth_url", "public_bind_host", "host", "key", "url", "auth_uri", "www_authenticate_uri", "coordination_url", "swift_authurl", "admin_token", "memcached_servers", "api_servers", "osapi_volume_listen", "nova_url", "coordination_url", "memcache_servers", "novncproxy_host", "backend_url", "novncproxy_base_url", "metadata_listen", "osapi_compute_listen", "admin_bind_host", "glance_api_servers", "iscsi_ip_address", "registry_host", "auth_address", "swift_key", "auth_encryption_key", "metadata_proxy_shared_secret", "telemetry_secret", "heat_metadata_server_url", "heat_waitcondition_server_url", "catalog_info", "gather_conf_path", "exec_dirs", "transport_url"] if len(meta) < 2: self.logger.error("Unable to compare data-sets") return False differences = [] for host in meta[0]: if host not in meta[1]: self.logger.error("Deployment differs: " "Host [{}] missing ".format(host)) continue for service in meta[0][host]: if service not in meta[1][host].keys(): self.logger.debug( "UUID {} " "- Missing Service : " "Host [{}] Service [{}]".format( uuids[1], host, service)) continue for options in meta[0][host][service].keys(): if options not in meta[1][host][service]: self.logger.debug( "UUID {} " "- Missing Option : " "Host [{}] Service [{}] {}".format( uuids[1], host, service, options)) continue if isinstance(meta[0][host][service][options], dict): for key in meta[0][host][service][options].keys(): if key not in ignore: if key in meta[1][host][service][options]: value = meta[0][host][ service][options][key] new_value = meta[1][host][ service][options][key] if value != new_value: differences.append("{}|{}|{}|{}|{}|{}".format( host, service, options, key, value, new_value)) else: self.logger.error( "UUID {} - Missing Value : " "Host [{}] Service [{}] {} [{}]".format( uuids[1], host, service, options, key)) print("+{}+".format("-" * (33 + 44 + 15 + 15 + 30 + 10 + 6))) print("{0:25} | {1:15} | {2:30} | {3:23} | {4:40} | {5:40} ".format( "Host", "Service", "Option", "Key", "Old Value", "New Value")) print("+{}+".format("-" * (33 + 44 + 15 + 15 + 30 + 10 + 6))) for difference in differences: value = difference.split("|") print("{0:25} | {1:15} | {2:30} | {3:23} | {4:40} | {5:40} ".format(value[0], value[1], value[2], value[3], value[4], value[5])) print("+{}+".format("-" * (33 + 44 + 15 + 15 + 30 + 10 + 6))) def scroll(self, search, sid, scroll_size): data = [] if scroll_size < 1 : self.logger.info("Nothing to sroll through") return data while (scroll_size > 0): self.logger.info("Scrolling through Browbeat {} documents...".format(scroll_size)) for x in range(0, len(search['hits']['hits'])) : data.append(search['hits']['hits'][x]) search = self.es.scroll(scroll_id=sid, scroll='2m') sid = search['_scroll_id'] scroll_size = len(search['hits']['hits']) return data """ get_errors - was inteded to capture all the errors across the entire index, however, this is quite expensive, and it might be quicker to only look for errors for specific browbeat_uuids """ def get_errors(self, index, browbeat_id, action=None, times=None, concurrency=None): self.logger.info("Making query against {}".format(index)) body = {"query": { "bool": {"must": [ {"term": {"browbeat_uuid": browbeat_id}}, {"term": {"doc_type": 'error'}} ]} }} """ body = {"query": { "bool": { "must": [ {"term": {"browbeat_uuid": browbeat_id}}, {"term": {"action": "{}".format(action)}}, {"term": {"rally_setup.kw.runner.times": times}}, {"term": {"rally_setup.kw.runner.concurrency": concurrency}}] }}} """ page = self.es.search( index=index, scroll='1m', size=1000, body=body, request_timeout=240) sid = page['_scroll_id'] scroll_size = page['hits']['total'] self.logger.info("Searching through ES for uuid: {}".format(browbeat_id)) return self.scroll(page,sid,scroll_size) def get_results(self, index, browbeat_uuid): body = { "query": { "bool": { "should": [ { "term": { "browbeat_uuid": browbeat_uuid }}]}}} self.logger.info("Making query against {}".format(index)) page = self.es.search( index=index, scroll='1m', size=1000, body=body, request_timeout=240) sid = page['_scroll_id'] scroll_size = page['hits']['total'] self.logger.info("Searching through ES for uuid: {}".format(browbeat_uuid)) return self.scroll(page,sid,scroll_size) def query_uuid(self, index, browbeat_uuid): results = self.get_results(index, browbeat_uuid) if len(results) > 0: return results else: self.logger.info("No results found for uuid : {}".format(browbeat_uuid)) return False def index_result(self, result, test_name, result_dir, identifier='', _type='result', _id=None): data = {} result['browbeat_uuid'] = str(browbeat_uuid) result['cloud_name'] = self.config['browbeat']['cloud_name'] result['browbeat_config'] = self.config data['result'] = result data['result']['doc_type'] = _type data['test_name'] = test_name data['result_dir'] = result_dir data['identifier'] = identifier data['_id'] = _id self.cache.append(data) now = datetime.datetime.utcnow() if (len(self.cache) <= self.max_cache_size and (now - self.last_upload) <= self.max_cache_age): return True else: return self.flush_cache()