Added support for an hybrid gnocchi storage

This storage removes the need to duplicate the data present in gnocchi.
Every dataframe is associated to the metric_id that generated the
calculations.
The downside is that you can only use gnocchi with this storage driver.
Moved around some utility function to remove the need to duplicate code
and have a cleaner boundary between the different parts of the code.

Change-Id: Iabfdcd4c15c906ed145ce383b65d1538f72671aa
This commit is contained in:
Stéphane Albert 2015-11-18 11:55:03 +01:00
parent ea8a86811e
commit 563b69fcd0
17 changed files with 419 additions and 106 deletions

View File

@ -19,7 +19,9 @@ import abc
from oslo_config import cfg
import six
from stevedore import driver
from cloudkitty import transformer
import cloudkitty.utils as ck_utils
collect_opts = [
@ -44,7 +46,24 @@ collect_opts = [
'network.floating'],
help='Services to monitor.'), ]
cfg.CONF.register_opts(collect_opts, 'collect')
CONF = cfg.CONF
CONF.register_opts(collect_opts, 'collect')
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
def get_collector(transformers=None):
if not transformers:
transformers = transformer.get_transformers()
collector_args = {
'period': CONF.collect.period,
'transformers': transformers}
collector = driver.DriverManager(
COLLECTORS_NAMESPACE,
CONF.collect.collector,
invoke_on_load=True,
invoke_kwds=collector_args).driver
return collector
class TransformerDependencyError(Exception):

View File

@ -26,13 +26,14 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from stevedore import driver
from stevedore import extension
from tooz import coordination
from cloudkitty import collector
from cloudkitty.common import rpc
from cloudkitty import config # noqa
from cloudkitty import extension_manager
from cloudkitty import storage
from cloudkitty import transformer
from cloudkitty import utils as ck_utils
eventlet.monkey_patch()
@ -40,7 +41,6 @@ eventlet.monkey_patch()
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
CONF.import_opt('backend', 'cloudkitty.tenant_fetcher', 'tenant_fetcher')
orchestrator_opts = [
@ -51,11 +51,8 @@ orchestrator_opts = [
]
CONF.register_opts(orchestrator_opts, group='orchestrator')
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
FETCHERS_NAMESPACE = 'cloudkitty.tenant.fetchers'
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors'
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
class RatingEndpoint(object):
@ -226,24 +223,9 @@ class Orchestrator(object):
CONF.tenant_fetcher.backend,
invoke_on_load=True).driver
# Transformers
self.transformers = {}
self._load_transformers()
collector_args = {'transformers': self.transformers,
'period': CONF.collect.period}
self.collector = driver.DriverManager(
COLLECTORS_NAMESPACE,
CONF.collect.collector,
invoke_on_load=True,
invoke_kwds=collector_args).driver
storage_args = {'period': CONF.collect.period}
self.storage = driver.DriverManager(
STORAGES_NAMESPACE,
CONF.storage.backend,
invoke_on_load=True,
invoke_kwds=storage_args).driver
self.transformers = transformer.get_transformers()
self.collector = collector.get_collector(self.transformers)
self.storage = storage.get_storage(self.collector)
# RPC
self.server = None
@ -287,17 +269,6 @@ class Orchestrator(object):
return next_timestamp
return 0
def _load_transformers(self):
self.transformers = {}
transformers = extension.ExtensionManager(
TRANSFORMERS_NAMESPACE,
invoke_on_load=True)
for transformer in transformers:
t_name = transformer.name
t_obj = transformer.obj
self.transformers[t_name] = t_obj
def process_messages(self):
# TODO(sheeprine): Code kept to handle threading and asynchronous
# reloading

View File

@ -21,23 +21,25 @@ from oslo_config import cfg
import six
from stevedore import driver
from cloudkitty import collector as ck_collector
from cloudkitty import utils as ck_utils
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
CONF = cfg.CONF
storage_opts = [
cfg.StrOpt('backend',
default='sqlalchemy',
help='Name of the storage backend driver.')
]
CONF = cfg.CONF
CONF.register_opts(storage_opts, group='storage')
CONF.import_opt('period', 'cloudkitty.collector', 'collect')
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
def get_storage():
storage_args = {'period': cfg.CONF.collect.period}
def get_storage(collector=None):
storage_args = {
'period': CONF.collect.period,
'collector': collector if collector else ck_collector.get_collector()}
backend = driver.DriverManager(
STORAGES_NAMESPACE,
cfg.CONF.storage.backend,
@ -60,8 +62,9 @@ class BaseStorage(object):
Handle incoming data from the global orchestrator, and store them.
"""
def __init__(self, period=CONF.collect.period):
self._period = period
def __init__(self, **kwargs):
self._period = kwargs.get('period', CONF.collect.period)
self._collector = kwargs.get('collector')
# State vars
self.usage_start = {}

View File

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import decimal
from oslo_log import log
from cloudkitty.storage.gnocchi_hybrid import migration
from cloudkitty.storage.gnocchi_hybrid import models
from cloudkitty.storage import sqlalchemy as sql_storage
LOG = log.getLogger(__name__)
class GnocchiHybridStorage(sql_storage.SQLAlchemyStorage):
"""Gnocchi Hybrid Storage Backend
Driver used to add support for gnocchi until the creation of custom
resources is supported in gnocchi.
"""
frame_model = models.HybridRatedDataframe
@staticmethod
def init():
migration.upgrade('head')
def _append_time_frame(self, res_type, frame, tenant_id):
rating_dict = frame.get('rating', {})
rate = rating_dict.get('price')
if not rate:
rate = decimal.Decimal(0)
resource_ref = frame.get('resource_id')
if not resource_ref:
LOG.warn('Trying to store data collected outside of gnocchi. '
'This driver can only be used with the gnocchi collector.'
' Data not stored!')
return
self.add_time_frame(begin=self.usage_start_dt.get(tenant_id),
end=self.usage_end_dt.get(tenant_id),
tenant_id=tenant_id,
res_type=res_type,
resource_ref=resource_ref,
rate=rate)
def add_time_frame(self, **kwargs):
"""Create a new time frame.
:param begin: Start of the dataframe.
:param end: End of the dataframe.
:param res_type: Type of the resource.
:param rate: Calculated rate for this dataframe.
:param tenant_id: tenant_id of the dataframe owner.
:param resource_ref: Reference to the gnocchi metric (UUID).
"""
super(GnocchiHybridStorage, self).add_time_frame(**kwargs)

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from cloudkitty.common.db.alembic import env # noqa
from cloudkitty.storage.gnocchi_hybrid import models
target_metadata = models.Base.metadata
version_table = 'storage_gnocchi_hybrid_alembic'
env.run_migrations_online(target_metadata, version_table)

View File

@ -0,0 +1,22 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,32 @@
"""Initial migration.
Revision ID: 4c2f20df7491
Revises: None
Create Date: 2015-11-18 11:44:09.175326
"""
# revision identifiers, used by Alembic.
revision = '4c2f20df7491'
down_revision = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('ghybrid_dataframes',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('begin', sa.DateTime(), nullable=False),
sa.Column('end', sa.DateTime(), nullable=False),
sa.Column('res_type', sa.String(length=255), nullable=False),
sa.Column('rate', sa.Numeric(precision=20, scale=8), nullable=False),
sa.Column('resource_ref', sa.String(length=32), nullable=False),
sa.Column('tenant_id', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('id'),
mysql_charset='utf8',
mysql_engine='InnoDB')
def downgrade():
op.drop_table('ghybrid_dataframes')

View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import os
from cloudkitty.common.db.alembic import migration
ALEMBIC_REPO = os.path.join(os.path.dirname(__file__), 'alembic')
def upgrade(revision):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.upgrade(config, revision)
def downgrade(revision):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.downgrade(config, revision)
def version():
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.version(config)
def revision(message, autogenerate):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.revision(config, message, autogenerate)
def stamp(revision):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.stamp(config, revision)

View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
from cloudkitty import utils as ck_utils
Base = declarative.declarative_base()
class HybridRatedDataframe(Base, models.ModelBase):
"""An hybrid rated dataframe.
"""
__table_args__ = {'mysql_charset': "utf8",
'mysql_engine': "InnoDB"}
__tablename__ = 'ghybrid_dataframes'
id = sqlalchemy.Column(sqlalchemy.Integer,
primary_key=True)
begin = sqlalchemy.Column(sqlalchemy.DateTime,
nullable=False)
end = sqlalchemy.Column(sqlalchemy.DateTime,
nullable=False)
res_type = sqlalchemy.Column(sqlalchemy.String(255),
nullable=False)
rate = sqlalchemy.Column(sqlalchemy.Numeric(20, 8),
nullable=False)
resource_ref = sqlalchemy.Column(sqlalchemy.String(32),
nullable=False)
tenant_id = sqlalchemy.Column(sqlalchemy.String(32),
nullable=True)
def to_cloudkitty(self, collector=None):
if not collector:
raise Exception('Gnocchi storage needs a reference '
'to the collector.')
# Rating informations
rating_dict = {}
rating_dict['price'] = self.rate
# Resource information from gnocchi
resource_data = collector.resource_info(
resource_type=self.res_type,
start=self.begin,
end=self.end,
resource_id=self.resource_ref,
project_id=self.tenant_id)
# Encapsulate informations in a resource dict
res_dict = {}
res_dict['desc'] = resource_data['desc']
res_dict['vol'] = resource_data['vol']
res_dict['rating'] = rating_dict
res_dict['tenant_id'] = self.tenant_id
# Add resource to the usage dict
usage_dict = {}
usage_dict[self.res_type] = [res_dict]
# Time informations
period_dict = {}
period_dict['begin'] = ck_utils.dt2iso(self.begin)
period_dict['end'] = ck_utils.dt2iso(self.end)
# Add period to the resource informations
ck_dict = {}
ck_dict['period'] = period_dict
ck_dict['usage'] = usage_dict
return ck_dict

View File

@ -15,6 +15,7 @@
#
# @author: Stéphane Albert
#
import decimal
import json
from oslo_db.sqlalchemy import utils
@ -31,8 +32,10 @@ class SQLAlchemyStorage(storage.BaseStorage):
"""SQLAlchemy Storage Backend
"""
def __init__(self, period=3600):
super(SQLAlchemyStorage, self).__init__(period)
frame_model = models.RatedDataFrame
def __init__(self, **kwargs):
super(SQLAlchemyStorage, self).__init__(**kwargs)
self._session = {}
@staticmethod
@ -69,22 +72,18 @@ class SQLAlchemyStorage(storage.BaseStorage):
def get_state(self, tenant_id=None):
session = db.get_session()
q = utils.model_query(
models.RatedDataFrame,
session
)
self.frame_model,
session)
if tenant_id:
q = q.filter(
models.RatedDataFrame.tenant_id == tenant_id
)
r = q.order_by(
models.RatedDataFrame.begin.desc()
).first()
self.frame_model.tenant_id == tenant_id)
q = q.order_by(
self.frame_model.begin.desc())
r = q.first()
if r:
return ck_utils.dt2ts(r.begin)
def get_total(self, begin=None, end=None, tenant_id=None, service=None):
model = models.RatedDataFrame
# Boundary calculation
if not begin:
begin = ck_utils.get_month_start()
@ -93,22 +92,20 @@ class SQLAlchemyStorage(storage.BaseStorage):
session = db.get_session()
q = session.query(
sqlalchemy.func.sum(model.rate).label('rate'))
sqlalchemy.func.sum(self.frame_model.rate).label('rate'))
if tenant_id:
q = q.filter(
models.RatedDataFrame.tenant_id == tenant_id)
self.frame_model.tenant_id == tenant_id)
if service:
q = q.filter(
models.RatedDataFrame.res_type == service)
self.frame_model.res_type == service)
q = q.filter(
model.begin >= begin,
model.end <= end)
self.frame_model.begin >= begin,
self.frame_model.end <= end)
rate = q.scalar()
return rate
def get_tenants(self, begin=None, end=None):
model = models.RatedDataFrame
# Boundary calculation
if not begin:
begin = ck_utils.get_month_start()
@ -117,65 +114,64 @@ class SQLAlchemyStorage(storage.BaseStorage):
session = db.get_session()
q = utils.model_query(
model,
session
).filter(
model.begin >= begin,
model.end <= end
)
self.frame_model,
session)
q = q.filter(
self.frame_model.begin >= begin,
self.frame_model.end <= end)
tenants = q.distinct().values(
model.tenant_id
)
self.frame_model.tenant_id)
return [tenant.tenant_id for tenant in tenants]
def get_time_frame(self, begin, end, **filters):
model = models.RatedDataFrame
session = db.get_session()
q = utils.model_query(
model,
session
).filter(
model.begin >= ck_utils.ts2dt(begin),
model.end <= ck_utils.ts2dt(end)
)
self.frame_model,
session)
q = q.filter(
self.frame_model.begin >= ck_utils.ts2dt(begin),
self.frame_model.end <= ck_utils.ts2dt(end))
for filter_name, filter_value in filters.items():
if filter_value:
q = q.filter(getattr(model, filter_name) == filter_value)
q = q.filter(
getattr(self.frame_model, filter_name) == filter_value)
if not filters.get('res_type'):
q = q.filter(model.res_type != '_NO_DATA_')
q = q.filter(self.frame_model.res_type != '_NO_DATA_')
count = q.count()
if not count:
raise storage.NoTimeFrame()
r = q.all()
return [entry.to_cloudkitty() for entry in r]
return [entry.to_cloudkitty(self._collector) for entry in r]
def _append_time_frame(self, res_type, frame, tenant_id):
vol_dict = frame['vol']
qty = vol_dict['qty']
unit = vol_dict['unit']
rating_dict = frame['rating']
rate = rating_dict['price']
rating_dict = frame.get('rating', {})
rate = rating_dict.get('price')
if not rate:
rate = decimal.Decimal(0)
desc = json.dumps(frame['desc'])
self.add_time_frame(self.usage_start_dt.get(tenant_id),
self.usage_end_dt.get(tenant_id),
tenant_id,
unit,
qty,
res_type,
rate,
desc)
self.add_time_frame(begin=self.usage_start_dt.get(tenant_id),
end=self.usage_end_dt.get(tenant_id),
tenant_id=tenant_id,
unit=unit,
qty=qty,
res_type=res_type,
rate=rate,
desc=desc)
def add_time_frame(self, begin, end, tenant_id, unit, qty, res_type,
rate, desc):
def add_time_frame(self, **kwargs):
"""Create a new time frame.
:param begin: Start of the dataframe.
:param end: End of the dataframe.
:param tenant_id: tenant_id of the dataframe owner.
:param unit: Unit of the metric.
:param qty: Quantity of the metric.
:param res_type: Type of the resource.
:param rate: Calculated rate for this dataframe.
:param desc: Resource description (metadata).
"""
frame = models.RatedDataFrame(begin=begin,
end=end,
tenant_id=tenant_id,
unit=unit,
qty=qty,
res_type=res_type,
rate=rate,
desc=desc)
self._session[tenant_id].add(frame)
frame = self.frame_model(**kwargs)
self._session[kwargs.get('tenant_id')].add(frame)

View File

@ -53,7 +53,7 @@ class RatedDataFrame(Base, models.ModelBase):
desc = sqlalchemy.Column(sqlalchemy.Text(),
nullable=False)
def to_cloudkitty(self):
def to_cloudkitty(self, collector=None):
# Rating informations
rating_dict = {}
rating_dict['price'] = self.rate

View File

@ -17,6 +17,7 @@
#
import decimal
import mock
from oslo_config import fixture as config_fixture
from oslotest import base
import testscenarios
@ -74,7 +75,19 @@ class TestCase(testscenarios.TestWithScenarios, base.BaseTestCase):
self.conn = ck_db_api.get_instance()
migration = self.conn.get_migration()
migration.upgrade('head')
auth = mock.patch(
'keystoneauth1.loading.load_auth_from_conf_options',
return_value=dict())
auth.start()
self.auth = auth
session = mock.patch(
'keystoneauth1.loading.load_session_from_conf_options',
return_value=dict())
session.start()
self.session = session
def tearDown(self):
db.get_engine().dispose()
self.auth.stop()
self.session.stop()
super(TestCase, self).tearDown()

View File

@ -274,7 +274,15 @@ class BaseStorageDataFixture(fixture.GabbiFixture):
return data
def start_fixture(self):
self.storage = storage.get_storage()
auth = mock.patch(
'keystoneauth1.loading.load_auth_from_conf_options',
return_value=dict())
session = mock.patch(
'keystoneauth1.loading.load_session_from_conf_options',
return_value=dict())
with auth:
with session:
self.storage = storage.get_storage()
self.storage.init()
self.initialize_data()
@ -350,4 +358,10 @@ class CORSConfigFixture(fixture.GabbiFixture):
def setup_app():
rpc.init()
return app.load_app()
# FIXME(sheeprine): Extension fixtures are interacting with transformers
# loading, since collectors are not needed here we shunt them
no_collector = mock.patch(
'cloudkitty.collector.get_collector',
return_value=None)
with no_collector:
return app.load_app()

View File

@ -1,6 +1,6 @@
fixtures:
- PyScriptsConfigFixture
- UUIDFixture
- PyScriptsConfigFixture
- UUIDFixture
tests:

View File

@ -18,6 +18,21 @@
import abc
import six
from stevedore import extension
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
def get_transformers():
transformers = {}
transformer_exts = extension.ExtensionManager(
TRANSFORMERS_NAMESPACE,
invoke_on_load=True)
for transformer in transformer_exts:
t_name = transformer.name
t_obj = transformer.obj
transformers[t_name] = t_obj
return transformers
@six.add_metaclass(abc.ABCMeta)

View File

@ -56,6 +56,7 @@ cloudkitty.rating.processors =
cloudkitty.storage.backends =
sqlalchemy = cloudkitty.storage.sqlalchemy:SQLAlchemyStorage
gnocchihybrid = cloudkitty.storage.gnocchi_hybrid:GnocchiHybridStorage
cloudkitty.output.writers =
osrf = cloudkitty.writer.osrf:OSRFBackend