ceph: fix data compression when oldest_mutable_timestamp == next(key)
Change-Id: I1c66b720d2c1424f022898ed4afd9ca820965b68 Closes-Bug: #1655422
This commit is contained in:
parent
bb687e1715
commit
a9c1383992
|
@ -230,7 +230,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
oldest_mutable_timestamp):
|
||||
# NOTE(jd) We write the full split only if the driver works that way
|
||||
# (self.WRITE_FULL) or if the oldest_mutable_timestamp is out of range.
|
||||
write_full = self.WRITE_FULL or next(key) < oldest_mutable_timestamp
|
||||
write_full = self.WRITE_FULL or next(key) <= oldest_mutable_timestamp
|
||||
key_as_str = str(key)
|
||||
if write_full:
|
||||
try:
|
||||
|
|
|
@ -310,6 +310,94 @@ class TestStorageDriver(tests_base.TestCase):
|
|||
(utils.datetime_utc(2016, 1, 10, 17, 12), 60.0, 46),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
def test_rewrite_measures_oldest_mutable_timestamp_eq_next_key(self):
|
||||
"""See LP#1655422"""
|
||||
# Create an archive policy that spans on several splits. Each split
|
||||
# being 3600 points, let's go for 36k points so we have 10 splits.
|
||||
apname = str(uuid.uuid4())
|
||||
ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
|
||||
self.index.create_archive_policy(ap)
|
||||
self.metric = storage.Metric(uuid.uuid4(), ap)
|
||||
self.index.create_metric(self.metric.id, str(uuid.uuid4()),
|
||||
str(uuid.uuid4()),
|
||||
apname)
|
||||
|
||||
# First store some points scattered across different splits
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
|
||||
self.assertEqual(splits,
|
||||
self.storage._list_split_keys_for_metric(
|
||||
self.metric, "mean", 60.0))
|
||||
|
||||
if self.storage.WRITE_FULL:
|
||||
assertCompressedIfWriteFull = self.assertTrue
|
||||
else:
|
||||
assertCompressedIfWriteFull = self.assertFalse
|
||||
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451520000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451736000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0)
|
||||
assertCompressedIfWriteFull(
|
||||
carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
|
||||
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
|
||||
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
|
||||
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
# Now store brand new points that should force a rewrite of one of the
|
||||
# split (keep in mind the back window size in one hour here). We move
|
||||
# the BoundTimeSerie processing timeserie far away from its current
|
||||
# range.
|
||||
|
||||
# Here we test a special case where the oldest_mutable_timestamp will
|
||||
# be 2016-01-10TOO:OO:OO = 1452384000.0, our new split key.
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 10, 0, 12), 45),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
self.assertEqual({'1452384000.0', '1451736000.0',
|
||||
'1451520000.0', '1451952000.0'},
|
||||
self.storage._list_split_keys_for_metric(
|
||||
self.metric, "mean", 60.0))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451520000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451736000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0)
|
||||
# Now this one is compressed because it has been rewritten!
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1452384000.0', "mean", 60.0)
|
||||
assertCompressedIfWriteFull(
|
||||
carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
|
||||
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
|
||||
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
|
||||
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
|
||||
(utils.datetime_utc(2016, 1, 10, 0, 12), 60.0, 45),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
def test_rewrite_measures_corruption_missing_file(self):
|
||||
# Create an archive policy that spans on several splits. Each split
|
||||
# being 3600 points, let's go for 36k points so we have 10 splits.
|
||||
|
|
Loading…
Reference in New Issue