High precision rate of change timedelta

The current way to calculate rate of change is not precise at all and
depends on the local host clock. So, we have good chance that the host
clock derive a bit between each polling. Also the timestamp is polling
cycle run and not the exact polled sample.

This makes the rate of change transformer not accurate, and maybe wrong
if the local clock have jumped to much or if a pollster make to much
time to get the stats (libvirt reconnection, ...).

A sample gets a new attribute monotonic_time, where we can store an
accurate polling time using monotonic.monotonic().

In rate of change transformer, if the monotonic time is available we use
to calculate the time delta between samples.

For instance metrics, we set monotonic_time as soon as we poll it from
libvirt, avoiding almost all precision issue.

That makes the rate of change precise to the nanoseconds for polled
samples, while keeping the timestamp identical for all samples polled
during one cycle.

Related-bug: #1527620
Change-Id: I40e14fb6aa595a86df9767be5758f52b7ceafc8f
(cherry picked from commit fd6a76601a)
This commit is contained in:
Mehdi Abaakouk 2017-07-18 08:10:38 +02:00
parent 09b8713a6d
commit 251a06d5c6
16 changed files with 151 additions and 27 deletions

View File

@ -73,7 +73,7 @@ class BaseComputePollster(plugin_base.PollsterBase):
return duration
def _get_samples_per_devices(self, attribute, instance, _name, _type,
_unit):
_unit, monotonic_time=None):
samples = []
for disk, value in six.iteritems(attribute):
samples.append(util.make_sample_from_instance(
@ -85,5 +85,6 @@ class BaseComputePollster(plugin_base.PollsterBase):
volume=value,
resource_id="%s-%s" % (instance.id, disk),
additional_metadata={'disk_name': disk},
monotonic_time=monotonic_time,
))
return samples

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import monotonic
from oslo_log import log
import ceilometer
@ -46,6 +48,7 @@ class CPUPollster(pollsters.BaseComputePollster):
unit='ns',
volume=cpu_info.time,
additional_metadata=cpu_num,
monotonic_time=monotonic.monotonic()
)
except virt_inspector.InstanceNotFoundException as err:
# Instance was deleted while getting samples. Ignore it.

View File

@ -17,6 +17,7 @@
import abc
import collections
import monotonic
from oslo_log import log
import six
@ -33,7 +34,7 @@ LOG = log.getLogger(__name__)
DiskIOData = collections.namedtuple(
'DiskIOData',
'r_bytes r_requests w_bytes w_requests per_disk_requests',
'r_bytes r_requests w_bytes w_requests per_disk_requests polled_time',
)
DiskRateData = collections.namedtuple('DiskRateData',
@ -103,12 +104,14 @@ class _Base(pollsters.BaseComputePollster):
'write_bytes': per_device_write_bytes,
'write_requests': per_device_write_requests,
}
polled_time = monotonic.monotonic()
i_cache[instance.id] = DiskIOData(
r_bytes=r_bytes,
r_requests=r_requests,
w_bytes=w_bytes,
w_requests=w_requests,
per_disk_requests=per_device_requests,
polled_time=polled_time
)
return i_cache[instance.id]
@ -128,13 +131,15 @@ class _Base(pollsters.BaseComputePollster):
volume=getattr(c_data, _volume),
additional_metadata={
'device': c_data.per_disk_requests[_metadata].keys()},
monotonic_time=c_data.polled_time,
)]
def _get_samples_per_device(self, c_data, _attr, instance, _name, _unit):
"""Return one or more Samples for meter 'disk.device.*'"""
return self._get_samples_per_devices(c_data.per_disk_requests[_attr],
instance, _name,
sample.TYPE_CUMULATIVE, _unit)
sample.TYPE_CUMULATIVE, _unit,
c_data.polled_time)
def get_samples(self, manager, cache, resources):
for instance in resources:

View File

@ -16,6 +16,7 @@
import copy
import monotonic
from oslo_log import log
import ceilometer
@ -34,7 +35,8 @@ class _Base(pollsters.BaseComputePollster):
NET_USAGE_MESSAGE = ' '.join(["NETWORK USAGE:", "%s %s:", "read-bytes=%d",
"write-bytes=%d"])
def make_vnic_sample(self, instance, name, type, unit, volume, vnic_data):
def make_vnic_sample(self, instance, name, type, unit, volume, vnic_data,
monotonic_time):
metadata = copy.copy(vnic_data)
additional_metadata = dict(zip(metadata._fields, metadata))
if vnic_data.fref is not None:
@ -53,7 +55,8 @@ class _Base(pollsters.BaseComputePollster):
unit=unit,
volume=volume,
resource_id=rid,
additional_metadata=additional_metadata
additional_metadata=additional_metadata,
monotonic_time=monotonic_time,
)
CACHE_KEY_VNIC = 'vnics'
@ -72,9 +75,10 @@ class _Base(pollsters.BaseComputePollster):
def _get_vnics_for_instance(self, cache, inspector, instance):
i_cache = cache.setdefault(self.CACHE_KEY_VNIC, {})
if instance.id not in i_cache:
i_cache[instance.id] = list(
self._get_vnic_info(inspector, instance)
)
data = list(self._get_vnic_info(inspector, instance))
polled_time = monotonic.monotonic()
i_cache[instance.id] = [(vnic, info, polled_time)
for vnic, info in data]
return i_cache[instance.id]
def get_samples(self, manager, cache, resources):
@ -88,11 +92,11 @@ class _Base(pollsters.BaseComputePollster):
self.inspector,
instance,
)
for vnic, info in vnics:
for vnic, info, polled_time in vnics:
LOG.debug(self.NET_USAGE_MESSAGE, instance_name,
vnic.name, self._get_rx_info(info),
self._get_tx_info(info))
yield self._get_sample(instance, vnic, info)
yield self._get_sample(instance, vnic, info, polled_time)
except virt_inspector.InstanceNotFoundException as err:
# Instance was deleted while getting samples. Ignore it.
LOG.debug('Exception while getting samples %s', err)
@ -179,7 +183,7 @@ class _ErrorsBase(_Base):
class IncomingBytesPollster(_Base):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.incoming.bytes',
@ -187,12 +191,13 @@ class IncomingBytesPollster(_Base):
unit='B',
volume=info.rx_bytes,
vnic_data=vnic,
monotonic_time=polled_time,
)
class IncomingPacketsPollster(_PacketsBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.incoming.packets',
@ -200,12 +205,13 @@ class IncomingPacketsPollster(_PacketsBase):
unit='packet',
volume=info.rx_packets,
vnic_data=vnic,
monotonic_time=polled_time,
)
class OutgoingBytesPollster(_Base):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.outgoing.bytes',
@ -213,12 +219,13 @@ class OutgoingBytesPollster(_Base):
unit='B',
volume=info.tx_bytes,
vnic_data=vnic,
monotonic_time=polled_time,
)
class OutgoingPacketsPollster(_PacketsBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.outgoing.packets',
@ -226,12 +233,13 @@ class OutgoingPacketsPollster(_PacketsBase):
unit='packet',
volume=info.tx_packets,
vnic_data=vnic,
monotonic_time=polled_time,
)
class IncomingBytesRatePollster(_RateBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.incoming.bytes.rate',
@ -239,12 +247,13 @@ class IncomingBytesRatePollster(_RateBase):
unit='B/s',
volume=info.rx_bytes_rate,
vnic_data=vnic,
monotonic_time=polled_time,
)
class OutgoingBytesRatePollster(_RateBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.outgoing.bytes.rate',
@ -252,12 +261,13 @@ class OutgoingBytesRatePollster(_RateBase):
unit='B/s',
volume=info.tx_bytes_rate,
vnic_data=vnic,
monotonic_time=polled_time,
)
class IncomingDropPollster(_DropBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.incoming.packets.drop',
@ -265,12 +275,13 @@ class IncomingDropPollster(_DropBase):
unit='packet',
volume=info.rx_drop,
vnic_data=vnic,
monotonic_time=polled_time,
)
class OutgoingDropPollster(_DropBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.outgoing.packets.drop',
@ -278,12 +289,13 @@ class OutgoingDropPollster(_DropBase):
unit='packet',
volume=info.tx_drop,
vnic_data=vnic,
monotonic_time=polled_time,
)
class IncomingErrorsPollster(_ErrorsBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.incoming.packets.error',
@ -291,12 +303,13 @@ class IncomingErrorsPollster(_ErrorsBase):
unit='packet',
volume=info.rx_errors,
vnic_data=vnic,
monotonic_time=polled_time,
)
class OutgoingErrorsPollster(_ErrorsBase):
def _get_sample(self, instance, vnic, info):
def _get_sample(self, instance, vnic, info, polled_time):
return self.make_vnic_sample(
instance,
name='network.outgoing.packets.error',
@ -304,4 +317,5 @@ class OutgoingErrorsPollster(_ErrorsBase):
unit='packet',
volume=info.tx_errors,
vnic_data=vnic,
monotonic_time=polled_time,
)

View File

@ -76,7 +76,8 @@ def _get_metadata_from_object(conf, instance):
def make_sample_from_instance(conf, instance, name, type, unit, volume,
resource_id=None, additional_metadata=None):
resource_id=None, additional_metadata=None,
monotonic_time=None):
additional_metadata = additional_metadata or {}
resource_metadata = _get_metadata_from_object(conf, instance)
resource_metadata.update(additional_metadata)
@ -89,6 +90,7 @@ def make_sample_from_instance(conf, instance, name, type, unit, volume,
project_id=instance.tenant_id,
resource_id=resource_id or instance.id,
resource_metadata=resource_metadata,
monotonic_time=monotonic_time,
)

View File

@ -110,7 +110,10 @@ class SamplePipelineEndpoint(PipelineEndpoint):
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'))
source=s.get('source'),
# NOTE(sileht): May come from an older node,
# Put None in this case.
monotonic_time=s.get('monotonic_time'))
for s in samples if publisher_utils.verify_signature(
s, self.conf.publisher.telemetry_secret)
]

View File

@ -98,6 +98,7 @@ def meter_message_from_counter(sample, secret):
'timestamp': sample.timestamp,
'resource_metadata': sample.resource_metadata,
'message_id': sample.id,
'monotonic_time': sample.monotonic_time,
}
msg['message_signature'] = compute_signature(msg, secret)
return msg

View File

@ -95,7 +95,7 @@ class Sample(object):
def __init__(self, name, type, unit, volume, user_id, project_id,
resource_id, timestamp=None, resource_metadata=None,
source=None, id=None):
source=None, id=None, monotonic_time=None):
self.name = name
self.type = type
self.unit = unit
@ -107,6 +107,7 @@ class Sample(object):
self.resource_metadata = resource_metadata or {}
self.source = source or self.SOURCE_DEFAULT
self.id = id or str(uuid.uuid1())
self.monotonic_time = monotonic_time
def as_dict(self):
return copy.copy(self.__dict__)

View File

@ -143,6 +143,10 @@ class Connection(hbase_base.Connection, base.Connection):
:param data: a dictionary such as returned by
ceilometer.publisher.utils.meter_message_from_counter
"""
# We must not record thing.
data.pop("monotonic_time", None)
with self.conn_pool.connection() as conn:
resource_table = conn.table(self.RESOURCE_TABLE)
meter_table = conn.table(self.METER_TABLE)

View File

@ -259,6 +259,13 @@ class Connection(pymongo_base.Connection):
# unconditionally insert sample timestamps and resource metadata
# (in the update case, this must be conditional on the sample not
# being out-of-order)
# We must not store this
samples = copy.deepcopy(samples)
for sample in samples:
sample.pop("monotonic_time", None)
sorted_samples = sorted(
copy.deepcopy(samples),
key=lambda s: (s['resource_id'], s['timestamp']))

View File

@ -80,6 +80,7 @@ class TestPostSamples(v2.FunctionalTest):
s1[0]['source'] = '%s:openstack' % s1[0]['project_id']
self.assertEqual(s1, data.json)
s1[0]["monotonic_time"] = None
self.assertEqual(s1[0], self.published[0][0])
def test_nested_metadata(self):
@ -107,6 +108,7 @@ class TestPostSamples(v2.FunctionalTest):
unwound['resource_metadata'] = {'nest': {'name1': 'value1',
'name2': 'value3'},
'name2': 'value2'}
unwound["monotonic_time"] = None
# only the published sample should be unwound, not the representation
# in the API response
self.assertEqual(s1[0], data.json[0])
@ -218,6 +220,7 @@ class TestPostSamples(v2.FunctionalTest):
msg['timestamp'] = timestamp.replace(tzinfo=None).isoformat()
self.assertEqual(s, c)
s["monotonic_time"] = None
self.assertEqual(s, self.published[0][x])
def test_missing_mandatory_fields(self):
@ -278,6 +281,7 @@ class TestPostSamples(v2.FunctionalTest):
s['timestamp'] = data.json[x]['timestamp']
s.setdefault('resource_metadata', dict())
self.assertEqual(s, data.json[x])
s['monotonic_time'] = None
self.assertEqual(s, self.published[0][x])
def test_multiple_samples_multiple_sources(self):
@ -328,6 +332,7 @@ class TestPostSamples(v2.FunctionalTest):
s['timestamp'] = data.json[x]['timestamp']
s.setdefault('resource_metadata', dict())
self.assertEqual(s, data.json[x])
s['monotonic_time'] = None
self.assertEqual(s, self.published[0][x])
def test_missing_project_user_id(self):
@ -364,4 +369,6 @@ class TestPostSamples(v2.FunctionalTest):
s['project_id'] = project_id
self.assertEqual(s, data.json[x])
s['monotonic_time'] = None
self.assertEqual(s, self.published[0][x])

View File

@ -430,6 +430,7 @@ class RawSampleTest(DBTestBase):
d = meter.as_dict()
self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs[:3])
def test_get_samples_by_user_limit(self):
@ -450,6 +451,7 @@ class RawSampleTest(DBTestBase):
d = meter.as_dict()
self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs[:4])
def test_get_samples_by_resource(self):
@ -459,6 +461,7 @@ class RawSampleTest(DBTestBase):
d = results[1].as_dict()
self.assertEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertEqual(self.msgs[0], d)
def test_get_samples_by_metaquery(self):
@ -470,6 +473,7 @@ class RawSampleTest(DBTestBase):
d = meter.as_dict()
self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs)
def test_get_samples_by_metaquery_key_with_dot_in_metadata(self):
@ -721,6 +725,7 @@ class ComplexSampleQueryTest(DBTestBase):
for sample_item in results:
d = sample_item.as_dict()
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.msgs)
def test_query_complex_filter_with_regexp(self):
@ -2785,6 +2790,7 @@ class TestBatchRecordingMetering(tests_db.TestBase):
for sample_item in results:
d = sample_item.as_dict()
del d['recorded_at']
d['monotonic_time'] = None
self.assertIn(d, self.sample_dicts)
resources = list(self.conn.get_resources())

View File

@ -234,7 +234,8 @@ class TestNetPollster(base.TestPollsterBase):
type=sample.TYPE_CUMULATIVE,
unit='B',
volume=100,
vnic_data=self.vnic0)
vnic_data=self.vnic0,
monotonic_time=123.123)
user_metadata = sm.resource_metadata['user_metadata']
expected = self.INSTANCE_PROPERTIES[
@ -259,7 +260,7 @@ class TestNetPollsterCache(base.TestPollsterBase):
rx_drop=20, rx_errors=21,
tx_bytes=3, tx_packets=4,
tx_drop=22, tx_errors=23)
vnics = [(vnic0, stats0)]
vnics = [(vnic0, stats0, 123.123)]
mgr = manager.AgentManager(0, self.CONF)
pollster = factory(self.CONF)

View File

@ -21,6 +21,7 @@ import traceback
import unittest
import mock
import monotonic
from oslo_config import fixture as fixture_config
from oslo_utils import timeutils
from oslotest import mockpatch
@ -1097,6 +1098,67 @@ class BasePipelineTestCase(base.BaseTestCase):
pipe.flush()
self.assertEqual(0, len(publisher.samples))
def test_rate_of_change_precision(self):
s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
transformer_cfg = [
{
'name': 'rate_of_change',
'parameters': {
'source': {},
'target': {'name': 'cpu_util',
'unit': '%',
'type': sample.TYPE_GAUGE,
'scale': s}
}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
now = timeutils.utcnow()
now_time = monotonic.monotonic()
# Simulate a laggy poller
later = now + datetime.timedelta(seconds=12345)
later_time = now_time + 10
counters = [
sample.Sample(
name='cpu',
type=sample.TYPE_CUMULATIVE,
volume=125000000000,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=now.isoformat(),
monotonic_time=now_time,
resource_metadata={'cpu_number': 4}
),
sample.Sample(
name='cpu',
type=sample.TYPE_CUMULATIVE,
volume=165000000000,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=later.isoformat(),
monotonic_time=later_time,
resource_metadata={'cpu_number': 4}
),
]
pipe.publish_data(counters)
publisher = pipe.publishers[0]
self.assertEqual(1, len(publisher.samples))
cpu_util_sample = publisher.samples[0]
self.assertEqual(100, cpu_util_sample.volume)
def test_rate_of_change_max(self):
s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
transformer_cfg = [

View File

@ -187,12 +187,18 @@ class RateOfChangeTransformer(ScalingTransformer):
key = s.name + s.resource_id
prev = self.cache.get(key)
timestamp = timeutils.parse_isotime(s.timestamp)
self.cache[key] = (s.volume, timestamp)
self.cache[key] = (s.volume, timestamp, s.monotonic_time)
if prev:
prev_volume = prev[0]
prev_timestamp = prev[1]
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
prev_monotonic_time = prev[2]
if (prev_monotonic_time is not None and
s.monotonic_time is not None):
# NOTE(sileht): Prefer high precision timer
time_delta = s.monotonic_time - prev_monotonic_time
else:
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
# disallow violations of the arrow of time
if time_delta < 0:
LOG.warning(_('dropping out of time order sample: %s'), (s,))

View File

@ -13,6 +13,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
kafka-python>=1.3.1 # Apache-2.0
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
lxml>=2.3 # BSD
monotonic
msgpack-python>=0.4.0 # Apache-2.0
oslo.concurrency>=3.5.0 # Apache-2.0
oslo.config>=3.9.0 # Apache-2.0