Implements groupby for statistics API
Queries for metrics to obtain all dimensions before querying for statistics with merge_metrics set to False, which returns a response that includes dimensions. The statistics response with dimensions is then used to perform in-memory computations based on the groupby dimension. Only a single groupby is supported. The current implementation is memory-intensive and can be improved further to constrain memory usage. This patch is an initial step in that direction. Change-Id: I98ad72337ed50b2f6703afc2e7c4e4689590b2c7
This commit is contained in:
parent
9c2c1e878e
commit
b704e3f9c7
|
@ -348,18 +348,29 @@ class Connection(base.Connection):
|
|||
yield curr, next
|
||||
curr = next
|
||||
|
||||
def get_next_task_args(self, sample_filter, delta, **kwargs):
|
||||
def get_next_task_args(self, start_timestamp=None, end_timestamp=None,
|
||||
delta=None, **kwargs):
|
||||
|
||||
# Yields next set of measurement related args
|
||||
metrics = self.mc.metrics_list(**kwargs)
|
||||
for start, end in self.get_next_time_delta(
|
||||
sample_filter.start_timestamp,
|
||||
sample_filter.end_timestamp,
|
||||
delta):
|
||||
has_ts = start_timestamp and end_timestamp and delta
|
||||
if has_ts:
|
||||
for start, end in self.get_next_time_delta(
|
||||
start_timestamp,
|
||||
end_timestamp,
|
||||
delta):
|
||||
for metric in metrics:
|
||||
task = {'metric': metric['name'],
|
||||
'dimension': metric['dimensions'],
|
||||
'start_ts': start,
|
||||
'end_ts': end}
|
||||
LOG.debug(_('next task is : %s'), task)
|
||||
yield task
|
||||
else:
|
||||
for metric in metrics:
|
||||
task = {'metric': metric['name'],
|
||||
'dimension': metric['dimensions'],
|
||||
'start_ts': start,
|
||||
'end_ts': end}
|
||||
'dimension': metric['dimensions']
|
||||
}
|
||||
LOG.debug(_('next task is : %s'), task)
|
||||
yield task
|
||||
|
||||
|
@ -477,7 +488,8 @@ class Connection(base.Connection):
|
|||
result_count = 0
|
||||
|
||||
for task_cnt, task in enumerate(self.get_next_task_args(
|
||||
sample_filter, delta, **_metric_args)):
|
||||
sample_filter.start_timestamp, sample_filter.end_timestamp,
|
||||
delta, **_metric_args)):
|
||||
# Spawn query_concurrency_limit number of green threads
|
||||
# simultaneously to fetch measurements
|
||||
thread_pool.add_thread(self.get_measurements,
|
||||
|
@ -531,8 +543,17 @@ class Connection(base.Connection):
|
|||
raise ceilometer.NotImplementedError('Query without filter '
|
||||
'not implemented')
|
||||
|
||||
allowed_groupby = ['user_id', 'project_id', 'resource_id', 'source']
|
||||
|
||||
if groupby:
|
||||
raise ceilometer.NotImplementedError('Groupby not implemented')
|
||||
if len(groupby) > 1:
|
||||
raise ceilometer.NotImplementedError('Only one groupby '
|
||||
'supported')
|
||||
|
||||
groupby = groupby[0]
|
||||
if groupby not in allowed_groupby:
|
||||
raise ceilometer.NotImplementedError('Groupby %s not'
|
||||
' implemented' % groupby)
|
||||
|
||||
if filter.metaquery:
|
||||
raise ceilometer.NotImplementedError('Metaquery not implemented')
|
||||
|
@ -550,7 +571,6 @@ class Connection(base.Connection):
|
|||
raise ceilometer.NotImplementedError(('End time op %s '
|
||||
'not implemented') %
|
||||
filter.end_timestamp_op)
|
||||
|
||||
if not filter.start_timestamp:
|
||||
filter.start_timestamp = timeutils.isotime(
|
||||
datetime.datetime(1970, 1, 1))
|
||||
|
@ -585,44 +605,167 @@ class Connection(base.Connection):
|
|||
period = period if period \
|
||||
else cfg.CONF.monasca.default_stats_period
|
||||
|
||||
_search_args = dict(
|
||||
name=filter.meter,
|
||||
dimensions=dims_filter,
|
||||
start_time=filter.start_timestamp,
|
||||
end_time=filter.end_timestamp,
|
||||
period=period,
|
||||
statistics=','.join(statistics),
|
||||
merge_metrics=True)
|
||||
if groupby:
|
||||
_metric_args = dict(name=filter.meter,
|
||||
dimensions=dims_filter)
|
||||
group_stats_list = []
|
||||
|
||||
_search_args = {k: v for k, v in _search_args.items()
|
||||
if v is not None}
|
||||
for task_cnt, task in enumerate(
|
||||
self.get_next_task_args(**_metric_args)):
|
||||
|
||||
_search_args = dict(
|
||||
name=task['metric'],
|
||||
dimensions=task['dimension'],
|
||||
start_time=filter.start_timestamp,
|
||||
end_time=filter.end_timestamp,
|
||||
period=period,
|
||||
statistics=','.join(statistics),
|
||||
merge_metrics=False)
|
||||
|
||||
_search_args = {k: v for k, v in _search_args.items()
|
||||
if v is not None}
|
||||
stats_list = self.mc.statistics_list(**_search_args)
|
||||
group_stats_list.extend(stats_list)
|
||||
|
||||
group_stats_dict = {}
|
||||
|
||||
for stats in group_stats_list:
|
||||
groupby_val = stats['dimensions'].get(groupby)
|
||||
stats_list = group_stats_dict.get(groupby_val)
|
||||
if stats_list:
|
||||
stats_list.append(stats)
|
||||
else:
|
||||
group_stats_dict[groupby_val] = [stats]
|
||||
|
||||
def get_max(items):
|
||||
return max(items)
|
||||
|
||||
def get_min(items):
|
||||
return min(items)
|
||||
|
||||
def get_avg(items):
|
||||
return sum(items)/len(items)
|
||||
|
||||
def get_sum(items):
|
||||
return sum(items)
|
||||
|
||||
def get_count(items):
|
||||
count = 0
|
||||
for item in items:
|
||||
count = count + item
|
||||
return count
|
||||
|
||||
for group_key, stats_group in group_stats_dict.iteritems():
|
||||
max_list = []
|
||||
min_list = []
|
||||
avg_list = []
|
||||
sum_list = []
|
||||
count_list = []
|
||||
ts_list = []
|
||||
group_statistics = {}
|
||||
for stats in stats_group:
|
||||
for s in stats['statistics']:
|
||||
stats_dict = self._convert_to_dict(s, stats['columns'])
|
||||
|
||||
if 'max' in stats['columns']:
|
||||
max_list.append(stats_dict['max'])
|
||||
if 'min' in stats['columns']:
|
||||
min_list.append(stats_dict['min'])
|
||||
if 'avg' in stats['columns']:
|
||||
avg_list.append(stats_dict['avg'])
|
||||
if 'sum' in stats['columns']:
|
||||
sum_list.append(stats_dict['sum'])
|
||||
if 'count' in stats['columns']:
|
||||
count_list.append(stats_dict['count'])
|
||||
|
||||
ts_list.append(stats_dict['timestamp'])
|
||||
|
||||
group_statistics['unit'] = (stats['dimensions'].
|
||||
get('unit'))
|
||||
|
||||
if len(max_list):
|
||||
group_statistics['max'] = get_max(max_list)
|
||||
if len(min_list):
|
||||
group_statistics['min'] = get_min(min_list)
|
||||
if len(avg_list):
|
||||
group_statistics['avg'] = get_avg(avg_list)
|
||||
if len(sum_list):
|
||||
group_statistics['sum'] = get_sum(sum_list)
|
||||
if len(count_list):
|
||||
group_statistics['count'] = get_count(count_list)
|
||||
|
||||
group_statistics['end_timestamp'] = get_max(ts_list)
|
||||
group_statistics['timestamp'] = get_min(ts_list)
|
||||
|
||||
stats_list = self.mc.statistics_list(**_search_args)
|
||||
for stats in stats_list:
|
||||
for s in stats['statistics']:
|
||||
stats_dict = self._convert_to_dict(s, stats['columns'])
|
||||
ts_start = timeutils.parse_isotime(
|
||||
stats_dict['timestamp']).replace(tzinfo=None)
|
||||
ts_end = (ts_start + datetime.timedelta(
|
||||
0, period)).replace(tzinfo=None)
|
||||
del stats_dict['timestamp']
|
||||
if 'count' in stats_dict:
|
||||
stats_dict['count'] = int(stats_dict['count'])
|
||||
if aggregate:
|
||||
stats_dict['aggregate'] = {}
|
||||
for a in aggregate:
|
||||
key = '%s%s' % (a.func,
|
||||
'/%s' % a.param if a.param else '')
|
||||
stats_dict['aggregate'][key] = stats_dict.get(key)
|
||||
group_statistics['timestamp']).replace(tzinfo=None)
|
||||
|
||||
ts_end = timeutils.parse_isotime(
|
||||
group_statistics['end_timestamp']).replace(tzinfo=None)
|
||||
|
||||
del group_statistics['end_timestamp']
|
||||
|
||||
if 'count' in group_statistics:
|
||||
group_statistics['count'] = int(group_statistics['count'])
|
||||
unit = group_statistics['unit']
|
||||
del group_statistics['unit']
|
||||
if aggregate:
|
||||
group_statistics['aggregate'] = {}
|
||||
for a in aggregate:
|
||||
key = '%s%s' % (a.func, '/%s' % a.param if a.param
|
||||
else '')
|
||||
group_statistics['aggregate'][key] = (
|
||||
group_statistics.get(key))
|
||||
yield api_models.Statistics(
|
||||
unit=stats['dimensions'].get('unit'),
|
||||
unit=unit,
|
||||
period=period,
|
||||
period_start=ts_start,
|
||||
period_end=ts_end,
|
||||
duration=period,
|
||||
duration_start=ts_start,
|
||||
duration_end=ts_end,
|
||||
groupby={u'': u''},
|
||||
**stats_dict
|
||||
groupby={groupby: group_key},
|
||||
**group_statistics
|
||||
)
|
||||
else:
|
||||
_search_args = dict(
|
||||
name=filter.meter,
|
||||
dimensions=dims_filter,
|
||||
start_time=filter.start_timestamp,
|
||||
end_time=filter.end_timestamp,
|
||||
period=period,
|
||||
statistics=','.join(statistics),
|
||||
merge_metrics=True)
|
||||
|
||||
_search_args = {k: v for k, v in _search_args.items()
|
||||
if v is not None}
|
||||
stats_list = self.mc.statistics_list(**_search_args)
|
||||
for stats in stats_list:
|
||||
for s in stats['statistics']:
|
||||
stats_dict = self._convert_to_dict(s, stats['columns'])
|
||||
ts_start = timeutils.parse_isotime(
|
||||
stats_dict['timestamp']).replace(tzinfo=None)
|
||||
ts_end = (ts_start + datetime.timedelta(
|
||||
0, period)).replace(tzinfo=None)
|
||||
del stats_dict['timestamp']
|
||||
if 'count' in stats_dict:
|
||||
stats_dict['count'] = int(stats_dict['count'])
|
||||
|
||||
if aggregate:
|
||||
stats_dict['aggregate'] = {}
|
||||
for a in aggregate:
|
||||
key = '%s%s' % (a.func, '/%s' % a.param if a.param
|
||||
else '')
|
||||
stats_dict['aggregate'][key] = stats_dict.get(key)
|
||||
|
||||
yield api_models.Statistics(
|
||||
unit=stats['dimensions'].get('unit'),
|
||||
period=period,
|
||||
period_start=ts_start,
|
||||
period_end=ts_end,
|
||||
duration=period,
|
||||
duration_start=ts_start,
|
||||
duration_end=ts_end,
|
||||
groupby={u'': u''},
|
||||
**stats_dict
|
||||
)
|
||||
|
|
|
@ -517,12 +517,12 @@ class MeterStatisticsTest(base.BaseTestCase):
|
|||
conn.get_meter_statistics(sf)))
|
||||
|
||||
sf.meter = "image"
|
||||
self.assertRaisesWithMessage("Groupby not implemented",
|
||||
self.assertRaisesWithMessage("Groupby message_id not implemented",
|
||||
ceilometer.NotImplementedError,
|
||||
lambda: list(
|
||||
conn.get_meter_statistics(
|
||||
sf,
|
||||
groupby="resource_id")))
|
||||
groupby=['message_id'])))
|
||||
|
||||
sf.metaquery = "metaquery"
|
||||
self.assertRaisesWithMessage("Metaquery not implemented",
|
||||
|
@ -630,6 +630,76 @@ class MeterStatisticsTest(base.BaseTestCase):
|
|||
self.assertIsNotNone(stats[0].as_dict().get('aggregate'))
|
||||
self.assertEqual({u'min': 0.008}, stats[0].as_dict()['aggregate'])
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_stats_list_with_groupby(self, mock_mdf):
|
||||
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
|
||||
conn = impl_monasca.Connection("127.0.0.1:8080")
|
||||
ml_mock = mock_client().metrics_list
|
||||
ml_mock.return_value = [
|
||||
{
|
||||
'name': 'image',
|
||||
'dimensions': {'project_id': '1234'}
|
||||
},
|
||||
{
|
||||
'name': 'image',
|
||||
'dimensions': {'project_id': '5678'}
|
||||
}
|
||||
]
|
||||
|
||||
sl_mock = mock_client().statistics_list
|
||||
sl_mock.side_effect = [[
|
||||
{
|
||||
'statistics':
|
||||
[
|
||||
['2014-10-24T12:12:12Z', 0.008, 1.3, 3, 0.34],
|
||||
['2014-10-24T12:20:12Z', 0.078, 1.25, 2, 0.21],
|
||||
['2014-10-24T12:52:12Z', 0.018, 0.9, 4, 0.14]
|
||||
],
|
||||
'dimensions': {'project_id': '1234', 'unit': 'gb'},
|
||||
'columns': ['timestamp', 'min', 'max', 'count', 'avg']
|
||||
}],
|
||||
[{
|
||||
'statistics':
|
||||
[
|
||||
['2014-10-24T12:14:12Z', 0.45, 2.5, 2, 2.1],
|
||||
['2014-10-24T12:20:12Z', 0.58, 3.2, 3, 3.4],
|
||||
['2014-10-24T13:52:42Z', 1.67, 3.5, 1, 5.3]
|
||||
],
|
||||
'dimensions': {'project_id': '5678', 'unit': 'gb'},
|
||||
'columns': ['timestamp', 'min', 'max', 'count', 'avg']
|
||||
}]]
|
||||
|
||||
sf = storage.SampleFilter()
|
||||
sf.meter = "image"
|
||||
sf.start_timestamp = timeutils.parse_isotime(
|
||||
'2014-10-24T12:12:42').replace(tzinfo=None)
|
||||
groupby = ['project_id']
|
||||
stats = list(conn.get_meter_statistics(sf, period=30,
|
||||
groupby=groupby))
|
||||
|
||||
self.assertEqual(2, len(stats))
|
||||
|
||||
for stat in stats:
|
||||
self.assertIsNotNone(stat.groupby)
|
||||
project_id = stat.groupby.get('project_id')
|
||||
self.assertIn(project_id, ['1234', '5678'])
|
||||
if project_id == '1234':
|
||||
self.assertEqual(0.008, stat.min)
|
||||
self.assertEqual(1.3, stat.max)
|
||||
self.assertEqual(0.23, stat.avg)
|
||||
self.assertEqual(9, stat.count)
|
||||
self.assertEqual(30, stat.period)
|
||||
self.assertEqual('2014-10-24T12:12:12',
|
||||
stat.period_start.isoformat())
|
||||
if project_id == '5678':
|
||||
self.assertEqual(0.45, stat.min)
|
||||
self.assertEqual(3.5, stat.max)
|
||||
self.assertEqual(3.6, stat.avg)
|
||||
self.assertEqual(6, stat.count)
|
||||
self.assertEqual(30, stat.period)
|
||||
self.assertEqual('2014-10-24T13:52:42',
|
||||
stat.period_end.isoformat())
|
||||
|
||||
|
||||
class CapabilitiesTest(base.BaseTestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue