Ceilometer Meter Query and Aggregation Implemented
Implemented Meter Query Utility that supports operations such as eq, ne, gt, lt, gte, lte. The implementation also supports multiple filters. Implemented Meter Statistics Aggregation where client can specify which aggregation function to return. Change-Id: I5b19faa4a0d46befbe6e84c975af084cdc0d9d12
This commit is contained in:
parent
18ad81cb4b
commit
b536e94286
|
@ -177,7 +177,8 @@ class TestMeterDispatcher(base.BaseTestCase):
|
|||
self.assertEqual(obj[0]['project_id'],
|
||||
'35b17138-b364-4e6a-a131-8f3099c5be68')
|
||||
self.assertEqual(obj[0]['counter_volume'], 4)
|
||||
self.assertEqual(obj[0]['timestamp'], 1461337094000)
|
||||
self.assertEqual(obj[0]['timestamp'],
|
||||
tu.iso8601_from_timestamp(1461337094000))
|
||||
self.assertEqual(len(obj), 1)
|
||||
|
||||
def test_do_get_statistics(self):
|
||||
|
|
|
@ -12,7 +12,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import calendar
|
||||
import datetime
|
||||
import dateutil.parser
|
||||
import falcon
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
@ -24,7 +26,6 @@ from kiloeyes.common import kafka_conn
|
|||
from kiloeyes.common import namespace
|
||||
from kiloeyes.common import resource_api
|
||||
from kiloeyes.common import timeutils as tu
|
||||
from kiloeyes.v2.elasticsearch import metrics
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
|
@ -56,6 +57,146 @@ LOG = log.getLogger(__name__)
|
|||
UPDATED = str(datetime.datetime(2014, 1, 1, 0, 0, 0))
|
||||
|
||||
|
||||
class ParamUtil(object):
|
||||
|
||||
@staticmethod
|
||||
def process_one_filter(field, op, value, must, must_not):
|
||||
if (not field or not field.strip() or
|
||||
not value or not value.strip()):
|
||||
return
|
||||
# default op is equal
|
||||
# convert ceilometer op to elasticsearch op
|
||||
if not op or not op.strip():
|
||||
op = 'eq'
|
||||
elif op.strip() == 'le':
|
||||
op = 'lte'
|
||||
elif op.strip() == 'ge':
|
||||
op = 'gte'
|
||||
|
||||
# if field is timestamp, convert it to correct format
|
||||
if field.strip() == 'timestamp':
|
||||
value = dateutil.parser.parse(value.strip())
|
||||
value = value.timetuple()
|
||||
value = calendar.timegm(value) * 1000
|
||||
elif field.strip() == 'value':
|
||||
value = float(value.strip())
|
||||
|
||||
# construct query based on op
|
||||
if op.strip() == 'eq':
|
||||
must.append({'match': {field.strip(): value}})
|
||||
elif op.strip() == 'ne':
|
||||
must_not.append({'match': {field.strip(): value}})
|
||||
# range search
|
||||
else:
|
||||
must.append({'range': {field.strip(): {op.strip(): value}}})
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def filtering(req, _agg, size, query):
|
||||
must = []
|
||||
must_not = []
|
||||
# default body
|
||||
body = '{"aggs":' + _agg + '}'
|
||||
# process ceilometer API query
|
||||
# query as json, can support multiple filters
|
||||
if(req.content_type == 'application/json'):
|
||||
if not query:
|
||||
query = req.stream.read()
|
||||
if not query:
|
||||
return body
|
||||
if 'q' in json.loads(query):
|
||||
objs = json.loads(query)['q']
|
||||
else:
|
||||
return body
|
||||
for obj in objs:
|
||||
field = obj['field']
|
||||
op = obj['op']
|
||||
value = obj['value']
|
||||
ParamUtil.process_one_filter(field, op, value, must, must_not)
|
||||
# query as param, only support one filter
|
||||
else:
|
||||
field = req.get_param('q.field')
|
||||
op = req.get_param('q.op')
|
||||
value = req.get_param('q.value')
|
||||
ParamUtil.process_one_filter(field, op, value, must, must_not)
|
||||
|
||||
q = ''
|
||||
if must:
|
||||
q = q + ('"must":' + json.dumps(must))
|
||||
elif must_not:
|
||||
q = q + ('"must_not":' + json.dumps(must_not))
|
||||
if q != '':
|
||||
body = ('{"query":{"bool":{' + q + '}},'
|
||||
'"size":' + str(size) + ','
|
||||
'"aggs":' + _agg + '}')
|
||||
return body
|
||||
|
||||
@staticmethod
|
||||
def process_one_aggregate(func, _agg, count):
|
||||
if not func or not func.strip():
|
||||
return _agg
|
||||
ret_agg = ''
|
||||
# use sep ',' to seperate aggregations
|
||||
sep = ''
|
||||
if count != 0:
|
||||
sep = ','
|
||||
if func.strip() == 'avg':
|
||||
ret_agg = _agg + (sep + '"average":{"avg":{"field":"value"}}')
|
||||
elif func.strip() == 'max':
|
||||
ret_agg = _agg + (sep + '"maximum":{"max":{"field":"value"}}')
|
||||
elif func.strip() == 'min':
|
||||
ret_agg = _agg + (sep + '"minimum":{"min":{"field":"value"}}')
|
||||
elif func.strip() == 'sum':
|
||||
ret_agg = _agg + (sep + '"sum":{"sum":{"field":"value"}}')
|
||||
elif func.strip() == 'count' or func.strip() == 'value_count':
|
||||
ret_agg = _agg + (sep +
|
||||
'"count":{"value_count":{"field":"value"}}')
|
||||
elif func.strip() == 'stats':
|
||||
ret_agg = _agg + (sep + '"statistics":{"stats":{"field":"value"}}')
|
||||
return ret_agg
|
||||
|
||||
@staticmethod
|
||||
def aggregate(req):
|
||||
# process meter statistics selectable aggregates
|
||||
_stats_agg = ''
|
||||
len_aggs = 1
|
||||
# aggregate as json, support multiple aggregations
|
||||
if(req.content_type == 'application/json'):
|
||||
query = req.stream.read()
|
||||
if not query:
|
||||
return ('"statistics":{"stats":{"field":"value"}}', query)
|
||||
if 'aggregate' in json.loads(query):
|
||||
objs = json.loads(query)['aggregate']
|
||||
else:
|
||||
return ('"statistics":{"stats":{"field":"value"}}', query)
|
||||
len_aggs = len(objs)
|
||||
for i in xrange(0, len_aggs):
|
||||
obj = objs[i]
|
||||
func = obj['func']
|
||||
_stats_agg = ParamUtil.process_one_aggregate(func,
|
||||
_stats_agg, i)
|
||||
# aggregate as param, only support one aggregation
|
||||
else:
|
||||
query = None
|
||||
func = req.get_param('aggregate.func')
|
||||
_stats_agg = ParamUtil.process_one_aggregate(func, _stats_agg, 0)
|
||||
|
||||
if _stats_agg == '':
|
||||
_stats_agg = ('"statistics":{"stats":{"field":"value"}}')
|
||||
# return query and pass it to ParamUtil.common to process query
|
||||
# it needs to be return because req.stream.read can only be read once
|
||||
return (_stats_agg, query)
|
||||
|
||||
@staticmethod
|
||||
def period(req):
|
||||
try:
|
||||
if req.get_param('period'):
|
||||
return str(int(req.get_param('period'))) + 's'
|
||||
except Exception:
|
||||
pass
|
||||
return '300s'
|
||||
|
||||
|
||||
class MeterDispatcher(object):
|
||||
def __init__(self, global_conf):
|
||||
LOG.debug('initializing V2API!')
|
||||
|
@ -106,7 +247,7 @@ class MeterDispatcher(object):
|
|||
{"by_name":{"terms":{"field":"name","size":%(size)d},
|
||||
"aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d},
|
||||
"aggs":{"meters":{"top_hits":{"_source":{"exclude":
|
||||
["dimensions_hash","timestamp","value"]},"size":1}}}}}}}
|
||||
["dimensions_hash"]},"size":1}}}}}}}
|
||||
"""
|
||||
|
||||
self._oldsample_agg = """
|
||||
|
@ -122,8 +263,7 @@ class MeterDispatcher(object):
|
|||
"size":%(size)d},"aggs":{"dimension":{"top_hits":{"_source":
|
||||
{"exclude":["dimensions_hash","timestamp","value"]},"size":1}},
|
||||
"periods":{"date_histogram":{"field":"timestamp",
|
||||
"interval":"%(period)s"},"aggs":{"statistics":{"stats":
|
||||
{"field":"value"}}}}}}}}}
|
||||
"interval":"%(period)s"},"aggs":{%(agg)s}}}}}}}
|
||||
"""
|
||||
|
||||
self.setup_index_template()
|
||||
|
@ -166,15 +306,8 @@ class MeterDispatcher(object):
|
|||
LOG.debug('The meters GET request is received')
|
||||
|
||||
# process query condition
|
||||
query = []
|
||||
metrics.ParamUtil.common(req, query)
|
||||
_meters_ag = self._meters_agg % {"size": self.size}
|
||||
if query:
|
||||
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
|
||||
'"size":' + str(self.size) + ','
|
||||
'"aggs":' + _meters_ag + '}')
|
||||
else:
|
||||
body = '{"aggs":' + _meters_ag + '}'
|
||||
body = ParamUtil.filtering(req, _meters_ag, self.size, None)
|
||||
|
||||
LOG.debug('Request body:' + body)
|
||||
LOG.debug('Request url:' + self._query_url)
|
||||
|
@ -201,6 +334,8 @@ class MeterDispatcher(object):
|
|||
'"source":' + json.dumps(_source['user_agent']) + ','
|
||||
'"type":' + json.dumps(_type) + ','
|
||||
'"unit":null,'
|
||||
'"value":' + json.dumps(_source['value']) + ','
|
||||
'"timestamp":' + json.dumps(_source['timestamp']) + ','
|
||||
'"user_id":' + json.dumps(_source['user_id']) + '}')
|
||||
if flag['is_first']:
|
||||
flag['is_first'] = False
|
||||
|
@ -230,15 +365,8 @@ class MeterDispatcher(object):
|
|||
LOG.debug('The meter %s sample GET request is received' % meter_name)
|
||||
|
||||
# process query condition
|
||||
query = []
|
||||
metrics.ParamUtil.common(req, query)
|
||||
_meter_ag = self._oldsample_agg % {"size": self.size}
|
||||
if query:
|
||||
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
|
||||
'"size":' + str(self.size) + ','
|
||||
'"aggs":' + _meter_ag + '}')
|
||||
else:
|
||||
body = '{"aggs":' + _meter_ag + '}'
|
||||
body = ParamUtil.filtering(req, _meter_ag, self.size, None)
|
||||
|
||||
# modify the query url to filter out name
|
||||
query_url = []
|
||||
|
@ -253,7 +381,6 @@ class MeterDispatcher(object):
|
|||
|
||||
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
|
||||
res_data = self._get_agg_response(es_res)
|
||||
LOG.debug('@$Result data is %s\n' % res_data)
|
||||
if res_data:
|
||||
# convert the response into ceilometer meter OldSample format
|
||||
aggs = res_data['by_name']['buckets']
|
||||
|
@ -275,7 +402,8 @@ class MeterDispatcher(object):
|
|||
json.dumps(_source['tenant_id']) + ','
|
||||
'"resource_metadata":null,'
|
||||
'"source":' + json.dumps(_source['user_agent']) + ','
|
||||
'"timestamp":' + json.dumps(_source['timestamp']) + ','
|
||||
'"timestamp":"' +
|
||||
tu.iso8601_from_timestamp(_source['timestamp']) + '",'
|
||||
'"user_id":' + json.dumps(_source['user_id']) + '}')
|
||||
if flag['is_first']:
|
||||
flag['is_first'] = False
|
||||
|
@ -300,19 +428,12 @@ class MeterDispatcher(object):
|
|||
def get_meter_statistics(self, req, res, meter_name):
|
||||
LOG.debug('The meter %s statistics GET request is received' %
|
||||
meter_name)
|
||||
# process query conditions
|
||||
query = []
|
||||
metrics.ParamUtil.common(req, query)
|
||||
period = metrics.ParamUtil.period(req)
|
||||
|
||||
_stats_ag = (self._meter_stats_agg %
|
||||
{"size": self.size, "period": period})
|
||||
if query:
|
||||
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
|
||||
'"size":' + str(self.size) + ','
|
||||
'"aggs":' + _stats_ag + '}')
|
||||
else:
|
||||
body = '{"aggs":' + _stats_ag + '}'
|
||||
# process query condition
|
||||
(_agg, query) = ParamUtil.aggregate(req)
|
||||
period = ParamUtil.period(req)
|
||||
_stats_ag = self._meter_stats_agg % {"size": self.size,
|
||||
"period": period, "agg": _agg}
|
||||
body = ParamUtil.filtering(req, _stats_ag, self.size, query)
|
||||
|
||||
# modify the query url to filter out name
|
||||
query_url = []
|
||||
|
@ -323,14 +444,14 @@ class MeterDispatcher(object):
|
|||
es_res = requests.post(query_url, data=body)
|
||||
res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
|
||||
|
||||
LOG.debug('Request body:' + body)
|
||||
LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
|
||||
res_data = self._get_agg_response(es_res)
|
||||
LOG.debug('@Result: %s', res_data)
|
||||
if res_data:
|
||||
# convert the response into Ceilometer Statistics format
|
||||
aggs = res_data['by_name']['buckets']
|
||||
|
||||
LOG.debug('@$Stats: %s' % json.dumps(aggs))
|
||||
|
||||
def _render_stats(dim):
|
||||
is_first = True
|
||||
oldest_time = []
|
||||
|
@ -347,11 +468,28 @@ class MeterDispatcher(object):
|
|||
period_diff = (current_time - previous_time) / 1000
|
||||
duration_diff = (current_time - oldest_time) / 1000
|
||||
# parses the statistics data
|
||||
_max = str(item['statistics']['max'])
|
||||
_min = str(item['statistics']['min'])
|
||||
_sum = str(item['statistics']['sum'])
|
||||
_avg = str(item['statistics']['avg'])
|
||||
_count = str(item['statistics']['count'])
|
||||
_max = 'null'
|
||||
_min = 'null'
|
||||
_sum = 'null'
|
||||
_avg = 'null'
|
||||
_count = 'null'
|
||||
if 'statistics' in item:
|
||||
_max = str(item['statistics']['max'])
|
||||
_min = str(item['statistics']['min'])
|
||||
_sum = str(item['statistics']['sum'])
|
||||
_avg = str(item['statistics']['avg'])
|
||||
_count = str(item['statistics']['count'])
|
||||
else:
|
||||
if 'average' in item:
|
||||
_avg = str(item['average']['value'])
|
||||
if 'maximum' in item:
|
||||
_max = str(item['maximum']['value'])
|
||||
if 'minimum' in item:
|
||||
_min = str(item['minimum']['value'])
|
||||
if 'count' in item:
|
||||
_count = str(item['count']['value'])
|
||||
if 'sum' in item:
|
||||
_sum = str(item['sum']['value'])
|
||||
curr_timestamp = tu.iso8601_from_timestamp(current_time)
|
||||
prev_timestamp = tu.iso8601_from_timestamp(previous_time)
|
||||
old_timestamp = tu.iso8601_from_timestamp(oldest_time)
|
||||
|
|
|
@ -23,7 +23,7 @@ from kiloeyes.common import es_conn
|
|||
from kiloeyes.common import kafka_conn
|
||||
from kiloeyes.common import namespace
|
||||
from kiloeyes.common import resource_api
|
||||
from kiloeyes.v2.elasticsearch import metrics
|
||||
from kiloeyes.v2.elasticsearch import meters
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
|
@ -173,15 +173,8 @@ class CeilometerSampleDispatcher(object):
|
|||
LOG.debug('The samples GET request is received')
|
||||
|
||||
# process query condition
|
||||
query = []
|
||||
metrics.ParamUtil.common(req, query)
|
||||
_samples_ag = self._sample_agg % {"size": self.size}
|
||||
if query:
|
||||
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
|
||||
'"size":' + str(self.size) + ','
|
||||
'"aggs":' + _samples_ag + '}')
|
||||
else:
|
||||
body = '{"aggs":' + _samples_ag + '}'
|
||||
body = meters.ParamUtil.filtering(req, _samples_ag, self.size, None)
|
||||
|
||||
LOG.debug('Request body:' + body)
|
||||
LOG.debug('Request url:' + self._query_url)
|
||||
|
@ -204,15 +197,8 @@ class CeilometerSampleDispatcher(object):
|
|||
LOG.debug('The sample %s GET request is received' % sample_id)
|
||||
|
||||
# process query condition
|
||||
query = []
|
||||
metrics.ParamUtil.common(req, query)
|
||||
_sample_ag = self._sample_agg % {"size": self.size}
|
||||
if query:
|
||||
body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
|
||||
'"size":' + str(self.size) + ','
|
||||
'"aggs":' + _sample_ag + '}')
|
||||
else:
|
||||
body = '{"aggs":' + _sample_ag + '}'
|
||||
body = meters.ParamUtil.filtering(req, _sample_ag, self.size, None)
|
||||
|
||||
# modify the query url to filter out name
|
||||
query_url = []
|
||||
|
|
Loading…
Reference in New Issue