Merge "High precision rate of change timedelta"

This commit is contained in:
Jenkins 2017-07-21 12:41:20 +00:00 committed by Gerrit Code Review
commit 239cb3dbf6
12 changed files with 115 additions and 13 deletions

View File

@ -14,6 +14,7 @@
import collections
import monotonic
from oslo_log import log
from oslo_utils import timeutils
@ -93,13 +94,16 @@ class GenericComputePollster(plugin_base.PollsterBase):
if instance.id not in cache[self.inspector_method]:
result = getattr(self.inspector, self.inspector_method)(
instance, duration)
polled_time = monotonic.monotonic()
# Ensure we don't cache an iterator
if isinstance(result, collections.Iterable):
result = list(result)
cache[self.inspector_method][instance.id] = result
else:
result = [result]
cache[self.inspector_method][instance.id] = (polled_time, result)
return cache[self.inspector_method][instance.id]
def _stats_to_sample(self, instance, stats):
def _stats_to_sample(self, instance, stats, polled_time):
volume = getattr(stats, self.sample_stats_key)
LOG.debug("%(instance_id)s/%(name)s volume: "
"%(volume)s" % {
@ -121,21 +125,19 @@ class GenericComputePollster(plugin_base.PollsterBase):
volume=volume,
additional_metadata=self.get_additional_metadata(
instance, stats),
monotonic_time=polled_time,
)
def get_samples(self, manager, cache, resources):
self._inspection_duration = self._record_poll_time()
for instance in resources:
try:
result = self._inspect_cached(cache, instance,
self._inspection_duration)
polled_time, result = self._inspect_cached(
cache, instance, self._inspection_duration)
if not result:
continue
if not isinstance(result, collections.Iterable):
result = [result]
for stats in self.aggregate_method(result):
yield self._stats_to_sample(instance, stats)
yield self._stats_to_sample(instance, stats, polled_time)
except NoVolumeException:
# FIXME(sileht): This should be a removed... but I will
# not change the test logic for now

View File

@ -76,7 +76,8 @@ def _get_metadata_from_object(conf, instance):
def make_sample_from_instance(conf, instance, name, type, unit, volume,
resource_id=None, additional_metadata=None):
resource_id=None, additional_metadata=None,
monotonic_time=None):
additional_metadata = additional_metadata or {}
resource_metadata = _get_metadata_from_object(conf, instance)
resource_metadata.update(additional_metadata)
@ -89,6 +90,7 @@ def make_sample_from_instance(conf, instance, name, type, unit, volume,
project_id=instance.tenant_id,
resource_id=resource_id or instance.id,
resource_metadata=resource_metadata,
monotonic_time=monotonic_time,
)

View File

@ -97,7 +97,10 @@ class SamplePipelineEndpoint(PipelineEndpoint):
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'))
source=s.get('source'),
# NOTE(sileht): May come from an older node,
# Put None in this case.
monotonic_time=s.get('monotonic_time'))
for s in samples if publisher_utils.verify_signature(
s, self.conf.publisher.telemetry_secret)
]

View File

@ -98,6 +98,7 @@ def meter_message_from_counter(sample, secret):
'timestamp': sample.timestamp,
'resource_metadata': sample.resource_metadata,
'message_id': sample.id,
'monotonic_time': sample.monotonic_time,
}
msg['message_signature'] = compute_signature(msg, secret)
return msg

View File

@ -95,7 +95,7 @@ class Sample(object):
def __init__(self, name, type, unit, volume, user_id, project_id,
resource_id, timestamp=None, resource_metadata=None,
source=None, id=None):
source=None, id=None, monotonic_time=None):
self.name = name
self.type = type
self.unit = unit
@ -107,6 +107,7 @@ class Sample(object):
self.resource_metadata = resource_metadata or {}
self.source = source or self.SOURCE_DEFAULT
self.id = id or str(uuid.uuid1())
self.monotonic_time = monotonic_time
def as_dict(self):
return copy.copy(self.__dict__)

View File

@ -143,6 +143,10 @@ class Connection(hbase_base.Connection, base.Connection):
:param data: a dictionary such as returned by
ceilometer.publisher.utils.meter_message_from_counter
"""
# We must not record thing.
data.pop("monotonic_time", None)
with self.conn_pool.connection() as conn:
resource_table = conn.table(self.RESOURCE_TABLE)
meter_table = conn.table(self.METER_TABLE)

View File

@ -259,6 +259,13 @@ class Connection(pymongo_base.Connection):
# unconditionally insert sample timestamps and resource metadata
# (in the update case, this must be conditional on the sample not
# being out-of-order)
# We must not store this
samples = copy.deepcopy(samples)
for sample in samples:
sample.pop("monotonic_time", None)
sorted_samples = sorted(
copy.deepcopy(samples),
key=lambda s: (s['resource_id'], s['timestamp']))

View File

@ -80,6 +80,7 @@ class TestPostSamples(v2.FunctionalTest):
s1[0]['source'] = '%s:openstack' % s1[0]['project_id']
self.assertEqual(s1, data.json)
s1[0]["monotonic_time"] = None
self.assertEqual(s1[0], self.published[0][0])
def test_nested_metadata(self):
@ -107,6 +108,7 @@ class TestPostSamples(v2.FunctionalTest):
unwound['resource_metadata'] = {'nest': {'name1': 'value1',
'name2': 'value3'},
'name2': 'value2'}
unwound["monotonic_time"] = None
# only the published sample should be unwound, not the representation
# in the API response
self.assertEqual(s1[0], data.json[0])
@ -218,6 +220,7 @@ class TestPostSamples(v2.FunctionalTest):
msg['timestamp'] = timestamp.replace(tzinfo=None).isoformat()
self.assertEqual(s, c)
s["monotonic_time"] = None
self.assertEqual(s, self.published[0][x])
def test_missing_mandatory_fields(self):
@ -278,6 +281,7 @@ class TestPostSamples(v2.FunctionalTest):
s['timestamp'] = data.json[x]['timestamp']
s.setdefault('resource_metadata', dict())
self.assertEqual(s, data.json[x])
s['monotonic_time'] = None
self.assertEqual(s, self.published[0][x])
def test_multiple_samples_multiple_sources(self):
@ -328,6 +332,7 @@ class TestPostSamples(v2.FunctionalTest):
s['timestamp'] = data.json[x]['timestamp']
s.setdefault('resource_metadata', dict())
self.assertEqual(s, data.json[x])
s['monotonic_time'] = None
self.assertEqual(s, self.published[0][x])
def test_missing_project_user_id(self):
@ -364,4 +369,6 @@ class TestPostSamples(v2.FunctionalTest):
s['project_id'] = project_id
self.assertEqual(s, data.json[x])
s['monotonic_time'] = None
self.assertEqual(s, self.published[0][x])

View File

@ -430,6 +430,7 @@ class RawSampleTest(DBTestBase):
d = meter.as_dict()
self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs[:3])
def test_get_samples_by_user_limit(self):
@ -450,6 +451,7 @@ class RawSampleTest(DBTestBase):
d = meter.as_dict()
self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs[:4])
def test_get_samples_by_resource(self):
@ -459,6 +461,7 @@ class RawSampleTest(DBTestBase):
d = results[1].as_dict()
self.assertEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertEqual(self.msgs[0], d)
def test_get_samples_by_metaquery(self):
@ -470,6 +473,7 @@ class RawSampleTest(DBTestBase):
d = meter.as_dict()
self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs)
def test_get_samples_by_metaquery_key_with_dot_in_metadata(self):
@ -721,6 +725,7 @@ class ComplexSampleQueryTest(DBTestBase):
for sample_item in results:
d = sample_item.as_dict()
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs)
def test_query_complex_filter_with_regexp(self):
@ -2787,6 +2792,7 @@ class TestBatchRecordingMetering(tests_db.TestBase):
for sample_item in results:
d = sample_item.as_dict()
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.sample_dicts)
resources = list(self.conn.get_resources())

View File

@ -22,6 +22,7 @@ import unittest
import fixtures
import mock
import monotonic
from oslo_utils import timeutils
import six
from stevedore import extension
@ -1097,6 +1098,67 @@ class BasePipelineTestCase(base.BaseTestCase):
pipe.flush()
self.assertEqual(0, len(publisher.samples))
def test_rate_of_change_precision(self):
s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
transformer_cfg = [
{
'name': 'rate_of_change',
'parameters': {
'source': {},
'target': {'name': 'cpu_util',
'unit': '%',
'type': sample.TYPE_GAUGE,
'scale': s}
}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
now = timeutils.utcnow()
now_time = monotonic.monotonic()
# Simulate a laggy poller
later = now + datetime.timedelta(seconds=12345)
later_time = now_time + 10
counters = [
sample.Sample(
name='cpu',
type=sample.TYPE_CUMULATIVE,
volume=125000000000,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=now.isoformat(),
monotonic_time=now_time,
resource_metadata={'cpu_number': 4}
),
sample.Sample(
name='cpu',
type=sample.TYPE_CUMULATIVE,
volume=165000000000,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=later.isoformat(),
monotonic_time=later_time,
resource_metadata={'cpu_number': 4}
),
]
pipe.publish_data(counters)
publisher = pipe.publishers[0]
self.assertEqual(1, len(publisher.samples))
cpu_util_sample = publisher.samples[0]
self.assertEqual(100, cpu_util_sample.volume)
def test_rate_of_change_max(self):
s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
transformer_cfg = [

View File

@ -187,12 +187,18 @@ class RateOfChangeTransformer(ScalingTransformer):
key = s.name + s.resource_id
prev = self.cache.get(key)
timestamp = timeutils.parse_isotime(s.timestamp)
self.cache[key] = (s.volume, timestamp)
self.cache[key] = (s.volume, timestamp, s.monotonic_time)
if prev:
prev_volume = prev[0]
prev_timestamp = prev[1]
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
prev_monotonic_time = prev[2]
if (prev_monotonic_time is not None and
s.monotonic_time is not None):
# NOTE(sileht): Prefer high precision timer
time_delta = s.monotonic_time - prev_monotonic_time
else:
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
# disallow violations of the arrow of time
if time_delta < 0:
LOG.warning(_('dropping out of time order sample: %s'), (s,))

View File

@ -12,6 +12,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
kafka-python>=1.3.2 # Apache-2.0
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
lxml>=2.3 # BSD
monotonic
msgpack-python>=0.4.0 # Apache-2.0
oslo.concurrency>=3.5.0 # Apache-2.0
oslo.config>=3.22.0 # Apache-2.0