Ceilometer Meter Query and Aggregation Implemented

Implemented a Meter Query Utility that supports the operations
eq, ne, gt, lt, gte and lte (with le/ge accepted as aliases). The
implementation also supports combining multiple filters in one query.

Implemented Meter Statistics Aggregation, where the client can specify
which aggregation functions should be computed and returned.

Change-Id: I5b19faa4a0d46befbe6e84c975af084cdc0d9d12
This commit is contained in:
Xiao Tan 2016-05-12 21:17:51 +00:00 committed by litong01
parent 18ad81cb4b
commit b536e94286
3 changed files with 185 additions and 60 deletions

View File

@ -177,7 +177,8 @@ class TestMeterDispatcher(base.BaseTestCase):
self.assertEqual(obj[0]['project_id'],
'35b17138-b364-4e6a-a131-8f3099c5be68')
self.assertEqual(obj[0]['counter_volume'], 4)
self.assertEqual(obj[0]['timestamp'], 1461337094000)
self.assertEqual(obj[0]['timestamp'],
tu.iso8601_from_timestamp(1461337094000))
self.assertEqual(len(obj), 1)
def test_do_get_statistics(self):

View File

@ -12,7 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import calendar
import datetime
import dateutil.parser
import falcon
from oslo_config import cfg
from oslo_log import log
@ -24,7 +26,6 @@ from kiloeyes.common import kafka_conn
from kiloeyes.common import namespace
from kiloeyes.common import resource_api
from kiloeyes.common import timeutils as tu
from kiloeyes.v2.elasticsearch import metrics
try:
import ujson as json
@ -56,6 +57,146 @@ LOG = log.getLogger(__name__)
UPDATED = str(datetime.datetime(2014, 1, 1, 0, 0, 0))
class ParamUtil(object):
    """Helpers that translate Ceilometer-style API query parameters into
    Elasticsearch request bodies.

    All methods are static; the class only namespaces them.
    """

    # Ceilometer spells range operators le/ge; Elasticsearch wants lte/gte.
    _OP_ALIASES = {'le': 'lte', 'ge': 'gte'}

    # Aggregation body fragments keyed by the Ceilometer aggregate.func name.
    _AGG_FRAGMENTS = {
        'avg': '"average":{"avg":{"field":"value"}}',
        'max': '"maximum":{"max":{"field":"value"}}',
        'min': '"minimum":{"min":{"field":"value"}}',
        'sum': '"sum":{"sum":{"field":"value"}}',
        'count': '"count":{"value_count":{"field":"value"}}',
        'value_count': '"count":{"value_count":{"field":"value"}}',
        'stats': '"statistics":{"stats":{"field":"value"}}',
    }

    # Aggregation used when the client does not request any.
    _DEFAULT_STATS = '"statistics":{"stats":{"field":"value"}}'

    @staticmethod
    def process_one_filter(field, op, value, must, must_not):
        """Convert one Ceilometer filter into an Elasticsearch clause.

        The clause is appended in place to ``must`` (eq and range ops) or
        ``must_not`` (ne).  A blank field or value makes the filter a no-op.

        :param field: field name, e.g. 'timestamp', 'value', 'project_id'
        :param op: one of eq/ne/lt/le/lte/gt/ge/gte; None or blank means eq
        :param value: filter value as a string
        :param must: list collecting positive clauses (mutated in place)
        :param must_not: list collecting negated clauses (mutated in place)
        """
        if (not field or not field.strip() or
                not value or not value.strip()):
            return
        field = field.strip()
        value = value.strip()
        # Default op is equality; map ceilometer's le/ge onto lte/gte.
        op = (op or 'eq').strip() or 'eq'
        op = ParamUtil._OP_ALIASES.get(op, op)
        # Timestamps arrive as ISO-8601 strings but are stored as epoch
        # milliseconds; sample values are stored as floats.
        if field == 'timestamp':
            parsed = dateutil.parser.parse(value)
            value = calendar.timegm(parsed.timetuple()) * 1000
        elif field == 'value':
            value = float(value)
        if op == 'eq':
            must.append({'match': {field: value}})
        elif op == 'ne':
            must_not.append({'match': {field: value}})
        else:
            # Remaining ops (lt/lte/gt/gte) become a range query.
            must.append({'range': {field: {op: value}}})

    @staticmethod
    def filtering(req, _agg, size, query):
        """Build an Elasticsearch request body from the request's filters.

        JSON requests may carry multiple filters under the 'q' key, either
        in ``query`` or read from ``req.stream`` when ``query`` is None;
        param-style requests support a single q.field/q.op/q.value triple.

        :param req: falcon request object
        :param _agg: aggregation fragment to embed under "aggs"
        :param size: maximum number of hits to request
        :param query: raw JSON body already read from the stream, or None
        :returns: Elasticsearch request body as a string
        """
        must = []
        must_not = []
        default_body = '{"aggs":' + _agg + '}'
        if req.content_type == 'application/json':
            # The stream can only be read once, so a caller that already
            # consumed it (see aggregate()) hands the body in via ``query``.
            if not query:
                query = req.stream.read()
            if not query:
                return default_body
            # Parse once and reuse; the earlier code parsed the body twice.
            filters = json.loads(query).get('q')
            if not filters:
                return default_body
            for f in filters:
                ParamUtil.process_one_filter(f['field'], f['op'],
                                             f['value'], must, must_not)
        else:
            ParamUtil.process_one_filter(req.get_param('q.field'),
                                         req.get_param('q.op'),
                                         req.get_param('q.value'),
                                         must, must_not)
        clauses = []
        if must:
            clauses.append('"must":' + json.dumps(must))
        if must_not:
            # The previous 'elif' dropped every must_not clause whenever a
            # must clause was also present; both lists are now emitted.
            clauses.append('"must_not":' + json.dumps(must_not))
        if not clauses:
            return default_body
        return ('{"query":{"bool":{' + ','.join(clauses) + '}},'
                '"size":' + str(size) + ','
                '"aggs":' + _agg + '}')

    @staticmethod
    def process_one_aggregate(func, _agg, count):
        """Append one named aggregation fragment to ``_agg``.

        :param func: ceilometer aggregate function name (avg, max, min,
            sum, count, value_count or stats); blank or unknown names
            leave ``_agg`` unchanged
        :param _agg: fragments accumulated so far (comma separated)
        :param count: retained for interface compatibility; the comma
            separator is now derived from ``_agg`` itself
        :returns: the accumulated aggregation fragment string
        """
        if not func or not func.strip():
            return _agg
        fragment = ParamUtil._AGG_FRAGMENTS.get(func.strip())
        if fragment is None:
            # An unknown function used to wipe everything accumulated so
            # far and could leave a leading comma; it is now skipped.
            return _agg
        sep = ',' if _agg else ''
        return _agg + sep + fragment

    @staticmethod
    def aggregate(req):
        """Extract the requested statistics aggregations from the request.

        JSON requests may list several functions under 'aggregate';
        param-style requests support a single aggregate.func value.

        :param req: falcon request object
        :returns: tuple (agg_fragment, raw_body) where raw_body is the
            JSON body consumed from the stream (returned so the caller
            can pass it on to filtering(); req.stream can only be read
            once) or None when the request was not JSON
        """
        _stats_agg = ''
        query = None
        if req.content_type == 'application/json':
            query = req.stream.read()
            if query:
                aggregates = json.loads(query).get('aggregate')
                if aggregates:
                    for i, obj in enumerate(aggregates):
                        _stats_agg = ParamUtil.process_one_aggregate(
                            obj['func'], _stats_agg, i)
        else:
            _stats_agg = ParamUtil.process_one_aggregate(
                req.get_param('aggregate.func'), _stats_agg, 0)
        # Fall back to the full stats aggregation when nothing was asked for.
        if not _stats_agg:
            _stats_agg = ParamUtil._DEFAULT_STATS
        return (_stats_agg, query)

    @staticmethod
    def period(req):
        """Return the statistics bucket interval, e.g. '60s'.

        Defaults to '300s' when the 'period' param is missing or is not
        parseable as an integer.
        """
        try:
            p = req.get_param('period')
            if p:
                return str(int(p)) + 's'
        except Exception:
            # Non-numeric input deliberately falls back to the default.
            pass
        return '300s'
class MeterDispatcher(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
@ -106,7 +247,7 @@ class MeterDispatcher(object):
{"by_name":{"terms":{"field":"name","size":%(size)d},
"aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d},
"aggs":{"meters":{"top_hits":{"_source":{"exclude":
["dimensions_hash","timestamp","value"]},"size":1}}}}}}}
["dimensions_hash"]},"size":1}}}}}}}
"""
self._oldsample_agg = """
@ -122,8 +263,7 @@ class MeterDispatcher(object):
"size":%(size)d},"aggs":{"dimension":{"top_hits":{"_source":
{"exclude":["dimensions_hash","timestamp","value"]},"size":1}},
"periods":{"date_histogram":{"field":"timestamp",
"interval":"%(period)s"},"aggs":{"statistics":{"stats":
{"field":"value"}}}}}}}}}
"interval":"%(period)s"},"aggs":{%(agg)s}}}}}}}
"""
self.setup_index_template()
@ -166,15 +306,8 @@ class MeterDispatcher(object):
LOG.debug('The meters GET request is received')
# process query condition
query = []
metrics.ParamUtil.common(req, query)
_meters_ag = self._meters_agg % {"size": self.size}
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _meters_ag + '}')
else:
body = '{"aggs":' + _meters_ag + '}'
body = ParamUtil.filtering(req, _meters_ag, self.size, None)
LOG.debug('Request body:' + body)
LOG.debug('Request url:' + self._query_url)
@ -201,6 +334,8 @@ class MeterDispatcher(object):
'"source":' + json.dumps(_source['user_agent']) + ','
'"type":' + json.dumps(_type) + ','
'"unit":null,'
'"value":' + json.dumps(_source['value']) + ','
'"timestamp":' + json.dumps(_source['timestamp']) + ','
'"user_id":' + json.dumps(_source['user_id']) + '}')
if flag['is_first']:
flag['is_first'] = False
@ -230,15 +365,8 @@ class MeterDispatcher(object):
LOG.debug('The meter %s sample GET request is received' % meter_name)
# process query condition
query = []
metrics.ParamUtil.common(req, query)
_meter_ag = self._oldsample_agg % {"size": self.size}
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _meter_ag + '}')
else:
body = '{"aggs":' + _meter_ag + '}'
body = ParamUtil.filtering(req, _meter_ag, self.size, None)
# modify the query url to filter out name
query_url = []
@ -253,7 +381,6 @@ class MeterDispatcher(object):
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
res_data = self._get_agg_response(es_res)
LOG.debug('@$Result data is %s\n' % res_data)
if res_data:
# convert the response into ceilometer meter OldSample format
aggs = res_data['by_name']['buckets']
@ -275,7 +402,8 @@ class MeterDispatcher(object):
json.dumps(_source['tenant_id']) + ','
'"resource_metadata":null,'
'"source":' + json.dumps(_source['user_agent']) + ','
'"timestamp":' + json.dumps(_source['timestamp']) + ','
'"timestamp":"' +
tu.iso8601_from_timestamp(_source['timestamp']) + '",'
'"user_id":' + json.dumps(_source['user_id']) + '}')
if flag['is_first']:
flag['is_first'] = False
@ -300,19 +428,12 @@ class MeterDispatcher(object):
def get_meter_statistics(self, req, res, meter_name):
LOG.debug('The meter %s statistics GET request is received' %
meter_name)
# process query conditions
query = []
metrics.ParamUtil.common(req, query)
period = metrics.ParamUtil.period(req)
_stats_ag = (self._meter_stats_agg %
{"size": self.size, "period": period})
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _stats_ag + '}')
else:
body = '{"aggs":' + _stats_ag + '}'
# process query condition
(_agg, query) = ParamUtil.aggregate(req)
period = ParamUtil.period(req)
_stats_ag = self._meter_stats_agg % {"size": self.size,
"period": period, "agg": _agg}
body = ParamUtil.filtering(req, _stats_ag, self.size, query)
# modify the query url to filter out name
query_url = []
@ -323,14 +444,14 @@ class MeterDispatcher(object):
es_res = requests.post(query_url, data=body)
res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
LOG.debug('Request body:' + body)
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
res_data = self._get_agg_response(es_res)
LOG.debug('@Result: %s', res_data)
if res_data:
# convert the response into Ceilometer Statistics format
aggs = res_data['by_name']['buckets']
LOG.debug('@$Stats: %s' % json.dumps(aggs))
def _render_stats(dim):
is_first = True
oldest_time = []
@ -347,11 +468,28 @@ class MeterDispatcher(object):
period_diff = (current_time - previous_time) / 1000
duration_diff = (current_time - oldest_time) / 1000
# parses the statistics data
_max = str(item['statistics']['max'])
_min = str(item['statistics']['min'])
_sum = str(item['statistics']['sum'])
_avg = str(item['statistics']['avg'])
_count = str(item['statistics']['count'])
_max = 'null'
_min = 'null'
_sum = 'null'
_avg = 'null'
_count = 'null'
if 'statistics' in item:
_max = str(item['statistics']['max'])
_min = str(item['statistics']['min'])
_sum = str(item['statistics']['sum'])
_avg = str(item['statistics']['avg'])
_count = str(item['statistics']['count'])
else:
if 'average' in item:
_avg = str(item['average']['value'])
if 'maximum' in item:
_max = str(item['maximum']['value'])
if 'minimum' in item:
_min = str(item['minimum']['value'])
if 'count' in item:
_count = str(item['count']['value'])
if 'sum' in item:
_sum = str(item['sum']['value'])
curr_timestamp = tu.iso8601_from_timestamp(current_time)
prev_timestamp = tu.iso8601_from_timestamp(previous_time)
old_timestamp = tu.iso8601_from_timestamp(oldest_time)

View File

@ -23,7 +23,7 @@ from kiloeyes.common import es_conn
from kiloeyes.common import kafka_conn
from kiloeyes.common import namespace
from kiloeyes.common import resource_api
from kiloeyes.v2.elasticsearch import metrics
from kiloeyes.v2.elasticsearch import meters
try:
import ujson as json
@ -173,15 +173,8 @@ class CeilometerSampleDispatcher(object):
LOG.debug('The samples GET request is received')
# process query condition
query = []
metrics.ParamUtil.common(req, query)
_samples_ag = self._sample_agg % {"size": self.size}
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _samples_ag + '}')
else:
body = '{"aggs":' + _samples_ag + '}'
body = meters.ParamUtil.filtering(req, _samples_ag, self.size, None)
LOG.debug('Request body:' + body)
LOG.debug('Request url:' + self._query_url)
@ -204,15 +197,8 @@ class CeilometerSampleDispatcher(object):
LOG.debug('The sample %s GET request is received' % sample_id)
# process query condition
query = []
metrics.ParamUtil.common(req, query)
_sample_ag = self._sample_agg % {"size": self.size}
if query:
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
'"size":' + str(self.size) + ','
'"aggs":' + _sample_ag + '}')
else:
body = '{"aggs":' + _sample_ag + '}'
body = meters.ParamUtil.filtering(req, _sample_ag, self.size, None)
# modify the query url to filter out name
query_url = []