share groupings across aggregates

This changes it so we don't compute a completely new series over and
over for each aggregate when, in reality, they are all the same. It
should also save on round_timestamp calculations.

Change-Id: I525fe97b2674eaf06c7170dce8d40523f15443da
Closes-Bug: #1621498
Closes-Bug: #1621510
gord chung 2016-09-09 20:37:28 +00:00
parent a4b12efa21
commit 8abc05b22a
3 changed files with 52 additions and 36 deletions
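The idea behind the change, as a standalone sketch in plain pandas (all data and helper names below are illustrative, not taken from the patch): build the time-bucket grouping once, then reuse that single GroupBy for every aggregation method instead of regrouping per aggregate.

    import functools
    import pandas

    def round_timestamp(ts, freq):
        # Hypothetical stand-in for carbonara.round_timestamp: floor a
        # timestamp to a bucket of `freq` nanoseconds.
        return pandas.Timestamp((pandas.Timestamp(ts).value // freq) * freq)

    points = pandas.Series(
        [4.0, 8.0, 15.0, 16.0, 23.0, 42.0],
        index=pandas.date_range("2016-09-09", periods=6, freq="10s"))

    # Group once for a 30-second granularity (expressed in nanoseconds)...
    grouped = points.groupby(
        functools.partial(round_timestamp, freq=30 * int(10e8)))

    # ...then reuse the same grouping for every aggregation method.
    for method in ("mean", "max", "sum"):
        print(method, getattr(grouped, method)().tolist())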


@@ -183,11 +183,9 @@ class TimeSerie(object):
     def serialize(self):
         return msgpack.dumps(self.to_dict())

-    def aggregate(self, granularity, aggregation_method='mean', max_size=None):
-        ats = AggregatedTimeSerie(
-            granularity, aggregation_method, max_size=max_size)
-        ats.update(self)
-        return ats
+    def group_serie(self, granularity, start=None):
+        return self.ts[start:].groupby(functools.partial(
+            round_timestamp, freq=granularity * 10e8))


 class BoundTimeSerie(TimeSerie):
@@ -363,16 +361,8 @@ class AggregatedTimeSerie(TimeSerie):

         """
         super(AggregatedTimeSerie, self).__init__(ts)
-
-        m = self._AGG_METHOD_PCT_RE.match(aggregation_method)
-        if m:
-            self.q = float(m.group(1)) / 100
-            self.aggregation_method_func_name = 'quantile'
-        else:
-            if not hasattr(pandas.core.groupby.SeriesGroupBy,
-                           aggregation_method):
-                raise UnknownAggregationMethod(aggregation_method)
-            self.aggregation_method_func_name = aggregation_method
+        self.aggregation_method_func_name, self.q = self._get_agg_method(
+            aggregation_method)

         self.sampling = self._to_offset(sampling).nanos / 10e8
         self.max_size = max_size
@@ -387,6 +377,20 @@ class AggregatedTimeSerie(TimeSerie):
                    ts=pandas.Series(values, timestamps),
                    max_size=max_size)

+    @staticmethod
+    def _get_agg_method(aggregation_method):
+        q = None
+        m = AggregatedTimeSerie._AGG_METHOD_PCT_RE.match(aggregation_method)
+        if m:
+            q = float(m.group(1)) / 100
+            aggregation_method_func_name = 'quantile'
+        else:
+            if not hasattr(pandas.core.groupby.SeriesGroupBy,
+                           aggregation_method):
+                raise UnknownAggregationMethod(aggregation_method)
+            aggregation_method_func_name = aggregation_method
+        return aggregation_method_func_name, q
+
     def split(self):
         groupby = self.ts.groupby(functools.partial(
             SplitKey.from_timestamp_and_sampling, sampling=self.sampling))
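For orientation, a self-contained sketch of what the new _get_agg_method helper computes. The _AGG_METHOD_PCT_RE pattern is not visible in this hunk, so the regex below is an assumption about its shape ("<number>pct"), and the hasattr check against pandas' SeriesGroupBy is left out:

    import re

    # Assumed shape of the percentile-style method names, e.g. '95pct'.
    AGG_METHOD_PCT_RE = re.compile(r"([0-9]+)pct")

    def get_agg_method(aggregation_method):
        # Percentile-style names map to pandas' 'quantile' with q set;
        # anything else is used as the GroupBy method name directly.
        m = AGG_METHOD_PCT_RE.match(aggregation_method)
        if m:
            return 'quantile', float(m.group(1)) / 100
        return aggregation_method, None

    print(get_agg_method('95pct'))  # ('quantile', 0.95)
    print(get_agg_method('mean'))   # ('mean', None)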
@@ -405,6 +409,15 @@ class AggregatedTimeSerie(TimeSerie):
                    aggregation_method=aggregation_method,
                    ts=ts, max_size=max_size)

+    @classmethod
+    def from_grouped_serie(cls, grouped_serie, sampling, aggregation_method,
+                           max_size=None):
+        agg_name, q = cls._get_agg_method(aggregation_method)
+        return cls(sampling, aggregation_method,
+                   ts=cls._resample_grouped(grouped_serie, agg_name,
+                                            q).dropna(),
+                   max_size=max_size)
+
     def __eq__(self, other):
         return (isinstance(other, AggregatedTimeSerie)
                 and super(AggregatedTimeSerie, self).__eq__(other)
@@ -538,15 +551,18 @@ class AggregatedTimeSerie(TimeSerie):
         groupedby = self.ts[after:].groupby(
             functools.partial(round_timestamp,
                               freq=self.sampling * 10e8))
-        agg_func = getattr(groupedby, self.aggregation_method_func_name)
-        if self.aggregation_method_func_name == 'quantile':
-            aggregated = agg_func(self.q)
-        else:
-            aggregated = agg_func()
+        aggregated = self._resample_grouped(groupedby,
+                                            self.aggregation_method_func_name,
+                                            self.q)

         # Now combine the result with the rest of the point everything
         # that is before `after'
         self.ts = aggregated.combine_first(self.ts[:after][:-1])

+    @staticmethod
+    def _resample_grouped(grouped_serie, agg_name, q=None):
+        agg_func = getattr(grouped_serie, agg_name)
+        return agg_func(q) if agg_name == 'quantile' else agg_func()
+
     def fetch(self, from_timestamp=None, to_timestamp=None):
         """Fetch aggregated time value.
@@ -578,6 +594,7 @@ class AggregatedTimeSerie(TimeSerie):
         self.ts = self.ts.combine_first(ts.ts)

     def update(self, ts):
+        # TODO(gordc): remove this since it's not used
         if ts.ts.empty:
             return
         ts.ts = self.clean_ts(ts.ts)


@@ -254,13 +254,12 @@ class CarbonaraBasedStorage(storage.StorageDriver):
                 data, offset=offset)

     def _add_measures(self, aggregation, archive_policy_def,
-                      metric, timeserie,
+                      metric, grouped_serie,
                       previous_oldest_mutable_timestamp,
                       oldest_mutable_timestamp):
-        ts = timeserie.aggregate(
-            archive_policy_def.granularity,
-            aggregation,
-            archive_policy_def.points)
+        ts = carbonara.AggregatedTimeSerie.from_grouped_serie(
+            grouped_serie, archive_policy_def.granularity,
+            aggregation, max_size=archive_policy_def.points)

         # Don't do anything if the timeserie is empty
         if not ts:
@@ -505,16 +504,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
             # affected by new measures for specific granularity
             tstamp = max(bound_timeserie.first, measures[0][0])
             computed_points['number'] = len(bound_timeserie)
-            self._map_in_thread(
-                self._add_measures,
-                ((aggregation, d, metric,
-                  carbonara.TimeSerie(bound_timeserie.ts[
-                      carbonara.round_timestamp(
-                          tstamp, d.granularity * 10e8):]),
-                  current_first_block_timestamp,
-                  bound_timeserie.first_block_timestamp())
-                 for aggregation in agg_methods
-                 for d in metric.archive_policy.definition))
+            for d in metric.archive_policy.definition:
+                ts = bound_timeserie.group_serie(
+                    d.granularity, carbonara.round_timestamp(
+                        tstamp, d.granularity * 10e8))
+                self._map_in_thread(
+                    self._add_measures,
+                    ((aggregation, d, metric, ts,
+                      current_first_block_timestamp,
+                      bound_timeserie.first_block_timestamp())
+                     for aggregation in agg_methods))

         with timeutils.StopWatch() as sw:
             ts.set_values(
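Putting the two halves together, a standalone sketch of the new processing shape in the driver. Plain pandas only; the thread fan-out of _map_in_thread is replaced by a simple loop and every name below is illustrative: one groupby per archive-policy granularity, shared by all aggregation methods.

    import functools
    import pandas

    def round_timestamp(ts, freq):
        # Hypothetical stand-in for carbonara.round_timestamp (freq in ns).
        return pandas.Timestamp((pandas.Timestamp(ts).value // freq) * freq)

    points = pandas.Series(
        [float(i) for i in range(12)],
        index=pandas.date_range("2016-09-09", periods=12, freq="5s"))
    agg_methods = ("mean", "max", "count")

    results = {}
    for granularity in (10, 30):  # stand-ins for archive policy definitions
        # Previously the series was regrouped for every (method, granularity)
        # pair; now the grouping happens once per granularity...
        grouped = points.groupby(
            functools.partial(round_timestamp, freq=granularity * int(10e8)))
        # ...and is handed to each aggregation method.
        for method in agg_methods:
            results[(granularity, method)] = getattr(grouped, method)()

    print(sorted(results))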


@@ -181,7 +181,7 @@ class TestStorageDriver(tests_base.TestCase):
         self.trigger_processing([str(m.id)])
         for __, args, __ in c.mock_calls:
             self.assertEqual(
-                args[3].first, carbonara.round_timestamp(
+                list(args[3])[0][0], carbonara.round_timestamp(
                     new_point, args[1].granularity * 10e8))

     def test_delete_old_measures(self):
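The test change reflects that the mocked _add_measures now receives a grouped serie rather than a TimeSerie: iterating a pandas GroupBy yields (group key, sub-serie) pairs in key order, so list(args[3])[0][0] is the first rounded timestamp. A small illustration with invented data:

    import pandas

    serie = pandas.Series(
        [1.0, 2.0, 3.0],
        index=pandas.to_datetime(["2016-09-09 00:00:01",
                                  "2016-09-09 00:00:02",
                                  "2016-09-09 00:00:11"]))
    grouped = serie.groupby(pandas.Grouper(freq="10s"))

    first_key, first_group = list(grouped)[0]
    print(first_key)  # 2016-09-09 00:00:00, the earliest (rounded) group key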