diff --git a/kiloeyes/api/ceilometer_api_v2.py b/kiloeyes/api/ceilometer_api_v2.py index f7ee45f..00100aa 100755 --- a/kiloeyes/api/ceilometer_api_v2.py +++ b/kiloeyes/api/ceilometer_api_v2.py @@ -32,3 +32,7 @@ class V2API(object): @resource_api.Restify('/v2.0/meters', method='post') def post_meters(self, req, res): res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/meters/{meter_name}', method='get') + def get_meter_byname(self, req, res, meter_name): + res.status = '501 Not Implemented' diff --git a/kiloeyes/tests/v2/elasticsearch/test_meters.py b/kiloeyes/tests/v2/elasticsearch/test_meters.py index e1619c3..4f740f6 100755 --- a/kiloeyes/tests/v2/elasticsearch/test_meters.py +++ b/kiloeyes/tests/v2/elasticsearch/test_meters.py @@ -58,6 +58,22 @@ class TestMeterDispatcher(base.BaseTestCase): with mock.patch.object(requests, 'put', return_value=put_res): self.dispatcher = meters.MeterDispatcher({}) + self.response_str = """ + {"aggregations":{"by_name":{"doc_count_error_upper_bound":0, + "sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300, + "by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f", + "doc_count":300,"meters":{"hits":{"hits":[{ "_type": "metrics", + "_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000", + "_source":{"name":"BABMGD", "value": 4, + "timestamp": 1461337094000, + "dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a", + "project_id": "35b17138-b364-4e6a-a131-8f3099c5be68", + "tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36", + "user_agent": "openstack", "dimensions": null, + "user": "admin", "value_meta": null, "tenant": "admin", + "user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}} + """ + def test_initialization(self): # test that the kafka connection uri should be 'fake' as it was passed # in from configuration @@ -104,27 +120,12 @@ class TestMeterDispatcher(base.BaseTestCase): req.get_param.side_effect = _side_effect req_result = mock.Mock() - response_str = """ - {"aggregations":{"by_name":{"doc_count_error_upper_bound":0, - "sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300, - "by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f", - "doc_count":300,"meters":{"hits":{"hits":[{ "_type": "metrics", - "_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000", - "_source":{"name":"BABMGD", "value": 4, - "timestamp": 1461337094000, - "dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a", - "project_id": "35b17138-b364-4e6a-a131-8f3099c5be68", - "tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36", - "user_agent": "openstack", "dimensions": null, - "user": "admin", "value_meta": null, "tenant": "admin", - "user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}} - """ - req_result.json.return_value = json.loads(response_str) + req_result.json.return_value = json.loads(self.response_str) req_result.status_code = 200 with mock.patch.object(requests, 'post', return_value=req_result): - self.dispatcher.get_meter(req, res) + self.dispatcher.get_meters(req, res) # test that the response code is 200 self.assertEqual(res.status, getattr(falcon, 'HTTP_200')) @@ -145,3 +146,35 @@ class TestMeterDispatcher(base.BaseTestCase): self.dispatcher.post_meters(mock.Mock(), res) self.assertEqual(getattr(falcon, 'HTTP_204'), res.status) + + def test_get_meter_byname(self): + res = mock.Mock() + req = mock.Mock() + + def _side_effect(arg): + if arg == 'name': + return 'tongli' + elif arg == 'dimensions': + return 'key1:100, key2:200' + req.get_param.side_effect = _side_effect + + req_result = mock.Mock() + + req_result.json.return_value = json.loads(self.response_str) + req_result.status_code = 200 + + with mock.patch.object(requests, 'post', return_value=req_result): + self.dispatcher.get_meter_byname(req, res, "BABMGD") + + # test that the response code is 200 + self.assertEqual(res.status, getattr(falcon, 'HTTP_200')) + obj = json.loads(res.body) + self.assertEqual(obj[0]['counter_name'], 'BABMGD') + self.assertEqual(obj[0]['counter_type'], 'metrics') + self.assertEqual(obj[0]['user_id'], + 'efd87807-12d2-4b38-9c70-5f5c2ac427ff') + self.assertEqual(obj[0]['project_id'], + '35b17138-b364-4e6a-a131-8f3099c5be68') + self.assertEqual(obj[0]['counter_volume'], 4) + self.assertEqual(obj[0]['timestamp'], 1461337094000) + self.assertEqual(len(obj), 1) diff --git a/kiloeyes/v2/elasticsearch/meters.py b/kiloeyes/v2/elasticsearch/meters.py index 3c8ba26..e2e3085 100755 --- a/kiloeyes/v2/elasticsearch/meters.py +++ b/kiloeyes/v2/elasticsearch/meters.py @@ -108,6 +108,13 @@ class MeterDispatcher(object): ["dimensions_hash","timestamp","value"]},"size":1}}}}}}} """ + self._oldsample_agg = """ + {"by_name":{"terms":{"field":"name","size":%(size)d}, + "aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d}, + "aggs":{"meters":{"top_hits":{"_source":{"exclude": + ["dimensions_hash"]},"size":1}}}}}}} + """ + self.setup_index_template() def setup_index_template(self): @@ -127,9 +134,9 @@ class MeterDispatcher(object): def post_data(self, req, res): msg = "" - LOG.debug('@$Post Message is %s' % msg) LOG.debug('Getting the call.') msg = req.stream.read() + LOG.debug('@$Post Message is %s' % msg) code = self._kafka_conn.send_messages(msg) res.status = getattr(falcon, 'HTTP_' + str(code)) @@ -144,7 +151,7 @@ class MeterDispatcher(object): return None @resource_api.Restify('/v2.0/meters', method='get') - def get_meter(self, req, res): + def get_meters(self, req, res): LOG.debug('The meters GET request is received') # process query condition @@ -203,6 +210,77 @@ class MeterDispatcher(object): else: res.body = '' - @resource_api.Restify('/v2.0/meters/', method='post') + @resource_api.Restify('/v2.0/meters', method='post') def post_meters(self, req, res): self.post_data(req, res) + + @resource_api.Restify('/v2.0/meters/{meter_name}', method='get') + def get_meter_byname(self, req, res, meter_name): + LOG.debug('The meter %s sample GET request is received' % meter_name) + + # process query condition + query = [] + metrics.ParamUtil.common(req, query) + _meter_ag = self._oldsample_agg % {"size": self.size} + if query: + body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' + '"size":' + str(self.size) + ',' + '"aggs":' + _meter_ag + '}') + else: + body = '{"aggs":' + _meter_ag + '}' + + # modify the query url to filter out name + query_url = [] + if meter_name: + query_url = self._query_url + '&q=name:' + meter_name + else: + query_url = self._query_url + LOG.debug('Request body:' + body) + LOG.debug('Request url:' + query_url) + es_res = requests.post(query_url, data=body) + res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code) + + LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code) + res_data = self._get_agg_response(es_res) + LOG.debug('@$Result data is %s\n' % res_data) + if res_data: + # convert the response into ceilometer meter OldSample format + aggs = res_data['by_name']['buckets'] + flag = {'is_first': True} + + def _render_hits(item): + _type = item['meters']['hits']['hits'][0]['_type'] + _source = item['meters']['hits']['hits'][0]['_source'] + rslt = ('{"counter_name":' + json.dumps(_source['name']) + ',' + '"counter_type":' + json.dumps(_type) + ',' + '"counter_unit":null,' + '"counter_volume":' + + json.dumps(_source['value']) + ',' + '"message_id":null,' + '"project_id":' + + json.dumps(_source['project_id']) + ',' + '"recorded_at":null,' + '"resource_id":' + + json.dumps(_source['tenant_id']) + ',' + '"resource_metadata":null,' + '"source":' + json.dumps(_source['user_agent']) + ',' + '"timestamp":' + json.dumps(_source['timestamp']) + ',' + '"user_id":' + json.dumps(_source['user_id']) + '}') + if flag['is_first']: + flag['is_first'] = False + return rslt + else: + return ',' + rslt + + def _make_body(buckets): + yield '[' + for by_name in buckets: + if by_name['by_dim']: + for by_dim in by_name['by_dim']['buckets']: + yield _render_hits(by_dim) + yield ']' + + res.body = ''.join(_make_body(aggs)) + res.content_type = 'application/json;charset=utf-8' + else: + res.body = '' diff --git a/kiloeyes/v2/elasticsearch/metrics.py b/kiloeyes/v2/elasticsearch/metrics.py index c50e42b..2992ef3 100755 --- a/kiloeyes/v2/elasticsearch/metrics.py +++ b/kiloeyes/v2/elasticsearch/metrics.py @@ -231,6 +231,7 @@ class MetricDispatcher(object): def post_data(self, req, res): LOG.debug('Getting the call.') msg = req.stream.read() + LOG.debug('@Post: %s' % msg) code = self._kafka_conn.send_messages(msg) res.status = getattr(falcon, 'HTTP_' + str(code)) diff --git a/setup.cfg b/setup.cfg index 47a88cb..db8f6e1 100755 --- a/setup.cfg +++ b/setup.cfg @@ -49,14 +49,14 @@ kiloeyes.dispatcher = versions = kiloeyes.v2.elasticsearch.versions:VersionDispatcher alarmdefinitions = kiloeyes.v2.elasticsearch.alarmdefinitions:AlarmDefinitionDispatcher notificationmethods = kiloeyes.v2.elasticsearch.notificationmethods:NotificationMethodDispatcher - alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher + alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher meters = kiloeyes.v2.elasticsearch.meters:MeterDispatcher kiloeyes.index.strategy = timed = kiloeyes.microservice.timed_strategy:TimedStrategy fixed = kiloeyes.microservice.fixed_strategy:FixedStrategy -kiloeyes.message.processor = +kiloeyes.message.processor = metrics_msg_fixer = kiloeyes.microservice.metrics_fixer:MetricsFixer notification_processor = kiloeyes.microservice.notification_processor:NotificationProcessor threshold_processor = kiloeyes.microservice.threshold_processor:ThresholdProcessor