# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import binascii from datetime import datetime from datetime import timedelta import itertools import urllib from cassandra.cluster import Cluster from cassandra.query import SimpleStatement from oslo_config import cfg from oslo_log import log from oslo_utils import timeutils from monasca_common.rest import utils as rest_utils from monasca_api.common.repositories import exceptions from monasca_api.common.repositories import metrics_repository LOG = log.getLogger(__name__) class MetricsRepository(metrics_repository.AbstractMetricsRepository): def __init__(self): try: self.conf = cfg.CONF self._cassandra_cluster = Cluster( self.conf.cassandra.cluster_ip_addresses ) self.cassandra_session = self._cassandra_cluster.connect( self.conf.cassandra.keyspace ) except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) def list_metrics(self, tenant_id, region, name, dimensions, offset, limit, start_timestamp=None, end_timestamp=None, include_metric_hash=False): or_dimensions = [] sub_dimensions = {} if dimensions: for key, value in dimensions.iteritems(): if not value: sub_dimensions[key] = value elif '|' in value: def f(val): return {key: val} or_dimensions.append(list(map(f, value.split('|')))) else: sub_dimensions[key] = value if or_dimensions: or_dims_list = list(itertools.product(*or_dimensions)) metrics_list = [] for or_dims_tuple in or_dims_list: extracted_dimensions = sub_dimensions.copy() for dims in iter(or_dims_tuple): for k, v in dims.iteritems(): extracted_dimensions[k] = v metrics = self._list_metrics(tenant_id, region, name, extracted_dimensions, offset, limit, start_timestamp, end_timestamp, include_metric_hash) metrics_list += metrics return sorted(metrics_list, key=lambda metric: metric['id']) return self._list_metrics(tenant_id, region, name, dimensions, offset, limit, start_timestamp, end_timestamp, include_metric_hash) def _list_metrics(self, tenant_id, region, name, dimensions, offset, limit, start_timestamp=None, end_timestamp=None, include_metric_hash=False): try: select_stmt = """ select tenant_id, region, metric_hash, metric_map from metric_map where tenant_id = %s and region = %s """ parms = [tenant_id.encode('utf8'), region.encode('utf8')] name_clause = self._build_name_clause(name, parms) dimension_clause = self._build_dimensions_clause(dimensions, parms) select_stmt += name_clause + dimension_clause if offset: select_stmt += ' and metric_hash > %s ' parms.append(bytearray(offset.decode('hex'))) if limit: select_stmt += ' limit %s ' parms.append(limit + 1) select_stmt += ' allow filtering ' json_metric_list = [] stmt = SimpleStatement(select_stmt, fetch_size=2147483647) rows = self.cassandra_session.execute(stmt, parms) if not rows: return json_metric_list for (tenant_id, region, metric_hash, metric_map) in rows: metric = {} dimensions = {} if include_metric_hash: metric[u'metric_hash'] = metric_hash for name, value in metric_map.iteritems(): if name == '__name__': name = urllib.unquote_plus(value) metric[u'name'] = name else: name = urllib.unquote_plus(name) value = urllib.unquote_plus(value) dimensions[name] = value metric[u'dimensions'] = dimensions metric[u'id'] = binascii.hexlify(bytearray(metric_hash)) json_metric_list.append(metric) return json_metric_list except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) def _build_dimensions_clause(self, dimensions, parms): dimension_clause = '' if dimensions: for name, value in dimensions.iteritems(): if not value: dimension_clause += ' and metric_map contains key %s ' parms.append(urllib.quote_plus(name).encode('utf8')) else: dimension_clause += ' and metric_map[%s] = %s ' parms.append(urllib.quote_plus(name).encode('utf8')) parms.append(urllib.quote_plus(value).encode('utf8')) return dimension_clause def _build_name_clause(self, name, parms): name_clause = '' if name: name_clause = ' and metric_map[%s] = %s ' parms.append(urllib.quote_plus('__name__').encode('utf8')) parms.append(urllib.quote_plus(name).encode('utf8')) return name_clause def _build_select_metric_map_query(self, tenant_id, region, parms): select_stmt = """ select metric_map from metric_map where tenant_id = %s and region = %s """ parms.append(tenant_id.encode('utf8')) parms.append(region.encode('utf8')) return select_stmt def measurement_list(self, tenant_id, region, name, dimensions, start_timestamp, end_timestamp, offset, limit, merge_metrics_flag): try: json_measurement_list = [] rows = self._get_measurements(tenant_id, region, name, dimensions, start_timestamp, end_timestamp, offset, limit, merge_metrics_flag) if not rows: return json_measurement_list if not merge_metrics_flag: dimensions = self._get_dimensions(tenant_id, region, name, dimensions) measurements_list = ( [[self._isotime_msec(time_stamp), value, rest_utils.from_json(value_meta) if value_meta else {}] for (time_stamp, value, value_meta) in rows]) measurement = {u'name': name, # The last date in the measurements list. u'id': measurements_list[-1][0], u'dimensions': dimensions, u'columns': [u'timestamp', u'value', u'value_meta'], u'measurements': measurements_list} json_measurement_list.append(measurement) return json_measurement_list except exceptions.RepositoryException as ex: LOG.exception(ex) raise ex except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) def _get_measurements(self, tenant_id, region, name, dimensions, start_timestamp, end_timestamp, offset, limit, merge_metrics_flag): metric_list = self.list_metrics(tenant_id, region, name, dimensions, None, None, start_timestamp, end_timestamp, include_metric_hash=True) if not metric_list: return None if len(metric_list) > 1: if not merge_metrics_flag: raise exceptions.MultipleMetricsException( self.MULTIPLE_METRICS_MESSAGE) select_stmt = """ select time_stamp, value, value_meta from measurements where tenant_id = %s and region = %s """ parms = [tenant_id.encode('utf8'), region.encode('utf8')] metric_hash_list = [bytearray(metric['metric_hash']) for metric in metric_list] place_holders = ["%s"] * len(metric_hash_list) in_clause = ' and metric_hash in ({}) '.format(",".join(place_holders)) select_stmt += in_clause parms.extend(metric_hash_list) if offset: select_stmt += ' and time_stamp > %s ' parms.append(offset) elif start_timestamp: select_stmt += ' and time_stamp >= %s ' parms.append(int(start_timestamp * 1000)) if end_timestamp: select_stmt += ' and time_stamp <= %s ' parms.append(int(end_timestamp * 1000)) select_stmt += ' order by time_stamp ' if limit: select_stmt += ' limit %s ' parms.append(limit + 1) stmt = SimpleStatement(select_stmt, fetch_size=2147483647) rows = self.cassandra_session.execute(stmt, parms) return rows def _get_dimensions(self, tenant_id, region, name, dimensions): metrics_list = self.list_metrics(tenant_id, region, name, dimensions, None, 2) if len(metrics_list) > 1: raise exceptions.MultipleMetricsException(self.MULTIPLE_METRICS_MESSAGE) if not metrics_list: return {} return metrics_list[0]['dimensions'] def list_metric_names(self, tenant_id, region, dimensions): try: parms = [] query = self._build_select_metric_map_query(tenant_id, region, parms) dimension_clause = self._build_dimensions_clause(dimensions, parms) query += dimension_clause stmt = SimpleStatement(query, fetch_size=2147483647) rows = self.cassandra_session.execute(stmt, parms) json_name_list = [] if not rows: return json_name_list for row in rows: metric_map = row.metric_map for name, value in metric_map.iteritems(): if name == '__name__': value = urllib.unquote_plus(value) metric_name = {u'name': value} if metric_name not in json_name_list: json_name_list.append(metric_name) break return sorted(json_name_list) except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) def metrics_statistics(self, tenant_id, region, name, dimensions, start_timestamp, end_timestamp, statistics, period, offset, limit, merge_metrics_flag): try: if not period: period = 300 period = int(period) if offset: if '_' in offset: tmp = datetime.strptime(str(offset).split('_')[1], "%Y-%m-%dT%H:%M:%SZ") tmp = tmp + timedelta(seconds=int(period)) # Leave out any ID as cassandra doesn't understand it offset = tmp.isoformat() else: tmp = datetime.strptime(offset, "%Y-%m-%dT%H:%M:%SZ") offset = tmp + timedelta(seconds=int(period)) rows = self._get_measurements(tenant_id, region, name, dimensions, start_timestamp, end_timestamp, offset, limit, merge_metrics_flag) json_statistics_list = [] if not rows: return json_statistics_list requested_statistics = [stat.lower() for stat in statistics] columns = [u'timestamp'] if 'avg' in requested_statistics: columns.append(u'avg') if 'min' in requested_statistics: columns.append(u'min') if 'max' in requested_statistics: columns.append(u'max') if 'count' in requested_statistics: columns.append(u'count') if 'sum' in requested_statistics: columns.append(u'sum') first_row = rows[0] stats_count = 0 stats_sum = 0 stats_max = first_row.value stats_min = first_row.value start_period = first_row.time_stamp stats_list = [] start_datetime = datetime.utcfromtimestamp(start_timestamp) if offset and offset > start_datetime: tmp_start_period = offset else: tmp_start_period = start_datetime while start_period >= tmp_start_period + timedelta(seconds=period): stat = [ tmp_start_period.strftime('%Y-%m-%dT%H:%M:%SZ') .decode('utf8') ] for _statistics in requested_statistics: stat.append(0) tmp_start_period += timedelta(seconds=period) stats_list.append(stat) for (time_stamp, value, value_meta) in rows: if (time_stamp - start_period).seconds >= period: stat = [ start_period.strftime('%Y-%m-%dT%H:%M:%SZ').decode( 'utf8')] if 'avg' in requested_statistics: stat.append(stats_sum / stats_count) if 'min' in requested_statistics: stat.append(stats_min) stats_min = value if 'max' in requested_statistics: stat.append(stats_max) stats_max = value if 'count' in requested_statistics: stat.append(stats_count) if 'sum' in requested_statistics: stat.append(stats_sum) stats_list.append(stat) tmp_start_period = start_period + timedelta(seconds=period) while time_stamp > tmp_start_period: stat = [ tmp_start_period.strftime('%Y-%m-%dT%H:%M:%SZ') .decode('utf8') ] for _statistics in requested_statistics: stat.append(0) tmp_start_period += timedelta(seconds=period) stats_list.append(stat) start_period = time_stamp stats_sum = 0 stats_count = 0 stats_count += 1 stats_sum += value if 'min' in requested_statistics: if value < stats_min: stats_min = value if 'max' in requested_statistics: if value > stats_max: stats_max = value if stats_count: stat = [start_period.strftime('%Y-%m-%dT%H:%M:%SZ').decode( 'utf8')] if 'avg' in requested_statistics: stat.append(stats_sum / stats_count) if 'min' in requested_statistics: stat.append(stats_min) if 'max' in requested_statistics: stat.append(stats_max) if 'count' in requested_statistics: stat.append(stats_count) if 'sum' in requested_statistics: stat.append(stats_sum) stats_list.append(stat) if end_timestamp: time_stamp = datetime.utcfromtimestamp(end_timestamp) else: time_stamp = datetime.now() tmp_start_period = start_period + timedelta(seconds=period) while time_stamp > tmp_start_period: stat = [ tmp_start_period.strftime('%Y-%m-%dT%H:%M:%SZ') .decode('utf8') ] for _statistics in requested_statistics: stat.append(0) tmp_start_period += timedelta(seconds=period) stats_list.append(stat) statistic = {u'name': name.decode('utf8'), # The last date in the stats list. u'id': stats_list[-1][0], u'dimensions': dimensions, u'columns': columns, u'statistics': stats_list} json_statistics_list.append(statistic) return json_statistics_list except exceptions.RepositoryException as ex: LOG.exception(ex) raise ex except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) def alarm_history(self, tenant_id, alarm_id_list, offset, limit, start_timestamp=None, end_timestamp=None): try: json_alarm_history_list = [] if not alarm_id_list: return json_alarm_history_list select_stmt = """ select alarm_id, time_stamp, metrics, new_state, old_state, reason, reason_data, sub_alarms, tenant_id from alarm_state_history where tenant_id = %s """ parms = [tenant_id.encode('utf8')] place_holders = ["%s"] * len(alarm_id_list) in_clause = ' and alarm_id in ({}) '.format( ",".join(place_holders)) select_stmt += in_clause parms.extend(alarm_id_list) if offset and offset != '0': select_stmt += ' and time_stamp > %s ' dt = timeutils.normalize_time(timeutils.parse_isotime(offset)) parms.append(self._get_millis_from_timestamp(dt)) elif start_timestamp: select_stmt += ' and time_stamp >= %s ' parms.append(int(start_timestamp * 1000)) if end_timestamp: select_stmt += ' and time_stamp <= %s ' parms.append(int(end_timestamp * 1000)) if limit: select_stmt += ' limit %s ' parms.append(limit + 1) stmt = SimpleStatement(select_stmt, fetch_size=2147483647) rows = self.cassandra_session.execute(stmt, parms) if not rows: return json_alarm_history_list sorted_rows = sorted(rows, key=lambda row: row.time_stamp) for (alarm_id, time_stamp, metrics, new_state, old_state, reason, reason_data, sub_alarms, tenant_id) in sorted_rows: alarm = {u'timestamp': self._isotime_msec(time_stamp), u'alarm_id': alarm_id, u'metrics': rest_utils.from_json(metrics), u'new_state': new_state, u'old_state': old_state, u'reason': reason, u'reason_data': reason_data, u'sub_alarms': rest_utils.from_json(sub_alarms), u'id': str(self._get_millis_from_timestamp(time_stamp) ).decode('utf8')} if alarm[u'sub_alarms']: for sub_alarm in alarm[u'sub_alarms']: sub_expr = sub_alarm['sub_alarm_expression'] metric_def = sub_expr['metric_definition'] sub_expr['metric_name'] = metric_def['name'] sub_expr['dimensions'] = metric_def['dimensions'] del sub_expr['metric_definition'] json_alarm_history_list.append(alarm) return json_alarm_history_list except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) @staticmethod def _isotime_msec(timestamp): """Stringify datetime in ISO 8601 format + millisecond. """ st = timestamp.isoformat() if '.' in st: st = st[:23] + 'Z' else: st += '.000Z' return st.decode('utf8') @staticmethod def _get_millis_from_timestamp(dt): dt = timeutils.normalize_time(dt) return int((dt - datetime(1970, 1, 1)).total_seconds() * 1000) def list_dimension_values(self, tenant_id, region, metric_name, dimension_name): try: parms = [] query = self._build_select_metric_map_query(tenant_id, region, parms) name_clause = self._build_name_clause(metric_name, parms) dimensions = {dimension_name: None} dimension_clause = self._build_dimensions_clause(dimensions, parms) query += name_clause + dimension_clause query += ' allow filtering ' stmt = SimpleStatement(query, fetch_size=2147483647) rows = self.cassandra_session.execute(stmt, parms) json_dim_value_list = [] if not rows: return json_dim_value_list for row in rows: metric_map = row.metric_map for name, value in metric_map.iteritems(): name = urllib.unquote_plus(name) value = urllib.unquote_plus(value) dim_value = {u'dimension_value': value} if name == dimension_name and dim_value not in json_dim_value_list: json_dim_value_list.append(dim_value) return sorted(json_dim_value_list) except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) def list_dimension_names(self, tenant_id, region, metric_name): try: parms = [] query = self._build_select_metric_map_query(tenant_id, region, parms) name_clause = self._build_name_clause(metric_name, parms) query += name_clause stmt = SimpleStatement(query, fetch_size=2147483647) rows = self.cassandra_session.execute(stmt, parms) json_dim_name_list = [] for row in rows: metric_map = row.metric_map for name, value in metric_map.iteritems(): name = urllib.unquote_plus(name) dim_name = {u'dimension_name': name} if name != '__name__' and dim_name not in json_dim_name_list: json_dim_name_list.append(dim_name) return sorted(json_dim_name_list) except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex)