monasca-api/monasca_api/common/repositories/cassandra/metrics_repository.py

990 lines
37 KiB
Python

# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017-2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
from builtins import str as text
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import itertools
import six
import urllib
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import FETCH_SIZE_UNSET
from cassandra.query import SimpleStatement
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository
from monasca_common.rest import utils as rest_utils
CONF = cfg.CONF
LOG = log.getLogger(__name__)
LIMIT_CLAUSE = 'limit %s'
ALLOW_FILTERING = 'allow filtering'
MEASUREMENT_LIST_CQL = ('select time_stamp, value, value_meta '
'from measurements where %s %s %s %s')
METRIC_ID_EQ = 'metric_id = %s'
METRIC_ID_IN = 'metric_id in %s'
OFFSET_TIME_GT = "and time_stamp > %s"
START_TIME_GE = "and time_stamp >= %s"
END_TIME_LE = "and time_stamp <= %s"
METRIC_LIST_CQL = ('select metric_name, dimensions, metric_id '
'from metrics where %s %s %s %s %s %s %s %s %s %s')
REGION_EQ = 'region = %s'
TENANT_EQ = 'and tenant_id = %s'
METRIC_NAME_EQ = 'and metric_name = %s'
DIMENSIONS_CONTAINS = 'and dimensions contains %s '
DIMENSIONS_NAME_CONTAINS = 'and dimension_names contains %s '
CREATED_TIME_LE = "and created_at <= %s"
UPDATED_TIME_GE = "and updated_at >= %s"
DIMENSIONS_GT = 'and dimensions > %s'
DIMENSION_VALUE_BY_METRIC_CQL = ('select dimension_value as value from metrics_dimensions '
'where region = ? and tenant_id = ? and metric_name = ? '
'and dimension_name = ? group by dimension_value')
DIMENSION_VALUE_CQL = ('select value from dimensions '
'where region = ? and tenant_id = ? and name = ? '
'group by value order by value')
DIMENSION_NAME_BY_METRIC_CQL = ('select dimension_name as name from metrics_dimensions where '
'region = ? and tenant_id = ? and metric_name = ? '
'group by dimension_name order by dimension_name')
DIMENSION_NAME_CQL = ('select name from dimensions where region = ? and tenant_id = ? '
'group by name allow filtering')
METRIC_NAME_BY_DIMENSION_CQL = ('select metric_name from dimensions_metrics where region = ? and '
'tenant_id = ? and dimension_name = ? and dimension_value = ? '
'group by metric_name order by metric_name')
METRIC_NAME_BY_DIMENSION_OFFSET_CQL = ('select metric_name from dimensions_metrics where region = ? and '
'tenant_id = ? and dimension_name = ? and dimension_value = ? and '
'metric_name >= ?'
'group by metric_name order by metric_name')
METRIC_NAME_CQL = ('select distinct region, tenant_id, metric_name from metrics_dimensions '
'where region = ? and tenant_id = ? allow filtering')
METRIC_NAME_OFFSET_CQL = ('select distinct region, tenant_id, metric_name from metrics_dimensions '
'where region = ? and tenant_id = ? and metric_name >= ? allow filtering')
METRIC_BY_ID_CQL = ('select region, tenant_id, metric_name, dimensions from measurements '
'where metric_id = ? limit 1')
Metric = namedtuple('metric', 'id name dimensions')
ALARM_HISTORY_CQL = ('select tenant_id, alarm_id, time_stamp, metric, new_state, old_state, reason, reason_data, '
'sub_alarms from alarm_state_history where %s %s %s %s %s')
ALARM_ID_EQ = 'and alarm_id = %s'
ALARM_ID_IN = 'and alarm_id in %s'
ALARM_TENANT_ID_EQ = 'tenant_id = %s'
class MetricsRepository(metrics_repository.AbstractMetricsRepository):
def __init__(self):
try:
self.conf = cfg.CONF
if self.conf.cassandra.user:
auth_provider = PlainTextAuthProvider(username=self.conf.cassandra.user,
password=self.conf.cassandra.password)
else:
auth_provider = None
self.cluster = Cluster(self.conf.cassandra.contact_points, auth_provider=auth_provider)
self.session = self.cluster.connect(self.conf.cassandra.keyspace)
self.dim_val_by_metric_stmt = self.session.prepare(DIMENSION_VALUE_BY_METRIC_CQL)
self.dim_val_stmt = self.session.prepare(DIMENSION_VALUE_CQL)
self.dim_name_by_metric_stmt = self.session.prepare(DIMENSION_NAME_BY_METRIC_CQL)
self.dim_name_stmt = self.session.prepare(DIMENSION_NAME_CQL)
self.metric_name_by_dimension_stmt = self.session.prepare(METRIC_NAME_BY_DIMENSION_CQL)
self.metric_name_by_dimension_offset_stmt = self.session.prepare(METRIC_NAME_BY_DIMENSION_OFFSET_CQL)
self.metric_name_stmt = self.session.prepare(METRIC_NAME_CQL)
self.metric_name_offset_stmt = self.session.prepare(METRIC_NAME_OFFSET_CQL)
self.metric_by_id_stmt = self.session.prepare(METRIC_BY_ID_CQL)
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
self.epoch = datetime.utcfromtimestamp(0)
def list_dimension_values(self, tenant_id, region, metric_name,
dimension_name):
try:
if metric_name:
rows = self.session.execute(
self.dim_val_by_metric_stmt,
[region, tenant_id, metric_name, dimension_name])
else:
rows = self.session.execute(
self.dim_val_stmt,
[region, tenant_id, dimension_name])
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
json_dim_value_list = []
if not rows:
return json_dim_value_list
for row in rows:
json_dim_value_list.append({u'dimension_value': row.value})
json_dim_value_list.sort(key=lambda x: x[u'dimension_value'])
return json_dim_value_list
def list_dimension_names(self, tenant_id, region, metric_name):
try:
if metric_name:
rows = self.session.execute(
self.dim_name_by_metric_stmt,
[region, tenant_id, metric_name])
ordered = True
else:
rows = self.session.execute(
self.dim_name_stmt,
[region, tenant_id])
ordered = False
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
if not rows:
return []
json_dim_name_list = [{u'dimension_name': row.name} for row in rows]
if not ordered:
json_dim_name_list.sort(key=lambda x: x[u'dimension_name'])
return json_dim_name_list
def list_metrics(self, tenant_id, region, name, dimensions, offset, limit, start_time=None,
end_time=None):
offset_name = None
offset_dimensions = []
names = []
metric_list = []
offset_futures = []
non_offset_futures = []
try:
if offset:
offset_metric = self._get_metric_by_id(offset)
if offset_metric:
offset_name = offset_metric.name
offset_dimensions = offset_metric.dimensions
if not name:
names = self._list_metric_names(tenant_id, region, dimensions, offset=offset_name)
if names:
names = [elem['name'] for elem in names]
else:
names.append(name)
if not names:
return metric_list
for name in names:
if name == offset_name:
futures = self._list_metrics_by_name(tenant_id, region, name, dimensions, offset_dimensions,
limit, start_time=None, end_time=None)
if offset_dimensions and dimensions:
offset_futures.extend(futures)
else:
non_offset_futures.extend(futures)
else:
non_offset_futures.extend(
self._list_metrics_by_name(tenant_id, region, name, dimensions, None, limit,
start_time=None, end_time=None))
# manually filter out metrics by the offset dimension
for future in offset_futures:
rows = future.result()
for row in rows:
if offset_dimensions >= row.dimensions:
continue
metric_list.append(self._process_metric_row(row))
for future in non_offset_futures:
metric_list.extend((self._process_metric_row(row) for row in future.result()))
return metric_list
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
@staticmethod
def _process_metric_row(row):
dim_map = {}
for d in row.dimensions:
pair = d.split('\t')
dim_map[pair[0]] = pair[1]
metric = {'id': binascii.hexlify(bytearray(row.metric_id)),
'name': row.metric_name,
'dimensions': dim_map}
return metric
def _list_metrics_by_name(self, tenant_id, region, name, dimensions, dimension_offset, limit, start_time=None,
end_time=None):
or_dimensions = []
sub_dimensions = {}
futures = []
if not dimensions:
query = self._build_metrics_by_name_query(tenant_id, region, name, dimensions, None, start_time,
end_time, dimension_offset, limit)
futures.append(self.session.execute_async(query[0], query[1]))
return futures
wildcard_dimensions = []
for dim_name, dim_value in dimensions.items():
if not dim_value:
wildcard_dimensions.append(dim_name)
elif '|' in dim_value:
def f(val):
return {dim_name: val}
or_dimensions.append(list(map(f, sorted(dim_value.split('|')))))
else:
sub_dimensions[dim_name] = dim_value
if or_dimensions:
or_dims_list = list(itertools.product(*or_dimensions))
for or_dims_tuple in or_dims_list:
extracted_dimensions = sub_dimensions.copy()
for dims in iter(or_dims_tuple):
for k, v in dims.items():
extracted_dimensions[k] = v
query = self._build_metrics_by_name_query(tenant_id, region, name, extracted_dimensions,
wildcard_dimensions, start_time,
end_time, dimension_offset, limit)
futures.append(self.session.execute_async(query[0], query[1]))
else:
query = self._build_metrics_by_name_query(tenant_id, region, name, sub_dimensions, wildcard_dimensions,
start_time,
end_time, dimension_offset, limit)
futures.append(self.session.execute_async(query[0], query[1]))
return futures
def _get_metric_by_id(self, metric_id):
rows = self.session.execute(self.metric_by_id_stmt, [bytearray.fromhex(metric_id)])
if rows:
return Metric(id=metric_id, name=rows[0].metric_name, dimensions=rows[0].dimensions)
return None
def _build_metrics_by_name_query(self, tenant_id, region, name, dimensions, wildcard_dimensions, start_time,
end_time, dim_offset,
limit):
conditions = [REGION_EQ, TENANT_EQ]
params = [region, tenant_id.encode('utf8')]
if name:
conditions.append(METRIC_NAME_EQ)
params.append(name)
else:
conditions.append('')
if dimensions:
conditions.append(DIMENSIONS_CONTAINS * len(dimensions))
params.extend(
[self._create_dimension_value_entry(dim_name, dim_value)
for dim_name, dim_value in dimensions.items()])
else:
conditions.append('')
if wildcard_dimensions:
conditions.append(DIMENSIONS_NAME_CONTAINS * len(wildcard_dimensions))
params.extend(wildcard_dimensions)
else:
conditions.append('')
if dim_offset and not dimensions:
# cassandra does not allow using both contains and GT in collection column
conditions.append(DIMENSIONS_GT)
params.append(dim_offset)
else:
conditions.append('')
if start_time:
conditions.append(UPDATED_TIME_GE % start_time)
else:
conditions.append('')
if end_time:
conditions.append(CREATED_TIME_LE % end_time)
else:
conditions.append('')
if limit:
conditions.append(LIMIT_CLAUSE)
params.append(limit)
else:
conditions.append('')
if (not name) or dimensions or wildcard_dimensions or start_time or end_time:
conditions.append(ALLOW_FILTERING)
else:
conditions.append('')
return METRIC_LIST_CQL % tuple(conditions), params
@staticmethod
def _create_dimension_value_entry(name, value):
return '%s\t%s' % (name, value)
def list_metric_names(self, tenant_id, region, dimensions):
return self._list_metric_names(tenant_id, region, dimensions)
def _list_metric_names(self, tenant_id, region, dimensions, offset=None):
or_dimensions = []
single_dimensions = {}
if dimensions:
for key, value in dimensions.items():
if not value:
continue
elif '|' in value:
def f(val):
return {key: val}
or_dimensions.append(list(map(f, sorted(value.split('|')))))
else:
single_dimensions[key] = value
if or_dimensions:
names = []
or_dims_list = list(itertools.product(*or_dimensions))
for or_dims_tuple in or_dims_list:
extracted_dimensions = single_dimensions.copy()
for dims in iter(or_dims_tuple):
for k, v in dims.items():
extracted_dimensions[k] = v
names.extend(
self._list_metric_names_single_dimension_value(tenant_id, region, extracted_dimensions, offset))
names.sort(key=lambda x: x[u'name'])
return names
else:
names = self._list_metric_names_single_dimension_value(tenant_id, region, single_dimensions, offset)
names.sort(key=lambda x: x[u'name'])
return names
def _list_metric_names_single_dimension_value(self, tenant_id, region, dimensions, offset=None):
try:
futures = []
if dimensions:
for name, value in dimensions.items():
if offset:
futures.append(self.session.execute_async(self.metric_name_by_dimension_offset_stmt,
[region, tenant_id, name, value, offset]))
else:
futures.append(self.session.execute_async(self.metric_name_by_dimension_stmt,
[region, tenant_id, name, value]))
else:
if offset:
futures.append(
self.session.execute_async(self.metric_name_offset_stmt, [region, tenant_id, offset]))
else:
futures.append(self.session.execute_async(self.metric_name_stmt, [region, tenant_id]))
names_list = []
for future in futures:
rows = future.result()
tmp = set()
for row in rows:
tmp.add(row.metric_name)
names_list.append(tmp)
return [{u'name': v} for v in set.intersection(*names_list)]
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
def measurement_list(self, tenant_id, region, name, dimensions,
start_timestamp, end_timestamp, offset, limit,
merge_metrics_flag, group_by):
metrics = self.list_metrics(tenant_id, region, name, dimensions, None, None)
if offset:
tmp = offset.split("_")
if len(tmp) > 1:
offset_id = tmp[0]
offset_timestamp = tmp[1]
else:
offset_id = None
offset_timestamp = offset
else:
offset_timestamp = None
offset_id = None
if not metrics:
return None
elif len(metrics) > 1:
if not merge_metrics_flag and not group_by:
raise exceptions.MultipleMetricsException(self.MULTIPLE_METRICS_MESSAGE)
try:
if len(metrics) > 1 and not group_by:
# offset is controlled only by offset_timestamp when the group by option is not enabled
count, series_list = self._query_merge_measurements(metrics,
dimensions,
start_timestamp,
end_timestamp,
offset_timestamp,
limit)
return series_list
if group_by:
if not isinstance(group_by, list):
group_by = group_by.split(',')
elif len(group_by) == 1:
group_by = group_by[0].split(',')
if len(metrics) == 1 or group_by[0].startswith('*'):
if offset_id:
for index, metric in enumerate(metrics):
if metric['id'] == offset_id:
if index > 0:
metrics[0:index] = []
break
count, series_list = self._query_measurements(metrics,
start_timestamp,
end_timestamp,
offset_timestamp,
limit)
return series_list
grouped_metrics = self._group_metrics(metrics, group_by, dimensions)
if not grouped_metrics or len(grouped_metrics) == 0:
return None
if offset_id:
found_offset = False
for outer_index, sublist in enumerate(grouped_metrics):
for inner_index, metric in enumerate(sublist):
if metric['id'] == offset_id:
found_offset = True
if inner_index > 0:
sublist[0:inner_index] = []
break
if found_offset:
if outer_index > 0:
grouped_metrics[0:outer_index] = []
break
remaining = limit
series_list = []
for sublist in grouped_metrics:
sub_count, results = self._query_merge_measurements(sublist,
sublist[0]['dimensions'],
start_timestamp,
end_timestamp,
offset_timestamp,
remaining)
series_list.extend(results)
if remaining:
remaining -= sub_count
if remaining <= 0:
break
# offset_timestamp is used only in the first group, reset to None for subsequent groups
if offset_timestamp:
offset_timestamp = None
return series_list
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
def _query_merge_measurements(self, metrics, dimensions, start_timestamp, end_timestamp,
offset_timestamp, limit):
results = []
for metric in metrics:
if limit and len(metrics) > 1:
fetch_size = min(limit, max(1000, limit / len(metrics) + 2))
else:
fetch_size = None
query = self._build_measurement_query(metric['id'],
start_timestamp,
end_timestamp,
offset_timestamp,
limit,
fetch_size)
results.append((metric, iter(self.session.execute_async(query[0], query[1]).result())))
return self._merge_series(results, dimensions, limit)
def _query_measurements(self, metrics, start_timestamp, end_timestamp,
offset_timestamp, limit):
results = []
for index, metric in enumerate(metrics):
if index == 0:
query = self._build_measurement_query(metric['id'],
start_timestamp,
end_timestamp,
offset_timestamp,
limit)
else:
if limit:
fetch_size = min(self.session.default_fetch_size,
max(1000, limit / min(index, 4)))
else:
fetch_size = self.session.default_fetch_size
query = self._build_measurement_query(metric['id'],
start_timestamp,
end_timestamp,
None,
limit,
fetch_size)
results.append([metric,
iter(self.session.execute_async(query[0], query[1]).result())])
series_list = []
count = 0
for result in results:
measurements = []
row = next(result[1], None)
while row:
measurements.append([self._isotime_msec(row.time_stamp),
row.value,
rest_utils.from_json(row.value_meta) if row.value_meta else {}])
count += 1
if limit and count >= limit:
break
row = next(result[1], None)
series_list.append({'name': result[0]['name'],
'id': result[0]['id'],
'columns': ['timestamp', 'value', 'value_meta'],
'measurements': measurements,
'dimensions': result[0]['dimensions']})
if limit and count >= limit:
break
return count, series_list
@staticmethod
def _build_measurement_query(metric_id, start_timestamp,
end_timestamp, offset_timestamp,
limit=None, fetch_size=FETCH_SIZE_UNSET):
conditions = [METRIC_ID_EQ]
decode_metric_id = metric_id if six.PY2 else metric_id.decode('utf-8')
params = [bytearray.fromhex(decode_metric_id)]
if offset_timestamp:
conditions.append(OFFSET_TIME_GT)
params.append(offset_timestamp)
elif start_timestamp:
conditions.append(START_TIME_GE)
params.append(int(start_timestamp * 1000))
else:
conditions.append('')
if end_timestamp:
conditions.append(END_TIME_LE)
params.append(int(end_timestamp * 1000))
else:
conditions.append('')
if limit:
conditions.append(LIMIT_CLAUSE)
params.append(limit)
else:
conditions.append('')
return SimpleStatement(MEASUREMENT_LIST_CQL % tuple(conditions), fetch_size=fetch_size), params
def _merge_series(self, series, dimensions, limit):
series_list = []
if not series:
return series_list
measurements = []
top_batch = []
num_series = len(series)
for i in range(0, num_series):
row = next(series[i][1], None)
if row:
top_batch.append([i,
row.time_stamp,
row.value,
rest_utils.from_json(row.value_meta) if row.value_meta else {}])
else:
num_series -= 1
top_batch.sort(key=lambda m: m[1], reverse=True)
count = 0
while (not limit or count < limit) and top_batch:
measurements.append([self._isotime_msec(top_batch[num_series - 1][1]),
top_batch[num_series - 1][2],
top_batch[num_series - 1][3]])
count += 1
row = next(series[top_batch[num_series - 1][0]][1], None)
if row:
top_batch[num_series - 1] = [top_batch[num_series - 1][0],
row.time_stamp,
row.value,
rest_utils.from_json(row.value_meta) if row.value_meta else {}]
top_batch.sort(key=lambda m: m[1], reverse=True)
else:
num_series -= 1
top_batch.pop()
series_list.append({'name': series[0][0]['name'],
'id': series[0][0]['id'],
'columns': ['timestamp', 'value', 'value_meta'],
'measurements': measurements,
'dimensions': dimensions})
return count, series_list
@staticmethod
def _group_metrics(metrics, group_by, search_by):
grouped_metrics = {}
for metric in metrics:
key = ''
display_dimensions = dict(search_by.items())
for name in group_by:
# '_' ensures te key with missing dimension is sorted lower
value = metric['dimensions'].get(name, '_')
if value != '_':
display_dimensions[name] = value
key = key + '='.join((urllib.quote_plus(name), urllib.quote_plus(value))) + '&'
metric['dimensions'] = display_dimensions
if key in grouped_metrics:
grouped_metrics[key].append(metric)
else:
grouped_metrics[key] = [metric]
grouped_metrics = grouped_metrics.items()
grouped_metrics.sort(key=lambda k: k[0])
return [x[1] for x in grouped_metrics]
@staticmethod
def _isotime_msec(timestamp):
"""Stringify datetime in ISO 8601 format + millisecond.
"""
st = timestamp.isoformat()
if '.' in st:
st = st[:23] + 'Z'
else:
st += '.000Z'
return text(st)
def metrics_statistics(self, tenant_id, region, name, dimensions,
start_timestamp, end_timestamp, statistics,
period, offset, limit, merge_metrics_flag,
group_by):
if not period:
period = 300
else:
period = int(period)
series_list = self.measurement_list(tenant_id, region, name, dimensions,
start_timestamp, end_timestamp,
offset, None, merge_metrics_flag, group_by)
json_statistics_list = []
if not series_list:
return json_statistics_list
statistics = [stat.lower() for stat in statistics]
columns = [u'timestamp']
columns.extend([x for x in ['avg', 'min', 'max', 'count', 'sum'] if x in statistics])
start_time = datetime.utcfromtimestamp(start_timestamp)
if end_timestamp:
end_time = datetime.utcfromtimestamp(end_timestamp)
else:
end_time = datetime.utcnow()
for series in series_list:
if limit <= 0:
break
measurements = series['measurements']
if not measurements:
continue
first_measure = measurements[0]
first_measure_start_time = MetricsRepository._parse_time_string(first_measure[0])
# skip blank intervals at the beginning, finds the start time of stat period that is not empty
stat_start_time = start_time + timedelta(
seconds=((first_measure_start_time - start_time).seconds / period) * period)
stats_list = []
stats_count = 0
stats_sum = 0
stats_min = stats_max = first_measure[1]
for measurement in series['measurements']:
time_stamp = MetricsRepository._parse_time_string(measurement[0])
value = measurement[1]
if (time_stamp - stat_start_time).seconds >= period:
stat = MetricsRepository._create_stat(statistics, stat_start_time, stats_count,
stats_sum, stats_min, stats_max)
stats_list.append(stat)
limit -= 1
if limit <= 0:
break
# initialize the new stat period
stats_sum = value
stats_count = 1
stats_min = value
stats_max = value
stat_start_time += timedelta(seconds=period)
else:
stats_min = min(stats_min, value)
stats_max = max(stats_max, value)
stats_count += 1
stats_sum += value
if stats_count:
stat = MetricsRepository._create_stat(statistics, stat_start_time, stats_count, stats_sum,
stats_min, stats_max)
stats_list.append(stat)
limit -= 1
stats_end_time = stat_start_time + timedelta(seconds=period) - timedelta(milliseconds=1)
if stats_end_time > end_time:
stats_end_time = end_time
statistic = {u'name': text(name),
u'id': series['id'],
u'dimensions': series['dimensions'],
u'columns': columns,
u'statistics': stats_list,
u'end_time': self._isotime_msec(stats_end_time)}
json_statistics_list.append(statistic)
return json_statistics_list
@staticmethod
def _create_stat(statistics, timestamp, stat_count=None, stat_sum=None, stat_min=None, stat_max=None):
stat = [MetricsRepository._isotime_msec(timestamp)]
if not stat_count:
stat.extend([0] * len(statistics))
else:
if 'avg' in statistics:
stat.append(stat_sum / stat_count)
if 'min' in statistics:
stat.append(stat_min)
if 'max' in statistics:
stat.append(stat_max)
if 'count' in statistics:
stat.append(stat_count)
if 'sum' in statistics:
stat.append(stat_sum)
return stat
@staticmethod
def _parse_time_string(timestamp):
dt = timeutils.parse_isotime(timestamp)
dt = timeutils.normalize_time(dt)
return dt
def alarm_history(self, tenant_id, alarm_id_list,
offset, limit, start_timestamp=None,
end_timestamp=None):
try:
json_alarm_history_list = []
if not alarm_id_list:
return json_alarm_history_list
conditions = [ALARM_TENANT_ID_EQ]
params = [tenant_id.encode('utf8')]
if len(alarm_id_list) == 1:
conditions.append(ALARM_ID_EQ)
params.append(alarm_id_list[0])
else:
conditions.append(' and alarm_id in ({}) '.format(','.join(['%s'] * len(alarm_id_list))))
for alarm_id in alarm_id_list:
params.append(alarm_id)
if offset:
conditions.append(OFFSET_TIME_GT)
params.append(offset)
elif start_timestamp:
conditions.append(START_TIME_GE)
params.append(int(start_timestamp * 1000))
else:
conditions.append('')
if end_timestamp:
conditions.append(END_TIME_LE)
params.append(int(end_timestamp * 1000))
else:
conditions.append('')
if limit:
conditions.append(LIMIT_CLAUSE)
params.append(limit + 1)
else:
conditions.append('')
rows = self.session.execute(ALARM_HISTORY_CQL % tuple(conditions), params)
if not rows:
return json_alarm_history_list
sorted_rows = sorted(rows, key=lambda row: row.time_stamp)
for (tenant_id, alarm_id, time_stamp, metrics, new_state, old_state, reason,
reason_data, sub_alarms) in sorted_rows:
alarm = {u'timestamp': self._isotime_msec(time_stamp),
u'alarm_id': alarm_id,
u'metrics': rest_utils.from_json(metrics),
u'new_state': new_state,
u'old_state': old_state,
u'reason': reason,
u'reason_data': reason_data,
u'sub_alarms': rest_utils.from_json(sub_alarms),
u'id': str(int((time_stamp - self.epoch).total_seconds() * 1000))}
if alarm[u'sub_alarms']:
for sub_alarm in alarm[u'sub_alarms']:
sub_expr = sub_alarm['sub_alarm_expression']
metric_def = sub_expr['metric_definition']
sub_expr['metric_name'] = metric_def['name']
sub_expr['dimensions'] = metric_def['dimensions']
del sub_expr['metric_definition']
json_alarm_history_list.append(alarm)
return json_alarm_history_list
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
@staticmethod
def check_status():
try:
cluster = Cluster(
CONF.cassandra.contact_points
)
session = cluster.connect(CONF.cassandra.keyspace)
session.shutdown()
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return True, 'OK'