Enable Cassandra Database

Implement the following resource endpoints using Cassandra as the backend
database:

metric names
metrics
alarm history
measurements
statistics

Change-Id: I9ddbd2037fb6b989dee5e33945df9dd08afa84e8
Deklan Dieterly 2016-01-28 07:43:37 -07:00 committed by Shinya Kawabata
parent 71d430e929
commit 7abd139f0e
9 changed files with 985 additions and 20 deletions

View File

@@ -40,7 +40,9 @@ driver = monasca_api.common.messaging.kafka_publisher:KafkaPublisher
[repositories]
# The driver to use for the metrics repository
# Switches depending on the backend database in use: InfluxDB or Cassandra.
metrics_driver = monasca_api.common.repositories.influxdb.metrics_repository:MetricsRepository
#metrics_driver = monasca_api.common.repositories.cassandra.metrics_repository:MetricsRepository
# The driver to use for the alarm definitions repository
alarm_definitions_driver = monasca_api.common.repositories.mysql.alarm_definitions_repository:AlarmDefinitionsRepository
@@ -86,6 +88,7 @@ compact = False
partitions = 0
[influxdb]
# Only needed if the InfluxDB database is used as the backend.
# The IP address of the InfluxDB service.
ip_address = 192.168.10.4
@@ -101,6 +104,12 @@ password = password
# The name of the InfluxDB database to use.
database_name = mon
[cassandra]
# Only needed if the Cassandra database is used as the backend.
# Comma-separated list of Cassandra node IP addresses (no spaces).
cluster_ip_addresses: 192.168.10.6
keyspace: monasca
# Below is the database configuration.
# It is read in the following order:
# 1) [mysql] section
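
For orientation, the new [cassandra] options are registered and consumed as
follows; this is a minimal sketch pieced together from the option-registration
and repository diffs later in this commit, not code that ships in one place:

from cassandra.cluster import Cluster
from oslo_config import cfg

# Registration, mirroring the option-registration diff below.
cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), cfg.StrOpt('keyspace')]
cassandra_group = cfg.OptGroup(name='cassandra', title='cassandra')
cfg.CONF.register_group(cassandra_group)
cfg.CONF.register_opts(cassandra_opts, cassandra_group)

# Connection, exactly as MetricsRepository.__init__ does it.
cluster = Cluster(cfg.CONF.cassandra.cluster_ip_addresses.split(','))
session = cluster.connect(cfg.CONF.cassandra.keyspace)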

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -13,8 +13,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
from datetime import datetime
from datetime import timedelta
import hashlib
import itertools
import json
import urllib
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository
@@ -22,46 +33,344 @@ from monasca_api.common.repositories import metrics_repository
LOG = log.getLogger(__name__)
class MetricsRepository(metrics_repository.AbstractMetricsRepository):
def __init__(self):
try:
self.conf = cfg.CONF
self._cassandra_cluster = Cluster(
self.conf.cassandra.cluster_ip_addresses.split(','))
self.cassandra_session = self._cassandra_cluster.connect(
self.conf.cassandra.keyspace)
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
def list_metrics(self, tenant_id, region, name, dimensions, offset,
limit, start_timestamp=None, end_timestamp=None,
include_metric_hash=False):
or_dimensions = []
sub_dimensions = {}
if dimensions:
for key, value in dimensions.iteritems():
if not value:
sub_dimensions[key] = value
elif '|' in value:
def f(val):
return {key: val}
or_dimensions.append(list(map(f, value.split('|'))))
else:
sub_dimensions[key] = value
if or_dimensions:
or_dims_list = list(itertools.product(*or_dimensions))
metrics_list = []
for or_dims_tuple in or_dims_list:
extracted_dimensions = sub_dimensions.copy()
for dims in iter(or_dims_tuple):
for k, v in dims.iteritems():
extracted_dimensions[k] = v
metrics = self._list_metrics(tenant_id, region, name,
extracted_dimensions, offset,
limit, start_timestamp,
end_timestamp,
include_metric_hash)
metrics_list += metrics
return sorted(metrics_list, key=lambda metric: metric['id'])
return self._list_metrics(tenant_id, region, name, dimensions,
offset, limit, start_timestamp,
end_timestamp, include_metric_hash)
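# Worked example of the '|' handling above (hypothetical values): dimensions
# {'hostname': 'host0|host1', 'service': 'monitoring'} split into
# or_dimensions = [[{'hostname': 'host0'}, {'hostname': 'host1'}]] and
# sub_dimensions = {'service': 'monitoring'}; itertools.product then yields
# two queries, one per hostname, whose results are concatenated and sorted
# by metric id.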
def _list_metrics(self, tenant_id, region, name, dimensions, offset,
limit, start_timestamp=None, end_timestamp=None,
include_metric_hash=False):
try:
select_stmt = """
select tenant_id, region, metric_hash, metric_map
from metric_map
where tenant_id = %s and region = %s
"""
parms = [tenant_id.encode('utf8'), region.encode('utf8')]
name_clause = self._build_name_clause(name, parms)
dimension_clause = self._build_dimensions_clause(dimensions, parms)
select_stmt += name_clause + dimension_clause
if offset:
select_stmt += ' and metric_hash > %s '
parms.append(bytearray(offset.decode('hex')))
if limit:
select_stmt += ' limit %s '
parms.append(limit + 1)
select_stmt += ' allow filtering '
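# Two notes on the statement built above: one extra row (limit + 1) is
# fetched, apparently so the API layer can detect whether another page of
# results exists, and ALLOW FILTERING is required because the metric_map
# predicates are not part of the primary key, so Cassandra must filter
# rows server-side.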
json_metric_list = []
stmt = SimpleStatement(select_stmt,
fetch_size=2147483647)
rows = self.cassandra_session.execute(stmt, parms)
if not rows:
return json_metric_list
for (tenant_id, region, metric_hash, metric_map) in rows:
metric = {}
dimensions = {}
if include_metric_hash:
metric[u'metric_hash'] = metric_hash
for name, value in metric_map.iteritems():
if name == '__name__':
name = urllib.unquote_plus(value)
metric[u'name'] = name
else:
name = urllib.unquote_plus(name)
value = urllib.unquote_plus(value)
dimensions[name] = value
metric[u'dimensions'] = dimensions
metric[u'id'] = binascii.hexlify(bytearray(metric_hash))
json_metric_list.append(metric)
return json_metric_list
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
def _build_dimensions_clause(self, dimensions, parms):
dimension_clause = ''
if dimensions:
for name, value in dimensions.iteritems():
if not value:
dimension_clause += ' and metric_map contains key %s '
parms.append(urllib.quote_plus(name).encode('utf8'))
else:
dimension_clause += ' and metric_map[%s] = %s '
parms.append(urllib.quote_plus(name).encode('utf8'))
parms.append(urllib.quote_plus(value).encode('utf8'))
return dimension_clause
def _build_name_clause(self, name, parms):
name_clause = ''
if name:
name_clause = ' and metric_map[%s] = %s '
parms.append(urllib.quote_plus('__name__').encode('utf8'))
parms.append(urllib.quote_plus(name).encode('utf8'))
return name_clause
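# Example of the clause builders (hypothetical values): name='cpu.idle_perc'
# with dimensions {'hostname': 'host0'} appends
#     and metric_map[%s] = %s and metric_map[%s] = %s
# to the statement, with parms gaining the URL-quoted, utf8-encoded pairs
# ('__name__', 'cpu.idle_perc') and ('hostname', 'host0').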
def measurement_list(self, tenant_id, region, name, dimensions,
start_timestamp, end_timestamp, offset,
limit, merge_metrics_flag):
try:
json_measurement_list = []
rows = self._get_measurements(tenant_id, region, name, dimensions,
start_timestamp, end_timestamp,
offset, limit, merge_metrics_flag)
if not rows:
return json_measurement_list
if not merge_metrics_flag:
dimensions = self._get_dimensions(tenant_id, region, name, dimensions)
measurements_list = (
[[self._isotime_msec(time_stamp),
value,
json.loads(value_meta) if value_meta else {}]
for (time_stamp, value, value_meta) in rows])
measurement = {u'name': name,
# The last date in the measurements list.
u'id': measurements_list[-1][0],
u'dimensions': dimensions,
u'columns': [u'timestamp', u'value', u'value_meta'],
u'measurements': measurements_list}
json_measurement_list.append(measurement)
return json_measurement_list
except exceptions.RepositoryException as ex:
LOG.exception(ex)
raise ex
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
def _get_measurements(self, tenant_id, region, name, dimensions,
start_timestamp, end_timestamp, offset, limit,
merge_metrics_flag):
metric_list = self.list_metrics(tenant_id, region, name,
dimensions, None, None,
start_timestamp, end_timestamp,
include_metric_hash=True)
if not metric_list:
return None
if len(metric_list) > 1:
if not merge_metrics_flag:
raise exceptions.MultipleMetricsException(
self.MULTIPLE_METRICS_MESSAGE)
select_stmt = """
select time_stamp, value, value_meta
from measurements
where tenant_id = %s and region = %s
"""
parms = [tenant_id.encode('utf8'), region.encode('utf8')]
metric_hash_list = [bytearray(metric['metric_hash']) for metric in
metric_list]
place_holders = ["%s"] * len(metric_hash_list)
in_clause = ' and metric_hash in ({}) '.format(",".join(place_holders))
select_stmt += in_clause
parms.extend(metric_hash_list)
if offset:
select_stmt += ' and time_stamp > %s '
parms.append(offset)
elif start_timestamp:
select_stmt += ' and time_stamp >= %s '
parms.append(int(start_timestamp * 1000))
if end_timestamp:
select_stmt += ' and time_stamp <= %s '
parms.append(int(end_timestamp * 1000))
select_stmt += ' order by time_stamp '
if limit:
select_stmt += ' limit %s '
parms.append(limit + 1)
stmt = SimpleStatement(select_stmt,
fetch_size=2147483647)
rows = self.cassandra_session.execute(stmt, parms)
return rows
def _get_dimensions(self, tenant_id, region, name, dimensions):
metrics_list = self.list_metrics(tenant_id, region, name,
dimensions, None, 2)
if len(metrics_list) > 1:
raise exceptions.MultipleMetricsException(self.MULTIPLE_METRICS_MESSAGE)
if not metrics_list:
return {}
return metrics_list[0]['dimensions']
def list_metric_names(self, tenant_id, region, dimensions, offset, limit):
try:
select_stmt = """
select metric_hash, metric_map
from metric_map
where tenant_id = %s and region = %s
"""
parms = [tenant_id.encode('utf8'), region.encode('utf8')]
dimension_clause = self._build_dimensions_clause(dimensions, parms)
select_stmt += dimension_clause
if offset:
select_stmt += ' and metric_hash > %s '
parms.append(bytearray(offset.decode('hex')))
if limit:
select_stmt += ' limit %s '
parms.append(limit + 1)
select_stmt += ' allow filtering'
json_name_list = []
stmt = SimpleStatement(select_stmt,
fetch_size=2147483647)
rows = self.cassandra_session.execute(stmt, parms)
if not rows:
return json_name_list
for (metric_hash, metric_map) in rows:
metric = {}
for name, value in metric_map.iteritems():
if name == '__name__':
name = urllib.unquote_plus(value)
metric[u'name'] = name
break
metric[u'id'] = binascii.hexlify(bytearray(metric_hash))
json_name_list.append(metric)
return json_name_list
except Exception as ex:
@@ -69,15 +378,173 @@ class MetricsRepository(metrics_repository.MetricsRepository):
raise exceptions.RepositoryException(ex)
def metrics_statistics(self, tenant_id, region, name, dimensions,
start_timestamp, end_timestamp, statistics,
period, offset, limit, merge_metrics_flag):
try:
if not period:
period = 300
period = int(period)
json_statistics_list = []
rows = self._get_measurements(tenant_id, region, name, dimensions,
start_timestamp, end_timestamp,
offset, limit, merge_metrics_flag)
if not rows:
return json_statistics_list
requested_statistics = [stat.lower() for stat in statistics]
columns = [u'timestamp']
if 'avg' in requested_statistics:
columns.append(u'avg')
if 'min' in requested_statistics:
columns.append(u'min')
if 'max' in requested_statistics:
columns.append(u'max')
if 'count' in requested_statistics:
columns.append(u'count')
if 'sum' in requested_statistics:
columns.append(u'sum')
first_row = rows[0]
stats_count = 0
stats_sum = 0
stats_max = first_row.value
stats_min = first_row.value
start_period = first_row.time_stamp
stats_list = []
tmp_start_period = datetime.utcfromtimestamp(start_timestamp)
while start_period >= tmp_start_period + timedelta(seconds=period):
stat = [
tmp_start_period.strftime('%Y-%m-%dT%H:%M:%SZ')
.decode('utf8')
]
for _statistics in requested_statistics:
stat.append(0)
tmp_start_period += timedelta(seconds=period)
stats_list.append(stat)
for (time_stamp, value, value_meta) in rows:
if (time_stamp - start_period).total_seconds() >= period:
stat = [
start_period.strftime('%Y-%m-%dT%H:%M:%SZ').decode(
'utf8')]
if 'avg' in requested_statistics:
stat.append(stats_sum / stats_count)
if 'min' in requested_statistics:
stat.append(stats_min)
stats_min = value
if 'max' in requested_statistics:
stat.append(stats_max)
stats_max = value
if 'count' in requested_statistics:
stat.append(stats_count)
if 'sum' in requested_statistics:
stat.append(stats_sum)
stats_list.append(stat)
tmp_start_period = start_period + timedelta(seconds=period)
while time_stamp > tmp_start_period:
stat = [
tmp_start_period.strftime('%Y-%m-%dT%H:%M:%SZ')
.decode('utf8')
]
for _statistics in requested_statistics:
stat.append(0)
tmp_start_period += timedelta(seconds=period)
stats_list.append(stat)
start_period = time_stamp
stats_sum = 0
stats_count = 0
stats_count += 1
stats_sum += value
if 'min' in requested_statistics:
if value < stats_min:
stats_min = value
if 'max' in requested_statistics:
if value > stats_max:
stats_max = value
if stats_count:
stat = [start_period.strftime('%Y-%m-%dT%H:%M:%SZ').decode(
'utf8')]
if 'avg' in requested_statistics:
stat.append(stats_sum / stats_count)
if 'min' in requested_statistics:
stat.append(stats_min)
if 'max' in requested_statistics:
stat.append(stats_max)
if 'count' in requested_statistics:
stat.append(stats_count)
if 'sum' in requested_statistics:
stat.append(stats_sum)
stats_list.append(stat)
if end_timestamp:
time_stamp = datetime.utcfromtimestamp(end_timestamp)
else:
time_stamp = datetime.utcnow()  # UTC, to match the stored timestamps
tmp_start_period = start_period + timedelta(seconds=period)
while time_stamp > tmp_start_period:
stat = [
tmp_start_period.strftime('%Y-%m-%dT%H:%M:%SZ')
.decode('utf8')
]
for _statistics in requested_statistics:
stat.append(0)
tmp_start_period += timedelta(seconds=period)
stats_list.append(stat)
statistic = {u'name': name.decode('utf8'),
# The last date in the stats list.
u'id': stats_list[-1][0],
u'dimensions': dimensions,
u'columns': columns,
u'statistics': stats_list}
json_statistics_list.append(statistic)
return json_statistics_list
except exceptions.RepositoryException as ex:
LOG.exception(ex)
raise ex
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
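# Worked example of the bucketing above (mirrors the unit test below): four
# measurements 95.0, 97.0, 94.0 and 96.0 that all fall inside a single
# 300-second period produce one statistics row
# [u'2016-05-19T11:58:24Z', avg=95.5, min=94.0, max=97.0, count=4, sum=382.0].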
@@ -89,8 +556,175 @@ class MetricsRepository(metrics_repository.MetricsRepository):
try:
json_alarm_history_list = []
if not alarm_id_list:
return json_alarm_history_list
select_stmt = """
select alarm_id, time_stamp, metrics, new_state, old_state,
reason, reason_data, sub_alarms, tenant_id
from alarm_state_history
where tenant_id = %s
"""
parms = [tenant_id.encode('utf8')]
place_holders = ["%s"] * len(alarm_id_list)
in_clause = ' and alarm_id in ({}) '.format(
",".join(place_holders))
select_stmt += in_clause
parms.extend(alarm_id_list)
if offset and offset != '0':
select_stmt += ' and time_stamp > %s '
dt = timeutils.normalize_time(timeutils.parse_isotime(offset))
parms.append(self._get_millis_from_timestamp(dt))
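# The offset here is an ISO 8601 timestamp; only rows strictly newer than
# it are returned, giving keyset-style paging through the history.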
elif start_timestamp:
select_stmt += ' and time_stamp >= %s '
parms.append(int(start_timestamp * 1000))
if end_timestamp:
select_stmt += ' and time_stamp <= %s '
parms.append(int(end_timestamp * 1000))
if limit:
select_stmt += ' limit %s '
parms.append(limit + 1)
stmt = SimpleStatement(select_stmt,
fetch_size=2147483647)
rows = self.cassandra_session.execute(stmt, parms)
if not rows:
return json_alarm_history_list
sorted_rows = sorted(rows, key=lambda row: row.time_stamp)
for (alarm_id, time_stamp, metrics, new_state, old_state, reason,
reason_data, sub_alarms, tenant_id) in sorted_rows:
alarm = {u'timestamp': self._isotime_msec(time_stamp),
u'alarm_id': alarm_id,
u'metrics': json.loads(metrics),
u'new_state': new_state,
u'old_state': old_state,
u'reason': reason,
u'reason_data': reason_data,
u'sub_alarms': json.loads(sub_alarms),
u'id': str(self._get_millis_from_timestamp(time_stamp)
).decode('utf8')}
if alarm[u'sub_alarms']:
for sub_alarm in alarm[u'sub_alarms']:
sub_expr = sub_alarm['sub_alarm_expression']
metric_def = sub_expr['metric_definition']
sub_expr['metric_name'] = metric_def['name']
sub_expr['dimensions'] = metric_def['dimensions']
del sub_expr['metric_definition']
json_alarm_history_list.append(alarm)
return json_alarm_history_list
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
@staticmethod
def _isotime_msec(timestamp):
"""Stringify datetime in ISO 8601 format + millisecond.
"""
st = timestamp.isoformat()
if '.' in st:
st = st[:23] + 'Z'
else:
st += '.000Z'
return st.decode('utf8')
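# Example: datetime(2016, 5, 19, 11, 58, 27) -> u'2016-05-19T11:58:27.000Z';
# a timestamp with microseconds is truncated to millisecond precision.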
@staticmethod
def _get_millis_from_timestamp(dt):
dt = timeutils.normalize_time(dt)
return int((dt - datetime(1970, 1, 1)).total_seconds() * 1000)
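# Example: 2016-05-19T11:58:27Z -> 1463659107000, the value the alarm
# history test below expects as the row id.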
def _generate_dimension_values_id(self, metric_name, dimension_name):
sha1 = hashlib.sha1()
hashstr = "metricName=" + (metric_name or "") + "dimensionName=" + dimension_name
sha1.update(hashstr)
return sha1.hexdigest()
def list_dimension_values(self, tenant_id, region, metric_name,
dimension_name, offset, limit):
try:
select_stmt = """
select metric_map
from metric_map
where tenant_id = %s and region = %s
"""
parms = [tenant_id.encode('utf8'), region.encode('utf8')]
name_clause = self._build_name_clause(metric_name, parms)
dimensions = {dimension_name: None}
dimension_clause = self._build_dimensions_clause(dimensions, parms)
select_stmt += name_clause + dimension_clause
if offset:
select_stmt += ' and metric_hash > %s '
parms.append(bytearray(offset.decode('hex')))
if limit:
select_stmt += ' limit %s '
parms.append(limit + 1)
select_stmt += ' allow filtering '
stmt = SimpleStatement(select_stmt,
fetch_size=2147483647)
rows = self.cassandra_session.execute(stmt, parms)
sha1_id = self._generate_dimension_values_id(metric_name, dimension_name)
json_dim_vals = {u'id': sha1_id,
u'dimension_name': dimension_name,
u'values': []}
#
# Only return metric name if one was provided
#
if metric_name:
json_dim_vals[u'metric_name'] = metric_name
if not rows:
return json_dim_vals
dim_vals = set()
for row in rows:
metric_map = row.metric_map
for name, value in metric_map.iteritems():
name = urllib.unquote_plus(name)
value = urllib.unquote_plus(value)
if name == dimension_name:
dim_vals.add(value)
json_dim_vals[u'values'] = sorted(list(dim_vals))
return json_dim_vals
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
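
Taken together, a minimal usage sketch for the new driver (hypothetical
tenant and region values; assumes the [cassandra] options are registered and
point at a reachable cluster with the keyspace from the sample config):

from oslo_config import cfg
import monasca_api.common.repositories.cassandra.metrics_repository as cassandra_repo

cfg.CONF.set_override('cluster_ip_addresses', '192.168.10.6', group='cassandra')
cfg.CONF.set_override('keyspace', 'monasca', group='cassandra')

repo = cassandra_repo.MetricsRepository()
metrics = repo.list_metrics('0b5e7d8c43f74430add94fba09ffd66e', 'region',
                            name='cpu.idle_perc',
                            dimensions={'hostname': 'host0'},
                            offset=None, limit=10)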

View File

@@ -19,7 +19,7 @@ from monasca_api.common.repositories import metrics_repository
LOG = log.getLogger(__name__)
class MetricsRepository(metrics_repository.AbstractMetricsRepository):
def __init__(self):
return

View File

@@ -14,8 +14,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import hashlib
import json
@@ -33,16 +33,7 @@ MEASUREMENT_NOT_FOUND_MSG = "measurement not found"
LOG = log.getLogger(__name__)
class MetricsRepository(metrics_repository.AbstractMetricsRepository):
def __init__(self):

View File

@@ -18,11 +18,26 @@ import six
@six.add_metaclass(abc.ABCMeta)
class AbstractMetricsRepository(object):
MULTIPLE_METRICS_MESSAGE = ("Found multiple metrics matching metric name"
+ " and dimensions. Please refine your search"
+ " criteria using a unique"
+ " metric name or additional dimensions."
+ " Alternatively, you may specify"
+ " 'merge_metrics=True' as a query"
+ " parameter to combine all metrics"
+ " matching search criteria into a single"
+ " series.")
@abc.abstractmethod
def list_metrics(self, tenant_id, region, name, dimensions, offset, limit):
pass
@abc.abstractmethod
def list_metric_names(self, tenant_id, region, dimensions, offset, limit):
pass
@abc.abstractmethod
def measurement_list(self, tenant_id, region, name, dimensions,
start_timestamp, end_timestamp, offset, limit,

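One visible effect of the rename: both concrete drivers now inherit the
shared error text from the base class rather than defining their own. A
quick sanity check, assuming both backends' dependencies are installed:

from monasca_api.common.repositories.cassandra import metrics_repository as c_repo
from monasca_api.common.repositories.influxdb import metrics_repository as i_repo

assert (c_repo.MetricsRepository.MULTIPLE_METRICS_MESSAGE ==
        i_repo.MetricsRepository.MULTIPLE_METRICS_MESSAGE)
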
View File

@@ -12,13 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
from collections import namedtuple
from datetime import datetime
import unittest
from mock import patch
import monasca_api.common.repositories.cassandra.metrics_repository as cassandra_repo
import monasca_api.common.repositories.influxdb.metrics_repository as influxdb_repo
from oslo_config import cfg
from oslo_config import fixture as fixture_config
from oslo_utils import timeutils
import testtools
CONF = cfg.CONF
@@ -157,3 +164,304 @@ class TestRepoMetricsInfluxDB(unittest.TestCase):
u'id': 'bea9565d854a16a3366164de213694c190f27675',
u'metric_name': 'custom_metric'
})
class TestRepoMetricsCassandra(testtools.TestCase):
def setUp(self):
super(TestRepoMetricsCassandra, self).setUp()
self._fixture_config = self.useFixture(
fixture_config.Config(cfg.CONF))
self._fixture_config.config(cluster_ip_addresses='127.0.0.1',
group='cassandra')
@patch("monasca_api.common.repositories.cassandra.metrics_repository.Cluster.connect")
def test_list_metrics(self, cassandra_connect_mock):
cassandra_session_mock = cassandra_connect_mock.return_value
cassandra_session_mock.execute.return_value = [[
"0b5e7d8c43f74430add94fba09ffd66e",
"region",
binascii.unhexlify(b"01d39f19798ed27bbf458300bf843edd17654614"),
{
"__name__": "disk.space_used_perc",
"device": "rootfs",
"hostname": "host0",
"hosttype": "native",
"mount_point": "/",
}
]]
repo = cassandra_repo.MetricsRepository()
result = repo.list_metrics(
"0b5e7d8c43f74430add94fba09ffd66e",
"region",
name="disk.space_user_perc",
dimensions={
"hostname": "host0",
"hosttype": "native",
"mount_point": "/",
"device": "rootfs"},
offset=None,
limit=1)
self.assertEqual([{
u'id': u'01d39f19798ed27bbf458300bf843edd17654614',
u'name': u'disk.space_used_perc',
u'dimensions': {
u'device': u'rootfs',
u'hostname': u'host0',
u'mount_point': u'/',
u'hosttype': u'native'
}}], result)
@patch("monasca_api.common.repositories.cassandra.metrics_repository.Cluster.connect")
def test_list_metric_names(self, cassandra_connect_mock):
cassandra_session_mock = cassandra_connect_mock.return_value
cassandra_session_mock.execute.return_value = [
[
binascii.unhexlify(b"01d39f19798ed27bbf458300bf843edd17654614"),
{
"__name__": "disk.space_used_perc",
"device": "rootfs",
"hostname": "host0",
"hosttype": "native",
"mount_point": "/",
}
],
[
binascii.unhexlify(b"042da8f7445d779f4bb7214aaf744e512d897ac7"),
{
"__name__": "cpu.idle_perc",
"hostname": "host0",
"service": "monitoring"
}
]
]
repo = cassandra_repo.MetricsRepository()
result = repo.list_metric_names(
"0b5e7d8c43f74430add94fba09ffd66e",
"region",
dimensions={
"hostname": "host0",
"hosttype": "native",
"mount_point": "/",
"device": "rootfs"},
offset=None,
limit=1)
self.assertEqual([
{
u'name': u'disk.space_used_perc',
u'id': u'01d39f19798ed27bbf458300bf843edd17654614'
},
{
u'name': u'cpu.idle_perc',
u'id': u'042da8f7445d779f4bb7214aaf744e512d897ac7'
}
], result)
@patch("monasca_api.common.repositories.cassandra.metrics_repository.Cluster.connect")
def test_measurement_list(self, cassandra_connect_mock):
Measurement = namedtuple('Measurement', 'time_stamp value value_meta')
cassandra_session_mock = cassandra_connect_mock.return_value
cassandra_session_mock.execute.side_effect = [
[[
"0b5e7d8c43f74430add94fba09ffd66e",
"region",
binascii.unhexlify(b"01d39f19798ed27bbf458300bf843edd17654614"),
{
"__name__": "disk.space_used_perc",
"device": "rootfs",
"hostname": "host0",
"hosttype": "native",
"mount_point": "/",
"service": "monitoring",
}
]],
[
Measurement(self._convert_time_string("2015-03-14T09:26:53.59Z"), 2, None),
Measurement(self._convert_time_string("2015-03-14T09:26:53.591Z"), 2.5, ''),
Measurement(self._convert_time_string("2015-03-14T09:26:53.6Z"), 4.0, '{}'),
Measurement(self._convert_time_string("2015-03-14T09:26:54Z"), 4,
'{"key": "value"}'),
]
]
repo = cassandra_repo.MetricsRepository()
result = repo.measurement_list(
"tenant_id",
"region",
name="disk.space_used_perc",
dimensions=None,
start_timestamp=1,
end_timestamp=2,
offset=None,
limit=1,
merge_metrics_flag=True)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['dimensions'], None)
self.assertEqual(result[0]['name'], 'disk.space_used_perc')
self.assertEqual(result[0]['columns'],
['timestamp', 'value', 'value_meta'])
measurements = result[0]['measurements']
self.assertEqual(
[["2015-03-14T09:26:53.590Z", 2, {}],
["2015-03-14T09:26:53.591Z", 2.5, {}],
["2015-03-14T09:26:53.600Z", 4.0, {}],
["2015-03-14T09:26:54.000Z", 4, {"key": "value"}]],
measurements
)
@patch("monasca_api.common.repositories.cassandra.metrics_repository.Cluster.connect")
def test_metrics_statistics(self, cassandra_connect_mock):
Measurement = namedtuple('Measurement', 'time_stamp value value_meta')
cassandra_session_mock = cassandra_connect_mock.return_value
cassandra_session_mock.execute.side_effect = [
[[
"0b5e7d8c43f74430add94fba09ffd66e",
"region",
binascii.unhexlify(b"01d39f19798ed27bbf458300bf843edd17654614"),
{
"__name__": "cpu.idle_perc",
"hostname": "host0",
"service": "monitoring",
}
]],
[
Measurement(self._convert_time_string("2016-05-19T11:58:24Z"), 95.0, '{}'),
Measurement(self._convert_time_string("2016-05-19T11:58:25Z"), 97.0, '{}'),
Measurement(self._convert_time_string("2016-05-19T11:58:26Z"), 94.0, '{}'),
Measurement(self._convert_time_string("2016-05-19T11:58:27Z"), 96.0, '{}'),
]
]
start_timestamp = (self._convert_time_string("2016-05-19T11:58:24Z") -
datetime(1970, 1, 1)).total_seconds()
end_timestamp = (self._convert_time_string("2016-05-19T11:58:27Z") -
datetime(1970, 1, 1)).total_seconds()
repo = cassandra_repo.MetricsRepository()
result = repo.metrics_statistics(
"tenant_id",
"region",
name="cpu.idle_perc",
dimensions=None,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
statistics=['avg', 'min', 'max', 'count', 'sum'],
period=300,
offset=None,
limit=1,
merge_metrics_flag=True)
self.assertEqual([
{
u'dimensions': None,
u'statistics': [[u'2016-05-19T11:58:24Z', 95.5, 94, 97, 4, 382]],
u'name': u'cpu.idle_perc',
u'columns': [u'timestamp', u'avg', u'min', u'max', u'count', u'sum'],
u'id': u'2016-05-19T11:58:24Z'
}
], result)
@patch("monasca_api.common.repositories.cassandra.metrics_repository.Cluster.connect")
def test_alarm_history(self, cassandra_connect_mock):
AlarmHistory = namedtuple('AlarmHistory', 'alarm_id, time_stamp, metrics, '
'new_state, old_state, reason, '
'reason_data, sub_alarms, tenant_id')
cassandra_session_mock = cassandra_connect_mock.return_value
cassandra_session_mock.execute.return_value = [
AlarmHistory('09c2f5e7-9245-4b7e-bce1-01ed64a3c63d',
self._convert_time_string("2016-05-19T11:58:27Z"),
"""[{
"dimensions": {"hostname": "devstack", "service": "monitoring"},
"id": "",
"name": "cpu.idle_perc"
}]""",
'OK',
'UNDETERMINED',
'The alarm threshold(s) have not been exceeded for the sub-alarms: '
'avg(cpu.idle_perc) < 10.0 times 3 with the values: [84.35]',
'{}',
"""[
{
"sub_alarm_state": "OK",
"currentValues": [
"84.35"
],
"sub_alarm_expression": {
"function": "AVG",
"period": "60",
"threshold": "10.0",
"periods": "3",
"operator": "LT",
"metric_definition": {
"dimensions": "{}",
"id": "",
"name": "cpu.idle_perc"
}
}
}
]""",
'741e1aa149524c0f9887a8d6750f67b1')
]
repo = cassandra_repo.MetricsRepository()
result = repo.alarm_history('741e1aa149524c0f9887a8d6750f67b1',
['09c2f5e7-9245-4b7e-bce1-01ed64a3c63d'],
None, None)
self.assertEqual(
[{
u'id': u'1463659107000',
u'timestamp': u'2016-05-19T11:58:27.000Z',
u'new_state': u'OK',
u'old_state': u'UNDETERMINED',
u'reason_data': u'{}',
u'reason': u'The alarm threshold(s) have not been exceeded for the sub-alarms: '
u'avg(cpu.idle_perc) < 10.0 times 3 with the values: [84.35]',
u'alarm_id': u'09c2f5e7-9245-4b7e-bce1-01ed64a3c63d',
u'metrics': [{
u'id': u'',
u'name': u'cpu.idle_perc',
u'dimensions': {
u'service': u'monitoring',
u'hostname': u'devstack'
}
}],
u'sub_alarms': [
{
u'sub_alarm_state': u'OK',
u'currentValues': [
u'84.35'
],
u'sub_alarm_expression': {
u'dimensions': u'{}',
u'threshold': u'10.0',
u'periods': u'3',
u'operator': u'LT',
u'period': u'60',
u'metric_name': u'cpu.idle_perc',
u'function': u'AVG'
}
}
]
}], result)
@staticmethod
def _convert_time_string(date_time_string):
dt = timeutils.parse_isotime(date_time_string)
dt = timeutils.normalize_time(dt)
return dt

View File

@@ -130,6 +130,12 @@ influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), cfg.StrOpt('keyspace')]
cassandra_group = cfg.OptGroup(name='cassandra', title='cassandra')
cfg.CONF.register_group(cassandra_group)
cfg.CONF.register_opts(cassandra_opts, cassandra_group)
mysql_opts = [cfg.StrOpt('database_name'), cfg.StrOpt('hostname'),
cfg.StrOpt('username'), cfg.StrOpt('password')]

View File

@@ -14,6 +14,7 @@ six
pyparsing
voluptuous
#influxdb
#cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0
eventlet
kafka-python>=0.9.5,<1.0.0
simplejson

View File

@@ -13,6 +13,7 @@ deps = -r{toxinidir}/requirements.txt
whitelist_externals = find
commands =
pip install influxdb==2.8.0
pip install cassandra-driver>=2.1.4,!=3.6.0
find . -type f -name "*.pyc" -delete
python setup.py testr --testr-args='{posargs}'