Meter Get_Meter_Byname implemented
Ceilometer API, Meter get_meter_byname implemented and tested. Change-Id: I4f038213797c8be7eb8e66822941afda3921a7dd
This commit is contained in:
parent
7c0766d455
commit
f3dada5dd2
|
@ -32,3 +32,7 @@ class V2API(object):
|
|||
@resource_api.Restify('/v2.0/meters', method='post')
|
||||
def post_meters(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/meters/{meter_name}', method='get')
|
||||
def get_meter_byname(self, req, res, meter_name):
|
||||
res.status = '501 Not Implemented'
|
||||
|
|
|
@ -58,6 +58,22 @@ class TestMeterDispatcher(base.BaseTestCase):
|
|||
with mock.patch.object(requests, 'put', return_value=put_res):
|
||||
self.dispatcher = meters.MeterDispatcher({})
|
||||
|
||||
self.response_str = """
|
||||
{"aggregations":{"by_name":{"doc_count_error_upper_bound":0,
|
||||
"sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300,
|
||||
"by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f",
|
||||
"doc_count":300,"meters":{"hits":{"hits":[{ "_type": "metrics",
|
||||
"_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000",
|
||||
"_source":{"name":"BABMGD", "value": 4,
|
||||
"timestamp": 1461337094000,
|
||||
"dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a",
|
||||
"project_id": "35b17138-b364-4e6a-a131-8f3099c5be68",
|
||||
"tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36",
|
||||
"user_agent": "openstack", "dimensions": null,
|
||||
"user": "admin", "value_meta": null, "tenant": "admin",
|
||||
"user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}}
|
||||
"""
|
||||
|
||||
def test_initialization(self):
|
||||
# test that the kafka connection uri should be 'fake' as it was passed
|
||||
# in from configuration
|
||||
|
@ -104,27 +120,12 @@ class TestMeterDispatcher(base.BaseTestCase):
|
|||
req.get_param.side_effect = _side_effect
|
||||
|
||||
req_result = mock.Mock()
|
||||
response_str = """
|
||||
{"aggregations":{"by_name":{"doc_count_error_upper_bound":0,
|
||||
"sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300,
|
||||
"by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f",
|
||||
"doc_count":300,"meters":{"hits":{"hits":[{ "_type": "metrics",
|
||||
"_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000",
|
||||
"_source":{"name":"BABMGD", "value": 4,
|
||||
"timestamp": 1461337094000,
|
||||
"dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a",
|
||||
"project_id": "35b17138-b364-4e6a-a131-8f3099c5be68",
|
||||
"tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36",
|
||||
"user_agent": "openstack", "dimensions": null,
|
||||
"user": "admin", "value_meta": null, "tenant": "admin",
|
||||
"user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}}
|
||||
"""
|
||||
|
||||
req_result.json.return_value = json.loads(response_str)
|
||||
req_result.json.return_value = json.loads(self.response_str)
|
||||
req_result.status_code = 200
|
||||
|
||||
with mock.patch.object(requests, 'post', return_value=req_result):
|
||||
self.dispatcher.get_meter(req, res)
|
||||
self.dispatcher.get_meters(req, res)
|
||||
|
||||
# test that the response code is 200
|
||||
self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
|
||||
|
@ -145,3 +146,35 @@ class TestMeterDispatcher(base.BaseTestCase):
|
|||
self.dispatcher.post_meters(mock.Mock(), res)
|
||||
|
||||
self.assertEqual(getattr(falcon, 'HTTP_204'), res.status)
|
||||
|
||||
def test_get_meter_byname(self):
|
||||
res = mock.Mock()
|
||||
req = mock.Mock()
|
||||
|
||||
def _side_effect(arg):
|
||||
if arg == 'name':
|
||||
return 'tongli'
|
||||
elif arg == 'dimensions':
|
||||
return 'key1:100, key2:200'
|
||||
req.get_param.side_effect = _side_effect
|
||||
|
||||
req_result = mock.Mock()
|
||||
|
||||
req_result.json.return_value = json.loads(self.response_str)
|
||||
req_result.status_code = 200
|
||||
|
||||
with mock.patch.object(requests, 'post', return_value=req_result):
|
||||
self.dispatcher.get_meter_byname(req, res, "BABMGD")
|
||||
|
||||
# test that the response code is 200
|
||||
self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
|
||||
obj = json.loads(res.body)
|
||||
self.assertEqual(obj[0]['counter_name'], 'BABMGD')
|
||||
self.assertEqual(obj[0]['counter_type'], 'metrics')
|
||||
self.assertEqual(obj[0]['user_id'],
|
||||
'efd87807-12d2-4b38-9c70-5f5c2ac427ff')
|
||||
self.assertEqual(obj[0]['project_id'],
|
||||
'35b17138-b364-4e6a-a131-8f3099c5be68')
|
||||
self.assertEqual(obj[0]['counter_volume'], 4)
|
||||
self.assertEqual(obj[0]['timestamp'], 1461337094000)
|
||||
self.assertEqual(len(obj), 1)
|
||||
|
|
|
@ -108,6 +108,13 @@ class MeterDispatcher(object):
|
|||
["dimensions_hash","timestamp","value"]},"size":1}}}}}}}
|
||||
"""
|
||||
|
||||
self._oldsample_agg = """
|
||||
{"by_name":{"terms":{"field":"name","size":%(size)d},
|
||||
"aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d},
|
||||
"aggs":{"meters":{"top_hits":{"_source":{"exclude":
|
||||
["dimensions_hash"]},"size":1}}}}}}}
|
||||
"""
|
||||
|
||||
self.setup_index_template()
|
||||
|
||||
def setup_index_template(self):
|
||||
|
@ -127,9 +134,9 @@ class MeterDispatcher(object):
|
|||
|
||||
def post_data(self, req, res):
|
||||
msg = ""
|
||||
LOG.debug('@$Post Message is %s' % msg)
|
||||
LOG.debug('Getting the call.')
|
||||
msg = req.stream.read()
|
||||
LOG.debug('@$Post Message is %s' % msg)
|
||||
|
||||
code = self._kafka_conn.send_messages(msg)
|
||||
res.status = getattr(falcon, 'HTTP_' + str(code))
|
||||
|
@ -144,7 +151,7 @@ class MeterDispatcher(object):
|
|||
return None
|
||||
|
||||
@resource_api.Restify('/v2.0/meters', method='get')
|
||||
def get_meter(self, req, res):
|
||||
def get_meters(self, req, res):
|
||||
LOG.debug('The meters GET request is received')
|
||||
|
||||
# process query condition
|
||||
|
@ -203,6 +210,77 @@ class MeterDispatcher(object):
|
|||
else:
|
||||
res.body = ''
|
||||
|
||||
@resource_api.Restify('/v2.0/meters/', method='post')
|
||||
@resource_api.Restify('/v2.0/meters', method='post')
|
||||
def post_meters(self, req, res):
|
||||
self.post_data(req, res)
|
||||
|
||||
@resource_api.Restify('/v2.0/meters/{meter_name}', method='get')
|
||||
def get_meter_byname(self, req, res, meter_name):
|
||||
LOG.debug('The meter %s sample GET request is received' % meter_name)
|
||||
|
||||
# process query condition
|
||||
query = []
|
||||
metrics.ParamUtil.common(req, query)
|
||||
_meter_ag = self._oldsample_agg % {"size": self.size}
|
||||
if query:
|
||||
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
|
||||
'"size":' + str(self.size) + ','
|
||||
'"aggs":' + _meter_ag + '}')
|
||||
else:
|
||||
body = '{"aggs":' + _meter_ag + '}'
|
||||
|
||||
# modify the query url to filter out name
|
||||
query_url = []
|
||||
if meter_name:
|
||||
query_url = self._query_url + '&q=name:' + meter_name
|
||||
else:
|
||||
query_url = self._query_url
|
||||
LOG.debug('Request body:' + body)
|
||||
LOG.debug('Request url:' + query_url)
|
||||
es_res = requests.post(query_url, data=body)
|
||||
res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
|
||||
|
||||
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
|
||||
res_data = self._get_agg_response(es_res)
|
||||
LOG.debug('@$Result data is %s\n' % res_data)
|
||||
if res_data:
|
||||
# convert the response into ceilometer meter OldSample format
|
||||
aggs = res_data['by_name']['buckets']
|
||||
flag = {'is_first': True}
|
||||
|
||||
def _render_hits(item):
|
||||
_type = item['meters']['hits']['hits'][0]['_type']
|
||||
_source = item['meters']['hits']['hits'][0]['_source']
|
||||
rslt = ('{"counter_name":' + json.dumps(_source['name']) + ','
|
||||
'"counter_type":' + json.dumps(_type) + ','
|
||||
'"counter_unit":null,'
|
||||
'"counter_volume":' +
|
||||
json.dumps(_source['value']) + ','
|
||||
'"message_id":null,'
|
||||
'"project_id":' +
|
||||
json.dumps(_source['project_id']) + ','
|
||||
'"recorded_at":null,'
|
||||
'"resource_id":' +
|
||||
json.dumps(_source['tenant_id']) + ','
|
||||
'"resource_metadata":null,'
|
||||
'"source":' + json.dumps(_source['user_agent']) + ','
|
||||
'"timestamp":' + json.dumps(_source['timestamp']) + ','
|
||||
'"user_id":' + json.dumps(_source['user_id']) + '}')
|
||||
if flag['is_first']:
|
||||
flag['is_first'] = False
|
||||
return rslt
|
||||
else:
|
||||
return ',' + rslt
|
||||
|
||||
def _make_body(buckets):
|
||||
yield '['
|
||||
for by_name in buckets:
|
||||
if by_name['by_dim']:
|
||||
for by_dim in by_name['by_dim']['buckets']:
|
||||
yield _render_hits(by_dim)
|
||||
yield ']'
|
||||
|
||||
res.body = ''.join(_make_body(aggs))
|
||||
res.content_type = 'application/json;charset=utf-8'
|
||||
else:
|
||||
res.body = ''
|
||||
|
|
|
@ -231,6 +231,7 @@ class MetricDispatcher(object):
|
|||
def post_data(self, req, res):
|
||||
LOG.debug('Getting the call.')
|
||||
msg = req.stream.read()
|
||||
LOG.debug('@Post: %s' % msg)
|
||||
|
||||
code = self._kafka_conn.send_messages(msg)
|
||||
res.status = getattr(falcon, 'HTTP_' + str(code))
|
||||
|
|
|
@ -49,14 +49,14 @@ kiloeyes.dispatcher =
|
|||
versions = kiloeyes.v2.elasticsearch.versions:VersionDispatcher
|
||||
alarmdefinitions = kiloeyes.v2.elasticsearch.alarmdefinitions:AlarmDefinitionDispatcher
|
||||
notificationmethods = kiloeyes.v2.elasticsearch.notificationmethods:NotificationMethodDispatcher
|
||||
alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher
|
||||
alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher
|
||||
meters = kiloeyes.v2.elasticsearch.meters:MeterDispatcher
|
||||
|
||||
kiloeyes.index.strategy =
|
||||
timed = kiloeyes.microservice.timed_strategy:TimedStrategy
|
||||
fixed = kiloeyes.microservice.fixed_strategy:FixedStrategy
|
||||
|
||||
kiloeyes.message.processor =
|
||||
kiloeyes.message.processor =
|
||||
metrics_msg_fixer = kiloeyes.microservice.metrics_fixer:MetricsFixer
|
||||
notification_processor = kiloeyes.microservice.notification_processor:NotificationProcessor
|
||||
threshold_processor = kiloeyes.microservice.threshold_processor:ThresholdProcessor
|
||||
|
|
Loading…
Reference in New Issue