Separated writing and rating processing

Added a new process, cloudkitty-writer, to handle report generation.
Removed writer references from the orchestrator.
Removed OSRTFBackend, as the storage backend is now driver-based.
Modified write_orchestrator to reflect the changes made to the storage code.
The resulting processor/writer flow is sketched after the commit metadata below.

Change-Id: I201448892a02796d23f11a92d95d3e8a3992b961
Stéphane Albert 2014-11-14 09:57:24 +01:00
parent 734c2fae8d
commit 2398f39058
4 changed files with 131 additions and 179 deletions
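
To illustrate the split this commit introduces, here is a minimal, self-contained sketch of the resulting data flow: the processor rates data and appends it to a driver-based storage backend, while a separate writer process later reads complete time frames back from storage and hands them to the output pipeline. InMemoryStorage and DummyWriter are hypothetical stand-ins for illustration only, not CloudKitty classes; they just mirror the append/commit/get_time_frame calls visible in the diffs below.

# Illustrative sketch only: InMemoryStorage and DummyWriter are hypothetical
# stand-ins, not CloudKitty classes.
class InMemoryStorage(object):
    """Driver-style storage: the processor appends, the writer reads back."""

    def __init__(self, period=3600):
        self.period = period
        self._frames = []

    def append(self, data):
        self._frames.extend(data)

    def commit(self):
        pass  # A real storage driver would flush to its backend here.

    def get_time_frame(self, begin, end):
        return [frame for frame in self._frames
                if begin <= frame['period']['begin'] < end]


class DummyWriter(object):
    """Output writer: receives rated usage per period, then commits a report."""

    def append(self, usage, begin, end):
        print('report entry for [%d, %d): %s' % (begin, end, usage))

    def commit(self):
        pass


# Processor side: rate and store.
storage = InMemoryStorage()
storage.append([{'period': {'begin': 0, 'end': 3600},
                 'usage': {'compute': [{'billing': {'price': 1.0}}]}}])
storage.commit()

# Writer side (run by the separate cloudkitty-writer process in practice).
writer = DummyWriter()
for frame in storage.get_time_frame(0, 3600):
    writer.append(frame['usage'], 0, 3600)
writer.commit()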

cloudkitty/cli/writer.py Normal file

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo.config import cfg
from stevedore import driver
from cloudkitty import config # noqa
from cloudkitty.openstack.common import importutils as i_utils
from cloudkitty import service
from cloudkitty import write_orchestrator
CONF = cfg.CONF
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
def load_storage_backend():
storage_args = {'period': CONF.collect.period}
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
backend = driver.DriverManager(
STORAGES_NAMESPACE,
CONF.storage.backend,
invoke_on_load=True,
invoke_kwds=storage_args).driver
return backend
def load_output_backend():
CONF.import_opt('backend', 'cloudkitty.config', 'output')
backend = i_utils.import_class(CONF.output.backend)
return backend
def main():
service.prepare_service()
output_backend = load_output_backend()
storage_backend = load_storage_backend()
wo = write_orchestrator.WriteOrchestrator(output_backend,
'writer',
storage_backend)
wo.init_writing_pipeline()
wo.restart_month()
wo.process()
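
The writing pipeline set up by init_writing_pipeline() loads plugins from the cloudkitty.output.writers entry-point namespace named in CONF.output.pipeline. Below is a minimal sketch of what such a plugin could look like; only the append()/commit()/close() calls mirror what the write_orchestrator code further down actually invokes, and the constructor signature is a guess for illustration, not the real writer API.

# Hypothetical output writer plugin. Only append()/commit()/close() mirror
# calls made by WriteOrchestrator; the constructor signature is a guess.
class CSVLikeWriter(object):
    def __init__(self, write_orchestrator, user_id=None, basepath=None):
        self._wo = write_orchestrator
        self._uid = user_id
        self._basepath = basepath
        self._buffer = []

    def append(self, data, usage_start, usage_end):
        # Buffer the rated usage for the current period.
        self._buffer.append((usage_start, usage_end, data))

    def commit(self):
        # Persist the buffered period (write a file, push to an API, ...).
        self._buffer = []

    def close(self):
        pass

Such a plugin would be registered under cloudkitty.output.writers in setup.cfg (for example, csv = mypackage.writers:CSVLikeWriter, a hypothetical entry) so the NamedExtensionManager can load it by name from CONF.output.pipeline.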

cloudkitty/orchestrator.py

@@ -24,17 +24,14 @@ from oslo.config import cfg
from oslo import messaging
from stevedore import driver
from stevedore import extension
from stevedore import named
from cloudkitty.common import rpc
from cloudkitty import config # NOQA
from cloudkitty import extension_manager
from cloudkitty.openstack.common import importutils as i_utils
from cloudkitty.openstack.common import lockutils
from cloudkitty.openstack.common import log as logging
from cloudkitty import state
from cloudkitty import utils as ck_utils
from cloudkitty import write_orchestrator as w_orch
eventlet.monkey_patch()
@@ -46,7 +43,6 @@ COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors'
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
class BillingEndpoint(object):
@@ -126,12 +122,6 @@ class Orchestrator(object):
invoke_on_load=True,
invoke_kwds=collector_args).driver
w_backend = i_utils.import_class(CONF.output.backend)
self.wo = w_orch.WriteOrchestrator(w_backend,
self.keystone.user_id,
self.sm,
basepath=CONF.output.basepath)
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
storage_args = {'period': CONF.collect.period}
self.storage = driver.DriverManager(
@@ -144,13 +134,6 @@ class Orchestrator(object):
self.b_processors = {}
self._load_billing_processors()
# Output settings
output_pipeline = named.NamedExtensionManager(
WRITERS_NAMESPACE,
CONF.output.pipeline)
for writer in output_pipeline:
self.wo.add_writer(writer.plugin)
# RPC
self.server = None
self._billing_endpoint = BillingEndpoint(self)
@@ -262,14 +245,10 @@ class Orchestrator(object):
processor.process(data)
# Writing
# Copy data to keep old behaviour with write_orchestrator
wo_data = list(data)
self.wo.append(wo_data)
self.storage.append(data)
# We're getting a full period so we directly commit
self.wo.commit()
self.storage.commit()
def terminate(self):
self.wo.close()
pass

cloudkitty/write_orchestrator.py

@@ -15,60 +15,15 @@
#
# @author: Stéphane Albert
#
import datetime
import json
import os.path
import zipfile
from oslo.config import cfg
from stevedore import named
import cloudkitty.utils as utils
from cloudkitty import state
from cloudkitty import storage
from cloudkitty import utils as ck_utils
class OSRTFBackend(object):
"""Native backend for transient report storage.
Used to store data from the output of the billing pipeline.
"""
def __init__(self):
self._osrtf = None
def open(self, filename):
# FIXME(sheeprine): ZipFile is working well with filename
# but not backend
self._osrtf = zipfile.ZipFile(filename, 'a')
def _gen_filename(self, timeframe):
filename = '{}-{:02d}-{:02d}-{}-{}.json'.format(timeframe.year,
timeframe.month,
timeframe.day,
timeframe.hour,
timeframe.minute)
return filename
def _file_exists(self, filename):
for file_info in self._osrtf.infolist():
if file_info.filename == filename:
return True
return False
def add(self, timeframe, data):
"""Add the data to the OpenStack Report Transient Format."""
filename = self._gen_filename(timeframe)
# We can only check for the existence of a file not rewrite or delete
# it
if not self._file_exists(filename):
self._osrtf.writestr(filename, json.dumps(data))
def get(self, timeframe):
try:
filename = self._gen_filename(timeframe)
data = json.loads(self._osrtf.read(filename))
return data
except Exception:
pass
def close(self):
self._osrtf.close()
CONF = cfg.CONF
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
class WriteOrchestrator(object):
@@ -80,28 +35,32 @@ class WriteOrchestrator(object):
def __init__(self,
backend,
user_id,
state_manager,
storage,
basepath=None,
period=3600):
self._backend = backend
self._uid = user_id
self._period = period
self._sm = state_manager
self._storage = storage
self._basepath = basepath
self._osrtf = None
self._period = period
self._sm = state.DBStateManager(self._uid,
'writer_status')
self._write_pipeline = []
# State vars
self.usage_start = None
self.usage_start_dt = None
self.usage_end = None
self.usage_end_dt = None
# Current total
self.total = 0
# Current usage period lines
self._usage_data = {}
def init_writing_pipeline(self):
CONF.import_opt('pipeline', 'cloudkitty.config', 'output')
output_pipeline = named.NamedExtensionManager(
WRITERS_NAMESPACE,
CONF.output.pipeline)
for writer in output_pipeline:
self.add_writer(writer.plugin)
def add_writer(self, writer_class):
writer = writer_class(self,
@@ -110,117 +69,73 @@ class WriteOrchestrator(object):
self._basepath)
self._write_pipeline.append(writer)
def _gen_osrtf_filename(self, timeframe):
if not isinstance(timeframe, datetime.datetime):
raise TypeError('timeframe should be of type datetime.')
date = '{}-{:02d}'.format(timeframe.year, timeframe.month)
filename = '{}-osrtf-{}.zip'.format(self._uid, date)
return filename
def _update_state_manager(self):
def _update_state_manager_data(self):
self._sm.set_state(self.usage_end)
metadata = {'total': self.total}
self._sm.set_metadata(metadata)
def _get_state_manager_timeframe(self):
def _load_state_manager_data(self):
timeframe = self._sm.get_state()
self.usage_start = datetime.datetime.fromtimestamp(timeframe)
end_frame = timeframe + self._period
self.usage_end = datetime.datetime.fromtimestamp(end_frame)
if timeframe:
self.usage_start = timeframe
self.usage_end = self.usage_start + self._period
metadata = self._sm.get_metadata()
self.total = metadata.get('total', 0)
def _filter_period(self, json_data):
"""Detect the best usage period to extract.
Removes the usage from the json data and returns it.
"""
candidate_ts = None
candidate_idx = 0
for idx, usage in enumerate(json_data):
usage_ts = usage['period']['begin']
if candidate_ts is None or usage_ts < candidate_ts:
candidate_ts = usage_ts
candidate_idx = idx
if candidate_ts:
return candidate_ts, json_data.pop(candidate_idx)['usage']
def _format_data(self, timeframe, data):
beg = utils.dt2ts(timeframe)
end = beg + self._period
final_data = {'period': {'begin': beg, 'end': end}}
final_data['usage'] = data
return [final_data]
def _open_osrtf(self):
if self._osrtf is None:
self._osrtf = OSRTFBackend()
filename = self._gen_osrtf_filename(self.usage_start_dt)
if self._basepath:
self._osrtf_filename = os.path.join(self._basepath, filename)
self._osrtf.open(self._osrtf_filename)
def _pre_commit(self):
self._open_osrtf()
def _commit(self):
self._pre_commit()
self._osrtf.add(self.usage_start_dt, self._usage_data)
# Dispatch data to writing pipeline
for backend in self._write_pipeline:
backend.append(self._usage_data, self.usage_start, self.usage_end)
self._update_state_manager()
self._usage_data = {}
self._post_commit()
def _post_commit(self):
self._osrtf.close()
if metadata:
self.total = metadata.get('total', 0)
def _dispatch(self, data):
for service in data:
if service in self._usage_data:
self._usage_data[service].extend(data[service])
else:
self._usage_data[service] = data[service]
# Update totals
for entry in data[service]:
self.total += entry['billing']['price']
# Dispatch data to writing pipeline
for backend in self._write_pipeline:
backend.append(data, self.usage_start, self.usage_end)
def get_timeframe(self, timeframe):
self._open_osrtf()
data = self._osrtf.get(timeframe)
self._osrtf.close()
return self._format_data(timeframe, data)
def append(self, raw_data):
while raw_data:
usage_start, data = self._filter_period(raw_data)
if self.usage_end is not None and usage_start >= self.usage_end:
self._commit()
self.usage_start = None
if self.usage_start is None:
self.usage_start = usage_start
self.usage_end = usage_start + self._period
self.usage_start_dt = (
datetime.datetime.fromtimestamp(self.usage_start))
self.usage_end_dt = (
datetime.datetime.fromtimestamp(self.usage_end))
self._dispatch(data)
def commit(self):
self._commit()
def get_timeframe(self, timeframe, timeframe_end=None):
if not timeframe_end:
timeframe_end = timeframe + self._period
try:
data = self._storage.get_time_frame(timeframe,
timeframe_end)
except storage.NoTimeFrame:
return None
return data
def close(self):
for writer in self._write_pipeline:
writer.close()
if self._osrtf is not None:
self._osrtf.close()
def _push_data(self):
data = self.get_timeframe(self.usage_start, self.usage_end)
if data:
for timeframe in data:
self._dispatch(timeframe['usage'])
def _commit_data(self):
for backend in self._write_pipeline:
backend.commit()
def reset_state(self):
self._load_state_manager_data()
self.usage_end = self._storage.get_state()
self._update_state_manager_data()
def restart_month(self):
self._load_state_manager_data()
self.usage_end = ck_utils.get_this_month_timestamp()
self._update_state_manager_data()
def process(self):
self._load_state_manager_data()
storage_state = self._storage.get_state()
if not self.usage_start:
self.usage_start = storage_state
self.usage_end = self.usage_start + self._period
while storage_state > self.usage_start:
self._push_data()
self._commit_data()
self._update_state_manager_data()
self._load_state_manager_data()
storage_state = self._storage.get_state()
self.close()
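
As a worked example of the process() catch-up loop above, with hypothetical numbers: a period of 3600 seconds, a writer state of 2014-11-01 00:00 UTC, and rated data available in storage up to three hours later means three one-hour frames get pushed and committed before the writer catches up. A minimal sketch of that arithmetic, using plain Unix timestamps:

# Hypothetical numbers illustrating the catch-up loop in process().
period = 3600
usage_start = 1414800000                   # writer state: 2014-11-01 00:00 UTC
storage_state = usage_start + 3 * period   # rated data available up to here

frames = 0
while storage_state > usage_start:
    # _push_data()/_commit_data() would run here for [usage_start, usage_end)
    usage_start += period
    frames += 1

print(frames)  # 3 one-hour frames written before the writer catches up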

setup.cfg

@@ -24,6 +24,7 @@ console_scripts =
cloudkitty-dbsync = cloudkitty.cli.dbsync:main
cloudkitty-processor = cloudkitty.cli.processor:main
cloudkitty-storage-init = cloudkitty.cli.storage:main
cloudkitty-writer = cloudkitty.cli.writer:main
cloudkitty.collector.backends =
ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector