carbonara: fix SplitKey with datetime greater than 32bits value
Current implementation based on pandas.Timestamp can't handle keys that
go further than 2^32 seconds after epoch, which makes e.g. archive
policies with very high granularity failing.
Change-Id: Idb81345544cc25e36447473e5115d9d856766c83
(cherry picked from commit 9a95873f40
)
This commit is contained in:
parent
67cdbb737a
commit
175d8bd3bf
|
@ -27,13 +27,10 @@ import re
|
|||
import struct
|
||||
import time
|
||||
|
||||
import iso8601
|
||||
import lz4
|
||||
import pandas
|
||||
import six
|
||||
|
||||
from gnocchi import utils
|
||||
|
||||
# NOTE(sileht): pandas relies on time.strptime()
|
||||
# and often triggers http://bugs.python.org/issue7980
|
||||
# its dues to our heavy threads usage, this is the workaround
|
||||
|
@ -325,7 +322,8 @@ class BoundTimeSerie(TimeSerie):
|
|||
self.ts = self.ts[self.first_block_timestamp():]
|
||||
|
||||
|
||||
class SplitKey(pandas.Timestamp):
|
||||
@functools.total_ordering
|
||||
class SplitKey(object):
|
||||
"""A class representing a split key.
|
||||
|
||||
A split key is basically a timestamp that can be used to split
|
||||
|
@ -336,33 +334,30 @@ class SplitKey(pandas.Timestamp):
|
|||
|
||||
POINTS_PER_SPLIT = 3600
|
||||
|
||||
@classmethod
|
||||
def _init(cls, value, sampling):
|
||||
# NOTE(jd) This should be __init__ but it does not work, because of…
|
||||
# Pandas, Cython, whatever.
|
||||
self = cls(value)
|
||||
self._carbonara_sampling = sampling
|
||||
return self
|
||||
def __init__(self, value, sampling):
|
||||
if isinstance(value, SplitKey):
|
||||
self.key = value.key
|
||||
elif isinstance(value, pandas.Timestamp):
|
||||
self.key = value.value / 10e8
|
||||
else:
|
||||
self.key = float(value)
|
||||
|
||||
self._carbonara_sampling = float(sampling)
|
||||
|
||||
@classmethod
|
||||
def from_timestamp_and_sampling(cls, timestamp, sampling):
|
||||
return cls._init(
|
||||
return cls(
|
||||
round_timestamp(
|
||||
timestamp, freq=sampling * cls.POINTS_PER_SPLIT * 10e8),
|
||||
sampling)
|
||||
|
||||
@classmethod
|
||||
def from_key_string(cls, keystr, sampling):
|
||||
return cls._init(float(keystr) * 10e8, sampling)
|
||||
|
||||
def __next__(self):
|
||||
"""Get the split key of the next split.
|
||||
|
||||
:return: A `SplitKey` object.
|
||||
"""
|
||||
return self._init(
|
||||
self + datetime.timedelta(
|
||||
seconds=(self.POINTS_PER_SPLIT * self._carbonara_sampling)),
|
||||
return self.__class__(
|
||||
self.key + self._carbonara_sampling * self.POINTS_PER_SPLIT,
|
||||
self._carbonara_sampling)
|
||||
|
||||
next = __next__
|
||||
|
@ -370,18 +365,35 @@ class SplitKey(pandas.Timestamp):
|
|||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.key)
|
||||
|
||||
def __lt__(self, other):
|
||||
if isinstance(other, SplitKey):
|
||||
return self.key < other.key
|
||||
if isinstance(other, pandas.Timestamp):
|
||||
return self.key * 10e8 < other.value
|
||||
return self.key < other
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, SplitKey):
|
||||
return self.key == other.key
|
||||
if isinstance(other, pandas.Timestamp):
|
||||
return self.key * 10e8 == other.value
|
||||
return self.key == other
|
||||
|
||||
def __str__(self):
|
||||
return str(float(self))
|
||||
|
||||
def __float__(self):
|
||||
ts = self.to_datetime()
|
||||
if ts.tzinfo is None:
|
||||
ts = ts.replace(tzinfo=iso8601.iso8601.UTC)
|
||||
return utils.datetime_to_unix(ts)
|
||||
return self.key
|
||||
|
||||
def as_datetime(self):
|
||||
return pandas.Timestamp(self.key, unit='s')
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s: %s / %fs>" % (self.__class__.__name__,
|
||||
pandas.Timestamp.__repr__(self),
|
||||
repr(self.key),
|
||||
self._carbonara_sampling)
|
||||
|
||||
|
||||
|
@ -436,7 +448,7 @@ class AggregatedTimeSerie(TimeSerie):
|
|||
groupby = self.ts.groupby(functools.partial(
|
||||
SplitKey.from_timestamp_and_sampling, sampling=self.sampling))
|
||||
for group, ts in groupby:
|
||||
yield (SplitKey._init(group, self.sampling),
|
||||
yield (SplitKey(group, self.sampling),
|
||||
AggregatedTimeSerie(self.sampling, self.aggregation_method,
|
||||
ts))
|
||||
|
||||
|
@ -544,7 +556,10 @@ class AggregatedTimeSerie(TimeSerie):
|
|||
if not self.ts.index.is_monotonic:
|
||||
self.ts = self.ts.sort_index()
|
||||
offset_div = self.sampling * 10e8
|
||||
start = pandas.Timestamp(start).value
|
||||
if isinstance(start, SplitKey):
|
||||
start = start.as_datetime().value
|
||||
else:
|
||||
start = pandas.Timestamp(start).value
|
||||
# calculate how many seconds from start the series runs until and
|
||||
# initialize list to store alternating delimiter, float entries
|
||||
if compressed:
|
||||
|
|
|
@ -239,7 +239,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
oldest_mutable_timestamp):
|
||||
# NOTE(jd) We write the full split only if the driver works that way
|
||||
# (self.WRITE_FULL) or if the oldest_mutable_timestamp is out of range.
|
||||
write_full = self.WRITE_FULL or oldest_mutable_timestamp >= next(key)
|
||||
write_full = self.WRITE_FULL or next(key) < oldest_mutable_timestamp
|
||||
key_as_str = str(key)
|
||||
if write_full:
|
||||
try:
|
||||
|
@ -301,7 +301,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
archive_policy_def.granularity)
|
||||
existing_keys.remove(key)
|
||||
else:
|
||||
oldest_key_to_keep = carbonara.SplitKey(0)
|
||||
oldest_key_to_keep = carbonara.SplitKey(0, 0)
|
||||
|
||||
# Rewrite all read-only splits just for fun (and compression). This
|
||||
# only happens if `previous_oldest_mutable_timestamp' exists, which
|
||||
|
@ -319,8 +319,8 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
# NOTE(jd) Rewrite it entirely for fun (and later for
|
||||
# compression). For that, we just pass None as split.
|
||||
self._store_timeserie_split(
|
||||
metric, carbonara.SplitKey.from_key_string(
|
||||
key, archive_policy_def.granularity),
|
||||
metric, carbonara.SplitKey(
|
||||
float(key), archive_policy_def.granularity),
|
||||
None, aggregation, archive_policy_def,
|
||||
oldest_mutable_timestamp)
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2014-2015 eNovance
|
||||
# Copyright © 2014-2016 eNovance
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -932,21 +932,36 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
|
|||
self.assertEqual(
|
||||
datetime.datetime(2014, 10, 7),
|
||||
carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600).as_datetime())
|
||||
self.assertEqual(
|
||||
datetime.datetime(2014, 12, 31, 18),
|
||||
carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 58))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 58).as_datetime())
|
||||
self.assertEqual(
|
||||
1420048800.0,
|
||||
float(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 58)))
|
||||
|
||||
key = carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600)
|
||||
|
||||
self.assertGreater(key, pandas.Timestamp(0))
|
||||
|
||||
self.assertGreaterEqual(key, pandas.Timestamp(0))
|
||||
|
||||
def test_split_key_next(self):
|
||||
self.assertEqual(
|
||||
datetime.datetime(2015, 3, 6),
|
||||
next(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600)))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600)).as_datetime())
|
||||
self.assertEqual(
|
||||
datetime.datetime(2015, 8, 3),
|
||||
next(next(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600))))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600))).as_datetime())
|
||||
self.assertEqual(
|
||||
113529600000.0,
|
||||
float(next(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600 * 24 * 365))))
|
||||
|
||||
def test_split(self):
|
||||
sampling = 5
|
||||
|
@ -964,10 +979,10 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
|
|||
/ carbonara.SplitKey.POINTS_PER_SPLIT),
|
||||
len(grouped_points))
|
||||
self.assertEqual("0.0",
|
||||
str(carbonara.SplitKey(grouped_points[0][0])))
|
||||
str(carbonara.SplitKey(grouped_points[0][0], 0)))
|
||||
# 3600 × 5s = 5 hours
|
||||
self.assertEqual(datetime.datetime(1970, 1, 1, 5),
|
||||
grouped_points[1][0])
|
||||
grouped_points[1][0].as_datetime())
|
||||
self.assertEqual(carbonara.SplitKey.POINTS_PER_SPLIT,
|
||||
len(grouped_points[0][1]))
|
||||
|
||||
|
|
Loading…
Reference in New Issue