performance-docs/scripts/rally-runners/rally_runners/reliability/analytics.py

383 lines
13 KiB
Python

# coding=utf-8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import math
from interval import interval
import numpy as np
from scipy import stats
from sklearn import cluster as skl
from rally_runners.reliability import types
MIN_CLUSTER_WIDTH = 3 # filter cluster with less items
MAX_CLUSTER_GAP = 6 # max allowed gap in the cluster (otherwise split them)
WINDOW_SIZE = 21 # window size for average duration calculation
WARM_UP_CUTOFF = 10 # drop first N points from etalon
DEGRADATION_THRESHOLD = 4 # how many sigmas duration differs from etalon mean
def find_clusters(arr, filter_fn, max_gap=MAX_CLUSTER_GAP,
min_cluster_width=MIN_CLUSTER_WIDTH):
"""Find clusters of 1 in the sequence containing (0, 1)
The given array is filtered through filter_fn function which produces
sequence of 0s or 1s. Then 1s are grouped into clusters so that:
* there can not be more than max_gap 0s inside
* there are at least min_cluster_width of 1s
:param arr: initial array
:param filter_fn: transformation x -> [0, 1]
:param max_gap: maximum allowed number of consequent 0s inside the cluster
:param min_cluster_width: minimum cluster width
:return: multi-interval (i.e. list of intervals)
"""
clusters = interval()
start = None
end = None
for i, y in enumerate(arr):
v = filter_fn(y)
if v:
if not start:
start = i
end = i
else:
if end and i - end > max_gap:
if end - start >= min_cluster_width:
clusters |= interval([start, end])
start = end = None
if end:
if end - start >= MIN_CLUSTER_WIDTH:
clusters |= interval([start, end])
return clusters
def convert_rally_data(data):
"""Convert raw Rally data into [DataRow]
:param data: raw Rally data
:return: ([DataRow], index of hook)
"""
results = data['result']
start = results[0]['timestamp'] # start of the run
hooks = data['hooks']
hook_index = 0
if hooks:
# when the hook started
hook_start_time = hooks[0]['started_at'] - start
else:
# let all data be etalon
hook_start_time = results[-1]['timestamp']
table = []
for index, result in enumerate(results):
time = result['timestamp'] - start
duration = result['duration']
if time + duration < hook_start_time:
hook_index = index
table.append(types.DataRow(index=index, time=time, duration=duration,
error=bool(result['error'])))
return table, hook_index
def calculate_array_stats(data):
data = np.array(data)
return types.ArrayStats(mean=np.mean(data), median=np.median(data),
p95=np.percentile(data, 95), var=np.var(data),
std=np.std(data), count=len(data))
def indexed_interval_to_time_interval(table, src_interval):
"""For given indexes in the table return time interval
:param table: [DataRow] source data
:param src_interval: interval of array indexes
:return: ClusterStats
"""
start_index = int(src_interval.inf)
end_index = int(src_interval.sup)
if start_index > 0:
d_start = (table[start_index].time - table[start_index - 1].time) / 2
else:
d_start = 0
if end_index < len(table) - 1:
d_end = (table[end_index + 1].time - table[end_index].time) / 2
else:
d_end = 0
start_time = table[start_index].time - d_start
end_time = table[end_index].time + d_end
var = d_start + d_end
duration = end_time - start_time
count = sum(1 if start_time <= p.time <= end_time else 0 for p in table)
return types.ClusterStats(start=start_time, end=end_time, count=count,
duration=types.MeanVar(duration, var))
def calculate_error_area(table):
"""Calculates error statistics
:param table:
:return: list of time intervals where errors occur
"""
error_clusters = find_clusters(
(p.error for p in table),
filter_fn=lambda x: 1 if x else 0,
min_cluster_width=0
)
error_stats = [indexed_interval_to_time_interval(table, cluster)
for cluster in error_clusters]
return error_stats
def calculate_anomaly_area(table, quantile=0.9):
"""Find anomalies
:param quantile: float, default 0.3
:param table:
:return: list of time intervals where anomalies occur
"""
table = [p for p in table if not p.error] # rm errors
x = [p.duration for p in table]
X = np.array(zip(x, np.zeros(len(x))), dtype=np.float)
bandwidth = skl.estimate_bandwidth(X, quantile=quantile)
mean_shift_algo = skl.MeanShift(bandwidth=bandwidth, bin_seeding=True)
mean_shift_algo.fit(X)
labels = mean_shift_algo.labels_
lm = stats.mode(labels)
# filter out the largest cluster
vl = [(0 if labels[i] == lm.mode else 1) for i, p in enumerate(x)]
anomaly_clusters = find_clusters(vl, filter_fn=lambda y: y)
anomaly_stats = [indexed_interval_to_time_interval(table, cluster)
for cluster in anomaly_clusters]
return anomaly_stats
def calculate_smooth_data(table, window_size):
"""Calculate mean for the data
:param table:
:param window_size:
:return: list of points in mean data
"""
table = [p for p in table if not p.error] # rm errors
smooth = []
for i in range(0, len(table) - window_size):
durations = [p.duration for p in table[i: i + window_size]]
time = np.mean([p.time for p in table[i: i + window_size]])
duration = np.mean(durations)
var = abs(time - np.mean(
[p.time for p in table[i + 1: i + window_size - 1]]))
smooth.append(types.SmoothData(time=time, duration=duration, var=var))
return smooth
def calculate_degradation_area(table, smooth, etalon_stats, etalon_threshold):
table = [p for p in table if not p.error] # rm errors
if len(table) <= WINDOW_SIZE:
return []
mean_times = [p.time for p in smooth]
mean_durations = [p.duration for p in smooth]
mean_vars = [p.var for p in smooth]
clusters = find_clusters(
mean_durations,
filter_fn=lambda y: 0 if abs(y) < etalon_threshold else 1)
# calculate cluster duration
degradation_cluster_stats = []
for cluster in clusters:
start_idx = int(cluster.inf)
end_idx = int(cluster.sup)
start_time = mean_times[start_idx]
end_time = mean_times[end_idx]
duration = end_time - start_time
var = np.mean(mean_vars[start_idx: end_idx])
# point durations
point_durations = []
for p in table:
if start_time < p.time < end_time:
point_durations.append(p.duration)
# calculate difference between means
# http://onlinestatbook.com/2/tests_of_means/difference_means.html
anomaly_mean = np.mean(point_durations)
anomaly_var = np.var(point_durations)
se = math.sqrt(anomaly_var / len(point_durations) +
etalon_stats.var / etalon_stats.count)
dof = etalon_stats.count + len(point_durations) - 2
mean_diff = anomaly_mean - etalon_stats.mean
conf_interval = stats.t.interval(0.95, dof, loc=mean_diff, scale=se)
degradation = types.MeanVar(
mean_diff, np.mean([mean_diff - conf_interval[0],
conf_interval[1] - mean_diff]))
degradation_ratio = types.MeanVar(
anomaly_mean / etalon_stats.mean,
np.mean([(mean_diff - conf_interval[0]) / etalon_stats.mean,
(conf_interval[1] - mean_diff) / etalon_stats.mean]))
logging.debug('Mean diff: %s' % mean_diff)
logging.debug('Conf int: %s' % str(conf_interval))
degradation_cluster_stats.append(types.DegradationClusterStats(
start=start_time, end=end_time,
duration=types.MeanVar(duration, var),
degradation=degradation, degradation_ratio=degradation_ratio,
count=len(point_durations)
))
return degradation_cluster_stats
def process_one_run(rally_data):
"""Process single Rally run (raw output for single task iteration)
This function calculates statistics for a single run, including
baseline stats (etalon), error stats, anomalies and areas with degraded
performance.
:param rally_data: raw Rally data
:return: RunResult
"""
data, hook_index = convert_rally_data(rally_data)
etalon = [p.duration for p in data[WARM_UP_CUTOFF:hook_index]]
etalon_stats = calculate_array_stats(etalon)
etalon_threshold = abs(etalon_stats.mean +
DEGRADATION_THRESHOLD * etalon_stats.std)
etalon_interval = interval([data[WARM_UP_CUTOFF].time,
data[hook_index].time])[0]
logging.debug('Hook index: %s' % hook_index)
logging.debug('Etalon stats: %s' % str(etalon_stats))
# Calculate stats
error_area = calculate_error_area(data)
anomaly_area = calculate_anomaly_area(data)
smooth_data = calculate_smooth_data(data, window_size=WINDOW_SIZE)
degradation_area = calculate_degradation_area(
data, smooth_data, etalon_stats, etalon_threshold)
# logging.debug stats
logging.debug('Error area: %s' % error_area)
logging.debug('Anomaly area: %s' % anomaly_area)
logging.debug('Degradation area: %s' % degradation_area)
return types.RunResult(
data=data,
error_area=error_area,
anomaly_area=anomaly_area,
degradation_area=degradation_area,
etalon_stats=etalon_stats,
etalon_interval=etalon_interval,
etalon_threshold=etalon_threshold,
smooth_data=smooth_data,
)
def process_all_runs(runs):
"""Process all runs from Rally raw data report
This function returns summary stats for all runs, including downtime
duration, MTTR, performance degradation.
:param runs: collection of Rally runs
:return: SummaryResult
"""
run_results = []
downtime_statistic = []
downtime_var = []
ttr_statistic = []
ttr_var = []
degradation_statistic = []
degradation_var = []
degradation_ratio_statistic = []
degradation_ratio_var = []
for i, one_run in enumerate(runs):
run_result = process_one_run(one_run)
run_results.append(run_result)
ds = 0
for index, stat in enumerate(run_result.error_area):
ds += stat.duration.statistic
downtime_var.append(stat.duration.var)
if run_result.error_area:
downtime_statistic.append(ds)
ts = ss = sr = 0
for index, stat in enumerate(run_result.degradation_area):
ts += stat.duration.statistic
ttr_var.append(stat.duration.var)
ss += stat.degradation.statistic
degradation_var.append(stat.degradation.var)
sr += stat.degradation_ratio.statistic
degradation_ratio_var.append(stat.degradation_ratio.var)
if run_result.degradation_area:
ttr_statistic.append(ts)
degradation_statistic.append(ss)
degradation_ratio_statistic.append(sr)
downtime = None
if downtime_statistic:
downtime_mean = np.mean(downtime_statistic)
se = math.sqrt((sum(downtime_var) +
np.var(downtime_statistic)) / len(downtime_statistic))
downtime = types.MeanVar(downtime_mean, se)
mttr = None
if ttr_statistic:
ttr_mean = np.mean(ttr_statistic)
se = math.sqrt((sum(ttr_var) +
np.var(ttr_statistic)) / len(ttr_statistic))
mttr = types.MeanVar(ttr_mean, se)
degradation = None
degradation_ratio = None
if degradation_statistic:
degradation = types.MeanVar(np.mean(degradation_statistic),
np.mean(degradation_var))
degradation_ratio = types.MeanVar(np.mean(degradation_ratio_statistic),
np.mean(degradation_ratio_var))
return types.SummaryResult(run_results=run_results, mttr=mttr,
degradation=degradation,
degradation_ratio=degradation_ratio,
downtime=downtime)