Changes aggregator transformer to allow retention_time w/o size

When using an aggregator transformer in pipeline.yaml that
looks like this:
---
transformers:
    - name: "aggregator"
      parameters:
          retention_time: 60
          resource_metadata: last
---
'size' will automatically be defaulted to 1. As a result, every sample
is flushed immediately and no aggregation happens over the 60-second
retention_time window.

With this fix, the sample size is defaulted to 1 only when neither
retention_time nor size is defined.
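
A quick sketch of the resulting constructor behaviour; the calls and
expected values mirror the new unit tests below, so treat this as an
illustration rather than API documentation:
---
from ceilometer.transformer import conversions

# Neither bound configured: fall back to the old default of one sample.
agg = conversions.AggregatorTransformer()
assert agg.size == 1

# retention_time without size: size is no longer forced to 1, so samples
# aggregate for the whole 60-second window before being flushed.
agg = conversions.AggregatorTransformer(retention_time="60")
assert agg.retention_time == 60.0
assert not agg.size  # left falsy (unbounded) instead of defaulted to 1
---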

Change-Id: I4a3aa0f6de26173e6f9383d570ff2cf13d367e38
Closes-Bug: 1531626
(cherry picked from commit 5a9a41563e)
Author: Kevin McDonald, 2016-01-06 15:52:55 -06:00 (committed by gordon chung)
Parent: cd2065df67
Commit: fd54d10f6d
4 changed files with 93 additions and 10 deletions


@@ -1370,15 +1370,6 @@ class BasePipelineTestCase(base.BaseTestCase):
         actual = sorted(s.volume for s in publisher.samples)
         self.assertEqual([2.0, 3.0, 6.0], actual)
 
-    def test_aggregator_input_validation(self):
-        aggregator = conversions.AggregatorTransformer("1", "15", None,
-                                                       None, None)
-        self.assertEqual(1, aggregator.size)
-        self.assertEqual(15, aggregator.retention_time)
-
-        self.assertRaises(ValueError, conversions.AggregatorTransformer,
-                          "abc", "cde", None, None, None)
-
     def test_aggregator_metadata(self):
         for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
             samples = self._do_test_aggregator({


@@ -0,0 +1,89 @@
+#
+# Copyright 2016 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import copy
+
+from oslo_context import context
+from oslo_utils import timeutils
+from oslotest import base
+
+from ceilometer import sample
+from ceilometer.transformer import conversions
+
+
+class AggregatorTransformerTestCase(base.BaseTestCase):
+    SAMPLE = sample.Sample(
+        name='cpu',
+        type=sample.TYPE_CUMULATIVE,
+        unit='ns',
+        volume='1234567',
+        user_id='56c5692032f34041900342503fecab30',
+        project_id='ac9494df2d9d4e709bac378cceabaf23',
+        resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
+        timestamp="2015-10-29 14:12:15.485877+00:00",
+        resource_metadata={}
+    )
+
+    def setUp(self):
+        super(AggregatorTransformerTestCase, self).setUp()
+        self._sample_offset = 0
+
+    def test_init_input_validation(self):
+        aggregator = conversions.AggregatorTransformer("2", "15", None,
+                                                       None, None)
+        self.assertEqual(2, aggregator.size)
+        self.assertEqual(15, aggregator.retention_time)
+
+    def test_init_no_size_or_retention_time(self):
+        aggregator = conversions.AggregatorTransformer()
+        self.assertEqual(1, aggregator.size)
+        self.assertEqual(None, aggregator.retention_time)
+
+    def test_init_size_zero(self):
+        aggregator = conversions.AggregatorTransformer(size="0")
+        self.assertEqual(1, aggregator.size)
+        self.assertEqual(None, aggregator.retention_time)
+
+    def test_init_input_validation_size_invalid(self):
+        self.assertRaises(ValueError, conversions.AggregatorTransformer,
+                          "abc", "15", None, None, None)
+
+    def test_init_input_validation_retention_time_invalid(self):
+        self.assertRaises(ValueError, conversions.AggregatorTransformer,
+                          "2", "abc", None, None, None)
+
+    def test_size_unbounded(self):
+        aggregator = conversions.AggregatorTransformer(size="0",
+                                                       retention_time="300")
+        self._insert_sample_data(aggregator)
+
+        samples = aggregator.flush(context.get_admin_context())
+
+        self.assertEqual([], samples)
+
+    def test_size_bounded(self):
+        aggregator = conversions.AggregatorTransformer(size="100")
+        self._insert_sample_data(aggregator)
+
+        samples = aggregator.flush(context.get_admin_context())
+
+        self.assertEqual(100, len(samples))
+
+    def _insert_sample_data(self, aggregator):
+        for _ in range(100):
+            sample = copy.copy(self.SAMPLE)
+            sample.resource_id = sample.resource_id + str(self._sample_offset)
+            sample.timestamp = timeutils.isotime()
+            aggregator.handle_sample(context.get_admin_context(), sample)
+            self._sample_offset += 1
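
Note that _insert_sample_data gives every sample a unique resource_id, so
each of the 100 samples forms its own aggregation group. That is why
test_size_bounded expects exactly 100 samples back from a single flush(),
while test_size_unbounded expects none until the 300-second retention
window expires.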


@@ -247,6 +247,9 @@ class AggregatorTransformer(ScalingTransformer):
         self.counts = collections.defaultdict(int)
         self.size = int(size) if size else None
         self.retention_time = float(retention_time) if retention_time else None
+        if not (self.size or self.retention_time):
+            self.size = 1
+
         self.initial_timestamp = None
         self.aggregated_samples = 0
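
For reference, a sketch of the bounds this initializer now yields for the
configurations exercised by the new tests; it assumes the signature
default for size is now falsy (None), which the new fallback implies:

conversions.AggregatorTransformer("2", "15", None, None, None)
#   -> size=2, retention_time=15.0 (bounded by count and by time)
conversions.AggregatorTransformer()
#   -> size=1, retention_time=None (old one-sample default preserved)
conversions.AggregatorTransformer(size="0")
#   -> size=1 (0 is falsy, so the fallback still applies)
conversions.AggregatorTransformer(size="0", retention_time="300")
#   -> size=0 (never "full"), retention_time=300.0 (time-bounded only)
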
@@ -309,7 +312,7 @@ class AggregatorTransformer(ScalingTransformer):
         expired = (self.retention_time and
                    timeutils.is_older_than(self.initial_timestamp,
                                            self.retention_time))
-        full = self.aggregated_samples >= self.size
+        full = self.size and self.aggregated_samples >= self.size
         if full or expired:
             x = list(self.samples.values())
             # gauge aggregates need to be averages
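
A minimal, ceilometer-independent illustration of why the short-circuit
guard is needed once size can legitimately be None:

size = None              # size-unbounded aggregator
aggregated_samples = 50

# The old expression `aggregated_samples >= size` is always True on
# Python 2 (None orders before every int), flushing each sample
# immediately and defeating retention_time; on Python 3 it raises
# TypeError. The guarded form short-circuits to a falsy value instead:
full = size and aggregated_samples >= size
assert not full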