# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import binascii
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import itertools
import urllib

from cassandra.cluster import Cluster
from cassandra.query import FETCH_SIZE_UNSET
from cassandra.query import SimpleStatement

from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils

from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository
from monasca_common.rest import utils as rest_utils

CONF = cfg.CONF
LOG = log.getLogger(__name__)

LIMIT_CLAUSE = 'limit %s'
ALLOW_FILTERING = 'allow filtering'

MEASUREMENT_LIST_CQL = ('select time_stamp, value, value_meta '
                        'from measurements where %s %s %s %s')
METRIC_ID_EQ = 'metric_id = %s'
METRIC_ID_IN = 'metric_id in %s'
OFFSET_TIME_GT = "and time_stamp > %s"
START_TIME_GE = "and time_stamp >= %s"
END_TIME_LE = "and time_stamp <= %s"

METRIC_LIST_CQL = ('select metric_name, dimensions, metric_id '
                   'from metrics where %s %s %s %s %s %s %s %s %s %s')
REGION_EQ = 'region = %s'
TENANT_EQ = 'and tenant_id = %s'
METRIC_NAME_EQ = 'and metric_name = %s'
DIMENSIONS_CONTAINS = 'and dimensions contains %s '
DIMENSIONS_NAME_CONTAINS = 'and dimension_names contains %s '
CREATED_TIME_LE = "and created_at <= %s"
UPDATED_TIME_GE = "and updated_at >= %s"
DIMENSIONS_GT = 'and dimensions > %s'

DIMENSION_VALUE_BY_METRIC_CQL = ('select dimension_value as value from metrics_dimensions '
                                 'where region = ? and tenant_id = ? and metric_name = ? '
                                 'and dimension_name = ? group by dimension_value')

DIMENSION_VALUE_CQL = ('select value from dimensions '
                       'where region = ? and tenant_id = ? and name = ? '
                       'group by value order by value')

DIMENSION_NAME_BY_METRIC_CQL = ('select dimension_name as name from metrics_dimensions where '
                                'region = ? and tenant_id = ? and metric_name = ? '
                                'group by dimension_name order by dimension_name')

DIMENSION_NAME_CQL = ('select name from dimensions where region = ? and tenant_id = ? '
                      'group by name allow filtering')

METRIC_NAME_BY_DIMENSION_CQL = ('select metric_name from dimensions_metrics where region = ? and '
                                'tenant_id = ? and dimension_name = ? and dimension_value = ? '
                                'group by metric_name order by metric_name')

METRIC_NAME_BY_DIMENSION_OFFSET_CQL = ('select metric_name from dimensions_metrics where region = ? and '
                                       'tenant_id = ? and dimension_name = ? and dimension_value = ? and '
                                       'metric_name >= ? '
                                       'group by metric_name order by metric_name')

METRIC_NAME_CQL = ('select distinct region, tenant_id, metric_name from metrics_dimensions '
                   'where region = ? and tenant_id = ? allow filtering')

METRIC_NAME_OFFSET_CQL = ('select distinct region, tenant_id, metric_name from metrics_dimensions '
                          'where region = ? and tenant_id = ? and metric_name >= ? allow filtering')

METRIC_BY_ID_CQL = ('select region, tenant_id, metric_name, dimensions from measurements '
                    'where metric_id = ? limit 1')

Metric = namedtuple('metric', 'id name dimensions')

ALARM_HISTORY_CQL = ('select tenant_id, alarm_id, time_stamp, metric, new_state, old_state, reason, reason_data, '
                     'sub_alarms from alarm_state_history where %s %s %s %s %s')

ALARM_ID_EQ = 'and alarm_id = %s'

ALARM_ID_IN = 'and alarm_id in %s'

ALARM_TENANT_ID_EQ = 'tenant_id = %s'


class MetricsRepository(metrics_repository.AbstractMetricsRepository):
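    """Cassandra-backed implementation of the metrics repository.

    Reads metrics, measurements, statistics and alarm state history from
    the Cassandra keyspace named in the ``[cassandra]`` section of the
    monasca-api configuration. All CQL statements are prepared once in
    ``__init__`` and reused across requests.
    """
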
    def __init__(self):

        try:
            self.conf = cfg.CONF
            self.cluster = Cluster(self.conf.cassandra.contact_points)
            self.session = self.cluster.connect(self.conf.cassandra.keyspace)

            self.dim_val_by_metric_stmt = self.session.prepare(DIMENSION_VALUE_BY_METRIC_CQL)
            self.dim_val_stmt = self.session.prepare(DIMENSION_VALUE_CQL)
            self.dim_name_by_metric_stmt = self.session.prepare(DIMENSION_NAME_BY_METRIC_CQL)
            self.dim_name_stmt = self.session.prepare(DIMENSION_NAME_CQL)
            self.metric_name_by_dimension_stmt = self.session.prepare(METRIC_NAME_BY_DIMENSION_CQL)
            self.metric_name_by_dimension_offset_stmt = self.session.prepare(METRIC_NAME_BY_DIMENSION_OFFSET_CQL)
            self.metric_name_stmt = self.session.prepare(METRIC_NAME_CQL)
            self.metric_name_offset_stmt = self.session.prepare(METRIC_NAME_OFFSET_CQL)
            self.metric_by_id_stmt = self.session.prepare(METRIC_BY_ID_CQL)

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

        self.epoch = datetime.utcfromtimestamp(0)

    def list_dimension_values(self, tenant_id, region, metric_name,
                              dimension_name):

        try:
            if metric_name:
                rows = self.session.execute(
                    self.dim_val_by_metric_stmt,
                    [region, tenant_id, metric_name, dimension_name])
            else:
                rows = self.session.execute(
                    self.dim_val_stmt,
                    [region, tenant_id, dimension_name])

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

        json_dim_value_list = []

        if not rows:
            return json_dim_value_list

        for row in rows:
            json_dim_value_list.append({u'dimension_value': row.value})

        json_dim_value_list.sort(key=lambda x: x[u'dimension_value'])

        return json_dim_value_list

    def list_dimension_names(self, tenant_id, region, metric_name):

        try:
            if metric_name:
                rows = self.session.execute(
                    self.dim_name_by_metric_stmt,
                    [region, tenant_id, metric_name])
                ordered = True
            else:
                rows = self.session.execute(
                    self.dim_name_stmt,
                    [region, tenant_id])
                ordered = False

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

        if not rows:
            return []

        json_dim_name_list = [{u'dimension_name': row.name} for row in rows]

        if not ordered:
            json_dim_name_list.sort(key=lambda x: x[u'dimension_name'])

        return json_dim_name_list

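    # Pagination note: ``offset`` is a metric id. Its name and dimensions
    # are resolved first; for the metric name that matches the offset, rows
    # sorting at or before the offset dimensions are filtered out
    # client-side (see the offset_futures handling below).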
    def list_metrics(self, tenant_id, region, name, dimensions, offset, limit, start_time=None,
                     end_time=None):

        offset_name = None
        offset_dimensions = []
        names = []
        metric_list = []
        offset_futures = []
        non_offset_futures = []

        try:
            if offset:
                offset_metric = self._get_metric_by_id(offset)
                if offset_metric:
                    offset_name = offset_metric.name
                    offset_dimensions = offset_metric.dimensions

            if not name:
                names = self._list_metric_names(tenant_id, region, dimensions, offset=offset_name)
                if names:
                    names = [elem['name'] for elem in names]
            else:
                names.append(name)

            if not names:
                return metric_list

            for name in names:
                if name == offset_name:
                    futures = self._list_metrics_by_name(tenant_id, region, name, dimensions, offset_dimensions,
                                                         limit, start_time=None, end_time=None)
                    if offset_dimensions and dimensions:
                        offset_futures.extend(futures)
                    else:
                        non_offset_futures.extend(futures)
                else:
                    non_offset_futures.extend(
                        self._list_metrics_by_name(tenant_id, region, name, dimensions, None, limit,
                                                   start_time=None, end_time=None))

            # manually filter out rows that sort at or before the offset
            # dimensions
            for future in offset_futures:
                rows = future.result()
                for row in rows:
                    if offset_dimensions >= row.dimensions:
                        continue

                    metric_list.append(self._process_metric_row(row))

            for future in non_offset_futures:
                metric_list.extend((self._process_metric_row(row) for row in future.result()))

            return metric_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

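    # Dimensions are stored in Cassandra as 'name\tvalue' strings (see
    # _create_dimension_value_entry); split them back into a dict here.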
    @staticmethod
    def _process_metric_row(row):
        dim_map = {}
        for d in row.dimensions:
            pair = d.split('\t')
            dim_map[pair[0]] = pair[1]

        metric = {'id': binascii.hexlify(bytearray(row.metric_id)),
                  'name': row.metric_name,
                  'dimensions': dim_map}

        return metric

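    # A dimension value may contain '|'-separated alternatives. Each
    # combination of alternatives is expanded into its own query via
    # itertools.product, so one async result future is returned per
    # combination.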
    def _list_metrics_by_name(self, tenant_id, region, name, dimensions, dimension_offset, limit, start_time=None,
                              end_time=None):

        or_dimensions = []
        sub_dimensions = {}
        futures = []

        if not dimensions:
            query = self._build_metrics_by_name_query(tenant_id, region, name, dimensions, None, start_time,
                                                      end_time, dimension_offset, limit)
            futures.append(self.session.execute_async(query[0], query[1]))
            return futures

        wildcard_dimensions = []
        for dim_name, dim_value in dimensions.items():
            if not dim_value:
                wildcard_dimensions.append(dim_name)

            elif '|' in dim_value:

                def f(val):
                    return {dim_name: val}

                or_dimensions.append(list(map(f, sorted(dim_value.split('|')))))

            else:
                sub_dimensions[dim_name] = dim_value

        if or_dimensions:
            or_dims_list = list(itertools.product(*or_dimensions))

            for or_dims_tuple in or_dims_list:
                extracted_dimensions = sub_dimensions.copy()

                for dims in iter(or_dims_tuple):
                    for k, v in dims.items():
                        extracted_dimensions[k] = v

                query = self._build_metrics_by_name_query(tenant_id, region, name, extracted_dimensions,
                                                          wildcard_dimensions, start_time,
                                                          end_time, dimension_offset, limit)

                futures.append(self.session.execute_async(query[0], query[1]))

        else:
            query = self._build_metrics_by_name_query(tenant_id, region, name, sub_dimensions, wildcard_dimensions,
                                                      start_time,
                                                      end_time, dimension_offset, limit)
            futures.append(self.session.execute_async(query[0], query[1]))

        return futures

    def _get_metric_by_id(self, metric_id):

        rows = self.session.execute(self.metric_by_id_stmt, [bytearray.fromhex(metric_id)])

        if rows:
            return Metric(id=metric_id, name=rows[0].metric_name, dimensions=rows[0].dimensions)

        return None

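    # Build a (statement, params) pair for METRIC_LIST_CQL. Every optional
    # clause contributes either its CQL fragment or an empty string, so the
    # ten %s placeholders always line up with the parameter list.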
    def _build_metrics_by_name_query(self, tenant_id, region, name, dimensions, wildcard_dimensions, start_time,
                                     end_time, dim_offset,
                                     limit):

        conditions = [REGION_EQ, TENANT_EQ]
        params = [region, tenant_id.encode('utf8')]

        if name:
            conditions.append(METRIC_NAME_EQ)
            params.append(name)
        else:
            conditions.append('')

        if dimensions:
            conditions.append(DIMENSIONS_CONTAINS * len(dimensions))
            params.extend(
                [self._create_dimension_value_entry(dim_name, dim_value)
                 for dim_name, dim_value in dimensions.items()])
        else:
            conditions.append('')

        if wildcard_dimensions:
            conditions.append(DIMENSIONS_NAME_CONTAINS * len(wildcard_dimensions))
            params.extend(wildcard_dimensions)
        else:
            conditions.append('')

        if dim_offset and not dimensions:
            # Cassandra does not allow combining 'contains' and '>' on the
            # same collection column, so the dimensions offset only applies
            # when no dimension filter is present
            conditions.append(DIMENSIONS_GT)
            params.append(dim_offset)
        else:
            conditions.append('')

        if start_time:
            conditions.append(UPDATED_TIME_GE % start_time)
        else:
            conditions.append('')

        if end_time:
            conditions.append(CREATED_TIME_LE % end_time)
        else:
            conditions.append('')

        if limit:
            conditions.append(LIMIT_CLAUSE)
            params.append(limit)
        else:
            conditions.append('')

        if (not name) or dimensions or wildcard_dimensions or start_time or end_time:
            conditions.append(ALLOW_FILTERING)
        else:
            conditions.append('')

        return METRIC_LIST_CQL % tuple(conditions), params

    @staticmethod
    def _create_dimension_value_entry(name, value):
        # dimensions are stored as a single tab-separated 'name\tvalue' string
        return '%s\t%s' % (name, value)

    def list_metric_names(self, tenant_id, region, dimensions):
        return self._list_metric_names(tenant_id, region, dimensions)

    def _list_metric_names(self, tenant_id, region, dimensions, offset=None):

        or_dimensions = []
        single_dimensions = {}

        if dimensions:
            for key, value in dimensions.items():
                if not value:
                    continue

                elif '|' in value:
                    def f(val):
                        return {key: val}

                    or_dimensions.append(list(map(f, sorted(value.split('|')))))

                else:
                    single_dimensions[key] = value

        if or_dimensions:

            names = []
            or_dims_list = list(itertools.product(*or_dimensions))

            for or_dims_tuple in or_dims_list:
                extracted_dimensions = single_dimensions.copy()

                for dims in iter(or_dims_tuple):
                    for k, v in dims.items():
                        extracted_dimensions[k] = v

                names.extend(
                    self._list_metric_names_single_dimension_value(tenant_id, region, extracted_dimensions, offset))

            names.sort(key=lambda x: x[u'name'])
            return names

        else:
            names = self._list_metric_names_single_dimension_value(tenant_id, region, single_dimensions, offset)
            names.sort(key=lambda x: x[u'name'])
            return names

    def _list_metric_names_single_dimension_value(self, tenant_id, region, dimensions, offset=None):

        try:
            futures = []
            if dimensions:
                for name, value in dimensions.items():
                    if offset:
                        futures.append(self.session.execute_async(self.metric_name_by_dimension_offset_stmt,
                                                                  [region, tenant_id, name, value, offset]))
                    else:
                        futures.append(self.session.execute_async(self.metric_name_by_dimension_stmt,
                                                                  [region, tenant_id, name, value]))

            else:
                if offset:
                    futures.append(
                        self.session.execute_async(self.metric_name_offset_stmt, [region, tenant_id, offset]))
                else:
                    futures.append(self.session.execute_async(self.metric_name_stmt, [region, tenant_id]))

            names_list = []

            for future in futures:
                rows = future.result()
                tmp = set()
                for row in rows:
                    tmp.add(row.metric_name)

                names_list.append(tmp)

            # a metric must match every dimension filter, so intersect the
            # per-dimension result sets
            return [{u'name': v} for v in set.intersection(*names_list)]

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

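    # A measurement-list offset has the form '<metric_id>_<timestamp>' when
    # paging grouped results; a plain timestamp (no '_') is also accepted
    # and then applies to the merged series as a whole.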
    def measurement_list(self, tenant_id, region, name, dimensions,
                         start_timestamp, end_timestamp, offset, limit,
                         merge_metrics_flag, group_by):

        metrics = self.list_metrics(tenant_id, region, name, dimensions, None, None)

        if offset:
            tmp = offset.split("_")
            if len(tmp) > 1:
                offset_id = tmp[0]
                offset_timestamp = tmp[1]
            else:
                offset_id = None
                offset_timestamp = offset
        else:
            offset_timestamp = None
            offset_id = None

        if not metrics:
            return None
        elif len(metrics) > 1:
            if not merge_metrics_flag and not group_by:
                raise exceptions.MultipleMetricsException(self.MULTIPLE_METRICS_MESSAGE)

        try:
            if len(metrics) > 1 and not group_by:
                # offset is controlled only by offset_timestamp when the
                # group by option is not enabled
                count, series_list = self._query_merge_measurements(metrics,
                                                                    dimensions,
                                                                    start_timestamp,
                                                                    end_timestamp,
                                                                    offset_timestamp,
                                                                    limit)
                return series_list

            if group_by:
                if not isinstance(group_by, list):
                    group_by = group_by.split(',')
                elif len(group_by) == 1:
                    group_by = group_by[0].split(',')

            if len(metrics) == 1 or group_by[0].startswith('*'):
                if offset_id:
                    # drop metrics that precede the offset metric id
                    for index, metric in enumerate(metrics):
                        if metric['id'] == offset_id:
                            if index > 0:
                                metrics[0:index] = []
                            break

                count, series_list = self._query_measurements(metrics,
                                                              start_timestamp,
                                                              end_timestamp,
                                                              offset_timestamp,
                                                              limit)

                return series_list

            grouped_metrics = self._group_metrics(metrics, group_by, dimensions)

            if not grouped_metrics:
                return None

            if offset_id:
                # skip whole groups, and leading metrics inside the matching
                # group, until the offset metric id is reached
                found_offset = False
                for outer_index, sublist in enumerate(grouped_metrics):
                    for inner_index, metric in enumerate(sublist):
                        if metric['id'] == offset_id:
                            found_offset = True
                            if inner_index > 0:
                                sublist[0:inner_index] = []
                            break
                    if found_offset:
                        if outer_index > 0:
                            grouped_metrics[0:outer_index] = []
                        break

            remaining = limit
            series_list = []
            for sublist in grouped_metrics:
                sub_count, results = self._query_merge_measurements(sublist,
                                                                    sublist[0]['dimensions'],
                                                                    start_timestamp,
                                                                    end_timestamp,
                                                                    offset_timestamp,
                                                                    remaining)

                series_list.extend(results)

                if remaining:
                    remaining -= sub_count
                    if remaining <= 0:
                        break

                # offset_timestamp applies only to the first group; reset it
                # to None for subsequent groups
                if offset_timestamp:
                    offset_timestamp = None

            return series_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

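    # fetch_size caps how many rows the driver pulls per page. When several
    # series are merged under one limit, each query fetches roughly its
    # share of the limit (but at least 1000 rows) rather than a full
    # default page.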
    def _query_merge_measurements(self, metrics, dimensions, start_timestamp, end_timestamp,
                                  offset_timestamp, limit):
        results = []
        for metric in metrics:
            if limit and len(metrics) > 1:
                fetch_size = min(limit, max(1000, limit // len(metrics) + 2))
            else:
                fetch_size = None
            query = self._build_measurement_query(metric['id'],
                                                  start_timestamp,
                                                  end_timestamp,
                                                  offset_timestamp,
                                                  limit,
                                                  fetch_size)
            results.append((metric, iter(self.session.execute_async(query[0], query[1]).result())))

        return self._merge_series(results, dimensions, limit)

    def _query_measurements(self, metrics, start_timestamp, end_timestamp,
                            offset_timestamp, limit):
        results = []
        for index, metric in enumerate(metrics):
            if index == 0:
                query = self._build_measurement_query(metric['id'],
                                                      start_timestamp,
                                                      end_timestamp,
                                                      offset_timestamp,
                                                      limit)
            else:
                if limit:
                    fetch_size = min(self.session.default_fetch_size,
                                     max(1000, limit // min(index, 4)))
                else:
                    fetch_size = self.session.default_fetch_size
                query = self._build_measurement_query(metric['id'],
                                                      start_timestamp,
                                                      end_timestamp,
                                                      None,
                                                      limit,
                                                      fetch_size)

            results.append([metric,
                            iter(self.session.execute_async(query[0], query[1]).result())])

        series_list = []
        count = 0
        for result in results:
            measurements = []
            row = next(result[1], None)
            while row:
                measurements.append([self._isotime_msec(row.time_stamp),
                                     row.value,
                                     rest_utils.from_json(row.value_meta) if row.value_meta else {}])
                count += 1
                if limit and count >= limit:
                    break

                row = next(result[1], None)

            series_list.append({'name': result[0]['name'],
                                'id': result[0]['id'],
                                'columns': ['timestamp', 'value', 'value_meta'],
                                'measurements': measurements,
                                'dimensions': result[0]['dimensions']})
            if limit and count >= limit:
                break

        return count, series_list

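    # An offset timestamp supersedes start_timestamp: paging resumes
    # strictly after the last row already returned (time_stamp > offset
    # versus time_stamp >= start).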
    @staticmethod
    def _build_measurement_query(metric_id, start_timestamp,
                                 end_timestamp, offset_timestamp,
                                 limit=None, fetch_size=FETCH_SIZE_UNSET):
        conditions = [METRIC_ID_EQ]
        params = [bytearray.fromhex(metric_id)]

        if offset_timestamp:
            conditions.append(OFFSET_TIME_GT)
            params.append(offset_timestamp)
        elif start_timestamp:
            conditions.append(START_TIME_GE)
            params.append(int(start_timestamp * 1000))
        else:
            conditions.append('')

        if end_timestamp:
            conditions.append(END_TIME_LE)
            params.append(int(end_timestamp * 1000))
        else:
            conditions.append('')

        if limit:
            conditions.append(LIMIT_CLAUSE)
            params.append(limit)
        else:
            conditions.append('')

        return SimpleStatement(MEASUREMENT_LIST_CQL % tuple(conditions), fetch_size=fetch_size), params

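    # K-way merge of the per-metric result iterators into one series ordered
    # by ascending timestamp: keep one pending row per iterator in
    # ``top_batch`` (sorted newest-first, so the oldest row sits at the
    # end), repeatedly emit that oldest row and refill from the iterator it
    # came from.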
    def _merge_series(self, series, dimensions, limit):
        series_list = []

        if not series:
            return 0, series_list

        measurements = []
        top_batch = []
        num_series = len(series)
        for i in range(0, num_series):
            row = next(series[i][1], None)
            if row:
                top_batch.append([i,
                                  row.time_stamp,
                                  row.value,
                                  rest_utils.from_json(row.value_meta) if row.value_meta else {}])
            else:
                num_series -= 1

        top_batch.sort(key=lambda m: m[1], reverse=True)

        count = 0
        while (not limit or count < limit) and top_batch:
            measurements.append([self._isotime_msec(top_batch[num_series - 1][1]),
                                 top_batch[num_series - 1][2],
                                 top_batch[num_series - 1][3]])
            count += 1
            row = next(series[top_batch[num_series - 1][0]][1], None)
            if row:
                top_batch[num_series - 1] = [top_batch[num_series - 1][0],
                                             row.time_stamp,
                                             row.value,
                                             rest_utils.from_json(row.value_meta) if row.value_meta else {}]

                top_batch.sort(key=lambda m: m[1], reverse=True)
            else:
                num_series -= 1
                top_batch.pop()

        series_list.append({'name': series[0][0]['name'],
                            'id': series[0][0]['id'],
                            'columns': ['timestamp', 'value', 'value_meta'],
                            'measurements': measurements,
                            'dimensions': dimensions})

        return count, series_list

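    # Group metrics by the values of the requested group_by dimensions. The
    # grouping key is a url-encoded 'name=value&' concatenation, which gives
    # groups a deterministic sort order.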
    @staticmethod
    def _group_metrics(metrics, group_by, search_by):

        grouped_metrics = {}
        for metric in metrics:
            key = ''
            display_dimensions = dict(search_by.items())
            for name in group_by:
                # '_' ensures the key with a missing dimension is sorted lower
                value = metric['dimensions'].get(name, '_')
                if value != '_':
                    display_dimensions[name] = value
                key = key + '='.join((urllib.quote_plus(name), urllib.quote_plus(value))) + '&'

            metric['dimensions'] = display_dimensions

            if key in grouped_metrics:
                grouped_metrics[key].append(metric)
            else:
                grouped_metrics[key] = [metric]

        sorted_groups = sorted(grouped_metrics.items(), key=lambda k: k[0])
        return [x[1] for x in sorted_groups]

    @staticmethod
    def _isotime_msec(timestamp):
        """Stringify a datetime in ISO 8601 format with millisecond precision."""
        st = timestamp.isoformat()
        if '.' in st:
            st = st[:23] + 'Z'
        else:
            st += '.000Z'
        return st.decode('utf8')

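    # Statistics are computed here rather than in CQL: the raw measurement
    # list is folded into fixed period-second buckets (default 300s) and one
    # row of the requested aggregates is emitted per non-empty bucket.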
    def metrics_statistics(self, tenant_id, region, name, dimensions,
                           start_timestamp, end_timestamp, statistics,
                           period, offset, limit, merge_metrics_flag,
                           group_by):

        if not period:
            period = 300
        else:
            period = int(period)

        series_list = self.measurement_list(tenant_id, region, name, dimensions,
                                            start_timestamp, end_timestamp,
                                            offset, None, merge_metrics_flag, group_by)

        json_statistics_list = []

        if not series_list:
            return json_statistics_list

        statistics = [stat.lower() for stat in statistics]

        columns = [u'timestamp']

        columns.extend([x for x in ['avg', 'min', 'max', 'count', 'sum'] if x in statistics])

        start_time = datetime.utcfromtimestamp(start_timestamp)
        if end_timestamp:
            end_time = datetime.utcfromtimestamp(end_timestamp)
        else:
            end_time = datetime.utcnow()

        for series in series_list:

            if limit <= 0:
                break

            measurements = series['measurements']

            if not measurements:
                continue

            first_measure = measurements[0]
            first_measure_start_time = MetricsRepository._parse_time_string(first_measure[0])

            # skip blank intervals at the beginning: find the start of the
            # first stat period that is not empty
            stat_start_time = start_time + timedelta(
                seconds=((first_measure_start_time - start_time).seconds // period) * period)

            stats_list = []
            stats_count = 0
            stats_sum = 0
            stats_min = stats_max = first_measure[1]

            for measurement in series['measurements']:

                time_stamp = MetricsRepository._parse_time_string(measurement[0])
                value = measurement[1]

                if (time_stamp - stat_start_time).seconds >= period:

                    stat = MetricsRepository._create_stat(statistics, stat_start_time, stats_count,
                                                          stats_sum, stats_min, stats_max)

                    stats_list.append(stat)
                    limit -= 1
                    if limit <= 0:
                        break

                    # initialize the new stat period
                    stats_sum = value
                    stats_count = 1
                    stats_min = value
                    stats_max = value
                    stat_start_time += timedelta(seconds=period)

                else:
                    stats_min = min(stats_min, value)
                    stats_max = max(stats_max, value)
                    stats_count += 1
                    stats_sum += value

            if stats_count:
                stat = MetricsRepository._create_stat(statistics, stat_start_time, stats_count, stats_sum,
                                                      stats_min, stats_max)
                stats_list.append(stat)
                limit -= 1

            stats_end_time = stat_start_time + timedelta(seconds=period) - timedelta(milliseconds=1)
            if stats_end_time > end_time:
                stats_end_time = end_time

            statistic = {u'name': name.decode('utf8'),
                         u'id': series['id'],
                         u'dimensions': series['dimensions'],
                         u'columns': columns,
                         u'statistics': stats_list,
                         u'end_time': self._isotime_msec(stats_end_time)}

            json_statistics_list.append(statistic)

        return json_statistics_list

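    # Assemble one statistics row: the bucket timestamp first, then the
    # requested aggregates in the fixed order avg, min, max, count, sum,
    # matching the ``columns`` list built in metrics_statistics.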
    @staticmethod
    def _create_stat(statistics, timestamp, stat_count=None, stat_sum=None, stat_min=None, stat_max=None):

        stat = [MetricsRepository._isotime_msec(timestamp)]

        if not stat_count:
            stat.extend([0] * len(statistics))

        else:
            if 'avg' in statistics:
                stat.append(stat_sum / stat_count)

            if 'min' in statistics:
                stat.append(stat_min)

            if 'max' in statistics:
                stat.append(stat_max)

            if 'count' in statistics:
                stat.append(stat_count)

            if 'sum' in statistics:
                stat.append(stat_sum)

        return stat

    @staticmethod
    def _parse_time_string(timestamp):
        dt = timeutils.parse_isotime(timestamp)
        dt = timeutils.normalize_time(dt)
        return dt

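    # Each history entry's ``id`` is the event timestamp in milliseconds
    # since the Unix epoch; the same value is what clients pass back as the
    # paging ``offset`` (compared against time_stamp via OFFSET_TIME_GT).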
    def alarm_history(self, tenant_id, alarm_id_list,
                      offset, limit, start_timestamp=None,
                      end_timestamp=None):

        try:

            json_alarm_history_list = []

            if not alarm_id_list:
                return json_alarm_history_list

            conditions = [ALARM_TENANT_ID_EQ]
            params = [tenant_id.encode('utf8')]
            if len(alarm_id_list) == 1:
                conditions.append(ALARM_ID_EQ)
                params.append(alarm_id_list[0])
            else:
                conditions.append(' and alarm_id in ({}) '.format(','.join(['%s'] * len(alarm_id_list))))
                for alarm_id in alarm_id_list:
                    params.append(alarm_id)

            if offset:
                conditions.append(OFFSET_TIME_GT)
                params.append(offset)

            elif start_timestamp:
                conditions.append(START_TIME_GE)
                params.append(int(start_timestamp * 1000))
            else:
                conditions.append('')

            if end_timestamp:
                conditions.append(END_TIME_LE)
                params.append(int(end_timestamp * 1000))
            else:
                conditions.append('')

            if limit:
                conditions.append(LIMIT_CLAUSE)
                # fetch one extra row, presumably so the caller can tell
                # whether more results remain beyond this page
                params.append(limit + 1)
            else:
                conditions.append('')

            rows = self.session.execute(ALARM_HISTORY_CQL % tuple(conditions), params)

            if not rows:
                return json_alarm_history_list

            sorted_rows = sorted(rows, key=lambda row: row.time_stamp)

            for (tenant_id, alarm_id, time_stamp, metrics, new_state, old_state, reason,
                 reason_data, sub_alarms) in sorted_rows:

                alarm = {u'timestamp': self._isotime_msec(time_stamp),
                         u'alarm_id': alarm_id,
                         u'metrics': rest_utils.from_json(metrics),
                         u'new_state': new_state,
                         u'old_state': old_state,
                         u'reason': reason,
                         u'reason_data': reason_data,
                         u'sub_alarms': rest_utils.from_json(sub_alarms),
                         u'id': str(int((time_stamp - self.epoch).total_seconds() * 1000))}

                if alarm[u'sub_alarms']:

                    for sub_alarm in alarm[u'sub_alarms']:
                        sub_expr = sub_alarm['sub_alarm_expression']
                        metric_def = sub_expr['metric_definition']
                        sub_expr['metric_name'] = metric_def['name']
                        sub_expr['dimensions'] = metric_def['dimensions']
                        del sub_expr['metric_definition']

                json_alarm_history_list.append(alarm)

            return json_alarm_history_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    @staticmethod
    def check_status():
        try:
            cluster = Cluster(
                CONF.cassandra.contact_points
            )
            session = cluster.connect(CONF.cassandra.keyspace)
            session.shutdown()
        except Exception as ex:
            LOG.exception(str(ex))
            return False, str(ex)
        return True, 'OK'