Added pluggable transformer support

Change-Id: I49e768dd5c2c2348bc63f7363de94aecc4eb4d39
This commit is contained in:
Stéphane Albert 2014-09-12 15:30:09 +02:00
parent 44e9d76403
commit e1d8bdccf6
8 changed files with 237 additions and 64 deletions

View File

@ -23,10 +23,24 @@ import six
import cloudkitty.utils as utils
class TransformerDependencyError(Exception):
"""Raised when a collector can't find a mandatory transformer."""
def __init__(self, collector, transformer):
super(TransformerDependencyError, self).__init__(
"Transformer '%s' not found, but required by %s" % (transformer,
collector))
self.collector = collector
self.transformer = transformer
@six.add_metaclass(abc.ABCMeta)
class BaseCollector(object):
def __init__(self, **kwargs):
dependencies = []
def __init__(self, transformers, **kwargs):
try:
self.transformers = transformers
self.user = kwargs['user']
self.password = kwargs['password']
self.tenant = kwargs['tenant']
@ -36,9 +50,20 @@ class BaseCollector(object):
except IndexError as e:
raise ValueError("Missing argument (%s)" % e)
self._check_transformers()
self._conn = None
self._connect()
def _check_transformers(self):
"""Check for transformer prerequisites
"""
for dependency in self.dependencies:
if dependency not in self.transformers:
raise TransformerDependencyError(self.collector_name,
dependency)
@abc.abstractmethod
def _connect(self):
"""Connect to the backend

View File

@ -22,12 +22,53 @@ from ceilometerclient import client as cclient
from cloudkitty import collector
class CeilometerCollector(collector.BaseCollector):
def __init__(self, **kwargs):
super(CeilometerCollector, self).__init__(**kwargs)
class ResourceNotFound(Exception):
"""Raised when the resource doesn't exist."""
def __init__(self, resource_type, resource_id):
super(ResourceNotFound, self).__init__(
"No such resource: %s, type: %s" % (resource_id, resource_type))
self.resource_id = resource_id
self.resource_type = resource_type
class CeilometerResourceCacher(object):
def __init__(self):
self._resource_cache = {}
def add_resource_detail(self, resource_type, resource_id, resource_data):
if resource_type not in self._resource_cache:
self._resource_cache[resource_type] = {}
self._resource_cache[resource_type][resource_id] = resource_data
return self._resource_cache[resource_type][resource_id]
def has_resource_detail(self, resource_type, resource_id):
if resource_type in self._resource_cache:
if resource_id in self._resource_cache[resource_type]:
return True
return False
def get_resource_detail(self, resource_type, resource_id):
try:
resource = self._resource_cache[resource_type][resource_id]
return resource
except KeyError:
raise ResourceNotFound(resource_type, resource_id)
class CeilometerCollector(collector.BaseCollector):
collector_name = 'ceilometer'
dependencies = ('CeilometerTransformer',
'CloudKittyFormatTransformer')
def __init__(self, transformers, **kwargs):
super(CeilometerCollector, self).__init__(transformers, **kwargs)
self.t_ceilometer = self.transformers['CeilometerTransformer']
self.t_cloudkitty = self.transformers['CloudKittyFormatTransformer']
self._cacher = CeilometerResourceCacher()
def _connect(self):
"""Initialize connection to the Ceilometer endpoint."""
self._conn = cclient.get_client('2', os_username=self.user,
@ -87,49 +128,20 @@ class CeilometerCollector(collector.BaseCollector):
return [instance.groupby['resource_id'] for instance in instance_stats]
def get_compute(self, start, end=None, project_id=None, q_filter=None):
active_instances = self.get_active_instances(start, end, project_id,
q_filter)
active_instance_ids = self.get_active_instances(start, end, project_id,
q_filter)
compute_data = []
volume_data = {'unit': 'instance', 'qty': 1}
for instance in active_instances:
instance_data = {}
instance_data['desc'] = self.get_resource_detail(instance)
instance_data['desc']['instance_id'] = instance
instance_data['vol'] = volume_data
compute_data.append(instance_data)
data = {}
data['compute'] = compute_data
return data
def _strip_compute(self, data):
res_data = {}
res_data['name'] = data.metadata.get('display_name')
res_data['flavor'] = data.metadata.get('flavor.name')
res_data['vcpus'] = data.metadata.get('vcpus')
res_data['memory'] = data.metadata.get('memory_mb')
res_data['image_id'] = data.metadata.get('image.id')
res_data['availability_zone'] = (
data.metadata.get('OS-EXT-AZ.availability_zone')
)
res_data['project_id'] = data.project_id
res_data['user_id'] = data.user_id
res_data['metadata'] = {}
for field in data.metadata:
if field.startswith('user_metadata'):
res_data['metadata'][field[14:]] = data.metadata[field]
return res_data
def strip_resource_data(self, res_data, res_type='compute'):
if res_type == 'compute':
return self._strip_compute(res_data)
def get_resource_detail(self, resource_id):
if resource_id not in self._resource_cache:
resource = self._conn.resources.get(resource_id)
resource = self.strip_resource_data(resource)
self._resource_cache[resource_id] = resource
return self._resource_cache[resource_id]
for instance_id in active_instance_ids:
if not self._cacher.has_resource_detail('compute', instance_id):
raw_resource = self._conn.resources.get(instance_id)
instance = self.t_ceilometer.strip_resource_data('compute',
raw_resource)
self._cacher.add_resource_detail('compute',
instance_id,
instance)
instance = self._cacher.get_resource_detail('compute',
instance_id)
compute_data.append(self.t_cloudkitty.format_item(instance,
'instance',
1))
return self.t_cloudkitty.format_service('compute', compute_data)

View File

@ -24,6 +24,7 @@ from keystoneclient.v2_0 import client as kclient
from oslo.config import cfg
from oslo import messaging
from stevedore import driver
from stevedore import extension
from stevedore import named
from cloudkitty.common import rpc
@ -41,9 +42,10 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors'
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
class BillingEndpoint(object):
@ -106,7 +108,12 @@ class Orchestrator(object):
self.sm = state.DBStateManager(self.keystone.user_id,
'osrtf')
collector_args = {'user': CONF.auth.username,
# Transformers
self.transformers = {}
self._load_transformers()
collector_args = {'transformers': self.transformers,
'user': CONF.auth.username,
'password': CONF.auth.password,
'tenant': CONF.auth.tenant,
'region': CONF.auth.region,
@ -177,6 +184,17 @@ class Orchestrator(object):
'usage': raw_data}]
return timed_data
def _load_transformers(self):
self.transformers = {}
transformers = extension.ExtensionManager(
TRANSFORMERS_NAMESPACE,
invoke_on_load=True)
for transformer in transformers:
t_name = transformer.name
t_obj = transformer.obj
self.transformers[t_name] = t_obj
def _load_billing_processors(self):
self.b_processors = {}
processors = extension_manager.EnabledExtensionManager(

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class BaseTransformer(object):
def __init__(self):
pass

View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from cloudkitty import transformer
class CeilometerTransformer(transformer.BaseTransformer):
def __init__(self):
pass
def _strip_compute(self, data):
res_data = {}
res_data['name'] = data.metadata.get('display_name')
res_data['flavor'] = data.metadata.get('flavor.name')
res_data['vcpus'] = data.metadata.get('vcpus')
res_data['memory'] = data.metadata.get('memory_mb')
res_data['image_id'] = data.metadata.get('image.id')
res_data['availability_zone'] = (
data.metadata.get('OS-EXT-AZ.availability_zone')
)
res_data['project_id'] = data.project_id
res_data['user_id'] = data.user_id
res_data['metadata'] = {}
for field in data.metadata:
if field.startswith('user_metadata'):
res_data['metadata'][field[14:]] = data.metadata[field]
return res_data
def strip_resource_data(self, res_type, res_data):
if res_type == 'compute':
return self._strip_compute(res_data)

View File

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from cloudkitty import transformer
class CloudKittyFormatTransformer(transformer.BaseTransformer):
def format_item(self, desc, unit, qty=1):
data = {}
data['desc'] = desc
data['vol'] = {'unit': unit, 'qty': qty}
return data
def format_service(self, service, items):
data = {}
data[service] = items
return data

View File

@ -14,10 +14,6 @@
# Size of RPC connection pool. (integer value)
#rpc_conn_pool_size=30
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=oslo.messaging.exceptions,nova.exception,cinder.exception,exceptions
# Qpid broker hostname. (string value)
#qpid_hostname=localhost
@ -47,6 +43,10 @@
# Whether to disable the Nagle algorithm. (boolean value)
#qpid_tcp_nodelay=true
# The number of prefetched messages held by receiver. (integer
# value)
#qpid_receiver_capacity=1
# The qpid topology version to use. Version 1 is what was
# originally used by impl_qpid. Version 2 includes some
# backwards-incompatible changes that allow broker federation
@ -156,15 +156,6 @@
# Heartbeat time-to-live. (integer value)
#matchmaker_heartbeat_ttl=600
# Host to locate redis. (string value)
#host=127.0.0.1
# Use this port to connect to redis host. (integer value)
#port=6379
# Password for Redis server (optional). (string value)
#password=<None>
# Size of RPC greenthread pool. (integer value)
#rpc_thread_pool_size=64
@ -481,6 +472,22 @@
#db_max_retries=20
[matchmaker_redis]
#
# Options defined in oslo.messaging
#
# Host to locate redis. (string value)
#host=127.0.0.1
# Use this port to connect to redis host. (integer value)
#port=6379
# Password for Redis server (optional). (string value)
#password=<None>
[matchmaker_ring]
#

View File

@ -27,6 +27,10 @@ console_scripts =
cloudkitty.collector.backends =
ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector
cloudkitty.transformers =
CloudKittyFormatTransformer = cloudkitty.transformer.format:CloudKittyFormatTransformer
CeilometerTransformer = cloudkitty.transformer.ceilometer:CeilometerTransformer
cloudkitty.billing.processors =
noop = cloudkitty.billing.noop:Noop
hashmap = cloudkitty.billing.hash:BasicHashMap