Separated writing and rating processing
Added a new process cloudkitty-writer to handle report generation. Removed writers references in the orchestrator. Removed OSRTFBackend as the storage backend is now driver based. Modified write_orchestrator to reflect changes made on the storage code. Change-Id: I201448892a02796d23f11a92d95d3e8a3992b961
This commit is contained in:
parent
734c2fae8d
commit
2398f39058
|
@ -0,0 +1,57 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Stéphane Albert
|
||||
#
|
||||
from oslo.config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
from cloudkitty import config # noqa
|
||||
from cloudkitty.openstack.common import importutils as i_utils
|
||||
from cloudkitty import service
|
||||
from cloudkitty import write_orchestrator
|
||||
|
||||
CONF = cfg.CONF
|
||||
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
|
||||
|
||||
|
||||
def load_storage_backend():
|
||||
storage_args = {'period': CONF.collect.period}
|
||||
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
|
||||
backend = driver.DriverManager(
|
||||
STORAGES_NAMESPACE,
|
||||
CONF.storage.backend,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds=storage_args).driver
|
||||
return backend
|
||||
|
||||
|
||||
def load_output_backend():
|
||||
CONF.import_opt('backend', 'cloudkitty.config', 'output')
|
||||
backend = i_utils.import_class(CONF.output.backend)
|
||||
return backend
|
||||
|
||||
|
||||
def main():
|
||||
service.prepare_service()
|
||||
output_backend = load_output_backend()
|
||||
storage_backend = load_storage_backend()
|
||||
|
||||
wo = write_orchestrator.WriteOrchestrator(output_backend,
|
||||
'writer',
|
||||
storage_backend)
|
||||
wo.init_writing_pipeline()
|
||||
wo.restart_month()
|
||||
wo.process()
|
|
@ -24,17 +24,14 @@ from oslo.config import cfg
|
|||
from oslo import messaging
|
||||
from stevedore import driver
|
||||
from stevedore import extension
|
||||
from stevedore import named
|
||||
|
||||
from cloudkitty.common import rpc
|
||||
from cloudkitty import config # NOQA
|
||||
from cloudkitty import extension_manager
|
||||
from cloudkitty.openstack.common import importutils as i_utils
|
||||
from cloudkitty.openstack.common import lockutils
|
||||
from cloudkitty.openstack.common import log as logging
|
||||
from cloudkitty import state
|
||||
from cloudkitty import utils as ck_utils
|
||||
from cloudkitty import write_orchestrator as w_orch
|
||||
|
||||
eventlet.monkey_patch()
|
||||
|
||||
|
@ -46,7 +43,6 @@ COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
|
|||
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
|
||||
PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors'
|
||||
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
|
||||
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
|
||||
|
||||
|
||||
class BillingEndpoint(object):
|
||||
|
@ -126,12 +122,6 @@ class Orchestrator(object):
|
|||
invoke_on_load=True,
|
||||
invoke_kwds=collector_args).driver
|
||||
|
||||
w_backend = i_utils.import_class(CONF.output.backend)
|
||||
self.wo = w_orch.WriteOrchestrator(w_backend,
|
||||
self.keystone.user_id,
|
||||
self.sm,
|
||||
basepath=CONF.output.basepath)
|
||||
|
||||
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
|
||||
storage_args = {'period': CONF.collect.period}
|
||||
self.storage = driver.DriverManager(
|
||||
|
@ -144,13 +134,6 @@ class Orchestrator(object):
|
|||
self.b_processors = {}
|
||||
self._load_billing_processors()
|
||||
|
||||
# Output settings
|
||||
output_pipeline = named.NamedExtensionManager(
|
||||
WRITERS_NAMESPACE,
|
||||
CONF.output.pipeline)
|
||||
for writer in output_pipeline:
|
||||
self.wo.add_writer(writer.plugin)
|
||||
|
||||
# RPC
|
||||
self.server = None
|
||||
self._billing_endpoint = BillingEndpoint(self)
|
||||
|
@ -262,14 +245,10 @@ class Orchestrator(object):
|
|||
processor.process(data)
|
||||
|
||||
# Writing
|
||||
# Copy data to keep old behaviour with write_orchestrator
|
||||
wo_data = list(data)
|
||||
self.wo.append(wo_data)
|
||||
self.storage.append(data)
|
||||
|
||||
# We're getting a full period so we directly commit
|
||||
self.wo.commit()
|
||||
self.storage.commit()
|
||||
|
||||
def terminate(self):
|
||||
self.wo.close()
|
||||
pass
|
||||
|
|
|
@ -15,60 +15,15 @@
|
|||
#
|
||||
# @author: Stéphane Albert
|
||||
#
|
||||
import datetime
|
||||
import json
|
||||
import os.path
|
||||
import zipfile
|
||||
from oslo.config import cfg
|
||||
from stevedore import named
|
||||
|
||||
import cloudkitty.utils as utils
|
||||
from cloudkitty import state
|
||||
from cloudkitty import storage
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
class OSRTFBackend(object):
|
||||
"""Native backend for transient report storage.
|
||||
|
||||
Used to store data from the output of the billing pipeline.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._osrtf = None
|
||||
|
||||
def open(self, filename):
|
||||
# FIXME(sheeprine): ZipFile is working well with filename
|
||||
# but not backend
|
||||
self._osrtf = zipfile.ZipFile(filename, 'a')
|
||||
|
||||
def _gen_filename(self, timeframe):
|
||||
filename = '{}-{:02d}-{:02d}-{}-{}.json'.format(timeframe.year,
|
||||
timeframe.month,
|
||||
timeframe.day,
|
||||
timeframe.hour,
|
||||
timeframe.minute)
|
||||
return filename
|
||||
|
||||
def _file_exists(self, filename):
|
||||
for file_info in self._osrtf.infolist():
|
||||
if file_info.filename == filename:
|
||||
return True
|
||||
return False
|
||||
|
||||
def add(self, timeframe, data):
|
||||
"""Add the data to the OpenStack Report Transient Format."""
|
||||
filename = self._gen_filename(timeframe)
|
||||
# We can only check for the existence of a file not rewrite or delete
|
||||
# it
|
||||
if not self._file_exists(filename):
|
||||
self._osrtf.writestr(filename, json.dumps(data))
|
||||
|
||||
def get(self, timeframe):
|
||||
try:
|
||||
filename = self._gen_filename(timeframe)
|
||||
data = json.loads(self._osrtf.read(filename))
|
||||
return data
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
self._osrtf.close()
|
||||
CONF = cfg.CONF
|
||||
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
|
||||
|
||||
|
||||
class WriteOrchestrator(object):
|
||||
|
@ -80,28 +35,32 @@ class WriteOrchestrator(object):
|
|||
def __init__(self,
|
||||
backend,
|
||||
user_id,
|
||||
state_manager,
|
||||
storage,
|
||||
basepath=None,
|
||||
period=3600):
|
||||
self._backend = backend
|
||||
self._uid = user_id
|
||||
self._period = period
|
||||
self._sm = state_manager
|
||||
self._storage = storage
|
||||
self._basepath = basepath
|
||||
self._osrtf = None
|
||||
self._period = period
|
||||
self._sm = state.DBStateManager(self._uid,
|
||||
'writer_status')
|
||||
self._write_pipeline = []
|
||||
|
||||
# State vars
|
||||
self.usage_start = None
|
||||
self.usage_start_dt = None
|
||||
self.usage_end = None
|
||||
self.usage_end_dt = None
|
||||
|
||||
# Current total
|
||||
self.total = 0
|
||||
|
||||
# Current usage period lines
|
||||
self._usage_data = {}
|
||||
def init_writing_pipeline(self):
|
||||
CONF.import_opt('pipeline', 'cloudkitty.config', 'output')
|
||||
output_pipeline = named.NamedExtensionManager(
|
||||
WRITERS_NAMESPACE,
|
||||
CONF.output.pipeline)
|
||||
for writer in output_pipeline:
|
||||
self.add_writer(writer.plugin)
|
||||
|
||||
def add_writer(self, writer_class):
|
||||
writer = writer_class(self,
|
||||
|
@ -110,117 +69,73 @@ class WriteOrchestrator(object):
|
|||
self._basepath)
|
||||
self._write_pipeline.append(writer)
|
||||
|
||||
def _gen_osrtf_filename(self, timeframe):
|
||||
if not isinstance(timeframe, datetime.datetime):
|
||||
raise TypeError('timeframe should be of type datetime.')
|
||||
date = '{}-{:02d}'.format(timeframe.year, timeframe.month)
|
||||
filename = '{}-osrtf-{}.zip'.format(self._uid, date)
|
||||
return filename
|
||||
|
||||
def _update_state_manager(self):
|
||||
def _update_state_manager_data(self):
|
||||
self._sm.set_state(self.usage_end)
|
||||
metadata = {'total': self.total}
|
||||
self._sm.set_metadata(metadata)
|
||||
|
||||
def _get_state_manager_timeframe(self):
|
||||
def _load_state_manager_data(self):
|
||||
timeframe = self._sm.get_state()
|
||||
self.usage_start = datetime.datetime.fromtimestamp(timeframe)
|
||||
end_frame = timeframe + self._period
|
||||
self.usage_end = datetime.datetime.fromtimestamp(end_frame)
|
||||
if timeframe:
|
||||
self.usage_start = timeframe
|
||||
self.usage_end = self.usage_start + self._period
|
||||
metadata = self._sm.get_metadata()
|
||||
self.total = metadata.get('total', 0)
|
||||
|
||||
def _filter_period(self, json_data):
|
||||
"""Detect the best usage period to extract.
|
||||
|
||||
Removes the usage from the json data and returns it.
|
||||
"""
|
||||
candidate_ts = None
|
||||
candidate_idx = 0
|
||||
|
||||
for idx, usage in enumerate(json_data):
|
||||
usage_ts = usage['period']['begin']
|
||||
if candidate_ts is None or usage_ts < candidate_ts:
|
||||
candidate_ts = usage_ts
|
||||
candidate_idx = idx
|
||||
|
||||
if candidate_ts:
|
||||
return candidate_ts, json_data.pop(candidate_idx)['usage']
|
||||
|
||||
def _format_data(self, timeframe, data):
|
||||
beg = utils.dt2ts(timeframe)
|
||||
end = beg + self._period
|
||||
final_data = {'period': {'begin': beg, 'end': end}}
|
||||
final_data['usage'] = data
|
||||
return [final_data]
|
||||
|
||||
def _open_osrtf(self):
|
||||
if self._osrtf is None:
|
||||
self._osrtf = OSRTFBackend()
|
||||
filename = self._gen_osrtf_filename(self.usage_start_dt)
|
||||
if self._basepath:
|
||||
self._osrtf_filename = os.path.join(self._basepath, filename)
|
||||
self._osrtf.open(self._osrtf_filename)
|
||||
|
||||
def _pre_commit(self):
|
||||
self._open_osrtf()
|
||||
|
||||
def _commit(self):
|
||||
self._pre_commit()
|
||||
|
||||
self._osrtf.add(self.usage_start_dt, self._usage_data)
|
||||
|
||||
# Dispatch data to writing pipeline
|
||||
for backend in self._write_pipeline:
|
||||
backend.append(self._usage_data, self.usage_start, self.usage_end)
|
||||
|
||||
self._update_state_manager()
|
||||
|
||||
self._usage_data = {}
|
||||
|
||||
self._post_commit()
|
||||
|
||||
def _post_commit(self):
|
||||
self._osrtf.close()
|
||||
if metadata:
|
||||
self.total = metadata.get('total', 0)
|
||||
|
||||
def _dispatch(self, data):
|
||||
for service in data:
|
||||
if service in self._usage_data:
|
||||
self._usage_data[service].extend(data[service])
|
||||
else:
|
||||
self._usage_data[service] = data[service]
|
||||
# Update totals
|
||||
for entry in data[service]:
|
||||
self.total += entry['billing']['price']
|
||||
# Dispatch data to writing pipeline
|
||||
for backend in self._write_pipeline:
|
||||
backend.append(data, self.usage_start, self.usage_end)
|
||||
|
||||
def get_timeframe(self, timeframe):
|
||||
self._open_osrtf()
|
||||
data = self._osrtf.get(timeframe)
|
||||
self._osrtf.close()
|
||||
return self._format_data(timeframe, data)
|
||||
|
||||
def append(self, raw_data):
|
||||
while raw_data:
|
||||
usage_start, data = self._filter_period(raw_data)
|
||||
if self.usage_end is not None and usage_start >= self.usage_end:
|
||||
self._commit()
|
||||
self.usage_start = None
|
||||
|
||||
if self.usage_start is None:
|
||||
self.usage_start = usage_start
|
||||
self.usage_end = usage_start + self._period
|
||||
self.usage_start_dt = (
|
||||
datetime.datetime.fromtimestamp(self.usage_start))
|
||||
self.usage_end_dt = (
|
||||
datetime.datetime.fromtimestamp(self.usage_end))
|
||||
|
||||
self._dispatch(data)
|
||||
|
||||
def commit(self):
|
||||
self._commit()
|
||||
def get_timeframe(self, timeframe, timeframe_end=None):
|
||||
if not timeframe_end:
|
||||
timeframe_end = timeframe + self._period
|
||||
try:
|
||||
data = self._storage.get_time_frame(timeframe,
|
||||
timeframe_end)
|
||||
except storage.NoTimeFrame:
|
||||
return None
|
||||
return data
|
||||
|
||||
def close(self):
|
||||
for writer in self._write_pipeline:
|
||||
writer.close()
|
||||
if self._osrtf is not None:
|
||||
self._osrtf.close()
|
||||
|
||||
def _push_data(self):
|
||||
data = self.get_timeframe(self.usage_start, self.usage_end)
|
||||
if data:
|
||||
for timeframe in data:
|
||||
self._dispatch(timeframe['usage'])
|
||||
|
||||
def _commit_data(self):
|
||||
for backend in self._write_pipeline:
|
||||
backend.commit()
|
||||
|
||||
def reset_state(self):
|
||||
self._load_state_manager_data()
|
||||
self.usage_end = self._storage.get_state()
|
||||
self._update_state_manager_data()
|
||||
|
||||
def restart_month(self):
|
||||
self._load_state_manager_data()
|
||||
self.usage_end = ck_utils.get_this_month_timestamp()
|
||||
self._update_state_manager_data()
|
||||
|
||||
def process(self):
|
||||
self._load_state_manager_data()
|
||||
storage_state = self._storage.get_state()
|
||||
if not self.usage_start:
|
||||
self.usage_start = storage_state
|
||||
self.usage_end = self.usage_start + self._period
|
||||
while storage_state > self.usage_start:
|
||||
self._push_data()
|
||||
self._commit_data()
|
||||
self._update_state_manager_data()
|
||||
self._load_state_manager_data()
|
||||
storage_state = self._storage.get_state()
|
||||
self.close()
|
||||
|
|
|
@ -24,6 +24,7 @@ console_scripts =
|
|||
cloudkitty-dbsync = cloudkitty.cli.dbsync:main
|
||||
cloudkitty-processor = cloudkitty.cli.processor:main
|
||||
cloudkitty-storage-init = cloudkitty.cli.storage:main
|
||||
cloudkitty-writer = cloudkitty.cli.writer:main
|
||||
|
||||
cloudkitty.collector.backends =
|
||||
ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector
|
||||
|
|
Loading…
Reference in New Issue