From a9c1383992ed2039cf42beee988e81113a7d33e9 Mon Sep 17 00:00:00 2001
From: Julien Danjou
Date: Wed, 11 Jan 2017 19:03:45 +0100
Subject: [PATCH] ceph: fix data compression when oldest_mutable_timestamp ==
 next(key)

Change-Id: I1c66b720d2c1424f022898ed4afd9ca820965b68
Closes-Bug: #1655422
---
 gnocchi/storage/_carbonara.py |  2 +-
 gnocchi/tests/test_storage.py | 88 +++++++++++++++++++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py
index c418dff4..e3d934c4 100644
--- a/gnocchi/storage/_carbonara.py
+++ b/gnocchi/storage/_carbonara.py
@@ -230,7 +230,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
                                 oldest_mutable_timestamp):
         # NOTE(jd) We write the full split only if the driver works that way
         # (self.WRITE_FULL) or if the oldest_mutable_timestamp is out of range.
-        write_full = self.WRITE_FULL or next(key) < oldest_mutable_timestamp
+        write_full = self.WRITE_FULL or next(key) <= oldest_mutable_timestamp
         key_as_str = str(key)
         if write_full:
             try:
diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py
index 96d64fa1..eb32ad5e 100644
--- a/gnocchi/tests/test_storage.py
+++ b/gnocchi/tests/test_storage.py
@@ -310,6 +310,94 @@ class TestStorageDriver(tests_base.TestCase):
             (utils.datetime_utc(2016, 1, 10, 17, 12), 60.0, 46),
         ], self.storage.get_measures(self.metric, granularity=60.0))
 
+    def test_rewrite_measures_oldest_mutable_timestamp_eq_next_key(self):
+        """See LP#1655422"""
+        # Create an archive policy that spans several splits. Each split
+        # holds 3600 points, so let's go for 36k points to get 10 splits.
+        apname = str(uuid.uuid4())
+        ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
+        self.index.create_archive_policy(ap)
+        self.metric = storage.Metric(uuid.uuid4(), ap)
+        self.index.create_metric(self.metric.id, str(uuid.uuid4()),
+                                 str(uuid.uuid4()),
+                                 apname)
+
+        # First store some points scattered across different splits
+        self.storage.add_measures(self.metric, [
+            storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
+            storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
+            storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
+            storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
+        ])
+        self.trigger_processing()
+
+        splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
+        self.assertEqual(splits,
+                         self.storage._list_split_keys_for_metric(
+                             self.metric, "mean", 60.0))
+
+        if self.storage.WRITE_FULL:
+            assertCompressedIfWriteFull = self.assertTrue
+        else:
+            assertCompressedIfWriteFull = self.assertFalse
+
+        data = self.storage._get_measures(
+            self.metric, '1451520000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451736000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451952000.0', "mean", 60.0)
+        assertCompressedIfWriteFull(
+            carbonara.AggregatedTimeSerie.is_compressed(data))
+
+        self.assertEqual([
+            (utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
+            (utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
+            (utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
+            (utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
+        ], self.storage.get_measures(self.metric, granularity=60.0))
+
+        # Now store brand new points that should force a rewrite of one of
+        # the splits (keep in mind the back window size is one hour here).
+        # We move the BoundTimeSerie processing timeserie far away from its
+        # current range.
+
+        # Here we test a special case where the oldest_mutable_timestamp
+        # will be 2016-01-10T00:00:00 = 1452384000.0, our new split key.
+        self.storage.add_measures(self.metric, [
+            storage.Measure(utils.datetime_utc(2016, 1, 10, 0, 12), 45),
+        ])
+        self.trigger_processing()
+
+        self.assertEqual({'1452384000.0', '1451736000.0',
+                          '1451520000.0', '1451952000.0'},
+                         self.storage._list_split_keys_for_metric(
+                             self.metric, "mean", 60.0))
+        data = self.storage._get_measures(
+            self.metric, '1451520000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451736000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451952000.0', "mean", 60.0)
+        # Now this one is compressed because it has been rewritten!
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1452384000.0', "mean", 60.0)
+        assertCompressedIfWriteFull(
+            carbonara.AggregatedTimeSerie.is_compressed(data))
+
+        self.assertEqual([
+            (utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
+            (utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
+            (utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
+            (utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
+            (utils.datetime_utc(2016, 1, 10, 0, 12), 60.0, 45),
+        ], self.storage.get_measures(self.metric, granularity=60.0))
+
     def test_rewrite_measures_corruption_missing_file(self):
         # Create an archive policy that spans on several splits. Each split
         # being 3600 points, let's go for 36k points so we have 10 splits.
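
For reviewers, here is a minimal sketch of the boundary condition this
one-character change fixes, using plain float timestamps in place of
Gnocchi's SplitKey objects (the split_is_immutable helper and SPLIT_SPAN
constant are hypothetical illustrations, not part of the patch):

    # Each split starting at `key` covers [key, key + span); the upper
    # bound is exclusive. At 3600 points per split and 60s granularity,
    # span = 216000 seconds, matching the split keys used in the test.
    SPLIT_SPAN = 3600 * 60

    def split_is_immutable(key, oldest_mutable_timestamp, span=SPLIT_SPAN):
        # A split may be written in full (and therefore compressed) once
        # no mutable point can land inside it anymore, i.e. once the
        # oldest mutable timestamp has reached its exclusive upper bound.
        return key + span <= oldest_mutable_timestamp  # `<` was the bug

    # Values from the test: the measure at 2016-01-10T00:12 with a
    # one-hour back window puts the oldest mutable timestamp at exactly
    # 2016-01-10T00:00:00 = 1452384000.0.
    oldest = 1452384000.0
    assert split_is_immutable(1451952000.0, oldest)      # well in the past
    assert split_is_immutable(1452168000.0, oldest)      # key + span == oldest
    assert not split_is_immutable(1452384000.0, oldest)  # still mutable
    # Under the old strict `<`, the boundary case above was misclassified
    # as mutable, so an immutable split went down the partial-rewrite path.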