diff --git a/AUTHORS b/AUTHORS index 7522628..acd9ddb 100755 --- a/AUTHORS +++ b/AUTHORS @@ -4,3 +4,4 @@ Jiaming Lin Tong Li Xiao Tan spzala +xiaotan2 diff --git a/ChangeLog b/ChangeLog index acfbdb0..63afe2a 100755 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,8 @@ CHANGES ======= +* Sample Get and Get-by-id implemented +* Meter Get Statistics by name implemented * Meter Get_Meter_Byname implemented * Meters GET request implemented * Added more instructions on how to configure keystone middleware diff --git a/etc/kiloeyes.conf b/etc/kiloeyes.conf index fddc175..453389f 100755 --- a/etc/kiloeyes.conf +++ b/etc/kiloeyes.conf @@ -12,6 +12,7 @@ dispatcher = alarmdefinitions dispatcher = notificationmethods dispatcher = alarms dispatcher = meters +dispatcher = samples [metrics] topic = metrics diff --git a/kiloeyes/api/ceilometer_api_v2.py b/kiloeyes/api/ceilometer_api_v2.py index 6bf9952..292ec4a 100755 --- a/kiloeyes/api/ceilometer_api_v2.py +++ b/kiloeyes/api/ceilometer_api_v2.py @@ -40,3 +40,11 @@ class V2API(object): @resource_api.Restify('/v2.0/meters/{meter_name}/statistics', method='get') def get_meter_statistics(self, req, res, meter_name): res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/samples', method='get') + def get_samples(self, req, res): + res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/samples/{sample_id}', method='get') + def get_sample_byid(self, req, res, sample_id): + res.status = '501 Not Implemented' diff --git a/kiloeyes/tests/v2/elasticsearch/test_samples.py b/kiloeyes/tests/v2/elasticsearch/test_samples.py new file mode 100755 index 0000000..5d4b662 --- /dev/null +++ b/kiloeyes/tests/v2/elasticsearch/test_samples.py @@ -0,0 +1,157 @@ +# Copyright 2013 IBM Corp +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import falcon +import mock +from oslo_config import fixture as fixture_config +from oslotest import base +import requests + +from kiloeyes.v2.elasticsearch import samples + +try: + import ujson as json +except ImportError: + import json + + +class TestCeilometerSampleDispatcher(base.BaseTestCase): + + def setUp(self): + super(TestCeilometerSampleDispatcher, self).setUp() + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF.set_override('uri', 'fake_url', group='kafka_opts') + self.CONF.set_override('topic', 'fake', group='samples') + self.CONF.set_override('doc_type', 'fake', group='samples') + self.CONF.set_override('index_prefix', 'also_fake', group='samples') + self.CONF.set_override('index_template', 'etc/metrics.template', + group='samples') + self.CONF.set_override('uri', 'http://fake_es_uri', group='es_conn') + + res = mock.Mock() + res.status_code = 200 + res.json.return_value = {"data": {"mappings": {"fake": { + "properties": { + "dimensions": {"properties": { + "key1": {"type": "long"}, "key2": {"type": "long"}, + "rkey0": {"type": "long"}, "rkey1": {"type": "long"}, + "rkey2": {"type": "long"}, "rkey3": {"type": "long"}}}, + "name": {"type": "string", "index": "not_analyzed"}, + "timestamp": {"type": "string", "index": "not_analyzed"}, + "value": {"type": "double"}}}}}} + put_res = mock.Mock() + put_res.status_code = '200' + with mock.patch.object(requests, 'get', + return_value=res): + with mock.patch.object(requests, 'put', return_value=put_res): + self.dispatcher = samples.CeilometerSampleDispatcher({}) + + self.response_str = """ + {"aggregations":{"by_name":{"doc_count_error_upper_bound":0, + "sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300, + "by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f", + "doc_count":300,"samples":{"hits":{"hits":[{ "_type": "metrics", + "_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000", + "_source":{"name":"BABMGD", "value": 4, + "timestamp": 1461337094000, + "dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a", + "project_id": "35b17138-b364-4e6a-a131-8f3099c5be68", + "tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36", + "user_agent": "openstack", "dimensions": null, + "user": "admin", "value_meta": null, "tenant": "admin", + "user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}} + """ + + def test_initialization(self): + # test that the kafka connection uri should be 'fake' as it was passed + # in from configuration + self.assertEqual(self.dispatcher._kafka_conn.uri, 'fake_url') + + # test that the topic is samples as it was passed into dispatcher + self.assertEqual(self.dispatcher._kafka_conn.topic, 'fake') + + # test that the doc type of the es connection is fake + self.assertEqual(self.dispatcher._es_conn.doc_type, 'fake') + + self.assertEqual(self.dispatcher._es_conn.uri, 'http://fake_es_uri/') + + # test that the query url is correctly formed + self.assertEqual(self.dispatcher._query_url, ( + 'http://fake_es_uri/also_fake*/fake/_search?search_type=count')) + + def test_get_samples(self): + res = mock.Mock() + req = mock.Mock() + + def _side_effect(arg): + if arg == 'name': + return 'tongli' + elif arg == 'dimensions': + return 'key1:100, key2:200' + req.get_param.side_effect = _side_effect + + req_result = mock.Mock() + + req_result.json.return_value = json.loads(self.response_str) + req_result.status_code = 200 + + with mock.patch.object(requests, 'post', return_value=req_result): + self.dispatcher.get_samples(req, res) + + # test that the response code is 200 + self.assertEqual(res.status, getattr(falcon, 'HTTP_200')) + obj = json.loads(res.body) + self.assertEqual(obj[0]['meter'], 'BABMGD') + self.assertEqual(obj[0]['id'], 'AVOziWmP6-pxt0dRmr7j') + self.assertEqual(obj[0]['type'], 'metrics') + self.assertEqual(obj[0]['user_id'], + 'efd87807-12d2-4b38-9c70-5f5c2ac427ff') + self.assertEqual(obj[0]['project_id'], + '35b17138-b364-4e6a-a131-8f3099c5be68') + self.assertEqual(obj[0]['timestamp'], 1461337094000) + self.assertEqual(obj[0]['volume'], 4) + self.assertEqual(len(obj), 1) + + def test_get_sample_byid(self): + res = mock.Mock() + req = mock.Mock() + + def _side_effect(arg): + if arg == 'name': + return 'tongli' + elif arg == 'dimensions': + return 'key1:100, key2:200' + req.get_param.side_effect = _side_effect + + req_result = mock.Mock() + + req_result.json.return_value = json.loads(self.response_str) + req_result.status_code = 200 + + with mock.patch.object(requests, 'post', return_value=req_result): + self.dispatcher.get_sample_byid(req, res, "AVOziWmP6-pxt0dRmr7j") + + # test that the response code is 200 + self.assertEqual(res.status, getattr(falcon, 'HTTP_200')) + obj = json.loads(res.body) + self.assertEqual(obj[0]['meter'], 'BABMGD') + self.assertEqual(obj[0]['id'], 'AVOziWmP6-pxt0dRmr7j') + self.assertEqual(obj[0]['type'], 'metrics') + self.assertEqual(obj[0]['user_id'], + 'efd87807-12d2-4b38-9c70-5f5c2ac427ff') + self.assertEqual(obj[0]['project_id'], + '35b17138-b364-4e6a-a131-8f3099c5be68') + self.assertEqual(obj[0]['timestamp'], 1461337094000) + self.assertEqual(obj[0]['volume'], 4) + self.assertEqual(len(obj), 1) diff --git a/kiloeyes/v2/elasticsearch/samples.py b/kiloeyes/v2/elasticsearch/samples.py new file mode 100755 index 0000000..7fa017e --- /dev/null +++ b/kiloeyes/v2/elasticsearch/samples.py @@ -0,0 +1,238 @@ +# Copyright 2013 IBM Corp +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import falcon +from oslo_config import cfg +from oslo_log import log +import requests +from stevedore import driver + +from kiloeyes.common import es_conn +from kiloeyes.common import kafka_conn +from kiloeyes.common import namespace +from kiloeyes.common import resource_api +from kiloeyes.v2.elasticsearch import metrics + +try: + import ujson as json +except ImportError: + import json + +SAMPLES_OPTS = [ + cfg.StrOpt('topic', default='metrics', + help='The topic that samples will be published to.'), + cfg.StrOpt('doc_type', default='metrics', + help='The doc type that samples will be saved into.'), + cfg.StrOpt('index_strategy', default='fixed', + help='The index strategy used to create index name.'), + cfg.StrOpt('index_prefix', default='data_', + help='The index prefix where samples were saved to.'), + cfg.StrOpt('index_template', default='/etc/kiloeyes/metrics.template', + help='The index template which samples index should use.'), + cfg.IntOpt('size', default=10000, + help=('The query result limit. Any result set more than ' + 'the limit will be discarded. To see all the matching ' + 'result, narrow your search by using a small time ' + 'window or strong matching name')), +] + +cfg.CONF.register_opts(SAMPLES_OPTS, group="samples") + +LOG = log.getLogger(__name__) + +UPDATED = str(datetime.datetime(2014, 1, 1, 0, 0, 0)) + + +class CeilometerSampleDispatcher(object): + def __init__(self, global_conf): + LOG.debug('initializing V2API!') + super(CeilometerSampleDispatcher, self).__init__() + self.topic = cfg.CONF.samples.topic + self.doc_type = cfg.CONF.samples.doc_type + self.index_template = cfg.CONF.samples.index_template + self.size = cfg.CONF.samples.size + self._kafka_conn = kafka_conn.KafkaConnection(self.topic) + + # load index strategy + if cfg.CONF.samples.index_strategy: + self.index_strategy = driver.DriverManager( + namespace.STRATEGY_NS, + cfg.CONF.samples.index_strategy, + invoke_on_load=True, + invoke_kwds={}).driver + LOG.debug(dir(self.index_strategy)) + else: + self.index_strategy = None + + self.index_prefix = cfg.CONF.samples.index_prefix + + self._es_conn = es_conn.ESConnection( + self.doc_type, self.index_strategy, self.index_prefix) + + # Setup the get samples query body pattern + self._query_body = { + "query": {"bool": {"must": []}}, + "size": self.size} + + self._aggs_body = {} + self._stats_body = {} + self._sort_clause = [] + + # Setup the get samples query url, the url should be similar to this: + # http://host:port/data_20141201/metrics/_search + # the url should be made of es_conn uri, the index prefix, samples + # dispatcher topic, then add the key word _search. + self._query_url = ''.join([self._es_conn.uri, + self._es_conn.index_prefix, '*/', + cfg.CONF.samples.topic, + '/_search?search_type=count']) + + # Setup sample query aggregation command. To see the structure of + # the aggregation, copy and paste it to a json formatter. + self._sample_agg = """ + {"by_name":{"terms":{"field":"name","size":%(size)d}, + "aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d}, + "aggs":{"samples":{"top_hits":{"_source":{"exclude": + ["dimensions_hash"]},"size":1}}}}}}} + """ + + self.setup_index_template() + + def setup_index_template(self): + status = '400' + with open(self.index_template) as template_file: + template_path = ''.join([self._es_conn.uri, + '/_template/metrics']) + es_res = requests.put(template_path, data=template_file.read()) + status = getattr(falcon, 'HTTP_%s' % es_res.status_code) + + if status == '400': + LOG.error('Metrics template can not be created. Status code %s' + % status) + exit(1) + else: + LOG.debug('Index template set successfully! Status %s' % status) + + def _get_agg_response(self, res): + if res and res.status_code == 200: + obj = res.json() + if obj: + return obj.get('aggregations') + return None + else: + return None + + def _render_hits(self, item, flag): + _id = item['samples']['hits']['hits'][0]['_id'] + _type = item['samples']['hits']['hits'][0]['_type'] + _source = item['samples']['hits']['hits'][0]['_source'] + rslt = ('{"id":' + json.dumps(_id) + ',' + '"metadata":' + json.dumps(_source['dimensions']) + ',' + '"meter":' + json.dumps(_source['name']) + ',' + '"project_id":' + + json.dumps(_source['project_id']) + ',' + '"recorded_at":' + + json.dumps(_source['timestamp']) + ',' + '"resource_id":' + + json.dumps(_source['tenant_id']) + ',' + '"source":' + json.dumps(_source['user_agent']) + ',' + '"timestamp":' + json.dumps(_source['timestamp']) + ',' + '"type":' + json.dumps(_type) + ',' + '"unit":null,' + '"user_id":' + json.dumps(_source['user_id']) + ',' + '"volume":' + json.dumps(_source['value']) + '}') + if flag['is_first']: + flag['is_first'] = False + return rslt + else: + return ',' + rslt + + def _make_body(self, buckets): + flag = {'is_first': True} + yield '[' + for by_name in buckets: + if by_name['by_dim']: + for by_dim in by_name['by_dim']['buckets']: + yield self._render_hits(by_dim, flag) + yield ']' + + @resource_api.Restify('/v2.0/samples', method='get') + def get_samples(self, req, res): + LOG.debug('The samples GET request is received') + + # process query condition + query = [] + metrics.ParamUtil.common(req, query) + _samples_ag = self._sample_agg % {"size": self.size} + if query: + body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' + '"size":' + str(self.size) + ',' + '"aggs":' + _samples_ag + '}') + else: + body = '{"aggs":' + _samples_ag + '}' + + LOG.debug('Request body:' + body) + LOG.debug('Request url:' + self._query_url) + es_res = requests.post(self._query_url, data=body) + res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code) + + LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code) + res_data = self._get_agg_response(es_res) + if res_data: + # convert the response into ceilometer sample format + aggs = res_data['by_name']['buckets'] + + res.body = ''.join(self._make_body(aggs)) + res.content_type = 'application/json;charset=utf-8' + else: + res.body = '' + + @resource_api.Restify('/v2.0/samples/{sample_id}', method='get') + def get_sample_byid(self, req, res, sample_id): + LOG.debug('The sample %s GET request is received' % sample_id) + + # process query condition + query = [] + metrics.ParamUtil.common(req, query) + _sample_ag = self._sample_agg % {"size": self.size} + if query: + body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' + '"size":' + str(self.size) + ',' + '"aggs":' + _sample_ag + '}') + else: + body = '{"aggs":' + _sample_ag + '}' + + # modify the query url to filter out name + query_url = [] + if sample_id: + query_url = self._query_url + '&q=_id:' + sample_id + else: + query_url = self._query_url + LOG.debug('Request body:' + body) + LOG.debug('Request url:' + query_url) + es_res = requests.post(query_url, data=body) + res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code) + + LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code) + res_data = self._get_agg_response(es_res) + LOG.debug('@$Result data is %s\n' % res_data) + if res_data: + # convert the response into ceilometer sample format + aggs = res_data['by_name']['buckets'] + + res.body = ''.join(self._make_body(aggs)) + res.content_type = 'application/json;charset=utf-8' + else: + res.body = '' diff --git a/setup.cfg b/setup.cfg index 0d90dfc..a757ab2 100755 --- a/setup.cfg +++ b/setup.cfg @@ -51,6 +51,7 @@ kiloeyes.dispatcher = notificationmethods = kiloeyes.v2.elasticsearch.notificationmethods:NotificationMethodDispatcher alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher meters = kiloeyes.v2.elasticsearch.meters:MeterDispatcher + samples = kiloeyes.v2.elasticsearch.samples:CeilometerSampleDispatcher kiloeyes.index.strategy = timed = kiloeyes.microservice.timed_strategy:TimedStrategy