diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py index 73114b1d34..c109c92d47 100644 --- a/ceilometer/pipeline/base.py +++ b/ceilometer/pipeline/base.py @@ -91,33 +91,22 @@ class Sink(object): Each sink config is concerned *only* with the transformation rules and publication conduits for data. - In effect, a sink describes a chain of handlers. The chain starts - with zero or more transformers and ends with one or more publishers. - - The first transformer in the chain is passed data from the - corresponding source, takes some action such as deriving rate of - change, performing unit conversion, or aggregating, before passing - the modified data to next step. - - The subsequent transformers, if any, handle the data similarly. + In effect, a sink describes a chain of handlers. The chain ends with one or + more publishers. At the end of the chain, publishers publish the data. The exact publishing method depends on publisher type, for example, pushing into data storage via the message bus providing guaranteed delivery, or for loss-tolerant data UDP may be used. - If no transformers are included in the chain, the publishers are - passed data directly from the sink which are published unchanged. """ - def __init__(self, conf, cfg, transformer_manager, publisher_manager): + def __init__(self, conf, cfg, publisher_manager): self.conf = conf self.cfg = cfg try: self.name = cfg['name'] - # It's legal to have no transformer specified - self.transformer_cfg = cfg.get('transformers') or [] except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -138,30 +127,10 @@ class Sink(object): exc_info=True) self.multi_publish = True if len(self.publishers) > 1 else False - self.transformers = self._setup_transformers(cfg, transformer_manager) def __str__(self): return self.name - def _setup_transformers(self, cfg, transformer_manager): - transformers = [] - for transformer in self.transformer_cfg: - parameter = transformer['parameters'] or {} - try: - ext = transformer_manager[transformer['name']] - except KeyError: - raise PipelineException( - "No transformer named %s loaded" % transformer['name'], - cfg) - transformers.append(ext.plugin(**parameter)) - LOG.info( - "Pipeline %(pipeline)s: Setup transformer instance %(name)s " - "with parameter %(param)s" % ({'pipeline': self, - 'name': transformer['name'], - 'param': parameter})) - - return transformers - @staticmethod def flush(): """Flush data after all events have been injected to pipeline.""" @@ -220,7 +189,7 @@ class PipelineManager(agent.ConfigManagerBase): NOTIFICATION_IPC = 'ceilometer_ipc' - def __init__(self, conf, cfg_file, transformer_manager): + def __init__(self, conf, cfg_file): """Setup the pipelines according to config. The configuration is supported as follows: @@ -244,13 +213,6 @@ class PipelineManager(agent.ConfigManagerBase): }, ], "sinks": [{"name": sink_1, - "transformers": [ - {"name": "Transformer_1", - "parameters": {"p1": "value"}}, - - {"name": "Transformer_2", - "parameters": {"p1": "value"}}, - ], "publishers": ["publisher_1", "publisher_2"] }, {"name": sink_2, @@ -268,8 +230,6 @@ class PipelineManager(agent.ConfigManagerBase): "excluded meter names", wildcard and "excluded meter names", or only wildcard. - Transformer's name is plugin name in setup.cfg. - Publisher's name is plugin name in setup.cfg """ @@ -303,7 +263,6 @@ class PipelineManager(agent.ConfigManagerBase): else: unique_names.add(name) sinks[s['name']] = self.pm_sink(self.conf, s, - transformer_manager, publisher_manager) unique_names.clear() diff --git a/ceilometer/pipeline/data/pipeline.yaml b/ceilometer/pipeline/data/pipeline.yaml index 1f1ec989ad..452bbdbbab 100644 --- a/ceilometer/pipeline/data/pipeline.yaml +++ b/ceilometer/pipeline/data/pipeline.yaml @@ -5,92 +5,7 @@ sources: - "*" sinks: - meter_sink - - name: cpu_source - meters: - - "cpu" - sinks: - - cpu_sink - - cpu_delta_sink - - name: disk_source - meters: - - "disk.read.bytes" - - "disk.read.requests" - - "disk.write.bytes" - - "disk.write.requests" - - "disk.device.read.bytes" - - "disk.device.read.requests" - - "disk.device.write.bytes" - - "disk.device.write.requests" - sinks: - - disk_sink - - name: network_source - meters: - - "network.incoming.bytes" - - "network.incoming.packets" - - "network.outgoing.bytes" - - "network.outgoing.packets" - sinks: - - network_sink sinks: - name: meter_sink publishers: - gnocchi:// - - # All these transformers are deprecated, and will be removed in the future, don't use them. - - name: cpu_sink - transformers: - - name: "rate_of_change" - parameters: - target: - name: "cpu_util" - unit: "%" - type: "gauge" - max: 100 - scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - publishers: - - gnocchi:// - - # All these transformers are deprecated, and will be removed in the future, don't use them. - - name: cpu_delta_sink - transformers: - - name: "delta" - parameters: - target: - name: "cpu.delta" - growth_only: True - publishers: - - gnocchi:// - - # All these transformers are deprecated, and will be removed in the future, don't use them. - - name: disk_sink - transformers: - - name: "rate_of_change" - parameters: - source: - map_from: - name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)" - unit: "(B|request)" - target: - map_to: - name: "\\1.\\2.\\3.rate" - unit: "\\1/s" - type: "gauge" - publishers: - - gnocchi:// - - # All these transformers are deprecated, and will be removed in the future, don't use them. - - name: network_sink - transformers: - - name: "rate_of_change" - parameters: - source: - map_from: - name: "network\\.(incoming|outgoing)\\.(bytes|packets)" - unit: "(B|packet)" - target: - map_to: - name: "network.\\1.\\2.rate" - unit: "\\1/s" - type: "gauge" - publishers: - - gnocchi:// diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py index 4b3f0b6413..996c9bf301 100644 --- a/ceilometer/pipeline/event.py +++ b/ceilometer/pipeline/event.py @@ -126,7 +126,7 @@ class EventPipelineManager(base.PipelineManager): def __init__(self, conf): super(EventPipelineManager, self).__init__( - conf, conf.event_pipeline_cfg_file, {}) + conf, conf.event_pipeline_cfg_file) def get_main_endpoints(self): return [EventEndpoint(self.conf, self.publisher())] diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py index f036f1d201..429a8fedb8 100644 --- a/ceilometer/pipeline/sample.py +++ b/ceilometer/pipeline/sample.py @@ -73,74 +73,25 @@ class SampleSource(base.PipelineSource): class SampleSink(base.Sink): - def _transform_sample(self, start, sample): - try: - for transformer in self.transformers[start:]: - sample = transformer.handle_sample(sample) - if not sample: - LOG.debug( - "Pipeline %(pipeline)s: Sample dropped by " - "transformer %(trans)s", {'pipeline': self, - 'trans': transformer}) - return - return sample - except Exception: - LOG.error("Pipeline %(pipeline)s: Exit after error " - "from transformer %(trans)s " - "for %(smp)s" % {'pipeline': self, - 'trans': transformer, - 'smp': sample}, - exc_info=True) - - def _publish_samples(self, start, samples): + def publish_samples(self, samples): """Push samples into pipeline for publishing. - :param start: The first transformer that the sample will be injected. - This is mainly for flush() invocation that transformer - may emit samples. :param samples: Sample list. - """ - transformed_samples = [] - if not self.transformers: - transformed_samples = samples - else: - for sample in samples: - LOG.debug( - "Pipeline %(pipeline)s: Transform sample " - "%(smp)s from %(trans)s transformer", {'pipeline': self, - 'smp': sample, - 'trans': start}) - sample = self._transform_sample(start, sample) - if sample: - transformed_samples.append(sample) - - if transformed_samples: + if samples: for p in self.publishers: try: - p.publish_samples(transformed_samples) + p.publish_samples(samples) except Exception: LOG.error("Pipeline %(pipeline)s: Continue after " "error from publisher %(pub)s" % {'pipeline': self, 'pub': p}, exc_info=True) - def publish_samples(self, samples): - self._publish_samples(0, samples) - - def flush(self): - """Flush data after all samples have been injected to pipeline.""" - - for (i, transformer) in enumerate(self.transformers): - try: - self._publish_samples(i + 1, - list(transformer.flush())) - except Exception: - LOG.error("Pipeline %(pipeline)s: Error " - "flushing transformer %(trans)s" - % {'pipeline': self, 'trans': transformer}, - exc_info=True) + @staticmethod + def flush(): + pass class SamplePipeline(base.Pipeline): @@ -195,11 +146,7 @@ class SamplePipelineManager(base.PipelineManager): def __init__(self, conf): super(SamplePipelineManager, self).__init__( - conf, conf.pipeline_cfg_file, self.get_transform_manager()) - - @staticmethod - def get_transform_manager(): - return extension.ExtensionManager('ceilometer.transformer') + conf, conf.pipeline_cfg_file) def get_main_endpoints(self): exts = extension.ExtensionManager( diff --git a/ceilometer/publisher/data/gnocchi_resources.yaml b/ceilometer/publisher/data/gnocchi_resources.yaml index ea4ec92d0d..7bd5c86823 100644 --- a/ceilometer/publisher/data/gnocchi_resources.yaml +++ b/ceilometer/publisher/data/gnocchi_resources.yaml @@ -86,8 +86,6 @@ resources: vcpus: cpu: archive_policy_name: ceilometer-low-rate - cpu.delta: - cpu_util: cpu_l3_cache: disk.root.size: disk.ephemeral.size: @@ -132,8 +130,6 @@ resources: - resource_type: instance_network_interface metrics: - network.outgoing.packets.rate: - network.incoming.packets.rate: network.outgoing.packets: archive_policy_name: ceilometer-low-rate network.incoming.packets: @@ -146,8 +142,6 @@ resources: archive_policy_name: ceilometer-low-rate network.incoming.packets.error: archive_policy_name: ceilometer-low-rate - network.outgoing.bytes.rate: - network.incoming.bytes.rate: network.outgoing.bytes: archive_policy_name: ceilometer-low-rate network.incoming.bytes: @@ -160,16 +154,12 @@ resources: metrics: disk.device.read.requests: archive_policy_name: ceilometer-low-rate - disk.device.read.requests.rate: disk.device.write.requests: archive_policy_name: ceilometer-low-rate - disk.device.write.requests.rate: disk.device.read.bytes: archive_policy_name: ceilometer-low-rate - disk.device.read.bytes.rate: disk.device.write.bytes: archive_policy_name: ceilometer-low-rate - disk.device.write.bytes.rate: disk.device.latency: disk.device.read.latency: disk.device.write.latency: diff --git a/ceilometer/publisher/http.py b/ceilometer/publisher/http.py index e107fa9adf..6fe88ab9fd 100644 --- a/ceilometer/publisher/http.py +++ b/ceilometer/publisher/http.py @@ -63,7 +63,6 @@ class HttpPublisher(publisher.ConfigPublisherBase): the sinks like the following: - name: event_sink - transformers: publishers: - http://host:80/path?timeout=1&max_retries=2 diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 253cec5cb1..1c40b7be4b 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -100,7 +100,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase): def publish_samples(self, samples): """Publish samples on RPC. - :param samples: Samples from pipeline after transformation. + :param samples: Samples from pipeline. """ @@ -172,7 +172,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase): def publish_events(self, events): """Send an event message for publishing - :param events: events from pipeline after transformation + :param events: events from pipeline. """ ev_list = [utils.message_from_event( event, self.conf.publisher.telemetry_secret) for event in events] @@ -216,7 +216,6 @@ class NotifierPublisher(MessagingPublisher): - notifier_sink sinks: - name: notifier_sink - transformers: publishers: - notifier://[notifier_ip]:[notifier_port]?topic=[topic]& driver=driver&max_retry=100 diff --git a/ceilometer/publisher/zaqar.py b/ceilometer/publisher/zaqar.py index a337df24b2..bd6eea6f0c 100644 --- a/ceilometer/publisher/zaqar.py +++ b/ceilometer/publisher/zaqar.py @@ -36,7 +36,6 @@ class ZaqarPublisher(publisher.ConfigPublisherBase): - zaqar_sink sinks: - name: zaqar_sink - transformers: publishers: - zaqar://?queue=meter_queue&ttl=1200 @@ -63,7 +62,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase): def publish_samples(self, samples): """Send a metering message for publishing - :param samples: Samples from pipeline after transformation + :param samples: Samples from pipeline. """ queue = self.client.queue(self.queue_name) messages = [{'body': sample.as_dict(), 'ttl': self.ttl} @@ -73,7 +72,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase): def publish_events(self, events): """Send an event message for publishing - :param events: events from pipeline after transformation + :param events: events from pipeline. """ queue = self.client.queue(self.queue_name) messages = [{'body': event.serialize(), 'ttl': self.ttl} diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py index 10b212be59..d08b7bd72a 100644 --- a/ceilometer/tests/unit/pipeline_base.py +++ b/ceilometer/tests/unit/pipeline_base.py @@ -15,17 +15,12 @@ # under the License. import abc -import copy -import datetime import traceback -import unittest import fixtures import mock -import monotonic from oslo_utils import timeutils import six -from stevedore import extension from ceilometer.pipeline import base as pipe_base from ceilometer.pipeline import sample as pipeline @@ -34,32 +29,11 @@ from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer import service from ceilometer.tests import base -from ceilometer import transformer -from ceilometer.transformer import accumulator -from ceilometer.transformer import arithmetic -from ceilometer.transformer import conversions @six.add_metaclass(abc.ABCMeta) class BasePipelineTestCase(base.BaseTestCase): - def fake_transform_manager(self): - class_name_ext = { - 'update': self.TransformerClass, - 'except': self.TransformerClassException, - 'drop': self.TransformerClassDrop, - 'accumulator': accumulator.TransformerAccumulator, - 'aggregator': conversions.AggregatorTransformer, - 'unit_conversion': conversions.ScalingTransformer, - 'rate_of_change': conversions.RateOfChangeTransformer, - 'arithmetic': arithmetic.ArithmeticTransformer, - 'delta': conversions.DeltaTransformer, - } - - return extension.ExtensionManager.make_test_instance([ - extension.Extension(name, None, transformer, None) - for name, transformer in class_name_ext.items()]) - def get_publisher(self, conf, url, namespace=''): fake_drivers = {'test://': test_publisher.TestPublisher, 'new://': test_publisher.TestPublisher, @@ -73,47 +47,6 @@ class BasePipelineTestCase(base.BaseTestCase): def publish_events(self, events): raise Exception() - class TransformerClass(transformer.TransformerBase): - samples = [] - - def __init__(self, append_name='_update'): - self.__class__.samples = [] - self.append_name = append_name - - @staticmethod - def flush(): - return [] - - def handle_sample(self, counter): - self.__class__.samples.append(counter) - newname = getattr(counter, 'name') + self.append_name - return sample.Sample( - name=newname, - type=counter.type, - volume=counter.volume, - unit=counter.unit, - user_id=counter.user_id, - project_id=counter.project_id, - resource_id=counter.resource_id, - timestamp=counter.timestamp, - resource_metadata=counter.resource_metadata, - ) - - class TransformerClassDrop(transformer.TransformerBase): - samples = [] - - def __init__(self): - self.__class__.samples = [] - - def handle_sample(self, counter): - self.__class__.samples.append(counter) - - class TransformerClassException(object): - - @staticmethod - def handle_sample(counter): - raise Exception() - def setUp(self): super(BasePipelineTestCase, self).setUp() self.CONF = service.prepare_service([], []) @@ -132,9 +65,6 @@ class BasePipelineTestCase(base.BaseTestCase): self.useFixture(fixtures.MockPatchObject( publisher, 'get_publisher', side_effect=self.get_publisher)) - self.useFixture(fixtures.MockPatchObject( - pipeline.SamplePipelineManager, 'get_transform_manager', - side_effect=self.fake_transform_manager)) self._setup_pipeline_cfg() @@ -188,11 +118,6 @@ class BasePipelineTestCase(base.BaseTestCase): self._unset_pipeline_cfg('meters') self._exception_create_pipelinemanager() - def test_no_transformers(self): - self._unset_pipeline_cfg('transformers') - self._build_and_set_new_pipeline() - pipeline.SamplePipelineManager(self.CONF) - def test_no_name(self): self._unset_pipeline_cfg('name') self._exception_create_pipelinemanager() @@ -220,27 +145,6 @@ class BasePipelineTestCase(base.BaseTestCase): publisher_cfg = ['test_invalid'] self._set_pipeline_cfg('publishers', publisher_cfg) - def test_check_transformer_invalid_transformer(self): - transformer_cfg = [ - {'name': "test_invalid", - 'parameters': {}} - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._exception_create_pipelinemanager() - - def test_publisher_transformer_invoked(self): - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter]) - - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1, len(self.TransformerClass.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], "name")) - def test_multiple_included_counters(self): counter_cfg = ['a', 'b'] self._set_pipeline_cfg('meters', counter_cfg) @@ -268,9 +172,8 @@ class BasePipelineTestCase(base.BaseTestCase): p([self.test_counter]) self.assertEqual(2, len(publisher.samples)) - self.assertEqual(2, len(self.TransformerClass.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) - self.assertEqual('b_update', getattr(publisher.samples[1], "name")) + self.assertEqual('a', getattr(publisher.samples[0], "name")) + self.assertEqual('b', getattr(publisher.samples[1], "name")) @mock.patch('ceilometer.pipeline.sample.LOG') def test_none_volume_counter(self, LOG): @@ -360,8 +263,7 @@ class BasePipelineTestCase(base.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1, len(self.TransformerClass.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) + self.assertEqual('a', getattr(publisher.samples[0], "name")) def test_wildcard_excluded_counters(self): counter_cfg = ['*', '!a'] @@ -380,8 +282,7 @@ class BasePipelineTestCase(base.BaseTestCase): p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1, len(self.TransformerClass.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) + self.assertEqual('a', getattr(publisher.samples[0], "name")) def test_all_excluded_counters_not_excluded(self): counter_cfg = ['!b', '!c'] @@ -393,10 +294,7 @@ class BasePipelineTestCase(base.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1, len(self.TransformerClass.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], "name")) + self.assertEqual('a', getattr(publisher.samples[0], "name")) def test_all_excluded_counters_is_excluded(self): counter_cfg = ['!a', '!c'] @@ -462,16 +360,11 @@ class BasePipelineTestCase(base.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(1, len(publisher.samples)) self.assertEqual(1, publisher.calls) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) + self.assertEqual('a', getattr(publisher.samples[0], "name")) new_publisher = pipeline_manager.pipelines[1].publishers[0] self.assertEqual(1, len(new_publisher.samples)) self.assertEqual(1, new_publisher.calls) - self.assertEqual('b_new', getattr(new_publisher.samples[0], "name")) - self.assertEqual(2, len(self.TransformerClass.samples)) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], "name")) - self.assertEqual('b', - getattr(self.TransformerClass.samples[1], "name")) + self.assertEqual('b', getattr(new_publisher.samples[0], "name")) def test_multiple_pipeline_exception(self): self._reraise_exception = False @@ -500,133 +393,7 @@ class BasePipelineTestCase(base.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(1, publisher.calls) self.assertEqual(1, len(publisher.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) - self.assertEqual(2, len(self.TransformerClass.samples)) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], "name")) - self.assertEqual('b', - getattr(self.TransformerClass.samples[1], "name")) - - def test_none_transformer_pipeline(self): - self._set_pipeline_cfg('transformers', None) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter]) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1, publisher.calls) - self.assertEqual('a', getattr(publisher.samples[0], 'name')) - - def test_empty_transformer_pipeline(self): - self._set_pipeline_cfg('transformers', []) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter]) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1, publisher.calls) - self.assertEqual('a', getattr(publisher.samples[0], 'name')) - - def test_multiple_transformer_same_class(self): - transformer_cfg = [ - { - 'name': 'update', - 'parameters': {} - }, - { - 'name': 'update', - 'parameters': {} - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter]) - - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, publisher.calls) - self.assertEqual(1, len(publisher.samples)) - self.assertEqual('a_update_update', - getattr(publisher.samples[0], 'name')) - self.assertEqual(2, len(self.TransformerClass.samples)) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], 'name')) - self.assertEqual('a_update', - getattr(self.TransformerClass.samples[1], 'name')) - - def test_multiple_transformer_same_class_different_parameter(self): - transformer_cfg = [ - { - 'name': 'update', - 'parameters': - { - "append_name": "_update", - } - }, - { - 'name': 'update', - 'parameters': - { - "append_name": "_new", - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter]) - - self.assertEqual(2, len(self.TransformerClass.samples)) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], 'name')) - self.assertEqual('a_update', - getattr(self.TransformerClass.samples[1], 'name')) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, - len(publisher.samples)) - self.assertEqual('a_update_new', - getattr(publisher.samples[0], 'name')) - - def test_multiple_transformer_drop_transformer(self): - transformer_cfg = [ - { - 'name': 'update', - 'parameters': - { - "append_name": "_update", - } - }, - { - 'name': 'drop', - 'parameters': {} - }, - { - 'name': 'update', - 'parameters': - { - "append_name": "_new", - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter]) - - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - self.assertEqual(1, len(self.TransformerClass.samples)) - self.assertEqual('a', - getattr(self.TransformerClass.samples[0], 'name')) - self.assertEqual(1, - len(self.TransformerClassDrop.samples)) - self.assertEqual('a_update', - getattr(self.TransformerClassDrop.samples[0], 'name')) + self.assertEqual('a', getattr(publisher.samples[0], "name")) def test_multiple_publisher(self): self._set_pipeline_cfg('publishers', ['test://', 'new://']) @@ -639,10 +406,8 @@ class BasePipelineTestCase(base.BaseTestCase): new_publisher = pipeline_manager.pipelines[0].publishers[1] self.assertEqual(1, len(publisher.samples)) self.assertEqual(1, len(new_publisher.samples)) - self.assertEqual('a_update', - getattr(new_publisher.samples[0], 'name')) - self.assertEqual('a_update', - getattr(publisher.samples[0], 'name')) + self.assertEqual('a', getattr(new_publisher.samples[0], 'name')) + self.assertEqual('a', getattr(publisher.samples[0], 'name')) def test_multiple_publisher_isolation(self): self._reraise_exception = False @@ -654,8 +419,7 @@ class BasePipelineTestCase(base.BaseTestCase): new_publisher = pipeline_manager.pipelines[0].publishers[1] self.assertEqual(1, len(new_publisher.samples)) - self.assertEqual('a_update', - getattr(new_publisher.samples[0], 'name')) + self.assertEqual('a', getattr(new_publisher.samples[0], 'name')) def test_multiple_counter_pipeline(self): self._set_pipeline_cfg('meters', ['a', 'b']) @@ -677,1493 +441,8 @@ class BasePipelineTestCase(base.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(2, len(publisher.samples)) - self.assertEqual('a_update', getattr(publisher.samples[0], 'name')) - self.assertEqual('b_update', getattr(publisher.samples[1], 'name')) - - def test_flush_pipeline_cache(self): - CACHE_SIZE = 10 - extra_transformer_cfg = [ - { - 'name': 'accumulator', - 'parameters': { - 'size': CACHE_SIZE, - } - }, - { - 'name': 'update', - 'parameters': - { - 'append_name': '_new' - } - }, - ] - self._extend_pipeline_cfg('transformers', extra_transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(self.test_counter) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - pipe.flush() - self.assertEqual(0, len(publisher.samples)) - pipe.publish_data(self.test_counter) - pipe.flush() - self.assertEqual(0, len(publisher.samples)) - for i in range(CACHE_SIZE - 2): - pipe.publish_data(self.test_counter) - pipe.flush() - self.assertEqual(CACHE_SIZE, len(publisher.samples)) - self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name')) - - def test_flush_pipeline_cache_multiple_counter(self): - CACHE_SIZE = 3 - extra_transformer_cfg = [ - { - 'name': 'accumulator', - 'parameters': { - 'size': CACHE_SIZE - } - }, - { - 'name': 'update', - 'parameters': - { - 'append_name': '_new' - } - }, - ] - self._extend_pipeline_cfg('transformers', extra_transformer_cfg) - self._set_pipeline_cfg('meters', ['a', 'b']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - with pipeline_manager.publisher() as p: - p([self.test_counter, - sample.Sample( - name='b', - type=self.test_counter.type, - volume=self.test_counter.volume, - unit=self.test_counter.unit, - user_id=self.test_counter.user_id, - project_id=self.test_counter.project_id, - resource_id=self.test_counter.resource_id, - timestamp=self.test_counter.timestamp, - resource_metadata=self.test_counter.resource_metadata, - )]) - - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - - with pipeline_manager.publisher() as p: - p([self.test_counter]) - - self.assertEqual(CACHE_SIZE, len(publisher.samples)) - self.assertEqual('a_update_new', - getattr(publisher.samples[0], 'name')) - self.assertEqual('b_update_new', - getattr(publisher.samples[1], 'name')) - - def test_flush_pipeline_cache_before_publisher(self): - extra_transformer_cfg = [{ - 'name': 'accumulator', - 'parameters': {} - }] - self._extend_pipeline_cfg('transformers', extra_transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - publisher = pipe.publishers[0] - pipe.publish_data(self.test_counter) - self.assertEqual(0, len(publisher.samples)) - pipe.flush() - self.assertEqual(1, len(publisher.samples)) - self.assertEqual('a_update', - getattr(publisher.samples[0], 'name')) - - def test_global_unit_conversion(self): - scale = 'volume / ((10**6) * 60)' - transformer_cfg = [ - { - 'name': 'unit_conversion', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_mins', - 'unit': 'min', - 'scale': scale}, - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - counters = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=1200000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={} - ), - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, len(publisher.samples)) - pipe.flush() - self.assertEqual(1, len(publisher.samples)) - cpu_mins = publisher.samples[-1] - self.assertEqual('cpu_mins', getattr(cpu_mins, 'name')) - self.assertEqual('min', getattr(cpu_mins, 'unit')) - self.assertEqual(sample.TYPE_CUMULATIVE, getattr(cpu_mins, 'type')) - self.assertEqual(20, getattr(cpu_mins, 'volume')) - - # FIXME(sileht): Since the pipeline configuration is loaded from a file - # this tests won't pass anymore because of encoding issue. - @unittest.skip("fixme: unicode failure") - def test_unit_identified_source_unit_conversion(self): - transformer_cfg = [ - { - 'name': 'unit_conversion', - 'parameters': { - 'source': {'unit': '°C'}, - 'target': {'unit': '°F', - 'scale': '(volume * 1.8) + 32'}, - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['core_temperature', - 'ambient_temperature']) - counters = [ - sample.Sample( - name='core_temperature', - type=sample.TYPE_GAUGE, - volume=36.0, - unit='°C', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={} - ), - sample.Sample( - name='ambient_temperature', - type=sample.TYPE_GAUGE, - volume=88.8, - unit='°F', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={} - ), - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(2, len(publisher.samples)) - core_temp = publisher.samples[0] - self.assertEqual('core_temperature', getattr(core_temp, 'name')) - self.assertEqual('°F', getattr(core_temp, 'unit')) - self.assertEqual(96.8, getattr(core_temp, 'volume')) - amb_temp = publisher.samples[1] - self.assertEqual('ambient_temperature', getattr(amb_temp, 'name')) - self.assertEqual('°F', getattr(amb_temp, 'unit')) - self.assertEqual(88.8, getattr(amb_temp, 'volume')) - self.assertEqual(96.8, getattr(core_temp, 'volume')) - - def _do_test_rate_of_change_conversion(self, prev, curr, type, expected, - offset=1, weight=None): - s = ("(resource_metadata.user_metadata.autoscaling_weight or 1.0)" - "* (resource_metadata.non.existent or 1.0)" - "* (100.0 / (10**9 * (resource_metadata.cpu_number or 1)))") - transformer_cfg = [ - { - 'name': 'rate_of_change', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_util', - 'unit': '%', - 'type': sample.TYPE_GAUGE, - 'scale': s}, - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - now = datetime.datetime.utcnow() - later = now + datetime.timedelta(minutes=offset) - um = {'autoscaling_weight': weight} if weight else {} - counters = [ - sample.Sample( - name='cpu', - type=type, - volume=prev, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=now.isoformat(), - resource_metadata={'cpu_number': 4, - 'user_metadata': um}, - ), - sample.Sample( - name='cpu', - type=type, - volume=prev, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource2', - timestamp=now.isoformat(), - resource_metadata={'cpu_number': 2, - 'user_metadata': um}, - ), - sample.Sample( - name='cpu', - type=type, - volume=curr, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=later.isoformat(), - resource_metadata={'cpu_number': 4, - 'user_metadata': um}, - ), - sample.Sample( - name='cpu', - type=type, - volume=curr, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource2', - timestamp=later.isoformat(), - resource_metadata={'cpu_number': 2, - 'user_metadata': um}, - ), - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(2, len(publisher.samples)) - pipe.flush() - self.assertEqual(2, len(publisher.samples)) - cpu_util = publisher.samples[0] - self.assertEqual('cpu_util', getattr(cpu_util, 'name')) - self.assertEqual('test_resource', getattr(cpu_util, 'resource_id')) - self.assertEqual('%', getattr(cpu_util, 'unit')) - self.assertEqual(sample.TYPE_GAUGE, getattr(cpu_util, 'type')) - self.assertEqual(expected, getattr(cpu_util, 'volume')) - cpu_util = publisher.samples[1] - self.assertEqual('cpu_util', getattr(cpu_util, 'name')) - self.assertEqual('test_resource2', getattr(cpu_util, 'resource_id')) - self.assertEqual('%', getattr(cpu_util, 'unit')) - self.assertEqual(sample.TYPE_GAUGE, getattr(cpu_util, 'type')) - self.assertEqual(expected * 2, getattr(cpu_util, 'volume')) - - def test_rate_of_change_conversion(self): - self._do_test_rate_of_change_conversion(120000000000, - 180000000000, - sample.TYPE_CUMULATIVE, - 25.0) - - def test_rate_of_change_conversion_weight(self): - self._do_test_rate_of_change_conversion(120000000000, - 180000000000, - sample.TYPE_CUMULATIVE, - 27.5, - weight=1.1) - - def test_rate_of_change_conversion_negative_cumulative_delta(self): - self._do_test_rate_of_change_conversion(180000000000, - 120000000000, - sample.TYPE_CUMULATIVE, - 50.0) - - def test_rate_of_change_conversion_negative_gauge_delta(self): - self._do_test_rate_of_change_conversion(180000000000, - 120000000000, - sample.TYPE_GAUGE, - -25.0) - - def test_rate_of_change_conversion_zero_delay(self): - self._do_test_rate_of_change_conversion(120000000000, - 120000000000, - sample.TYPE_CUMULATIVE, - 0.0, - offset=0) - - def test_rate_of_change_no_predecessor(self): - s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - transformer_cfg = [ - { - 'name': 'rate_of_change', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_util', - 'unit': '%', - 'type': sample.TYPE_GAUGE, - 'scale': s} - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - now = datetime.datetime.utcnow() - counters = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=120000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=now.isoformat(), - resource_metadata={'cpu_number': 4} - ), - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - pipe.flush() - self.assertEqual(0, len(publisher.samples)) - - def test_rate_of_change_precision(self): - s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - transformer_cfg = [ - { - 'name': 'rate_of_change', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_util', - 'unit': '%', - 'type': sample.TYPE_GAUGE, - 'scale': s} - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - now = datetime.datetime.utcnow() - now_time = monotonic.monotonic() - # Simulate a laggy poller - later = now + datetime.timedelta(seconds=12345) - later_time = now_time + 10 - - counters = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=125000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=now.isoformat(), - monotonic_time=now_time, - resource_metadata={'cpu_number': 4} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=165000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=later.isoformat(), - monotonic_time=later_time, - resource_metadata={'cpu_number': 4} - ), - ] - - pipe.publish_data(counters) - publisher = pipe.publishers[0] - self.assertEqual(1, len(publisher.samples)) - - cpu_util_sample = publisher.samples[0] - self.assertAlmostEqual(100.0, cpu_util_sample.volume) - - def test_rate_of_change_max(self): - s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - transformer_cfg = [ - { - 'name': 'rate_of_change', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_util', - 'unit': '%', - 'type': sample.TYPE_GAUGE, - 'scale': s, - 'max': 100} - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - now = datetime.datetime.utcnow() - later = now + datetime.timedelta(seconds=10) - rounding = 12345 - - counters = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=125000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=now.isoformat(), - resource_metadata={'cpu_number': 4} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=165000000000 + rounding, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=later.isoformat(), - resource_metadata={'cpu_number': 4} - ), - ] - - pipe.publish_data(counters) - publisher = pipe.publishers[0] - self.assertEqual(1, len(publisher.samples)) - - cpu_util_sample = publisher.samples[0] - self.assertAlmostEqual(100.0, cpu_util_sample.volume) - - @mock.patch('ceilometer.transformer.conversions.LOG') - def test_rate_of_change_out_of_order(self, the_log): - s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - transformer_cfg = [ - { - 'name': 'rate_of_change', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_util', - 'unit': '%', - 'type': sample.TYPE_GAUGE, - 'scale': s} - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - now = datetime.datetime.utcnow() - earlier = now - datetime.timedelta(seconds=10) - later = now + datetime.timedelta(seconds=10) - - counters = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=125000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=now.isoformat(), - resource_metadata={'cpu_number': 4} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=120000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=earlier.isoformat(), - resource_metadata={'cpu_number': 4} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=130000000000, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=later.isoformat(), - resource_metadata={'cpu_number': 4} - ), - ] - - pipe.publish_data(counters) - publisher = pipe.publishers[0] - self.assertEqual(1, len(publisher.samples)) - pipe.flush() - self.assertEqual(1, len(publisher.samples)) - - cpu_util_sample = publisher.samples[0] - self.assertAlmostEqual(12.5, cpu_util_sample.volume) - the_log.warning.assert_called_with( - 'dropping out of time order sample: %s', - (counters[1],) - ) - - def _do_test_rate_of_change_mapping(self, pipe, meters, units): - now = datetime.datetime.utcnow() - base = 1000 - offset = 7 - rate = 42 - later = now + datetime.timedelta(minutes=offset) - counters = [] - for v, ts in [(base, now.isoformat()), - (base + (offset * 60 * rate), later.isoformat())]: - for n, u, r in [(meters[0], units[0], 'resource1'), - (meters[1], units[1], 'resource2')]: - s = sample.Sample( - name=n, - type=sample.TYPE_CUMULATIVE, - volume=v, - unit=u, - user_id='test_user', - project_id='test_proj', - resource_id=r, - timestamp=ts, - resource_metadata={}, - ) - counters.append(s) - - pipe.publish_data(counters) - publisher = pipe.publishers[0] - self.assertEqual(2, len(publisher.samples)) - pipe.flush() - self.assertEqual(2, len(publisher.samples)) - bps = publisher.samples[0] - self.assertEqual('%s.rate' % meters[0], getattr(bps, 'name')) - self.assertEqual('resource1', getattr(bps, 'resource_id')) - self.assertEqual('%s/s' % units[0], getattr(bps, 'unit')) - self.assertEqual(sample.TYPE_GAUGE, getattr(bps, 'type')) - self.assertEqual(rate, getattr(bps, 'volume')) - rps = publisher.samples[1] - self.assertEqual('%s.rate' % meters[1], getattr(rps, 'name')) - self.assertEqual('resource2', getattr(rps, 'resource_id')) - self.assertEqual('%s/s' % units[1], getattr(rps, 'unit')) - self.assertEqual(sample.TYPE_GAUGE, getattr(rps, 'type')) - self.assertEqual(rate, getattr(rps, 'volume')) - - def test_rate_of_change_mapping(self): - map_from = {'name': 'disk\\.(read|write)\\.(bytes|requests)', - 'unit': '(B|request)'} - map_to = {'name': 'disk.\\1.\\2.rate', - 'unit': '\\1/s'} - transformer_cfg = [ - { - 'name': 'rate_of_change', - 'parameters': { - 'source': { - 'map_from': map_from - }, - 'target': { - 'map_to': map_to, - 'type': sample.TYPE_GAUGE - }, - }, - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['disk.read.bytes', - 'disk.write.requests']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - meters = ('disk.read.bytes', 'disk.write.requests') - units = ('B', 'request') - self._do_test_rate_of_change_mapping(pipe, meters, units) - - def _do_test_aggregator(self, parameters, expected_length): - transformer_cfg = [ - { - 'name': 'aggregator', - 'parameters': parameters, - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['storage.objects.incoming.bytes']) - counters = [ - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=26, - unit='B', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=16, - unit='B', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ), - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=53, - unit='B', - user_id='test_user_bis', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=42, - unit='B', - user_id='test_user_bis', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ), - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=15, - unit='B', - user_id='test_user', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ), - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=2, - unit='B', - user_id='test_user_bis', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '3.0'} - ), - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(expected_length, len(publisher.samples)) - return sorted(publisher.samples, key=lambda s: s.volume) - - def test_aggregator_meter_type(self): - volumes = [1.0, 2.0, 3.0] - transformer_cfg = [ - { - 'name': 'aggregator', - 'parameters': {'size': len(volumes) * len(sample.TYPES)} - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', - ['testgauge', 'testcumulative', 'testdelta']) - counters = [] - for sample_type in sample.TYPES: - for volume in volumes: - counters.append(sample.Sample( - name='test' + sample_type, - type=sample_type, - volume=volume, - unit='B', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - )) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - actual = sorted(s.volume for s in publisher.samples) - self.assertEqual([2.0, 3.0, 6.0], actual) - - def test_aggregator_metadata(self): - for conf, expected_version in [('last', '2.0'), ('first', '1.0')]: - samples = self._do_test_aggregator({ - 'resource_metadata': conf, - 'target': {'name': 'aggregated-bytes'} - }, expected_length=4) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(2, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '3.0'}, - s.resource_metadata) - s = samples[1] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(15, s.volume) - self.assertEqual('test_user', s.user_id) - self.assertEqual('test_proj_bis', s.project_id) - self.assertEqual({'version': '2.0'}, - s.resource_metadata) - s = samples[2] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(42, s.volume) - self.assertEqual('test_user', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': expected_version}, - s.resource_metadata) - s = samples[3] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(95, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj_bis', s.project_id) - self.assertEqual({'version': expected_version}, - s.resource_metadata) - - def test_aggregator_user_last_and_metadata_last(self): - samples = self._do_test_aggregator({ - 'resource_metadata': 'last', - 'user_id': 'last', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=2) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(44, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '3.0'}, - s.resource_metadata) - s = samples[1] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(110, s.volume) - self.assertEqual('test_user', s.user_id) - self.assertEqual('test_proj_bis', s.project_id) - self.assertEqual({'version': '2.0'}, - s.resource_metadata) - - def test_aggregator_user_first_and_metadata_last(self): - samples = self._do_test_aggregator({ - 'resource_metadata': 'last', - 'user_id': 'first', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=2) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(44, s.volume) - self.assertEqual('test_user', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '3.0'}, - s.resource_metadata) - s = samples[1] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(110, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj_bis', s.project_id) - self.assertEqual({'version': '2.0'}, - s.resource_metadata) - - def test_aggregator_all_first(self): - samples = self._do_test_aggregator({ - 'resource_metadata': 'first', - 'user_id': 'first', - 'project_id': 'first', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=1) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(154, s.volume) - self.assertEqual('test_user', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '1.0'}, - s.resource_metadata) - - def test_aggregator_all_last(self): - samples = self._do_test_aggregator({ - 'resource_metadata': 'last', - 'user_id': 'last', - 'project_id': 'last', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=1) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(154, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '3.0'}, - s.resource_metadata) - - def test_aggregator_all_mixed(self): - samples = self._do_test_aggregator({ - 'resource_metadata': 'drop', - 'user_id': 'first', - 'project_id': 'last', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=1) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(154, s.volume) - self.assertEqual('test_user', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({}, s.resource_metadata) - - def test_aggregator_metadata_default(self): - samples = self._do_test_aggregator({ - 'user_id': 'last', - 'project_id': 'last', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=1) - s = samples[0] - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(154, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '3.0'}, - s.resource_metadata) - - @mock.patch('ceilometer.transformer.conversions.LOG') - def test_aggregator_metadata_invalid(self, mylog): - samples = self._do_test_aggregator({ - 'resource_metadata': 'invalid', - 'user_id': 'last', - 'project_id': 'last', - 'target': {'name': 'aggregated-bytes'} - }, expected_length=1) - s = samples[0] - self.assertTrue(mylog.warning.called) - self.assertEqual('aggregated-bytes', s.name) - self.assertEqual(154, s.volume) - self.assertEqual('test_user_bis', s.user_id) - self.assertEqual('test_proj', s.project_id) - self.assertEqual({'version': '3.0'}, - s.resource_metadata) - - def test_aggregator_sized_flush(self): - transformer_cfg = [ - { - 'name': 'aggregator', - 'parameters': {'size': 2}, - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['storage.objects.incoming.bytes']) - counters = [ - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=26, - unit='B', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=16, - unit='B', - user_id='test_user_bis', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ) - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data([counters[0]]) - pipe.flush() - publisher = pipe.publishers[0] - self.assertEqual(0, len(publisher.samples)) - - pipe.publish_data([counters[1]]) - pipe.flush() - publisher = pipe.publishers[0] - self.assertEqual(2, len(publisher.samples)) - - @mock.patch.object(timeutils, 'utcnow') - def test_aggregator_timed_flush(self, mock_utcnow): - now = datetime.datetime.utcnow() - mock_utcnow.return_value = now - transformer_cfg = [ - { - 'name': 'aggregator', - 'parameters': {'size': 900, 'retention_time': 60}, - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['storage.objects.incoming.bytes']) - counters = [ - sample.Sample( - name='storage.objects.incoming.bytes', - type=sample.TYPE_DELTA, - volume=26, - unit='B', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(counters) - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - - mock_utcnow.return_value = now + datetime.timedelta(seconds=120) - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(1, len(publisher.samples)) - - def test_aggregator_without_authentication(self): - transformer_cfg = [ - { - 'name': 'aggregator', - 'parameters': {'size': 2}, - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['storage.objects.outgoing.bytes']) - counters = [ - sample.Sample( - name='storage.objects.outgoing.bytes', - type=sample.TYPE_DELTA, - volume=26, - unit='B', - user_id=None, - project_id=None, - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='storage.objects.outgoing.bytes', - type=sample.TYPE_DELTA, - volume=16, - unit='B', - user_id=None, - project_id=None, - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ) - ] - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data([counters[0]]) - pipe.flush() - publisher = pipe.publishers[0] - self.assertEqual(0, len(publisher.samples)) - - pipe.publish_data([counters[1]]) - pipe.flush() - publisher = pipe.publishers[0] - - self.assertEqual(1, len(publisher.samples)) - self.assertEqual(42, getattr(publisher.samples[0], 'volume')) - self.assertEqual("test_resource", getattr(publisher.samples[0], - 'resource_id')) - - def test_aggregator_to_rate_of_change_transformer_two_resources(self): - resource_id = ['1ca738a1-c49c-4401-8346-5c60ebdb03f4', - '5dd418a6-c6a9-49c9-9cef-b357d72c71dd'] - - aggregator = conversions.AggregatorTransformer(size="2", - timestamp="last") - - rate_of_change_transformer = conversions.RateOfChangeTransformer() - - counter_time = timeutils.parse_isotime('2016-01-01T12:00:00+00:00') - - for offset in range(2): - counter = copy.copy(self.test_counter) - counter.timestamp = datetime.datetime.isoformat(counter_time) - counter.resource_id = resource_id[0] - counter.volume = offset - counter.type = sample.TYPE_CUMULATIVE - counter.unit = 'ns' - aggregator.handle_sample(counter) - - if offset == 1: - test_time = counter_time - - counter_time = counter_time + datetime.timedelta(0, 1) - - aggregated_counters = aggregator.flush() - self.assertEqual(len(aggregated_counters), 1) - self.assertEqual(aggregated_counters[0].timestamp, - datetime.datetime.isoformat(test_time)) - - rate_of_change_transformer.handle_sample(aggregated_counters[0]) - - for offset in range(2): - counter = copy.copy(self.test_counter) - counter.timestamp = datetime.datetime.isoformat(counter_time) - counter.resource_id = resource_id[offset] - counter.volume = 2 - counter.type = sample.TYPE_CUMULATIVE - counter.unit = 'ns' - aggregator.handle_sample(counter) - - if offset == 0: - test_time = counter_time - - counter_time = counter_time + datetime.timedelta(0, 1) - - aggregated_counters = aggregator.flush() - self.assertEqual(len(aggregated_counters), 2) - - for counter in aggregated_counters: - if counter.resource_id == resource_id[0]: - rateOfChange = rate_of_change_transformer.handle_sample( - counter) - self.assertEqual(counter.timestamp, - datetime.datetime.isoformat(test_time)) - - self.assertEqual(rateOfChange.volume, 1) - - def _do_test_arithmetic_expr_parse(self, expr, expected): - actual = arithmetic.ArithmeticTransformer.parse_expr(expr) - self.assertEqual(expected, actual) - - def test_arithmetic_expr_parse(self): - expr = '$(cpu) + $(cpu.util)' - expected = ('cpu.volume + _cpu_util_ESC.volume', - { - 'cpu': 'cpu', - 'cpu.util': '_cpu_util_ESC' - }) - self._do_test_arithmetic_expr_parse(expr, expected) - - def test_arithmetic_expr_parse_parameter(self): - expr = '$(cpu) + $(cpu.util).resource_metadata' - expected = ('cpu.volume + _cpu_util_ESC.resource_metadata', - { - 'cpu': 'cpu', - 'cpu.util': '_cpu_util_ESC' - }) - self._do_test_arithmetic_expr_parse(expr, expected) - - def test_arithmetic_expr_parse_reserved_keyword(self): - expr = '$(class) + $(cpu.util)' - expected = ('_class_ESC.volume + _cpu_util_ESC.volume', - { - 'class': '_class_ESC', - 'cpu.util': '_cpu_util_ESC' - }) - self._do_test_arithmetic_expr_parse(expr, expected) - - def test_arithmetic_expr_parse_already_escaped(self): - expr = '$(class) + $(_class_ESC)' - expected = ('_class_ESC.volume + __class_ESC_ESC.volume', - { - 'class': '_class_ESC', - '_class_ESC': '__class_ESC_ESC' - }) - self._do_test_arithmetic_expr_parse(expr, expected) - - def _do_test_arithmetic(self, expression, scenario, expected): - transformer_cfg = [ - { - 'name': 'arithmetic', - 'parameters': { - 'target': {'name': 'new_meter', - 'unit': '%', - 'type': sample.TYPE_GAUGE, - 'expr': expression}, - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', - list(set(s['name'] for s in scenario))) - counters = [] - test_resources = ['test_resource1', 'test_resource2'] - for resource_id in test_resources: - for s in scenario: - counters.append(sample.Sample( - name=s['name'], - type=sample.TYPE_CUMULATIVE, - volume=s['volume'], - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id=resource_id, - timestamp=timeutils.utcnow().isoformat(), - resource_metadata=s.get('metadata') - )) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - for s in counters: - pipe.publish_data(s) - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - expected_len = len(test_resources) * len(expected) - self.assertEqual(expected_len, len(publisher.samples)) - - # bucket samples by resource first - samples_by_resource = dict((r, []) for r in test_resources) - for s in publisher.samples: - samples_by_resource[s.resource_id].append(s) - - for resource_id in samples_by_resource: - self.assertEqual(len(expected), - len(samples_by_resource[resource_id])) - for i, s in enumerate(samples_by_resource[resource_id]): - self.assertEqual('new_meter', getattr(s, 'name')) - self.assertEqual(resource_id, getattr(s, 'resource_id')) - self.assertEqual('%', getattr(s, 'unit')) - self.assertEqual(sample.TYPE_GAUGE, getattr(s, 'type')) - self.assertEqual(expected[i], getattr(s, 'volume')) - - def test_arithmetic_transformer(self): - expression = '100.0 * $(memory.usage) / $(memory)' - scenario = [ - dict(name='memory', volume=1024.0), - dict(name='memory.usage', volume=512.0), - ] - expected = [50.0] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_expr_empty(self): - expression = '' - scenario = [ - dict(name='memory', volume=1024.0), - dict(name='memory.usage', volume=512.0), - ] - expected = [] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_expr_misconfigured(self): - expression = '512.0 * 3' - scenario = [ - dict(name='memory', volume=1024.0), - dict(name='memory.usage', volume=512.0), - ] - expected = [] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_nan(self): - expression = 'float(\'nan\') * $(memory.usage) / $(memory)' - scenario = [ - dict(name='memory', volume=1024.0), - dict(name='memory.usage', volume=512.0), - ] - expected = [] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_exception(self): - expression = '$(memory) / 0' - scenario = [ - dict(name='memory', volume=1024.0), - dict(name='memory.usage', volume=512.0), - ] - expected = [] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_multiple_samples(self): - expression = '100.0 * $(memory.usage) / $(memory)' - scenario = [ - dict(name='memory', volume=2048.0), - dict(name='memory.usage', volume=512.0), - dict(name='memory', volume=1024.0), - ] - expected = [25.0] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_missing(self): - expression = '100.0 * $(memory.usage) / $(memory)' - scenario = [dict(name='memory.usage', volume=512.0)] - expected = [] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_more_than_needed(self): - expression = '100.0 * $(memory.usage) / $(memory)' - scenario = [ - dict(name='memory', volume=1024.0), - dict(name='memory.usage', volume=512.0), - dict(name='cpu_util', volume=90.0), - ] - expected = [50.0] - self._do_test_arithmetic(expression, scenario, expected) - - def test_arithmetic_transformer_cache_cleared(self): - transformer_cfg = [ - { - 'name': 'arithmetic', - 'parameters': { - 'target': {'name': 'new_meter', - 'expr': '$(memory.usage) + 2'} - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['memory.usage']) - counter = sample.Sample( - name='memory.usage', - type=sample.TYPE_GAUGE, - volume=1024.0, - unit='MB', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata=None - ) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data([counter]) - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - pipe.flush() - self.assertEqual(1, len(publisher.samples)) - self.assertEqual(1026.0, publisher.samples[0].volume) - - pipe.flush() - self.assertEqual(1, len(publisher.samples)) - - counter.volume = 2048.0 - pipe.publish_data([counter]) - pipe.flush() - self.assertEqual(2, len(publisher.samples)) - self.assertEqual(2050.0, publisher.samples[1].volume) - - @mock.patch.object(timeutils, 'utcnow') - def test_aggregator_timed_flush_no_matching_samples(self, mock_utcnow): - now = datetime.datetime.utcnow() - mock_utcnow.return_value = now - transformer_cfg = [ - { - 'name': 'aggregator', - 'parameters': {'size': 900, 'retention_time': 60}, - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['unrelated-sample']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - mock_utcnow.return_value = now + datetime.timedelta(seconds=200) - pipe = pipeline_manager.pipelines[0] - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(publisher.samples)) - - def _do_test_delta(self, data, expected, growth_only=False): - transformer_cfg = [ - { - 'name': 'delta', - 'parameters': { - 'target': {'name': 'new_meter'}, - 'growth_only': growth_only, - } - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._set_pipeline_cfg('meters', ['cpu']) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(data) - pipe.flush() - publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(expected, len(publisher.samples)) - return publisher.samples - - def test_delta_transformer(self): - samples = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=26, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=16, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=53, - unit='ns', - user_id='test_user_bis', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - ] - deltas = self._do_test_delta(samples, 2) - self.assertEqual('new_meter', deltas[0].name) - self.assertEqual('delta', deltas[0].type) - self.assertEqual('ns', deltas[0].unit) - self.assertEqual({'version': '2.0'}, deltas[0].resource_metadata) - self.assertEqual(-10, deltas[0].volume) - self.assertEqual('new_meter', deltas[1].name) - self.assertEqual('delta', deltas[1].type) - self.assertEqual('ns', deltas[1].unit) - self.assertEqual({'version': '1.0'}, deltas[1].resource_metadata) - self.assertEqual(37, deltas[1].volume) - - def test_delta_transformer_out_of_order(self): - samples = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=26, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=16, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=((timeutils.utcnow() - datetime.timedelta(minutes=5)) - .isoformat()), - resource_metadata={'version': '2.0'} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=53, - unit='ns', - user_id='test_user_bis', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - ] - deltas = self._do_test_delta(samples, 1) - self.assertEqual('new_meter', deltas[0].name) - self.assertEqual('delta', deltas[0].type) - self.assertEqual('ns', deltas[0].unit) - self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata) - self.assertEqual(27, deltas[0].volume) - - def test_delta_transformer_growth_only(self): - samples = [ - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=26, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=16, - unit='ns', - user_id='test_user', - project_id='test_proj', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '2.0'} - ), - sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - volume=53, - unit='ns', - user_id='test_user_bis', - project_id='test_proj_bis', - resource_id='test_resource', - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={'version': '1.0'} - ), - ] - deltas = self._do_test_delta(samples, 1, True) - self.assertEqual('new_meter', deltas[0].name) - self.assertEqual('delta', deltas[0].type) - self.assertEqual('ns', deltas[0].unit) - self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata) - self.assertEqual(37, deltas[0].volume) + self.assertEqual('a', getattr(publisher.samples[0], 'name')) + self.assertEqual('b', getattr(publisher.samples[1], 'name')) def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() diff --git a/ceilometer/tests/unit/polling/test_manager.py b/ceilometer/tests/unit/polling/test_manager.py index c9eeb24a02..6c5bccdace 100644 --- a/ceilometer/tests/unit/polling/test_manager.py +++ b/ceilometer/tests/unit/polling/test_manager.py @@ -677,7 +677,6 @@ class TestPollingAgent(BaseAgent): 'sinks': ['test_sink']}], 'sinks': [{ 'name': 'test_sink', - 'transformers': [], 'publishers': ["test"]}] } self.setup_polling(poll_cfg) @@ -720,7 +719,6 @@ class TestPollingAgent(BaseAgent): 'sinks': ['test_sink']}], 'sinks': [{ 'name': 'test_sink', - 'transformers': [], 'publishers': ["test"]}] } self.setup_polling(poll_cfg) @@ -742,7 +740,6 @@ class TestPollingAgent(BaseAgent): 'sinks': ['test_sink']}], 'sinks': [{ 'name': 'test_sink', - 'transformers': [], 'publishers': ["test"]}] } self.setup_polling(poll_cfg) @@ -771,7 +768,6 @@ class TestPollingAgent(BaseAgent): 'sinks': ['test_sink']}], 'sinks': [{ 'name': 'test_sink', - 'transformers': [], 'publishers': ["test"]}] } self.setup_polling(poll_cfg) @@ -812,7 +808,6 @@ class TestPollingAgent(BaseAgent): 'sinks': ['test_sink']}], 'sinks': [{ 'name': 'test_sink', - 'transformers': [], 'publishers': ["test"]}] } self.setup_polling(poll_cfg) diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py index d894201d9f..d1cfb065c0 100644 --- a/ceilometer/tests/unit/test_decoupled_pipeline.py +++ b/ceilometer/tests/unit/test_decoupled_pipeline.py @@ -12,9 +12,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -import yaml - from ceilometer.pipeline import base from ceilometer.pipeline import sample as pipeline from ceilometer import sample @@ -27,7 +24,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): 'meters': ['a'], 'sinks': ['test_sink']} sink = {'name': 'test_sink', - 'transformers': [{'name': 'update', 'parameters': {}}], 'publishers': ['test://']} self.pipeline_cfg = {'sources': [source], 'sinks': [sink]} @@ -39,13 +35,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): }) self.pipeline_cfg['sinks'].append({ 'name': 'second_sink', - 'transformers': [{ - 'name': 'update', - 'parameters': - { - 'append_name': '_new', - } - }], 'publishers': ['new'], }) @@ -57,13 +46,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): }) self.pipeline_cfg['sinks'].append({ 'name': 'second_sink', - 'transformers': [{ - 'name': 'update', - 'parameters': - { - 'append_name': '_new', - } - }], 'publishers': ['except'], }) @@ -113,13 +95,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): self._set_pipeline_cfg('meters', meter_cfg) self.pipeline_cfg['sinks'].append({ 'name': 'second_sink', - 'transformers': [{ - 'name': 'update', - 'parameters': - { - 'append_name': '_new', - } - }], 'publishers': ['new'], }) self.pipeline_cfg['sources'][0]['sinks'].append('second_sink') @@ -150,12 +125,11 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): str(pipeline_manager.pipelines[1])) test_publisher = pipeline_manager.pipelines[0].publishers[0] new_publisher = pipeline_manager.pipelines[1].publishers[0] - for publisher, sfx in [(test_publisher, '_update'), - (new_publisher, '_new')]: + for publisher in (test_publisher, new_publisher): self.assertEqual(2, len(publisher.samples)) self.assertEqual(2, publisher.calls) - self.assertEqual('a' + sfx, getattr(publisher.samples[0], "name")) - self.assertEqual('b' + sfx, getattr(publisher.samples[1], "name")) + self.assertEqual('a', getattr(publisher.samples[0], "name")) + self.assertEqual('b', getattr(publisher.samples[1], "name")) def test_multiple_sources_with_single_sink(self): self.pipeline_cfg['sources'].append({ @@ -193,68 +167,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): for publisher in [test_publisher, another_publisher]: self.assertEqual(2, len(publisher.samples)) self.assertEqual(2, publisher.calls) - self.assertEqual('a_update', getattr(publisher.samples[0], "name")) - self.assertEqual('b_update', getattr(publisher.samples[1], "name")) - - transformed_samples = self.TransformerClass.samples - self.assertEqual(2, len(transformed_samples)) - self.assertEqual(['a', 'b'], - [getattr(s, 'name') for s in transformed_samples]) - - def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index, - meters, units): - with open('ceilometer/pipeline/data/pipeline.yaml') as fap: - data = fap.read() - pipeline_cfg = yaml.safe_load(data) - for s in pipeline_cfg['sinks']: - s['publishers'] = ['test://'] - name = self.cfg2file(pipeline_cfg) - self.CONF.set_override('pipeline_cfg_file', name) - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[index] - self._do_test_rate_of_change_mapping(pipe, meters, units) - - def test_rate_of_change_boilerplate_disk_read_cfg(self): - meters = ('disk.read.bytes', 'disk.read.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) - - def test_rate_of_change_boilerplate_disk_write_cfg(self): - meters = ('disk.write.bytes', 'disk.write.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) - - def test_rate_of_change_boilerplate_network_incoming_cfg(self): - meters = ('network.incoming.bytes', 'network.incoming.packets') - units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4, - meters, - units) - - def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self): - meters = ('disk.device.read.bytes', 'disk.device.read.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) - - def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self): - meters = ('disk.device.write.bytes', 'disk.device.write.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) - - def test_rate_of_change_boilerplate_network_outgoing_cfg(self): - meters = ('network.outgoing.bytes', 'network.outgoing.packets') - units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4, - meters, - units) + self.assertEqual('a', getattr(publisher.samples[0], "name")) + self.assertEqual('b', getattr(publisher.samples[1], "name")) def test_duplicated_sinks_names(self): self.pipeline_cfg['sinks'].append({ diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py index e7d5fca71b..17b2e13d40 100644 --- a/ceilometer/tests/unit/test_notification.py +++ b/ceilometer/tests/unit/test_notification.py @@ -141,7 +141,6 @@ class BaseRealNotification(BaseNotificationTest): }], 'sinks': [{ 'name': 'test_sink', - 'transformers': [], 'publishers': ['test://'] }] }) diff --git a/ceilometer/tests/unit/transformer/__init__.py b/ceilometer/tests/unit/transformer/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/ceilometer/tests/unit/transformer/test_conversions.py b/ceilometer/tests/unit/transformer/test_conversions.py deleted file mode 100644 index c5bc058724..0000000000 --- a/ceilometer/tests/unit/transformer/test_conversions.py +++ /dev/null @@ -1,115 +0,0 @@ -# -# Copyright 2016 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import copy -import datetime - -from oslo_utils import timeutils -from oslotest import base - -from ceilometer import sample -from ceilometer.transformer import conversions - - -class AggregatorTransformerTestCase(base.BaseTestCase): - SAMPLE = sample.Sample( - name='cpu', - type=sample.TYPE_CUMULATIVE, - unit='ns', - volume='1234567', - user_id='56c5692032f34041900342503fecab30', - project_id='ac9494df2d9d4e709bac378cceabaf23', - resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4', - timestamp="2015-10-29 14:12:15.485877+00:00", - resource_metadata={} - ) - - def setUp(self): - super(AggregatorTransformerTestCase, self).setUp() - self._sample_offset = 0 - - def test_init_input_validation(self): - aggregator = conversions.AggregatorTransformer("2", "15", None, - None, None) - self.assertEqual(2, aggregator.size) - self.assertEqual(15, aggregator.retention_time) - - def test_init_no_size_or_rention_time(self): - aggregator = conversions.AggregatorTransformer() - self.assertEqual(1, aggregator.size) - self.assertIsNone(aggregator.retention_time) - - def test_init_size_zero(self): - aggregator = conversions.AggregatorTransformer(size="0") - self.assertEqual(1, aggregator.size) - self.assertIsNone(aggregator.retention_time) - - def test_init_input_validation_size_invalid(self): - self.assertRaises(ValueError, conversions.AggregatorTransformer, - "abc", "15", None, None, None) - - def test_init_input_validation_retention_time_invalid(self): - self.assertRaises(ValueError, conversions.AggregatorTransformer, - "2", "abc", None, None, None) - - def test_init_no_timestamp(self): - aggregator = conversions.AggregatorTransformer("1", "1", None, - None, None) - self.assertEqual("first", aggregator.timestamp) - - def test_init_timestamp_none(self): - aggregator = conversions.AggregatorTransformer("1", "1", None, - None, None, None) - self.assertEqual("first", aggregator.timestamp) - - def test_init_timestamp_first(self): - aggregator = conversions.AggregatorTransformer("1", "1", None, - None, None, "first") - self.assertEqual("first", aggregator.timestamp) - - def test_init_timestamp_last(self): - aggregator = conversions.AggregatorTransformer("1", "1", None, - None, None, "last") - self.assertEqual("last", aggregator.timestamp) - - def test_init_timestamp_invalid(self): - aggregator = conversions.AggregatorTransformer("1", "1", None, - None, None, - "invalid_option") - self.assertEqual("first", aggregator.timestamp) - - def test_size_unbounded(self): - aggregator = conversions.AggregatorTransformer(size="0", - retention_time="300") - self._insert_sample_data(aggregator) - - samples = aggregator.flush() - - self.assertEqual([], samples) - - def test_size_bounded(self): - aggregator = conversions.AggregatorTransformer(size="100") - self._insert_sample_data(aggregator) - - samples = aggregator.flush() - - self.assertEqual(100, len(samples)) - - def _insert_sample_data(self, aggregator): - for _ in range(100): - sample = copy.copy(self.SAMPLE) - sample.resource_id = sample.resource_id + str(self._sample_offset) - sample.timestamp = datetime.datetime.isoformat(timeutils.utcnow()) - aggregator.handle_sample(sample) - self._sample_offset += 1 diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py deleted file mode 100644 index 3afffee655..0000000000 --- a/ceilometer/transformer/__init__.py +++ /dev/null @@ -1,73 +0,0 @@ -# -# Copyright 2013 Intel Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import collections - -import six - - -@six.add_metaclass(abc.ABCMeta) -class TransformerBase(object): - """Base class for plugins that transform the sample.""" - - def __init__(self, **kwargs): - """Setup transformer. - - Each time a transformed is involved in a pipeline, a new transformer - instance is created and chained into the pipeline. i.e. transformer - instance is per pipeline. This helps if transformer need keep some - cache and per-pipeline information. - - :param kwargs: The parameters that are defined in pipeline config file. - """ - super(TransformerBase, self).__init__() - - @abc.abstractmethod - def handle_sample(self, sample): - """Transform a sample. - - :param sample: A sample. - """ - - @staticmethod - def flush(): - """Flush samples cached previously.""" - return [] - - -class Namespace(object): - """Encapsulates the namespace. - - Encapsulation is done by wrapping the evaluation of the configured rule. - This allows nested dicts to be accessed in the attribute style, - and missing attributes to yield false when used in a boolean expression. - """ - def __init__(self, seed): - self.__dict__ = collections.defaultdict(lambda: Namespace({})) - self.__dict__.update(seed) - for k, v in six.iteritems(self.__dict__): - if isinstance(v, dict): - self.__dict__[k] = Namespace(v) - - def __getattr__(self, attr): - return self.__dict__[attr] - - def __getitem__(self, key): - return self.__dict__[key] - - def __nonzero__(self): - return len(self.__dict__) > 0 - __bool__ = __nonzero__ diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py deleted file mode 100644 index db7500762e..0000000000 --- a/ceilometer/transformer/accumulator.py +++ /dev/null @@ -1,42 +0,0 @@ -# -# Copyright 2013 Julien Danjou -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from ceilometer import transformer - - -class TransformerAccumulator(transformer.TransformerBase): - """Transformer that accumulates samples until a threshold. - - And then flushes them out into the wild. - """ - - def __init__(self, size=1, **kwargs): - if size >= 1: - self.samples = [] - self.size = size - super(TransformerAccumulator, self).__init__(**kwargs) - - def handle_sample(self, sample): - if self.size >= 1: - self.samples.append(sample) - else: - return sample - - def flush(self): - if len(self.samples) >= self.size: - x = self.samples - self.samples = [] - return x - return [] diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py deleted file mode 100644 index 6039d22afb..0000000000 --- a/ceilometer/transformer/arithmetic.py +++ /dev/null @@ -1,157 +0,0 @@ -# -# Copyright 2014 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import copy -import keyword -import math -import re - -from oslo_log import log -import six - -from ceilometer.i18n import _ -from ceilometer import sample -from ceilometer import transformer - -LOG = log.getLogger(__name__) - - -class ArithmeticTransformer(transformer.TransformerBase): - """Multi meter arithmetic transformer. - - Transformer that performs arithmetic operations - over one or more meters and/or their metadata. - """ - - meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)') - - def __init__(self, target=None, **kwargs): - super(ArithmeticTransformer, self).__init__(**kwargs) - target = target or {} - self.target = target - self.expr = target.get('expr', '') - self.expr_escaped, self.escaped_names = self.parse_expr(self.expr) - self.required_meters = list(self.escaped_names.values()) - self.misconfigured = len(self.required_meters) == 0 - if not self.misconfigured: - self.reference_meter = self.required_meters[0] - # convert to set for more efficient contains operation - self.required_meters = set(self.required_meters) - self.cache = collections.defaultdict(dict) - self.latest_timestamp = None - else: - LOG.warning(_('Arithmetic transformer must use at least one' - ' meter in expression \'%s\''), self.expr) - - def _update_cache(self, _sample): - """Update the cache with the latest sample.""" - escaped_name = self.escaped_names.get(_sample.name, '') - if escaped_name not in self.required_meters: - return - self.cache[_sample.resource_id][escaped_name] = _sample - - def _check_requirements(self, resource_id): - """Check if all the required meters are available in the cache.""" - return len(self.cache[resource_id]) == len(self.required_meters) - - def _calculate(self, resource_id): - """Evaluate the expression and return a new sample if successful.""" - ns_dict = dict((m, s.as_dict()) for m, s - in six.iteritems(self.cache[resource_id])) - ns = transformer.Namespace(ns_dict) - try: - new_volume = eval(self.expr_escaped, {}, ns) - if math.isnan(new_volume): - raise ArithmeticError(_('Expression evaluated to ' - 'a NaN value!')) - - reference_sample = self.cache[resource_id][self.reference_meter] - return sample.Sample( - name=self.target.get('name', reference_sample.name), - unit=self.target.get('unit', reference_sample.unit), - type=self.target.get('type', reference_sample.type), - volume=float(new_volume), - user_id=reference_sample.user_id, - project_id=reference_sample.project_id, - resource_id=reference_sample.resource_id, - timestamp=self.latest_timestamp, - resource_metadata=reference_sample.resource_metadata - ) - except Exception as e: - LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'), - {'expr': self.expr, 'exc': e}) - - def handle_sample(self, _sample): - self._update_cache(_sample) - self.latest_timestamp = _sample.timestamp - - def flush(self): - new_samples = [] - if not self.misconfigured: - # When loop self.cache, the dict could not be change by others. - # If changed, will raise "RuntimeError: dictionary changed size - # during iteration". so we make a tmp copy and just loop it. - tmp_cache = copy.copy(self.cache) - for resource_id in tmp_cache: - if self._check_requirements(resource_id): - new_samples.append(self._calculate(resource_id)) - if resource_id in self.cache: - self.cache.pop(resource_id) - return new_samples - - @classmethod - def parse_expr(cls, expr): - """Transforms meter names in the expression into valid identifiers. - - :param expr: unescaped expression - :return: A tuple of the escaped expression and a dict representing - the translation of meter names into Python identifiers - """ - - class Replacer(object): - """Replaces matched meter names with escaped names. - - If the meter name is not followed by parameter access in the - expression, it defaults to accessing the 'volume' parameter. - """ - - def __init__(self, original_expr): - self.original_expr = original_expr - self.escaped_map = {} - - def __call__(self, match): - meter_name = match.group(1) - escaped_name = self.escape(meter_name) - self.escaped_map[meter_name] = escaped_name - - if (match.end(0) == len(self.original_expr) or - self.original_expr[match.end(0)] != '.'): - escaped_name += '.volume' - return escaped_name - - @staticmethod - def escape(name): - has_dot = '.' in name - if has_dot: - name = name.replace('.', '_') - - if has_dot or name.endswith('ESC') or name in keyword.kwlist: - name = "_" + name + '_ESC' - return name - - replacer = Replacer(expr) - expr = re.sub(cls.meter_name_re, replacer, expr) - return expr, replacer.escaped_map diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py deleted file mode 100644 index 5c3b809f03..0000000000 --- a/ceilometer/transformer/conversions.py +++ /dev/null @@ -1,344 +0,0 @@ -# -# Copyright 2013 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import re - -from oslo_log import log -from oslo_utils import timeutils -import six - -from ceilometer.i18n import _ -from ceilometer import sample -from ceilometer import transformer - -LOG = log.getLogger(__name__) - - -class BaseConversionTransformer(transformer.TransformerBase): - """Transformer to derive conversion.""" - - def __init__(self, source=None, target=None, **kwargs): - """Initialize transformer with configured parameters. - - :param source: dict containing source sample unit - :param target: dict containing target sample name, type, - unit and scaling factor (a missing value - connotes no change) - """ - self.source = source or {} - self.target = target or {} - super(BaseConversionTransformer, self).__init__(**kwargs) - - def _map(self, s, attr): - """Apply the name or unit mapping if configured.""" - mapped = None - from_ = self.source.get('map_from') - to_ = self.target.get('map_to') - if from_ and to_: - if from_.get(attr) and to_.get(attr): - try: - mapped = re.sub(from_[attr], to_[attr], getattr(s, attr)) - except Exception: - pass - return mapped or self.target.get(attr, getattr(s, attr)) - - -class DeltaTransformer(BaseConversionTransformer): - """Transformer based on the delta of a sample volume.""" - - def __init__(self, target=None, growth_only=False, **kwargs): - """Initialize transformer with configured parameters. - - :param growth_only: capture only positive deltas - """ - super(DeltaTransformer, self).__init__(target=target, **kwargs) - self.growth_only = growth_only - self.cache = {} - - def handle_sample(self, s): - """Handle a sample, converting if necessary.""" - key = s.name + s.resource_id - prev = self.cache.get(key) - timestamp = timeutils.parse_isotime(s.timestamp) - self.cache[key] = (s.volume, timestamp) - - if prev: - prev_volume = prev[0] - prev_timestamp = prev[1] - time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) - # disallow violations of the arrow of time - if time_delta < 0: - LOG.warning('Dropping out of time order sample: %s', (s,)) - # Reset the cache to the newer sample. - self.cache[key] = prev - return None - volume_delta = s.volume - prev_volume - if self.growth_only and volume_delta < 0: - LOG.warning('Negative delta detected, dropping value') - s = None - else: - s = self._convert(s, volume_delta) - LOG.debug('Converted to: %s', s) - else: - LOG.warning('Dropping sample with no predecessor: %s', (s,)) - s = None - return s - - def _convert(self, s, delta): - """Transform the appropriate sample fields.""" - return sample.Sample( - name=self._map(s, 'name'), - unit=s.unit, - type=sample.TYPE_DELTA, - volume=delta, - user_id=s.user_id, - project_id=s.project_id, - resource_id=s.resource_id, - timestamp=s.timestamp, - resource_metadata=s.resource_metadata - ) - - -class ScalingTransformer(BaseConversionTransformer): - """Transformer to apply a scaling conversion.""" - - def __init__(self, source=None, target=None, **kwargs): - """Initialize transformer with configured parameters. - - :param source: dict containing source sample unit - :param target: dict containing target sample name, type, - unit and scaling factor (a missing value - connotes no change) - """ - super(ScalingTransformer, self).__init__(source=source, target=target, - **kwargs) - self.scale = self.target.get('scale') - self.max = self.target.get('max') - LOG.debug('scaling conversion transformer with source:' - ' %(source)s target: %(target)s:', {'source': self.source, - 'target': self.target}) - - def _scale(self, s): - """Apply the scaling factor. - - Either a straight multiplicative factor or else a string to be eval'd. - """ - ns = transformer.Namespace(s.as_dict()) - - scale = self.scale - return ((eval(scale, {}, ns) if isinstance(scale, six.string_types) - else s.volume * scale) if scale else s.volume) - - def _convert(self, s, growth=1): - """Transform the appropriate sample fields.""" - volume = self._scale(s) * growth - return sample.Sample( - name=self._map(s, 'name'), - unit=self._map(s, 'unit'), - type=self.target.get('type', s.type), - volume=min(volume, self.max) if self.max else volume, - user_id=s.user_id, - project_id=s.project_id, - resource_id=s.resource_id, - timestamp=s.timestamp, - resource_metadata=s.resource_metadata - ) - - def handle_sample(self, s): - """Handle a sample, converting if necessary.""" - LOG.debug('handling sample %s', s) - if self.source.get('unit', s.unit) == s.unit: - s = self._convert(s) - LOG.debug('converted to: %s', s) - return s - - -class RateOfChangeTransformer(ScalingTransformer): - """Transformer based on the rate of change of a sample volume. - - For example, taking the current and previous volumes of a cumulative sample - and producing a gauge value based on the proportion of some maximum used. - """ - - def __init__(self, **kwargs): - """Initialize transformer with configured parameters.""" - super(RateOfChangeTransformer, self).__init__(**kwargs) - self.cache = {} - self.scale = self.scale or '1' - - def handle_sample(self, s): - """Handle a sample, converting if necessary.""" - LOG.debug('handling sample %s', s) - key = s.name + s.resource_id - prev = self.cache.get(key) - timestamp = timeutils.parse_isotime(s.timestamp) - self.cache[key] = (s.volume, timestamp, s.monotonic_time) - - if prev: - prev_volume = prev[0] - prev_timestamp = prev[1] - prev_monotonic_time = prev[2] - if (prev_monotonic_time is not None and - s.monotonic_time is not None): - # NOTE(sileht): Prefer high precision timer - time_delta = s.monotonic_time - prev_monotonic_time - else: - time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) - # disallow violations of the arrow of time - if time_delta < 0: - LOG.warning(_('dropping out of time order sample: %s'), (s,)) - # Reset the cache to the newer sample. - self.cache[key] = prev - return None - # we only allow negative volume deltas for noncumulative - # samples, whereas for cumulative we assume that a reset has - # occurred in the interim so that the current volume gives a - # lower bound on growth - volume_delta = (s.volume - prev_volume - if (prev_volume <= s.volume or - s.type != sample.TYPE_CUMULATIVE) - else s.volume) - rate_of_change = ((1.0 * volume_delta / time_delta) - if time_delta else 0.0) - - s = self._convert(s, rate_of_change) - LOG.debug('converted to: %s', s) - else: - LOG.warning(_('dropping sample with no predecessor: %s'), - (s,)) - s = None - return s - - -class AggregatorTransformer(ScalingTransformer): - """Transformer that aggregates samples. - - Aggregation goes until a threshold or/and a retention_time, and then - flushes them out into the wild. - - Example: - To aggregate sample by resource_metadata and keep the - resource_metadata of the latest received sample; - - AggregatorTransformer(retention_time=60, resource_metadata='last') - - To aggregate sample by user_id and resource_metadata and keep the - user_id of the first received sample and drop the resource_metadata. - - AggregatorTransformer(size=15, user_id='first', - resource_metadata='drop') - - To keep the timestamp of the last received sample rather - than the first: - - AggregatorTransformer(timestamp="last") - - """ - - def __init__(self, size=1, retention_time=None, - project_id=None, user_id=None, resource_metadata="last", - timestamp="first", **kwargs): - super(AggregatorTransformer, self).__init__(**kwargs) - self.samples = {} - self.counts = collections.defaultdict(int) - self.size = int(size) if size else None - self.retention_time = float(retention_time) if retention_time else None - if not (self.size or self.retention_time): - self.size = 1 - - if timestamp in ["first", "last"]: - self.timestamp = timestamp - else: - self.timestamp = "first" - - self.initial_timestamp = None - self.aggregated_samples = 0 - - self.key_attributes = [] - self.merged_attribute_policy = {} - - self._init_attribute('project_id', project_id) - self._init_attribute('user_id', user_id) - self._init_attribute('resource_metadata', resource_metadata, - is_droppable=True, mandatory=True) - - def _init_attribute(self, name, value, is_droppable=False, - mandatory=False): - drop = ['drop'] if is_droppable else [] - if value or mandatory: - if value not in ['last', 'first'] + drop: - LOG.warning('%s is unknown (%s), using last' % (name, value)) - value = 'last' - self.merged_attribute_policy[name] = value - else: - self.key_attributes.append(name) - - def _get_unique_key(self, s): - # NOTE(arezmerita): in samples generated by ceilometer middleware, - # when accessing without authentication publicly readable/writable - # swift containers, the project_id and the user_id are missing. - # They will be replaced by for unique key construction. - keys = ['' if getattr(s, f) is None else getattr(s, f) - for f in self.key_attributes] - non_aggregated_keys = "-".join(keys) - # NOTE(sileht): it assumes, a meter always have the same unit/type - return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys) - - def handle_sample(self, sample_): - if not self.initial_timestamp: - self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp) - - self.aggregated_samples += 1 - key = self._get_unique_key(sample_) - self.counts[key] += 1 - if key not in self.samples: - self.samples[key] = self._convert(sample_) - if self.merged_attribute_policy[ - 'resource_metadata'] == 'drop': - self.samples[key].resource_metadata = {} - else: - if self.timestamp == "last": - self.samples[key].timestamp = sample_.timestamp - if sample_.type == sample.TYPE_CUMULATIVE: - self.samples[key].volume = self._scale(sample_) - else: - self.samples[key].volume += self._scale(sample_) - for field in self.merged_attribute_policy: - if self.merged_attribute_policy[field] == 'last': - setattr(self.samples[key], field, - getattr(sample_, field)) - - def flush(self): - if not self.initial_timestamp: - return [] - - expired = (self.retention_time and - timeutils.is_older_than(self.initial_timestamp, - self.retention_time)) - full = self.size and self.aggregated_samples >= self.size - if full or expired: - x = list(self.samples.values()) - # gauge aggregates need to be averages - for s in x: - if s.type == sample.TYPE_GAUGE: - key = self._get_unique_key(s) - s.volume /= self.counts[key] - self.samples.clear() - self.counts.clear() - self.aggregated_samples = 0 - self.initial_timestamp = None - return x - return [] diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 9031b25855..1f07a83a9c 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -405,6 +405,9 @@ if is_service_enabled ceilometer; then start_ceilometer elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then iniset $TEMPEST_CONFIG telemetry alarm_granularity $CEILOMETER_ALARM_GRANULARITY + iniset $TEMPEST_CONFIG telemetry alarm_threshold 10000000000 + iniset $TEMPEST_CONFIG telemetry alarm_metric_name cpu + iniset $TEMPEST_CONFIG telemetry alarm_aggregation_method rate:mean fi if [[ "$1" == "unstack" ]]; then diff --git a/devstack/settings b/devstack/settings index 410ed8ea9a..22da068ec2 100644 --- a/devstack/settings +++ b/devstack/settings @@ -19,7 +19,11 @@ fi # Gnocchi default archive_policy for Ceilometer # TODO(sileht): when Gnocchi 4.0 is out use the tarball instead GNOCCHI_GIT_PATH=${GNOCCHI_GIT_PATH:-git+https://github.com/gnocchixyz/gnocchi#egg=gnocchi} -GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low} +if [ -n "$GNOCCHI_ARCHIVE_POLICY_TEMPEST" ]; then + GNOCCHI_ARCHIVE_POLICY=$GNOCCHI_ARCHIVE_POLICY_TEMPEST +else + GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low} +fi GNOCCHI_CONF_DIR=${GNOCCHI_CONF_DIR:-/etc/gnocchi} GNOCCHI_CONF=${GNOCCHI_CONF:-${GNOCCHI_CONF_DIR}/gnocchi.conf} GNOCCHI_COORDINATOR_URL=${CEILOMETER_COORDINATOR_URL:-redis://localhost:6379} diff --git a/doc/source/admin/telemetry-measurements.rst b/doc/source/admin/telemetry-measurements.rst index fa269235d2..c30cfa4132 100644 --- a/doc/source/admin/telemetry-measurements.rst +++ b/doc/source/admin/telemetry-measurements.rst @@ -103,14 +103,6 @@ The following meters are collected for OpenStack Compute. | cpu | Cumu\ | ns | instance | Pollster | Libvirt,| CPU time used | | | lative| | ID | | Hyper-V | | +-----------+-------+------+----------+----------+---------+------------------+ -| cpu.delta | Delta | ns | instance | Pollster | Libvirt,| CPU time used s\ | -| | | | ID | | Hyper-V | ince previous d\ | -| | | | | | | atapoint | -+-----------+-------+------+----------+----------+---------+------------------+ -| cpu_util | Gauge | % | instance | Pollster | LibVirt,| Average CPU | -| | | | ID | | vSphere,| utilization | -| | | | | | XenAPI | | -+-----------+-------+------+----------+----------+---------+------------------+ | vcpus | Gauge | vcpu | instance | Notific\ | Libvirt,| Number of virtual| | | | | ID | ation | Hyper-V | CPUs allocated to| | | | | | | | the instance | @@ -118,17 +110,9 @@ The following meters are collected for OpenStack Compute. | disk.read\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of read | | .requests | ative | uest | ID | | Hyper-V | requests | +-----------+-------+------+----------+----------+---------+------------------+ -| disk.read\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of | -| .requests\| | est/s| ID | | Hyper-V,| read requests | -| .rate | | | | | vSphere | | -+-----------+-------+------+----------+----------+---------+------------------+ | disk.writ\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of write | | e.requests| ative | uest | ID | | Hyper-V | requests | +-----------+-------+------+----------+----------+---------+------------------+ -| disk.writ\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of | -| e.request\| | est/s| ID | | Hyper-V,| write requests | -| s.rate | | | | | vSphere | | -+-----------+-------+------+----------+----------+---------+------------------+ | disk.read\| Cumu\ | B | instance | Pollster | Libvirt,| Volume of reads | | .bytes | lative| | ID | | Hyper-V | | +-----------+-------+------+----------+----------+---------+------------------+ @@ -149,38 +133,18 @@ The following meters are collected for OpenStack Compute. | ice.read\ | lative| uest | | | Hyper-V | requests | | .requests | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of | -| ice.read\ | | est/s| | | Hyper-V,| read requests | -| .requests\| | | | | vSphere | | -| .rate | | | | | | | -+-----------+-------+------+----------+----------+---------+------------------+ | disk.dev\ | Cumu\ | req\ | disk ID | Pollster | Libvirt,| Number of write | | ice.write\| lative| uest | | | Hyper-V | requests | | .requests | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of | -| ice.write\| | est/s| | | Hyper-V,| write requests | -| .requests\| | | | | vSphere | | -| .rate | | | | | | | -+-----------+-------+------+----------+----------+---------+------------------+ | disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of reads | | ice.read\ | lative| | | | Hyper-V | | | .bytes | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of | -| ice.read\ | | | | | Hyper-V,| reads | -| .bytes | | | | | vSphere | | -| .rate | | | | | | | -+-----------+-------+------+----------+----------+---------+------------------+ | disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of writes | | ice.write\| lative| | | | Hyper-V | | | .bytes | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of | -| ice.write\| | | | | Hyper-V,| writes | -| .bytes | | | | | vSphere | | -| .rate | | | | | | | -+-----------+-------+------+----------+----------+---------+------------------+ | disk.root\| Gauge | GB | instance | Notific\ | Libvirt,| Size of root disk| | .size | | | ID | ation | Hyper-V | | +-----------+-------+------+----------+----------+---------+------------------+ @@ -236,38 +200,18 @@ The following meters are collected for OpenStack Compute. | incoming.\| lative| | ID | | Hyper-V | incoming bytes | | bytes | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of | -| incoming.\| | | ID | | Hyper-V,| incoming bytes | -| bytes.rate| | | | | vSphere,| | -| | | | | | XenAPI | | -+-----------+-------+------+----------+----------+---------+------------------+ | network.\ | Cumu\ | B | interface| Pollster | Libvirt,| Number of | | outgoing\ | lative| | ID | | Hyper-V | outgoing bytes | | .bytes | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of | -| outgoing.\| | | ID | | Hyper-V,| outgoing bytes | -| bytes.rate| | | | | vSphere,| | -| | | | | | XenAPI | | -+-----------+-------+------+----------+----------+---------+------------------+ | network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of | | incoming\ | lative| ket | ID | | Hyper-V | incoming packets | | .packets | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| network.\ | Gauge | pack\| interface| Pollster | Libvirt,| Average rate of | -| incoming\ | | et/s | ID | | Hyper-V,| incoming packets | -| .packets\ | | | | | vSphere,| | -| .rate | | | | | XenAPI | | -+-----------+-------+------+----------+----------+---------+------------------+ | network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of | | outgoing\ | lative| ket | ID | | Hyper-V | outgoing packets | | .packets | | | | | | | +-----------+-------+------+----------+----------+---------+------------------+ -| network.\ | Gauge | pac\ | interface| Pollster | Libvirt,| Average rate of | -| outgoing\ | | ket/s| ID | | Hyper-V,| outgoing packets | -| .packets\ | | | | | vSphere,| | -| .rate | | | | | XenAPI | | -+-----------+-------+------+----------+----------+---------+------------------+ | **Meters added in the Newton release** | +-----------+-------+------+----------+----------+---------+------------------+ | cpu_l3_c\ | Gauge | B | instance | Pollster | Libvirt | L3 cache used b\ | @@ -354,50 +298,6 @@ The following meters are collected for OpenStack Compute. To enable libvirt ``disk.*`` support when running on RBD-backed shared storage, you need to install libvirt version 1.2.16+. -The Telemetry service supports creating new meters by using transformers, but -this is deprecated and discouraged to use. Among the meters gathered from -libvirt and Hyper-V, there are a few which are derived from other meters. The -list of meters that are created by using the ``rate_of_change`` transformer -from the above table is the following: - -- cpu_util - -- cpu.delta - -- disk.read.requests.rate - -- disk.write.requests.rate - -- disk.read.bytes.rate - -- disk.write.bytes.rate - -- disk.device.read.requests.rate - -- disk.device.write.requests.rate - -- disk.device.read.bytes.rate - -- disk.device.write.bytes.rate - -- network.incoming.bytes.rate - -- network.outgoing.bytes.rate - -- network.incoming.packets.rate - -- network.outgoing.packets.rate - -.. note:: - - If storing data in Gnocchi, derived rate_of_change metrics are also - computed using Gnocchi in addition to Ceilometer transformers. It avoids - missing data when Ceilometer services restart. - To minimize Ceilometer memory requirements transformers can be disabled. - These ``rate_of_change`` meters are deprecated and will be removed in - default Ceilometer configuration in future release. - - OpenStack Compute is capable of collecting ``CPU`` related meters from the compute host machines. In order to use that you need to set the ``compute_monitors`` option to ``cpu.virt_driver`` in the diff --git a/releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml b/releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml new file mode 100644 index 0000000000..d5d09638d3 --- /dev/null +++ b/releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + The support for transformers has been removed from the pipeline. diff --git a/setup.cfg b/setup.cfg index ea9cc16200..a65011a317 100644 --- a/setup.cfg +++ b/setup.cfg @@ -222,14 +222,6 @@ ceilometer.compute.virt = ceilometer.hardware.inspectors = snmp = ceilometer.hardware.inspector.snmp:SNMPInspector -ceilometer.transformer = - accumulator = ceilometer.transformer.accumulator:TransformerAccumulator - delta = ceilometer.transformer.conversions:DeltaTransformer - unit_conversion = ceilometer.transformer.conversions:ScalingTransformer - rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer - aggregator = ceilometer.transformer.conversions:AggregatorTransformer - arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer - ceilometer.sample.publisher = test = ceilometer.publisher.test:TestPublisher notifier = ceilometer.publisher.messaging:SampleNotifierPublisher