ceph: fix data compression when oldest_mutable_timestamp == next(key)

Change-Id: I1c66b720d2c1424f022898ed4afd9ca820965b68
Closes-Bug: #1655422
Author: Julien Danjou
Date:   2017-01-11 19:03:45 +01:00
Parent: bb687e1715
Commit: a9c1383992

2 changed files with 89 additions and 1 deletion

@@ -230,7 +230,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
                                 oldest_mutable_timestamp):
         # NOTE(jd) We write the full split only if the driver works that way
         # (self.WRITE_FULL) or if the oldest_mutable_timestamp is out of range.
-        write_full = self.WRITE_FULL or next(key) < oldest_mutable_timestamp
+        write_full = self.WRITE_FULL or next(key) <= oldest_mutable_timestamp
         key_as_str = str(key)
         if write_full:
             try:
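
To illustrate the one-character fix above: a split with key K covers timestamps in [K, next(K)), and everything older than oldest_mutable_timestamp is frozen, so the split is fully immutable as soon as next(K) <= oldest_mutable_timestamp, including the equality case the old predicate missed. A minimal standalone sketch, using the 3600-points-per-split and 60-second-granularity figures from the test below; the helper names are illustrative, not Gnocchi's API:

# Illustrative sketch only; the names here are not Gnocchi's API.
SPLIT_SPAN = 3600 * 60.0  # seconds covered by one split, per the test below

def next_key(key):
    # Start of the split that follows the one keyed by `key`.
    return key + SPLIT_SPAN

WRITE_FULL = False  # e.g. the Ceph driver, which updates splits in place

oldest_mutable_timestamp = 1452384000.0      # 2016-01-10T00:00:00 UTC
key = oldest_mutable_timestamp - SPLIT_SPAN  # this split ends exactly there

# Old predicate: equality counted as "still mutable", so the split was
# written partially and therefore never compressed on non-WRITE_FULL drivers.
assert not (WRITE_FULL or next_key(key) < oldest_mutable_timestamp)

# Fixed predicate: the split is recognized as immutable, written in full,
# and can thus be compressed.
assert WRITE_FULL or next_key(key) <= oldest_mutable_timestamp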

@@ -310,6 +310,94 @@ class TestStorageDriver(tests_base.TestCase):
             (utils.datetime_utc(2016, 1, 10, 17, 12), 60.0, 46),
         ], self.storage.get_measures(self.metric, granularity=60.0))
 
+    def test_rewrite_measures_oldest_mutable_timestamp_eq_next_key(self):
+        """See LP#1655422"""
+        # Create an archive policy that spans over several splits. Each split
+        # being 3600 points, let's go for 36k points so we have 10 splits.
+        apname = str(uuid.uuid4())
+        ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
+        self.index.create_archive_policy(ap)
+        self.metric = storage.Metric(uuid.uuid4(), ap)
+        self.index.create_metric(self.metric.id, str(uuid.uuid4()),
+                                 str(uuid.uuid4()),
+                                 apname)
+
+        # First store some points scattered across different splits
+        self.storage.add_measures(self.metric, [
+            storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
+            storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
+            storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
+            storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
+        ])
+        self.trigger_processing()
+
+        splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
+        self.assertEqual(splits,
+                         self.storage._list_split_keys_for_metric(
+                             self.metric, "mean", 60.0))
+
+        if self.storage.WRITE_FULL:
+            assertCompressedIfWriteFull = self.assertTrue
+        else:
+            assertCompressedIfWriteFull = self.assertFalse
+
+        data = self.storage._get_measures(
+            self.metric, '1451520000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451736000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451952000.0', "mean", 60.0)
+        assertCompressedIfWriteFull(
+            carbonara.AggregatedTimeSerie.is_compressed(data))
+
+        self.assertEqual([
+            (utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
+            (utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
+            (utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
+            (utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
+        ], self.storage.get_measures(self.metric, granularity=60.0))
+
+        # Now store brand new points that should force a rewrite of one of
+        # the splits (keep in mind the back window size is one hour here).
+        # We move the BoundTimeSerie processing timeserie far away from its
+        # current range.
+        # Here we test a special case where the oldest_mutable_timestamp
+        # will be 2016-01-10T00:00:00 = 1452384000.0, our new split key.
+        self.storage.add_measures(self.metric, [
+            storage.Measure(utils.datetime_utc(2016, 1, 10, 0, 12), 45),
+        ])
+        self.trigger_processing()
+
+        self.assertEqual({'1452384000.0', '1451736000.0',
+                          '1451520000.0', '1451952000.0'},
+                         self.storage._list_split_keys_for_metric(
+                             self.metric, "mean", 60.0))
+        data = self.storage._get_measures(
+            self.metric, '1451520000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451736000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451952000.0', "mean", 60.0)
+        # Now this one is compressed because it has been rewritten!
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1452384000.0', "mean", 60.0)
+        assertCompressedIfWriteFull(
+            carbonara.AggregatedTimeSerie.is_compressed(data))
+
+        self.assertEqual([
+            (utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
+            (utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
+            (utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
+            (utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
+            (utils.datetime_utc(2016, 1, 10, 0, 12), 60.0, 45),
+        ], self.storage.get_measures(self.metric, granularity=60.0))
+
     def test_rewrite_measures_corruption_missing_file(self):
         # Create an archive policy that spans over several splits. Each split
         # being 3600 points, let's go for 36k points so we have 10 splits.
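
For reference, the split keys asserted in the test above follow from simple arithmetic: with 3600 points per split at 60-second granularity, each split spans 216000 seconds, and a key is a measure's timestamp floored to that span. A standalone sanity check of the values the test expects, under those assumptions from the test's own comments (a sketch, not Gnocchi code):

import datetime

SPLIT_SPAN = 3600 * 60  # seconds covered by one split

def split_key(dt):
    # Floor a UTC datetime to its split boundary, rendered the way the
    # test's key strings are: an epoch value formatted as a float.
    epoch = (dt - datetime.datetime(1970, 1, 1)).total_seconds()
    return str(epoch - epoch % SPLIT_SPAN)

# The four initial measures land in three splits (Jan 2 and Jan 4 share one)...
assert split_key(datetime.datetime(2016, 1, 1, 12, 0, 1)) == '1451520000.0'
assert split_key(datetime.datetime(2016, 1, 2, 13, 7, 31)) == '1451736000.0'
assert split_key(datetime.datetime(2016, 1, 4, 14, 9, 31)) == '1451736000.0'
assert split_key(datetime.datetime(2016, 1, 6, 15, 12, 45)) == '1451952000.0'

# ...and the later measure opens the fourth split, whose key is exactly the
# oldest_mutable_timestamp exercised by the fix.
assert split_key(datetime.datetime(2016, 1, 10, 0, 12)) == '1452384000.0'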