Meter Get Statistics by name implemented

Ceilometer API: Get Meter Statistics by name implemented and
unit tested.

Change-Id: I9869daaccb8a8e656e6ae283708883a97818b7ec
This commit is contained in:
Xiao Tan 2016-04-30 02:06:19 +00:00
parent f3dada5dd2
commit 2e0a2a5e1a
5 changed files with 177 additions and 1 deletions

View File

@ -2,5 +2,5 @@ Andreas Jaeger <aj@suse.com>
Chang-Yi Lee <cy.l@inwinstack.com>
Jiaming Lin <robin890650@gmail.com>
Tong Li <litong01@us.ibm.com>
Xiao Tan <xt85@cornell.edu>
spzala <spzala@us.ibm.com>
xiaotan2 <xiaotan2@uw.edu>

View File

@ -1,6 +1,7 @@
CHANGES
=======
* Meter Get_Meter_Byname implemented
* Meters GET request implemented
* Added more instructions on how to configure keystone middleware
* move up one more dependencies

View File

@ -36,3 +36,7 @@ class V2API(object):
@resource_api.Restify('/v2.0/meters/{meter_name}', method='get')
def get_meter_byname(self, req, res, meter_name):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/meters/{meter_name}/statistics', method='get')
def get_meter_statistics(self, req, res, meter_name):
res.status = '501 Not Implemented'

View File

@ -19,6 +19,7 @@ from oslotest import base
import requests
from kiloeyes.common import kafka_conn
from kiloeyes.common import timeutils as tu
from kiloeyes.v2.elasticsearch import meters
try:
@ -178,3 +179,62 @@ class TestMeterDispatcher(base.BaseTestCase):
self.assertEqual(obj[0]['counter_volume'], 4)
self.assertEqual(obj[0]['timestamp'], 1461337094000)
self.assertEqual(len(obj), 1)
def test_do_get_statistics(self):
res = mock.Mock()
req = mock.Mock()
def _side_effect(arg):
if arg == 'name':
return 'tongli'
elif arg == 'dimensions':
return 'key1:100, key2:200'
elif arg == 'start_time':
return '2014-01-01'
elif arg == 'end_time':
return None
elif arg == 'period':
return None
elif arg == 'statistics':
return 'avg, sum, max'
req.get_param.side_effect = _side_effect
req_result = mock.Mock()
response_str = """
{"took":2006,"timed_out":false,"_shards":{"total":5,"successful":5,
"failed":0},"hits":{"total":600,"max_score":0.0,"hits":[]},
"aggregations":{"by_name":{"doc_count_error_upper_bound":0,
"sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300,
"by_dim":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,
"buckets":[{"key":"64e6ce08b3b8547b7c32e5cfa5b7d81f","doc_count":300,
"periods":{"buckets":[{"key":1421700000,"doc_count":130,
"statistics":{"count":130,"min":0.0,"max":595.0274095324651,
"avg":91.83085293930924,"sum":11938.0108821102}},
{"key":1422000000,"doc_count":170,"statistics":{"count":170,
"min":0.0,"max":1623.511307756313,"avg":324.69434786459897,
"sum":55198.039136981824}}]},"dimension":{"hits":{"total":300,
"max_score":1.4142135,"hits":[{"_index":"data_20150121",
"_type":"metrics","_id":"AUsSNF5mTZaMxA7_wmFx","_score":1.4142135,
"_source":{"name":"BABMGD","dimensions":{"key2":"NVITDU",
"key1":"FUFMPY","key_43":"ROQBZM"}}}]}}}]}}]}}}
"""
req_result.json.return_value = json.loads(response_str)
req_result.status_code = 200
with mock.patch.object(requests, 'post', return_value=req_result):
self.dispatcher.get_meter_statistics(req, res, 'BABMGD')
# test that the response code is 200
self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
print(res.body)
obj = json.loads(res.body)
# there should be total of 2 objects
self.assertEqual(len(obj), 2)
self.assertEqual(obj[0]['avg'], 91.8308529393)
self.assertEqual(obj[1]['max'], 1623.51130776)
self.assertEqual(obj[1]['period'], 300)
self.assertEqual(obj[0]['duration_start'],
tu.iso8601_from_timestamp(1421700000))

View File

@ -23,6 +23,7 @@ from kiloeyes.common import es_conn
from kiloeyes.common import kafka_conn
from kiloeyes.common import namespace
from kiloeyes.common import resource_api
from kiloeyes.common import timeutils as tu
from kiloeyes.v2.elasticsearch import metrics
try:
@ -115,6 +116,16 @@ class MeterDispatcher(object):
["dimensions_hash"]},"size":1}}}}}}}
"""
self._meter_stats_agg = """
{"by_name":{"terms":{"field":"name","size":%(size)d},
"aggs":{"by_dim":{"terms":{"field":"dimensions_hash",
"size":%(size)d},"aggs":{"dimension":{"top_hits":{"_source":
{"exclude":["dimensions_hash","timestamp","value"]},"size":1}},
"periods":{"date_histogram":{"field":"timestamp",
"interval":"%(period)s"},"aggs":{"statistics":{"stats":
{"field":"value"}}}}}}}}}
"""
self.setup_index_template()
def setup_index_template(self):
@ -284,3 +295,103 @@ class MeterDispatcher(object):
res.content_type = 'application/json;charset=utf-8'
else:
res.body = ''
@resource_api.Restify('/v2.0/meters/{meter_name}/statistics', method='get')
def get_meter_statistics(self, req, res, meter_name):
LOG.debug('The meter %s statistics GET request is received' %
meter_name)
# process query conditions
query = []
metrics.ParamUtil.common(req, query)
period = metrics.ParamUtil.period(req)
_stats_ag = (self._meter_stats_agg %
{"size": self.size, "period": period})
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _stats_ag + '}')
else:
body = '{"aggs":' + _stats_ag + '}'
# modify the query url to filter out name
query_url = []
if meter_name:
query_url = self._query_url + '&q=name:' + meter_name
else:
query_url = self._query_url
es_res = requests.post(query_url, data=body)
res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
res_data = self._get_agg_response(es_res)
if res_data:
# convert the response into Ceilometer Statistics format
aggs = res_data['by_name']['buckets']
LOG.debug('@$Stats: %s' % json.dumps(aggs))
def _render_stats(dim):
is_first = True
oldest_time = []
previous_time = []
for item in dim['periods']['buckets']:
current_time = item['key']
# calculte period and duration difference
if is_first:
period_diff = 'null'
oldest_time = current_time
duration_diff = 'null'
previous_time = current_time
else:
period_diff = (current_time - previous_time) / 1000
duration_diff = (current_time - oldest_time) / 1000
# parses the statistics data
_max = str(item['statistics']['max'])
_min = str(item['statistics']['min'])
_sum = str(item['statistics']['sum'])
_avg = str(item['statistics']['avg'])
_count = str(item['statistics']['count'])
curr_timestamp = tu.iso8601_from_timestamp(current_time)
prev_timestamp = tu.iso8601_from_timestamp(previous_time)
old_timestamp = tu.iso8601_from_timestamp(oldest_time)
rslt = ('{"avg":' + _avg + ','
'"count":' + _count + ','
'"duration":' + str(duration_diff) + ','
'"duration_end":' +
'"%s"' % str(curr_timestamp) + ','
'"duration_start":' +
'"%s"' % str(old_timestamp) + ','
'"max":' + _max + ','
'"min":' + _min + ','
'"period":' + str(period_diff) + ','
'"period_end":' +
'"%s"' % str(curr_timestamp) + ','
'"period_start":' +
'"%s"' % str(prev_timestamp) + ','
'"sum":' + _sum + ','
'"unit":null}')
previous_time = current_time
if is_first:
yield rslt
is_first = False
else:
yield ',' + rslt
def _make_body(items):
is_first = True
yield '['
for metric in items:
for dim in metric['by_dim']['buckets']:
if is_first:
is_first = False
else:
yield ','
for result in _render_stats(dim):
yield result
yield ']'
res.body = ''.join(_make_body(aggs))
res.content_type = 'application/json;charset=utf-8'
else:
res.body = 'o'