diff --git a/cloudkitty/cli/writer.py b/cloudkitty/cli/writer.py new file mode 100644 index 00000000..a7957a94 --- /dev/null +++ b/cloudkitty/cli/writer.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +from oslo.config import cfg +from stevedore import driver + +from cloudkitty import config # noqa +from cloudkitty.openstack.common import importutils as i_utils +from cloudkitty import service +from cloudkitty import write_orchestrator + +CONF = cfg.CONF +STORAGES_NAMESPACE = 'cloudkitty.storage.backends' + + +def load_storage_backend(): + storage_args = {'period': CONF.collect.period} + CONF.import_opt('backend', 'cloudkitty.storage', 'storage') + backend = driver.DriverManager( + STORAGES_NAMESPACE, + CONF.storage.backend, + invoke_on_load=True, + invoke_kwds=storage_args).driver + return backend + + +def load_output_backend(): + CONF.import_opt('backend', 'cloudkitty.config', 'output') + backend = i_utils.import_class(CONF.output.backend) + return backend + + +def main(): + service.prepare_service() + output_backend = load_output_backend() + storage_backend = load_storage_backend() + + wo = write_orchestrator.WriteOrchestrator(output_backend, + 'writer', + storage_backend) + wo.init_writing_pipeline() + wo.restart_month() + wo.process() diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 977536ba..70db0c34 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -24,17 +24,14 @@ from oslo.config import cfg from oslo import messaging from stevedore import driver from stevedore import extension -from stevedore import named from cloudkitty.common import rpc from cloudkitty import config # NOQA from cloudkitty import extension_manager -from cloudkitty.openstack.common import importutils as i_utils from cloudkitty.openstack.common import lockutils from cloudkitty.openstack.common import log as logging from cloudkitty import state from cloudkitty import utils as ck_utils -from cloudkitty import write_orchestrator as w_orch eventlet.monkey_patch() @@ -46,7 +43,6 @@ COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends' TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers' PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors' STORAGES_NAMESPACE = 'cloudkitty.storage.backends' -WRITERS_NAMESPACE = 'cloudkitty.output.writers' class BillingEndpoint(object): @@ -126,12 +122,6 @@ class Orchestrator(object): invoke_on_load=True, invoke_kwds=collector_args).driver - w_backend = i_utils.import_class(CONF.output.backend) - self.wo = w_orch.WriteOrchestrator(w_backend, - self.keystone.user_id, - self.sm, - basepath=CONF.output.basepath) - CONF.import_opt('backend', 'cloudkitty.storage', 'storage') storage_args = {'period': CONF.collect.period} self.storage = driver.DriverManager( @@ -144,13 +134,6 @@ class Orchestrator(object): self.b_processors = {} self._load_billing_processors() - # Output settings - output_pipeline = named.NamedExtensionManager( - WRITERS_NAMESPACE, - CONF.output.pipeline) - for writer in output_pipeline: - self.wo.add_writer(writer.plugin) - # RPC self.server = None self._billing_endpoint = BillingEndpoint(self) @@ -262,14 +245,10 @@ class Orchestrator(object): processor.process(data) # Writing - # Copy data to keep old behaviour with write_orchestrator - wo_data = list(data) - self.wo.append(wo_data) self.storage.append(data) # We're getting a full period so we directly commit - self.wo.commit() self.storage.commit() def terminate(self): - self.wo.close() + pass diff --git a/cloudkitty/write_orchestrator.py b/cloudkitty/write_orchestrator.py index b348f46b..14c94f75 100644 --- a/cloudkitty/write_orchestrator.py +++ b/cloudkitty/write_orchestrator.py @@ -15,60 +15,15 @@ # # @author: Stéphane Albert # -import datetime -import json -import os.path -import zipfile +from oslo.config import cfg +from stevedore import named -import cloudkitty.utils as utils +from cloudkitty import state +from cloudkitty import storage +from cloudkitty import utils as ck_utils - -class OSRTFBackend(object): - """Native backend for transient report storage. - - Used to store data from the output of the billing pipeline. - """ - - def __init__(self): - self._osrtf = None - - def open(self, filename): - # FIXME(sheeprine): ZipFile is working well with filename - # but not backend - self._osrtf = zipfile.ZipFile(filename, 'a') - - def _gen_filename(self, timeframe): - filename = '{}-{:02d}-{:02d}-{}-{}.json'.format(timeframe.year, - timeframe.month, - timeframe.day, - timeframe.hour, - timeframe.minute) - return filename - - def _file_exists(self, filename): - for file_info in self._osrtf.infolist(): - if file_info.filename == filename: - return True - return False - - def add(self, timeframe, data): - """Add the data to the OpenStack Report Transient Format.""" - filename = self._gen_filename(timeframe) - # We can only check for the existence of a file not rewrite or delete - # it - if not self._file_exists(filename): - self._osrtf.writestr(filename, json.dumps(data)) - - def get(self, timeframe): - try: - filename = self._gen_filename(timeframe) - data = json.loads(self._osrtf.read(filename)) - return data - except Exception: - pass - - def close(self): - self._osrtf.close() +CONF = cfg.CONF +WRITERS_NAMESPACE = 'cloudkitty.output.writers' class WriteOrchestrator(object): @@ -80,28 +35,32 @@ class WriteOrchestrator(object): def __init__(self, backend, user_id, - state_manager, + storage, basepath=None, period=3600): self._backend = backend self._uid = user_id - self._period = period - self._sm = state_manager + self._storage = storage self._basepath = basepath - self._osrtf = None + self._period = period + self._sm = state.DBStateManager(self._uid, + 'writer_status') self._write_pipeline = [] # State vars self.usage_start = None - self.usage_start_dt = None self.usage_end = None - self.usage_end_dt = None # Current total self.total = 0 - # Current usage period lines - self._usage_data = {} + def init_writing_pipeline(self): + CONF.import_opt('pipeline', 'cloudkitty.config', 'output') + output_pipeline = named.NamedExtensionManager( + WRITERS_NAMESPACE, + CONF.output.pipeline) + for writer in output_pipeline: + self.add_writer(writer.plugin) def add_writer(self, writer_class): writer = writer_class(self, @@ -110,117 +69,73 @@ class WriteOrchestrator(object): self._basepath) self._write_pipeline.append(writer) - def _gen_osrtf_filename(self, timeframe): - if not isinstance(timeframe, datetime.datetime): - raise TypeError('timeframe should be of type datetime.') - date = '{}-{:02d}'.format(timeframe.year, timeframe.month) - filename = '{}-osrtf-{}.zip'.format(self._uid, date) - return filename - - def _update_state_manager(self): + def _update_state_manager_data(self): self._sm.set_state(self.usage_end) metadata = {'total': self.total} self._sm.set_metadata(metadata) - def _get_state_manager_timeframe(self): + def _load_state_manager_data(self): timeframe = self._sm.get_state() - self.usage_start = datetime.datetime.fromtimestamp(timeframe) - end_frame = timeframe + self._period - self.usage_end = datetime.datetime.fromtimestamp(end_frame) + if timeframe: + self.usage_start = timeframe + self.usage_end = self.usage_start + self._period metadata = self._sm.get_metadata() - self.total = metadata.get('total', 0) - - def _filter_period(self, json_data): - """Detect the best usage period to extract. - - Removes the usage from the json data and returns it. - """ - candidate_ts = None - candidate_idx = 0 - - for idx, usage in enumerate(json_data): - usage_ts = usage['period']['begin'] - if candidate_ts is None or usage_ts < candidate_ts: - candidate_ts = usage_ts - candidate_idx = idx - - if candidate_ts: - return candidate_ts, json_data.pop(candidate_idx)['usage'] - - def _format_data(self, timeframe, data): - beg = utils.dt2ts(timeframe) - end = beg + self._period - final_data = {'period': {'begin': beg, 'end': end}} - final_data['usage'] = data - return [final_data] - - def _open_osrtf(self): - if self._osrtf is None: - self._osrtf = OSRTFBackend() - filename = self._gen_osrtf_filename(self.usage_start_dt) - if self._basepath: - self._osrtf_filename = os.path.join(self._basepath, filename) - self._osrtf.open(self._osrtf_filename) - - def _pre_commit(self): - self._open_osrtf() - - def _commit(self): - self._pre_commit() - - self._osrtf.add(self.usage_start_dt, self._usage_data) - - # Dispatch data to writing pipeline - for backend in self._write_pipeline: - backend.append(self._usage_data, self.usage_start, self.usage_end) - - self._update_state_manager() - - self._usage_data = {} - - self._post_commit() - - def _post_commit(self): - self._osrtf.close() + if metadata: + self.total = metadata.get('total', 0) def _dispatch(self, data): for service in data: - if service in self._usage_data: - self._usage_data[service].extend(data[service]) - else: - self._usage_data[service] = data[service] # Update totals for entry in data[service]: self.total += entry['billing']['price'] + # Dispatch data to writing pipeline + for backend in self._write_pipeline: + backend.append(data, self.usage_start, self.usage_end) - def get_timeframe(self, timeframe): - self._open_osrtf() - data = self._osrtf.get(timeframe) - self._osrtf.close() - return self._format_data(timeframe, data) - - def append(self, raw_data): - while raw_data: - usage_start, data = self._filter_period(raw_data) - if self.usage_end is not None and usage_start >= self.usage_end: - self._commit() - self.usage_start = None - - if self.usage_start is None: - self.usage_start = usage_start - self.usage_end = usage_start + self._period - self.usage_start_dt = ( - datetime.datetime.fromtimestamp(self.usage_start)) - self.usage_end_dt = ( - datetime.datetime.fromtimestamp(self.usage_end)) - - self._dispatch(data) - - def commit(self): - self._commit() + def get_timeframe(self, timeframe, timeframe_end=None): + if not timeframe_end: + timeframe_end = timeframe + self._period + try: + data = self._storage.get_time_frame(timeframe, + timeframe_end) + except storage.NoTimeFrame: + return None + return data def close(self): for writer in self._write_pipeline: writer.close() - if self._osrtf is not None: - self._osrtf.close() + + def _push_data(self): + data = self.get_timeframe(self.usage_start, self.usage_end) + if data: + for timeframe in data: + self._dispatch(timeframe['usage']) + + def _commit_data(self): + for backend in self._write_pipeline: + backend.commit() + + def reset_state(self): + self._load_state_manager_data() + self.usage_end = self._storage.get_state() + self._update_state_manager_data() + + def restart_month(self): + self._load_state_manager_data() + self.usage_end = ck_utils.get_this_month_timestamp() + self._update_state_manager_data() + + def process(self): + self._load_state_manager_data() + storage_state = self._storage.get_state() + if not self.usage_start: + self.usage_start = storage_state + self.usage_end = self.usage_start + self._period + while storage_state > self.usage_start: + self._push_data() + self._commit_data() + self._update_state_manager_data() + self._load_state_manager_data() + storage_state = self._storage.get_state() + self.close() diff --git a/setup.cfg b/setup.cfg index 6fc5f010..989d717e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ console_scripts = cloudkitty-dbsync = cloudkitty.cli.dbsync:main cloudkitty-processor = cloudkitty.cli.processor:main cloudkitty-storage-init = cloudkitty.cli.storage:main + cloudkitty-writer = cloudkitty.cli.writer:main cloudkitty.collector.backends = ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector