Merge "carbonara: replace msgpack encoding with struct for new measures"

This commit is contained in:
Jenkins 2016-09-20 15:25:48 +00:00 committed by Gerrit Code Review
commit 4dbb1134cb
8 changed files with 132 additions and 114 deletions

View File

@ -91,12 +91,17 @@ Upgrading
In order to upgrade from a previous version of Gnocchi, you need to make sure
that your indexer and storage are properly upgraded. Run the following:
1. Stop the old version of Gnocchi API server and `gnocchi-statsd` daemon
2. Make sure that the processing backlog is empty (`gnocchi status`)
3. Stop the old version of `gnocchi-metricd` daemon
4. Install the new version of Gnocchi
5. Run `gnocchi-upgrade`
   This can take several hours depending on the size of your index and
   storage.
6. Start the new Gnocchi API server, `gnocchi-metricd`
   and `gnocchi-statsd` daemons

View File

@ -16,7 +16,6 @@
import operator
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from stevedore import driver
from gnocchi import exceptions
@ -46,7 +45,7 @@ LOG = log.getLogger(__name__)
class Measure(object):
def __init__(self, timestamp, value):
self.timestamp = timeutils.normalize_time(timestamp)
self.timestamp = timestamp
self.value = value
def __iter__(self):

View File

@ -18,6 +18,7 @@ import collections
import datetime
import itertools
import operator
import struct
import uuid
from concurrent import futures
@ -25,9 +26,10 @@ import iso8601
import msgpack
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import msgpackutils
from oslo_utils import timeutils
import pandas
import six
import six.moves
from tooz import coordination
from gnocchi import carbonara
@ -329,8 +331,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
oldest_mutable_timestamp)
def add_measures(self, metric, measures):
    """Serialize new measures and hand them to the storage backend.

    Each measure is packed with struct as a little-endian
    (uint64 nanosecond timestamp, double value) pair — see
    _MEASURE_SERIAL_FORMAT — replacing the former msgpack encoding.

    :param metric: the metric the measures belong to
    :param measures: iterable of Measure (timestamp, value) pairs
    """
    measures = list(measures)
    data = struct.pack(
        "<" + self._MEASURE_SERIAL_FORMAT * len(measures),
        *list(
            itertools.chain(
                # NOTE(jd) int(10e8) to avoid rounding errors
                *((int(utils.datetime_to_unix(timestamp) * int(10e8)),
                   value)
                  for timestamp, value in measures))))
    self._store_new_measures(metric, data)
@staticmethod
def _store_new_measures(metric, data):
@ -359,9 +369,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
aggregation, granularity, version=3):
raise NotImplementedError
# Serialization layout for one measure: uint64 timestamp in nanoseconds
# followed by a double value; byte order ("<", little-endian) is prepended
# at pack/unpack time.
_MEASURE_SERIAL_FORMAT = "Qd"
_MEASURE_SERIAL_LEN = struct.calcsize(_MEASURE_SERIAL_FORMAT)

def _unserialize_measures(self, data):
    """Decode a struct-packed buffer back into (timestamp, value) pairs.

    Inverse of add_measures() serialization.

    :param data: bytes previously produced by add_measures()
    :return: iterator of (pandas.Timestamp, float) tuples
    """
    nb_measures = len(data) // self._MEASURE_SERIAL_LEN
    measures = struct.unpack(
        "<" + self._MEASURE_SERIAL_FORMAT * nb_measures, data)
    # Even indexes hold the nanosecond timestamps, odd indexes the values.
    return six.moves.zip(
        pandas.to_datetime(measures[::2], unit='ns'),
        itertools.islice(measures, 1, len(measures), 2))
def measures_report(self, details=True):
metrics, measures, full_details = self._build_report(details)

View File

@ -141,8 +141,8 @@ class TestCarbonaraMigration(tests_base.TestCase):
self.storage.get_measures, self.metric, aggregation='max')
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2016, 7, 18), 69),
storage.Measure(datetime.datetime(2016, 7, 18, 1, 1), 64),
storage.Measure(utils.datetime_utc(2016, 7, 18), 69),
storage.Measure(utils.datetime_utc(2016, 7, 18, 1, 1), 64),
])
with mock.patch.object(self.index, 'list_metrics') as f:

View File

@ -58,7 +58,7 @@ class TestAggregates(tests_base.TestCase):
def _test_create_metric_and_data(self, data, spacing):
metric = storage.Metric(
uuid.uuid4(), self.archive_policies['medium'])
start_time = datetime.datetime(2014, 1, 1, 12)
start_time = utils.datetime_utc(2014, 1, 1, 12)
incr = datetime.timedelta(seconds=spacing)
measures = [storage.Measure(start_time + incr * n, val)
for n, val in enumerate(data)]

View File

@ -1,5 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2016 Red Hat, Inc.
# Copyright © 2015 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,11 +14,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
import mock
from oslo_utils import timeutils
from gnocchi import indexer
from gnocchi import statsd
@ -52,11 +51,11 @@ class TestStatsd(tests_base.TestCase):
def test_flush_empty(self):
self.server.stats.flush()
@mock.patch.object(timeutils, 'utcnow')
@mock.patch.object(utils, 'utcnow')
def _test_gauge_or_ms(self, metric_type, utcnow):
metric_name = "test_gauge_or_ms"
metric_key = metric_name + "|" + metric_type
utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36)
utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 58, 36)
self.server.datagram_received(
("%s:1|%s" % (metric_name, metric_type)).encode('ascii'),
("127.0.0.1", 12345))
@ -78,7 +77,7 @@ class TestStatsd(tests_base.TestCase):
(utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)
], measures)
utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37)
utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 59, 37)
# This one is going to be ignored
self.server.datagram_received(
("%s:45|%s" % (metric_name, metric_type)).encode('ascii'),
@ -105,11 +104,11 @@ class TestStatsd(tests_base.TestCase):
def test_ms(self):
self._test_gauge_or_ms("ms")
@mock.patch.object(timeutils, 'utcnow')
@mock.patch.object(utils, 'utcnow')
def test_counter(self, utcnow):
metric_name = "test_counter"
metric_key = metric_name + "|c"
utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36)
utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 58, 36)
self.server.datagram_received(
("%s:1|c" % metric_name).encode('ascii'),
("127.0.0.1", 12345))
@ -130,7 +129,7 @@ class TestStatsd(tests_base.TestCase):
(utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.0),
(utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)], measures)
utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37)
utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 59, 37)
self.server.datagram_received(
("%s:45|c" % metric_name).encode('ascii'),
("127.0.0.1", 12345))

View File

@ -58,12 +58,12 @@ class TestStorageDriver(tests_base.TestCase):
self.skipTest("This driver is not based on Carbonara")
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
])
self.trigger_processing()
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 13, 0, 1), 1),
storage.Measure(utils.datetime_utc(2014, 1, 1, 13, 0, 1), 1),
])
with mock.patch('gnocchi.carbonara.AggregatedTimeSerie.unserialize',
@ -82,7 +82,7 @@ class TestStorageDriver(tests_base.TestCase):
None, None, full=True)
self.assertEqual(set(), metrics)
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
])
metrics = self.storage.list_metric_with_measures_to_process(
None, None, full=True)
@ -94,7 +94,7 @@ class TestStorageDriver(tests_base.TestCase):
def test_delete_nonempty_metric(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
])
self.trigger_processing()
self.storage.delete_metric(self.metric, sync=True)
@ -102,14 +102,14 @@ class TestStorageDriver(tests_base.TestCase):
def test_delete_nonempty_metric_unprocessed(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
])
self.storage.delete_metric(self.metric, sync=True)
self.trigger_processing()
def test_delete_expunge_metric(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
])
self.trigger_processing()
self.index.delete_metric(self.metric.id)
@ -135,7 +135,7 @@ class TestStorageDriver(tests_base.TestCase):
def test_add_measures_big(self):
m, __ = self._create_metric('high')
self.storage.add_measures(m, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, i, j), 100)
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, i, j), 100)
for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)])
self.trigger_processing([str(m.id)])
@ -145,14 +145,14 @@ class TestStorageDriver(tests_base.TestCase):
def test_add_measures_update_subset_split(self):
m, m_sql = self._create_metric('medium')
measures = [
storage.Measure(datetime.datetime(2014, 1, 6, i, j, 0), 100)
storage.Measure(utils.datetime_utc(2014, 1, 6, i, j, 0), 100)
for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)]
self.storage.add_measures(m, measures)
self.trigger_processing([str(m.id)])
# add measure to end, in same aggregate time as last point.
self.storage.add_measures(m, [
storage.Measure(datetime.datetime(2014, 1, 6, 1, 58, 1), 100)])
storage.Measure(utils.datetime_utc(2014, 1, 6, 1, 58, 1), 100)])
with mock.patch.object(self.storage, '_store_metric_measures') as c:
# should only resample last aggregate
@ -168,13 +168,13 @@ class TestStorageDriver(tests_base.TestCase):
def test_add_measures_update_subset(self):
m, m_sql = self._create_metric('medium')
measures = [
storage.Measure(datetime.datetime(2014, 1, 6, i, j, 0), 100)
storage.Measure(utils.datetime_utc(2014, 1, 6, i, j, 0), 100)
for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)]
self.storage.add_measures(m, measures)
self.trigger_processing([str(m.id)])
# add measure to end, in same aggregate time as last point.
new_point = datetime.datetime(2014, 1, 6, 1, 58, 1)
new_point = utils.datetime_utc(2014, 1, 6, 1, 58, 1)
self.storage.add_measures(m, [storage.Measure(new_point, 100)])
with mock.patch.object(self.storage, '_add_measures') as c:
@ -186,10 +186,10 @@ class TestStorageDriver(tests_base.TestCase):
def test_delete_old_measures(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.trigger_processing()
@ -203,7 +203,7 @@ class TestStorageDriver(tests_base.TestCase):
# One year later…
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2015, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2015, 1, 1, 12, 0, 1), 69),
])
self.trigger_processing()
@ -237,10 +237,10 @@ class TestStorageDriver(tests_base.TestCase):
# First store some points scattered across different splits
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2016, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2016, 1, 2, 13, 7, 31), 42),
storage.Measure(datetime.datetime(2016, 1, 4, 14, 9, 31), 4),
storage.Measure(datetime.datetime(2016, 1, 6, 15, 12, 45), 44),
storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
])
self.trigger_processing()
@ -277,8 +277,8 @@ class TestStorageDriver(tests_base.TestCase):
# the BoundTimeSerie processing timeserie far away from its current
# range.
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2016, 1, 10, 16, 18, 45), 45),
storage.Measure(datetime.datetime(2016, 1, 10, 17, 12, 45), 46),
storage.Measure(utils.datetime_utc(2016, 1, 10, 16, 18, 45), 45),
storage.Measure(utils.datetime_utc(2016, 1, 10, 17, 12, 45), 46),
])
self.trigger_processing()
@ -312,8 +312,8 @@ class TestStorageDriver(tests_base.TestCase):
def test_updated_measures(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
])
self.trigger_processing()
@ -325,8 +325,8 @@ class TestStorageDriver(tests_base.TestCase):
], self.storage.get_measures(self.metric))
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.trigger_processing()
@ -356,10 +356,10 @@ class TestStorageDriver(tests_base.TestCase):
def test_add_and_get_measures(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.trigger_processing()
@ -446,10 +446,10 @@ class TestStorageDriver(tests_base.TestCase):
def test_get_measure_unknown_aggregation(self):
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.assertRaises(storage.AggregationDoesNotExist,
self.storage.get_measures,
@ -459,16 +459,16 @@ class TestStorageDriver(tests_base.TestCase):
metric2 = storage.Metric(uuid.uuid4(),
self.archive_policies['low'])
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.storage.add_measures(metric2, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.assertRaises(storage.AggregationDoesNotExist,
self.storage.get_cross_metric_measures,
@ -479,16 +479,16 @@ class TestStorageDriver(tests_base.TestCase):
metric2 = storage.Metric(uuid.uuid4(),
self.archive_policies['low'])
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.storage.add_measures(metric2, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.assertRaises(storage.GranularityDoesNotExist,
self.storage.get_cross_metric_measures,
@ -499,16 +499,16 @@ class TestStorageDriver(tests_base.TestCase):
metric2 = storage.Metric(uuid.uuid4(),
self.archive_policies['no_granularity_match'])
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.storage.add_measures(metric2, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.assertRaises(storage.MetricUnaggregatable,
@ -518,16 +518,16 @@ class TestStorageDriver(tests_base.TestCase):
def test_add_and_get_cross_metric_measures(self):
metric2, __ = self._create_metric()
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44),
])
self.storage.add_measures(metric2, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 9),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 41), 2),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 10, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 13, 10), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 9),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 41), 2),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 10, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 13, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
@ -603,17 +603,17 @@ class TestStorageDriver(tests_base.TestCase):
def test_add_and_get_cross_metric_measures_with_holes(self):
metric2, __ = self._create_metric()
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 5, 31), 8),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 5, 31), 8),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 42),
])
self.storage.add_measures(metric2, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 9),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 2),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 6),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 13, 10), 2),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 9),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 2),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 6),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 13, 10), 2),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
@ -629,18 +629,18 @@ class TestStorageDriver(tests_base.TestCase):
def test_search_value(self):
metric2, __ = self._create_metric()
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1,), 69),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 5, 31), 8),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1,), 69),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 5, 31), 8),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 42),
])
self.storage.add_measures(metric2, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 9),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 2),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 6),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 13, 10), 2),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 9),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 2),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 6),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 13, 10), 2),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
@ -671,9 +671,9 @@ class TestStorageDriver(tests_base.TestCase):
str(uuid.uuid4()), name)
m = self.index.list_metrics(ids=[m.id])[0]
self.storage.add_measures(m, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 0), 1),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 1),
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 10), 1),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 1),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 1),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 10), 1),
])
self.trigger_processing([str(m.id)])
self.assertEqual([
@ -686,7 +686,7 @@ class TestStorageDriver(tests_base.TestCase):
name, [archive_policy.ArchivePolicyItem(granularity=5, points=6)])
m = self.index.list_metrics(ids=[m.id])[0]
self.storage.add_measures(m, [
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 15), 1),
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 15), 1),
])
self.trigger_processing([str(m.id)])
self.assertEqual([

View File

@ -13,7 +13,6 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import random
from concurrent import futures
@ -45,8 +44,7 @@ def injector():
def todo(metric):
    # Inject `batch_of_measures` batches for this metric, each batch
    # holding `measures_per_batch` random values timestamped "now" (UTC
    # via utils.utcnow(), replacing the old naive datetime.now() path).
    for _ in six.moves.range(conf.batch_of_measures):
        measures = [
            storage.Measure(utils.utcnow(), random.random())
            for __ in six.moves.range(conf.measures_per_batch)]
        s.add_measures(metric, measures)