gnocchi 3.0.4 release
meta:version: 3.0.4 meta:diff-start: - meta:series: independent meta:release-type: release meta:pypi: no meta:first: no meta:release:Author: Julien Danjou <julien@danjou.info> meta:release:Commit: Julien Danjou <julien@danjou.info> meta:release:Change-Id: I198372001318bd53428bae0b4f54c5816d2c2af8 meta:release:Code-Review+2: Doug Hellmann <doug@doughellmann.com> meta:release:Workflow+1: Doug Hellmann <doug@doughellmann.com> -----BEGIN PGP SIGNATURE----- Version: GnuPG v1 iQEcBAABAgAGBQJYfOTLAAoJELkGmxM1cAzcdPEH/2urR8IzIwvcxCymGVhAcJ9b 9yBoPydl31nQ0lLcrXF+qJ6OcE7yYv9f+U4Wq45TGMRakocnn88PNmBtuIZLjimF Ih6ugGkMk3pEv+69g1dbyscZ4CiG1WrV0mvbbq8mwlYwOcUkMwM9L2qMJ9EA81rC Lqx60sJZs/K2g1vln6/46YtXkfQQ+bWBKwep9jEX3XqD4z8TgXmK0smTj+drSNK3 gHkcRnYB7TYHG/agMBrjf/FzJoPOwbmZm4XfIsIzHM+anunNDyx0rVJHBTg7AQ01 kplUTwou2lvLq+oGqjBfCIrj8q7ZLgx5sM/R5HemtXJbleSL68U1mKh0WEDI2Zw= =53rm -----END PGP SIGNATURE----- Merge tag '3.0.4' into debian/newton gnocchi 3.0.4 release Change-Id: Iccc30f5f5140bd4e6773ea1a1f5bc93239c6b687 meta:version: 3.0.4 meta:diff-start: - meta:series: independent meta:release-type: release meta:pypi: no meta:first: no meta:release:Author: Julien Danjou <julien@danjou.info> meta:release:Commit: Julien Danjou <julien@danjou.info> meta:release:Change-Id: I198372001318bd53428bae0b4f54c5816d2c2af8 meta:release:Code-Review+2: Doug Hellmann <doug@doughellmann.com> meta:release:Workflow+1: Doug Hellmann <doug@doughellmann.com>
This commit is contained in:
commit
dc6dc2bf8a
|
@ -1,9 +1,15 @@
|
|||
gnocchi (3.0.0-3) UNRELEASED; urgency=medium
|
||||
gnocchi (3.0.4-1) unstable; urgency=medium
|
||||
|
||||
* Team upload.
|
||||
|
||||
[ Ondřej Nový ]
|
||||
* Bumped debhelper compat version to 10
|
||||
* Added lsb-base to depends
|
||||
|
||||
-- Ondřej Nový <onovy@debian.org> Wed, 23 Nov 2016 23:58:26 +0100
|
||||
[ Ondřej Kobližek ]
|
||||
* New upstream release (Closes: #852991)
|
||||
|
||||
-- Ondřej Kobližek <koblizeko@gmail.com> Thu, 02 Feb 2017 14:44:31 +0100
|
||||
|
||||
gnocchi (3.0.0-2) unstable; urgency=medium
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ Build-Depends-Indep: alembic (>= 0.7.6),
|
|||
python-os-testr,
|
||||
python-oslo.config (>= 1:2.6.0),
|
||||
python-oslo.db (>= 4.8.0),
|
||||
python-oslo.log (>= 1.0.0),
|
||||
python-oslo.log (>= 2.3.0),
|
||||
python-oslo.middleware (>= 3.11.0),
|
||||
python-oslo.policy (>= 0.3.0),
|
||||
python-oslo.serialization (>= 1.4.0),
|
||||
|
@ -90,7 +90,7 @@ Depends: alembic (>= 0.7.6),
|
|||
python-numpy,
|
||||
python-oslo.config (>= 1:2.6.0),
|
||||
python-oslo.db (>= 4.8.0),
|
||||
python-oslo.log (>= 1.0.0),
|
||||
python-oslo.log (>= 2.3.0),
|
||||
python-oslo.middleware (>= 3.11.0),
|
||||
python-oslo.policy (>= 0.3.0),
|
||||
python-oslo.serialization (>= 1.4.0),
|
||||
|
|
|
@ -390,7 +390,7 @@ function install_gnocchi {
|
|||
|
||||
install_gnocchiclient
|
||||
|
||||
[ "$GNOCCHI_USE_KEYSTONE" == "True" ] && EXTRA_FLAVOR=,keystonemiddleware
|
||||
[ "$GNOCCHI_USE_KEYSTONE" == "True" ] && EXTRA_FLAVOR=,keystone
|
||||
|
||||
# We don't use setup_package because we don't follow openstack/requirements
|
||||
sudo -H pip install -e "$GNOCCHI_DIR"[test,$GNOCCHI_STORAGE_BACKEND,${DATABASE_TYPE}${EXTRA_FLAVOR}]
|
||||
|
@ -426,7 +426,7 @@ function start_gnocchi {
|
|||
elif [ "$GNOCCHI_DEPLOY" == "uwsgi" ]; then
|
||||
run_process gnocchi-api "$GNOCCHI_BIN_DIR/uwsgi $GNOCCHI_UWSGI_FILE"
|
||||
else
|
||||
run_process gnocchi-api "$GNOCCHI_BIN_DIR/gnocchi-api -d -v --config-file $GNOCCHI_CONF"
|
||||
run_process gnocchi-api "$GNOCCHI_BIN_DIR/gnocchi-api --port $GNOCCHI_SERVICE_PORT"
|
||||
fi
|
||||
# only die on API if it was actually intended to be turned on
|
||||
if is_service_enabled gnocchi-api; then
|
||||
|
@ -457,7 +457,7 @@ function stop_gnocchi {
|
|||
restart_apache_server
|
||||
fi
|
||||
# Kill the gnocchi screen windows
|
||||
for serv in gnocchi-api; do
|
||||
for serv in gnocchi-api gnocchi-metricd gnocchi-statsd; do
|
||||
stop_process $serv
|
||||
done
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ To enable Gnocchi in devstack, add the following to local.conf:
|
|||
::
|
||||
|
||||
enable_plugin gnocchi https://github.com/openstack/gnocchi master
|
||||
enable_service gnocchi-api,gnocchi-metricd
|
||||
|
||||
To enable Grafana support in devstack, you can also enable `gnocchi-grafana`::
|
||||
|
||||
|
@ -93,15 +92,13 @@ that your indexer and storage are properly upgraded. Run the following:
|
|||
|
||||
1. Stop the old version of Gnocchi API server and `gnocchi-statsd` daemon
|
||||
|
||||
2. Make sure that the processing backlog is empty (`gnocchi status`)
|
||||
2. Stop the old version of `gnocchi-metricd` daemon
|
||||
|
||||
3. Stop the old version of `gnocchi-metricd` daemon
|
||||
3. Install the new version of Gnocchi
|
||||
|
||||
4. Install the new version of Gnocchi
|
||||
|
||||
5. Run `gnocchi-upgrade`
|
||||
4. Run `gnocchi-upgrade`
|
||||
This can take several hours depending on the size of your index and
|
||||
storage.
|
||||
|
||||
6. Start the new Gnocchi API server, `gnocchi-metricd`
|
||||
5. Start the new Gnocchi API server, `gnocchi-metricd`
|
||||
and `gnocchi-statsd` daemons
|
||||
|
|
|
@ -489,7 +489,7 @@ It can also be done by providing the list of metrics to aggregate:
|
|||
|
||||
By default, the measures are aggregated using the aggregation method provided,
|
||||
e.g. you'll get a mean of means, or a max of maxs. You can specify what method
|
||||
to use over the retrieved aggregation by using the `reaggregate` parameter:
|
||||
to use over the retrieved aggregation by using the `reaggregation` parameter:
|
||||
|
||||
{{ scenarios['get-across-metrics-measures-by-metric-ids-reaggregate']['doc'] }}
|
||||
|
||||
|
|
|
@ -4,19 +4,22 @@ pipeline = gnocchi+noauth
|
|||
|
||||
[composite:gnocchi+noauth]
|
||||
use = egg:Paste#urlmap
|
||||
/ = gnocchiversions
|
||||
/ = gnocchiversions_pipeline
|
||||
/v1 = gnocchiv1+noauth
|
||||
|
||||
[composite:gnocchi+auth]
|
||||
use = egg:Paste#urlmap
|
||||
/ = gnocchiversions
|
||||
/ = gnocchiversions_pipeline
|
||||
/v1 = gnocchiv1+auth
|
||||
|
||||
[pipeline:gnocchiv1+noauth]
|
||||
pipeline = gnocchiv1
|
||||
pipeline = http_proxy_to_wsgi gnocchiv1
|
||||
|
||||
[pipeline:gnocchiv1+auth]
|
||||
pipeline = keystone_authtoken gnocchiv1
|
||||
pipeline = http_proxy_to_wsgi keystone_authtoken gnocchiv1
|
||||
|
||||
[pipeline:gnocchiversions_pipeline]
|
||||
pipeline = http_proxy_to_wsgi gnocchiversions
|
||||
|
||||
[app:gnocchiversions]
|
||||
paste.app_factory = gnocchi.rest.app:app_factory
|
||||
|
@ -29,3 +32,7 @@ root = gnocchi.rest.V1Controller
|
|||
[filter:keystone_authtoken]
|
||||
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
|
||||
oslo_config_project = gnocchi
|
||||
|
||||
[filter:http_proxy_to_wsgi]
|
||||
paste.filter_factory = oslo_middleware.http_proxy_to_wsgi:HTTPProxyToWSGI.factory
|
||||
oslo_config_project = gnocchi
|
||||
|
|
|
@ -4,6 +4,7 @@ wrap_width = 79
|
|||
namespace = gnocchi
|
||||
namespace = oslo.db
|
||||
namespace = oslo.log
|
||||
namespace = oslo.middleware
|
||||
namespace = oslo.middleware.cors
|
||||
namespace = oslo.middleware.http_proxy_to_wsgi
|
||||
namespace = oslo.policy
|
||||
namespace = keystonemiddleware.auth_token
|
||||
|
|
|
@ -27,13 +27,10 @@ import re
|
|||
import struct
|
||||
import time
|
||||
|
||||
import iso8601
|
||||
import lz4
|
||||
import pandas
|
||||
import six
|
||||
|
||||
from gnocchi import utils
|
||||
|
||||
# NOTE(sileht): pandas relies on time.strptime()
|
||||
# and often triggers http://bugs.python.org/issue7980
|
||||
# its dues to our heavy threads usage, this is the workaround
|
||||
|
@ -77,6 +74,12 @@ class UnknownAggregationMethod(Exception):
|
|||
"Unknown aggregation method `%s'" % agg)
|
||||
|
||||
|
||||
class InvalidData(ValueError):
|
||||
"""Error raised when data are corrupted."""
|
||||
def __init__(self):
|
||||
super(InvalidData, self).__init__("Unable to unpack, invalid data")
|
||||
|
||||
|
||||
def round_timestamp(ts, freq):
|
||||
return pandas.Timestamp(
|
||||
(pandas.Timestamp(ts).value // freq) * freq)
|
||||
|
@ -228,8 +231,11 @@ class BoundTimeSerie(TimeSerie):
|
|||
nb_points = (
|
||||
len(uncompressed) // cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
|
||||
)
|
||||
deserial = struct.unpack("<" + "Q" * nb_points + "d" * nb_points,
|
||||
uncompressed)
|
||||
try:
|
||||
deserial = struct.unpack("<" + "Q" * nb_points + "d" * nb_points,
|
||||
uncompressed)
|
||||
except struct.error:
|
||||
raise InvalidData
|
||||
start = deserial[0]
|
||||
timestamps = [start]
|
||||
for delta in itertools.islice(deserial, 1, nb_points):
|
||||
|
@ -325,7 +331,8 @@ class BoundTimeSerie(TimeSerie):
|
|||
self.ts = self.ts[self.first_block_timestamp():]
|
||||
|
||||
|
||||
class SplitKey(pandas.Timestamp):
|
||||
@functools.total_ordering
|
||||
class SplitKey(object):
|
||||
"""A class representing a split key.
|
||||
|
||||
A split key is basically a timestamp that can be used to split
|
||||
|
@ -336,33 +343,30 @@ class SplitKey(pandas.Timestamp):
|
|||
|
||||
POINTS_PER_SPLIT = 3600
|
||||
|
||||
@classmethod
|
||||
def _init(cls, value, sampling):
|
||||
# NOTE(jd) This should be __init__ but it does not work, because of…
|
||||
# Pandas, Cython, whatever.
|
||||
self = cls(value)
|
||||
self._carbonara_sampling = sampling
|
||||
return self
|
||||
def __init__(self, value, sampling):
|
||||
if isinstance(value, SplitKey):
|
||||
self.key = value.key
|
||||
elif isinstance(value, pandas.Timestamp):
|
||||
self.key = value.value / 10e8
|
||||
else:
|
||||
self.key = float(value)
|
||||
|
||||
self._carbonara_sampling = float(sampling)
|
||||
|
||||
@classmethod
|
||||
def from_timestamp_and_sampling(cls, timestamp, sampling):
|
||||
return cls._init(
|
||||
return cls(
|
||||
round_timestamp(
|
||||
timestamp, freq=sampling * cls.POINTS_PER_SPLIT * 10e8),
|
||||
sampling)
|
||||
|
||||
@classmethod
|
||||
def from_key_string(cls, keystr, sampling):
|
||||
return cls._init(float(keystr) * 10e8, sampling)
|
||||
|
||||
def __next__(self):
|
||||
"""Get the split key of the next split.
|
||||
|
||||
:return: A `SplitKey` object.
|
||||
"""
|
||||
return self._init(
|
||||
self + datetime.timedelta(
|
||||
seconds=(self.POINTS_PER_SPLIT * self._carbonara_sampling)),
|
||||
return self.__class__(
|
||||
self.key + self._carbonara_sampling * self.POINTS_PER_SPLIT,
|
||||
self._carbonara_sampling)
|
||||
|
||||
next = __next__
|
||||
|
@ -370,18 +374,35 @@ class SplitKey(pandas.Timestamp):
|
|||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.key)
|
||||
|
||||
def __lt__(self, other):
|
||||
if isinstance(other, SplitKey):
|
||||
return self.key < other.key
|
||||
if isinstance(other, pandas.Timestamp):
|
||||
return self.key * 10e8 < other.value
|
||||
return self.key < other
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, SplitKey):
|
||||
return self.key == other.key
|
||||
if isinstance(other, pandas.Timestamp):
|
||||
return self.key * 10e8 == other.value
|
||||
return self.key == other
|
||||
|
||||
def __str__(self):
|
||||
return str(float(self))
|
||||
|
||||
def __float__(self):
|
||||
ts = self.to_datetime()
|
||||
if ts.tzinfo is None:
|
||||
ts = ts.replace(tzinfo=iso8601.iso8601.UTC)
|
||||
return utils.datetime_to_unix(ts)
|
||||
return self.key
|
||||
|
||||
def as_datetime(self):
|
||||
return pandas.Timestamp(self.key, unit='s')
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s: %s / %fs>" % (self.__class__.__name__,
|
||||
pandas.Timestamp.__repr__(self),
|
||||
repr(self.key),
|
||||
self._carbonara_sampling)
|
||||
|
||||
|
||||
|
@ -436,7 +457,7 @@ class AggregatedTimeSerie(TimeSerie):
|
|||
groupby = self.ts.groupby(functools.partial(
|
||||
SplitKey.from_timestamp_and_sampling, sampling=self.sampling))
|
||||
for group, ts in groupby:
|
||||
yield (SplitKey._init(group, self.sampling),
|
||||
yield (SplitKey(group, self.sampling),
|
||||
AggregatedTimeSerie(self.sampling, self.aggregation_method,
|
||||
ts))
|
||||
|
||||
|
@ -489,9 +510,12 @@ class AggregatedTimeSerie(TimeSerie):
|
|||
# Compressed format
|
||||
uncompressed = lz4.loads(memoryview(data)[1:].tobytes())
|
||||
nb_points = len(uncompressed) // cls.COMPRESSED_SERIAL_LEN
|
||||
deserial = struct.unpack(
|
||||
'<' + 'H' * nb_points + 'd' * nb_points,
|
||||
uncompressed)
|
||||
try:
|
||||
deserial = struct.unpack(
|
||||
'<' + 'H' * nb_points + 'd' * nb_points,
|
||||
uncompressed)
|
||||
except struct.error:
|
||||
raise InvalidData
|
||||
for delta in itertools.islice(deserial, nb_points):
|
||||
ts = start + (delta * sampling)
|
||||
y.append(ts)
|
||||
|
@ -502,7 +526,10 @@ class AggregatedTimeSerie(TimeSerie):
|
|||
nb_points = len(data) // cls.PADDED_SERIAL_LEN
|
||||
# NOTE(gordc): use '<' for standardized
|
||||
# little-endian byte order
|
||||
deserial = struct.unpack('<' + '?d' * nb_points, data)
|
||||
try:
|
||||
deserial = struct.unpack('<' + '?d' * nb_points, data)
|
||||
except struct.error:
|
||||
raise InvalidData()
|
||||
# alternating split into 2 list and drop items with False flag
|
||||
for i, val in itertools.compress(
|
||||
six.moves.zip(six.moves.range(nb_points),
|
||||
|
@ -544,7 +571,10 @@ class AggregatedTimeSerie(TimeSerie):
|
|||
if not self.ts.index.is_monotonic:
|
||||
self.ts = self.ts.sort_index()
|
||||
offset_div = self.sampling * 10e8
|
||||
start = pandas.Timestamp(start).value
|
||||
if isinstance(start, SplitKey):
|
||||
start = start.as_datetime().value
|
||||
else:
|
||||
start = pandas.Timestamp(start).value
|
||||
# calculate how many seconds from start the series runs until and
|
||||
# initialize list to store alternating delimiter, float entries
|
||||
if compressed:
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
import multiprocessing
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import cotyledon
|
||||
from futurist import periodics
|
||||
|
@ -143,16 +142,11 @@ class MetricScheduler(MetricProcessBase):
|
|||
TASKS_PER_WORKER = 16
|
||||
BLOCK_SIZE = 4
|
||||
|
||||
def _enable_coordination(self, conf):
|
||||
self._coord = coordination.get_coordinator(
|
||||
conf.storage.coordination_url, self._my_id)
|
||||
self._coord.start(start_heart=True)
|
||||
|
||||
def __init__(self, worker_id, conf, queue):
|
||||
super(MetricScheduler, self).__init__(
|
||||
worker_id, conf, conf.storage.metric_processing_delay)
|
||||
self._my_id = str(uuid.uuid4())
|
||||
self._enable_coordination(conf)
|
||||
self._coord, self._my_id = utils.get_coordinator_and_start(
|
||||
conf.storage.coordination_url)
|
||||
self.queue = queue
|
||||
self.previously_scheduled_metrics = set()
|
||||
self.workers = conf.metricd.workers
|
||||
|
|
|
@ -19,6 +19,7 @@ import json
|
|||
import jinja2
|
||||
import six
|
||||
import six.moves
|
||||
import webob.request
|
||||
import yaml
|
||||
|
||||
from gnocchi.tests import test_rest
|
||||
|
@ -117,8 +118,9 @@ def setup(app):
|
|||
fake_file.seek(0)
|
||||
request = webapp.RequestClass.from_file(fake_file)
|
||||
|
||||
# TODO(jd) Fix this lame bug in webob
|
||||
if request.method in ("DELETE"):
|
||||
# TODO(jd) Fix this lame bug in webob < 1.7
|
||||
if (hasattr(webob.request, "http_method_probably_has_body")
|
||||
and request.method == "DELETE"):
|
||||
# Webob has a bug it does not read the body for DELETE, l4m3r
|
||||
clen = request.content_length
|
||||
if clen is None:
|
||||
|
|
|
@ -26,6 +26,7 @@ import iso8601
|
|||
import msgpack
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_serialization import msgpackutils
|
||||
from oslo_utils import timeutils
|
||||
import pandas
|
||||
import six
|
||||
|
@ -65,24 +66,14 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
|
||||
def __init__(self, conf):
|
||||
super(CarbonaraBasedStorage, self).__init__(conf)
|
||||
self.coord = coordination.get_coordinator(
|
||||
conf.coordination_url,
|
||||
str(uuid.uuid4()).encode('ascii'))
|
||||
self.aggregation_workers_number = conf.aggregation_workers_number
|
||||
if self.aggregation_workers_number == 1:
|
||||
# NOTE(jd) Avoid using futures at all if we don't want any threads.
|
||||
self._map_in_thread = self._map_no_thread
|
||||
else:
|
||||
self._map_in_thread = self._map_in_futures_threads
|
||||
self.start()
|
||||
|
||||
@utils.retry
|
||||
def start(self):
|
||||
try:
|
||||
self.coord.start(start_heart=True)
|
||||
except Exception as e:
|
||||
LOG.error("Unable to start coordinator: %s" % e)
|
||||
raise utils.Retry(e)
|
||||
self.coord, my_id = utils.get_coordinator_and_start(
|
||||
conf.coordination_url)
|
||||
|
||||
def stop(self):
|
||||
self.coord.stop()
|
||||
|
@ -177,7 +168,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
try:
|
||||
return carbonara.AggregatedTimeSerie.unserialize(
|
||||
data, key, aggregation, granularity)
|
||||
except ValueError:
|
||||
except carbonara.InvalidData:
|
||||
LOG.error("Data corruption detected for %s "
|
||||
"aggregated `%s' timeserie, granularity `%s' "
|
||||
"around time `%s', ignoring."
|
||||
|
@ -239,7 +230,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
oldest_mutable_timestamp):
|
||||
# NOTE(jd) We write the full split only if the driver works that way
|
||||
# (self.WRITE_FULL) or if the oldest_mutable_timestamp is out of range.
|
||||
write_full = self.WRITE_FULL or oldest_mutable_timestamp >= next(key)
|
||||
write_full = self.WRITE_FULL or next(key) <= oldest_mutable_timestamp
|
||||
key_as_str = str(key)
|
||||
if write_full:
|
||||
try:
|
||||
|
@ -255,6 +246,20 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
else:
|
||||
split.merge(existing)
|
||||
|
||||
if split is None:
|
||||
# `split' can be none if existing is None and no split was passed
|
||||
# in order to rewrite and compress the data; in that case, it means
|
||||
# the split key is present and listed, but some aggregation method
|
||||
# or granularity is missing. That means data is corrupted, but it
|
||||
# does not mean we have to fail, we can just do nothing and log a
|
||||
# warning.
|
||||
LOG.warning("No data found for metric %s, granularity %f "
|
||||
"and aggregation method %s (split key %s): "
|
||||
"possible data corruption",
|
||||
metric, archive_policy_def.granularity,
|
||||
aggregation, key)
|
||||
return
|
||||
|
||||
offset, data = split.serialize(key, compressed=write_full)
|
||||
|
||||
return self._store_metric_measures(
|
||||
|
@ -301,7 +306,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
archive_policy_def.granularity)
|
||||
existing_keys.remove(key)
|
||||
else:
|
||||
oldest_key_to_keep = carbonara.SplitKey(0)
|
||||
oldest_key_to_keep = carbonara.SplitKey(0, 0)
|
||||
|
||||
# Rewrite all read-only splits just for fun (and compression). This
|
||||
# only happens if `previous_oldest_mutable_timestamp' exists, which
|
||||
|
@ -319,8 +324,8 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
# NOTE(jd) Rewrite it entirely for fun (and later for
|
||||
# compression). For that, we just pass None as split.
|
||||
self._store_timeserie_split(
|
||||
metric, carbonara.SplitKey.from_key_string(
|
||||
key, archive_policy_def.granularity),
|
||||
metric, carbonara.SplitKey(
|
||||
float(key), archive_policy_def.granularity),
|
||||
None, aggregation, archive_policy_def,
|
||||
oldest_mutable_timestamp)
|
||||
|
||||
|
@ -372,10 +377,20 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
_MEASURE_SERIAL_FORMAT = "Qd"
|
||||
_MEASURE_SERIAL_LEN = struct.calcsize(_MEASURE_SERIAL_FORMAT)
|
||||
|
||||
def _unserialize_measures(self, data):
|
||||
def _unserialize_measures(self, measure_id, data):
|
||||
nb_measures = len(data) // self._MEASURE_SERIAL_LEN
|
||||
measures = struct.unpack(
|
||||
"<" + self._MEASURE_SERIAL_FORMAT * nb_measures, data)
|
||||
try:
|
||||
measures = struct.unpack(
|
||||
"<" + self._MEASURE_SERIAL_FORMAT * nb_measures, data)
|
||||
except struct.error:
|
||||
# This either a corruption, either a v2 measures
|
||||
try:
|
||||
return msgpackutils.loads(data)
|
||||
except ValueError:
|
||||
LOG.error(
|
||||
"Unable to decode measure %s, possible data corruption",
|
||||
measure_id)
|
||||
raise
|
||||
return six.moves.zip(
|
||||
pandas.to_datetime(measures[::2], unit='ns'),
|
||||
itertools.islice(measures, 1, len(measures), 2))
|
||||
|
@ -453,13 +468,13 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
def upgrade(self, index):
|
||||
marker = None
|
||||
while True:
|
||||
metrics = index.list_metrics(limit=self.UPGRADE_BATCH_SIZE,
|
||||
marker=marker)
|
||||
for m in metrics:
|
||||
self._check_for_metric_upgrade(m)
|
||||
metrics = [(metric,) for metric in
|
||||
index.list_metrics(limit=self.UPGRADE_BATCH_SIZE,
|
||||
marker=marker)]
|
||||
self._map_in_thread(self._check_for_metric_upgrade, metrics)
|
||||
if len(metrics) == 0:
|
||||
break
|
||||
marker = metrics[-1].id
|
||||
marker = metrics[-1][0].id
|
||||
|
||||
def process_new_measures(self, indexer, metrics_to_process, sync=False):
|
||||
metrics = indexer.list_metrics(ids=metrics_to_process)
|
||||
|
|
|
@ -220,7 +220,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
|||
measures = []
|
||||
for n in object_names:
|
||||
data = self._get_object_content(n)
|
||||
measures.extend(self._unserialize_measures(data))
|
||||
measures.extend(self._unserialize_measures(n, data))
|
||||
|
||||
yield measures
|
||||
|
||||
|
|
|
@ -194,7 +194,7 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
|
|||
for f in files:
|
||||
abspath = self._build_measure_path(metric.id, f)
|
||||
with open(abspath, "rb") as e:
|
||||
measures.extend(self._unserialize_measures(e.read()))
|
||||
measures.extend(self._unserialize_measures(f, e.read()))
|
||||
|
||||
yield measures
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
|
|||
for f in files:
|
||||
headers, data = self.swift.get_object(
|
||||
self.MEASURE_PREFIX, f['name'])
|
||||
measures.extend(self._unserialize_measures(data))
|
||||
measures.extend(self._unserialize_measures(f['name'], data))
|
||||
|
||||
yield measures
|
||||
|
||||
|
|
|
@ -23,6 +23,8 @@ from unittest import case
|
|||
import warnings
|
||||
|
||||
from gabbi import fixture
|
||||
from oslo_config import cfg
|
||||
from oslo_middleware import cors
|
||||
import sqlalchemy_utils
|
||||
|
||||
from gnocchi import indexer
|
||||
|
@ -80,6 +82,13 @@ class ConfigFixture(fixture.GabbiFixture):
|
|||
os.path.abspath('etc/gnocchi/api-paste.ini'),
|
||||
'api')
|
||||
|
||||
# NOTE(sileht): This is not concurrency safe, but only this tests file
|
||||
# deal with cors, so we are fine. set_override don't work because cors
|
||||
# group doesn't yet exists, and we the CORS middleware is created it
|
||||
# register the option and directly copy value of all configurations
|
||||
# options making impossible to override them properly...
|
||||
cfg.set_defaults(cors.CORS_OPTS, allowed_origin="http://foobar.com")
|
||||
|
||||
self.conf = conf
|
||||
self.tmp_dir = data_tmp_dir
|
||||
|
||||
|
|
|
@ -441,8 +441,7 @@ tests:
|
|||
method: POST
|
||||
request_headers:
|
||||
content-type: plain/text
|
||||
data:
|
||||
archive_policy_name: cookies
|
||||
data: '{"archive_policy_name": "cookies"}'
|
||||
status: 415
|
||||
|
||||
|
||||
|
|
|
@ -212,7 +212,7 @@ tests:
|
|||
- ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
|
||||
|
||||
- name: get measure aggregates by granularity from resources and reaggregate
|
||||
POST: /v1/aggregation/resource/generic/metric/agg_meter?granularity=1&reaggregate=min
|
||||
POST: /v1/aggregation/resource/generic/metric/agg_meter?granularity=1&reaggregation=min
|
||||
request_headers:
|
||||
x-user-id: 0fbb231484614b1a80131fc22f6afc9c
|
||||
x-project-id: f3d41b770cc14f0bb94a1d5be9c0e3ea
|
||||
|
@ -222,8 +222,8 @@ tests:
|
|||
delay: 1
|
||||
response_json_paths:
|
||||
$:
|
||||
- ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
|
||||
- ['2015-03-06T14:34:12+00:00', 1.0, 7.0]
|
||||
- ['2015-03-06T14:33:57+00:00', 1.0, 3.1]
|
||||
- ['2015-03-06T14:34:12+00:00', 1.0, 2.0]
|
||||
|
||||
# Some negative tests
|
||||
|
||||
|
|
|
@ -485,6 +485,7 @@ tests:
|
|||
- name: fail to create policy non-admin
|
||||
POST: /v1/archive_policy
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
x-user-id: b45187c5-150b-4730-bcb2-b5e04e234220
|
||||
x-project-id: 16764ee0-bffe-4843-aa36-04b002cdbc7c
|
||||
data:
|
||||
|
|
|
@ -7,7 +7,7 @@ tests:
|
|||
desc: Root URL must return information about API versions
|
||||
GET: /
|
||||
response_headers:
|
||||
content-type: application/json; charset=UTF-8
|
||||
content-type: /^application\/json/
|
||||
response_json_paths:
|
||||
$.versions.[0].id: "v1.0"
|
||||
$.versions.[0].status: "CURRENT"
|
||||
|
@ -24,7 +24,7 @@ tests:
|
|||
points: 20
|
||||
status: 201
|
||||
response_headers:
|
||||
content-type: /application\/json/
|
||||
content-type: /^application\/json/
|
||||
location: $SCHEME://$NETLOC/v1/archive_policy/test1
|
||||
response_json_paths:
|
||||
$.name: test1
|
||||
|
@ -91,7 +91,7 @@ tests:
|
|||
desc: Resources index page should return list of type associated with a URL
|
||||
GET: /v1/resource/
|
||||
response_headers:
|
||||
content-type: application/json; charset=UTF-8
|
||||
content-type: /^application\/json/
|
||||
status: 200
|
||||
response_json_paths:
|
||||
$.generic: $SCHEME://$NETLOC/v1/resource/generic
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
fixtures:
|
||||
- ConfigFixture
|
||||
|
||||
tests:
|
||||
- name: get CORS headers for non-allowed
|
||||
OPTIONS: /v1/status
|
||||
request_headers:
|
||||
Origin: http://notallowed.com
|
||||
Access-Control-Request-Method: GET
|
||||
response_forbidden_headers:
|
||||
- Access-Control-Allow-Origin
|
||||
- Access-Control-Allow-Methods
|
||||
|
||||
- name: get CORS headers for allowed
|
||||
OPTIONS: /v1/status
|
||||
request_headers:
|
||||
Origin: http://foobar.com
|
||||
Access-Control-Request-Method: GET
|
||||
response_headers:
|
||||
Access-Control-Allow-Origin: http://foobar.com
|
||||
Access-Control-Allow-Methods: GET
|
|
@ -35,7 +35,7 @@ tests:
|
|||
status: 201
|
||||
response_headers:
|
||||
location: $SCHEME://$NETLOC/v1/resource/generic/f93450f2-d8a5-4d67-9985-02511241e7d1
|
||||
content-type: application/json; charset=UTF-8
|
||||
content-type: /^application\/json/
|
||||
response_json_paths:
|
||||
$.created_by_project_id: f3d41b770cc14f0bb94a1d5be9c0e3ea
|
||||
$.created_by_user_id: 0fbb231484614b1a80131fc22f6afc9c
|
||||
|
|
|
@ -115,8 +115,7 @@ tests:
|
|||
POST: /v1/metric
|
||||
request_headers:
|
||||
content-type: plain/text
|
||||
data:
|
||||
archive_policy_name: cookies
|
||||
data: '{"archive_policy_name": "cookies"}'
|
||||
status: 415
|
||||
|
||||
- name: create valid metric
|
||||
|
|
|
@ -131,7 +131,7 @@ tests:
|
|||
status: 201
|
||||
response_headers:
|
||||
location: $SCHEME://$NETLOC/v1/resource/generic/f93450f2-d8a5-4d67-9985-02511241e7d1
|
||||
content-type: application/json; charset=UTF-8
|
||||
content-type: /^application\/json/
|
||||
response_json_paths:
|
||||
$.created_by_project_id: f3d41b770cc14f0bb94a1d5be9c0e3ea
|
||||
$.created_by_user_id: 0fbb231484614b1a80131fc22f6afc9c
|
||||
|
@ -157,11 +157,7 @@ tests:
|
|||
x-user-id: 0fbb231484614b1a80131fc22f6afc9c
|
||||
x-project-id: f3d41b770cc14f0bb94a1d5be9c0e3ea
|
||||
content-type: text/plain
|
||||
data:
|
||||
id: f93450f2-d8a5-4d67-9985-02511241e7d1
|
||||
started_at: "2014-01-03T02:02:02.000000"
|
||||
user_id: 0fbb231484614b1a80131fc22f6afc9c
|
||||
project_id: f3d41b770cc14f0bb94a1d5be9c0e3ea
|
||||
data: '{"id": "f93450f2-d8a5-4d67-9985-02511241e7d1", "started_at": "2014-01-03T02:02:02.000000", "user_id": "0fbb231484614b1a80131fc22f6afc9c", "project_id": "f3d41b770cc14f0bb94a1d5be9c0e3ea"}'
|
||||
status: 415
|
||||
|
||||
# Create a new instance resource, demonstrate that including no data
|
||||
|
@ -339,6 +335,7 @@ tests:
|
|||
request_headers:
|
||||
x-user-id: 0fbb231484614b1a80131fc22f6afc9c
|
||||
x-project-id: f3d41b770cc14f0bb94a1d5be9c0e3ea
|
||||
content-type: application/json
|
||||
data:
|
||||
host: compute2
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import abc
|
|||
|
||||
import fixtures
|
||||
import mock
|
||||
import oslo_db.exception
|
||||
from oslo_db.sqlalchemy import test_migrations
|
||||
import six
|
||||
import sqlalchemy as sa
|
||||
|
@ -50,10 +51,14 @@ class ModelsMigrationsSync(
|
|||
self.index = indexer.get_driver(self.conf)
|
||||
self.index.connect()
|
||||
self.index.upgrade(nocreate=True, create_legacy_resource_types=True)
|
||||
self.addCleanup(self._drop_database)
|
||||
|
||||
def tearDown(self):
|
||||
sqlalchemy_utils.drop_database(self.conf.indexer.url)
|
||||
super(ModelsMigrationsSync, self).tearDown()
|
||||
def _drop_database(self):
|
||||
try:
|
||||
sqlalchemy_utils.drop_database(self.conf.indexer.url)
|
||||
except oslo_db.exception.DBNonExistentDatabase:
|
||||
# NOTE(sileht): oslo db >= 4.15.0 cleanup this for us
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_metadata():
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2014-2015 eNovance
|
||||
# Copyright © 2014-2016 eNovance
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -932,21 +932,36 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
|
|||
self.assertEqual(
|
||||
datetime.datetime(2014, 10, 7),
|
||||
carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600).as_datetime())
|
||||
self.assertEqual(
|
||||
datetime.datetime(2014, 12, 31, 18),
|
||||
carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 58))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 58).as_datetime())
|
||||
self.assertEqual(
|
||||
1420048800.0,
|
||||
float(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 58)))
|
||||
|
||||
key = carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600)
|
||||
|
||||
self.assertGreater(key, pandas.Timestamp(0))
|
||||
|
||||
self.assertGreaterEqual(key, pandas.Timestamp(0))
|
||||
|
||||
def test_split_key_next(self):
|
||||
self.assertEqual(
|
||||
datetime.datetime(2015, 3, 6),
|
||||
next(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600)))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600)).as_datetime())
|
||||
self.assertEqual(
|
||||
datetime.datetime(2015, 8, 3),
|
||||
next(next(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600))))
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600))).as_datetime())
|
||||
self.assertEqual(
|
||||
113529600000.0,
|
||||
float(next(carbonara.SplitKey.from_timestamp_and_sampling(
|
||||
datetime.datetime(2015, 1, 1, 15, 3), 3600 * 24 * 365))))
|
||||
|
||||
def test_split(self):
|
||||
sampling = 5
|
||||
|
@ -964,10 +979,10 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
|
|||
/ carbonara.SplitKey.POINTS_PER_SPLIT),
|
||||
len(grouped_points))
|
||||
self.assertEqual("0.0",
|
||||
str(carbonara.SplitKey(grouped_points[0][0])))
|
||||
str(carbonara.SplitKey(grouped_points[0][0], 0)))
|
||||
# 3600 × 5s = 5 hours
|
||||
self.assertEqual(datetime.datetime(1970, 1, 1, 5),
|
||||
grouped_points[1][0])
|
||||
grouped_points[1][0].as_datetime())
|
||||
self.assertEqual(carbonara.SplitKey.POINTS_PER_SPLIT,
|
||||
len(grouped_points[0][1]))
|
||||
|
||||
|
|
|
@ -24,8 +24,6 @@ import uuid
|
|||
|
||||
from keystonemiddleware import fixture as ksm_fixture
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_middleware import cors
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from stevedore import extension
|
||||
|
@ -132,13 +130,6 @@ class RestTest(tests_base.TestCase, testscenarios.TestWithScenarios):
|
|||
self.path_get('etc/gnocchi/api-paste.ini'),
|
||||
group="api")
|
||||
|
||||
# NOTE(sileht): This is not concurrency safe, but only this tests file
|
||||
# deal with cors, so we are fine. set_override don't work because
|
||||
# cors group doesn't yet exists, and we the CORS middleware is created
|
||||
# it register the option and directly copy value of all configurations
|
||||
# options making impossible to override them properly...
|
||||
cfg.set_defaults(cors.CORS_OPTS, allowed_origin="http://foobar.com")
|
||||
|
||||
self.auth_token_fixture = self.useFixture(
|
||||
ksm_fixture.AuthTokenFixture())
|
||||
self.auth_token_fixture.add_token_data(
|
||||
|
@ -180,33 +171,6 @@ class RestTest(tests_base.TestCase, testscenarios.TestWithScenarios):
|
|||
|
||||
|
||||
class RootTest(RestTest):
|
||||
|
||||
def _do_test_cors(self):
|
||||
resp = self.app.options(
|
||||
"/v1/status",
|
||||
headers={'Origin': 'http://notallowed.com',
|
||||
'Access-Control-Request-Method': 'GET'},
|
||||
status=200)
|
||||
headers = dict(resp.headers)
|
||||
self.assertNotIn("Access-Control-Allow-Origin", headers)
|
||||
self.assertNotIn("Access-Control-Allow-Methods", headers)
|
||||
resp = self.app.options(
|
||||
"/v1/status",
|
||||
headers={'origin': 'http://foobar.com',
|
||||
'Access-Control-Request-Method': 'GET'},
|
||||
status=200)
|
||||
headers = dict(resp.headers)
|
||||
self.assertIn("Access-Control-Allow-Origin", headers)
|
||||
self.assertIn("Access-Control-Allow-Methods", headers)
|
||||
|
||||
def test_cors_invalid_token(self):
|
||||
with self.app.use_invalid_token():
|
||||
self._do_test_cors()
|
||||
|
||||
def test_cors_no_token(self):
|
||||
with self.app.use_no_token():
|
||||
self._do_test_cors()
|
||||
|
||||
def test_deserialize_force_json(self):
|
||||
with self.app.use_admin_user():
|
||||
self.app.post(
|
||||
|
|
|
@ -34,7 +34,7 @@ class TestStatsd(tests_base.TestCase):
|
|||
super(TestStatsd, self).setUp()
|
||||
|
||||
self.conf.set_override("resource_id",
|
||||
uuid.uuid4(), "statsd")
|
||||
str(uuid.uuid4()), "statsd")
|
||||
self.conf.set_override("user_id",
|
||||
self.STATSD_USER_ID, "statsd")
|
||||
self.conf.set_override("project_id",
|
||||
|
|
|
@ -67,9 +67,9 @@ class TestStorageDriver(tests_base.TestCase):
|
|||
])
|
||||
|
||||
with mock.patch('gnocchi.carbonara.AggregatedTimeSerie.unserialize',
|
||||
side_effect=ValueError("boom!")):
|
||||
side_effect=carbonara.InvalidData()):
|
||||
with mock.patch('gnocchi.carbonara.BoundTimeSerie.unserialize',
|
||||
side_effect=ValueError("boom!")):
|
||||
side_effect=carbonara.InvalidData()):
|
||||
self.trigger_processing()
|
||||
|
||||
m = self.storage.get_measures(self.metric)
|
||||
|
@ -310,6 +310,220 @@ class TestStorageDriver(tests_base.TestCase):
|
|||
(utils.datetime_utc(2016, 1, 10, 17, 12), 60.0, 46),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
def test_rewrite_measures_oldest_mutable_timestamp_eq_next_key(self):
|
||||
"""See LP#1655422"""
|
||||
# Create an archive policy that spans on several splits. Each split
|
||||
# being 3600 points, let's go for 36k points so we have 10 splits.
|
||||
apname = str(uuid.uuid4())
|
||||
ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
|
||||
self.index.create_archive_policy(ap)
|
||||
self.metric = storage.Metric(uuid.uuid4(), ap)
|
||||
self.index.create_metric(self.metric.id, str(uuid.uuid4()),
|
||||
str(uuid.uuid4()),
|
||||
apname)
|
||||
|
||||
# First store some points scattered across different splits
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
|
||||
self.assertEqual(splits,
|
||||
self.storage._list_split_keys_for_metric(
|
||||
self.metric, "mean", 60.0))
|
||||
|
||||
if self.storage.WRITE_FULL:
|
||||
assertCompressedIfWriteFull = self.assertTrue
|
||||
else:
|
||||
assertCompressedIfWriteFull = self.assertFalse
|
||||
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451520000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451736000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0)
|
||||
assertCompressedIfWriteFull(
|
||||
carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
|
||||
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
|
||||
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
|
||||
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
# Now store brand new points that should force a rewrite of one of the
|
||||
# split (keep in mind the back window size in one hour here). We move
|
||||
# the BoundTimeSerie processing timeserie far away from its current
|
||||
# range.
|
||||
|
||||
# Here we test a special case where the oldest_mutable_timestamp will
|
||||
# be 2016-01-10TOO:OO:OO = 1452384000.0, our new split key.
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 10, 0, 12), 45),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
self.assertEqual({'1452384000.0', '1451736000.0',
|
||||
'1451520000.0', '1451952000.0'},
|
||||
self.storage._list_split_keys_for_metric(
|
||||
self.metric, "mean", 60.0))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451520000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451736000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0)
|
||||
# Now this one is compressed because it has been rewritten!
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1452384000.0', "mean", 60.0)
|
||||
assertCompressedIfWriteFull(
|
||||
carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
|
||||
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
|
||||
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
|
||||
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
|
||||
(utils.datetime_utc(2016, 1, 10, 0, 12), 60.0, 45),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
def test_rewrite_measures_corruption_missing_file(self):
|
||||
# Create an archive policy that spans on several splits. Each split
|
||||
# being 3600 points, let's go for 36k points so we have 10 splits.
|
||||
apname = str(uuid.uuid4())
|
||||
ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
|
||||
self.index.create_archive_policy(ap)
|
||||
self.metric = storage.Metric(uuid.uuid4(), ap)
|
||||
self.index.create_metric(self.metric.id, str(uuid.uuid4()),
|
||||
str(uuid.uuid4()),
|
||||
apname)
|
||||
|
||||
# First store some points scattered across different splits
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
|
||||
self.assertEqual(splits,
|
||||
self.storage._list_split_keys_for_metric(
|
||||
self.metric, "mean", 60.0))
|
||||
|
||||
if self.storage.WRITE_FULL:
|
||||
assertCompressedIfWriteFull = self.assertTrue
|
||||
else:
|
||||
assertCompressedIfWriteFull = self.assertFalse
|
||||
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451520000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451736000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0)
|
||||
assertCompressedIfWriteFull(
|
||||
carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
|
||||
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
|
||||
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
|
||||
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
# Test what happens if we delete the latest split and then need to
|
||||
# compress it!
|
||||
self.storage._delete_metric_measures(self.metric,
|
||||
'1451952000.0',
|
||||
'mean', 60.0)
|
||||
|
||||
# Now store brand new points that should force a rewrite of one of the
|
||||
# split (keep in mind the back window size in one hour here). We move
|
||||
# the BoundTimeSerie processing timeserie far away from its current
|
||||
# range.
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 10, 16, 18, 45), 45),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 10, 17, 12, 45), 46),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
def test_rewrite_measures_corruption_bad_data(self):
|
||||
# Create an archive policy that spans on several splits. Each split
|
||||
# being 3600 points, let's go for 36k points so we have 10 splits.
|
||||
apname = str(uuid.uuid4())
|
||||
ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
|
||||
self.index.create_archive_policy(ap)
|
||||
self.metric = storage.Metric(uuid.uuid4(), ap)
|
||||
self.index.create_metric(self.metric.id, str(uuid.uuid4()),
|
||||
str(uuid.uuid4()),
|
||||
apname)
|
||||
|
||||
# First store some points scattered across different splits
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 1, 12, 0, 1), 69),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 2, 13, 7, 31), 42),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 4, 14, 9, 31), 4),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 6, 15, 12, 45), 44),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
|
||||
self.assertEqual(splits,
|
||||
self.storage._list_split_keys_for_metric(
|
||||
self.metric, "mean", 60.0))
|
||||
|
||||
if self.storage.WRITE_FULL:
|
||||
assertCompressedIfWriteFull = self.assertTrue
|
||||
else:
|
||||
assertCompressedIfWriteFull = self.assertFalse
|
||||
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451520000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451736000.0', "mean", 60.0)
|
||||
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
data = self.storage._get_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0)
|
||||
assertCompressedIfWriteFull(
|
||||
carbonara.AggregatedTimeSerie.is_compressed(data))
|
||||
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
|
||||
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
|
||||
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
|
||||
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
|
||||
], self.storage.get_measures(self.metric, granularity=60.0))
|
||||
|
||||
# Test what happens if we write garbage
|
||||
self.storage._store_metric_measures(
|
||||
self.metric, '1451952000.0', "mean", 60.0, b"oh really?")
|
||||
|
||||
# Now store brand new points that should force a rewrite of one of the
|
||||
# split (keep in mind the back window size in one hour here). We move
|
||||
# the BoundTimeSerie processing timeserie far away from its current
|
||||
# range.
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 10, 16, 18, 45), 45),
|
||||
storage.Measure(utils.datetime_utc(2016, 1, 10, 17, 12, 45), 46),
|
||||
])
|
||||
self.trigger_processing()
|
||||
|
||||
def test_updated_measures(self):
|
||||
self.storage.add_measures(self.metric, [
|
||||
storage.Measure(utils.datetime_utc(2014, 1, 1, 12, 0, 1), 69),
|
||||
|
|
|
@ -14,13 +14,19 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import iso8601
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from pytimeparse import timeparse
|
||||
import retrying
|
||||
import six
|
||||
import uuid
|
||||
from tooz import coordination
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
# uuid5 namespace for id transformation.
|
||||
# NOTE(chdent): This UUID must stay the same, forever, across all
|
||||
|
@ -64,6 +70,23 @@ retry = retrying.retry(wait_exponential_multiplier=500,
|
|||
retry_on_exception=retry_if_retry_is_raised)
|
||||
|
||||
|
||||
# TODO(jd) Move this to tooz?
|
||||
@retry
|
||||
def _enable_coordination(coord):
|
||||
try:
|
||||
coord.start(start_heart=True)
|
||||
except Exception as e:
|
||||
LOG.error("Unable to start coordinator: %s", e)
|
||||
raise Retry(e)
|
||||
|
||||
|
||||
def get_coordinator_and_start(url):
|
||||
my_id = str(uuid.uuid4())
|
||||
coord = coordination.get_coordinator(url, my_id)
|
||||
_enable_coordination(coord)
|
||||
return coord, my_id
|
||||
|
||||
|
||||
def to_timestamp(v):
|
||||
if isinstance(v, datetime.datetime):
|
||||
return v
|
||||
|
|
|
@ -2,7 +2,7 @@ pbr
|
|||
numpy
|
||||
iso8601
|
||||
oslo.config>=2.6.0
|
||||
oslo.log>=1.0.0
|
||||
oslo.log>=2.3.0
|
||||
oslo.policy>=0.3.0
|
||||
oslo.serialization>=1.4.0
|
||||
oslo.utils>=3.3.0
|
||||
|
|
|
@ -25,13 +25,13 @@ keystone =
|
|||
keystonemiddleware>=4.0.0
|
||||
mysql =
|
||||
pymysql
|
||||
oslo.db>=4.8.0,!=4.13.1,!=4.13.2
|
||||
oslo.db>=4.8.0,!=4.13.1,!=4.13.2,!=4.15.0
|
||||
sqlalchemy
|
||||
sqlalchemy-utils
|
||||
alembic>=0.7.6,!=0.8.1
|
||||
postgresql =
|
||||
psycopg2
|
||||
oslo.db>=4.8.0,!=4.13.1,!=4.13.2
|
||||
oslo.db>=4.8.0,!=4.13.1,!=4.13.2,!=4.15.0
|
||||
sqlalchemy
|
||||
sqlalchemy-utils
|
||||
alembic>=0.7.6,!=0.8.1
|
||||
|
@ -70,6 +70,7 @@ test =
|
|||
os-testr
|
||||
testrepository
|
||||
testscenarios
|
||||
testresources>=0.2.4 # Apache-2.0/BSD
|
||||
testtools>=0.9.38
|
||||
WebTest>=2.0.16
|
||||
doc8
|
||||
|
|
Loading…
Reference in New Issue