Merge "pipeline: remove transformer support"

Zuul 2018-09-13 09:07:07 +00:00 committed by Gerrit Code Review
commit 494d17f350
23 changed files with 46 additions and 2879 deletions

View File

@ -91,33 +91,22 @@ class Sink(object):
Each sink config is concerned *only* with the transformation rules
and publication conduits for data.
In effect, a sink describes a chain of handlers. The chain starts
with zero or more transformers and ends with one or more publishers.
The first transformer in the chain is passed data from the
corresponding source, takes some action such as deriving rate of
change, performing unit conversion, or aggregating, before passing
the modified data to the next step.
The subsequent transformers, if any, handle the data similarly.
In effect, a sink describes a chain of handlers. The chain ends with one or
more publishers.
At the end of the chain, publishers publish the data. The exact
publishing method depends on publisher type, for example, pushing
into data storage via the message bus providing guaranteed delivery,
or for loss-tolerant data UDP may be used.
If no transformers are included in the chain, the publishers are
passed data directly from the sink and publish it unchanged.
"""
def __init__(self, conf, cfg, transformer_manager, publisher_manager):
def __init__(self, conf, cfg, publisher_manager):
self.conf = conf
self.cfg = cfg
try:
self.name = cfg['name']
# It's legal to have no transformer specified
self.transformer_cfg = cfg.get('transformers') or []
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@ -138,30 +127,10 @@ class Sink(object):
exc_info=True)
self.multi_publish = len(self.publishers) > 1
self.transformers = self._setup_transformers(cfg, transformer_manager)
def __str__(self):
return self.name
def _setup_transformers(self, cfg, transformer_manager):
transformers = []
for transformer in self.transformer_cfg:
parameter = transformer['parameters'] or {}
try:
ext = transformer_manager[transformer['name']]
except KeyError:
raise PipelineException(
"No transformer named %s loaded" % transformer['name'],
cfg)
transformers.append(ext.plugin(**parameter))
LOG.info(
"Pipeline %(pipeline)s: Setup transformer instance %(name)s "
"with parameter %(param)s" % ({'pipeline': self,
'name': transformer['name'],
'param': parameter}))
return transformers
@staticmethod
def flush():
"""Flush data after all events have been injected to pipeline."""
@ -220,7 +189,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc'
def __init__(self, conf, cfg_file, transformer_manager):
def __init__(self, conf, cfg_file):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@ -244,13 +213,6 @@ class PipelineManager(agent.ConfigManagerBase):
},
],
"sinks": [{"name": sink_1,
"transformers": [
{"name": "Transformer_1",
"parameters": {"p1": "value"}},
{"name": "Transformer_2",
"parameters": {"p1": "value"}},
],
"publishers": ["publisher_1", "publisher_2"]
},
{"name": sink_2,
@ -268,8 +230,6 @@ class PipelineManager(agent.ConfigManagerBase):
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
A transformer's name is its plugin name in setup.cfg.
A publisher's name is its plugin name in setup.cfg.
"""
@ -303,7 +263,6 @@ class PipelineManager(agent.ConfigManagerBase):
else:
unique_names.add(name)
sinks[s['name']] = self.pm_sink(self.conf, s,
transformer_manager,
publisher_manager)
unique_names.clear()

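With transformers gone, a sink reduces to a named fan-out to its publishers. A minimal sketch of the surviving behaviour, using illustrative names rather than this module's real classes:

import logging

LOG = logging.getLogger(__name__)

class MiniSink:
    # Sketch of the simplified sink: samples go straight to publishers.
    def __init__(self, name, publishers):
        self.name = name
        self.publishers = publishers  # already-instantiated publisher objects

    def publish_samples(self, samples):
        # Each publisher receives the unmodified samples; one failing
        # publisher must not block delivery to the others.
        for p in self.publishers:
            try:
                p.publish_samples(samples)
            except Exception:
                LOG.exception("Publisher %s failed; continuing", p)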
View File

@ -5,92 +5,7 @@ sources:
- "*"
sinks:
- meter_sink
- name: cpu_source
meters:
- "cpu"
sinks:
- cpu_sink
- cpu_delta_sink
- name: disk_source
meters:
- "disk.read.bytes"
- "disk.read.requests"
- "disk.write.bytes"
- "disk.write.requests"
- "disk.device.read.bytes"
- "disk.device.read.requests"
- "disk.device.write.bytes"
- "disk.device.write.requests"
sinks:
- disk_sink
- name: network_source
meters:
- "network.incoming.bytes"
- "network.incoming.packets"
- "network.outgoing.bytes"
- "network.outgoing.packets"
sinks:
- network_sink
sinks:
- name: meter_sink
publishers:
- gnocchi://
# All these transformers are deprecated and will be removed in the future; do not use them.
- name: cpu_sink
transformers:
- name: "rate_of_change"
parameters:
target:
name: "cpu_util"
unit: "%"
type: "gauge"
max: 100
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
publishers:
- gnocchi://
# All these transformers are deprecated and will be removed in the future; do not use them.
- name: cpu_delta_sink
transformers:
- name: "delta"
parameters:
target:
name: "cpu.delta"
growth_only: True
publishers:
- gnocchi://
# All these transformers are deprecated and will be removed in the future; do not use them.
- name: disk_sink
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
unit: "(B|request)"
target:
map_to:
name: "\\1.\\2.\\3.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- gnocchi://
# All these transformers are deprecated and will be removed in the future; do not use them.
- name: network_sink
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
unit: "(B|packet)"
target:
map_to:
name: "network.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- gnocchi://

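After this change, the boilerplate pipeline above collapses to the wildcard source feeding meter_sink. In the dict form the unit tests in this commit use (the shipped pipeline.yaml is YAML of the same shape), the remaining configuration is a sketch like:

pipeline_cfg = {
    'sources': [{'name': 'meter_source',
                 'meters': ['*'],
                 'sinks': ['meter_sink']}],
    'sinks': [{'name': 'meter_sink',
               'publishers': ['gnocchi://']}],
}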
View File

@ -126,7 +126,7 @@ class EventPipelineManager(base.PipelineManager):
def __init__(self, conf):
super(EventPipelineManager, self).__init__(
conf, conf.event_pipeline_cfg_file, {})
conf, conf.event_pipeline_cfg_file)
def get_main_endpoints(self):
return [EventEndpoint(self.conf, self.publisher())]

View File

@ -73,74 +73,25 @@ class SampleSource(base.PipelineSource):
class SampleSink(base.Sink):
def _transform_sample(self, start, sample):
try:
for transformer in self.transformers[start:]:
sample = transformer.handle_sample(sample)
if not sample:
LOG.debug(
"Pipeline %(pipeline)s: Sample dropped by "
"transformer %(trans)s", {'pipeline': self,
'trans': transformer})
return
return sample
except Exception:
LOG.error("Pipeline %(pipeline)s: Exit after error "
"from transformer %(trans)s "
"for %(smp)s" % {'pipeline': self,
'trans': transformer,
'smp': sample},
exc_info=True)
def _publish_samples(self, start, samples):
def publish_samples(self, samples):
"""Push samples into pipeline for publishing.
:param start: The index of the first transformer the samples are
injected into. This exists mainly for flush() invocations, since
transformers may emit buffered samples.
:param samples: Sample list.
"""
transformed_samples = []
if not self.transformers:
transformed_samples = samples
else:
for sample in samples:
LOG.debug(
"Pipeline %(pipeline)s: Transform sample "
"%(smp)s from %(trans)s transformer", {'pipeline': self,
'smp': sample,
'trans': start})
sample = self._transform_sample(start, sample)
if sample:
transformed_samples.append(sample)
if transformed_samples:
if samples:
for p in self.publishers:
try:
p.publish_samples(transformed_samples)
p.publish_samples(samples)
except Exception:
LOG.error("Pipeline %(pipeline)s: Continue after "
"error from publisher %(pub)s"
% {'pipeline': self, 'pub': p},
exc_info=True)
def publish_samples(self, samples):
self._publish_samples(0, samples)
def flush(self):
"""Flush data after all samples have been injected to pipeline."""
for (i, transformer) in enumerate(self.transformers):
try:
self._publish_samples(i + 1,
list(transformer.flush()))
except Exception:
LOG.error("Pipeline %(pipeline)s: Error "
"flushing transformer %(trans)s"
% {'pipeline': self, 'trans': transformer},
exc_info=True)
@staticmethod
def flush():
pass
class SamplePipeline(base.Pipeline):
@ -195,11 +146,7 @@ class SamplePipelineManager(base.PipelineManager):
def __init__(self, conf):
super(SamplePipelineManager, self).__init__(
conf, conf.pipeline_cfg_file, self.get_transform_manager())
@staticmethod
def get_transform_manager():
return extension.ExtensionManager('ceilometer.transformer')
conf, conf.pipeline_cfg_file)
def get_main_endpoints(self):
exts = extension.ExtensionManager(

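For reference, the start parameter deleted above existed only to support flushing: transformer i could buffer samples, and its flushed output had to re-enter the chain at position i + 1 so that only downstream transformers saw it. A sketch of the removed semantics, not the real class:

def flush_sink(sink):
    # Flushing transformer i re-injects its buffered samples just after
    # itself, mirroring the removed SampleSink.flush() loop.
    for i, transformer in enumerate(sink.transformers):
        buffered = list(transformer.flush())
        if buffered:
            sink._publish_samples(i + 1, buffered)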
View File

@ -86,8 +86,6 @@ resources:
vcpus:
cpu:
archive_policy_name: ceilometer-low-rate
cpu.delta:
cpu_util:
cpu_l3_cache:
disk.root.size:
disk.ephemeral.size:
@ -132,8 +130,6 @@ resources:
- resource_type: instance_network_interface
metrics:
network.outgoing.packets.rate:
network.incoming.packets.rate:
network.outgoing.packets:
archive_policy_name: ceilometer-low-rate
network.incoming.packets:
@ -146,8 +142,6 @@ resources:
archive_policy_name: ceilometer-low-rate
network.incoming.packets.error:
archive_policy_name: ceilometer-low-rate
network.outgoing.bytes.rate:
network.incoming.bytes.rate:
network.outgoing.bytes:
archive_policy_name: ceilometer-low-rate
network.incoming.bytes:
@ -160,16 +154,12 @@ resources:
metrics:
disk.device.read.requests:
archive_policy_name: ceilometer-low-rate
disk.device.read.requests.rate:
disk.device.write.requests:
archive_policy_name: ceilometer-low-rate
disk.device.write.requests.rate:
disk.device.read.bytes:
archive_policy_name: ceilometer-low-rate
disk.device.read.bytes.rate:
disk.device.write.bytes:
archive_policy_name: ceilometer-low-rate
disk.device.write.bytes.rate:
disk.device.latency:
disk.device.read.latency:
disk.device.write.latency:

View File

@ -63,7 +63,6 @@ class HttpPublisher(publisher.ConfigPublisherBase):
the sinks like the following:
- name: event_sink
transformers:
publishers:
- http://host:80/path?timeout=1&max_retries=2

View File

@ -102,7 +102,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
def publish_samples(self, samples):
"""Publish samples on RPC.
:param samples: Samples from pipeline after transformation.
:param samples: Samples from pipeline.
"""
@ -174,7 +174,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
def publish_events(self, events):
"""Send an event message for publishing
:param events: events from pipeline after transformation
:param events: events from pipeline.
"""
ev_list = [utils.message_from_event(
event, self.conf.publisher.telemetry_secret) for event in events]
@ -218,7 +218,6 @@ class NotifierPublisher(MessagingPublisher):
- notifier_sink
sinks:
- name: notifier_sink
transformers:
publishers:
- notifier://[notifier_ip]:[notifier_port]?topic=[topic]&
driver=driver&max_retry=100

View File

@ -36,7 +36,6 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
- zaqar_sink
sinks:
- name: zaqar_sink
transformers:
publishers:
- zaqar://?queue=meter_queue&ttl=1200
@ -63,7 +62,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
def publish_samples(self, samples):
"""Send a metering message for publishing
:param samples: Samples from pipeline after transformation
:param samples: Samples from pipeline.
"""
queue = self.client.queue(self.queue_name)
messages = [{'body': sample.as_dict(), 'ttl': self.ttl}
@ -73,7 +72,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
def publish_events(self, events):
"""Send an event message for publishing
:param events: events from pipeline after transformation
:param events: events from pipeline.
"""
queue = self.client.queue(self.queue_name)
messages = [{'body': event.serialize(), 'ttl': self.ttl}

File diff suppressed because it is too large

View File

@ -677,7 +677,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@ -720,7 +719,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@ -742,7 +740,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@ -771,7 +768,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@ -812,7 +808,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)

View File

@ -12,9 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from ceilometer.pipeline import base
from ceilometer.pipeline import sample as pipeline
from ceilometer import sample
@ -27,7 +24,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'meters': ['a'],
'sinks': ['test_sink']}
sink = {'name': 'test_sink',
'transformers': [{'name': 'update', 'parameters': {}}],
'publishers': ['test://']}
self.pipeline_cfg = {'sources': [source], 'sinks': [sink]}
@ -39,13 +35,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['new'],
})
@ -57,13 +46,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['except'],
})
@ -113,13 +95,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
self._set_pipeline_cfg('meters', meter_cfg)
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['new'],
})
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
@ -150,12 +125,11 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
str(pipeline_manager.pipelines[1]))
test_publisher = pipeline_manager.pipelines[0].publishers[0]
new_publisher = pipeline_manager.pipelines[1].publishers[0]
for publisher, sfx in [(test_publisher, '_update'),
(new_publisher, '_new')]:
for publisher in (test_publisher, new_publisher):
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2, publisher.calls)
self.assertEqual('a' + sfx, getattr(publisher.samples[0], "name"))
self.assertEqual('b' + sfx, getattr(publisher.samples[1], "name"))
self.assertEqual('a', getattr(publisher.samples[0], "name"))
self.assertEqual('b', getattr(publisher.samples[1], "name"))
def test_multiple_sources_with_single_sink(self):
self.pipeline_cfg['sources'].append({
@ -193,68 +167,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
for publisher in [test_publisher, another_publisher]:
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2, publisher.calls)
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
transformed_samples = self.TransformerClass.samples
self.assertEqual(2, len(transformed_samples))
self.assertEqual(['a', 'b'],
[getattr(s, 'name') for s in transformed_samples])
def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
meters, units):
with open('ceilometer/pipeline/data/pipeline.yaml') as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
for s in pipeline_cfg['sinks']:
s['publishers'] = ['test://']
name = self.cfg2file(pipeline_cfg)
self.CONF.set_override('pipeline_cfg_file', name)
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
pipe = pipeline_manager.pipelines[index]
self._do_test_rate_of_change_mapping(pipe, meters, units)
def test_rate_of_change_boilerplate_disk_read_cfg(self):
meters = ('disk.read.bytes', 'disk.read.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_disk_write_cfg(self):
meters = ('disk.write.bytes', 'disk.write.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
meters = ('network.incoming.bytes', 'network.incoming.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
meters,
units)
def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
meters = ('disk.device.read.bytes', 'disk.device.read.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
meters = ('disk.device.write.bytes', 'disk.device.write.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
meters,
units)
self.assertEqual('a', getattr(publisher.samples[0], "name"))
self.assertEqual('b', getattr(publisher.samples[1], "name"))
def test_duplicated_sinks_names(self):
self.pipeline_cfg['sinks'].append({

View File

@ -141,7 +141,6 @@ class BaseRealNotification(BaseNotificationTest):
}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ['test://']
}]
})

View File

@ -1,115 +0,0 @@
#
# Copyright 2016 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
from oslo_utils import timeutils
from oslotest import base
from ceilometer import sample
from ceilometer.transformer import conversions
class AggregatorTransformerTestCase(base.BaseTestCase):
SAMPLE = sample.Sample(
name='cpu',
type=sample.TYPE_CUMULATIVE,
unit='ns',
volume='1234567',
user_id='56c5692032f34041900342503fecab30',
project_id='ac9494df2d9d4e709bac378cceabaf23',
resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
timestamp="2015-10-29 14:12:15.485877+00:00",
resource_metadata={}
)
def setUp(self):
super(AggregatorTransformerTestCase, self).setUp()
self._sample_offset = 0
def test_init_input_validation(self):
aggregator = conversions.AggregatorTransformer("2", "15", None,
None, None)
self.assertEqual(2, aggregator.size)
self.assertEqual(15, aggregator.retention_time)
def test_init_no_size_or_rention_time(self):
aggregator = conversions.AggregatorTransformer()
self.assertEqual(1, aggregator.size)
self.assertIsNone(aggregator.retention_time)
def test_init_size_zero(self):
aggregator = conversions.AggregatorTransformer(size="0")
self.assertEqual(1, aggregator.size)
self.assertIsNone(aggregator.retention_time)
def test_init_input_validation_size_invalid(self):
self.assertRaises(ValueError, conversions.AggregatorTransformer,
"abc", "15", None, None, None)
def test_init_input_validation_retention_time_invalid(self):
self.assertRaises(ValueError, conversions.AggregatorTransformer,
"2", "abc", None, None, None)
def test_init_no_timestamp(self):
aggregator = conversions.AggregatorTransformer("1", "1", None,
None, None)
self.assertEqual("first", aggregator.timestamp)
def test_init_timestamp_none(self):
aggregator = conversions.AggregatorTransformer("1", "1", None,
None, None, None)
self.assertEqual("first", aggregator.timestamp)
def test_init_timestamp_first(self):
aggregator = conversions.AggregatorTransformer("1", "1", None,
None, None, "first")
self.assertEqual("first", aggregator.timestamp)
def test_init_timestamp_last(self):
aggregator = conversions.AggregatorTransformer("1", "1", None,
None, None, "last")
self.assertEqual("last", aggregator.timestamp)
def test_init_timestamp_invalid(self):
aggregator = conversions.AggregatorTransformer("1", "1", None,
None, None,
"invalid_option")
self.assertEqual("first", aggregator.timestamp)
def test_size_unbounded(self):
aggregator = conversions.AggregatorTransformer(size="0",
retention_time="300")
self._insert_sample_data(aggregator)
samples = aggregator.flush()
self.assertEqual([], samples)
def test_size_bounded(self):
aggregator = conversions.AggregatorTransformer(size="100")
self._insert_sample_data(aggregator)
samples = aggregator.flush()
self.assertEqual(100, len(samples))
def _insert_sample_data(self, aggregator):
for _ in range(100):
sample = copy.copy(self.SAMPLE)
sample.resource_id = sample.resource_id + str(self._sample_offset)
sample.timestamp = datetime.datetime.isoformat(timeutils.utcnow())
aggregator.handle_sample(sample)
self._sample_offset += 1

View File

@ -1,73 +0,0 @@
#
# Copyright 2013 Intel Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import six
@six.add_metaclass(abc.ABCMeta)
class TransformerBase(object):
"""Base class for plugins that transform the sample."""
def __init__(self, **kwargs):
"""Setup transformer.
Each time a transformer is involved in a pipeline, a new transformer
instance is created and chained into the pipeline, i.e. transformer
instances are per pipeline. This helps if a transformer needs to keep
some cache and per-pipeline information.
:param kwargs: The parameters that are defined in the pipeline config file.
"""
super(TransformerBase, self).__init__()
@abc.abstractmethod
def handle_sample(self, sample):
"""Transform a sample.
:param sample: A sample.
"""
@staticmethod
def flush():
"""Flush samples cached previously."""
return []
class Namespace(object):
"""Encapsulates the namespace.
Encapsulation is done by wrapping the evaluation of the configured rule.
This allows nested dicts to be accessed in the attribute style,
and missing attributes to yield false when used in a boolean expression.
"""
def __init__(self, seed):
self.__dict__ = collections.defaultdict(lambda: Namespace({}))
self.__dict__.update(seed)
for k, v in six.iteritems(self.__dict__):
if isinstance(v, dict):
self.__dict__[k] = Namespace(v)
def __getattr__(self, attr):
return self.__dict__[attr]
def __getitem__(self, key):
return self.__dict__[key]
def __nonzero__(self):
return len(self.__dict__) > 0
__bool__ = __nonzero__

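The Namespace behaviour above is what let scale expressions such as resource_metadata.cpu_number or 1 tolerate missing metadata. A usage sketch, assuming the class above is importable:

ns = Namespace({'resource_metadata': {'cpu_number': 2}})
assert ns.resource_metadata.cpu_number == 2
assert not ns.resource_metadata.flavor          # missing -> empty Namespace, falsy
assert (ns.resource_metadata.flavor or 1) == 1  # the 'or 1' guard used in scales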
View File

@ -1,42 +0,0 @@
#
# Copyright 2013 Julien Danjou
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer import transformer
class TransformerAccumulator(transformer.TransformerBase):
"""Transformer that accumulates samples until a threshold.
It then flushes them out into the wild.
"""
def __init__(self, size=1, **kwargs):
if size >= 1:
self.samples = []
self.size = size
super(TransformerAccumulator, self).__init__(**kwargs)
def handle_sample(self, sample):
if self.size >= 1:
self.samples.append(sample)
else:
return sample
def flush(self):
if len(self.samples) >= self.size:
x = self.samples
self.samples = []
return x
return []

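A usage sketch for the accumulator just removed, assuming the class above is importable: handle_sample() only buffers, and flush() releases the batch once the size threshold is met.

acc = TransformerAccumulator(size=3)
acc.handle_sample('sample-1')
acc.handle_sample('sample-2')
assert acc.flush() == []   # below the threshold, nothing is released
acc.handle_sample('sample-3')
assert acc.flush() == ['sample-1', 'sample-2', 'sample-3']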
View File

@ -1,157 +0,0 @@
#
# Copyright 2014 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import keyword
import math
import re
from oslo_log import log
import six
from ceilometer.i18n import _
from ceilometer import sample
from ceilometer import transformer
LOG = log.getLogger(__name__)
class ArithmeticTransformer(transformer.TransformerBase):
"""Multi meter arithmetic transformer.
Transformer that performs arithmetic operations
over one or more meters and/or their metadata.
"""
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs):
super(ArithmeticTransformer, self).__init__(**kwargs)
target = target or {}
self.target = target
self.expr = target.get('expr', '')
self.expr_escaped, self.escaped_names = self.parse_expr(self.expr)
self.required_meters = list(self.escaped_names.values())
self.misconfigured = len(self.required_meters) == 0
if not self.misconfigured:
self.reference_meter = self.required_meters[0]
# convert to set for more efficient contains operation
self.required_meters = set(self.required_meters)
self.cache = collections.defaultdict(dict)
self.latest_timestamp = None
else:
LOG.warning(_('Arithmetic transformer must use at least one'
' meter in expression \'%s\''), self.expr)
def _update_cache(self, _sample):
"""Update the cache with the latest sample."""
escaped_name = self.escaped_names.get(_sample.name, '')
if escaped_name not in self.required_meters:
return
self.cache[_sample.resource_id][escaped_name] = _sample
def _check_requirements(self, resource_id):
"""Check if all the required meters are available in the cache."""
return len(self.cache[resource_id]) == len(self.required_meters)
def _calculate(self, resource_id):
"""Evaluate the expression and return a new sample if successful."""
ns_dict = dict((m, s.as_dict()) for m, s
in six.iteritems(self.cache[resource_id]))
ns = transformer.Namespace(ns_dict)
try:
new_volume = eval(self.expr_escaped, {}, ns)
if math.isnan(new_volume):
raise ArithmeticError(_('Expression evaluated to '
'a NaN value!'))
reference_sample = self.cache[resource_id][self.reference_meter]
return sample.Sample(
name=self.target.get('name', reference_sample.name),
unit=self.target.get('unit', reference_sample.unit),
type=self.target.get('type', reference_sample.type),
volume=float(new_volume),
user_id=reference_sample.user_id,
project_id=reference_sample.project_id,
resource_id=reference_sample.resource_id,
timestamp=self.latest_timestamp,
resource_metadata=reference_sample.resource_metadata
)
except Exception as e:
LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
{'expr': self.expr, 'exc': e})
def handle_sample(self, _sample):
self._update_cache(_sample)
self.latest_timestamp = _sample.timestamp
def flush(self):
new_samples = []
if not self.misconfigured:
# While looping over self.cache, the dict must not be changed by
# others; otherwise "RuntimeError: dictionary changed size during
# iteration" is raised. So we make a tmp copy and loop over that.
tmp_cache = copy.copy(self.cache)
for resource_id in tmp_cache:
if self._check_requirements(resource_id):
new_samples.append(self._calculate(resource_id))
if resource_id in self.cache:
self.cache.pop(resource_id)
return new_samples
@classmethod
def parse_expr(cls, expr):
"""Transforms meter names in the expression into valid identifiers.
:param expr: unescaped expression
:return: A tuple of the escaped expression and a dict representing
the translation of meter names into Python identifiers
"""
class Replacer(object):
"""Replaces matched meter names with escaped names.
If the meter name is not followed by parameter access in the
expression, it defaults to accessing the 'volume' parameter.
"""
def __init__(self, original_expr):
self.original_expr = original_expr
self.escaped_map = {}
def __call__(self, match):
meter_name = match.group(1)
escaped_name = self.escape(meter_name)
self.escaped_map[meter_name] = escaped_name
if (match.end(0) == len(self.original_expr) or
self.original_expr[match.end(0)] != '.'):
escaped_name += '.volume'
return escaped_name
@staticmethod
def escape(name):
has_dot = '.' in name
if has_dot:
name = name.replace('.', '_')
if has_dot or name.endswith('ESC') or name in keyword.kwlist:
name = "_" + name + '_ESC'
return name
replacer = Replacer(expr)
expr = re.sub(cls.meter_name_re, replacer, expr)
return expr, replacer.escaped_map

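The escaping rules in parse_expr are easiest to see on a concrete expression. Traced against the logic above (a sketch, not a test from the tree): dotted meter names become escaped identifiers, and a reference with no attribute access defaults to the sample's volume attribute.

expr, names = ArithmeticTransformer.parse_expr(
    '100 * $(memory.usage) / $(memory)')
assert expr == '100 * _memory_usage_ESC.volume / memory.volume'
assert names == {'memory.usage': '_memory_usage_ESC', 'memory': 'memory'}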
View File

@ -1,344 +0,0 @@
#
# Copyright 2013 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import re
from oslo_log import log
from oslo_utils import timeutils
import six
from ceilometer.i18n import _
from ceilometer import sample
from ceilometer import transformer
LOG = log.getLogger(__name__)
class BaseConversionTransformer(transformer.TransformerBase):
"""Transformer to derive conversion."""
def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters.
:param source: dict containing source sample unit
:param target: dict containing target sample name, type,
unit and scaling factor (a missing value
connotes no change)
"""
self.source = source or {}
self.target = target or {}
super(BaseConversionTransformer, self).__init__(**kwargs)
def _map(self, s, attr):
"""Apply the name or unit mapping if configured."""
mapped = None
from_ = self.source.get('map_from')
to_ = self.target.get('map_to')
if from_ and to_:
if from_.get(attr) and to_.get(attr):
try:
mapped = re.sub(from_[attr], to_[attr], getattr(s, attr))
except Exception:
pass
return mapped or self.target.get(attr, getattr(s, attr))
class DeltaTransformer(BaseConversionTransformer):
"""Transformer based on the delta of a sample volume."""
def __init__(self, target=None, growth_only=False, **kwargs):
"""Initialize transformer with configured parameters.
:param growth_only: capture only positive deltas
"""
super(DeltaTransformer, self).__init__(target=target, **kwargs)
self.growth_only = growth_only
self.cache = {}
def handle_sample(self, s):
"""Handle a sample, converting if necessary."""
key = s.name + s.resource_id
prev = self.cache.get(key)
timestamp = timeutils.parse_isotime(s.timestamp)
self.cache[key] = (s.volume, timestamp)
if prev:
prev_volume = prev[0]
prev_timestamp = prev[1]
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
# disallow violations of the arrow of time
if time_delta < 0:
LOG.warning('Dropping out of time order sample: %s', (s,))
# Reset the cache to the newer sample.
self.cache[key] = prev
return None
volume_delta = s.volume - prev_volume
if self.growth_only and volume_delta < 0:
LOG.warning('Negative delta detected, dropping value')
s = None
else:
s = self._convert(s, volume_delta)
LOG.debug('Converted to: %s', s)
else:
LOG.warning('Dropping sample with no predecessor: %s', (s,))
s = None
return s
def _convert(self, s, delta):
"""Transform the appropriate sample fields."""
return sample.Sample(
name=self._map(s, 'name'),
unit=s.unit,
type=sample.TYPE_DELTA,
volume=delta,
user_id=s.user_id,
project_id=s.project_id,
resource_id=s.resource_id,
timestamp=s.timestamp,
resource_metadata=s.resource_metadata
)
class ScalingTransformer(BaseConversionTransformer):
"""Transformer to apply a scaling conversion."""
def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters.
:param source: dict containing source sample unit
:param target: dict containing target sample name, type,
unit and scaling factor (a missing value
connotes no change)
"""
super(ScalingTransformer, self).__init__(source=source, target=target,
**kwargs)
self.scale = self.target.get('scale')
self.max = self.target.get('max')
LOG.debug('scaling conversion transformer with source:'
' %(source)s target: %(target)s:', {'source': self.source,
'target': self.target})
def _scale(self, s):
"""Apply the scaling factor.
Either a straight multiplicative factor or else a string to be eval'd.
"""
ns = transformer.Namespace(s.as_dict())
scale = self.scale
return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
else s.volume * scale) if scale else s.volume)
def _convert(self, s, growth=1):
"""Transform the appropriate sample fields."""
volume = self._scale(s) * growth
return sample.Sample(
name=self._map(s, 'name'),
unit=self._map(s, 'unit'),
type=self.target.get('type', s.type),
volume=min(volume, self.max) if self.max else volume,
user_id=s.user_id,
project_id=s.project_id,
resource_id=s.resource_id,
timestamp=s.timestamp,
resource_metadata=s.resource_metadata
)
def handle_sample(self, s):
"""Handle a sample, converting if necessary."""
LOG.debug('handling sample %s', s)
if self.source.get('unit', s.unit) == s.unit:
s = self._convert(s)
LOG.debug('converted to: %s', s)
return s
class RateOfChangeTransformer(ScalingTransformer):
"""Transformer based on the rate of change of a sample volume.
For example, it takes the current and previous volumes of a cumulative
sample and produces a gauge value based on the proportion of some maximum
used.
"""
def __init__(self, **kwargs):
"""Initialize transformer with configured parameters."""
super(RateOfChangeTransformer, self).__init__(**kwargs)
self.cache = {}
self.scale = self.scale or '1'
def handle_sample(self, s):
"""Handle a sample, converting if necessary."""
LOG.debug('handling sample %s', s)
key = s.name + s.resource_id
prev = self.cache.get(key)
timestamp = timeutils.parse_isotime(s.timestamp)
self.cache[key] = (s.volume, timestamp, s.monotonic_time)
if prev:
prev_volume = prev[0]
prev_timestamp = prev[1]
prev_monotonic_time = prev[2]
if (prev_monotonic_time is not None and
s.monotonic_time is not None):
# NOTE(sileht): Prefer high precision timer
time_delta = s.monotonic_time - prev_monotonic_time
else:
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
# disallow violations of the arrow of time
if time_delta < 0:
LOG.warning(_('dropping out of time order sample: %s'), (s,))
# Reset the cache to the newer sample.
self.cache[key] = prev
return None
# we only allow negative volume deltas for noncumulative
# samples, whereas for cumulative we assume that a reset has
# occurred in the interim so that the current volume gives a
# lower bound on growth
volume_delta = (s.volume - prev_volume
if (prev_volume <= s.volume or
s.type != sample.TYPE_CUMULATIVE)
else s.volume)
rate_of_change = ((1.0 * volume_delta / time_delta)
if time_delta else 0.0)
s = self._convert(s, rate_of_change)
LOG.debug('converted to: %s', s)
else:
LOG.warning(_('dropping sample with no predecessor: %s'),
(s,))
s = None
return s
class AggregatorTransformer(ScalingTransformer):
"""Transformer that aggregates samples.
Aggregation proceeds until a size threshold and/or a retention_time is
reached, and then the aggregated samples are flushed out into the wild.
Example:
To aggregate samples by resource_metadata and keep the
resource_metadata of the latest received sample:
AggregatorTransformer(retention_time=60, resource_metadata='last')
To aggregate samples by user_id and resource_metadata, keep the
user_id of the first received sample, and drop the resource_metadata:
AggregatorTransformer(size=15, user_id='first',
resource_metadata='drop')
To keep the timestamp of the last received sample rather
than the first:
AggregatorTransformer(timestamp="last")
"""
def __init__(self, size=1, retention_time=None,
project_id=None, user_id=None, resource_metadata="last",
timestamp="first", **kwargs):
super(AggregatorTransformer, self).__init__(**kwargs)
self.samples = {}
self.counts = collections.defaultdict(int)
self.size = int(size) if size else None
self.retention_time = float(retention_time) if retention_time else None
if not (self.size or self.retention_time):
self.size = 1
if timestamp in ["first", "last"]:
self.timestamp = timestamp
else:
self.timestamp = "first"
self.initial_timestamp = None
self.aggregated_samples = 0
self.key_attributes = []
self.merged_attribute_policy = {}
self._init_attribute('project_id', project_id)
self._init_attribute('user_id', user_id)
self._init_attribute('resource_metadata', resource_metadata,
is_droppable=True, mandatory=True)
def _init_attribute(self, name, value, is_droppable=False,
mandatory=False):
drop = ['drop'] if is_droppable else []
if value or mandatory:
if value not in ['last', 'first'] + drop:
LOG.warning('%s is unknown (%s), using last' % (name, value))
value = 'last'
self.merged_attribute_policy[name] = value
else:
self.key_attributes.append(name)
def _get_unique_key(self, s):
# NOTE(arezmerita): in samples generated by ceilometer middleware,
# when publicly readable/writable swift containers are accessed
# without authentication, the project_id and the user_id are missing.
# They will be replaced by <undefined> for unique key construction.
keys = ['<undefined>' if getattr(s, f) is None else getattr(s, f)
for f in self.key_attributes]
non_aggregated_keys = "-".join(keys)
# NOTE(sileht): this assumes a meter always has the same unit/type
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
def handle_sample(self, sample_):
if not self.initial_timestamp:
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
self.aggregated_samples += 1
key = self._get_unique_key(sample_)
self.counts[key] += 1
if key not in self.samples:
self.samples[key] = self._convert(sample_)
if self.merged_attribute_policy[
'resource_metadata'] == 'drop':
self.samples[key].resource_metadata = {}
else:
if self.timestamp == "last":
self.samples[key].timestamp = sample_.timestamp
if sample_.type == sample.TYPE_CUMULATIVE:
self.samples[key].volume = self._scale(sample_)
else:
self.samples[key].volume += self._scale(sample_)
for field in self.merged_attribute_policy:
if self.merged_attribute_policy[field] == 'last':
setattr(self.samples[key], field,
getattr(sample_, field))
def flush(self):
if not self.initial_timestamp:
return []
expired = (self.retention_time and
timeutils.is_older_than(self.initial_timestamp,
self.retention_time))
full = self.size and self.aggregated_samples >= self.size
if full or expired:
x = list(self.samples.values())
# gauge aggregates need to be averages
for s in x:
if s.type == sample.TYPE_GAUGE:
key = self._get_unique_key(s)
s.volume /= self.counts[key]
self.samples.clear()
self.counts.clear()
self.aggregated_samples = 0
self.initial_timestamp = None
return x
return []

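The dropped cpu_sink in pipeline.yaml wired RateOfChangeTransformer to the scale string 100.0 / (10**9 * (resource_metadata.cpu_number or 1)). The arithmetic it performed on two cumulative CPU samples, as a worked sketch with made-up numbers:

prev_ns, cur_ns = 4 * 10**9, 13 * 10**9  # cumulative cpu volumes, in ns
time_delta = 60.0                        # seconds between the two samples
cpu_number = 2                           # from resource_metadata

rate = (cur_ns - prev_ns) / time_delta   # growth: 1.5e8 ns of CPU per second
cpu_util = rate * 100.0 / (10**9 * cpu_number)
assert cpu_util == 7.5                   # percent of the 2 vCPUs in use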
View File

@ -405,6 +405,9 @@ if is_service_enabled ceilometer; then
start_ceilometer
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
iniset $TEMPEST_CONFIG telemetry alarm_granularity $CEILOMETER_ALARM_GRANULARITY
iniset $TEMPEST_CONFIG telemetry alarm_threshold 10000000000
iniset $TEMPEST_CONFIG telemetry alarm_metric_name cpu
iniset $TEMPEST_CONFIG telemetry alarm_aggregation_method rate:mean
fi
if [[ "$1" == "unstack" ]]; then

View File

@ -19,7 +19,11 @@ fi
# Gnocchi default archive_policy for Ceilometer
# TODO(sileht): when Gnocchi 4.0 is out use the tarball instead
GNOCCHI_GIT_PATH=${GNOCCHI_GIT_PATH:-git+https://github.com/gnocchixyz/gnocchi#egg=gnocchi}
GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
if [ -n "$GNOCCHI_ARCHIVE_POLICY_TEMPEST" ]; then
GNOCCHI_ARCHIVE_POLICY=$GNOCCHI_ARCHIVE_POLICY_TEMPEST
else
GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
fi
GNOCCHI_CONF_DIR=${GNOCCHI_CONF_DIR:-/etc/gnocchi}
GNOCCHI_CONF=${GNOCCHI_CONF:-${GNOCCHI_CONF_DIR}/gnocchi.conf}
GNOCCHI_COORDINATOR_URL=${CEILOMETER_COORDINATOR_URL:-redis://localhost:6379}

View File

@ -103,14 +103,6 @@ The following meters are collected for OpenStack Compute.
| cpu | Cumu\ | ns | instance | Pollster | Libvirt,| CPU time used |
| | lative| | ID | | Hyper-V | |
+-----------+-------+------+----------+----------+---------+------------------+
| cpu.delta | Delta | ns | instance | Pollster | Libvirt,| CPU time used s\ |
| | | | ID | | Hyper-V | ince previous d\ |
| | | | | | | atapoint |
+-----------+-------+------+----------+----------+---------+------------------+
| cpu_util | Gauge | % | instance | Pollster | LibVirt,| Average CPU |
| | | | ID | | vSphere,| utilization |
| | | | | | XenAPI | |
+-----------+-------+------+----------+----------+---------+------------------+
| vcpus | Gauge | vcpu | instance | Notific\ | Libvirt,| Number of virtual|
| | | | ID | ation | Hyper-V | CPUs allocated to|
| | | | | | | the instance |
@ -118,17 +110,9 @@ The following meters are collected for OpenStack Compute.
| disk.read\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of read |
| .requests | ative | uest | ID | | Hyper-V | requests |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.read\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
| .requests\| | est/s| ID | | Hyper-V,| read requests |
| .rate | | | | | vSphere | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.writ\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of write |
| e.requests| ative | uest | ID | | Hyper-V | requests |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.writ\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
| e.request\| | est/s| ID | | Hyper-V,| write requests |
| s.rate | | | | | vSphere | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.read\| Cumu\ | B | instance | Pollster | Libvirt,| Volume of reads |
| .bytes | lative| | ID | | Hyper-V | |
+-----------+-------+------+----------+----------+---------+------------------+
@ -149,38 +133,18 @@ The following meters are collected for OpenStack Compute.
| ice.read\ | lative| uest | | | Hyper-V | requests |
| .requests | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
| ice.read\ | | est/s| | | Hyper-V,| read requests |
| .requests\| | | | | vSphere | |
| .rate | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Cumu\ | req\ | disk ID | Pollster | Libvirt,| Number of write |
| ice.write\| lative| uest | | | Hyper-V | requests |
| .requests | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
| ice.write\| | est/s| | | Hyper-V,| write requests |
| .requests\| | | | | vSphere | |
| .rate | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of reads |
| ice.read\ | lative| | | | Hyper-V | |
| .bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
| ice.read\ | | | | | Hyper-V,| reads |
| .bytes | | | | | vSphere | |
| .rate | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of writes |
| ice.write\| lative| | | | Hyper-V | |
| .bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
| ice.write\| | | | | Hyper-V,| writes |
| .bytes | | | | | vSphere | |
| .rate | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| disk.root\| Gauge | GB | instance | Notific\ | Libvirt,| Size of root disk|
| .size | | | ID | ation | Hyper-V | |
+-----------+-------+------+----------+----------+---------+------------------+
@ -236,38 +200,18 @@ The following meters are collected for OpenStack Compute.
| incoming.\| lative| | ID | | Hyper-V | incoming bytes |
| bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
| incoming.\| | | ID | | Hyper-V,| incoming bytes |
| bytes.rate| | | | | vSphere,| |
| | | | | | XenAPI | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Cumu\ | B | interface| Pollster | Libvirt,| Number of |
| outgoing\ | lative| | ID | | Hyper-V | outgoing bytes |
| .bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
| outgoing.\| | | ID | | Hyper-V,| outgoing bytes |
| bytes.rate| | | | | vSphere,| |
| | | | | | XenAPI | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
| incoming\ | lative| ket | ID | | Hyper-V | incoming packets |
| .packets | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Gauge | pack\| interface| Pollster | Libvirt,| Average rate of |
| incoming\ | | et/s | ID | | Hyper-V,| incoming packets |
| .packets\ | | | | | vSphere,| |
| .rate | | | | | XenAPI | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
| outgoing\ | lative| ket | ID | | Hyper-V | outgoing packets |
| .packets | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Gauge | pac\ | interface| Pollster | Libvirt,| Average rate of |
| outgoing\ | | ket/s| ID | | Hyper-V,| outgoing packets |
| .packets\ | | | | | vSphere,| |
| .rate | | | | | XenAPI | |
+-----------+-------+------+----------+----------+---------+------------------+
| **Meters added in the Newton release** |
+-----------+-------+------+----------+----------+---------+------------------+
| cpu_l3_c\ | Gauge | B | instance | Pollster | Libvirt | L3 cache used b\ |
@ -354,50 +298,6 @@ The following meters are collected for OpenStack Compute.
To enable libvirt ``disk.*`` support when running on RBD-backed shared
storage, you need to install libvirt version 1.2.16+.
The Telemetry service supports creating new meters by using transformers, but
this is deprecated and its use is discouraged. Among the meters gathered from
libvirt and Hyper-V, there are a few which are derived from other meters. The
list of meters that are created by using the ``rate_of_change`` transformer
from the above table is the following:
- cpu_util
- cpu.delta
- disk.read.requests.rate
- disk.write.requests.rate
- disk.read.bytes.rate
- disk.write.bytes.rate
- disk.device.read.requests.rate
- disk.device.write.requests.rate
- disk.device.read.bytes.rate
- disk.device.write.bytes.rate
- network.incoming.bytes.rate
- network.outgoing.bytes.rate
- network.incoming.packets.rate
- network.outgoing.packets.rate
.. note::
If storing data in Gnocchi, derived rate_of_change metrics are also
computed using Gnocchi in addition to Ceilometer transformers. This avoids
missing data when Ceilometer services restart.
To minimize Ceilometer memory requirements, transformers can be disabled.
These ``rate_of_change`` meters are deprecated and will be removed from the
default Ceilometer configuration in a future release.
OpenStack Compute is capable of collecting ``CPU`` related meters from
the compute host machines. In order to use that you need to set the
``compute_monitors`` option to ``cpu.virt_driver`` in the

View File

@ -0,0 +1,4 @@
---
upgrade:
- |
The support for transformers has been removed from the pipeline.

View File

@ -222,14 +222,6 @@ ceilometer.compute.virt =
ceilometer.hardware.inspectors =
snmp = ceilometer.hardware.inspector.snmp:SNMPInspector
ceilometer.transformer =
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
delta = ceilometer.transformer.conversions:DeltaTransformer
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
ceilometer.sample.publisher =
test = ceilometer.publisher.test:TestPublisher
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
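The entry points deleted above are what extension.ExtensionManager('ceilometer.transformer') resolved. A sketch of the lookup path that disappears with them (the stevedore names are real; the lookups mirror Sink._setup_transformers as it was before this commit):

from stevedore import extension

mgr = extension.ExtensionManager('ceilometer.transformer')
ext = mgr['rate_of_change']      # a KeyError here became "No transformer named ..."
transformer = ext.plugin(target={'name': 'cpu_util'})  # instantiate the plugin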