764 lines
26 KiB
Python
764 lines
26 KiB
Python
# -*- coding: utf-8 -*-
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
|
|
from collections import deque
try:
    # The ABC aliases that used to live in ``collections`` were removed in
    # Python 3.10; ``collections.abc`` is their home on Python 3.
    from collections.abc import Iterable
except ImportError:  # Python 2
    from collections import Iterable
import copy
import datetime
import decimal
import time

from gnocchiclient import auth as gauth
from gnocchiclient import client as gclient
from gnocchiclient import exceptions as gexceptions
from keystoneauth1 import loading as ks_loading
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
import six

from cloudkitty.storage.v2 import BaseStorage
from cloudkitty import utils as ck_utils
|
|
|
|
|
|
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)

# Global oslo.config object; the options declared below are registered on it.
CONF = cfg.CONF
|
|
|
|
|
|
# Options of the gnocchi storage driver. They are registered below in the
# 'storage_gnocchi' configuration group.
gnocchi_storage_opts = [
    # Selects which authentication plugin GnocchiStorage.__init__ builds.
    cfg.StrOpt(
        'gnocchi_auth_type',
        default='keystone',
        choices=['keystone', 'basic'],
        help='(v2) Gnocchi auth type (keystone or basic). Keystone '
        'credentials can be specified through the "auth_section" parameter',
    ),
    # Only read when gnocchi_auth_type is not 'keystone'.
    cfg.StrOpt(
        'gnocchi_user',
        default='',
        help='(v2) Gnocchi user (for basic auth only)',
    ),
    # Only read when gnocchi_auth_type is not 'keystone'.
    cfg.StrOpt(
        'gnocchi_endpoint',
        default='',
        help='(v2) Gnocchi endpoint (for basic auth only)',
    ),
    # Passed as the 'interface' adapter option when keystone auth is used.
    cfg.StrOpt(
        'api_interface',
        default='internalURL',
        help='(v2) Endpoint URL type (for keystone auth only)',
    ),
    # Threshold at which GnocchiStorage.push() flushes buffered measures.
    cfg.IntOpt(
        'measure_chunk_size',
        min=10, max=1000000,
        default=500,
        help='(v2) Maximum amount of measures to send to gnocchi at once '
        '(defaults to 500).',
    ),
]

CONF.register_opts(gnocchi_storage_opts, 'storage_gnocchi')
# Also register the keystoneauth session and auth options in the same
# group, so that load_auth_from_conf_options() can read them from there.
ks_loading.register_session_conf_options(CONF, 'storage_gnocchi')
ks_loading.register_auth_conf_options(CONF, 'storage_gnocchi')
|
|
|
|
|
|
# Prefix prepended to a metric name to build its gnocchi resource type name.
RESOURCE_TYPE_NAME_ROOT = 'cloudkitty_metric_'
# Name of the archive policy used for the 'qty' and 'cost' metrics.
ARCHIVE_POLICY_NAME = 'cloudkitty_archive_policy'

# Prefixes prepended to groupby / metadata keys to build the names of the
# matching gnocchi resource attributes.
GROUPBY_NAME_ROOT = 'groupby_attr_'
META_NAME_ROOT = 'meta_attr_'
|
|
|
|
|
|
class GnocchiResource(object):
    """Class representing a gnocchi resource

    It provides utils for resource_type/resource creation and identifying.

    The groupby and metadata attributes of the metric are kept both under
    their plain names (``groupby``, ``metadata``, ``attributes``) and under
    their gnocchi attribute names, i.e. prefixed with ``GROUPBY_NAME_ROOT``
    or ``META_NAME_ROOT`` (``_trans_*`` dicts).
    """

    def __init__(self, name, metric, conn):
        """Resource_type name, metric, gnocchiclient

        :param name: metric name; prefixed with ``RESOURCE_TYPE_NAME_ROOT``
                     to build the resource type name
        :type name: str
        :param metric: dict with at least ``vol`` (containing ``unit``),
                       ``groupby`` and ``metadata`` entries
        :type metric: dict
        :param conn: client object used for every call to gnocchi
        """
        self.name = name
        self.resource_type = RESOURCE_TYPE_NAME_ROOT + name
        self.unit = metric['vol']['unit']
        # Falsy values (None, '', 0...) are all stored as an empty string.
        self.groupby = {
            k: v if v else '' for k, v in metric['groupby'].items()}
        self.metadata = {
            k: v if v else '' for k, v in metric['metadata'].items()}
        self._conn = conn
        # Resource dict as returned by gnocchi; None until created/found.
        self._resource = None
        self._refresh_attributes()
        self.needs_update = False

    def _refresh_attributes(self):
        """Rebuild every dict derived from ``groupby`` and ``metadata``."""
        self._trans_groupby = {
            GROUPBY_NAME_ROOT + key: val for key, val in self.groupby.items()
        }
        self._trans_metadata = {
            META_NAME_ROOT + key: val for key, val in self.metadata.items()
        }
        # On a key clash the groupby value overrides the metadata one.
        self.attributes = self.metadata.copy()
        self.attributes.update(self.groupby)
        self._trans_attributes = self._trans_metadata.copy()
        self._trans_attributes.update(self._trans_groupby)

    def __getitem__(self, key):
        """Return the groupby attribute ``key``, else the metadata one.

        Returns None when neither exists.
        """
        output = self._trans_attributes.get(GROUPBY_NAME_ROOT + key, None)
        if output is None:
            output = self._trans_attributes.get(META_NAME_ROOT + key, None)
        return output

    def __eq__(self, other):
        """Two resources are equal if type, 'id' and groupby all match.

        Metadata is deliberately not part of the comparison.
        """
        if self.resource_type != other.resource_type or \
                self['id'] != other['id']:
            return False
        own_keys = sorted(self.groupby.keys())
        other_keys = sorted(other.groupby.keys())
        if own_keys != other_keys:
            return False

        for key in own_keys:
            if other[key] != self[key]:
                return False

        return True

    def __ne__(self, other):
        """Inverse of ``__eq__`` (Python 2 does not derive it)."""
        return not self.__eq__(other)

    @property
    def qty(self):
        """'qty' entry of the resource's metrics, None if no resource."""
        if self._resource:
            return self._resource['metrics']['qty']
        return None

    @property
    def cost(self):
        """'cost' entry of the resource's metrics, None if no resource."""
        if self._resource:
            return self._resource['metrics']['cost']
        return None

    def _get_res_type_dict(self):
        """Build the resource type definition matching this resource.

        Groupby attributes and ``unit`` are required strings, metadata
        attributes are optional strings.
        """
        attributes = {}
        for key in self._trans_groupby.keys():
            attributes[key] = {'required': True, 'type': 'string'}
        attributes['unit'] = {'required': True, 'type': 'string'}
        for key in self._trans_metadata.keys():
            attributes[key] = {'required': False, 'type': 'string'}

        return {
            'name': self.resource_type,
            'attributes': attributes,
        }

    def create_resource_type(self):
        """Allows to create the type corresponding to this resource."""
        try:
            self._conn.resource_type.get(self.resource_type)
        except gexceptions.ResourceTypeNotFound:
            res_type = self._get_res_type_dict()
            LOG.debug('Creating resource_type {} in gnocchi'.format(
                self.resource_type))
            self._conn.resource_type.create(res_type)

    @staticmethod
    def _get_rfc6902_attributes_add_op(new_attributes):
        """Return one 'add' patch operation per attribute name.

        An attribute is marked as required if its name starts with
        ``GROUPBY_NAME_ROOT``.
        """
        return [{
            'op': 'add',
            'path': '/attributes/{}'.format(attr),
            'value': {
                'required': attr.startswith(GROUPBY_NAME_ROOT),
                'type': 'string'
            }
        } for attr in new_attributes]

    def update_resource_type(self):
        """Add to the existing resource type the attributes it lacks."""
        needed_res_type = self._get_res_type_dict()
        current_res_type = self._conn.resource_type.get(
            needed_res_type['name'])

        new_attributes = [attr for attr in needed_res_type['attributes'].keys()
                          if attr not in current_res_type['attributes'].keys()]
        if not new_attributes:
            return
        LOG.info('Adding {} to resource_type {}'.format(
            [attr.replace(GROUPBY_NAME_ROOT, '').replace(META_NAME_ROOT, '')
             for attr in new_attributes],
            current_res_type['name'].replace(RESOURCE_TYPE_NAME_ROOT, ''),
        ))
        new_attributes_op = self._get_rfc6902_attributes_add_op(new_attributes)
        self._conn.resource_type.update(
            needed_res_type['name'], new_attributes_op)

    def _create_metrics(self):
        """Create the 'qty' and 'cost' metrics and return them."""
        qty = self._conn.metric.create(
            name='qty',
            unit=self.unit,
            archive_policy_name=ARCHIVE_POLICY_NAME,
        )
        cost = self._conn.metric.create(
            name='cost',
            archive_policy_name=ARCHIVE_POLICY_NAME,
        )
        return qty, cost

    def exists_in_gnocchi(self):
        """Check if the resource exists in gnocchi.

        Resources are searched by groupby attributes only. If at least one
        is found, the first match is stored as this object's resource.

        Returns true if the resource exists.
        """
        query = {
            'and': [
                {'=': {key: value}}
                for key, value in self._trans_groupby.items()
            ],
        }
        res = self._conn.resource.search(resource_type=self.resource_type,
                                         query=query)
        if len(res) > 1:
            LOG.warning(
                "Found more than one metric matching groupby. This may not "
                "have the behavior you're expecting. You should probably add "
                "some items to groupby")
        if len(res) > 0:
            self._resource = res[0]
            return True
        return False

    def create(self):
        """Creates the resource in gnocchi.

        Does nothing if the resource is already known locally or already
        exists in gnocchi.
        """
        if self._resource:
            return
        self.create_resource_type()
        # Look the resource up *before* creating its metrics: creating them
        # first would leave two unused metrics behind whenever the resource
        # turns out to already exist.
        if self.exists_in_gnocchi():
            return

        qty_metric, cost_metric = self._create_metrics()
        resource = self._trans_attributes.copy()
        resource['metrics'] = {
            'qty': qty_metric['id'],
            'cost': cost_metric['id'],
        }
        resource['id'] = uuidutils.generate_uuid()
        resource['unit'] = self.unit
        try:
            self._resource = self._conn.resource.create(
                self.resource_type, resource)
        # Attributes have changed
        except gexceptions.BadRequest:
            self.update_resource_type()
            self._resource = self._conn.resource.create(
                self.resource_type, resource)

    def update(self, metric):
        """Push the metadata of ``metric`` to the gnocchi resource.

        Only the metadata attributes are sent. The local metadata is
        refreshed as well, so that later comparisons against this object
        do not flag it as needing an update again.

        :param metric: dict with a ``metadata`` entry
        :type metric: dict
        :returns: the resource as returned by the update call
        """
        # Same normalisation as in __init__: falsy values become ''.
        new_metadata = {
            k: v if v else '' for k, v in metric['metadata'].items()}
        patch = {META_NAME_ROOT + k: v for k, v in new_metadata.items()}
        # Resource operations live on the client's ``resource`` manager,
        # as for search() and create() above.
        self._resource = self._conn.resource.update(
            self.resource_type, self._resource['id'], patch)
        self.metadata.update(new_metadata)
        self._refresh_attributes()
        self.needs_update = False
        return self._resource
|
|
|
|
|
|
class GnocchiResourceCacher(object):
    """In-memory cache of the most recently handled resources.

    At most ``max_size`` resources are kept; once the cache is full, adding
    a new resource drops the oldest one.
    """

    def __init__(self, max_size=500):
        self._resources = deque(maxlen=max_size)

    def __contains__(self, resource):
        """Tell whether a resource equal to ``resource`` is cached.

        As a side effect, the cached resource is flagged with
        ``needs_update`` when one of the metadata values of ``resource``
        differs from the cached one.
        """
        for cached in self._resources:
            if not cached == resource:
                continue
            outdated = any(
                value != cached[name]
                for name, value in resource.metadata.items())
            if outdated:
                cached.needs_update = True
            return True
        return False

    def add_resource(self, resource):
        """Cache a resource, unless an equal one is already cached.

        :param resource: resource to add
        :type resource: GnocchiResource
        """
        already_cached = any(
            cached == resource for cached in self._resources)
        if not already_cached:
            self._resources.append(resource)

    def get(self, resource):
        """Return the cached resource equal to the parameter, or None.

        :param resource: resource to get
        :type resource: GnocchiResource
        """
        return next(
            (cached for cached in self._resources if cached == resource),
            None)

    def get_by_id(self, resource_id):
        """Return the cached resource whose 'id' item matches, or None.

        :param resource_id: ID of the resource to get
        :type resource_id: str
        """
        return next(
            (cached for cached in self._resources
             if cached['id'] == resource_id),
            None)
|
|
|
|
|
|
class GnocchiStorage(BaseStorage):
    """v2 storage driver storing rated data in gnocchi.

    Each rated metric is stored as a gnocchi resource holding a 'qty' and
    a 'cost' metric (see GnocchiResource).
    """

    # Aggregates operation summing the 'cost' metric. The 'qty' variant is
    # derived from a deep copy of it, so this list is never mutated.
    default_op = ['aggregate', 'sum', ['metric', 'cost', 'sum'], ]

    def _check_archive_policy(self):
        """Create the cloudkitty archive policy if it does not exist."""
        try:
            self._conn.archive_policy.get(ARCHIVE_POLICY_NAME)
        except gexceptions.ArchivePolicyNotFound:
            definition = [
                {'granularity': str(CONF.collect.period) + 's',
                 'timespan': '{d} days'.format(d=self.get_retention().days)},
            ]
            archive_policy = {
                'name': ARCHIVE_POLICY_NAME,
                'back_window': 0,
                'aggregation_methods': [
                    'std', 'count', 'min', 'max', 'sum', 'mean'],
                'definition': definition,
            }
            self._conn.archive_policy.create(archive_policy)

    def __init__(self, *args, **kwargs):
        """Build the gnocchi client from the 'storage_gnocchi' options."""
        super(GnocchiStorage, self).__init__(*args, **kwargs)

        adapter_options = {'connect_retries': 3}
        if CONF.storage_gnocchi.gnocchi_auth_type == 'keystone':
            auth_plugin = ks_loading.load_auth_from_conf_options(
                CONF,
                'storage_gnocchi',
            )
            adapter_options['interface'] = CONF.storage_gnocchi.api_interface
        else:
            auth_plugin = gauth.GnocchiBasicPlugin(
                user=CONF.storage_gnocchi.gnocchi_user,
                endpoint=CONF.storage_gnocchi.gnocchi_endpoint,
            )
        self._conn = gclient.Client(
            '1',
            session_options={'auth': auth_plugin},
            adapter_options=adapter_options,
        )
        self._cacher = GnocchiResourceCacher()

    def init(self):
        """Make sure the archive policy used by the metrics exists."""
        self._check_archive_policy()

    def _check_resource(self, metric_name, metric):
        """Return the resource matching the metric, creating it if needed.

        Cached resources are returned as-is; the cache lookup may flag them
        as needing a metadata update.
        """
        resource = GnocchiResource(metric_name, metric, self._conn)
        if resource in self._cacher:
            return self._cacher.get(resource)
        resource.create()
        self._cacher.add_resource(resource)
        return resource

    def _push_measures_to_gnocchi(self, measures):
        """Send measures to gnocchi, retrying once on a bad request.

        :param measures: measures to send; nothing is sent if empty
        :type measures: dict
        """
        if measures:
            try:
                self._conn.metric.batch_metrics_measures(measures)
            except gexceptions.BadRequest:
                LOG.warning(
                    'An exception occured while trying to push measures to '
                    'gnocchi. Retrying in 1 second. If this happens again, '
                    'set measure_chunk_size to a lower value.')
                time.sleep(1)
                self._conn.metric.batch_metrics_measures(measures)

    # Do not use scope_id, as it is deprecated and will be
    # removed together with the v1 storage
    def push(self, dataframes, scope_id=None):
        """Push the qty and price of every metric of the dataframes.

        Measures are buffered and sent each time their number reaches the
        ``measure_chunk_size`` option; what is left is sent at the end.

        :param dataframes: dataframe or list of dataframes to push
        :type dataframes: dict or list
        :param scope_id: unused, deprecated
        """
        if not isinstance(dataframes, list):
            dataframes = [dataframes]
        measures = {}
        nb_measures = 0
        for dataframe in dataframes:
            timestamp = dataframe['period']['begin']
            for metric_name, metrics in dataframe['usage'].items():
                for metric in metrics:
                    resource = self._check_resource(metric_name, metric)
                    if resource.needs_update:
                        resource.update(metric)
                    if not resource.qty or not resource.cost:
                        LOG.warning('Unexpected continue')
                        continue

                    # resource.qty is the uuid of the qty metric
                    measures.setdefault(resource.qty, []).append({
                        'timestamp': timestamp,
                        'value': metric['vol']['qty'],
                    })
                    measures.setdefault(resource.cost, []).append({
                        'timestamp': timestamp,
                        'value': metric['rating']['price'],
                    })
                    nb_measures += 2
                    if nb_measures >= CONF.storage_gnocchi.measure_chunk_size:
                        LOG.debug('Pushing {} measures to gnocchi.'.format(
                            nb_measures))
                        self._push_measures_to_gnocchi(measures)
                        measures = {}
                        nb_measures = 0

        LOG.debug('Pushing {} measures to gnocchi.'.format(nb_measures))
        self._push_measures_to_gnocchi(measures)

    def _get_ck_resource_types(self):
        """List the gnocchi resource types created by cloudkitty."""
        types = self._conn.resource_type.list()
        return [gtype['name'] for gtype in types
                if gtype['name'].startswith(RESOURCE_TYPE_NAME_ROOT)]

    def _check_res_types(self, res_type=None):
        """Return the sorted list of resource types to query.

        :param res_type: None (every cloudkitty resource type), a single
                         name or an iterable of names
        """
        if res_type is None:
            output = self._get_ck_resource_types()
        elif isinstance(res_type, six.string_types):
            # A string is iterable too: without this check a single name
            # would be split into its characters by sorted().
            output = [res_type]
        elif isinstance(res_type, Iterable):
            output = res_type
        else:
            output = [res_type]
        return sorted(output)

    @staticmethod
    def _check_begin_end(begin, end):
        """Return begin and end with defaults applied and types converted.

        Missing bounds default to the start of the current month and to the
        next month. Strings go through ``iso2dt`` and ints through
        ``ts2dt``; any other value is returned untouched.
        """
        if not begin:
            begin = ck_utils.get_month_start()
        if not end:
            end = ck_utils.get_next_month()
        # string_types rather than text_type, so that Python 2 byte strings
        # are converted as well.
        if isinstance(begin, six.string_types):
            begin = ck_utils.iso2dt(begin)
        if isinstance(begin, int):
            begin = ck_utils.ts2dt(begin)
        if isinstance(end, six.string_types):
            end = ck_utils.iso2dt(end)
        if isinstance(end, int):
            end = ck_utils.ts2dt(end)

        return begin, end

    @staticmethod
    def _paginate(items, offset, limit):
        """Return the ``limit`` items starting at ``offset``.

        A ``limit`` of None means "up to the end".
        """
        stop = None if limit is None else offset + limit
        return items[offset:stop]

    def _get_resource_frame(self,
                            cost_measure,
                            qty_measure,
                            resource):
        """Build the cloudkitty frame of one resource for one measure.

        :param cost_measure: cost measure; its third item is the price
        :param qty_measure: qty measure; its third item is the quantity
        :param resource: gnocchi resource the measures belong to
        :type resource: dict
        """
        # Getting price
        price = decimal.Decimal(cost_measure[2])
        price_dict = {'price': float(price)}

        # Getting vol
        vol_dict = {
            'qty': decimal.Decimal(qty_measure[2]),
            'unit': resource.get('unit'),
        }

        # Formatting
        groupby = {
            k.replace(GROUPBY_NAME_ROOT, ''): v
            for k, v in resource.items() if k.startswith(GROUPBY_NAME_ROOT)
        }
        metadata = {
            k.replace(META_NAME_ROOT, ''): v
            for k, v in resource.items() if k.startswith(META_NAME_ROOT)
        }
        return {
            'groupby': groupby,
            'metadata': metadata,
            'vol': vol_dict,
            'rating': price_dict,
        }

    def _to_cloudkitty(self,
                       res_type,
                       resource,
                       cost_measure,
                       qty_measure):
        """Build a dataframe holding a single resource frame.

        The period starts at the first item of the cost measure and lasts
        for its second item, taken as a number of seconds.
        """
        start = cost_measure[0]
        stop = start + datetime.timedelta(seconds=cost_measure[1])

        # Period
        period_dict = {
            'begin': ck_utils.dt2iso(start),
            'end': ck_utils.dt2iso(stop),
        }

        return {
            'usage': {res_type: [
                self._get_resource_frame(cost_measure, qty_measure, resource)],
            },
            'period': period_dict,
        }

    def _get_resource_info(self, resource_ids, start, stop):
        """Fetch the detailed resources matching the given IDs.

        :param resource_ids: IDs to look for; duplicates are allowed
        :param start: unused
        :param stop: unused
        :returns: dict mapping each found resource ID to its resource
        """
        # A resource has one measure per period, so the same ID usually
        # appears several times: query each one only once.
        unique_ids = sorted(set(resource_ids))
        search = {
            'and': [
                {
                    'or': [
                        {
                            '=': {'id': resource_id},
                        }
                        for resource_id in unique_ids
                    ],
                },
            ],
        }

        resources = []
        marker = None
        # Results come in chunks: keep asking for what follows the last
        # received ID until an empty chunk is returned.
        while True:
            resource_chunk = self._conn.resource.search(query=search,
                                                        details=True,
                                                        marker=marker,
                                                        sorts=['id:asc'])
            if len(resource_chunk) < 1:
                break
            marker = resource_chunk[-1]['id']
            resources += resource_chunk
        return {resource['id']: resource for resource in resources}

    @staticmethod
    def _dataframes_to_list(dataframes):
        """Return the values of the dict, ordered by key."""
        keys = sorted(dataframes.keys())
        return [dataframes[key] for key in keys]

    def _get_dataframes(self, measures, resource_info):
        """Group measures into one dataframe per timestamp.

        :param measures: output of _raw_metrics_to_distinct_measures
        :param resource_info: output of _get_resource_info
        :returns: list of dataframes ordered by timestamp
        """
        dataframes = {}

        for measure in measures:
            resource_type = measure['group']['type']
            resource_id = measure['group']['id']

            # Raw metrics do not contain all required attributes
            resource = resource_info[resource_id]

            timestamp = measure['cost'][0]
            ck_resource_type_name = resource_type.replace(
                RESOURCE_TYPE_NAME_ROOT, '')
            dataframe = dataframes.get(timestamp)
            if dataframe is None:
                dataframes[timestamp] = self._to_cloudkitty(
                    ck_resource_type_name,
                    resource,
                    measure['cost'],
                    measure['qty'])
            else:
                dataframe['usage'].setdefault(
                    ck_resource_type_name, []).append(
                        self._get_resource_frame(
                            measure['cost'], measure['qty'], resource))
        return self._dataframes_to_list(dataframes)

    @staticmethod
    def _create_filters(filters, group_filters):
        """Merge both filter dicts, using gnocchi attribute names as keys.

        ``filters`` keys get the metadata prefix, ``group_filters`` keys
        the groupby prefix. Either parameter may be None or empty.
        """
        output = {}

        if filters:
            for k, v in filters.items():
                output[META_NAME_ROOT + k] = v
        if group_filters:
            for k, v in group_filters.items():
                output[GROUPBY_NAME_ROOT + k] = v
        return output

    def _raw_metrics_to_distinct_measures(self,
                                          raw_cost_metrics,
                                          raw_qty_metrics):
        """Flatten aggregates into one entry per (group, measure).

        Both lists are walked in parallel; the qty measure of an entry is
        the one found at the same index as its cost measure.
        """
        output = []
        for cost, qty in zip(raw_cost_metrics, raw_qty_metrics):
            output += [{
                'cost': cost_measure,
                'qty': qty['measures']['measures']['aggregated'][idx],
                'group': cost['group'],
            } for idx, cost_measure in enumerate(
                cost['measures']['measures']['aggregated'])
            ]
        # Sorting by timestamp, metric type and resource ID
        output.sort(key=lambda x: (
            x['cost'][0], x['group']['type'], x['group']['id']))
        return output

    def retrieve(self, begin=None, end=None,
                 filters=None, group_filters=None,
                 metric_types=None,
                 offset=0, limit=100, paginate=True):
        """Return the dataframes matching the given criteria.

        :param begin: start of the period (see _check_begin_end)
        :param end: end of the period (see _check_begin_end)
        :param filters: metadata filters
        :type filters: dict
        :param group_filters: groupby filters
        :type group_filters: dict
        :param metric_types: metric type(s) to query, None for all
        :param offset: index of the first measure to return
        :type offset: int
        :param limit: maximum number of measures to return
        :type limit: int
        :param paginate: whether offset and limit are applied
        :type paginate: bool
        :returns: dict with the 'total' number of matching measures (before
                  pagination) and the 'dataframes' of the requested page
        """
        begin, end = self._check_begin_end(begin, end)

        metric_types = self._check_res_types(metric_types)

        # Getting a list of active gnocchi resources with measures
        filters = self._create_filters(filters, group_filters)

        # FIXME(lukapeschke): We query all resource types in order to get the
        # total amount of dataframes, but this could be done in a better way;
        # ie. by not doing additional queries once the limit is reached
        raw_cost_metrics = []
        raw_qty_metrics = []
        for mtype in metric_types:
            cost_metrics, qty_metrics = self._single_resource_type_aggregates(
                begin, end, mtype, ['type', 'id'], filters, fetch_qty=True)
            raw_cost_metrics += cost_metrics
            raw_qty_metrics += qty_metrics
        measures = self._raw_metrics_to_distinct_measures(
            raw_cost_metrics, raw_qty_metrics)

        result = {'total': len(measures)}

        if paginate:
            measures = self._paginate(measures, offset, limit)
        if len(measures) < 1:
            # The page may be empty while matching measures exist (offset
            # past the end): keep reporting the real total.
            result['dataframes'] = []
            return result
        resource_ids = [measure['group']['id'] for measure in measures]

        resource_info = self._get_resource_info(resource_ids, begin, end)

        result['dataframes'] = self._get_dataframes(measures, resource_info)
        return result

    def _single_resource_type_aggregates(self,
                                         start, stop,
                                         metric_type,
                                         groupby,
                                         filters,
                                         fetch_qty=False):
        """Fetch the summed cost (and optionally qty) of a resource type.

        :param start: start of the period
        :param stop: end of the period
        :param metric_type: gnocchi resource type to query
        :param groupby: attributes to group the aggregates on
        :type groupby: list
        :param filters: attribute name/value pairs resources must match
        :type filters: dict
        :param fetch_qty: whether the qty aggregates are fetched too
        :type fetch_qty: bool
        :returns: (cost aggregates, qty aggregates or None)
        """
        search = {
            'and': [
                {'=': {'type': metric_type}}
            ]
        }
        search['and'] += [{'=': {k: v}} for k, v in filters.items()]

        cost_metrics = self._conn.aggregates.fetch(
            self.default_op,
            search=search,
            groupby=groupby,
            resource_type=metric_type,
            start=start, stop=stop)

        qty_metrics = None
        if fetch_qty:
            # Same operation, applied to the 'qty' metric.
            qty_op = copy.deepcopy(self.default_op)
            qty_op[2][1] = 'qty'
            qty_metrics = self._conn.aggregates.fetch(
                qty_op,
                search=search,
                groupby=groupby,
                resource_type=metric_type,
                start=start, stop=stop)
        return cost_metrics, qty_metrics

    @staticmethod
    def _ungroup_type(rated_resources):
        """Merge the resources which only differ by their 'type' group.

        The 'type' key is removed from the group of every element (the
        input is modified in place) and the measures of elements ending up
        with the same group are concatenated.
        """
        output = []
        for rated_resource in rated_resources:
            rated_resource['group'].pop('type', None)
            new_item = True
            for elem in output:
                if rated_resource['group'] == elem['group']:
                    elem['measures']['measures']['aggregated'] \
                        += rated_resource['measures']['measures']['aggregated']
                    new_item = False
                    break
            if new_item:
                output.append(rated_resource)
        return output

    def total(self, groupby=None,
              begin=None, end=None,
              metric_types=None,
              filters=None, group_filters=None,
              offset=0, limit=1000, paginate=True):
        """Return the summed rate of each group for the given period.

        :param groupby: attributes to group the rates on, 'type' included
        :type groupby: list
        :param begin: start of the period (see _check_begin_end)
        :param end: end of the period (see _check_begin_end)
        :param metric_types: metric type(s) to query, None for all
        :param filters: metadata filters
        :type filters: dict
        :param group_filters: groupby filters
        :type group_filters: dict
        :param offset: index of the first element to return
        :type offset: int
        :param limit: maximum number of elements to return
        :type limit: int
        :param paginate: whether offset and limit are applied
        :type paginate: bool
        :returns: dict with a 'total' count, taken before pagination and
                  before types are merged back together, and the 'results'
        """
        begin, end = self._check_begin_end(begin, end)

        if groupby is None:
            groupby = []
        request_groupby = [
            GROUPBY_NAME_ROOT + elem for elem in groupby if elem != 'type']
        # We need to have at least one attribute on which to group
        request_groupby.append('type')

        # NOTE(lukapeschke): For now, it isn't possible to group aggregates
        # from different resource types using custom attributes, so we need
        # to do one request per resource type.
        rated_resources = []
        metric_types = self._check_res_types(metric_types)
        filters = self._create_filters(filters, group_filters)
        for mtype in metric_types:
            resources, _ = self._single_resource_type_aggregates(
                begin, end, mtype, request_groupby, filters)

            for resource in resources:
                # If we have found something
                if len(resource['measures']['measures']['aggregated']):
                    rated_resources.append(resource)

        result = {'total': len(rated_resources)}
        if paginate:
            rated_resources = self._paginate(rated_resources, offset, limit)
        if len(rated_resources) < 1:
            # The page may be empty while matching elements exist (offset
            # past the end): keep reporting the real total.
            result['results'] = []
            return result

        # NOTE(lukapeschke): We undo what has been done previously (grouping
        # per type). This is not performant. Should be fixed as soon as
        # previous note is supported in gnocchi
        if 'type' not in groupby:
            rated_resources = self._ungroup_type(rated_resources)

        output = []
        for rated_resource in rated_resources:
            rate = sum(measure[2] for measure in
                       rated_resource['measures']['measures']['aggregated'])
            output_elem = {
                'begin': begin,
                'end': end,
                'rate': rate,
            }
            for group in groupby:
                output_elem[group] = rated_resource['group'].get(
                    GROUPBY_NAME_ROOT + group, '')
            # If we want to group per type
            if 'type' in groupby:
                output_elem['type'] = rated_resource['group'].get(
                    'type', '').replace(RESOURCE_TYPE_NAME_ROOT, '') or ''
            output.append(output_elem)
        result['results'] = output
        return result
|