From 75e10b2a4808ee007a8e3d768ae90fa1037a3c11 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 18 Jul 2017 08:10:38 +0200 Subject: [PATCH] High precision rate of change timedelta The current way to calculate rate of change is not precise at all and depends on the local host clock. So, we have good chance that the host clock derive a bit between each polling. Also the timestamp is polling cycle run and not the exact polled sample. This makes the rate of change transformer not accurate, and maybe wrong if the local clock have jumped to much or if a pollster make to much time to get the stats (libvirt reconnection, ...). A sample gets a new attribute monotonic_time, where we can store an accurate polling time using monotonic.monotonic(). In rate of change transformer, if the monotonic time is available we use to calculate the time delta between samples. For instance metrics, we set monotonic_time as soon as we poll it from libvirt, avoiding almost all precision issue. That makes the rate of change precise to the nanoseconds for polled samples, while keeping the timestamp identical for all samples polled during one cycle. Related-bug: #1527620 Change-Id: I40e14fb6aa595a86df9767be5758f52b7ceafc8f --- ceilometer/compute/pollsters/__init__.py | 18 +++--- ceilometer/compute/pollsters/util.py | 4 +- ceilometer/pipeline.py | 5 +- ceilometer/publisher/utils.py | 1 + ceilometer/sample.py | 3 +- ceilometer/storage/impl_hbase.py | 4 ++ ceilometer/storage/impl_mongodb.py | 7 +++ .../api/v2/test_post_samples_scenarios.py | 7 +++ .../storage/test_storage_scenarios.py | 6 ++ ceilometer/tests/unit/pipeline_base.py | 62 +++++++++++++++++++ ceilometer/transformer/conversions.py | 10 ++- requirements.txt | 1 + 12 files changed, 115 insertions(+), 13 deletions(-) diff --git a/ceilometer/compute/pollsters/__init__.py b/ceilometer/compute/pollsters/__init__.py index b02a78287b..64fa0f8e95 100644 --- a/ceilometer/compute/pollsters/__init__.py +++ b/ceilometer/compute/pollsters/__init__.py @@ -14,6 +14,7 @@ import collections +import monotonic from oslo_log import log from oslo_utils import timeutils @@ -93,13 +94,16 @@ class GenericComputePollster(plugin_base.PollsterBase): if instance.id not in cache[self.inspector_method]: result = getattr(self.inspector, self.inspector_method)( instance, duration) + polled_time = monotonic.monotonic() # Ensure we don't cache an iterator if isinstance(result, collections.Iterable): result = list(result) - cache[self.inspector_method][instance.id] = result + else: + result = [result] + cache[self.inspector_method][instance.id] = (polled_time, result) return cache[self.inspector_method][instance.id] - def _stats_to_sample(self, instance, stats): + def _stats_to_sample(self, instance, stats, polled_time): volume = getattr(stats, self.sample_stats_key) LOG.debug("%(instance_id)s/%(name)s volume: " "%(volume)s" % { @@ -121,21 +125,19 @@ class GenericComputePollster(plugin_base.PollsterBase): volume=volume, additional_metadata=self.get_additional_metadata( instance, stats), + monotonic_time=polled_time, ) def get_samples(self, manager, cache, resources): self._inspection_duration = self._record_poll_time() for instance in resources: try: - result = self._inspect_cached(cache, instance, - self._inspection_duration) + polled_time, result = self._inspect_cached( + cache, instance, self._inspection_duration) if not result: continue - - if not isinstance(result, collections.Iterable): - result = [result] for stats in self.aggregate_method(result): - yield self._stats_to_sample(instance, stats) + yield self._stats_to_sample(instance, stats, polled_time) except NoVolumeException: # FIXME(sileht): This should be a removed... but I will # not change the test logic for now diff --git a/ceilometer/compute/pollsters/util.py b/ceilometer/compute/pollsters/util.py index 11f39ffec3..a0ac83b741 100644 --- a/ceilometer/compute/pollsters/util.py +++ b/ceilometer/compute/pollsters/util.py @@ -76,7 +76,8 @@ def _get_metadata_from_object(conf, instance): def make_sample_from_instance(conf, instance, name, type, unit, volume, - resource_id=None, additional_metadata=None): + resource_id=None, additional_metadata=None, + monotonic_time=None): additional_metadata = additional_metadata or {} resource_metadata = _get_metadata_from_object(conf, instance) resource_metadata.update(additional_metadata) @@ -89,6 +90,7 @@ def make_sample_from_instance(conf, instance, name, type, unit, volume, project_id=instance.tenant_id, resource_id=resource_id or instance.id, resource_metadata=resource_metadata, + monotonic_time=monotonic_time, ) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 0d64dc9286..37c8209033 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -97,7 +97,10 @@ class SamplePipelineEndpoint(PipelineEndpoint): resource_id=s['resource_id'], timestamp=s['timestamp'], resource_metadata=s['resource_metadata'], - source=s.get('source')) + source=s.get('source'), + # NOTE(sileht): May come from an older node, + # Put None in this case. + monotonic_time=s.get('monotonic_time')) for s in samples if publisher_utils.verify_signature( s, self.conf.publisher.telemetry_secret) ] diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py index 6db55a4f23..b49f0220b8 100644 --- a/ceilometer/publisher/utils.py +++ b/ceilometer/publisher/utils.py @@ -98,6 +98,7 @@ def meter_message_from_counter(sample, secret): 'timestamp': sample.timestamp, 'resource_metadata': sample.resource_metadata, 'message_id': sample.id, + 'monotonic_time': sample.monotonic_time, } msg['message_signature'] = compute_signature(msg, secret) return msg diff --git a/ceilometer/sample.py b/ceilometer/sample.py index d5318822c3..ed780127e7 100644 --- a/ceilometer/sample.py +++ b/ceilometer/sample.py @@ -95,7 +95,7 @@ class Sample(object): def __init__(self, name, type, unit, volume, user_id, project_id, resource_id, timestamp=None, resource_metadata=None, - source=None, id=None): + source=None, id=None, monotonic_time=None): self.name = name self.type = type self.unit = unit @@ -107,6 +107,7 @@ class Sample(object): self.resource_metadata = resource_metadata or {} self.source = source or self.SOURCE_DEFAULT self.id = id or str(uuid.uuid1()) + self.monotonic_time = monotonic_time def as_dict(self): return copy.copy(self.__dict__) diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 494db88492..aafaff6c80 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -143,6 +143,10 @@ class Connection(hbase_base.Connection, base.Connection): :param data: a dictionary such as returned by ceilometer.publisher.utils.meter_message_from_counter """ + + # We must not record thing. + data.pop("monotonic_time", None) + with self.conn_pool.connection() as conn: resource_table = conn.table(self.RESOURCE_TABLE) meter_table = conn.table(self.METER_TABLE) diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 7301ffab1c..2c3353e7f6 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -259,6 +259,13 @@ class Connection(pymongo_base.Connection): # unconditionally insert sample timestamps and resource metadata # (in the update case, this must be conditional on the sample not # being out-of-order) + + # We must not store this + samples = copy.deepcopy(samples) + + for sample in samples: + sample.pop("monotonic_time", None) + sorted_samples = sorted( copy.deepcopy(samples), key=lambda s: (s['resource_id'], s['timestamp'])) diff --git a/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py b/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py index a0a66beda8..fe6ba01d6a 100644 --- a/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py +++ b/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py @@ -80,6 +80,7 @@ class TestPostSamples(v2.FunctionalTest): s1[0]['source'] = '%s:openstack' % s1[0]['project_id'] self.assertEqual(s1, data.json) + s1[0]["monotonic_time"] = None self.assertEqual(s1[0], self.published[0][0]) def test_nested_metadata(self): @@ -107,6 +108,7 @@ class TestPostSamples(v2.FunctionalTest): unwound['resource_metadata'] = {'nest': {'name1': 'value1', 'name2': 'value3'}, 'name2': 'value2'} + unwound["monotonic_time"] = None # only the published sample should be unwound, not the representation # in the API response self.assertEqual(s1[0], data.json[0]) @@ -218,6 +220,7 @@ class TestPostSamples(v2.FunctionalTest): msg['timestamp'] = timestamp.replace(tzinfo=None).isoformat() self.assertEqual(s, c) + s["monotonic_time"] = None self.assertEqual(s, self.published[0][x]) def test_missing_mandatory_fields(self): @@ -278,6 +281,7 @@ class TestPostSamples(v2.FunctionalTest): s['timestamp'] = data.json[x]['timestamp'] s.setdefault('resource_metadata', dict()) self.assertEqual(s, data.json[x]) + s['monotonic_time'] = None self.assertEqual(s, self.published[0][x]) def test_multiple_samples_multiple_sources(self): @@ -328,6 +332,7 @@ class TestPostSamples(v2.FunctionalTest): s['timestamp'] = data.json[x]['timestamp'] s.setdefault('resource_metadata', dict()) self.assertEqual(s, data.json[x]) + s['monotonic_time'] = None self.assertEqual(s, self.published[0][x]) def test_missing_project_user_id(self): @@ -364,4 +369,6 @@ class TestPostSamples(v2.FunctionalTest): s['project_id'] = project_id self.assertEqual(s, data.json[x]) + + s['monotonic_time'] = None self.assertEqual(s, self.published[0][x]) diff --git a/ceilometer/tests/functional/storage/test_storage_scenarios.py b/ceilometer/tests/functional/storage/test_storage_scenarios.py index d6f26aaabc..4c7952353c 100644 --- a/ceilometer/tests/functional/storage/test_storage_scenarios.py +++ b/ceilometer/tests/functional/storage/test_storage_scenarios.py @@ -430,6 +430,7 @@ class RawSampleTest(DBTestBase): d = meter.as_dict() self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs[:3]) def test_get_samples_by_user_limit(self): @@ -450,6 +451,7 @@ class RawSampleTest(DBTestBase): d = meter.as_dict() self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs[:4]) def test_get_samples_by_resource(self): @@ -459,6 +461,7 @@ class RawSampleTest(DBTestBase): d = results[1].as_dict() self.assertEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertEqual(self.msgs[0], d) def test_get_samples_by_metaquery(self): @@ -470,6 +473,7 @@ class RawSampleTest(DBTestBase): d = meter.as_dict() self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs) def test_get_samples_by_metaquery_key_with_dot_in_metadata(self): @@ -721,6 +725,7 @@ class ComplexSampleQueryTest(DBTestBase): for sample_item in results: d = sample_item.as_dict() del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs) def test_query_complex_filter_with_regexp(self): @@ -2787,6 +2792,7 @@ class TestBatchRecordingMetering(tests_db.TestBase): for sample_item in results: d = sample_item.as_dict() del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.sample_dicts) resources = list(self.conn.get_resources()) diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py index 2a69fcd723..7c82fc687b 100644 --- a/ceilometer/tests/unit/pipeline_base.py +++ b/ceilometer/tests/unit/pipeline_base.py @@ -22,6 +22,7 @@ import unittest import fixtures import mock +import monotonic from oslo_utils import timeutils import six from stevedore import extension @@ -1097,6 +1098,67 @@ class BasePipelineTestCase(base.BaseTestCase): pipe.flush() self.assertEqual(0, len(publisher.samples)) + def test_rate_of_change_precision(self): + s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" + transformer_cfg = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': sample.TYPE_GAUGE, + 'scale': s} + } + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('meters', ['cpu']) + pipeline_manager = pipeline.PipelineManager( + self.CONF, + self.cfg2file(self.pipeline_cfg), self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + now = timeutils.utcnow() + now_time = monotonic.monotonic() + # Simulate a laggy poller + later = now + datetime.timedelta(seconds=12345) + later_time = now_time + 10 + + counters = [ + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=125000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + monotonic_time=now_time, + resource_metadata={'cpu_number': 4} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=165000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=later.isoformat(), + monotonic_time=later_time, + resource_metadata={'cpu_number': 4} + ), + ] + + pipe.publish_data(counters) + publisher = pipe.publishers[0] + self.assertEqual(1, len(publisher.samples)) + + cpu_util_sample = publisher.samples[0] + self.assertEqual(100, cpu_util_sample.volume) + def test_rate_of_change_max(self): s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" transformer_cfg = [ diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 7bbc505b77..4614528bce 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -187,12 +187,18 @@ class RateOfChangeTransformer(ScalingTransformer): key = s.name + s.resource_id prev = self.cache.get(key) timestamp = timeutils.parse_isotime(s.timestamp) - self.cache[key] = (s.volume, timestamp) + self.cache[key] = (s.volume, timestamp, s.monotonic_time) if prev: prev_volume = prev[0] prev_timestamp = prev[1] - time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) + prev_monotonic_time = prev[2] + if (prev_monotonic_time is not None and + s.monotonic_time is not None): + # NOTE(sileht): Prefer high precision timer + time_delta = s.monotonic_time - prev_monotonic_time + else: + time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) # disallow violations of the arrow of time if time_delta < 0: LOG.warning(_('dropping out of time order sample: %s'), (s,)) diff --git a/requirements.txt b/requirements.txt index 51346d0489..b714a91842 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT kafka-python>=1.3.2 # Apache-2.0 keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0 lxml>=2.3 # BSD +monotonic msgpack-python>=0.4.0 # Apache-2.0 oslo.concurrency>=3.5.0 # Apache-2.0 oslo.config>=3.22.0 # Apache-2.0