Meter Get_Meter_Byname implemented

Ceilometer API, Meter get_meter_byname implemented and tested.

Change-Id: I4f038213797c8be7eb8e66822941afda3921a7dd
This commit is contained in:
Xiao Tan 2016-04-28 15:06:25 +00:00
parent 7c0766d455
commit f3dada5dd2
5 changed files with 138 additions and 22 deletions

View File

@ -32,3 +32,7 @@ class V2API(object):
@resource_api.Restify('/v2.0/meters', method='post')
def post_meters(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/meters/{meter_name}', method='get')
def get_meter_byname(self, req, res, meter_name):
res.status = '501 Not Implemented'

View File

@ -58,6 +58,22 @@ class TestMeterDispatcher(base.BaseTestCase):
with mock.patch.object(requests, 'put', return_value=put_res):
self.dispatcher = meters.MeterDispatcher({})
self.response_str = """
{"aggregations":{"by_name":{"doc_count_error_upper_bound":0,
"sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300,
"by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f",
"doc_count":300,"meters":{"hits":{"hits":[{ "_type": "metrics",
"_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000",
"_source":{"name":"BABMGD", "value": 4,
"timestamp": 1461337094000,
"dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a",
"project_id": "35b17138-b364-4e6a-a131-8f3099c5be68",
"tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36",
"user_agent": "openstack", "dimensions": null,
"user": "admin", "value_meta": null, "tenant": "admin",
"user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}}
"""
def test_initialization(self):
# test that the kafka connection uri should be 'fake' as it was passed
# in from configuration
@ -104,27 +120,12 @@ class TestMeterDispatcher(base.BaseTestCase):
req.get_param.side_effect = _side_effect
req_result = mock.Mock()
response_str = """
{"aggregations":{"by_name":{"doc_count_error_upper_bound":0,
"sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300,
"by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f",
"doc_count":300,"meters":{"hits":{"hits":[{ "_type": "metrics",
"_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000",
"_source":{"name":"BABMGD", "value": 4,
"timestamp": 1461337094000,
"dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a",
"project_id": "35b17138-b364-4e6a-a131-8f3099c5be68",
"tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36",
"user_agent": "openstack", "dimensions": null,
"user": "admin", "value_meta": null, "tenant": "admin",
"user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}}
"""
req_result.json.return_value = json.loads(response_str)
req_result.json.return_value = json.loads(self.response_str)
req_result.status_code = 200
with mock.patch.object(requests, 'post', return_value=req_result):
self.dispatcher.get_meter(req, res)
self.dispatcher.get_meters(req, res)
# test that the response code is 200
self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
@ -145,3 +146,35 @@ class TestMeterDispatcher(base.BaseTestCase):
self.dispatcher.post_meters(mock.Mock(), res)
self.assertEqual(getattr(falcon, 'HTTP_204'), res.status)
def test_get_meter_byname(self):
res = mock.Mock()
req = mock.Mock()
def _side_effect(arg):
if arg == 'name':
return 'tongli'
elif arg == 'dimensions':
return 'key1:100, key2:200'
req.get_param.side_effect = _side_effect
req_result = mock.Mock()
req_result.json.return_value = json.loads(self.response_str)
req_result.status_code = 200
with mock.patch.object(requests, 'post', return_value=req_result):
self.dispatcher.get_meter_byname(req, res, "BABMGD")
# test that the response code is 200
self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
obj = json.loads(res.body)
self.assertEqual(obj[0]['counter_name'], 'BABMGD')
self.assertEqual(obj[0]['counter_type'], 'metrics')
self.assertEqual(obj[0]['user_id'],
'efd87807-12d2-4b38-9c70-5f5c2ac427ff')
self.assertEqual(obj[0]['project_id'],
'35b17138-b364-4e6a-a131-8f3099c5be68')
self.assertEqual(obj[0]['counter_volume'], 4)
self.assertEqual(obj[0]['timestamp'], 1461337094000)
self.assertEqual(len(obj), 1)

View File

@ -108,6 +108,13 @@ class MeterDispatcher(object):
["dimensions_hash","timestamp","value"]},"size":1}}}}}}}
"""
self._oldsample_agg = """
{"by_name":{"terms":{"field":"name","size":%(size)d},
"aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d},
"aggs":{"meters":{"top_hits":{"_source":{"exclude":
["dimensions_hash"]},"size":1}}}}}}}
"""
self.setup_index_template()
def setup_index_template(self):
@ -127,9 +134,9 @@ class MeterDispatcher(object):
def post_data(self, req, res):
msg = ""
LOG.debug('@$Post Message is %s' % msg)
LOG.debug('Getting the call.')
msg = req.stream.read()
LOG.debug('@$Post Message is %s' % msg)
code = self._kafka_conn.send_messages(msg)
res.status = getattr(falcon, 'HTTP_' + str(code))
@ -144,7 +151,7 @@ class MeterDispatcher(object):
return None
@resource_api.Restify('/v2.0/meters', method='get')
def get_meter(self, req, res):
def get_meters(self, req, res):
LOG.debug('The meters GET request is received')
# process query condition
@ -203,6 +210,77 @@ class MeterDispatcher(object):
else:
res.body = ''
@resource_api.Restify('/v2.0/meters/', method='post')
@resource_api.Restify('/v2.0/meters', method='post')
def post_meters(self, req, res):
self.post_data(req, res)
@resource_api.Restify('/v2.0/meters/{meter_name}', method='get')
def get_meter_byname(self, req, res, meter_name):
LOG.debug('The meter %s sample GET request is received' % meter_name)
# process query condition
query = []
metrics.ParamUtil.common(req, query)
_meter_ag = self._oldsample_agg % {"size": self.size}
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _meter_ag + '}')
else:
body = '{"aggs":' + _meter_ag + '}'
# modify the query url to filter out name
query_url = []
if meter_name:
query_url = self._query_url + '&q=name:' + meter_name
else:
query_url = self._query_url
LOG.debug('Request body:' + body)
LOG.debug('Request url:' + query_url)
es_res = requests.post(query_url, data=body)
res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
res_data = self._get_agg_response(es_res)
LOG.debug('@$Result data is %s\n' % res_data)
if res_data:
# convert the response into ceilometer meter OldSample format
aggs = res_data['by_name']['buckets']
flag = {'is_first': True}
def _render_hits(item):
_type = item['meters']['hits']['hits'][0]['_type']
_source = item['meters']['hits']['hits'][0]['_source']
rslt = ('{"counter_name":' + json.dumps(_source['name']) + ','
'"counter_type":' + json.dumps(_type) + ','
'"counter_unit":null,'
'"counter_volume":' +
json.dumps(_source['value']) + ','
'"message_id":null,'
'"project_id":' +
json.dumps(_source['project_id']) + ','
'"recorded_at":null,'
'"resource_id":' +
json.dumps(_source['tenant_id']) + ','
'"resource_metadata":null,'
'"source":' + json.dumps(_source['user_agent']) + ','
'"timestamp":' + json.dumps(_source['timestamp']) + ','
'"user_id":' + json.dumps(_source['user_id']) + '}')
if flag['is_first']:
flag['is_first'] = False
return rslt
else:
return ',' + rslt
def _make_body(buckets):
yield '['
for by_name in buckets:
if by_name['by_dim']:
for by_dim in by_name['by_dim']['buckets']:
yield _render_hits(by_dim)
yield ']'
res.body = ''.join(_make_body(aggs))
res.content_type = 'application/json;charset=utf-8'
else:
res.body = ''

View File

@ -231,6 +231,7 @@ class MetricDispatcher(object):
def post_data(self, req, res):
LOG.debug('Getting the call.')
msg = req.stream.read()
LOG.debug('@Post: %s' % msg)
code = self._kafka_conn.send_messages(msg)
res.status = getattr(falcon, 'HTTP_' + str(code))

View File

@ -49,14 +49,14 @@ kiloeyes.dispatcher =
versions = kiloeyes.v2.elasticsearch.versions:VersionDispatcher
alarmdefinitions = kiloeyes.v2.elasticsearch.alarmdefinitions:AlarmDefinitionDispatcher
notificationmethods = kiloeyes.v2.elasticsearch.notificationmethods:NotificationMethodDispatcher
alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher
alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher
meters = kiloeyes.v2.elasticsearch.meters:MeterDispatcher
kiloeyes.index.strategy =
timed = kiloeyes.microservice.timed_strategy:TimedStrategy
fixed = kiloeyes.microservice.fixed_strategy:FixedStrategy
kiloeyes.message.processor =
kiloeyes.message.processor =
metrics_msg_fixer = kiloeyes.microservice.metrics_fixer:MetricsFixer
notification_processor = kiloeyes.microservice.notification_processor:NotificationProcessor
threshold_processor = kiloeyes.microservice.threshold_processor:ThresholdProcessor