share groupings across aggregates
This changes measure processing so we don't compute a completely new series over and over for each aggregate when, in reality, the groupings are all the same. It should save on round_timestamp calculations as well.

Change-Id: I525fe97b2674eaf06c7170dce8d40523f15443da
Closes-Bug: #1621498
Closes-Bug: #1621510
parent a4b12efa21
commit 8abc05b22a
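To illustrate the idea: the grouping of raw points into granularity buckets is identical for every aggregation method, so it only needs to be computed once per granularity. A self-contained pandas sketch of the before/after behaviour (this round_timestamp is a simplified stand-in for carbonara's, and the series values are made up):

import functools

import pandas


def round_timestamp(ts, freq):
    # Simplified stand-in for carbonara.round_timestamp: floor the
    # timestamp to a multiple of `freq` nanoseconds.
    freq = int(freq)
    return pandas.Timestamp((pandas.Timestamp(ts).value // freq) * freq)


serie = pandas.Series(
    [3.0, 5.0, 6.0, 9.0],
    pandas.to_datetime(['2016-01-01 12:00:00', '2016-01-01 12:00:04',
                        '2016-01-01 12:00:09', '2016-01-01 12:00:11']))
granularity = 5  # seconds; "* 10e8" converts seconds to nanoseconds

# Before: each aggregation method re-ran the groupby (and all of its
# round_timestamp calls) against the raw series.
before = {
    agg: getattr(serie.groupby(functools.partial(
        round_timestamp, freq=granularity * 10e8)), agg)()
    for agg in ('mean', 'min', 'max')}

# After: group once per granularity, then reuse the GroupBy object
# for every aggregation method.
grouped = serie.groupby(functools.partial(
    round_timestamp, freq=granularity * 10e8))
after = {agg: getattr(grouped, agg)() for agg in ('mean', 'min', 'max')}

assert all(before[a].equals(after[a]) for a in before)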
@@ -183,11 +183,9 @@ class TimeSerie(object):
     def serialize(self):
         return msgpack.dumps(self.to_dict())
 
-    def aggregate(self, granularity, aggregation_method='mean', max_size=None):
-        ats = AggregatedTimeSerie(
-            granularity, aggregation_method, max_size=max_size)
-        ats.update(self)
-        return ats
+    def group_serie(self, granularity, start=None):
+        return self.ts[start:].groupby(functools.partial(
+            round_timestamp, freq=granularity * 10e8))
 
 
 class BoundTimeSerie(TimeSerie):
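Note the split of responsibilities introduced here: group_serie only builds the pandas GroupBy (one round of round_timestamp calls per granularity), while the aggregation itself moves into AggregatedTimeSerie.from_grouped_serie, added further down. The intended call pattern, sketched with placeholder values rather than code from this commit:

# Sketch with placeholder names (timeserie, 300) -- not commit code.
grouped = timeserie.group_serie(granularity=300)
for agg in ('mean', 'min', 'max', '95pct'):
    ats = carbonara.AggregatedTimeSerie.from_grouped_serie(
        grouped, 300, agg)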
@@ -363,16 +361,8 @@ class AggregatedTimeSerie(TimeSerie):
         """
         super(AggregatedTimeSerie, self).__init__(ts)
 
-        m = self._AGG_METHOD_PCT_RE.match(aggregation_method)
-
-        if m:
-            self.q = float(m.group(1)) / 100
-            self.aggregation_method_func_name = 'quantile'
-        else:
-            if not hasattr(pandas.core.groupby.SeriesGroupBy,
-                           aggregation_method):
-                raise UnknownAggregationMethod(aggregation_method)
-            self.aggregation_method_func_name = aggregation_method
+        self.aggregation_method_func_name, self.q = self._get_agg_method(
+            aggregation_method)
 
         self.sampling = self._to_offset(sampling).nanos / 10e8
         self.max_size = max_size
@@ -387,6 +377,20 @@ class AggregatedTimeSerie(TimeSerie):
             ts=pandas.Series(values, timestamps),
             max_size=max_size)
 
+    @staticmethod
+    def _get_agg_method(aggregation_method):
+        q = None
+        m = AggregatedTimeSerie._AGG_METHOD_PCT_RE.match(aggregation_method)
+        if m:
+            q = float(m.group(1)) / 100
+            aggregation_method_func_name = 'quantile'
+        else:
+            if not hasattr(pandas.core.groupby.SeriesGroupBy,
+                           aggregation_method):
+                raise UnknownAggregationMethod(aggregation_method)
+            aggregation_method_func_name = aggregation_method
+        return aggregation_method_func_name, q
+
     def split(self):
         groupby = self.ts.groupby(functools.partial(
             SplitKey.from_timestamp_and_sampling, sampling=self.sampling))
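For reference, the helper's contract (the percentile values assume _AGG_METHOD_PCT_RE matches names such as '95pct', as the quantile branch implies):

AggregatedTimeSerie._get_agg_method('95pct')  # -> ('quantile', 0.95)
AggregatedTimeSerie._get_agg_method('mean')   # -> ('mean', None)
AggregatedTimeSerie._get_agg_method('nope')   # raises UnknownAggregationMethod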
@@ -405,6 +409,15 @@ class AggregatedTimeSerie(TimeSerie):
             aggregation_method=aggregation_method,
             ts=ts, max_size=max_size)
 
+    @classmethod
+    def from_grouped_serie(cls, grouped_serie, sampling, aggregation_method,
+                           max_size=None):
+        agg_name, q = cls._get_agg_method(aggregation_method)
+        return cls(sampling, aggregation_method,
+                   ts=cls._resample_grouped(grouped_serie, agg_name,
+                                            q).dropna(),
+                   max_size=max_size)
+
     def __eq__(self, other):
         return (isinstance(other, AggregatedTimeSerie)
                 and super(AggregatedTimeSerie, self).__eq__(other)
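Since a pandas GroupBy can be aggregated any number of times without re-grouping, a single group_serie result can safely feed from_grouped_serie once per aggregation method; the .dropna() simply discards buckets where the aggregation produced NaN.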
@@ -538,15 +551,18 @@ class AggregatedTimeSerie(TimeSerie):
         groupedby = self.ts[after:].groupby(
             functools.partial(round_timestamp,
                               freq=self.sampling * 10e8))
-        agg_func = getattr(groupedby, self.aggregation_method_func_name)
-        if self.aggregation_method_func_name == 'quantile':
-            aggregated = agg_func(self.q)
-        else:
-            aggregated = agg_func()
+        aggregated = self._resample_grouped(groupedby,
+                                            self.aggregation_method_func_name,
+                                            self.q)
         # Now combine the result with the rest of the point – everything
         # that is before `after'
         self.ts = aggregated.combine_first(self.ts[:after][:-1])
 
+    @staticmethod
+    def _resample_grouped(grouped_serie, agg_name, q=None):
+        agg_func = getattr(grouped_serie, agg_name)
+        return agg_func(q) if agg_name == 'quantile' else agg_func()
+
     def fetch(self, from_timestamp=None, to_timestamp=None):
         """Fetch aggregated time value.
 
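_resample_grouped centralizes the one irregularity among the pandas groupby aggregations: quantile takes the quantile q as an argument, while mean, sum and the rest take none. Both the re-aggregation path just above and from_grouped_serie now share it.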
@@ -578,6 +594,7 @@ class AggregatedTimeSerie(TimeSerie):
         self.ts = self.ts.combine_first(ts.ts)
 
     def update(self, ts):
+        # TODO(gordc): remove this since it's not used
         if ts.ts.empty:
             return
         ts.ts = self.clean_ts(ts.ts)
@@ -254,13 +254,12 @@ class CarbonaraBasedStorage(storage.StorageDriver):
             data, offset=offset)
 
     def _add_measures(self, aggregation, archive_policy_def,
-                      metric, timeserie,
+                      metric, grouped_serie,
                       previous_oldest_mutable_timestamp,
                       oldest_mutable_timestamp):
-        ts = timeserie.aggregate(
-            archive_policy_def.granularity,
-            aggregation,
-            archive_policy_def.points)
+        ts = carbonara.AggregatedTimeSerie.from_grouped_serie(
+            grouped_serie, archive_policy_def.granularity,
+            aggregation, max_size=archive_policy_def.points)
 
         # Don't do anything if the timeserie is empty
         if not ts:
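_add_measures no longer slices and aggregates a raw TimeSerie itself; it receives the granularity-level grouping computed once by its caller and only applies its own aggregation method through from_grouped_serie.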
@@ -505,16 +504,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
             # affected by new measures for specific granularity
             tstamp = max(bound_timeserie.first, measures[0][0])
             computed_points['number'] = len(bound_timeserie)
-            self._map_in_thread(
-                self._add_measures,
-                ((aggregation, d, metric,
-                  carbonara.TimeSerie(bound_timeserie.ts[
-                      carbonara.round_timestamp(
-                          tstamp, d.granularity * 10e8):]),
-                  current_first_block_timestamp,
-                  bound_timeserie.first_block_timestamp())
-                 for aggregation in agg_methods
-                 for d in metric.archive_policy.definition))
+            for d in metric.archive_policy.definition:
+                ts = bound_timeserie.group_serie(
+                    d.granularity, carbonara.round_timestamp(
+                        tstamp, d.granularity * 10e8))
+                self._map_in_thread(
+                    self._add_measures,
+                    ((aggregation, d, metric, ts,
+                      current_first_block_timestamp,
+                      bound_timeserie.first_block_timestamp())
+                     for aggregation in agg_methods))
 
         with timeutils.StopWatch() as sw:
             ts.set_values(
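This hunk is where the sharing happens: previously each (aggregation, granularity) pair sliced the bound timeserie and re-grouped it from scratch; now each granularity is grouped once with group_serie and the resulting GroupBy is handed to every aggregation method in the thread map. For M aggregation methods and G granularities, the grouping (and round_timestamp) work drops from M×G to G.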
@@ -181,7 +181,7 @@ class TestStorageDriver(tests_base.TestCase):
         self.trigger_processing([str(m.id)])
         for __, args, __ in c.mock_calls:
             self.assertEqual(
-                args[3].first, carbonara.round_timestamp(
+                list(args[3])[0][0], carbonara.round_timestamp(
                     new_point, args[1].granularity * 10e8))
 
     def test_delete_old_measures(self):
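The test changes because args[3] is now the grouped serie rather than a TimeSerie: iterating a pandas GroupBy yields (group_key, group) pairs, so list(args[3])[0][0] is the first rounded bucket timestamp, replacing the old .first attribute.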