From b536e942866387fbfa9ddb4cb1d77cbbb607e861 Mon Sep 17 00:00:00 2001 From: Xiao Tan Date: Thu, 12 May 2016 21:17:51 +0000 Subject: [PATCH] Ceilometer Meter Query and Aggregation Implemented Implemented Meter Query Utility that supports operations such as eq, ne, gt, lt, gte, lte. The implementation also supports multiple filters. Implemented Meter Statistics Aggregation where client can specify which aggregation function to return. Change-Id: I5b19faa4a0d46befbe6e84c975af084cdc0d9d12 --- .../tests/v2/elasticsearch/test_meters.py | 3 +- kiloeyes/v2/elasticsearch/meters.py | 222 ++++++++++++++---- kiloeyes/v2/elasticsearch/samples.py | 20 +- 3 files changed, 185 insertions(+), 60 deletions(-) diff --git a/kiloeyes/tests/v2/elasticsearch/test_meters.py b/kiloeyes/tests/v2/elasticsearch/test_meters.py index 63c5914..4b2ad55 100755 --- a/kiloeyes/tests/v2/elasticsearch/test_meters.py +++ b/kiloeyes/tests/v2/elasticsearch/test_meters.py @@ -177,7 +177,8 @@ class TestMeterDispatcher(base.BaseTestCase): self.assertEqual(obj[0]['project_id'], '35b17138-b364-4e6a-a131-8f3099c5be68') self.assertEqual(obj[0]['counter_volume'], 4) - self.assertEqual(obj[0]['timestamp'], 1461337094000) + self.assertEqual(obj[0]['timestamp'], + tu.iso8601_from_timestamp(1461337094000)) self.assertEqual(len(obj), 1) def test_do_get_statistics(self): diff --git a/kiloeyes/v2/elasticsearch/meters.py b/kiloeyes/v2/elasticsearch/meters.py index 685dc3c..b7d11ff 100755 --- a/kiloeyes/v2/elasticsearch/meters.py +++ b/kiloeyes/v2/elasticsearch/meters.py @@ -12,7 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. 
+import calendar import datetime +import dateutil.parser import falcon from oslo_config import cfg from oslo_log import log @@ -24,7 +26,6 @@ from kiloeyes.common import kafka_conn from kiloeyes.common import namespace from kiloeyes.common import resource_api from kiloeyes.common import timeutils as tu -from kiloeyes.v2.elasticsearch import metrics try: import ujson as json @@ -56,6 +57,146 @@ LOG = log.getLogger(__name__) UPDATED = str(datetime.datetime(2014, 1, 1, 0, 0, 0)) +class ParamUtil(object): + + @staticmethod + def process_one_filter(field, op, value, must, must_not): + if (not field or not field.strip() or + not value or not value.strip()): + return + # default op is equal + # convert ceilometer op to elasticsearch op + if not op or not op.strip(): + op = 'eq' + elif op.strip() == 'le': + op = 'lte' + elif op.strip() == 'ge': + op = 'gte' + + # if field is timestamp, convert it to correct format + if field.strip() == 'timestamp': + value = dateutil.parser.parse(value.strip()) + value = value.timetuple() + value = calendar.timegm(value) * 1000 + elif field.strip() == 'value': + value = float(value.strip()) + + # construct query based on op + if op.strip() == 'eq': + must.append({'match': {field.strip(): value}}) + elif op.strip() == 'ne': + must_not.append({'match': {field.strip(): value}}) + # range search + else: + must.append({'range': {field.strip(): {op.strip(): value}}}) + return + + @staticmethod + def filtering(req, _agg, size, query): + must = [] + must_not = [] + # default body + body = '{"aggs":' + _agg + '}' + # process ceilometer API query + # query as json, can support multiple filters + if(req.content_type == 'application/json'): + if not query: + query = req.stream.read() + if not query: + return body + if 'q' in json.loads(query): + objs = json.loads(query)['q'] + else: + return body + for obj in objs: + field = obj['field'] + op = obj['op'] + value = obj['value'] + ParamUtil.process_one_filter(field, op, value, must, must_not) + # query 
as param, only support one filter + else: + field = req.get_param('q.field') + op = req.get_param('q.op') + value = req.get_param('q.value') + ParamUtil.process_one_filter(field, op, value, must, must_not) + + q = '' + if must: + q = q + ('"must":' + json.dumps(must)) + elif must_not: + q = q + ('"must_not":' + json.dumps(must_not)) + if q != '': + body = ('{"query":{"bool":{' + q + '}},' + '"size":' + str(size) + ',' + '"aggs":' + _agg + '}') + return body + + @staticmethod + def process_one_aggregate(func, _agg, count): + if not func or not func.strip(): + return _agg + ret_agg = '' + # use sep ',' to separate aggregations + sep = '' + if count != 0: + sep = ',' + if func.strip() == 'avg': + ret_agg = _agg + (sep + '"average":{"avg":{"field":"value"}}') + elif func.strip() == 'max': + ret_agg = _agg + (sep + '"maximum":{"max":{"field":"value"}}') + elif func.strip() == 'min': + ret_agg = _agg + (sep + '"minimum":{"min":{"field":"value"}}') + elif func.strip() == 'sum': + ret_agg = _agg + (sep + '"sum":{"sum":{"field":"value"}}') + elif func.strip() == 'count' or func.strip() == 'value_count': + ret_agg = _agg + (sep + + '"count":{"value_count":{"field":"value"}}') + elif func.strip() == 'stats': + ret_agg = _agg + (sep + '"statistics":{"stats":{"field":"value"}}') + return ret_agg + + @staticmethod + def aggregate(req): + # process meter statistics selectable aggregates + _stats_agg = '' + len_aggs = 1 + # aggregate as json, support multiple aggregations + if(req.content_type == 'application/json'): + query = req.stream.read() + if not query: + return ('"statistics":{"stats":{"field":"value"}}', query) + if 'aggregate' in json.loads(query): + objs = json.loads(query)['aggregate'] + else: + return ('"statistics":{"stats":{"field":"value"}}', query) + len_aggs = len(objs) + for i in xrange(0, len_aggs): + obj = objs[i] + func = obj['func'] + _stats_agg = ParamUtil.process_one_aggregate(func, + _stats_agg, i) + # aggregate as param, only support one aggregation + 
else: + query = None + func = req.get_param('aggregate.func') + _stats_agg = ParamUtil.process_one_aggregate(func, _stats_agg, 0) + + if _stats_agg == '': + _stats_agg = ('"statistics":{"stats":{"field":"value"}}') + # return query and pass it to ParamUtil.filtering to process query + # it needs to be returned because req.stream.read can only be read once + return (_stats_agg, query) + + @staticmethod + def period(req): + try: + if req.get_param('period'): + return str(int(req.get_param('period'))) + 's' + except Exception: + pass + return '300s' + + class MeterDispatcher(object): def __init__(self, global_conf): LOG.debug('initializing V2API!') @@ -106,7 +247,7 @@ class MeterDispatcher(object): {"by_name":{"terms":{"field":"name","size":%(size)d}, "aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d}, "aggs":{"meters":{"top_hits":{"_source":{"exclude": - ["dimensions_hash","timestamp","value"]},"size":1}}}}}}} + ["dimensions_hash"]},"size":1}}}}}}} """ self._oldsample_agg = """ @@ -122,8 +263,7 @@ class MeterDispatcher(object): "size":%(size)d},"aggs":{"dimension":{"top_hits":{"_source": {"exclude":["dimensions_hash","timestamp","value"]},"size":1}}, "periods":{"date_histogram":{"field":"timestamp", - "interval":"%(period)s"},"aggs":{"statistics":{"stats": - {"field":"value"}}}}}}}}} + "interval":"%(period)s"},"aggs":{%(agg)s}}}}}}} """ self.setup_index_template() @@ -166,15 +306,8 @@ class MeterDispatcher(object): LOG.debug('The meters GET request is received') # process query condition - query = [] - metrics.ParamUtil.common(req, query) _meters_ag = self._meters_agg % {"size": self.size} - if query: - body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' - '"size":' + str(self.size) + ',' - '"aggs":' + _meters_ag + '}') - else: - body = '{"aggs":' + _meters_ag + '}' + body = ParamUtil.filtering(req, _meters_ag, self.size, None) LOG.debug('Request body:' + body) LOG.debug('Request url:' + self._query_url) @@ -201,6 +334,8 @@ class 
MeterDispatcher(object): '"source":' + json.dumps(_source['user_agent']) + ',' '"type":' + json.dumps(_type) + ',' '"unit":null,' + '"value":' + json.dumps(_source['value']) + ',' + '"timestamp":' + json.dumps(_source['timestamp']) + ',' '"user_id":' + json.dumps(_source['user_id']) + '}') if flag['is_first']: flag['is_first'] = False @@ -230,15 +365,8 @@ class MeterDispatcher(object): LOG.debug('The meter %s sample GET request is received' % meter_name) # process query condition - query = [] - metrics.ParamUtil.common(req, query) _meter_ag = self._oldsample_agg % {"size": self.size} - if query: - body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' - '"size":' + str(self.size) + ',' - '"aggs":' + _meter_ag + '}') - else: - body = '{"aggs":' + _meter_ag + '}' + body = ParamUtil.filtering(req, _meter_ag, self.size, None) # modify the query url to filter out name query_url = [] @@ -253,7 +381,6 @@ class MeterDispatcher(object): LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code) res_data = self._get_agg_response(es_res) - LOG.debug('@$Result data is %s\n' % res_data) if res_data: # convert the response into ceilometer meter OldSample format aggs = res_data['by_name']['buckets'] @@ -275,7 +402,8 @@ class MeterDispatcher(object): json.dumps(_source['tenant_id']) + ',' '"resource_metadata":null,' '"source":' + json.dumps(_source['user_agent']) + ',' - '"timestamp":' + json.dumps(_source['timestamp']) + ',' + '"timestamp":"' + + tu.iso8601_from_timestamp(_source['timestamp']) + '",' '"user_id":' + json.dumps(_source['user_id']) + '}') if flag['is_first']: flag['is_first'] = False @@ -300,19 +428,12 @@ class MeterDispatcher(object): def get_meter_statistics(self, req, res, meter_name): LOG.debug('The meter %s statistics GET request is received' % meter_name) - # process query conditions - query = [] - metrics.ParamUtil.common(req, query) - period = metrics.ParamUtil.period(req) - - _stats_ag = (self._meter_stats_agg % - {"size": self.size, 
"period": period}) - if query: - body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' - '"size":' + str(self.size) + ',' - '"aggs":' + _stats_ag + '}') - else: - body = '{"aggs":' + _stats_ag + '}' + # process query condition + (_agg, query) = ParamUtil.aggregate(req) + period = ParamUtil.period(req) + _stats_ag = self._meter_stats_agg % {"size": self.size, + "period": period, "agg": _agg} + body = ParamUtil.filtering(req, _stats_ag, self.size, query) # modify the query url to filter out name query_url = [] @@ -323,14 +444,14 @@ class MeterDispatcher(object): es_res = requests.post(query_url, data=body) res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code) + LOG.debug('Request body:' + body) LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code) res_data = self._get_agg_response(es_res) + LOG.debug('@Result: %s', res_data) if res_data: # convert the response into Ceilometer Statistics format aggs = res_data['by_name']['buckets'] - LOG.debug('@$Stats: %s' % json.dumps(aggs)) - def _render_stats(dim): is_first = True oldest_time = [] @@ -347,11 +468,28 @@ class MeterDispatcher(object): period_diff = (current_time - previous_time) / 1000 duration_diff = (current_time - oldest_time) / 1000 # parses the statistics data - _max = str(item['statistics']['max']) - _min = str(item['statistics']['min']) - _sum = str(item['statistics']['sum']) - _avg = str(item['statistics']['avg']) - _count = str(item['statistics']['count']) + _max = 'null' + _min = 'null' + _sum = 'null' + _avg = 'null' + _count = 'null' + if 'statistics' in item: + _max = str(item['statistics']['max']) + _min = str(item['statistics']['min']) + _sum = str(item['statistics']['sum']) + _avg = str(item['statistics']['avg']) + _count = str(item['statistics']['count']) + else: + if 'average' in item: + _avg = str(item['average']['value']) + if 'maximum' in item: + _max = str(item['maximum']['value']) + if 'minimum' in item: + _min = str(item['minimum']['value']) + if 'count' in 
item: + _count = str(item['count']['value']) + if 'sum' in item: + _sum = str(item['sum']['value']) curr_timestamp = tu.iso8601_from_timestamp(current_time) prev_timestamp = tu.iso8601_from_timestamp(previous_time) old_timestamp = tu.iso8601_from_timestamp(oldest_time) diff --git a/kiloeyes/v2/elasticsearch/samples.py b/kiloeyes/v2/elasticsearch/samples.py index 7fa017e..d6213dc 100755 --- a/kiloeyes/v2/elasticsearch/samples.py +++ b/kiloeyes/v2/elasticsearch/samples.py @@ -23,7 +23,7 @@ from kiloeyes.common import es_conn from kiloeyes.common import kafka_conn from kiloeyes.common import namespace from kiloeyes.common import resource_api -from kiloeyes.v2.elasticsearch import metrics +from kiloeyes.v2.elasticsearch import meters try: import ujson as json @@ -173,15 +173,8 @@ class CeilometerSampleDispatcher(object): LOG.debug('The samples GET request is received') # process query condition - query = [] - metrics.ParamUtil.common(req, query) _samples_ag = self._sample_agg % {"size": self.size} - if query: - body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' - '"size":' + str(self.size) + ',' - '"aggs":' + _samples_ag + '}') - else: - body = '{"aggs":' + _samples_ag + '}' + body = meters.ParamUtil.filtering(req, _samples_ag, self.size, None) LOG.debug('Request body:' + body) LOG.debug('Request url:' + self._query_url) @@ -204,15 +197,8 @@ class CeilometerSampleDispatcher(object): LOG.debug('The sample %s GET request is received' % sample_id) # process query condition - query = [] - metrics.ParamUtil.common(req, query) _sample_ag = self._sample_agg % {"size": self.size} - if query: - body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},' - '"size":' + str(self.size) + ',' - '"aggs":' + _sample_ag + '}') - else: - body = '{"aggs":' + _sample_ag + '}' + body = meters.ParamUtil.filtering(req, _sample_ag, self.size, None) # modify the query url to filter out name query_url = []