Merge "carbonara: reduce the number of array copy"

Jenkins 2017-05-19 15:45:01 +00:00, committed by Gerrit Code Review
commit 4317d3a890
1 changed file with 13 additions and 13 deletions

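The change repeated throughout this diff swaps numpy.array(arr, dtype=...), which always allocates a fresh array, for arr.astype(dtype, copy=False), which hands back the original array whenever no dtype conversion is needed. A minimal standalone sketch of the difference (the sample data is illustrative, not taken from carbonara):

import numpy

timestamps = numpy.arange(10, dtype='<Q')

# numpy.array() copies by default, even when the dtype already matches.
copied = numpy.array(timestamps, dtype='<Q')
assert copied is not timestamps

# astype(..., copy=False) returns the very same array when no conversion
# is required, so no extra allocation happens.
same = timestamps.astype('<Q', copy=False)
assert same is timestamps

# When a real conversion is needed, astype still has to allocate.
converted = timestamps.astype('datetime64[ns]', copy=False)
assert converted is not timestamps
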

@@ -94,7 +94,7 @@ class GroupedTimeSeries(object):
         # we always assume the orderd to be the same as the input.
         freq = granularity * 10e8
         self._ts = ts
-        self.indexes = (numpy.array(ts.index, 'float') // freq) * freq
+        self.indexes = (numpy.array(ts.index, numpy.float) // freq) * freq
         self.tstamps, self.counts = numpy.unique(self.indexes,
                                                   return_counts=True)
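
GroupedTimeSeries buckets every timestamp by flooring it to the start of its granularity window (freq is in nanoseconds, since 10e8 equals 1e9), then uses numpy.unique(..., return_counts=True) to get one entry per bucket plus the number of points it holds. A small sketch with made-up numbers, assuming a 60-second granularity:

import numpy

granularity = 60                     # seconds, hypothetical
freq = granularity * 10e8            # 10e8 == 1e9 ns per second
index_ns = numpy.array([0.0, 30e9, 65e9, 90e9, 125e9])  # ns since epoch

# Floor every timestamp to the start of its granularity bucket.
indexes = (index_ns // freq) * freq

# One timestamp per bucket plus how many points fell into it.
tstamps, counts = numpy.unique(indexes, return_counts=True)
# tstamps -> 0, 6e10 and 1.2e11 ns; counts -> 2, 2, 1
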
@@ -124,7 +124,7 @@ class GroupedTimeSeries(object):
                              default=None)
 
     def _count(self):
-        timestamps = numpy.array(self.tstamps, 'datetime64[ns]')
+        timestamps = self.tstamps.astype('datetime64[ns]', copy=False)
         return (self.counts, timestamps)
 
     def count(self):
@@ -163,7 +163,7 @@ class GroupedTimeSeries(object):
         values = method(self._ts.values, self.indexes, tstamps,
                         *args, **kwargs)
-        timestamps = numpy.array(tstamps, 'datetime64[ns]')
+        timestamps = tstamps.astype('datetime64[ns]', copy=False)
         return pandas.Series(values, pandas.to_datetime(timestamps))
@@ -330,7 +330,7 @@ class BoundTimeSerie(TimeSerie):
             :nb_points*cls._SERIALIZATION_TIMESTAMP_LEN]
         timestamps = numpy.frombuffer(timestamps_raw, dtype='<Q')
         timestamps = numpy.cumsum(timestamps)
-        timestamps = numpy.array(timestamps, dtype='datetime64[ns]')
+        timestamps = timestamps.astype(dtype='datetime64[ns]', copy=False)
         values_raw = uncompressed[nb_points*cls._SERIALIZATION_TIMESTAMP_LEN:]
         values = numpy.frombuffer(values_raw, dtype='<d')
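
On the read side, the serialized timestamps are wrapped without copying by numpy.frombuffer, the delta encoding is undone with numpy.cumsum, and only then is the result reinterpreted as datetime64[ns]. A sketch of that decode path with a hand-built payload (the values are illustrative):

import numpy

# Hypothetical payload: the first entry is an absolute timestamp in ns,
# the following entries are deltas to the previous point.
deltas = numpy.array([1495208700000000000, 1000000000, 2000000000],
                     dtype='<Q')
raw = deltas.tobytes()

timestamps = numpy.frombuffer(raw, dtype='<Q')   # wraps the bytes, no copy
timestamps = numpy.cumsum(timestamps)            # undo the delta encoding
timestamps = timestamps.astype('datetime64[ns]', copy=False)
# -> 2017-05-19 15:45:00, 15:45:01 and 15:45:03 UTC
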
@@ -345,8 +345,8 @@ class BoundTimeSerie(TimeSerie):
         # NOTE(jd) Use a double delta encoding for timestamps
         timestamps = numpy.insert(numpy.diff(self.ts.index),
                                   0, self.first.value)
-        timestamps = numpy.array(timestamps, dtype='<Q')
-        values = numpy.array(self.ts.values, dtype='<d')
+        timestamps = timestamps.astype('<Q', copy=False)
+        values = self.ts.values.astype('<d', copy=False)
         payload = (timestamps.tobytes() + values.tobytes())
         return self._compress(payload)
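
The matching write side keeps the first timestamp absolute, stores only the gaps for the rest (numpy.diff plus numpy.insert), and casts in place where possible before concatenating the raw bytes. A rough sketch using plain integer nanosecond timestamps instead of the pandas index carbonara keeps:

import numpy

# Hypothetical ordered timestamps (ns since epoch) and their values.
index = numpy.array([1495208700000000000,
                     1495208701000000000,
                     1495208703000000000], dtype=numpy.int64)
values = numpy.array([1.0, 2.0, 3.0])

# Delta encoding: first absolute timestamp, then the gaps between points.
timestamps = numpy.insert(numpy.diff(index), 0, index[0])
timestamps = timestamps.astype('<Q', copy=False)  # int64 -> uint64: one copy
values = values.astype('<d', copy=False)          # already '<d': no copy here
payload = timestamps.tobytes() + values.tobytes()
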
@@ -549,7 +549,7 @@ class AggregatedTimeSerie(TimeSerie):
         # but we have ordered timestamps, so don't need
         # to iter the whole series.
         freq = self.sampling * SplitKey.POINTS_PER_SPLIT
-        ix = numpy.array(self.ts.index, 'float64') / 10e8
+        ix = numpy.array(self.ts.index, numpy.float64) / 10e8
         keys, counts = numpy.unique((ix // freq) * freq, return_counts=True)
         start = 0
         for key, count in six.moves.zip(keys, counts):
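
Because the aggregated series stays ordered, each split of sampling * POINTS_PER_SPLIT seconds is a contiguous run of rows: flooring the timestamps to the split width and counting them with numpy.unique tells us, per split key, how many consecutive points to slice off. A sketch with made-up sampling and split sizes (plain zip stands in for six.moves.zip):

import numpy

sampling = 5                          # seconds, hypothetical
points_per_split = 4                  # stands in for SplitKey.POINTS_PER_SPLIT
freq = sampling * points_per_split    # width of one split, in seconds

# Ordered timestamps in seconds (the real code converts ns by dividing by 10e8).
ix = numpy.array([0, 5, 10, 20, 25, 40], dtype=numpy.float64)

keys, counts = numpy.unique((ix // freq) * freq, return_counts=True)

start = 0
for key, count in zip(keys, counts):
    # Contiguous slice of points belonging to this split.
    print(key, ix[start:start + count])
    start += count
# split 0.0  holds [0, 5, 10]
# split 20.0 holds [20, 25]
# split 40.0 holds [40]
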
@@ -636,8 +636,8 @@ class AggregatedTimeSerie(TimeSerie):
         y = index * sampling + start
         x = everything['v'][index]
-        y = numpy.array(y, dtype='float64') * 10e8
-        y = numpy.array(y, dtype='datetime64[ns]')
+        y = y.astype(numpy.float64, copy=False) * 10e8
+        y = y.astype('datetime64[ns]', copy=False)
         y = pandas.to_datetime(y)
         return cls.from_data(sampling, agg_method, y, x)
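
Going the other way, unserialize rebuilds absolute timestamps from per-split offsets: offset * sampling + split start gives seconds, multiplying by 10e8 (that is, 1e9) gives nanoseconds, and the result is then viewed as datetime64[ns]. A small sketch under assumed values for sampling and start:

import numpy
import pandas

sampling = 5      # seconds, hypothetical
start = 100       # split start in seconds since epoch, hypothetical
index = numpy.array([0, 2, 5])   # offsets of the points present in the split

y = index * sampling + start                    # seconds since epoch
y = y.astype(numpy.float64, copy=False) * 10e8  # to nanoseconds (10e8 == 1e9)
y = y.astype('datetime64[ns]', copy=False)
y = pandas.to_datetime(y)
# -> 1970-01-01 00:01:40, 00:01:50 and 00:02:05
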
@@ -682,8 +682,8 @@ class AggregatedTimeSerie(TimeSerie):
             timestamps = numpy.insert(
                 numpy.diff(self.ts.index) // offset_div,
                 0, int((self.first.value - start) // offset_div))
-            timestamps = numpy.array(timestamps, dtype='<H')
-            values = numpy.array(self.ts.values, dtype='<d')
+            timestamps = timestamps.astype('<H', copy=False)
+            values = self.ts.values.astype('<d', copy=False)
             payload = (timestamps.tobytes() + values.tobytes())
             return None, b"c" + self._compress(payload)
         # NOTE(gordc): this binary serializes series based on the split
@@ -700,14 +700,14 @@ class AggregatedTimeSerie(TimeSerie):
         locs = (numpy.cumsum(numpy.diff(self.ts.index)) // offset_div)
         locs = numpy.insert(locs, 0, 0)
-        locs = numpy.array(locs, dtype='int')
+        locs = locs.astype(numpy.int, copy=False)
 
         # Fill everything with zero
         serial_dtype = [('b', '<?'), ('v', '<d')]
         serial = numpy.zeros((e_offset,), dtype=serial_dtype)
 
         # Create a structured array with two dimensions
-        values = numpy.array(self.ts.values, dtype='<d')
+        values = self.ts.values.astype(dtype='<d', copy=False)
         ones = numpy.ones_like(values, dtype='<?')
         values = numpy.core.records.fromarrays((ones, values),
                                                dtype=serial_dtype)
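
The uncompressed split format pre-allocates one (flag, value) slot per possible sampling offset in the split and scatters the real points into their slots; the 'b' flag marks which slots actually hold data. A sketch under assumed offsets and values; the final serial[locs] = values scatter is implied by the surrounding code rather than shown in this hunk:

import numpy

e_offset = 8                           # slots in this split, hypothetical
locs = numpy.array([0, 2, 5])          # offsets that actually hold a point
points = numpy.array([1.5, 2.5, 3.5], dtype='<d')

# One (present?, value) pair per possible offset, zero-filled by default.
serial_dtype = [('b', '<?'), ('v', '<d')]
serial = numpy.zeros((e_offset,), dtype=serial_dtype)

# Pair every real point with a True flag and scatter it into its slot.
ones = numpy.ones_like(points, dtype='<?')
values = numpy.core.records.fromarrays((ones, points), dtype=serial_dtype)
serial[locs] = values
# serial['b'] -> True at offsets 0, 2 and 5, False elsewhere
# serial['v'] -> 1.5, 2.5 and 3.5 at those offsets, 0.0 elsewhere
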