storage,rest: add support for granularity in aggregation

This allows the caller to provide the storage driver with a granularity
argument when doing cross-metric aggregation.

Change-Id: I33d48e66c39f7cf02f5fa26697994750b5215513
Closes-Bug: #1512740
This commit is contained in:
Julien Danjou 2016-01-14 14:47:54 +01:00
parent 57ac881e06
commit 9afee322b3
5 changed files with 164 additions and 25 deletions

View File

@ -357,13 +357,15 @@ class AggregatedMetricController(rest.RestController):
@pecan.expose('json')
def get_measures(self, start=None, stop=None, aggregation='mean',
needed_overlap=100.0):
granularity=None, needed_overlap=100.0):
return self.get_cross_metric_measures_from_ids(
self.metric_ids, start, stop, aggregation, needed_overlap)
self.metric_ids, start, stop,
aggregation, granularity, needed_overlap)
@classmethod
def get_cross_metric_measures_from_ids(cls, metric_ids, start=None,
stop=None, aggregation='mean',
granularity=None,
needed_overlap=100.0):
# Check RBAC policy
metrics = pecan.request.indexer.get_metrics(metric_ids)
@ -374,11 +376,12 @@ class AggregatedMetricController(rest.RestController):
abort(404, storage.MetricDoesNotExist(
missing_metric_ids.pop()))
return cls.get_cross_metric_measures_from_objs(
metrics, start, stop, aggregation, needed_overlap)
metrics, start, stop, aggregation, granularity, needed_overlap)
@staticmethod
def get_cross_metric_measures_from_objs(metrics, start=None, stop=None,
aggregation='mean',
granularity=None,
needed_overlap=100.0):
try:
needed_overlap = float(needed_overlap)
@ -400,14 +403,22 @@ class AggregatedMetricController(rest.RestController):
try:
if number_of_metrics == 0:
return []
if granularity is not None:
try:
granularity = float(granularity)
except ValueError as e:
abort(400, "granularity must be a float: %s" % e)
if number_of_metrics == 1:
# NOTE(sileht): don't do the aggregation if we only have one
# metric
measures = pecan.request.storage.get_measures(
metrics[0], start, stop, aggregation)
metrics[0], start, stop, aggregation,
granularity)
else:
measures = pecan.request.storage.get_cross_metric_measures(
metrics, start, stop, aggregation, needed_overlap)
metrics, start, stop, aggregation,
granularity,
needed_overlap)
# Replace timestamp keys by their string versions
return [(timestamp.isoformat(), offset, v)
for timestamp, offset, v in measures]
@ -1188,7 +1199,7 @@ class AggregationResource(rest.RestController):
@pecan.expose('json')
def post(self, start=None, stop=None, aggregation='mean',
needed_overlap=100.0):
granularity=None, needed_overlap=100.0):
resources = SearchResourceTypeController(self.resource_type).post()
metrics = []
for r in resources:
@ -1196,7 +1207,7 @@ class AggregationResource(rest.RestController):
if m:
metrics.append(m)
return AggregatedMetricController.get_cross_metric_measures_from_objs(
metrics, start, stop, aggregation, needed_overlap)
metrics, start, stop, aggregation, granularity, needed_overlap)
class Aggregation(rest.RestController):
@ -1214,9 +1225,10 @@ class Aggregation(rest.RestController):
@pecan.expose('json')
def get_metric(self, metric=None, start=None,
stop=None, aggregation='mean',
needed_overlap=100.0):
granularity=None, needed_overlap=100.0):
return AggregatedMetricController.get_cross_metric_measures_from_ids(
arg_to_list(metric), start, stop, aggregation, needed_overlap)
arg_to_list(metric), start, stop, aggregation,
granularity, needed_overlap)
class CapabilityController(rest.RestController):

View File

@ -260,17 +260,23 @@ class StorageDriver(object):
@staticmethod
def get_cross_metric_measures(metrics, from_timestamp=None,
                              to_timestamp=None, aggregation='mean',
                              granularity=None,
                              needed_overlap=None):
    """Get aggregated measures of multiple metrics.

    This base implementation only validates the arguments against each
    metric's archive policy; concrete drivers extend it (via super())
    to actually compute the aggregated measures.

    :param metrics: The metrics measured to aggregate.
    :param from_timestamp: The timestamp to get the measure from.
    :param to_timestamp: The timestamp to get the measure to.
    :param granularity: The granularity to retrieve.
    :param aggregation: The type of aggregation to retrieve.
    :param needed_overlap: Percentage of overlap required between the
                           metrics' timeseries (not checked here;
                           presumably enforced by drivers — confirm).
    :raises AggregationDoesNotExist: if a metric's archive policy does
                                     not provide the aggregation method.
    :raises GranularityDoesNotExist: if a granularity was requested and
                                     a metric's archive policy defines
                                     no matching granularity.
    """
    for metric in metrics:
        if aggregation not in metric.archive_policy.aggregation_methods:
            raise AggregationDoesNotExist(metric, aggregation)
        # Only validate granularity when one was explicitly requested;
        # None means "use whatever granularities the metrics share".
        if (granularity is not None and granularity
           not in set(d.granularity
                      for d in metric.archive_policy.definition)):
            raise GranularityDoesNotExist(metric, granularity)
@staticmethod
def search_value(metrics, query, from_timestamp=None,

View File

@ -303,31 +303,37 @@ class CarbonaraBasedStorage(storage.StorageDriver):
finally:
lock.release()
# TODO(jd) Add granularity parameter here and in the REST API
# rather than fetching all granularities
def get_cross_metric_measures(self, metrics, from_timestamp=None,
to_timestamp=None, aggregation='mean',
granularity=None,
needed_overlap=100.0):
super(CarbonaraBasedStorage, self).get_cross_metric_measures(
metrics, from_timestamp, to_timestamp, aggregation, needed_overlap)
metrics, from_timestamp, to_timestamp,
aggregation, granularity, needed_overlap)
granularities = (definition.granularity
for metric in metrics
for definition in metric.archive_policy.definition)
granularities_in_common = [
granularity
for granularity, occurence in six.iteritems(
collections.Counter(granularities))
if occurence == len(metrics)
]
if granularity is None:
granularities = (
definition.granularity
for metric in metrics
for definition in metric.archive_policy.definition
)
granularities_in_common = [
g
for g, occurence in six.iteritems(
collections.Counter(granularities))
if occurence == len(metrics)
]
if not granularities_in_common:
raise storage.MetricUnaggregatable(metrics, 'No granularity match')
if not granularities_in_common:
raise storage.MetricUnaggregatable(
metrics, 'No granularity match')
else:
granularities_in_common = [granularity]
tss = self._map_in_thread(self._get_measures_timeserie,
[(metric, aggregation, granularity)
[(metric, aggregation, g)
for metric in metrics
for granularity in granularities_in_common])
for g in granularities_in_common])
try:
return [(timestamp.replace(tzinfo=iso8601.iso8601.UTC), r, v)
for timestamp, r, v

View File

@ -0,0 +1,85 @@
# Gabbi scenario: cross-metric aggregation with an explicit granularity.
#
# Creates two metrics sharing the 'low' archive policy (1 second and
# 300 seconds granularities), pushes measures to both, then checks that:
#   * a non-float granularity value is rejected with HTTP 400
#   * granularity=1 returns the cross-metric mean per timestamp
fixtures:
    - ConfigFixture

tests:
    - name: create archive policy
      desc: for later use
      url: /v1/archive_policy
      method: POST
      request_headers:
          content-type: application/json
          x-roles: admin
      data:
          name: low
          definition:
              - granularity: 1 second
              - granularity: 300 seconds
      status: 201

    - name: create metric 1
      url: /v1/metric
      request_headers:
          content-type: application/json
      method: post
      data:
          archive_policy_name: low
      status: 201

    - name: create metric 2
      url: /v1/metric
      request_headers:
          content-type: application/json
      method: post
      data:
          archive_policy_name: low
      status: 201

    # $RESPONSE below refers to the immediately preceding request, so the
    # metric list is re-fetched before each step that needs the metric ids.
    - name: get metric list to push metric 1
      url: /v1/metric

    - name: push measurements to metric 1
      url: /v1/metric/$RESPONSE['$[0].id']/measures
      request_headers:
          content-type: application/json
      method: post
      data:
          - timestamp: "2015-03-06T14:33:57"
            value: 43.1
          - timestamp: "2015-03-06T14:34:12"
            value: 12
      status: 202

    - name: get metric list to push metric 2
      url: /v1/metric

    - name: push measurements to metric 2
      url: /v1/metric/$RESPONSE['$[1].id']/measures
      request_headers:
          content-type: application/json
      method: post
      data:
          - timestamp: "2015-03-06T14:33:57"
            value: 3.1
          - timestamp: "2015-03-06T14:34:12"
            value: 2
      status: 202

    - name: get metric list to get aggregates
      url: /v1/metric

    # A granularity that cannot be parsed as a float must be a client error.
    - name: get measure aggregates by granularity not float
      url: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=foobar
      status: 400

    - name: get metric list to get aggregates 2
      url: /v1/metric

    # Poll because measure processing is asynchronous; expected values are
    # the per-timestamp means of the two metrics: (43.1+3.1)/2 and (12+2)/2.
    - name: get measure aggregates by granularity
      url: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=1
      poll:
          count: 10
          delay: 1
      response_json_paths:
          $:
              - ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
              - ['2015-03-06T14:34:12+00:00', 1.0, 7.0]

View File

@ -259,6 +259,26 @@ class TestStorageDriver(tests_base.TestCase):
[self.metric, metric2],
aggregation='last')
def test_get_cross_metric_measures_unknown_granularity(self):
    """Cross-metric aggregation with a granularity no policy defines.

    Expects GranularityDoesNotExist rather than silently returning
    data at some other granularity.
    """
    metric2 = storage.Metric(uuid.uuid4(),
                             self.archive_policies['low'])
    # Both metrics get real measures so the failure can only come from
    # the bogus granularity, not from a lack of data.
    self.storage.add_measures(self.metric, [
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
    ])
    self.storage.add_measures(metric2, [
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
        storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
    ])
    # 12345.456 is presumed not to match any granularity defined by the
    # 'low' archive policy fixture — confirm against the test base class.
    self.assertRaises(storage.GranularityDoesNotExist,
                      self.storage.get_cross_metric_measures,
                      [self.metric, metric2],
                      granularity=12345.456)
def test_add_and_get_cross_metric_measures_different_archives(self):
metric2 = storage.Metric(uuid.uuid4(),
self.archive_policies['no_granularity_match'])
@ -345,6 +365,16 @@ class TestStorageDriver(tests_base.TestCase):
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 300.0, 39.0),
], values)
values = self.storage.get_cross_metric_measures(
[self.metric, metric2],
from_timestamp='2014-01-01 12:00:00',
to_timestamp='2014-01-01 12:00:01',
granularity=300.0)
self.assertEqual([
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 300.0, 39.0),
], values)
def test_add_and_get_cross_metric_measures_with_holes(self):
metric2 = storage.Metric(uuid.uuid4(),
self.archive_policies['low'])