From ec055f58bacf6be8c4b5b6eb448a0195c5ba5ae1 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Fri, 16 Sep 2016 15:49:53 +0200 Subject: [PATCH] carbonara: replace msgpack encoding with struct for new measures This replaces the msgpack encoding that was use to store new measures to a lighter struct one. oslo_serialization.msgpackutils actually makes a pretty big encoding of the `datetime.datetime' object, whereas having just a long integer is enough in our case. Change-Id: If89e3d740a400a912c79e5731fb7907f97d2fe42 --- doc/source/install.rst | 13 +- gnocchi/storage/__init__.py | 3 +- gnocchi/storage/_carbonara.py | 29 +++- gnocchi/tests/storage/test_carbonara.py | 4 +- gnocchi/tests/test_aggregates.py | 2 +- gnocchi/tests/test_statsd.py | 15 +- gnocchi/tests/test_storage.py | 176 ++++++++++++------------ tools/measures_injector.py | 4 +- 8 files changed, 132 insertions(+), 114 deletions(-) diff --git a/doc/source/install.rst b/doc/source/install.rst index 6cf36d1e..6a5f5a87 100644 --- a/doc/source/install.rst +++ b/doc/source/install.rst @@ -91,12 +91,17 @@ Upgrading In order to upgrade from a previous version of Gnocchi, you need to make sure that your indexer and storage are properly upgraded. Run the following: -1. Stop the old version of Gnocchi API server and metric daemon +1. Stop the old version of Gnocchi API server and `gnocchi-statsd` daemon -2. Install the new version of Gnocchi +2. Make sure that the processing backlog is empty (`gnocchi status`) -2. Run `gnocchi-upgrade` +3. Stop the old version of `gnocchi-metricd` daemon + +4. Install the new version of Gnocchi + +5. Run `gnocchi-upgrade` This can take several hours depending on the size of your index and storage. -3. Start the new Gnocchi API server and metric daemon +6. Start the new Gnocchi API server, `gnocchi-metricd` + and `gnocchi-statsd` daemons diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index 80ed0ae0..420b55b6 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -16,7 +16,6 @@ import operator from oslo_config import cfg from oslo_log import log -from oslo_utils import timeutils from stevedore import driver from gnocchi import exceptions @@ -46,7 +45,7 @@ LOG = log.getLogger(__name__) class Measure(object): def __init__(self, timestamp, value): - self.timestamp = timeutils.normalize_time(timestamp) + self.timestamp = timestamp self.value = value def __iter__(self): diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index 8b2338c0..ab9f1702 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -18,6 +18,7 @@ import collections import datetime import itertools import operator +import struct import uuid from concurrent import futures @@ -25,9 +26,10 @@ import iso8601 import msgpack from oslo_config import cfg from oslo_log import log -from oslo_serialization import msgpackutils from oslo_utils import timeutils +import pandas import six +import six.moves from tooz import coordination from gnocchi import carbonara @@ -329,8 +331,16 @@ class CarbonaraBasedStorage(storage.StorageDriver): oldest_mutable_timestamp) def add_measures(self, metric, measures): - self._store_new_measures(metric, msgpackutils.dumps( - list(map(tuple, measures)))) + measures = list(measures) + data = struct.pack( + "<" + self._MEASURE_SERIAL_FORMAT * len(measures), + *list( + itertools.chain( + # NOTE(jd) int(10e8) to avoid rounding errors + *((int(utils.datetime_to_unix(timestamp) * int(10e8)), + value) + for timestamp, value in measures)))) + self._store_new_measures(metric, data) @staticmethod def _store_new_measures(metric, data): @@ -359,9 +369,16 @@ class CarbonaraBasedStorage(storage.StorageDriver): aggregation, granularity, version=3): raise NotImplementedError - @staticmethod - def _unserialize_measures(data): - return msgpackutils.loads(data) + _MEASURE_SERIAL_FORMAT = "Qd" + _MEASURE_SERIAL_LEN = struct.calcsize(_MEASURE_SERIAL_FORMAT) + + def _unserialize_measures(self, data): + nb_measures = len(data) // self._MEASURE_SERIAL_LEN + measures = struct.unpack( + "<" + self._MEASURE_SERIAL_FORMAT * nb_measures, data) + return six.moves.zip( + pandas.to_datetime(measures[::2], unit='ns'), + itertools.islice(measures, 1, len(measures), 2)) def measures_report(self, details=True): metrics, measures, full_details = self._build_report(details) diff --git a/gnocchi/tests/storage/test_carbonara.py b/gnocchi/tests/storage/test_carbonara.py index 58717c80..d8ea6159 100644 --- a/gnocchi/tests/storage/test_carbonara.py +++ b/gnocchi/tests/storage/test_carbonara.py @@ -141,8 +141,8 @@ class TestCarbonaraMigration(tests_base.TestCase): self.storage.get_measures, self.metric, aggregation='max') self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2016, 7, 18), 69), - storage.Measure(datetime.datetime(2016, 7, 18, 1, 1), 64), + storage.Measure(utils.datetime_utc(2016, 7, 18), 69), + storage.Measure(utils.datetime_utc(2016, 7, 18, 1, 1), 64), ]) with mock.patch.object(self.index, 'list_metrics') as f: diff --git a/gnocchi/tests/test_aggregates.py b/gnocchi/tests/test_aggregates.py index c4c79015..c07df7ef 100644 --- a/gnocchi/tests/test_aggregates.py +++ b/gnocchi/tests/test_aggregates.py @@ -58,7 +58,7 @@ class TestAggregates(tests_base.TestCase): def _test_create_metric_and_data(self, data, spacing): metric = storage.Metric( uuid.uuid4(), self.archive_policies['medium']) - start_time = datetime.datetime(2014, 1, 1, 12) + start_time = utils.datetime_utc(2014, 1, 1, 12) incr = datetime.timedelta(seconds=spacing) measures = [storage.Measure(start_time + incr * n, val) for n, val in enumerate(data)] diff --git a/gnocchi/tests/test_statsd.py b/gnocchi/tests/test_statsd.py index 4d820bc0..7531e25c 100644 --- a/gnocchi/tests/test_statsd.py +++ b/gnocchi/tests/test_statsd.py @@ -1,5 +1,6 @@ # -*- encoding: utf-8 -*- # +# Copyright © 2016 Red Hat, Inc. # Copyright © 2015 eNovance # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -13,11 +14,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import datetime import uuid import mock -from oslo_utils import timeutils from gnocchi import indexer from gnocchi import statsd @@ -52,11 +51,11 @@ class TestStatsd(tests_base.TestCase): def test_flush_empty(self): self.server.stats.flush() - @mock.patch.object(timeutils, 'utcnow') + @mock.patch.object(utils, 'utcnow') def _test_gauge_or_ms(self, metric_type, utcnow): metric_name = "test_gauge_or_ms" metric_key = metric_name + "|" + metric_type - utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36) + utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 58, 36) self.server.datagram_received( ("%s:1|%s" % (metric_name, metric_type)).encode('ascii'), ("127.0.0.1", 12345)) @@ -78,7 +77,7 @@ class TestStatsd(tests_base.TestCase): (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0) ], measures) - utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37) + utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 59, 37) # This one is going to be ignored self.server.datagram_received( ("%s:45|%s" % (metric_name, metric_type)).encode('ascii'), @@ -105,11 +104,11 @@ class TestStatsd(tests_base.TestCase): def test_ms(self): self._test_gauge_or_ms("ms") - @mock.patch.object(timeutils, 'utcnow') + @mock.patch.object(utils, 'utcnow') def test_counter(self, utcnow): metric_name = "test_counter" metric_key = metric_name + "|c" - utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36) + utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 58, 36) self.server.datagram_received( ("%s:1|c" % metric_name).encode('ascii'), ("127.0.0.1", 12345)) @@ -130,7 +129,7 @@ class TestStatsd(tests_base.TestCase): (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.0), (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)], measures) - utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37) + utcnow.return_value = utils.datetime_utc(2015, 1, 7, 13, 59, 37) self.server.datagram_received( ("%s:45|c" % metric_name).encode('ascii'), ("127.0.0.1", 12345)) diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py index a0460972..7ab25375 100644 --- a/gnocchi/tests/test_storage.py +++ b/gnocchi/tests/test_storage.py @@ -58,12 +58,12 @@ class TestStorageDriver(tests_base.TestCase): self.skipTest("This driver is not based on Carbonara") self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), ]) self.trigger_processing() self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 13, 0, 1), 1), + storage.Measure(utils.datetime_utc(2014, 1, 1, 13, 0, 1), 1), ]) with mock.patch('gnocchi.carbonara.AggregatedTimeSerie.unserialize', @@ -82,7 +82,7 @@ class TestStorageDriver(tests_base.TestCase): None, None, full=True) self.assertEqual(set(), metrics) self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), ]) metrics = self.storage.list_metric_with_measures_to_process( None, None, full=True) @@ -94,7 +94,7 @@ class TestStorageDriver(tests_base.TestCase): def test_delete_nonempty_metric(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), ]) self.trigger_processing() self.storage.delete_metric(self.metric, sync=True) @@ -102,14 +102,14 @@ class TestStorageDriver(tests_base.TestCase): def test_delete_nonempty_metric_unprocessed(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), ]) self.storage.delete_metric(self.metric, sync=True) self.trigger_processing() def test_delete_expunge_metric(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), ]) self.trigger_processing() self.index.delete_metric(self.metric.id) @@ -135,7 +135,7 @@ class TestStorageDriver(tests_base.TestCase): def test_add_measures_big(self): m, __ = self._create_metric('high') self.storage.add_measures(m, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, i, j), 100) + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, i, j), 100) for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)]) self.trigger_processing([str(m.id)]) @@ -145,14 +145,14 @@ class TestStorageDriver(tests_base.TestCase): def test_add_measures_update_subset_split(self): m, m_sql = self._create_metric('medium') measures = [ - storage.Measure(datetime.datetime(2014, 1, 6, i, j, 0), 100) + storage.Measure(utils.datetime_utc(2014, 1, 6, i, j, 0), 100) for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)] self.storage.add_measures(m, measures) self.trigger_processing([str(m.id)]) # add measure to end, in same aggregate time as last point. self.storage.add_measures(m, [ - storage.Measure(datetime.datetime(2014, 1, 6, 1, 58, 1), 100)]) + storage.Measure(utils.datetime_utc(2014, 1, 6, 1, 58, 1), 100)]) with mock.patch.object(self.storage, '_store_metric_measures') as c: # should only resample last aggregate @@ -168,13 +168,13 @@ class TestStorageDriver(tests_base.TestCase): def test_add_measures_update_subset(self): m, m_sql = self._create_metric('medium') measures = [ - storage.Measure(datetime.datetime(2014, 1, 6, i, j, 0), 100) + storage.Measure(utils.datetime_utc(2014, 1, 6, i, j, 0), 100) for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)] self.storage.add_measures(m, measures) self.trigger_processing([str(m.id)]) # add measure to end, in same aggregate time as last point. - new_point = datetime.datetime(2014, 1, 6, 1, 58, 1) + new_point = utils.datetime_utc(2014, 1, 6, 1, 58, 1) self.storage.add_measures(m, [storage.Measure(new_point, 100)]) with mock.patch.object(self.storage, '_add_measures') as c: @@ -186,10 +186,10 @@ class TestStorageDriver(tests_base.TestCase): def test_delete_old_measures(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.trigger_processing() @@ -203,7 +203,7 @@ class TestStorageDriver(tests_base.TestCase): # One year later… self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2015, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2015, 1, 1, 12, 0, 1), 69), ]) self.trigger_processing() @@ -237,10 +237,10 @@ class TestStorageDriver(tests_base.TestCase): # First store some points scattered across different splits self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2016, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2016, 1, 2, 13, 7, 31), 42), - storage.Measure(datetime.datetime(2016, 1, 4, 14, 9, 31), 4), - storage.Measure(datetime.datetime(2016, 1, 6, 15, 12, 45), 44), + storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42), + storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4), + storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44), ]) self.trigger_processing() @@ -277,8 +277,8 @@ class TestStorageDriver(tests_base.TestCase): # the BoundTimeSerie processing timeserie far away from its current # range. self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2016, 1, 10, 16, 18, 45), 45), - storage.Measure(datetime.datetime(2016, 1, 10, 17, 12, 45), 46), + storage.Measure(utils.datetime_utc(2016, 1, 10, 16, 18, 45), 45), + storage.Measure(utils.datetime_utc(2016, 1, 10, 17, 12, 45), 46), ]) self.trigger_processing() @@ -312,8 +312,8 @@ class TestStorageDriver(tests_base.TestCase): def test_updated_measures(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), ]) self.trigger_processing() @@ -325,8 +325,8 @@ class TestStorageDriver(tests_base.TestCase): ], self.storage.get_measures(self.metric)) self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.trigger_processing() @@ -356,10 +356,10 @@ class TestStorageDriver(tests_base.TestCase): def test_add_and_get_measures(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.trigger_processing() @@ -446,10 +446,10 @@ class TestStorageDriver(tests_base.TestCase): def test_get_measure_unknown_aggregation(self): self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.assertRaises(storage.AggregationDoesNotExist, self.storage.get_measures, @@ -459,16 +459,16 @@ class TestStorageDriver(tests_base.TestCase): metric2 = storage.Metric(uuid.uuid4(), self.archive_policies['low']) self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.storage.add_measures(metric2, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.assertRaises(storage.AggregationDoesNotExist, self.storage.get_cross_metric_measures, @@ -479,16 +479,16 @@ class TestStorageDriver(tests_base.TestCase): metric2 = storage.Metric(uuid.uuid4(), self.archive_policies['low']) self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.storage.add_measures(metric2, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.assertRaises(storage.GranularityDoesNotExist, self.storage.get_cross_metric_measures, @@ -499,16 +499,16 @@ class TestStorageDriver(tests_base.TestCase): metric2 = storage.Metric(uuid.uuid4(), self.archive_policies['no_granularity_match']) self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.storage.add_measures(metric2, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.assertRaises(storage.MetricUnaggregatable, @@ -518,16 +518,16 @@ class TestStorageDriver(tests_base.TestCase): def test_add_and_get_cross_metric_measures(self): metric2, __ = self._create_metric() self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 44), ]) self.storage.add_measures(metric2, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 9), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 41), 2), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 10, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 13, 10), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 9), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 41), 2), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 10, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 13, 10), 4), ]) self.trigger_processing([str(self.metric.id), str(metric2.id)]) @@ -603,17 +603,17 @@ class TestStorageDriver(tests_base.TestCase): def test_add_and_get_cross_metric_measures_with_holes(self): metric2, __ = self._create_metric() self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 5, 31), 8), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 5, 31), 8), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 42), ]) self.storage.add_measures(metric2, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 9), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 2), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 6), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 13, 10), 2), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 9), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 2), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 6), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 13, 10), 2), ]) self.trigger_processing([str(self.metric.id), str(metric2.id)]) @@ -629,18 +629,18 @@ class TestStorageDriver(tests_base.TestCase): def test_search_value(self): metric2, __ = self._create_metric() self.storage.add_measures(self.metric, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1,), 69), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 5, 31), 8), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1,), 69), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 42), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 5, 31), 8), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 4), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 12, 45), 42), ]) self.storage.add_measures(metric2, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 9), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 2), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 6), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 13, 10), 2), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 9), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 7, 31), 2), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 9, 31), 6), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 13, 10), 2), ]) self.trigger_processing([str(self.metric.id), str(metric2.id)]) @@ -671,9 +671,9 @@ class TestStorageDriver(tests_base.TestCase): str(uuid.uuid4()), name) m = self.index.list_metrics(ids=[m.id])[0] self.storage.add_measures(m, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 0), 1), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 1), - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 10), 1), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 1), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 1), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 10), 1), ]) self.trigger_processing([str(m.id)]) self.assertEqual([ @@ -686,7 +686,7 @@ class TestStorageDriver(tests_base.TestCase): name, [archive_policy.ArchivePolicyItem(granularity=5, points=6)]) m = self.index.list_metrics(ids=[m.id])[0] self.storage.add_measures(m, [ - storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 15), 1), + storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 15), 1), ]) self.trigger_processing([str(m.id)]) self.assertEqual([ diff --git a/tools/measures_injector.py b/tools/measures_injector.py index 2d58ca62..dd112d55 100755 --- a/tools/measures_injector.py +++ b/tools/measures_injector.py @@ -13,7 +13,6 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import datetime import random from concurrent import futures @@ -45,8 +44,7 @@ def injector(): def todo(metric): for _ in six.moves.range(conf.batch_of_measures): measures = [ - storage.Measure(utils.to_timestamp(datetime.datetime.now()), - random.random()) + storage.Measure(utils.utcnow(), random.random()) for __ in six.moves.range(conf.measures_per_batch)] s.add_measures(metric, measures)