Implemented new storage drivers

Added SQLAlchemy storage backend.
Modified Orchestrator to handle new storage backends.
write_orchestrator code should be considered legacy, it'll be removed
next versions. A new process cloudkitty-writer will replace this
feature.

Change-Id: I723371309e12754b2ebd58afab5bbf7a47d69704
This commit is contained in:
Stéphane Albert 2014-10-14 15:43:52 +02:00
parent 2369cc5913
commit f41fd4c6d7
12 changed files with 546 additions and 5 deletions

38
cloudkitty/cli/storage.py Normal file
View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo.config import cfg
from stevedore import driver
from cloudkitty import config # noqa
from cloudkitty import service
CONF = cfg.CONF
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
def init_storage_backend():
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
backend = driver.DriverManager(
STORAGES_NAMESPACE,
CONF.storage.backend)
backend.driver.init()
def main():
service.prepare_service()
init_storage_backend()

View File

@ -49,6 +49,9 @@ collect_opts = [
cfg.IntOpt('period',
default=3600,
help='Billing period in seconds.'),
cfg.IntOpt('wait_periods',
default=2,
help='Wait for N periods before collecting new data.'),
cfg.ListOpt('services',
default=['compute'],
help='Services to monitor.'), ]

View File

@ -45,6 +45,7 @@ CONF = cfg.CONF
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors'
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
@ -131,6 +132,14 @@ class Orchestrator(object):
self.sm,
basepath=CONF.output.basepath)
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
storage_args = {'period': CONF.collect.period}
self.storage = driver.DriverManager(
STORAGES_NAMESPACE,
CONF.storage.backend,
invoke_on_load=True,
invoke_kwds=storage_args).driver
# Billing processors
self.b_processors = {}
self._load_billing_processors()
@ -158,13 +167,15 @@ class Orchestrator(object):
self.server.start()
def _check_state(self):
timestamp = self.sm.get_state()
timestamp = self.storage.get_state()
if not timestamp:
return ck_utils.get_this_month_timestamp()
now = int(time.time())
if timestamp + CONF.collect.period < now:
return timestamp
now = int(time.time() + time.timezone)
next_timestamp = timestamp + CONF.collect.period
wait_time = CONF.collect.wait_periods * CONF.collect.period
if next_timestamp + wait_time < now:
return next_timestamp
return 0
def _collect(self, service, start_timestamp):
@ -251,10 +262,14 @@ class Orchestrator(object):
processor.process(data)
# Writing
self.wo.append(data)
# Copy data to keep old behaviour with write_orchestrator
wo_data = list(data)
self.wo.append(wo_data)
self.storage.append(data)
# We're getting a full period so we directly commit
self.wo.commit()
self.storage.commit()
def terminate(self):
self.wo.close()

View File

@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import abc
import datetime
from oslo.config import cfg
import six
storage_opts = [
cfg.StrOpt('backend',
default='sqlalchemy',
help='Name of the storage backend driver.')
]
cfg.CONF.register_opts(storage_opts, group='storage')
@six.add_metaclass(abc.ABCMeta)
class BaseStorage(object):
"""Base Storage class:
Handle incoming data from the global orchestrator, and store them.
"""
def __init__(self, period=3600):
self._period = period
# State vars
self.usage_start = None
self.usage_start_dt = None
self.usage_end = None
self.usage_end_dt = None
@staticmethod
def init():
"""Initialize storage backend.
Can be used to create DB schema on first start.
"""
pass
def _filter_period(self, json_data):
"""Detect the best usage period to extract.
Removes the usage from the json data and returns it.
:param json_data: Data to filter.
"""
candidate_ts = None
candidate_idx = 0
for idx, usage in enumerate(json_data):
usage_ts = usage['period']['begin']
if candidate_ts is None or usage_ts < candidate_ts:
candidate_ts = usage_ts
candidate_idx = idx
if candidate_ts:
return candidate_ts, json_data.pop(candidate_idx)['usage']
def _pre_commit(self):
"""Called before every commit.
"""
@abc.abstractmethod
def _commit(self):
"""Push data to the storage backend.
"""
def _post_commit(self):
"""Called after every commit.
"""
@abc.abstractmethod
def _dispatch(self, data):
"""Process rated data.
:param data: The rated data frames.
"""
@abc.abstractmethod
def get_state(self):
"""Return the last written frame's timestamp.
"""
@abc.abstractmethod
def get_total(self):
pass
@abc.abstractmethod
def get_time_frame(self, begin, end, **filters):
"""Request a time frame from the storage backend.
:param begin: When to start filtering.
:type begin: datetime.datetime
:param end: When to stop filtering.
:type end: datetime.datetime
:param res_type: (Optional) Filter on the resource type.
:type res_type: str
"""
def append(self, raw_data):
"""Append rated data before committing them to the backend.
:param raw_data: The rated data frames.
"""
while raw_data:
usage_start, data = self._filter_period(raw_data)
if self.usage_end is not None and usage_start >= self.usage_end:
self.commit()
self.usage_start = None
if self.usage_start is None:
self.usage_start = usage_start
self.usage_end = usage_start + self._period
self.usage_start_dt = (
datetime.datetime.fromtimestamp(self.usage_start))
self.usage_end_dt = (
datetime.datetime.fromtimestamp(self.usage_end))
self._dispatch(data)
def commit(self):
"""Commit the changes to the backend.
"""
self._pre_commit()
self._commit()
self._post_commit()

View File

@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import json
from oslo.db.sqlalchemy import utils
from cloudkitty import db
from cloudkitty import storage
from cloudkitty.storage.sqlalchemy import migration
from cloudkitty.storage.sqlalchemy import models
from cloudkitty import utils as ck_utils
class NoTimeFrame(Exception):
"""Raised when there is no time frame available."""
def __init__(self):
super(NoTimeFrame, self).__init__(
"No time frame available")
class SQLAlchemyStorage(storage.BaseStorage):
"""SQLAlchemy Storage Backend
"""
def __init__(self, period=3600):
super(SQLAlchemyStorage, self).__init__(period)
self._session = None
@staticmethod
def init():
migration.upgrade('head')
def _commit(self):
self._session.commit()
self._session.begin()
def _dispatch(self, data):
for service in data:
for frame in data[service]:
self._append_time_frame(service, frame)
def append(self, raw_data):
if not self._session:
self._session = db.get_session()
self._session.begin()
super(SQLAlchemyStorage, self).append(raw_data)
def get_state(self):
session = db.get_session()
r = utils.model_query(
models.RatedDataFrame,
session
).order_by(
models.RatedDataFrame.begin.desc()
).first()
if r:
return ck_utils.dt2ts(r.begin)
def get_total(self):
pass
def get_time_frame(self, begin, end, **filters):
"""Return a list of time frames.
:param start: Filter from `start`.
:param end: Filter to `end`.
:param unit: Filter on an unit type.
:param res_type: Filter on a resource type.
"""
model = models.RatedDataFrame
session = db.get_session()
q = utils.model_query(
model,
session
).filter(
model.begin >= begin,
model.end <= end
)
for cur_filter in filters:
q = q.filter(getattr(model, cur_filter) == filters[cur_filter])
if not q:
raise NoTimeFrame()
return q.to_cloudkitty()
def _append_time_frame(self, res_type, frame):
vol_dict = frame['vol']
qty = vol_dict['qty']
unit = vol_dict['unit']
rating_dict = frame['billing']
rate = rating_dict['price']
desc = json.dumps(frame['desc'])
self.add_time_frame(self.usage_start_dt,
self.usage_end_dt,
unit,
qty,
res_type,
rate,
desc)
def add_time_frame(self, begin, end, unit, qty, res_type, rate, desc):
"""Create a new time frame.
"""
frame = models.RatedDataFrame(begin=begin,
end=end,
unit=unit,
qty=qty,
res_type=res_type,
rate=rate,
desc=desc)
self._session.add(frame)

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from cloudkitty.common.db.alembic import env # noqa
from cloudkitty.storage.sqlalchemy import models
target_metadata = models.Base.metadata
version_table = 'storage_sqlalchemy_alembic'
env.run_migrations_online(target_metadata, version_table)

View File

@ -0,0 +1,22 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,35 @@
"""Initial migration
Revision ID: 17fd1b237aa3
Revises: None
Create Date: 2014-10-10 11:28:08.645122
"""
# revision identifiers, used by Alembic.
revision = '17fd1b237aa3'
down_revision = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('rated_data_frames',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('begin', sa.DateTime(), nullable=False),
sa.Column('end', sa.DateTime(), nullable=False),
sa.Column('unit', sa.String(length=255), nullable=False),
sa.Column('qty', sa.Numeric(), nullable=False),
sa.Column('res_type', sa.String(length=255), nullable=False),
sa.Column('rate', sa.Float(), nullable=False),
sa.Column('desc', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('id'),
mysql_charset='utf8',
mysql_engine='InnoDB'
)
def downgrade():
op.drop_table('rated_data_frames')
op.drop_table('storage_sqlalchemy_alembic')

View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import os
from cloudkitty.common.db.alembic import migration
ALEMBIC_REPO = os.path.join(os.path.dirname(__file__), 'alembic')
def upgrade(revision):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.upgrade(config, revision)
def downgrade(revision):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.downgrade(config, revision)
def version():
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.version(config)
def revision(message, autogenerate):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.revision(config, message, autogenerate)
def stamp(revision):
config = migration.load_alembic_config(ALEMBIC_REPO)
return migration.stamp(config, revision)

View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import json
from oslo.db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
Base = declarative.declarative_base()
class RatedDataFrame(Base, models.ModelBase):
"""A rated data frame.
"""
__table_args__ = {'mysql_charset': "utf8",
'mysql_engine': "InnoDB"}
__tablename__ = 'rated_data_frames'
id = sqlalchemy.Column(sqlalchemy.Integer,
primary_key=True)
begin = sqlalchemy.Column(sqlalchemy.DateTime,
nullable=False)
end = sqlalchemy.Column(sqlalchemy.DateTime,
nullable=False)
unit = sqlalchemy.Column(sqlalchemy.String(255),
nullable=False)
qty = sqlalchemy.Column(sqlalchemy.Numeric(),
nullable=False)
res_type = sqlalchemy.Column(sqlalchemy.String(255),
nullable=False)
rate = sqlalchemy.Column(sqlalchemy.Float(),
nullable=False)
desc = sqlalchemy.Column(sqlalchemy.Text(),
nullable=False)
def to_cloudkitty(self):
rating_dict = {}
rating_dict['price'] = self.rate
vol_dict = {}
vol_dict['qty'] = self.qty
vol_dict['unit'] = self.unit
res_dict = {}
res_dict['billing'] = rating_dict
res_dict['desc'] = json.loads(self.desc)
res_dict['vol'] = vol_dict
ck_dict = {}
ck_dict[self.res_type] = [res_dict]
return ck_dict

View File

@ -361,6 +361,10 @@
# Billing period in seconds. (integer value)
#period=3600
# Wait for N periods before collecting new data. (integer
# value)
#wait_periods=2
# Services to monitor. (list value)
#services=compute
@ -529,3 +533,13 @@
#basepath=/var/lib/cloudkitty/states/
[storage]
#
# Options defined in cloudkitty.storage
#
# Name of the storage backend driver. (string value)
#backend=sqlalchemy

View File

@ -23,6 +23,7 @@ console_scripts =
cloudkitty-api = cloudkitty.cli.api:main
cloudkitty-dbsync = cloudkitty.cli.dbsync:main
cloudkitty-processor = cloudkitty.cli.processor:main
cloudkitty-storage-init = cloudkitty.cli.storage:main
cloudkitty.collector.backends =
ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector
@ -36,6 +37,9 @@ cloudkitty.billing.processors =
noop = cloudkitty.billing.noop:Noop
hashmap = cloudkitty.billing.hash:BasicHashMap
cloudkitty.storage.backends =
sqlalchemy = cloudkitty.storage.sqlalchemy:SQLAlchemyStorage
cloudkitty.output.writers =
osrf = cloudkitty.writer.osrf:OSRFBackend