Adding an InfluxDB storage backend
This adds an InfluxDB backend to v2 storage. It is much more performant than the gnocchi backend, and adds support for grafana. In order to avoid making this patch too big, the documentation will be updated in another patch. Support for InfluxDB installation in the devstack plugin will also be added in another patch. Change-Id: Icaa23acb1a4791aac0dd8afb122d561065193eea Story: 2001372 Task: 24536
This commit is contained in:
parent
6350923970
commit
c4758e78b4
|
@ -29,6 +29,7 @@ import cloudkitty.service
|
|||
import cloudkitty.storage
|
||||
import cloudkitty.storage.v1.hybrid.backends.gnocchi
|
||||
import cloudkitty.storage.v2.gnocchi
|
||||
import cloudkitty.storage.v2.influx
|
||||
import cloudkitty.utils
|
||||
|
||||
__all__ = ['list_opts']
|
||||
|
@ -61,6 +62,8 @@ _opts = [
|
|||
cloudkitty.config.state_opts))),
|
||||
('storage', list(itertools.chain(
|
||||
cloudkitty.storage.storage_opts))),
|
||||
('storage_influx', list(itertools.chain(
|
||||
cloudkitty.storage.v2.influx.influx_storage_opts))),
|
||||
('storage_gnocchi', list(itertools.chain(
|
||||
cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))),
|
||||
('storage_gnocchi', list(itertools.chain(
|
||||
|
|
|
@ -166,7 +166,6 @@ class BaseStorage(object):
|
|||
{
|
||||
'begin': XXX,
|
||||
'end': XXX,
|
||||
'type': XXX,
|
||||
'rate': XXX,
|
||||
'groupby1': XXX,
|
||||
'groupby2': XXX
|
||||
|
|
|
@ -0,0 +1,369 @@
|
|||
# Copyright 2018 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Luka Peschke
|
||||
#
|
||||
import copy
|
||||
import datetime
|
||||
import decimal
|
||||
|
||||
import influxdb
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from cloudkitty.storage import v2 as v2_storage
|
||||
from cloudkitty import utils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('period', 'cloudkitty.collector', 'collect')
|
||||
|
||||
INFLUX_STORAGE_GROUP = 'storage_influxdb'
|
||||
|
||||
influx_storage_opts = [
|
||||
cfg.StrOpt('username', help='InfluxDB username'),
|
||||
cfg.StrOpt('password', help='InfluxDB password', secret=True),
|
||||
cfg.StrOpt('database', help='InfluxDB database'),
|
||||
cfg.StrOpt('retention_policy', default='autogen',
|
||||
help='Retention policy to use'),
|
||||
cfg.StrOpt('host', help='InfluxDB host', default='localhost'),
|
||||
cfg.IntOpt('port', help='InfluxDB port', default=8086),
|
||||
cfg.BoolOpt(
|
||||
'use_ssl',
|
||||
help='Set to true to use ssl for influxDB connection. '
|
||||
'Defaults to False',
|
||||
default=False,
|
||||
),
|
||||
cfg.BoolOpt(
|
||||
'insecure',
|
||||
help='Set to true to authorize insecure HTTPS connections to '
|
||||
'influxDB. Defaults to False',
|
||||
default=False,
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'cacert',
|
||||
'Path of the CA certificate to trust for HTTPS connections',
|
||||
default=None
|
||||
),
|
||||
]
|
||||
|
||||
CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP)
|
||||
|
||||
|
||||
class InfluxClient(object):
|
||||
"""Classe used to ease interaction with InfluxDB"""
|
||||
|
||||
def __init__(self, chunk_size=500, autocommit=True):
|
||||
"""Creates an InfluxClient object.
|
||||
|
||||
:param chunk_size: Size after which points should be pushed.
|
||||
:param autocommit: Set to false to disable autocommit
|
||||
"""
|
||||
self._conn = self._get_influx_client()
|
||||
self._chunk_size = chunk_size
|
||||
self._autocommit = autocommit
|
||||
self._retention_policy = CONF.storage_influxdb.retention_policy
|
||||
self._points = []
|
||||
|
||||
@staticmethod
|
||||
def _get_influx_client():
|
||||
verify = CONF.storage_influxdb.use_ssl and not \
|
||||
CONF.storage_influxdb.insecure
|
||||
|
||||
if verify and CONF.storage_influxdb.cacert:
|
||||
verify = CONF.storage_influxdb.cacert
|
||||
|
||||
return influxdb.InfluxDBClient(
|
||||
username=CONF.storage_influxdb.username,
|
||||
password=CONF.storage_influxdb.password,
|
||||
host=CONF.storage_influxdb.host,
|
||||
port=CONF.storage_influxdb.port,
|
||||
database=CONF.storage_influxdb.database,
|
||||
ssl=CONF.storage_influxdb.use_ssl,
|
||||
verify_ssl=verify,
|
||||
)
|
||||
|
||||
def retention_policy_exists(self, database, policy):
|
||||
policies = self._conn.get_list_retention_policies(database)
|
||||
return policy in [pol['name'] for pol in policies]
|
||||
|
||||
def commit(self):
|
||||
total_points = len(self._points)
|
||||
if len(self._points) < 1:
|
||||
return
|
||||
LOG.debug('Pushing {} points to InfluxDB'.format(total_points))
|
||||
self._conn.write_points(self._points,
|
||||
retention_policy=self._retention_policy)
|
||||
self._points = []
|
||||
|
||||
def append_point(self,
|
||||
metric_type,
|
||||
timestamp,
|
||||
qty, price, unit,
|
||||
fields, tags):
|
||||
"""Adds two points to commit to InfluxDB"""
|
||||
|
||||
measurement_fields = copy.deepcopy(fields)
|
||||
measurement_fields['qty'] = float(qty)
|
||||
measurement_fields['price'] = float(price)
|
||||
measurement_fields['unit'] = unit
|
||||
# Unfortunately, this seems to be the fastest way: Having several
|
||||
# measurements would imply a high client-side workload, and this allows
|
||||
# us to filter out unrequired keys
|
||||
measurement_fields['groupby'] = '|'.join(tags.keys())
|
||||
measurement_fields['metadata'] = '|'.join(fields.keys())
|
||||
|
||||
measurement_tags = copy.deepcopy(tags)
|
||||
measurement_tags['type'] = metric_type
|
||||
|
||||
self._points.append({
|
||||
'measurement': 'dataframes',
|
||||
'tags': measurement_tags,
|
||||
'fields': measurement_fields,
|
||||
'time': utils.ts2dt(timestamp),
|
||||
})
|
||||
if self._autocommit and len(self._points) >= self._chunk_size:
|
||||
self.commit()
|
||||
|
||||
@staticmethod
|
||||
def _get_filter(key, value):
|
||||
if isinstance(value, six.text_type):
|
||||
format_string = "{}='{}'"
|
||||
elif isinstance(value, (six.integer_types, float)):
|
||||
format_string = "{}='{}'"
|
||||
return format_string.format(key, value)
|
||||
|
||||
@staticmethod
|
||||
def _get_time_query(begin, end):
|
||||
return " WHERE time >= '{}' AND time < '{}'".format(
|
||||
utils.isotime(begin), utils.isotime(end))
|
||||
|
||||
def _get_filter_query(self, filters):
|
||||
if not filters:
|
||||
return ''
|
||||
return ' AND ' + ' AND '.join(
|
||||
self._get_filter(k, v) for k, v in filters.items())
|
||||
|
||||
@staticmethod
|
||||
def _get_type_query(types):
|
||||
if not types:
|
||||
return ''
|
||||
type_query = ' OR '.join("type='{}'".format(mtype)
|
||||
for mtype in types)
|
||||
return ' AND (' + type_query + ')'
|
||||
|
||||
def get_total(self, types, begin, end, groupby=None, filters=None):
|
||||
query = 'SELECT SUM(qty) AS qty, SUM(price) AS price FROM "dataframes"'
|
||||
query += self._get_time_query(begin, end)
|
||||
query += self._get_filter_query(filters)
|
||||
query += self._get_type_query(types)
|
||||
|
||||
if groupby:
|
||||
groupby_query = ','.join(groupby)
|
||||
query += ' GROUP BY ' + groupby_query
|
||||
|
||||
query += ';'
|
||||
|
||||
return self._conn.query(query)
|
||||
|
||||
def retrieve(self,
|
||||
types,
|
||||
filters,
|
||||
begin, end,
|
||||
offset=0, limit=1000, paginate=True):
|
||||
query = 'SELECT * FROM "dataframes"'
|
||||
query += self._get_time_query(begin, end)
|
||||
query += self._get_filter_query(filters)
|
||||
query += self._get_type_query(types)
|
||||
|
||||
if paginate:
|
||||
query += ' OFFSET {} LIMIT {}'.format(offset, limit)
|
||||
|
||||
query += ';'
|
||||
|
||||
total_query = 'SELECT COUNT(groupby) FROM "dataframes"'
|
||||
total_query += self._get_time_query(begin, end)
|
||||
total_query += self._get_filter_query(filters)
|
||||
total_query += self._get_type_query(types)
|
||||
total_query += ';'
|
||||
|
||||
total, result = self._conn.query(total_query + query)
|
||||
total = sum(point['count'] for point in total.get_points())
|
||||
return total, result
|
||||
|
||||
|
||||
class InfluxStorage(v2_storage.BaseStorage):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(InfluxStorage, self).__init__(*args, **kwargs)
|
||||
self._conn = InfluxClient()
|
||||
self._period = kwargs.get('period', None) or CONF.collect.period
|
||||
|
||||
def init(self):
|
||||
policy = CONF.storage_influxdb.retention_policy
|
||||
database = CONF.storage_influxdb.database
|
||||
if not self._conn.retention_policy_exists(database, policy):
|
||||
LOG.error(
|
||||
'Archive policy "{}" does not exist in database "{}"'.format(
|
||||
policy, database)
|
||||
)
|
||||
|
||||
def push(self, dataframes, scope_id=None):
|
||||
|
||||
for dataframe in dataframes:
|
||||
timestamp = dataframe['period']['begin']
|
||||
for metric_name, metrics in dataframe['usage'].items():
|
||||
for metric in metrics:
|
||||
self._conn.append_point(
|
||||
metric_name,
|
||||
timestamp,
|
||||
metric['vol']['qty'],
|
||||
metric['rating']['price'],
|
||||
metric['vol']['unit'],
|
||||
metric['metadata'],
|
||||
metric['groupby'],
|
||||
)
|
||||
|
||||
self._conn.commit()
|
||||
|
||||
@staticmethod
|
||||
def _check_begin_end(begin, end):
|
||||
if not begin:
|
||||
begin = utils.get_month_start()
|
||||
if not end:
|
||||
end = utils.get_next_month()
|
||||
if isinstance(begin, six.text_type):
|
||||
begin = utils.iso2dt(begin)
|
||||
if isinstance(begin, int):
|
||||
begin = utils.ts2dt(begin)
|
||||
if isinstance(end, six.text_type):
|
||||
end = utils.iso2dt(end)
|
||||
if isinstance(end, int):
|
||||
end = utils.ts2dt(end)
|
||||
|
||||
return begin, end
|
||||
|
||||
@staticmethod
|
||||
def _build_filters(filters, group_filters):
|
||||
output = None
|
||||
if filters and group_filters:
|
||||
output = copy.deepcopy(filters)
|
||||
output.update(group_filters)
|
||||
elif group_filters:
|
||||
output = group_filters
|
||||
return output
|
||||
|
||||
@staticmethod
|
||||
def _point_to_dataframe_entry(point):
|
||||
groupby = point.pop('groupby').split('|')
|
||||
metadata = point.pop('metadata').split('|')
|
||||
return {
|
||||
'vol': {
|
||||
'unit': point['unit'],
|
||||
'qty': decimal.Decimal(point['qty']),
|
||||
},
|
||||
'rating': {
|
||||
'price': point['price'],
|
||||
},
|
||||
'groupby': {key: point.get(key, '') for key in groupby},
|
||||
'metadata': {key: point.get(key, '') for key in metadata},
|
||||
}
|
||||
|
||||
def _build_dataframes(self, points):
|
||||
dataframes = {}
|
||||
for point in points:
|
||||
point_type = point['type']
|
||||
if point['time'] not in dataframes.keys():
|
||||
dataframes[point['time']] = {
|
||||
'period': {
|
||||
'begin': point['time'],
|
||||
'end': utils.isotime(
|
||||
utils.iso2dt(point['time'])
|
||||
+ datetime.timedelta(seconds=self._period)),
|
||||
},
|
||||
'usage': {},
|
||||
}
|
||||
usage = dataframes[point['time']]['usage']
|
||||
if point_type not in usage.keys():
|
||||
usage[point_type] = []
|
||||
usage[point_type].append(self._point_to_dataframe_entry(point))
|
||||
|
||||
output = list(dataframes.values())
|
||||
output.sort(key=lambda x: x['period']['begin'])
|
||||
return output
|
||||
|
||||
def retrieve(self, begin=None, end=None,
|
||||
filters=None, group_filters=None,
|
||||
metric_types=None,
|
||||
offset=0, limit=1000, paginate=True):
|
||||
begin, end = self._check_begin_end(begin, end)
|
||||
filters = self._build_filters(filters, group_filters)
|
||||
total, resp = self._conn.retrieve(
|
||||
metric_types, filters, begin, end, offset, limit, paginate)
|
||||
|
||||
# Unfortunately, a ResultSet has no values() method, so we need to
|
||||
# get them manually
|
||||
points = []
|
||||
for _, item in resp.items():
|
||||
points += list(item)
|
||||
|
||||
return {
|
||||
'total': total,
|
||||
'dataframes': self._build_dataframes(points)
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _get_total_elem(begin, end, groupby, series_groupby, point):
|
||||
output = {
|
||||
'begin': begin,
|
||||
'end': end,
|
||||
'qty': point['qty'],
|
||||
'rate': point['price'],
|
||||
}
|
||||
if groupby:
|
||||
for group in groupby:
|
||||
output[group] = series_groupby.get(group, '')
|
||||
return output
|
||||
|
||||
def total(self, groupby=None,
|
||||
begin=None, end=None,
|
||||
metric_types=None,
|
||||
filters=None, group_filters=None,
|
||||
offset=0, limit=1000, paginate=True):
|
||||
|
||||
begin, end = self._check_begin_end(begin, end)
|
||||
filters = self._build_filters(filters, group_filters)
|
||||
|
||||
total = self._conn.get_total(
|
||||
metric_types, begin, end, groupby, filters)
|
||||
|
||||
output = []
|
||||
for (series_name, series_groupby), points in total.items():
|
||||
for point in points:
|
||||
output.append(self._get_total_elem(
|
||||
begin, end,
|
||||
groupby,
|
||||
series_groupby,
|
||||
point))
|
||||
|
||||
if groupby:
|
||||
output.sort(key=lambda x: [x[group] for group in groupby])
|
||||
return {
|
||||
'total': len(output),
|
||||
'results': output[offset:limit] if paginate else output,
|
||||
}
|
|
@ -44,7 +44,7 @@ from cloudkitty import rating
|
|||
from cloudkitty import storage
|
||||
from cloudkitty.storage.v1.sqlalchemy import models
|
||||
from cloudkitty import tests
|
||||
from cloudkitty.tests import test_utils
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
from cloudkitty.tests.utils import is_functional_test
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
|
|
@ -22,7 +22,10 @@ from oslo_utils import uuidutils
|
|||
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
# These have a different format in order to check that both forms are supported
|
||||
TENANT = 'f266f30b11f246b589fd266f85eeec39'
|
||||
OTHER_TENANT = '8d3ae500-89ea-4142-9c6e-1269db6a0b64'
|
||||
|
||||
INITIAL_TIMESTAMP = 1420070400
|
||||
FIRST_PERIOD_BEGIN = INITIAL_TIMESTAMP
|
||||
FIRST_PERIOD_BEGIN_ISO = ck_utils.ts2iso(FIRST_PERIOD_BEGIN)
|
||||
|
|
|
@ -22,8 +22,7 @@ from gnocchiclient import exceptions as gexc
|
|||
|
||||
from cloudkitty import storage
|
||||
from cloudkitty import tests
|
||||
from cloudkitty.tests import test_utils
|
||||
from cloudkitty.tests.utils import is_functional_test
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
|
||||
|
||||
class BaseHybridStorageTest(tests.TestCase):
|
||||
|
@ -56,7 +55,7 @@ class PermissiveDict(object):
|
|||
return self.value == other.get(self.key)
|
||||
|
||||
|
||||
@testtools.skipIf(is_functional_test(), 'Not a functional test')
|
||||
@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test')
|
||||
class HybridStorageTestGnocchi(BaseHybridStorageTest):
|
||||
|
||||
def setUp(self):
|
||||
|
|
|
@ -24,8 +24,7 @@ import testscenarios
|
|||
from cloudkitty import storage
|
||||
from cloudkitty import tests
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty.tests import test_utils
|
||||
from cloudkitty.tests.utils import is_functional_test
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
|
@ -66,7 +65,7 @@ class StorageTest(tests.TestCase):
|
|||
self.storage.push(working_data, self._other_tenant_id)
|
||||
|
||||
|
||||
@testtools.skipIf(is_functional_test(), 'Not a functional test')
|
||||
@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test')
|
||||
class StorageDataframeTest(StorageTest):
|
||||
|
||||
storage_scenarios = [
|
||||
|
@ -130,7 +129,7 @@ class StorageDataframeTest(StorageTest):
|
|||
self.assertEqual(3, len(data))
|
||||
|
||||
|
||||
@testtools.skipIf(is_functional_test(), 'Not a functional test')
|
||||
@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test')
|
||||
class StorageTotalTest(StorageTest):
|
||||
|
||||
storage_scenarios = [
|
||||
|
@ -270,7 +269,7 @@ class StorageTotalTest(StorageTest):
|
|||
self.assertEqual(end, total[3]["end"])
|
||||
|
||||
|
||||
if not is_functional_test():
|
||||
if not test_utils.is_functional_test():
|
||||
StorageTest.generate_scenarios()
|
||||
StorageTotalTest.generate_scenarios()
|
||||
StorageDataframeTest.generate_scenarios()
|
||||
|
|
|
@ -26,7 +26,7 @@ from oslo_config import fixture as config_fixture
|
|||
from oslo_utils import uuidutils
|
||||
|
||||
from cloudkitty import storage
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
|
@ -42,43 +42,6 @@ def _init_conf():
|
|||
default_config_files=['/etc/cloudkitty/cloudkitty.conf'])
|
||||
|
||||
|
||||
def get_storage_data(min_length=10,
|
||||
nb_projects=2,
|
||||
project_ids=None,
|
||||
start=datetime(2018, 1, 1),
|
||||
end=datetime(2018, 1, 1, 1)):
|
||||
if isinstance(start, datetime):
|
||||
start = ck_utils.dt2ts(start)
|
||||
if isinstance(end, datetime):
|
||||
end = ck_utils.dt2ts(end)
|
||||
|
||||
if not project_ids:
|
||||
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
|
||||
elif not isinstance(project_ids, list):
|
||||
project_ids = [project_ids]
|
||||
|
||||
usage = {}
|
||||
for metric_name, sample in samples.V2_STORAGE_SAMPLE.items():
|
||||
dataframes = []
|
||||
for project_id in project_ids:
|
||||
data = [copy.deepcopy(sample)
|
||||
# for i in range(min_length + random.randint(1, 10))]
|
||||
for i in range(1)]
|
||||
for elem in data:
|
||||
elem['groupby']['id'] = uuidutils.generate_uuid()
|
||||
elem['groupby']['project_id'] = project_id
|
||||
dataframes += data
|
||||
usage[metric_name] = dataframes
|
||||
|
||||
return {
|
||||
'usage': usage,
|
||||
'period': {
|
||||
'begin': start,
|
||||
'end': end
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class BaseFunctionalStorageTest(testtools.TestCase):
|
||||
|
||||
# Name of the storage backend to test
|
||||
|
@ -138,8 +101,9 @@ class BaseFunctionalStorageTest(testtools.TestCase):
|
|||
@staticmethod
|
||||
def gen_data_separate_projects(nb_projects):
|
||||
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
|
||||
data = [get_storage_data(
|
||||
project_ids=project_ids[i], nb_projects=1)
|
||||
data = [
|
||||
test_utils.generate_v2_storage_data(
|
||||
project_ids=project_ids[i], nb_projects=1)
|
||||
for i in range(nb_projects)]
|
||||
return project_ids, data
|
||||
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
# Copyright 2018 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Luka Peschke
|
||||
#
|
||||
import copy
|
||||
import functools
|
||||
|
||||
from influxdb import resultset
|
||||
|
||||
from cloudkitty.storage.v2.influx import InfluxClient
|
||||
from cloudkitty import utils
|
||||
|
||||
|
||||
class FakeInfluxClient(InfluxClient):
|
||||
|
||||
total_sample = {
|
||||
"statement_id": 0,
|
||||
"series": []
|
||||
}
|
||||
|
||||
total_series_sample = {
|
||||
"name": "dataframes",
|
||||
"tags": {},
|
||||
"columns": ["time", "qty", "price"],
|
||||
"values": [],
|
||||
}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(FakeInfluxClient, self).__init__(autocommit=False)
|
||||
|
||||
def commit(self):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __filter_func(types, filters, begin, end, elem):
|
||||
if elem['time'] < begin or elem['time'] >= end:
|
||||
return False
|
||||
if types and elem['tags']['type'] not in types:
|
||||
return False
|
||||
if filters is None:
|
||||
return True
|
||||
for key in filters.keys():
|
||||
if key not in elem['tags'].keys():
|
||||
return False
|
||||
if elem['tags'][key] != filters[key]:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __get_target_serie(self, point, series, groupby):
|
||||
target_serie = None
|
||||
for serie in series:
|
||||
if not groupby:
|
||||
target_serie = serie
|
||||
break
|
||||
valid = True
|
||||
for tag in serie['tags'].keys():
|
||||
if tag not in point['tags'].keys() or \
|
||||
point['tags'][tag] != serie['tags'][tag]:
|
||||
valid = False
|
||||
break
|
||||
if valid:
|
||||
target_serie = serie
|
||||
break
|
||||
|
||||
if target_serie is None:
|
||||
target_serie = copy.deepcopy(self.total_series_sample)
|
||||
if groupby:
|
||||
target_serie['tags'] = {k: point['tags'][k] for k in groupby}
|
||||
else:
|
||||
target_serie['tags'] = {}
|
||||
target_serie['values'] = [['1970-01-01T00:00:00Z', 0, 0]]
|
||||
series.append(target_serie)
|
||||
return target_serie
|
||||
|
||||
def get_total(self, types, begin, end, groupby=None, filters=None):
|
||||
total = copy.deepcopy(self.total_sample)
|
||||
series = []
|
||||
|
||||
filter_func = functools.partial(
|
||||
self.__filter_func, types, filters, begin, end)
|
||||
points = filter(filter_func, self._points)
|
||||
|
||||
for point in points:
|
||||
target_serie = self.__get_target_serie(point, series, groupby)
|
||||
target_serie['values'][0][1] += point['fields']['qty']
|
||||
target_serie['values'][0][2] += point['fields']['price']
|
||||
total['series'] = series
|
||||
|
||||
return resultset.ResultSet(total)
|
||||
|
||||
def retrieve(self,
|
||||
types,
|
||||
filters,
|
||||
begin, end,
|
||||
offset=0, limit=1000, paginate=True):
|
||||
output = copy.deepcopy(self.total_sample)
|
||||
|
||||
filter_func = functools.partial(
|
||||
self.__filter_func, types, filters, begin, end)
|
||||
points = list(filter(filter_func, self._points))
|
||||
|
||||
columns = set()
|
||||
for point in list(points):
|
||||
columns.update(point['tags'].keys())
|
||||
columns.update(point['fields'].keys())
|
||||
columns.add('time')
|
||||
|
||||
series = {
|
||||
'name': 'dataframes',
|
||||
'columns': list(columns),
|
||||
}
|
||||
values = []
|
||||
|
||||
def __get_tag_or_field(point, key):
|
||||
if key == 'time':
|
||||
return utils.isotime(point['time'])
|
||||
return point['tags'].get(key) or point['fields'].get(key)
|
||||
|
||||
for point in points:
|
||||
values.append([__get_tag_or_field(point, key)
|
||||
for key in series['columns']])
|
||||
|
||||
series['values'] = values
|
||||
output['series'] = [series]
|
||||
|
||||
return len(points), resultset.ResultSet(output)
|
||||
|
||||
def retention_policy_exists(self, database, policy):
|
||||
return True
|
|
@ -0,0 +1,327 @@
|
|||
# Copyright 2018 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Luka Peschke
|
||||
#
|
||||
import datetime
|
||||
|
||||
import mock
|
||||
import testscenarios
|
||||
|
||||
from cloudkitty import storage
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty.tests.storage.v2 import influx_utils
|
||||
from cloudkitty.tests import TestCase
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
|
||||
|
||||
class StorageUnitTest(TestCase):
|
||||
|
||||
storage_scenarios = [
|
||||
('influx', dict(storage_backend='influxdb'))]
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(
|
||||
cls.scenarios,
|
||||
cls.storage_scenarios)
|
||||
|
||||
@mock.patch('cloudkitty.storage.v2.influx.InfluxClient',
|
||||
new=influx_utils.FakeInfluxClient)
|
||||
@mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf)
|
||||
def setUp(self):
|
||||
super(StorageUnitTest, self).setUp()
|
||||
self._project_id = samples.TENANT
|
||||
self._other_project_id = samples.OTHER_TENANT
|
||||
self.conf.set_override('backend', self.storage_backend, 'storage')
|
||||
self.conf.set_override('version', '2', 'storage')
|
||||
self.storage = storage.get_storage(conf=test_utils.load_conf())
|
||||
self.storage.init()
|
||||
self.data = []
|
||||
self.init_data()
|
||||
|
||||
def init_data(self):
|
||||
project_ids = [self._project_id, self._other_project_id]
|
||||
for i in range(3):
|
||||
start_delta = 3600 * i
|
||||
end_delta = start_delta + 3600
|
||||
start = datetime.datetime(2018, 1, 1) \
|
||||
+ datetime.timedelta(seconds=start_delta)
|
||||
end = datetime.datetime(2018, 1, 1) \
|
||||
+ datetime.timedelta(seconds=end_delta)
|
||||
data = test_utils.generate_v2_storage_data(
|
||||
project_ids=project_ids,
|
||||
start=start,
|
||||
end=end)
|
||||
self.data.append(data)
|
||||
self.storage.push([data])
|
||||
|
||||
@staticmethod
|
||||
def _expected_total_qty_len(data, project_id=None, types=None):
|
||||
total = 0
|
||||
qty = 0
|
||||
length = 0
|
||||
for data_part in data:
|
||||
for mtype, usage_part in data_part['usage'].items():
|
||||
if types is not None and mtype not in types:
|
||||
continue
|
||||
for item in usage_part:
|
||||
if project_id is None or \
|
||||
project_id == item['groupby']['project_id']:
|
||||
total += item['rating']['price']
|
||||
qty += item['vol']['qty']
|
||||
length += 1
|
||||
|
||||
return round(float(total), 5), round(float(qty), 5), length
|
||||
|
||||
def _compare_get_total_result_with_expected(self,
|
||||
expected_qty,
|
||||
expected_total,
|
||||
expected_total_len,
|
||||
total):
|
||||
self.assertEqual(len(total['results']), expected_total_len)
|
||||
self.assertEqual(total['total'], expected_total_len)
|
||||
|
||||
returned_total = round(sum(r['rate'] for r in total['results']), 5)
|
||||
self.assertLessEqual(abs(expected_total - returned_total), 0.00001)
|
||||
|
||||
returned_qty = round(sum(r['qty'] for r in total['results']), 5)
|
||||
self.assertLessEqual(abs(expected_qty - returned_qty), 0.00001)
|
||||
|
||||
def test_get_total_all_scopes_all_periods(self):
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
self.data)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 4)
|
||||
|
||||
self._compare_get_total_result_with_expected(
|
||||
expected_qty,
|
||||
expected_total,
|
||||
1,
|
||||
self.storage.total(begin=begin, end=end))
|
||||
|
||||
def test_get_total_one_scope_all_periods(self):
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
self.data, self._project_id)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 4)
|
||||
|
||||
group_filters = {'project_id': self._project_id}
|
||||
self._compare_get_total_result_with_expected(
|
||||
expected_qty,
|
||||
expected_total,
|
||||
1,
|
||||
self.storage.total(begin=begin,
|
||||
end=end,
|
||||
group_filters=group_filters),
|
||||
)
|
||||
|
||||
def test_get_total_all_scopes_one_period(self):
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
[self.data[0]])
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 1)
|
||||
|
||||
self._compare_get_total_result_with_expected(
|
||||
expected_qty,
|
||||
expected_total,
|
||||
1,
|
||||
self.storage.total(begin=begin, end=end))
|
||||
|
||||
def test_get_total_one_scope_one_period(self):
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
[self.data[0]], self._project_id)
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
[self.data[0]], self._project_id)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 1)
|
||||
|
||||
group_filters = {'project_id': self._project_id}
|
||||
self._compare_get_total_result_with_expected(
|
||||
expected_qty,
|
||||
expected_total,
|
||||
1,
|
||||
self.storage.total(begin=begin,
|
||||
end=end,
|
||||
group_filters=group_filters),
|
||||
)
|
||||
|
||||
def test_get_total_all_scopes_all_periods_groupby_project_id(self):
|
||||
expected_total_first, expected_qty_first, _ = \
|
||||
self._expected_total_qty_len(self.data, self._project_id)
|
||||
expected_total_second, expected_qty_second, _ = \
|
||||
self._expected_total_qty_len(self.data, self._other_project_id)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 4)
|
||||
total = self.storage.total(begin=begin, end=end,
|
||||
groupby=['project_id'])
|
||||
self.assertEqual(len(total['results']), 2)
|
||||
self.assertEqual(total['total'], 2)
|
||||
|
||||
for t in total['results']:
|
||||
self.assertIn('project_id', t.keys())
|
||||
|
||||
total['results'].sort(key=lambda x: x['project_id'], reverse=True)
|
||||
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][0]['rate'], 5) - expected_total_first),
|
||||
0.00001,
|
||||
)
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][1]['rate'], 5) - expected_total_second),
|
||||
0.00001,
|
||||
)
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][0]['qty'], 5) - expected_qty_first),
|
||||
0.00001,
|
||||
)
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][1]['qty'], 5) - expected_qty_second),
|
||||
0.00001,
|
||||
)
|
||||
|
||||
def test_get_total_all_scopes_one_period_groupby_project_id(self):
|
||||
expected_total_first, expected_qty_first, _ = \
|
||||
self._expected_total_qty_len([self.data[0]], self._project_id)
|
||||
expected_total_second, expected_qty_second, _ = \
|
||||
self._expected_total_qty_len([self.data[0]],
|
||||
self._other_project_id)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 1)
|
||||
total = self.storage.total(begin=begin, end=end,
|
||||
groupby=['project_id'])
|
||||
self.assertEqual(len(total), 2)
|
||||
|
||||
for t in total['results']:
|
||||
self.assertIn('project_id', t.keys())
|
||||
|
||||
total['results'].sort(key=lambda x: x['project_id'], reverse=True)
|
||||
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][0]['rate'], 5) - expected_total_first),
|
||||
0.00001,
|
||||
)
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][1]['rate'], 5) - expected_total_second),
|
||||
0.00001,
|
||||
)
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][0]['qty'], 5) - expected_qty_first),
|
||||
0.00001,
|
||||
)
|
||||
self.assertLessEqual(
|
||||
abs(round(total['results'][1]['qty'], 5) - expected_qty_second),
|
||||
0.00001,
|
||||
)
|
||||
|
||||
def test_get_total_all_scopes_all_periods_groupby_type_paginate(self):
|
||||
expected_total, expected_qty, _ = \
|
||||
self._expected_total_qty_len(self.data)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 4)
|
||||
|
||||
total = {'total': 0, 'results': []}
|
||||
for offset in range(0, 7, 2):
|
||||
chunk = self.storage.total(
|
||||
begin=begin,
|
||||
end=end,
|
||||
offset=offset,
|
||||
limit=offset + 2,
|
||||
groupby=['type'])
|
||||
# there are seven metric types
|
||||
self.assertEqual(chunk['total'], 7)
|
||||
# last chunk, shorter
|
||||
if offset == 6:
|
||||
self.assertEqual(len(chunk['results']), 1)
|
||||
else:
|
||||
self.assertEqual(len(chunk['results']), 2)
|
||||
total['results'] += chunk['results']
|
||||
total['total'] += len(chunk['results'])
|
||||
|
||||
unpaginated_total = self.storage.total(
|
||||
begin=begin, end=end, groupby=['type'])
|
||||
self.assertEqual(total, unpaginated_total)
|
||||
|
||||
self._compare_get_total_result_with_expected(
|
||||
expected_qty,
|
||||
expected_total,
|
||||
7,
|
||||
total)
|
||||
|
||||
def test_retrieve_all_scopes_all_types(self):
|
||||
expected_total, expected_qty, expected_length = \
|
||||
self._expected_total_qty_len(self.data)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 4)
|
||||
|
||||
frames = self.storage.retrieve(begin=begin, end=end)
|
||||
self.assertEqual(frames['total'], expected_length)
|
||||
|
||||
retrieved_length = 0
|
||||
for data_part in frames['dataframes']:
|
||||
for usage_part in data_part['usage'].values():
|
||||
retrieved_length += len(usage_part)
|
||||
|
||||
self.assertEqual(expected_length, retrieved_length)
|
||||
|
||||
def test_retrieve_all_scopes_one_type(self):
|
||||
expected_total, expected_qty, expected_length = \
|
||||
self._expected_total_qty_len(self.data, types=['image.size'])
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 4)
|
||||
|
||||
frames = self.storage.retrieve(begin=begin, end=end,
|
||||
metric_types=['image.size'])
|
||||
self.assertEqual(frames['total'], expected_length)
|
||||
|
||||
retrieved_length = 0
|
||||
for data_part in frames['dataframes']:
|
||||
for usage_part in data_part['usage'].values():
|
||||
retrieved_length += len(usage_part)
|
||||
|
||||
self.assertEqual(expected_length, retrieved_length)
|
||||
|
||||
def test_retrieve_one_scope_two_types_one_period(self):
|
||||
expected_total, expected_qty, expected_length = \
|
||||
self._expected_total_qty_len([self.data[0]], self._project_id,
|
||||
types=['image.size', 'instance'])
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 1)
|
||||
|
||||
group_filters = {'project_id': self._project_id}
|
||||
frames = self.storage.retrieve(begin=begin, end=end,
|
||||
group_filters=group_filters,
|
||||
metric_types=['image.size', 'instance'])
|
||||
self.assertEqual(frames['total'], expected_length)
|
||||
|
||||
retrieved_length = 0
|
||||
for data_part in frames['dataframes']:
|
||||
for usage_part in data_part['usage'].values():
|
||||
retrieved_length += len(usage_part)
|
||||
|
||||
self.assertEqual(expected_length, retrieved_length)
|
||||
|
||||
|
||||
if not test_utils.is_functional_test():
|
||||
StorageUnitTest.generate_scenarios()
|
|
@ -25,7 +25,6 @@ import unittest
|
|||
import mock
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from cloudkitty.tests.samples import DEFAULT_METRICS_CONF
|
||||
from cloudkitty.tests.utils import is_functional_test
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
@ -200,7 +199,3 @@ class ConvertUnitTest(unittest.TestCase):
|
|||
def test_convert_decimal(self):
|
||||
result = ck_utils.num2decimal(decimal.Decimal(2))
|
||||
self.assertEqual(result, decimal.Decimal(2))
|
||||
|
||||
|
||||
def load_conf(*args):
|
||||
return DEFAULT_METRICS_CONF
|
||||
|
|
|
@ -15,8 +15,56 @@
|
|||
#
|
||||
# @author: Luka Peschke
|
||||
#
|
||||
import copy
|
||||
from datetime import datetime
|
||||
from os import getenv
|
||||
import random
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
def is_functional_test():
|
||||
return getenv('TEST_FUNCTIONAL', False)
|
||||
|
||||
|
||||
def generate_v2_storage_data(min_length=10,
|
||||
nb_projects=2,
|
||||
project_ids=None,
|
||||
start=datetime(2018, 1, 1),
|
||||
end=datetime(2018, 1, 1, 1)):
|
||||
if isinstance(start, datetime):
|
||||
start = ck_utils.dt2ts(start)
|
||||
if isinstance(end, datetime):
|
||||
end = ck_utils.dt2ts(end)
|
||||
|
||||
if not project_ids:
|
||||
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
|
||||
elif not isinstance(project_ids, list):
|
||||
project_ids = [project_ids]
|
||||
|
||||
usage = {}
|
||||
for metric_name, sample in samples.V2_STORAGE_SAMPLE.items():
|
||||
dataframes = []
|
||||
for project_id in project_ids:
|
||||
data = [copy.deepcopy(sample)
|
||||
for i in range(min_length + random.randint(1, 10))]
|
||||
for elem in data:
|
||||
elem['groupby']['id'] = uuidutils.generate_uuid()
|
||||
elem['groupby']['project_id'] = project_id
|
||||
dataframes += data
|
||||
usage[metric_name] = dataframes
|
||||
|
||||
return {
|
||||
'usage': usage,
|
||||
'period': {
|
||||
'begin': start,
|
||||
'end': end
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def load_conf(*args):
|
||||
return samples.DEFAULT_METRICS_CONF
|
||||
|
|
|
@ -26,6 +26,7 @@ six==1.9.0 # MIT
|
|||
stevedore==1.5.0 # Apache-2.0
|
||||
tooz==1.28.0 # Apache-2.0
|
||||
voluptuous==0.11.1 # BSD-3
|
||||
influxdb==5.1.0 # MIT
|
||||
|
||||
# test-requirements
|
||||
coverage==3.6 # Apache-2.0
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
An InfluxDB v2 storage backend has been added. It will become the default
|
||||
backend of the v2 storage interface.
|
||||
|
||||
The v1 storage interface will be deprecated in a future release. At that
|
||||
point, documentation about how to upgrade the storage backend will be made
|
||||
available, along with some helpers.
|
|
@ -28,3 +28,4 @@ six>=1.9.0 # MIT
|
|||
stevedore>=1.5.0 # Apache-2.0
|
||||
tooz>=1.28.0 # Apache-2.0
|
||||
voluptuous>=0.11.1 # BSD License
|
||||
influxdb>=5.1.0,!=5.2.0 # MIT
|
||||
|
|
|
@ -72,6 +72,7 @@ cloudkitty.storage.v1.backends =
|
|||
|
||||
cloudkitty.storage.v2.backends =
|
||||
gnocchi = cloudkitty.storage.v2.gnocchi:GnocchiStorage
|
||||
influxdb = cloudkitty.storage.v2.influx:InfluxStorage
|
||||
|
||||
cloudkitty.storage.hybrid.backends =
|
||||
gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage
|
||||
|
|
Loading…
Reference in New Issue