Refactor the storage backend

This adds a hybrid storage backend to CloudKitty. It aims at splitting the
processor state management and the dataframe storage part. The default backend
for this storage is gnocchi, but other backends will be implemented in the
future.

This storage backend is a standard storage module for now, but it will become
the default storage once it is considered stable. Once this new storage type
has become the default, storage backend creation will be easier (no state
handling).

Change-Id: I61c0c17230350b12be3484ea4b5805960aa33099
Story: 2001372
This commit is contained in:
Luka Peschke 2018-01-03 11:48:19 +01:00 committed by Maxime Cottret
parent 49379ba8f9
commit d2f7e06362
16 changed files with 1360 additions and 254 deletions

View File

@ -18,12 +18,15 @@
import abc
from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver
from cloudkitty import collector as ck_collector
from cloudkitty import utils as ck_utils
LOG = logging.getLogger(__name__)
storage_opts = [
cfg.StrOpt('backend',
default='sqlalchemy',
@ -47,6 +50,9 @@ def get_storage(collector=None):
cfg.CONF.storage.backend,
invoke_on_load=True,
invoke_kwds=storage_args).driver
if cfg.CONF.storage.backend not in ['sqlalchemy', 'hybrid']:
LOG.warning('{} storage backend is deprecated and will be removed '
'in a future release.'.format(cfg.CONF.storage.backend))
return backend

View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
from oslo_config import cfg
from oslo_db.sqlalchemy import utils
from stevedore import driver
from cloudkitty import db
from cloudkitty import storage
from cloudkitty.storage.hybrid import migration
from cloudkitty.storage.hybrid import models
from cloudkitty import utils as ck_utils
storage_opts = [
    cfg.StrOpt('backend',
               default='gnocchi',
               help='Name of the storage backend that should be used '
                    'by the hybrid storage'),
]

CONF = cfg.CONF
CONF.register_opts(storage_opts, group='hybrid_storage')

# Stevedore namespace under which hybrid dataframe backends are registered.
HYBRID_BACKENDS_NAMESPACE = 'cloudkitty.storage.hybrid.backends'
class HybridStorage(storage.BaseStorage):
    """Hybrid Storage Backend.

    Stores dataframes in one of the available backends and other information
    in a classical SQL database.
    """

    # SQLAlchemy model used to persist per-tenant processing state.
    state_model = models.TenantState

    def __init__(self, **kwargs):
        super(HybridStorage, self).__init__(**kwargs)
        # Load the dataframe backend (gnocchi by default) through stevedore.
        self._hybrid_backend = driver.DriverManager(
            HYBRID_BACKENDS_NAMESPACE,
            cfg.CONF.hybrid_storage.backend,
            invoke_on_load=True).driver
        # One SQL session per tenant, created lazily by _check_session().
        self._sql_session = {}

    def _check_session(self, tenant_id):
        """Open (and begin) a SQL session for the tenant if none exists."""
        session = self._sql_session.get(tenant_id, None)
        if not session:
            self._sql_session[tenant_id] = db.get_session()
            self._sql_session[tenant_id].begin()

    def init(self):
        """Create the state DB schema and initialize the backend."""
        migration.upgrade('head')
        self._hybrid_backend.init()

    def get_state(self, tenant_id=None):
        """Return the most recent state as a timestamp, or None.

        :param tenant_id: Optional tenant to restrict the lookup to.
        """
        session = db.get_session()
        q = utils.model_query(self.state_model, session)
        if tenant_id:
            q = q.filter(self.state_model.tenant_id == tenant_id)
        q = q.order_by(self.state_model.state.desc())
        r = q.first()
        return ck_utils.dt2ts(r.state) if r else None

    def _set_state(self, tenant_id, state):
        """Persist the given state for a tenant.

        The stored state is only updated if the new state is not older
        than the current one.
        """
        self._check_session(tenant_id)
        session = self._sql_session[tenant_id]
        q = utils.model_query(self.state_model, session)
        if tenant_id:
            q = q.filter(self.state_model.tenant_id == tenant_id)
        r = q.first()
        do_commit = False
        if r:
            # Never move the state backwards.
            if state >= r.state:
                q.update({'state': state})
                do_commit = True
        else:
            state = self.state_model(tenant_id=tenant_id, state=state)
            session.add(state)
            do_commit = True
        if do_commit:
            session.commit()

    def _commit(self, tenant_id):
        self._hybrid_backend.commit(tenant_id, self.get_state(tenant_id))

    def _pre_commit(self, tenant_id):
        super(HybridStorage, self)._pre_commit(tenant_id)

    def _post_commit(self, tenant_id):
        self._set_state(tenant_id, self.usage_start_dt.get(tenant_id))
        super(HybridStorage, self)._post_commit(tenant_id)
        # The tenant's session is no longer needed once the period is stored.
        del self._sql_session[tenant_id]

    def get_total(self, begin=None, end=None, tenant_id=None,
                  service=None, groupby=None):
        """Return the rated total, delegated to the dataframe backend."""
        return self._hybrid_backend.get_total(
            begin=begin, end=end, tenant_id=tenant_id,
            service=service, groupby=groupby)

    def _dispatch(self, data, tenant_id):
        for service in data:
            for frame in data[service]:
                self._hybrid_backend.append_time_frame(
                    service, frame, tenant_id)
            self._has_data[tenant_id] = True

    def get_tenants(self, begin, end):
        """Return the list of rated tenants in the given period."""
        return self._hybrid_backend.get_tenants(begin, end)

    def get_time_frame(self, begin, end, **filters):
        """Return stored dataframes, delegated to the dataframe backend."""
        return self._hybrid_backend.get_time_frame(begin, end, **filters)

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
from cloudkitty.common.db.alembic import env  # noqa
from cloudkitty.storage.hybrid import models

# Metadata of the models to migrate, and a dedicated version table so this
# storage's migrations don't collide with other cloudkitty alembic repos.
target_metadata = models.Base.metadata
version_table = 'storage_hybrid_alembic'

env.run_migrations_online(target_metadata, version_table)

View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
"""initial revision
Revision ID: 03da4bb002b9
Revises: None
Create Date: 2017-11-21 15:59:26.776639
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '03da4bb002b9'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'hybrid_storage_states',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('tenant_id', sa.String(length=32), nullable=False),
sa.Column('state', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id'),
mysql_charset='utf8',
mysql_engine='InnoDB')

View File

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class BaseHybridBackend(object):
    """Base Backend class for the Hybrid Storage.

    This is the interface that all backends for the hybrid storage
    should implement.
    """

    @abc.abstractmethod
    def commit(self, tenant_id, state):
        """Push data to the storage backend.

        :param tenant_id: id of the tenant which information must be
                          committed.
        """
        pass

    @abc.abstractmethod
    def init(self):
        """Initialize hybrid storage backend.

        Can be used to create DB scheme on first start.
        """
        pass

    @abc.abstractmethod
    def get_total(self, begin=None, end=None, tenant_id=None,
                  service=None, groupby=None):
        """Return the current total.

        :param begin: When to start filtering.
        :type begin: datetime.datetime
        :param end: When to stop filtering.
        :type end: datetime.datetime
        :param tenant_id: Filter on the tenant_id.
        :type tenant_id: str
        :param service: Filter on the resource type.
        :type service: str
        :param groupby: Fields to group by, separated by commas if multiple.
        :type groupby: str
        """
        pass

    @abc.abstractmethod
    def append_time_frame(self, res_type, frame, tenant_id):
        """Append a time frame to commit to the backend.

        :param res_type: The resource type of the dataframe.
        :param frame: The timeframe to append.
        :param tenant_id: Tenant the frame is belonging to.
        """
        pass

    @abc.abstractmethod
    def get_tenants(self, begin, end):
        """Return the list of rated tenants.

        :param begin: When to start filtering.
        :type begin: datetime.datetime
        :param end: When to stop filtering.
        :type end: datetime.datetime
        """

    @abc.abstractmethod
    def get_time_frame(self, begin, end, **filters):
        """Request a time frame from the storage backend.

        :param begin: When to start filtering.
        :type begin: datetime.datetime
        :param end: When to stop filtering.
        :type end: datetime.datetime
        :param res_type: (Optional) Filter on the resource type.
        :type res_type: str
        :param tenant_id: (Optional) Filter on the tenant_id.
        :type tenant_id: str
        """

View File

@ -0,0 +1,461 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
import datetime
import decimal
import json
from gnocchiclient import client as gclient
from gnocchiclient import exceptions as gexceptions
from keystoneauth1 import loading as ks_loading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
import six
from cloudkitty.storage.hybrid.backends import BaseHybridBackend
from cloudkitty.transformer import gnocchi as gtransformer
import cloudkitty.utils as ck_utils
LOG = logging.getLogger(__name__)

CONF = cfg.CONF
CONF.import_opt('period', 'cloudkitty.collector', 'collect')

GNOCCHI_STORAGE_OPTS = 'storage_gnocchi'
gnocchi_storage_opts = [
    cfg.StrOpt('archive_policy_name',
               default='rating',
               help='Gnocchi storage archive policy name.'),
    # The archive policy definition MUST include the collect period
    # granularity.
    cfg.StrOpt('archive_policy_definition',
               default='[{"granularity": '
                       + six.text_type(CONF.collect.period) +
                       ', "timespan": "90 days"}, '
                       '{"granularity": 86400, "timespan": "360 days"}, '
                       '{"granularity": 2592000, "timespan": "1800 days"}]',
               help='Gnocchi storage archive policy definition.'),
]
CONF.register_opts(gnocchi_storage_opts, GNOCCHI_STORAGE_OPTS)

ks_loading.register_session_conf_options(CONF, GNOCCHI_STORAGE_OPTS)
ks_loading.register_auth_conf_options(CONF, GNOCCHI_STORAGE_OPTS)

METRICS_CONF = ck_utils.get_metrics_conf(CONF.collect.metrics_conf)

# Prefix of the gnocchi resource types created for each rated service.
RESOURCE_TYPE_NAME_ROOT = 'rating_service_'
class DecimalJSONEncoder(json.JSONEncoder):
    """Wrapper class to handle decimal.Decimal objects in json.dumps()."""

    def default(self, obj):
        # JSON has no decimal type: serialize Decimals as floats.
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        return super(DecimalJSONEncoder, self).default(obj)
class UnknownResourceType(Exception):
    """Exception raised when an unknown resource type is encountered."""

    def __init__(self, resource_type):
        super(UnknownResourceType, self).__init__(
            'Unknown resource type {}'.format(resource_type)
        )
class GnocchiStorage(BaseHybridBackend):
    """Gnocchi backend for hybrid storage."""

    # NOTE(lukapeschke): List taken directly from gnocchi code
    invalid_attribute_names = [
        "id", "type", "metrics",
        "revision", "revision_start", "revision_end",
        "started_at", "ended_at",
        "user_id", "project_id",
        "created_by_user_id", "created_by_project_id", "get_metric",
        "creator",
    ]

    @staticmethod
    def _get_service_metrics(service_name):
        """Return the metric names for a service, with 'price' first.

        :raises KeyError: if the service has no configured metrics.
        """
        metrics = METRICS_CONF['services_metrics'][service_name]
        metric_list = ['price']
        for metric in metrics:
            metric_list.append(list(metric.keys())[0])
        return metric_list

    def _init_resource_types(self):
        """Build the per-service resource-type description dict."""
        transformer = gtransformer.GnocchiTransformer()
        services = METRICS_CONF['services']
        for service in services:
            service_dict = dict()
            service_dict['attributes'] = list()
            for attribute in transformer.get_metadata(service):
                # Skip attribute names reserved by gnocchi itself.
                if attribute not in self.invalid_attribute_names:
                    service_dict['attributes'].append(attribute)
            service_dict['required_attributes'] = [
                'resource_id',
                'unit',
            ]
            try:
                service_dict['metrics'] = self._get_service_metrics(service)
            except KeyError:
                LOG.warning(
                    'No metrics configured for service {}'.format(service))
                service_dict['metrics'] = list()
            service_dict['name'] = RESOURCE_TYPE_NAME_ROOT + service
            # Metric (or constant) used as the qty of the rated frames.
            service_dict['qty_metric'] \
                = list(METRICS_CONF['metrics_units'][service].keys())[0]
            self._resource_type_data[service] = service_dict

    def _get_res_type_dict(self, res_type):
        """Return the gnocchi resource-type creation dict for res_type."""
        res_type_data = self._resource_type_data.get(res_type, None)
        if not res_type_data:
            return None
        attribute_dict = dict()
        for attribute in res_type_data['attributes']:
            attribute_dict[attribute] = {
                'required': False,
                'type': 'string',
            }
        for attribute in res_type_data['required_attributes']:
            attribute_dict[attribute] = {
                'required': True,
                'type': 'string',
            }
        return {
            'name': res_type_data['name'],
            'attributes': attribute_dict,
        }

    def _create_resource(self, res_type, tenant_id, data):
        """Create a gnocchi resource (and its metrics) for a dataframe.

        :raises UnknownResourceType: if res_type is not configured.
        """
        res_type_data = self._resource_type_data.get(res_type, None)
        if not res_type_data:
            raise UnknownResourceType(
                "Unknown resource type '{}'".format(res_type))
        res_dict = {
            'id': data['resource_id'],
            'resource_id': data['resource_id'],
            'project_id': tenant_id,
            'user_id': data['user_id'],
            'unit': data['unit'],
        }
        for attr in res_type_data['attributes']:
            res_dict[attr] = data.get(attr, None) or 'None'
            if isinstance(res_dict[attr], decimal.Decimal):
                res_dict[attr] = float(res_dict[attr])
        created_metrics = [
            self._conn.metric.create({
                'name': metric,
                'archive_policy_name':
                    CONF.storage_gnocchi.archive_policy_name,
            }) for metric in res_type_data['metrics']
        ]
        metrics_dict = dict()
        for metric in created_metrics:
            metrics_dict[metric['name']] = metric['id']
        res_dict['metrics'] = metrics_dict
        try:
            return self._conn.resource.create(res_type_data['name'], res_dict)
        except gexceptions.ResourceAlreadyExists:
            # The resource_id is already taken: create a resource with a
            # fresh uuid, keeping resource_id as an attribute.
            res_dict['id'] = uuidutils.generate_uuid()
            return self._conn.resource.create(res_type_data['name'], res_dict)

    def _get_resource(self, resource_type, resource_id):
        """Return the gnocchi resource with the given id, or None."""
        try:
            resource_name = self._resource_type_data[resource_type]['name']
        except KeyError:
            raise UnknownResourceType(
                "Unknown resource type '{}'".format(resource_type))
        try:
            return self._conn.resource.get(resource_name, resource_id)
        except gexceptions.ResourceNotFound:
            return None

    def _find_resource(self, resource_type, resource_id):
        """Search a resource by its 'resource_id' attribute, or None."""
        try:
            resource_type = self._resource_type_data[resource_type]['name']
        except KeyError:
            raise UnknownResourceType(
                "Unknown resource type '{}'".format(resource_type))
        query = {
            '=': {
                'resource_id': resource_id,
            }
        }
        try:
            return self._conn.resource.search(
                resource_type=resource_type, query=query, limit=1)[0]
        except IndexError:
            return None

    def _create_resource_type(self, resource_type):
        """Create the gnocchi resource type; None if it already exists."""
        res_type = self._resource_type_data.get(resource_type, None)
        if not res_type:
            return None
        res_type_dict = self._get_res_type_dict(resource_type)
        try:
            output = self._conn.resource_type.create(res_type_dict)
        except gexceptions.ResourceTypeAlreadyExists:
            output = None
        return output

    def _get_resource_type(self, resource_type):
        """Return the gnocchi resource type for the given service."""
        res_type = self._resource_type_data.get(resource_type, None)
        if not res_type:
            return None
        return self._conn.resource_type.get(res_type['name'])

    def __init__(self, **kwargs):
        super(GnocchiStorage, self).__init__(**kwargs)
        self.auth = ks_loading.load_auth_from_conf_options(
            CONF,
            GNOCCHI_STORAGE_OPTS)
        self.session = ks_loading.load_session_from_conf_options(
            CONF,
            GNOCCHI_STORAGE_OPTS,
            auth=self.auth)
        self._conn = gclient.Client('1', session=self.session)
        self._archive_policy_name = (
            CONF.storage_gnocchi.archive_policy_name)
        self._archive_policy_definition = json.loads(
            CONF.storage_gnocchi.archive_policy_definition)
        self._period = CONF.collect.period
        if "period" in kwargs:
            self._period = kwargs["period"]
        # Pending measurements, keyed as
        # {tenant_id: {resource_id: {metric_id: [values]}}}.
        self._measurements = dict()
        self._resource_type_data = dict()
        self._init_resource_types()

    def commit(self, tenant_id, state):
        """Batch-push the tenant's pending measurements to gnocchi.

        :param tenant_id: tenant whose measurements must be pushed.
        :param state: timestamp to associate with the measures.
        """
        if not self._measurements.get(tenant_id, None):
            return
        commitable_measurements = dict()
        for metrics in self._measurements[tenant_id].values():
            for metric_id, measurements in metrics.items():
                if measurements:
                    measures = list()
                    for measurement in measurements:
                        measures.append(
                            {
                                'timestamp': state,
                                'value': measurement,
                            }
                        )
                    commitable_measurements[metric_id] = measures
        if commitable_measurements:
            self._conn.metric.batch_metrics_measures(commitable_measurements)
        del self._measurements[tenant_id]

    def init(self):
        """Create the archive policy and resource types if missing."""
        try:
            self._conn.archive_policy.get(self._archive_policy_name)
        except gexceptions.ArchivePolicyNotFound:
            ck_archive_policy = {}
            ck_archive_policy['name'] = self._archive_policy_name
            ck_archive_policy['back_window'] = 0
            ck_archive_policy['aggregation_methods'] \
                = ['std', 'count', 'min', 'max', 'sum', 'mean']
            ck_archive_policy['definition'] = self._archive_policy_definition
            self._conn.archive_policy.create(ck_archive_policy)
        for service in self._resource_type_data.keys():
            try:
                self._get_resource_type(service)
            except gexceptions.ResourceTypeNotFound:
                self._create_resource_type(service)

    def get_total(self, begin=None, end=None, tenant_id=None,
                  service=None, groupby=None):
        """Return the total rated amount over the given period."""
        # Query can't be None if we don't specify a resource_id
        query = {}
        if tenant_id:
            query['='] = {"project_id": tenant_id}
        measures = self._conn.metric.aggregation(
            metrics='price', query=query,
            start=begin, stop=end,
            aggregation='sum',
            granularity=self._period,
            needed_overlap=0)
        rate = sum(measure[2] for measure in measures) if len(measures) else 0
        return [{
            'begin': begin,
            'end': end,
            'rate': rate,
        }]

    def _append_measurements(self, resource, data, tenant_id):
        """Buffer the frame's metric values for a later commit()."""
        if not self._measurements.get(tenant_id, None):
            self._measurements[tenant_id] = {}
        measurements = self._measurements[tenant_id]
        if not measurements.get(resource['id'], None):
            measurements[resource['id']] = {
                key: list() for key in resource['metrics'].values()
            }
        for metric_name, metric_id in resource['metrics'].items():
            measurement = data.get(metric_name, None)
            if measurement is not None:
                measurements[resource['id']][metric_id].append(
                    float(measurement)
                    if isinstance(measurement, decimal.Decimal)
                    else measurement)

    def append_time_frame(self, res_type, frame, tenant_id):
        """Stage a dataframe, creating the gnocchi resource if needed."""
        flat_frame = ck_utils.flat_dict(frame)
        resource = self._find_resource(res_type, flat_frame['resource_id'])
        if not resource:
            resource = self._create_resource(res_type, tenant_id, flat_frame)
        self._append_measurements(resource, flat_frame, tenant_id)

    def get_tenants(self, begin, end):
        """Return the project ids rated in the given period."""
        query = {'like': {'type': RESOURCE_TYPE_NAME_ROOT + '%'}}
        r = self._conn.metric.aggregation(
            metrics='price',
            query=query,
            start=begin,
            stop=end,
            aggregation='sum',
            granularity=self._period,
            needed_overlap=0,
            groupby='project_id')
        projects = list()
        for measures in r:
            projects.append(measures['group']['project_id'])
        return projects

    @staticmethod
    def _get_time_query(start, end, resource_type, tenant_id=None):
        """Build a gnocchi search query matching the given period."""
        query = {'and': [{
            'or': [
                {'=': {'ended_at': None}},
                {'<=': {'ended_at': end}}
            ]
        },
            {'>=': {'started_at': start}},
            {'=': {'type': resource_type}},
        ]
        }
        if tenant_id:
            query['and'].append({'=': {'project_id': tenant_id}})
        return query

    def _get_resources(self, resource_type, start, end, tenant_id=None):
        """Returns the resources of the given type in the given period"""
        return self._conn.resource.search(
            resource_type=resource_type,
            query=self._get_time_query(start, end, resource_type, tenant_id),
            details=True)

    def _format_frame(self, res_type, resource, desc, measure, tenant_id):
        """Turn a gnocchi measure back into a cloudkitty dataframe dict."""
        res_type_info = self._resource_type_data.get(res_type, None)
        if not res_type_info:
            return dict()
        start = measure[0]
        stop = start + datetime.timedelta(seconds=self._period)

        # Getting price
        price = decimal.Decimal(measure[2])
        price_dict = {'price': float(price)}

        # Getting vol. NOTE: six.string_types instead of (str, unicode):
        # 'unicode' does not exist on Python 3 and raised a NameError there.
        if isinstance(res_type_info['qty_metric'], six.string_types):
            try:
                qty = self._conn.metric.get_measures(
                    resource['metrics'][res_type_info['qty_metric']],
                    aggregation='sum',
                    start=start, stop=stop,
                    refresh=True)[-1][2]
            except IndexError:
                qty = 0
        else:
            qty = res_type_info['qty_metric']
        vol_dict = {'qty': decimal.Decimal(qty), 'unit': resource['unit']}

        # Period
        period_dict = {
            'begin': ck_utils.dt2iso(start),
            'end': ck_utils.dt2iso(stop),
        }

        # Formatting
        res_dict = dict()
        res_dict['desc'] = desc
        res_dict['vol'] = vol_dict
        res_dict['rating'] = price_dict
        res_dict['tenant_id'] = tenant_id
        return {
            'usage': {res_type: [res_dict]},
            'period': period_dict,
        }

    def resource_info(self, resource_type, start, end, tenant_id=None):
        """Returns a dataframe for the given resource type"""
        try:
            res_type_info = self._resource_type_data.get(resource_type, None)
            resource_name = res_type_info['name']
        except (KeyError, AttributeError):
            raise UnknownResourceType(resource_type)
        attributes = res_type_info['attributes'] \
            + res_type_info['required_attributes']
        output = list()
        query = self._get_time_query(start, end, resource_name, tenant_id)
        measures = self._conn.metric.aggregation(
            metrics='price',
            resource_type=resource_name,
            query=query,
            start=start,
            stop=end,
            granularity=self._period,
            aggregation='sum',
            needed_overlap=0,
            groupby=['type', 'id'],
        )
        for resource_measures in measures:
            resource_desc = None
            resource = None
            for measure in resource_measures['measures']:
                if not resource_desc:
                    resource = self._get_resource(
                        resource_type, resource_measures['group']['id'])
                if not resource:
                    continue
                desc = {a: resource.get(a, None) for a in attributes}
                formatted_frame = self._format_frame(
                    resource_type, resource, desc, measure, tenant_id)
                output.append(formatted_frame)
        return output

    def get_time_frame(self, begin, end, **filters):
        """Return dataframes for the period, optionally filtered.

        :param begin: Start of the period.
        :param end: End of the period.
        :param tenant_id: (Optional) Filter on the tenant_id.
        :param res_type: (Optional) Filter on the resource type.
        """
        tenant_id = filters.get('tenant_id', None)
        resource_types = [filters.get('res_type', None)]
        if not resource_types[0]:
            resource_types = self._resource_type_data.keys()
        output = list()
        for resource_type in resource_types:
            output += self.resource_info(resource_type, begin, end, tenant_id)
        return output

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import os
from cloudkitty.common.db.alembic import migration
# Path of the alembic repository holding this storage's migrations.
ALEMBIC_REPO = os.path.join(os.path.dirname(__file__), 'alembic')


def upgrade(revision):
    """Upgrade the hybrid storage schema to the given revision."""
    config = migration.load_alembic_config(ALEMBIC_REPO)
    return migration.upgrade(config, revision)


def version():
    """Return the current schema revision."""
    config = migration.load_alembic_config(ALEMBIC_REPO)
    return migration.version(config)


def revision(message, autogenerate):
    """Create a new migration revision."""
    config = migration.load_alembic_config(ALEMBIC_REPO)
    return migration.revision(config, message, autogenerate)


def stamp(revision):
    """Stamp the DB with the given revision without migrating."""
    config = migration.load_alembic_config(ALEMBIC_REPO)
    return migration.stamp(config, revision)

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
Base = declarative.declarative_base()


class TenantState(Base, models.ModelBase):
    """A tenant state."""

    __table_args__ = {'mysql_charset': "utf8",
                      'mysql_engine': "InnoDB"}
    __tablename__ = 'hybrid_storage_states'

    # Surrogate primary key.
    id = sqlalchemy.Column(sqlalchemy.Integer,
                           primary_key=True)
    # Keystone project (tenant) identifier.
    tenant_id = sqlalchemy.Column(sqlalchemy.String(32),
                                  nullable=False)
    # Timestamp of the last processed period for this tenant.
    state = sqlalchemy.Column(sqlalchemy.DateTime,
                              nullable=False)

View File

@ -36,6 +36,7 @@ COMPUTE_METADATA = {
'flavor': 'm1.nano',
'image_id': 'f5600101-8fa2-4864-899e-ebcb7ed6b568',
'instance_id': '26c084e1-b8f1-4cbc-a7ec-e8b356788a17',
'resource_id': '1558f911-b55a-4fd2-9173-c8f1f23e5639',
'memory': '64',
'metadata': {
'farm': 'prod'
@ -47,6 +48,7 @@ COMPUTE_METADATA = {
IMAGE_METADATA = {
'checksum': '836c69cbcd1dc4f225daedbab6edc7c7',
'resource_id': '7b5b73f2-9181-4307-a710-b1aa6472526d',
'container_format': 'aki',
'created_at': '2014-06-04T16:26:01',
'deleted': 'False',
@ -127,3 +129,62 @@ STORED_DATA[1]['usage']['compute'][0]['rating'] = {
'price': 0.42}
STORED_DATA = split_storage_data(STORED_DATA)
# Sample metrics configuration used by the hybrid/gnocchi storage tests.
METRICS_CONF = {
    'collector': 'gnocchi',
    'name': 'OpenStack',
    'period': 3600,
    'services': [
        'compute',
        'volume',
        'network.bw.in',
        'network.bw.out',
        'network.floating',
        'image'
    ],
    'services_metrics': {
        'compute': [
            {'vcpus': 'max'},
            {'memory': 'max'},
            {'cpu': 'max'},
            {'disk.root.size': 'max'},
            {'disk.ephemeral.size': 'max'}
        ],
        'image': [
            {'image.size': 'max'},
            {'image.download': 'max'},
            {'image.serve': 'max'}
        ],
        'network.bw.in': [{'network.incoming.bytes': 'max'}],
        'network.bw.out': [{'network.outgoing.bytes': 'max'}],
        'network.floating': [{'ip.floating': 'max'}],
        'volume': [{'volume.size': 'max'}],
        'radosgw.usage': [{'radosgw.objects.size': 'max'}]},
    'services_objects': {
        'compute': 'instance',
        'image': 'image',
        'network.bw.in': 'instance_network_interface',
        'network.bw.out': 'instance_network_interface',
        'network.floating': 'network',
        'volume': 'volume',
        'radosgw.usage': 'ceph_account',
    },
    'metrics_units': {
        'compute': {1: {'unit': 'instance'}},
        'default_unit': {1: {'unit': 'unknown'}},
        'image': {'image.size': {'unit': 'MiB', 'factor': '1/1048576'}},
        'network.bw.in': {'network.incoming.bytes': {
            'unit': 'MB',
            'factor': '1/1000000'}},
        'network.bw.out': {'network.outgoing.bytes': {
            'unit': 'MB',
            'factor': '1/1000000'}},
        'network.floating': {1: {'unit': 'ip'}},
        'volume': {'volume.size': {'unit': 'GiB'}},
        'radosgw.usage': {'radosgw.objects.size': {
            'unit': 'GiB',
            'factor': '1/1073741824'}},
    },
    'wait_periods': 2,
    'window': 1800
}

View File

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
import mock
from gnocchiclient import exceptions as gexc
from cloudkitty import storage
from cloudkitty.storage.hybrid.backends import gnocchi as hgnocchi
from cloudkitty import tests
from cloudkitty.tests import samples
class BaseHybridStorageTest(tests.TestCase):
    """Base test case selecting the hybrid backend with sample metrics."""

    def setUp(self):
        super(BaseHybridStorageTest, self).setUp()
        self.conf.set_override('backend', 'hybrid', 'storage')
        # Use the sample metrics configuration instead of the real one.
        hgnocchi.METRICS_CONF = samples.METRICS_CONF
        self.storage = storage.get_storage()
        # init() would hit gnocchi; mock the dataframe backend part.
        with mock.patch.object(
                self.storage._hybrid_backend, 'init'):
            self.storage.init()
class PermissiveDict(object):
    """Allows to check a single key of a dict in an assertion.

    Example:
    >>> mydict = {'a': 'A', 'b': 'B'}
    >>> checker = PermissiveDict('A', key='a')
    >>> checker == mydict
    True
    """

    def __init__(self, value, key='name'):
        self.key = key
        self.value = value

    def __eq__(self, other):
        # Only the configured key is compared; other keys are ignored.
        return self.value == other.get(self.key)
class HybridStorageTestGnocchi(BaseHybridStorageTest):
    """Tests for the gnocchi backend initialization of the hybrid storage."""

    def setUp(self):
        super(HybridStorageTestGnocchi, self).setUp()

    def tearDown(self):
        super(HybridStorageTestGnocchi, self).tearDown()

    def _init_storage(self, archive_policy=False, res_type=False):
        """Run storage.init() with gnocchi mocked out and verify the calls.

        :param archive_policy: whether the archive policy already exists.
        :param res_type: whether the resource types already exist.
        """
        with mock.patch.object(self.storage._hybrid_backend._conn,
                               'archive_policy',
                               spec=['get', 'create']) as pol_mock:
            if not archive_policy:
                pol_mock.get.side_effect = gexc.ArchivePolicyNotFound
            else:
                pol_mock.create.side_effect = gexc.ArchivePolicyAlreadyExists
            with mock.patch.object(self.storage._hybrid_backend._conn,
                                   'resource_type',
                                   spec=['get', 'create']) as rtype_mock:
                if not res_type:
                    rtype_mock.get.side_effect = gexc.ResourceTypeNotFound
                else:
                    rtype_mock.create.side_effect \
                        = gexc.ResourceTypeAlreadyExists
                self.storage.init()
                rtype_data = self.storage._hybrid_backend._resource_type_data
                rtype_calls = list()
                for val in rtype_data.values():
                    rtype_calls.append(
                        mock.call(PermissiveDict(val['name'], key='name')))
                if res_type:
                    rtype_mock.create.assert_not_called()
                else:
                    rtype_mock.create.assert_has_calls(
                        rtype_calls, any_order=True)
            pol_mock.get.assert_called_once_with(
                self.storage._hybrid_backend._archive_policy_name)
            if archive_policy:
                pol_mock.create.assert_not_called()
            else:
                apolicy = {
                    'name': self.storage._hybrid_backend._archive_policy_name,
                    'back_window': 0,
                    'aggregation_methods':
                        ['std', 'count', 'min', 'max', 'sum', 'mean'],
                }
                apolicy['definition'] = \
                    self.storage._hybrid_backend._archive_policy_definition
                pol_mock.create.assert_called_once_with(apolicy)

    def test_init_no_res_type_no_policy(self):
        self._init_storage()

    def test_init_with_res_type_no_policy(self):
        self._init_storage(res_type=True)

    def test_init_no_res_type_with_policy(self):
        self._init_storage(archive_policy=True)

    def test_init_with_res_type_with_policy(self):
        self._init_storage(res_type=True, archive_policy=True)

View File

@ -17,10 +17,12 @@
#
import copy
import mock
import sqlalchemy
import testscenarios
from cloudkitty import storage
from cloudkitty.storage.hybrid.backends import gnocchi as hgnocchi
from cloudkitty import tests
from cloudkitty.tests import samples
from cloudkitty import utils as ck_utils
@ -28,7 +30,8 @@ from cloudkitty import utils as ck_utils
class StorageTest(tests.TestCase):
storage_scenarios = [
('sqlalchemy', dict(storage_backend='sqlalchemy'))]
('sqlalchemy', dict(storage_backend='sqlalchemy')),
('hybrid', dict(storage_backend='hybrid'))]
@classmethod
def generate_scenarios(cls):
@ -36,8 +39,10 @@ class StorageTest(tests.TestCase):
cls.scenarios,
cls.storage_scenarios)
def setUp(self):
@mock.patch('cloudkitty.storage.hybrid.backends.gnocchi.gclient')
def setUp(self, gclient_mock):
super(StorageTest, self).setUp()
hgnocchi.METRICS_CONF = samples.METRICS_CONF
self._tenant_id = samples.TENANT
self._other_tenant_id = '8d3ae50089ea4142-9c6e1269db6a0b64'
self.conf.set_override('backend', self.storage_backend, 'storage')
@ -77,6 +82,278 @@ class StorageTest(tests.TestCase):
self.assertEqual(samples.RATED_DATA[1]['usage'], data)
self.assertEqual([], working_data)
# State
    def test_get_state_when_nothing_in_storage(self):
        # With no rated data at all, the global state is None.
        state = self.storage.get_state()
        self.assertIsNone(state)
    def test_get_latest_global_state(self):
        # The global state is the most recent period start across all
        # tenants.
        self.insert_different_data_two_tenants()
        state = self.storage.get_state()
        self.assertEqual(samples.SECOND_PERIOD_BEGIN, state)
    def test_get_state_on_rated_tenant(self):
        # Per-tenant state reflects the last rated period of that tenant
        # only, not the global maximum.
        self.insert_different_data_two_tenants()
        state = self.storage.get_state(self._tenant_id)
        self.assertEqual(samples.FIRST_PERIOD_BEGIN, state)
        state = self.storage.get_state(self._other_tenant_id)
        self.assertEqual(samples.SECOND_PERIOD_BEGIN, state)
    def test_get_state_on_no_data_frame(self):
        # A committed 'nodata' period must still advance the tenant's
        # state, even though no dataframe was stored.
        self.storage.nodata(
            samples.FIRST_PERIOD_BEGIN,
            samples.FIRST_PERIOD_END,
            self._tenant_id)
        self.storage.commit(self._tenant_id)
        state = self.storage.get_state(self._tenant_id)
        self.assertEqual(samples.FIRST_PERIOD_BEGIN, state)
class StorageDataframeTest(StorageTest):
    """Tests for the dataframe retrieval API (get_time_frame).

    Overrides storage_scenarios so these tests only run against the
    sqlalchemy backend.
    """
    storage_scenarios = [
        ('sqlalchemy', dict(storage_backend='sqlalchemy'))]
    # Queries
    # Data
    def test_get_no_frame_when_nothing_in_storage(self):
        # Empty storage raises NoTimeFrame instead of returning [].
        self.assertRaises(
            storage.NoTimeFrame,
            self.storage.get_time_frame,
            begin=samples.FIRST_PERIOD_BEGIN - 3600,
            end=samples.FIRST_PERIOD_BEGIN)
    def test_get_frame_filter_outside_data(self):
        # A window entirely before the stored data also raises.
        self.insert_different_data_two_tenants()
        self.assertRaises(
            storage.NoTimeFrame,
            self.storage.get_time_frame,
            begin=samples.FIRST_PERIOD_BEGIN - 3600,
            end=samples.FIRST_PERIOD_BEGIN)
    def test_get_frame_without_filter_but_timestamp(self):
        # Both periods, both tenants: three frames in total.
        self.insert_different_data_two_tenants()
        data = self.storage.get_time_frame(
            begin=samples.FIRST_PERIOD_BEGIN,
            end=samples.SECOND_PERIOD_END)
        self.assertEqual(3, len(data))
    def test_get_frame_on_one_period(self):
        self.insert_different_data_two_tenants()
        data = self.storage.get_time_frame(
            begin=samples.FIRST_PERIOD_BEGIN,
            end=samples.FIRST_PERIOD_END)
        self.assertEqual(2, len(data))
    def test_get_frame_on_one_period_and_one_tenant(self):
        self.insert_different_data_two_tenants()
        data = self.storage.get_time_frame(
            begin=samples.FIRST_PERIOD_BEGIN,
            end=samples.FIRST_PERIOD_END,
            tenant_id=self._tenant_id)
        self.assertEqual(2, len(data))
    def test_get_frame_on_one_period_and_one_tenant_outside_data(self):
        # The other tenant has no data in the first period: NoTimeFrame.
        self.insert_different_data_two_tenants()
        self.assertRaises(
            storage.NoTimeFrame,
            self.storage.get_time_frame,
            begin=samples.FIRST_PERIOD_BEGIN,
            end=samples.FIRST_PERIOD_END,
            tenant_id=self._other_tenant_id)
    def test_get_frame_on_two_periods(self):
        self.insert_different_data_two_tenants()
        data = self.storage.get_time_frame(
            begin=samples.FIRST_PERIOD_BEGIN,
            end=samples.SECOND_PERIOD_END)
        self.assertEqual(3, len(data))
class StorageTotalTest(StorageTest):
    """Tests for the rating total API (get_total), including grouping.

    Overrides storage_scenarios so these tests only run against the
    sqlalchemy backend.
    """
    storage_scenarios = [
        ('sqlalchemy', dict(storage_backend='sqlalchemy'))]
    # Total
    def test_get_empty_total(self):
        # A window with no data yields one row with a None rate but the
        # requested begin/end boundaries.
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600)
        end = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end)
        self.assertEqual(1, len(total))
        self.assertIsNone(total[0]["rate"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
    def test_get_total_without_filter_but_timestamp(self):
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end)
        # FIXME(sheeprine): floating point error (transition to decimal)
        self.assertEqual(1, len(total))
        self.assertEqual(1.9473999999999998, total[0]["rate"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
    def test_get_total_filtering_on_one_period(self):
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end)
        self.assertEqual(1, len(total))
        self.assertEqual(1.1074, total[0]["rate"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
    def test_get_total_filtering_on_one_period_and_one_tenant(self):
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end,
            tenant_id=self._tenant_id)
        self.assertEqual(1, len(total))
        self.assertEqual(0.5537, total[0]["rate"])
        self.assertEqual(self._tenant_id, total[0]["tenant_id"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
    def test_get_total_filtering_on_service(self):
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end,
            service='compute')
        self.assertEqual(1, len(total))
        self.assertEqual(0.84, total[0]["rate"])
        self.assertEqual('compute', total[0]["res_type"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
    def test_get_total_groupby_tenant(self):
        # groupby yields one row per tenant, each with its own rate.
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end,
            groupby="tenant_id")
        self.assertEqual(2, len(total))
        self.assertEqual(0.9737, total[0]["rate"])
        self.assertEqual(self._other_tenant_id, total[0]["tenant_id"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
        self.assertEqual(0.9737, total[1]["rate"])
        self.assertEqual(self._tenant_id, total[1]["tenant_id"])
        self.assertEqual(begin, total[1]["begin"])
        self.assertEqual(end, total[1]["end"])
    def test_get_total_groupby_restype(self):
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end,
            groupby="res_type")
        self.assertEqual(2, len(total))
        self.assertEqual(0.2674, total[0]["rate"])
        self.assertEqual('image', total[0]["res_type"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
        self.assertEqual(1.68, total[1]["rate"])
        self.assertEqual('compute', total[1]["res_type"])
        self.assertEqual(begin, total[1]["begin"])
        self.assertEqual(end, total[1]["end"])
    def test_get_total_groupby_tenant_and_restype(self):
        # Composite groupby: one row per (tenant, resource type) pair.
        begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
        end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
        self.insert_data()
        total = self.storage.get_total(
            begin=begin,
            end=end,
            groupby="tenant_id,res_type")
        self.assertEqual(4, len(total))
        self.assertEqual(0.1337, total[0]["rate"])
        self.assertEqual(self._other_tenant_id, total[0]["tenant_id"])
        self.assertEqual('image', total[0]["res_type"])
        self.assertEqual(begin, total[0]["begin"])
        self.assertEqual(end, total[0]["end"])
        self.assertEqual(0.1337, total[1]["rate"])
        self.assertEqual(self._tenant_id, total[1]["tenant_id"])
        self.assertEqual('image', total[1]["res_type"])
        self.assertEqual(begin, total[1]["begin"])
        self.assertEqual(end, total[1]["end"])
        self.assertEqual(0.84, total[2]["rate"])
        self.assertEqual(self._other_tenant_id, total[2]["tenant_id"])
        self.assertEqual('compute', total[2]["res_type"])
        self.assertEqual(begin, total[2]["begin"])
        self.assertEqual(end, total[2]["end"])
        self.assertEqual(0.84, total[3]["rate"])
        self.assertEqual(self._tenant_id, total[3]["tenant_id"])
        self.assertEqual('compute', total[3]["res_type"])
        self.assertEqual(begin, total[3]["begin"])
        self.assertEqual(end, total[3]["end"])
class StorageTenantTest(StorageTest):
    """Tests for the tenant listing API (get_tenants).

    Overrides storage_scenarios so these tests only run against the
    sqlalchemy backend.
    """
    storage_scenarios = [
        ('sqlalchemy', dict(storage_backend='sqlalchemy'))]
    # Tenants
    def test_get_empty_tenant_with_nothing_in_storage(self):
        # No data at all: empty list, not an exception.
        tenants = self.storage.get_tenants(
            begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN),
            end=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN))
        self.assertEqual([], tenants)
    def test_get_empty_tenant_list(self):
        # Data exists but the queried window is before it.
        self.insert_data()
        tenants = self.storage.get_tenants(
            begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600),
            end=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN))
        self.assertEqual([], tenants)
    def test_get_tenants_filtering_on_period(self):
        # Only tenants with rated data inside the window are listed.
        self.insert_different_data_two_tenants()
        tenants = self.storage.get_tenants(
            begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN),
            end=ck_utils.ts2dt(samples.SECOND_PERIOD_END))
        self.assertListEqual(
            [self._tenant_id, self._other_tenant_id],
            tenants)
        tenants = self.storage.get_tenants(
            begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN),
            end=ck_utils.ts2dt(samples.FIRST_PERIOD_END))
        self.assertListEqual(
            [self._tenant_id],
            tenants)
        tenants = self.storage.get_tenants(
            begin=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN),
            end=ck_utils.ts2dt(samples.SECOND_PERIOD_END))
        self.assertListEqual(
            [self._other_tenant_id],
            tenants)
class StorageDataIntegrityTest(StorageTest):
storage_scenarios = [
('sqlalchemy', dict(storage_backend='sqlalchemy'))]
# Data integrity
def test_has_data_flag_behaviour(self):
self.assertNotIn(self._tenant_id, self.storage._has_data)
@ -213,253 +490,9 @@ class StorageTest(tests.TestCase):
self.assertNotIn(self._tenant_id, self.storage.usage_end)
self.assertNotIn(self._tenant_id, self.storage.usage_end_dt)
# Queries
# Data
def test_get_no_frame_when_nothing_in_storage(self):
self.assertRaises(
storage.NoTimeFrame,
self.storage.get_time_frame,
begin=samples.FIRST_PERIOD_BEGIN - 3600,
end=samples.FIRST_PERIOD_BEGIN)
def test_get_frame_filter_outside_data(self):
self.insert_different_data_two_tenants()
self.assertRaises(
storage.NoTimeFrame,
self.storage.get_time_frame,
begin=samples.FIRST_PERIOD_BEGIN - 3600,
end=samples.FIRST_PERIOD_BEGIN)
def test_get_frame_without_filter_but_timestamp(self):
self.insert_different_data_two_tenants()
data = self.storage.get_time_frame(
begin=samples.FIRST_PERIOD_BEGIN,
end=samples.SECOND_PERIOD_END)
self.assertEqual(3, len(data))
def test_get_frame_on_one_period(self):
self.insert_different_data_two_tenants()
data = self.storage.get_time_frame(
begin=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END)
self.assertEqual(2, len(data))
def test_get_frame_on_one_period_and_one_tenant(self):
self.insert_different_data_two_tenants()
data = self.storage.get_time_frame(
begin=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END,
tenant_id=self._tenant_id)
self.assertEqual(2, len(data))
def test_get_frame_on_one_period_and_one_tenant_outside_data(self):
self.insert_different_data_two_tenants()
self.assertRaises(
storage.NoTimeFrame,
self.storage.get_time_frame,
begin=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END,
tenant_id=self._other_tenant_id)
def test_get_frame_on_two_periods(self):
self.insert_different_data_two_tenants()
data = self.storage.get_time_frame(
begin=samples.FIRST_PERIOD_BEGIN,
end=samples.SECOND_PERIOD_END)
self.assertEqual(3, len(data))
# State
def test_get_state_when_nothing_in_storage(self):
state = self.storage.get_state()
self.assertIsNone(state)
def test_get_latest_global_state(self):
self.insert_different_data_two_tenants()
state = self.storage.get_state()
self.assertEqual(samples.SECOND_PERIOD_BEGIN, state)
def test_get_state_on_rated_tenant(self):
self.insert_different_data_two_tenants()
state = self.storage.get_state(self._tenant_id)
self.assertEqual(samples.FIRST_PERIOD_BEGIN, state)
state = self.storage.get_state(self._other_tenant_id)
self.assertEqual(samples.SECOND_PERIOD_BEGIN, state)
def test_get_state_on_no_data_frame(self):
self.storage.nodata(
samples.FIRST_PERIOD_BEGIN,
samples.FIRST_PERIOD_END,
self._tenant_id)
self.storage.commit(self._tenant_id)
state = self.storage.get_state(self._tenant_id)
self.assertEqual(samples.FIRST_PERIOD_BEGIN, state)
# Total
def test_get_empty_total(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600)
end = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end)
self.assertEqual(1, len(total))
self.assertIsNone(total[0]["rate"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
def test_get_total_without_filter_but_timestamp(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end)
# FIXME(sheeprine): floating point error (transition to decimal)
self.assertEqual(1, len(total))
self.assertEqual(1.9473999999999998, total[0]["rate"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
def test_get_total_filtering_on_one_period(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end)
self.assertEqual(1, len(total))
self.assertEqual(1.1074, total[0]["rate"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
def test_get_total_filtering_on_one_period_and_one_tenant(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end,
tenant_id=self._tenant_id)
self.assertEqual(1, len(total))
self.assertEqual(0.5537, total[0]["rate"])
self.assertEqual(self._tenant_id, total[0]["tenant_id"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
def test_get_total_filtering_on_service(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end,
service='compute')
self.assertEqual(1, len(total))
self.assertEqual(0.84, total[0]["rate"])
self.assertEqual('compute', total[0]["res_type"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
def test_get_total_groupby_tenant(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end,
groupby="tenant_id")
self.assertEqual(2, len(total))
self.assertEqual(0.9737, total[0]["rate"])
self.assertEqual(self._other_tenant_id, total[0]["tenant_id"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
self.assertEqual(0.9737, total[1]["rate"])
self.assertEqual(self._tenant_id, total[1]["tenant_id"])
self.assertEqual(begin, total[1]["begin"])
self.assertEqual(end, total[1]["end"])
def test_get_total_groupby_restype(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end,
groupby="res_type")
self.assertEqual(2, len(total))
self.assertEqual(0.2674, total[0]["rate"])
self.assertEqual('image', total[0]["res_type"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
self.assertEqual(1.68, total[1]["rate"])
self.assertEqual('compute', total[1]["res_type"])
self.assertEqual(begin, total[1]["begin"])
self.assertEqual(end, total[1]["end"])
def test_get_total_groupby_tenant_and_restype(self):
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
self.insert_data()
total = self.storage.get_total(
begin=begin,
end=end,
groupby="tenant_id,res_type")
self.assertEqual(4, len(total))
self.assertEqual(0.1337, total[0]["rate"])
self.assertEqual(self._other_tenant_id, total[0]["tenant_id"])
self.assertEqual('image', total[0]["res_type"])
self.assertEqual(begin, total[0]["begin"])
self.assertEqual(end, total[0]["end"])
self.assertEqual(0.1337, total[1]["rate"])
self.assertEqual(self._tenant_id, total[1]["tenant_id"])
self.assertEqual('image', total[1]["res_type"])
self.assertEqual(begin, total[1]["begin"])
self.assertEqual(end, total[1]["end"])
self.assertEqual(0.84, total[2]["rate"])
self.assertEqual(self._other_tenant_id, total[2]["tenant_id"])
self.assertEqual('compute', total[2]["res_type"])
self.assertEqual(begin, total[2]["begin"])
self.assertEqual(end, total[2]["end"])
self.assertEqual(0.84, total[3]["rate"])
self.assertEqual(self._tenant_id, total[3]["tenant_id"])
self.assertEqual('compute', total[3]["res_type"])
self.assertEqual(begin, total[3]["begin"])
self.assertEqual(end, total[3]["end"])
# Tenants
def test_get_empty_tenant_with_nothing_in_storage(self):
tenants = self.storage.get_tenants(
begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN),
end=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN))
self.assertEqual([], tenants)
def test_get_empty_tenant_list(self):
self.insert_data()
tenants = self.storage.get_tenants(
begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600),
end=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN))
self.assertEqual([], tenants)
def test_get_tenants_filtering_on_period(self):
self.insert_different_data_two_tenants()
tenants = self.storage.get_tenants(
begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN),
end=ck_utils.ts2dt(samples.SECOND_PERIOD_END))
self.assertListEqual(
[self._tenant_id, self._other_tenant_id],
tenants)
tenants = self.storage.get_tenants(
begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN),
end=ck_utils.ts2dt(samples.FIRST_PERIOD_END))
self.assertListEqual(
[self._tenant_id],
tenants)
tenants = self.storage.get_tenants(
begin=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN),
end=ck_utils.ts2dt(samples.SECOND_PERIOD_END))
self.assertListEqual(
[self._other_tenant_id],
tenants)
# Expand each test class into one concrete scenario per entry of its
# storage_scenarios list (testscenarios).
StorageTest.generate_scenarios()
StorageTotalTest.generate_scenarios()
StorageTenantTest.generate_scenarios()
StorageDataframeTest.generate_scenarios()
StorageDataIntegrityTest.generate_scenarios()

View File

@ -291,3 +291,15 @@ def convert_unit(value, factor, offset=0):
factor = decimal.Decimal(factor)
return (decimal.Decimal(value) * factor) + decimal.Decimal(offset)
def flat_dict(item, parent=None):
    """Return a flat version of the nested dict ``item``.

    Nested dict values are merged recursively into a single one-level
    dict; non-dict values are copied as-is. When the same key appears at
    several nesting levels, the last one visited wins.

    :param item: (possibly nested) dict to flatten.
    :param parent: optional accumulator dict updated in place; a new
        dict is created when None.
    :return: the flattened dict.
    """
    # 'parent is None' (not 'not parent') so that an explicitly supplied
    # empty accumulator dict is populated instead of being discarded.
    if parent is None:
        parent = {}
    for key, value in item.items():
        if isinstance(value, dict):
            parent = flat_dict(value, parent)
        else:
            parent[key] = value
    return parent

View File

@ -139,13 +139,13 @@ The following shows the basic configuration items:
The tenant named ``service`` is also commonly called ``services``
It is now time to configure the storage backend. Four storage backends are
available: ``sqlalchemy``, ``hybrid``, ``gnocchihybrid``, and ``gnocchi``.
.. code-block:: ini
[storage]
backend = gnocchi
As you will see in the following example, collector and storage backends
sometimes need additional configuration sections. (The tenant fetcher works the
@ -158,7 +158,8 @@ example), except for ``storage_gnocchi``.
The section name format should become ``{backend_type}_{backend_name}`` for
all sections in the future (``storage_gnocchi`` style).
If you want to use the pure gnocchi storage or the hybrid storage with a
gnocchi backend, add the following entry:
.. code-block:: ini

View File

@ -0,0 +1,13 @@
---
features:
- |
The storage system is being refactored.
A hybrid storage backend has been added. This backend handles states
via SQLAlchemy and pure storage via another storage backend. Once
this new storage is considered stable, it will become the default storage.
This will ease the creation of storage backends (no more state handling).
deprecations:
- |
All storage backends except sqlalchemy and the new hybrid storage
have been deprecated.

View File

@ -69,6 +69,10 @@ cloudkitty.storage.backends =
sqlalchemy = cloudkitty.storage.sqlalchemy:SQLAlchemyStorage
gnocchihybrid = cloudkitty.storage.gnocchi_hybrid:GnocchiHybridStorage
gnocchi = cloudkitty.storage.gnocchi:GnocchiStorage
hybrid = cloudkitty.storage.hybrid:HybridStorage
cloudkitty.storage.hybrid.backends =
gnocchi = cloudkitty.storage.hybrid.backends.gnocchi:GnocchiStorage
cloudkitty.output.writers =
osrf = cloudkitty.writer.osrf:OSRFBackend