carbonara: allow to split AggregatedTimeSerie

This adds get_split_key() that returns a key for a (timestamp, sampling)
tuple, and a configurable chunk size.

This split key is then used to split a TimeSerie into several smaller
TimeSeries based on that key and the range between two keys.

Change-Id: I6b172cae7812fdccd93cbf0318938a228d112270
This commit is contained in:
Julien Danjou 2015-12-08 17:09:21 +01:00
parent 59bda86d18
commit b786636d99
3 changed files with 119 additions and 5 deletions

View File

@ -25,6 +25,8 @@ import msgpack
import pandas
import six
from gnocchi import utils
LOG = logging.getLogger(__name__)
@ -143,8 +145,7 @@ class TimeSerie(SerializableMixin):
@staticmethod
def _round_timestamp(ts, freq):
return pandas.Timestamp(
(pandas.Timestamp(ts).value // freq.delta.value)
* freq.delta.value)
(pandas.Timestamp(ts).value // freq) * freq)
@staticmethod
def _to_offset(value):
@ -234,7 +235,8 @@ class BoundTimeSerie(TimeSerie):
return basic
def _first_block_timestamp(self):
rounded = self._round_timestamp(self.ts.index[-1], self.block_size)
rounded = self._round_timestamp(self.ts.index[-1],
self.block_size.delta.value)
return rounded - (self.block_size * self.back_window)
def _truncate(self):
@ -250,6 +252,8 @@ class AggregatedTimeSerie(TimeSerie):
_AGG_METHOD_PCT_RE = re.compile(r"([1-9][0-9]?)pct")
POINTS_PER_SPLIT = 14400
def __init__(self, ts=None, max_size=None,
sampling=None, aggregation_method='mean'):
"""A time serie that is downsampled.
@ -285,6 +289,44 @@ class AggregatedTimeSerie(TimeSerie):
max_size=max_size, sampling=sampling,
aggregation_method=aggregation_method)
@classmethod
def get_split_key_datetime(cls, timestamp, sampling,
                           chunk_size=POINTS_PER_SPLIT):
    """Round timestamp down to the start of its split chunk.

    A chunk covers `chunk_size` samples of `sampling` seconds each, so
    the rounding frequency is sampling * chunk_size seconds, converted
    to nanoseconds as expected by _round_timestamp().
    """
    chunk_ns = sampling * chunk_size * 10e8  # 10e8 == 1e9 ns per second
    return cls._round_timestamp(timestamp, freq=chunk_ns)
@staticmethod
def _split_key_to_string(timestamp):
    # Render a split key as its Unix epoch timestamp string,
    # e.g. "1420056000.0" — used as the storage key for a chunk.
    return str(utils.datetime_to_unix(timestamp))
@classmethod
def get_split_key(cls, timestamp, sampling, chunk_size=POINTS_PER_SPLIT):
    """Return the split key for timestamp as its string form."""
    key_dt = cls.get_split_key_datetime(timestamp, sampling, chunk_size)
    return cls._split_key_to_string(key_dt)
def split(self, chunk_size=POINTS_PER_SPLIT):
    """Split this serie into chunks of at most chunk_size points.

    Yields (split key string, TimeSerie) pairs, one per populated
    chunk, in increasing key order.

    NOTE: the previous implementation sliced self.ts[key:next_key] and
    unconditionally deleted the slice's last point.  pandas label-based
    slicing is end-inclusive, so when no sample fell exactly on
    next_key the deleted point was a genuine sample belonging to the
    current chunk and was silently lost.  Iterating the groupby
    directly yields exactly the points whose split key matches, with
    no boundary adjustment needed.
    """
    groupby = self.ts.groupby(functools.partial(
        self.get_split_key_datetime, sampling=self.sampling,
        chunk_size=chunk_size))
    # pandas iterates groups in sorted key order.
    for key, group in groupby:
        yield self._split_key_to_string(key), TimeSerie(group)
@classmethod
def from_timeseries(cls, timeseries, sampling=None, max_size=None,
                    aggregation_method='mean'):
    """Rebuild one aggregated serie from a sequence of TimeSerie chunks.

    On duplicate timestamps, the earliest serie in `timeseries` wins
    (combine_first keeps existing values).
    """
    merged = functools.reduce(
        lambda acc, chunk: acc.combine_first(chunk.ts),
        timeseries, pandas.Series())
    return cls(merged, sampling=sampling, max_size=max_size,
               aggregation_method=aggregation_method)
@property
def sampling(self):
    # self._sampling is a pandas offset whose .nanos is the period in
    # nanoseconds; expose it in seconds (10e8 == 1e9 ns per second).
    return self._sampling.nanos / 10e8
@ -342,7 +384,7 @@ class AggregatedTimeSerie(TimeSerie):
# the points after `after'
groupedby = self.ts[after:].groupby(
functools.partial(self._round_timestamp,
freq=self._sampling))
freq=self.sampling * 10e8))
agg_func = getattr(groupedby, self.aggregation_method_func_name)
if self.aggregation_method_func_name == 'quantile':
aggregated = agg_func(self.q)
@ -363,7 +405,7 @@ class AggregatedTimeSerie(TimeSerie):
if from_timestamp is None:
from_ = None
else:
from_ = self._round_timestamp(from_timestamp, self._sampling)
from_ = self._round_timestamp(from_timestamp, self.sampling * 10e8)
points = self[from_:to_timestamp]
try:
# Do not include stop timestamp

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import math
import subprocess
import tempfile
@ -843,6 +844,69 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
(pandas.Timestamp('2015-12-03 13:24:15'), 1.0, 10.0),
], output)
def test_split_key(self):
    # A chunk covers POINTS_PER_SPLIT (14400) samples; with 5 s
    # sampling that is 72000 s per chunk, aligned on the Unix epoch.
    # 2015-01-01 23:34 UTC = 1420155240 s -> floors to 1420128000.
    self.assertEqual(
        "1420128000.0",
        carbonara.AggregatedTimeSerie.get_split_key(
            datetime.datetime(2015, 1, 1, 23, 34), 5))
    # 2015-01-01 15:03 UTC = 1420124580 s -> floors to 1420056000.
    self.assertEqual(
        "1420056000.0",
        carbonara.AggregatedTimeSerie.get_split_key(
            datetime.datetime(2015, 1, 1, 15, 3), 5))
def test_split_key_datetime(self):
    # sampling=3600 s -> 14400 * 3600 s = 600 days per chunk.
    self.assertEqual(
        datetime.datetime(2014, 5, 10),
        carbonara.AggregatedTimeSerie.get_split_key_datetime(
            datetime.datetime(2015, 1, 1, 15, 3), 3600))
    # sampling=58 s -> 14400 * 58 s = 835200 s (~9.7 days) per chunk.
    self.assertEqual(
        datetime.datetime(2014, 12, 29, 8),
        carbonara.AggregatedTimeSerie.get_split_key_datetime(
            datetime.datetime(2015, 1, 1, 15, 3), 58))
def test_split(self):
    sampling = 5
    points = 100000
    # One point per second starting at the epoch.
    ts = carbonara.TimeSerie.from_data(
        timestamps=map(datetime.datetime.utcfromtimestamp,
                       six.moves.range(points)),
        values=six.moves.range(points))
    agg = carbonara.AggregatedTimeSerie(sampling=sampling)
    agg.update(ts)

    grouped_points = list(agg.split())

    # 100000 points at 1 s downsampled to 5 s -> 20000 aggregated
    # points, i.e. ceil(20000 / 14400) = 2 splits.
    self.assertEqual(
        math.ceil((points / float(sampling))
                  / carbonara.AggregatedTimeSerie.POINTS_PER_SPLIT),
        len(grouped_points))
    self.assertEqual("0.0",
                     grouped_points[0][0])
    # 14400 × 5s = 20 hours
    self.assertEqual("72000.0",
                     grouped_points[1][0])
    # The first split is full.
    self.assertEqual(carbonara.AggregatedTimeSerie.POINTS_PER_SPLIT,
                     len(grouped_points[0][1]))
def test_from_timeseries(self):
    sampling = 5
    points = 100000
    # One point per second starting at the epoch.
    ts = carbonara.TimeSerie.from_data(
        timestamps=map(datetime.datetime.utcfromtimestamp,
                       six.moves.range(points)),
        values=six.moves.range(points))
    agg = carbonara.AggregatedTimeSerie(sampling=sampling)
    agg.update(ts)

    # Splitting and re-merging must round-trip to an equal serie.
    split = [t[1] for t in list(agg.split())]
    self.assertEqual(agg,
                     carbonara.AggregatedTimeSerie.from_timeseries(
                         split,
                         sampling=agg.sampling,
                         max_size=agg.max_size,
                         aggregation_method=agg.aggregation_method))
class CarbonaraCmd(base.BaseTestCase):

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import calendar
import datetime
import time
import iso8601
from oslo_utils import timeutils
@ -93,3 +94,10 @@ def utcnow():
def datetime_utc(*args):
    """Build a timezone-aware UTC datetime from datetime() arguments."""
    return datetime.datetime(*args, tzinfo=iso8601.iso8601.UTC)
def datetime_to_unix(timestamp):
    """Return the Unix timestamp (seconds since epoch, float) of a datetime.

    Naive datetimes are treated as UTC; aware ones are converted by
    utctimetuple().

    Uses calendar.timegm(), which interprets the time tuple as UTC.
    The previous time.mktime() call interpreted it in the *local*
    timezone and applied that date's DST offset, which subtracting the
    constant time.timezone does not undo — so results were off by an
    hour for part of the year in DST-observing zones.
    """
    return (calendar.timegm(timestamp.utctimetuple())
            # utctimetuple() drops sub-second precision; add it back.
            + timestamp.microsecond / 10e5)