storage,rest: add support for granularity in aggregation
This allows to provide the storage driver a granularity argument when doing cross-metric aggregation. Change-Id: I33d48e66c39f7cf02f5fa26697994750b5215513 Closes-Bug: #1512740
This commit is contained in:
parent
57ac881e06
commit
9afee322b3
|
@ -357,13 +357,15 @@ class AggregatedMetricController(rest.RestController):
|
|||
|
||||
@pecan.expose('json')
|
||||
def get_measures(self, start=None, stop=None, aggregation='mean',
|
||||
needed_overlap=100.0):
|
||||
granularity=None, needed_overlap=100.0):
|
||||
return self.get_cross_metric_measures_from_ids(
|
||||
self.metric_ids, start, stop, aggregation, needed_overlap)
|
||||
self.metric_ids, start, stop,
|
||||
aggregation, granularity, needed_overlap)
|
||||
|
||||
@classmethod
|
||||
def get_cross_metric_measures_from_ids(cls, metric_ids, start=None,
|
||||
stop=None, aggregation='mean',
|
||||
granularity=None,
|
||||
needed_overlap=100.0):
|
||||
# Check RBAC policy
|
||||
metrics = pecan.request.indexer.get_metrics(metric_ids)
|
||||
|
@ -374,11 +376,12 @@ class AggregatedMetricController(rest.RestController):
|
|||
abort(404, storage.MetricDoesNotExist(
|
||||
missing_metric_ids.pop()))
|
||||
return cls.get_cross_metric_measures_from_objs(
|
||||
metrics, start, stop, aggregation, needed_overlap)
|
||||
metrics, start, stop, aggregation, granularity, needed_overlap)
|
||||
|
||||
@staticmethod
|
||||
def get_cross_metric_measures_from_objs(metrics, start=None, stop=None,
|
||||
aggregation='mean',
|
||||
granularity=None,
|
||||
needed_overlap=100.0):
|
||||
try:
|
||||
needed_overlap = float(needed_overlap)
|
||||
|
@ -400,14 +403,22 @@ class AggregatedMetricController(rest.RestController):
|
|||
try:
|
||||
if number_of_metrics == 0:
|
||||
return []
|
||||
if granularity is not None:
|
||||
try:
|
||||
granularity = float(granularity)
|
||||
except ValueError as e:
|
||||
abort(400, "granularity must be a float: %s" % e)
|
||||
if number_of_metrics == 1:
|
||||
# NOTE(sileht): don't do the aggregation if we only have one
|
||||
# metric
|
||||
measures = pecan.request.storage.get_measures(
|
||||
metrics[0], start, stop, aggregation)
|
||||
metrics[0], start, stop, aggregation,
|
||||
granularity)
|
||||
else:
|
||||
measures = pecan.request.storage.get_cross_metric_measures(
|
||||
metrics, start, stop, aggregation, needed_overlap)
|
||||
metrics, start, stop, aggregation,
|
||||
granularity,
|
||||
needed_overlap)
|
||||
# Replace timestamp keys by their string versions
|
||||
return [(timestamp.isoformat(), offset, v)
|
||||
for timestamp, offset, v in measures]
|
||||
|
@ -1188,7 +1199,7 @@ class AggregationResource(rest.RestController):
|
|||
|
||||
@pecan.expose('json')
|
||||
def post(self, start=None, stop=None, aggregation='mean',
|
||||
needed_overlap=100.0):
|
||||
granularity=None, needed_overlap=100.0):
|
||||
resources = SearchResourceTypeController(self.resource_type).post()
|
||||
metrics = []
|
||||
for r in resources:
|
||||
|
@ -1196,7 +1207,7 @@ class AggregationResource(rest.RestController):
|
|||
if m:
|
||||
metrics.append(m)
|
||||
return AggregatedMetricController.get_cross_metric_measures_from_objs(
|
||||
metrics, start, stop, aggregation, needed_overlap)
|
||||
metrics, start, stop, aggregation, granularity, needed_overlap)
|
||||
|
||||
|
||||
class Aggregation(rest.RestController):
|
||||
|
@ -1214,9 +1225,10 @@ class Aggregation(rest.RestController):
|
|||
@pecan.expose('json')
|
||||
def get_metric(self, metric=None, start=None,
|
||||
stop=None, aggregation='mean',
|
||||
needed_overlap=100.0):
|
||||
granularity=None, needed_overlap=100.0):
|
||||
return AggregatedMetricController.get_cross_metric_measures_from_ids(
|
||||
arg_to_list(metric), start, stop, aggregation, needed_overlap)
|
||||
arg_to_list(metric), start, stop, aggregation,
|
||||
granularity, needed_overlap)
|
||||
|
||||
|
||||
class CapabilityController(rest.RestController):
|
||||
|
|
|
@ -260,17 +260,23 @@ class StorageDriver(object):
|
|||
@staticmethod
|
||||
def get_cross_metric_measures(metrics, from_timestamp=None,
|
||||
to_timestamp=None, aggregation='mean',
|
||||
granularity=None,
|
||||
needed_overlap=None):
|
||||
"""Get aggregated measures of multiple entities.
|
||||
|
||||
:param entities: The entities measured to aggregate.
|
||||
:param from timestamp: The timestamp to get the measure from.
|
||||
:param to timestamp: The timestamp to get the measure to.
|
||||
:param granularity: The granularity to retrieve.
|
||||
:param aggregation: The type of aggregation to retrieve.
|
||||
"""
|
||||
for metric in metrics:
|
||||
if aggregation not in metric.archive_policy.aggregation_methods:
|
||||
raise AggregationDoesNotExist(metric, aggregation)
|
||||
if (granularity is not None and granularity
|
||||
not in set(d.granularity
|
||||
for d in metric.archive_policy.definition)):
|
||||
raise GranularityDoesNotExist(metric, granularity)
|
||||
|
||||
@staticmethod
|
||||
def search_value(metrics, query, from_timestamp=None,
|
||||
|
|
|
@ -303,31 +303,37 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
finally:
|
||||
lock.release()
|
||||
|
||||
# TODO(jd) Add granularity parameter here and in the REST API
|
||||
# rather than fetching all granularities
|
||||
def get_cross_metric_measures(self, metrics, from_timestamp=None,
|
||||
to_timestamp=None, aggregation='mean',
|
||||
granularity=None,
|
||||
needed_overlap=100.0):
|
||||
super(CarbonaraBasedStorage, self).get_cross_metric_measures(
|
||||
metrics, from_timestamp, to_timestamp, aggregation, needed_overlap)
|
||||
metrics, from_timestamp, to_timestamp,
|
||||
aggregation, granularity, needed_overlap)
|
||||
|
||||
granularities = (definition.granularity
|
||||
for metric in metrics
|
||||
for definition in metric.archive_policy.definition)
|
||||
granularities_in_common = [
|
||||
granularity
|
||||
for granularity, occurence in six.iteritems(
|
||||
collections.Counter(granularities))
|
||||
if occurence == len(metrics)
|
||||
]
|
||||
if granularity is None:
|
||||
granularities = (
|
||||
definition.granularity
|
||||
for metric in metrics
|
||||
for definition in metric.archive_policy.definition
|
||||
)
|
||||
granularities_in_common = [
|
||||
g
|
||||
for g, occurence in six.iteritems(
|
||||
collections.Counter(granularities))
|
||||
if occurence == len(metrics)
|
||||
]
|
||||
|
||||
if not granularities_in_common:
|
||||
raise storage.MetricUnaggregatable(metrics, 'No granularity match')
|
||||
if not granularities_in_common:
|
||||
raise storage.MetricUnaggregatable(
|
||||
metrics, 'No granularity match')
|
||||
else:
|
||||
granularities_in_common = [granularity]
|
||||
|
||||
tss = self._map_in_thread(self._get_measures_timeserie,
|
||||
[(metric, aggregation, granularity)
|
||||
[(metric, aggregation, g)
|
||||
for metric in metrics
|
||||
for granularity in granularities_in_common])
|
||||
for g in granularities_in_common])
|
||||
try:
|
||||
return [(timestamp.replace(tzinfo=iso8601.iso8601.UTC), r, v)
|
||||
for timestamp, r, v
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
fixtures:
|
||||
- ConfigFixture
|
||||
|
||||
tests:
|
||||
- name: create archive policy
|
||||
desc: for later use
|
||||
url: /v1/archive_policy
|
||||
method: POST
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
x-roles: admin
|
||||
data:
|
||||
name: low
|
||||
definition:
|
||||
- granularity: 1 second
|
||||
- granularity: 300 seconds
|
||||
status: 201
|
||||
|
||||
- name: create metric 1
|
||||
url: /v1/metric
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
method: post
|
||||
data:
|
||||
archive_policy_name: low
|
||||
status: 201
|
||||
|
||||
- name: create metric 2
|
||||
url: /v1/metric
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
method: post
|
||||
data:
|
||||
archive_policy_name: low
|
||||
status: 201
|
||||
|
||||
- name: get metric list to push metric 1
|
||||
url: /v1/metric
|
||||
|
||||
- name: push measurements to metric 1
|
||||
url: /v1/metric/$RESPONSE['$[0].id']/measures
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
method: post
|
||||
data:
|
||||
- timestamp: "2015-03-06T14:33:57"
|
||||
value: 43.1
|
||||
- timestamp: "2015-03-06T14:34:12"
|
||||
value: 12
|
||||
status: 202
|
||||
|
||||
- name: get metric list to push metric 2
|
||||
url: /v1/metric
|
||||
|
||||
- name: push measurements to metric 2
|
||||
url: /v1/metric/$RESPONSE['$[1].id']/measures
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
method: post
|
||||
data:
|
||||
- timestamp: "2015-03-06T14:33:57"
|
||||
value: 3.1
|
||||
- timestamp: "2015-03-06T14:34:12"
|
||||
value: 2
|
||||
status: 202
|
||||
|
||||
- name: get metric list to get aggregates
|
||||
url: /v1/metric
|
||||
|
||||
- name: get measure aggregates by granularity not float
|
||||
url: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=foobar
|
||||
status: 400
|
||||
|
||||
- name: get metric list to get aggregates 2
|
||||
url: /v1/metric
|
||||
|
||||
- name: get measure aggregates by granularity
|
||||
url: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=1
|
||||
poll:
|
||||
count: 10
|
||||
delay: 1
|
||||
response_json_paths:
|
||||
$:
|
||||
- ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
|
||||
- ['2015-03-06T14:34:12+00:00', 1.0, 7.0]
|
|
@ -259,6 +259,26 @@ class TestStorageDriver(tests_base.TestCase):
|
|||
[self.metric, metric2],
|
||||
aggregation='last')
|
||||
|
||||
def test_get_cross_metric_measures_unknown_granularity(self):
|
||||
metric2 = storage.Metric(uuid.uuid4(),
|
||||
self.archive_policies['low'])
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
|
||||
])
|
||||
self.storage.add_measures(metric2, [
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
|
||||
])
|
||||
self.assertRaises(storage.GranularityDoesNotExist,
|
||||
self.storage.get_cross_metric_measures,
|
||||
[self.metric, metric2],
|
||||
granularity=12345.456)
|
||||
|
||||
def test_add_and_get_cross_metric_measures_different_archives(self):
|
||||
metric2 = storage.Metric(uuid.uuid4(),
|
||||
self.archive_policies['no_granularity_match'])
|
||||
|
@ -345,6 +365,16 @@ class TestStorageDriver(tests_base.TestCase):
|
|||
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 300.0, 39.0),
|
||||
], values)
|
||||
|
||||
values = self.storage.get_cross_metric_measures(
|
||||
[self.metric, metric2],
|
||||
from_timestamp='2014-01-01 12:00:00',
|
||||
to_timestamp='2014-01-01 12:00:01',
|
||||
granularity=300.0)
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 300.0, 39.0),
|
||||
], values)
|
||||
|
||||
def test_add_and_get_cross_metric_measures_with_holes(self):
|
||||
metric2 = storage.Metric(uuid.uuid4(),
|
||||
self.archive_policies['low'])
|
||||
|
|
Loading…
Reference in New Issue