diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index adcc9695..7273752f 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -61,6 +61,7 @@ class BasePipelineTestCase(base.BaseTestCase): 'unit_conversion': conversions.ScalingTransformer, 'rate_of_change': conversions.RateOfChangeTransformer, 'arithmetic': arithmetic.ArithmeticTransformer, + 'delta': conversions.DeltaTransformer, } if name in class_name_ext: @@ -1897,6 +1898,164 @@ class BasePipelineTestCase(base.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) + def _do_test_delta(self, data, expected, growth_only=False): + transformer_cfg = [ + { + 'name': 'delta', + 'parameters': { + 'target': {'name': 'new_meter'}, + 'growth_only': growth_only, + } + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['cpu']) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_data(None, data) + pipe.flush(None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(expected, len(publisher.samples)) + return publisher.samples + + def test_delta_transformer(self): + samples = [ + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=26, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=16, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '2.0'} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=53, + unit='ns', + user_id='test_user_bis', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + ] + deltas = self._do_test_delta(samples, 2) + self.assertEqual('new_meter', deltas[0].name) + self.assertEqual('delta', deltas[0].type) + self.assertEqual('ns', deltas[0].unit) + self.assertEqual({'version': '2.0'}, deltas[0].resource_metadata) + self.assertEqual(-10, deltas[0].volume) + self.assertEqual('new_meter', deltas[1].name) + self.assertEqual('delta', deltas[1].type) + self.assertEqual('ns', deltas[1].unit) + self.assertEqual({'version': '1.0'}, deltas[1].resource_metadata) + self.assertEqual(37, deltas[1].volume) + + def test_delta_transformer_out_of_order(self): + samples = [ + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=26, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=16, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=((timeutils.utcnow() - datetime.timedelta(minutes=5)) + .isoformat()), + resource_metadata={'version': '2.0'} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=53, + unit='ns', + user_id='test_user_bis', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + ] + deltas = self._do_test_delta(samples, 1) + self.assertEqual('new_meter', deltas[0].name) + self.assertEqual('delta', deltas[0].type) + self.assertEqual('ns', deltas[0].unit) + self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata) + self.assertEqual(27, deltas[0].volume) + + def test_delta_transformer_growth_only(self): + samples = [ + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=26, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=16, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '2.0'} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=53, + unit='ns', + user_id='test_user_bis', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + ] + deltas = self._do_test_delta(samples, 1, True) + self.assertEqual('new_meter', deltas[0].name) + self.assertEqual('delta', deltas[0].type) + self.assertEqual('ns', deltas[0].unit) + self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata) + self.assertEqual(37, deltas[0].volume) + def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py index 640856a9..ac9a6d1d 100644 --- a/ceilometer/tests/unit/test_decoupled_pipeline.py +++ b/ceilometer/tests/unit/test_decoupled_pipeline.py @@ -229,42 +229,42 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def test_rate_of_change_boilerplate_disk_read_cfg(self): meters = ('disk.read.bytes', 'disk.read.requests') units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, meters, units) def test_rate_of_change_boilerplate_disk_write_cfg(self): meters = ('disk.write.bytes', 'disk.write.requests') units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, meters, units) def test_rate_of_change_boilerplate_network_incoming_cfg(self): meters = ('network.incoming.bytes', 'network.incoming.packets') units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4, meters, units) def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self): meters = ('disk.device.read.bytes', 'disk.device.read.requests') units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, meters, units) def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self): meters = ('disk.device.write.bytes', 'disk.device.write.requests') units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, meters, units) def test_rate_of_change_boilerplate_network_outgoing_cfg(self): meters = ('network.outgoing.bytes', 'network.outgoing.packets') units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4, meters, units) diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 44e6f54c..a9b25dfa 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -20,15 +20,15 @@ from oslo_log import log from oslo_utils import timeutils import six -from ceilometer.i18n import _ +from ceilometer.i18n import _, _LW from ceilometer import sample from ceilometer import transformer LOG = log.getLogger(__name__) -class ScalingTransformer(transformer.TransformerBase): - """Transformer to apply a scaling conversion.""" +class BaseConversionTransformer(transformer.TransformerBase): + """Transformer to derive conversion.""" grouping_keys = ['resource_id'] @@ -44,22 +44,7 @@ class ScalingTransformer(transformer.TransformerBase): target = target or {} self.source = source self.target = target - self.scale = target.get('scale') - LOG.debug('scaling conversion transformer with source:' - ' %(source)s target: %(target)s:', {'source': source, - 'target': target}) - super(ScalingTransformer, self).__init__(**kwargs) - - def _scale(self, s): - """Apply the scaling factor. - - Either a straight multiplicative factor or else a string to be eval'd. - """ - ns = transformer.Namespace(s.as_dict()) - - scale = self.scale - return ((eval(scale, {}, ns) if isinstance(scale, six.string_types) - else s.volume * scale) if scale else s.volume) + super(BaseConversionTransformer, self).__init__(**kwargs) def _map(self, s, attr): """Apply the name or unit mapping if configured.""" @@ -74,6 +59,93 @@ class ScalingTransformer(transformer.TransformerBase): pass return mapped or self.target.get(attr, getattr(s, attr)) + +class DeltaTransformer(BaseConversionTransformer): + """Transformer based on the delta of a sample volume.""" + + def __init__(self, target=None, growth_only=False, **kwargs): + """Initialize transformer with configured parameters. + + :param growth_only: capture only positive deltas + """ + super(DeltaTransformer, self).__init__(target=target, **kwargs) + self.growth_only = growth_only + self.cache = {} + + def handle_sample(self, context, s): + """Handle a sample, converting if necessary.""" + key = s.name + s.resource_id + prev = self.cache.get(key) + timestamp = timeutils.parse_isotime(s.timestamp) + self.cache[key] = (s.volume, timestamp) + + if prev: + prev_volume = prev[0] + prev_timestamp = prev[1] + time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) + # disallow violations of the arrow of time + if time_delta < 0: + LOG.warn(_LW('Dropping out of time order sample: %s'), (s,)) + # Reset the cache to the newer sample. + self.cache[key] = prev + return None + volume_delta = s.volume - prev_volume + if self.growth_only and volume_delta < 0: + LOG.warn(_LW('Negative delta detected, dropping value')) + s = None + else: + s = self._convert(s, volume_delta) + LOG.debug('Converted to: %s', s) + else: + LOG.warn(_LW('Dropping sample with no predecessor: %s'), + (s,)) + s = None + return s + + def _convert(self, s, delta): + """Transform the appropriate sample fields.""" + return sample.Sample( + name=self._map(s, 'name'), + unit=s.unit, + type=sample.TYPE_DELTA, + volume=delta, + user_id=s.user_id, + project_id=s.project_id, + resource_id=s.resource_id, + timestamp=s.timestamp, + resource_metadata=s.resource_metadata + ) + + +class ScalingTransformer(BaseConversionTransformer): + """Transformer to apply a scaling conversion.""" + + def __init__(self, source=None, target=None, **kwargs): + """Initialize transformer with configured parameters. + + :param source: dict containing source sample unit + :param target: dict containing target sample name, type, + unit and scaling factor (a missing value + connotes no change) + """ + super(ScalingTransformer, self).__init__(source=source, target=target, + **kwargs) + self.scale = self.target.get('scale') + LOG.debug('scaling conversion transformer with source:' + ' %(source)s target: %(target)s:', {'source': self.source, + 'target': self.target}) + + def _scale(self, s): + """Apply the scaling factor. + + Either a straight multiplicative factor or else a string to be eval'd. + """ + ns = transformer.Namespace(s.as_dict()) + + scale = self.scale + return ((eval(scale, {}, ns) if isinstance(scale, six.string_types) + else s.volume * scale) if scale else s.volume) + def _convert(self, s, growth=1): """Transform the appropriate sample fields.""" return sample.Sample( diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index 12b45f2e..a5bd5148 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -12,6 +12,7 @@ sources: - "cpu" sinks: - cpu_sink + - cpu_delta_sink - name: disk_source interval: 600 meters: @@ -50,6 +51,15 @@ sinks: scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" publishers: - notifier:// + - name: cpu_delta_sink + transformers: + - name: "delta" + parameters: + target: + name: "cpu.delta" + growth_only: True + publishers: + - notifier:// - name: disk_sink transformers: - name: "rate_of_change" diff --git a/setup.cfg b/setup.cfg index ee474ab1..a083c233 100644 --- a/setup.cfg +++ b/setup.cfg @@ -220,6 +220,7 @@ ceilometer.hardware.inspectors = ceilometer.transformer = accumulator = ceilometer.transformer.accumulator:TransformerAccumulator + delta = ceilometer.transformer.conversions:DeltaTransformer unit_conversion = ceilometer.transformer.conversions:ScalingTransformer rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer aggregator = ceilometer.transformer.conversions:AggregatorTransformer