Remove upgrade code from 2.2 and 3.0

We no longer support upgrading from these versions starting with the next release.

Change-Id: I0c6f3166b1eb5226a7d435cca3d2f14ce7991741
Sem-Ver: api-break
Julien Danjou 2017-02-27 11:56:19 +01:00
parent a3cd3b7816
commit 6d4abc3a6d
6 changed files with 9 additions and 383 deletions


@@ -21,7 +21,6 @@ import operator
from concurrent import futures
import iso8601
import msgpack
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
@@ -354,83 +353,6 @@ class CarbonaraBasedStorage(storage.StorageDriver):
aggregation, granularity, version=3):
raise NotImplementedError
def _check_for_metric_upgrade(self, metric):
# FIXME(gordc): this is only required for v2.x to v3.x storage upgrade.
# we should make storage version easily detectable rather than
# checking each metric individually
lock = self._lock(metric.id)
with lock:
try:
old_unaggregated = self._get_unaggregated_timeserie_and_unserialize_v2( # noqa
metric)
except (storage.MetricDoesNotExist, CorruptionError) as e:
# This case can happen when upgrading from v3.0 to v3.x, or if no
# measures were pushed; skip the rest of the upgrade for this metric.
LOG.debug(
"Unable to find v2 unaggregated timeserie for "
"metric %s, no data to upgrade: %s",
metric.id, e)
return
unaggregated = carbonara.BoundTimeSerie(
ts=old_unaggregated.ts,
block_size=metric.archive_policy.max_block_size,
back_window=metric.archive_policy.back_window)
# Upgrade unaggregated timeserie to v3
self._store_unaggregated_timeserie(
metric, unaggregated.serialize())
oldest_mutable_timestamp = (
unaggregated.first_block_timestamp()
)
for agg_method, d in itertools.product(
metric.archive_policy.aggregation_methods,
metric.archive_policy.definition):
LOG.debug(
"Checking if the metric %s needs migration for %s",
metric, agg_method)
try:
all_keys = self._list_split_keys_for_metric(
metric, agg_method, d.granularity, version=2)
except storage.MetricDoesNotExist:
# Just try the next metric, this one has no measures
break
else:
LOG.info("Migrating metric %s to new format", metric)
timeseries = filter(
lambda x: x is not None,
self._map_in_thread(
self._get_measures_and_unserialize_v2,
((metric, key, agg_method, d.granularity)
for key in all_keys))
)
ts = carbonara.AggregatedTimeSerie.from_timeseries(
sampling=d.granularity,
aggregation_method=agg_method,
timeseries=timeseries, max_size=d.points)
for key, split in ts.split():
self._store_timeserie_split(
metric, key, split,
ts.aggregation_method,
d, oldest_mutable_timestamp)
for key in all_keys:
self._delete_metric_measures(
metric, key, agg_method,
d.granularity, version=None)
self._delete_unaggregated_timeserie(metric, version=None)
LOG.info("Migrated metric %s to new format", metric)
def upgrade(self, index):
marker = None
while True:
metrics = [(metric,) for metric in
index.list_metrics(limit=self.UPGRADE_BATCH_SIZE,
marker=marker)]
self._map_in_thread(self._check_for_metric_upgrade, metrics)
if len(metrics) == 0:
break
marker = metrics[-1][0].id
def process_new_measures(self, indexer, metrics_to_process,
sync=False):
# process only active metrics. deleted metrics with unprocessed
@@ -645,31 +567,3 @@ class CarbonaraBasedStorage(storage.StorageDriver):
# We use 'list' to iterate all threads here to raise the first
# exception now, not much choice
return list(executor.map(lambda args: method(*args), list_of_args))
@staticmethod
def _unserialize_timeserie_v2(data):
return carbonara.TimeSerie.from_data(
*carbonara.TimeSerie._timestamps_and_values_from_dict(
msgpack.loads(data, encoding='utf-8')['values']),
clean=True)
def _get_unaggregated_timeserie_and_unserialize_v2(self, metric):
"""Unserialization method for unaggregated v2 timeseries."""
data = self._get_unaggregated_timeserie(metric, version=None)
try:
return self._unserialize_timeserie_v2(data)
except ValueError:
LOG.error("Data corruption detected for %s ignoring.", metric.id)
def _get_measures_and_unserialize_v2(self, metric, key,
aggregation, granularity):
"""Unserialization method for upgrading v2 objects. Upgrade only."""
data = self._get_measures(
metric, key, aggregation, granularity, version=None)
try:
return self._unserialize_timeserie_v2(data)
except ValueError:
LOG.error("Data corruption detected for %s "
"aggregated `%s' timeserie, granularity `%s' "
"around time `%s', ignoring.",
metric.id, aggregation, granularity, key)
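
The comment at the top of the hunk above explains that the result of executor.map() is wrapped in list() so that the first exception raised by a worker surfaces immediately. A minimal standalone sketch of that behaviour, using only the standard library (not code from this repository):

# executor.map() returns a lazy iterator: a worker's exception is only
# re-raised once its result is consumed. Wrapping it in list() forces
# iteration, so the first failure is reported right away.
from concurrent import futures

def work(x):
    if x == 2:
        raise ValueError("boom")
    return x * 10

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    lazy = executor.map(work, [1, 2, 3])  # nothing raised yet
    try:
        results = list(lazy)              # iteration happens here
    except ValueError as exc:
        print("first worker error surfaced eagerly: %s" % exc)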


@@ -50,35 +50,6 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
ceph.close_rados_connection(self.rados, self.ioctx)
super(CephStorage, self).stop()
def _check_for_metric_upgrade(self, metric):
lock = self._lock(metric.id)
with lock:
container = "gnocchi_%s_container" % metric.id
unagg_obj = self._build_unaggregated_timeserie_path(metric, 3)
try:
xattrs = tuple(k for k, v in self.ioctx.get_xattrs(container))
except rados.ObjectNotFound:
# this means it was already upgraded, or there is some corruption; move on.
pass
else:
# if xattrs are found, it means we're coming from
# gnocchi v2; migrate to omap accordingly.
if xattrs:
keys = xattrs
# if there are no xattrs but the object exists, it was already
# migrated to v3; now upgrade it to use a single object
else:
with rados.ReadOpCtx() as op:
omaps, ret = self.ioctx.get_omap_vals(op, "", "", -1)
self.ioctx.operate_read_op(op, container)
keys = (k for k, __ in omaps)
with rados.WriteOpCtx() as op:
self.ioctx.set_omap(op, keys,
tuple([b""] * len(keys)))
self.ioctx.operate_write_op(op, unagg_obj)
self.ioctx.remove_object(container)
super(CephStorage, self)._check_for_metric_upgrade(metric)
@staticmethod
def _get_object_name(metric, timestamp_key, aggregation, granularity,
version=3):


@@ -18,7 +18,6 @@ import itertools
import struct
from oslo_log import log
from oslo_serialization import msgpackutils
import pandas
import six.moves
@@ -38,14 +37,10 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
measures = struct.unpack(
"<" + self._MEASURE_SERIAL_FORMAT * nb_measures, data)
except struct.error:
# This is either corruption or a v2 measure
try:
return msgpackutils.loads(data)
except ValueError:
LOG.error(
"Unable to decode measure %s, possible data corruption",
measure_id)
raise
LOG.error(
"Unable to decode measure %s, possible data corruption",
measure_id)
raise
return six.moves.zip(
pandas.to_datetime(measures[::2], unit='ns'),
itertools.islice(measures, 1, len(measures), 2))
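
For context, the decode path kept above unpacks fixed-size little-endian records and pairs nanosecond timestamps with values. A rough, self-contained sketch of that round trip; the actual _MEASURE_SERIAL_FORMAT is not shown in this diff, so the "Qd" record layout (uint64 nanosecond timestamp followed by a float64 value) is an assumption here:

import datetime
import struct

MEASURE_SERIAL_FORMAT = "Qd"  # assumed layout: timestamp in ns, then value

def pack_measures(measures):
    # Flatten (timestamp_ns, value) pairs into one little-endian blob.
    flat = []
    for timestamp_ns, value in measures:
        flat.extend((timestamp_ns, value))
    return struct.pack("<" + MEASURE_SERIAL_FORMAT * len(measures), *flat)

def unpack_measures(data):
    # Mirror of the unpacking logic above, without the pandas dependency.
    item_size = struct.calcsize("<" + MEASURE_SERIAL_FORMAT)
    nb_measures = len(data) // item_size
    raw = struct.unpack("<" + MEASURE_SERIAL_FORMAT * nb_measures, data)
    return [(datetime.datetime.utcfromtimestamp(ts / 1e9), value)
            for ts, value in zip(raw[::2], raw[1::2])]

blob = pack_measures([(1468800000 * 10**9, 4.0)])
print(unpack_measures(blob))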


@@ -1,210 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2015-2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import itertools
import uuid
import mock
import msgpack
import six
from gnocchi import carbonara
from gnocchi import storage
from gnocchi.storage import _carbonara
from gnocchi.tests import base as tests_base
from gnocchi import utils
def _serialize_v2(split):
d = {'values': dict((timestamp.value, float(v))
for timestamp, v
in six.iteritems(split.ts.dropna()))}
return msgpack.dumps(d)
class TestCarbonaraMigration(tests_base.TestCase):
def setUp(self):
super(TestCarbonaraMigration, self).setUp()
if not isinstance(self.storage, _carbonara.CarbonaraBasedStorage):
self.skipTest("This driver is not based on Carbonara")
self.metric = storage.Metric(uuid.uuid4(),
self.archive_policies['low'])
self.storage._create_metric(self.metric)
with mock.patch('gnocchi.carbonara.SplitKey.'
'POINTS_PER_SPLIT', 14400):
bts = carbonara.BoundTimeSerie(
block_size=self.metric.archive_policy.max_block_size,
back_window=self.metric.archive_policy.back_window)
# NOTE: there is a split at 2016-07-18 on granularity 300
values = ((datetime.datetime(2016, 7, 17, 23, 59, 0), 4),
(datetime.datetime(2016, 7, 17, 23, 59, 4), 5),
(datetime.datetime(2016, 7, 17, 23, 59, 9), 6),
(datetime.datetime(2016, 7, 18, 0, 0, 0), 7),
(datetime.datetime(2016, 7, 18, 0, 0, 4), 8),
(datetime.datetime(2016, 7, 18, 0, 0, 9), 9))
def _before_truncate(bound_timeserie):
for d, agg in itertools.product(
self.metric.archive_policy.definition,
['mean', 'max']):
grouped = bound_timeserie.group_serie(
d.granularity, carbonara.round_timestamp(
bound_timeserie.first, d.granularity * 10e8))
aggts = carbonara.AggregatedTimeSerie.from_grouped_serie(
grouped, d.granularity, agg, max_size=d.points)
for key, split in aggts.split():
self.storage._store_metric_measures(
self.metric,
str(key),
agg, d.granularity,
_serialize_v2(split), offset=None, version=None)
bts.set_values(values, before_truncate_callback=_before_truncate)
self.storage._store_unaggregated_timeserie(self.metric,
_serialize_v2(bts),
version=None)
def upgrade(self):
with mock.patch.object(self.index, 'list_metrics') as f:
f.side_effect = [[self.metric], []]
self.storage.upgrade(self.index)
def test_get_measures(self):
with mock.patch.object(
self.storage, '_get_measures_and_unserialize',
side_effect=self.storage._get_measures_and_unserialize_v2):
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 5),
(utils.datetime_utc(2016, 7, 18), 86400, 8),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 5),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 8),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5),
(utils.datetime_utc(2016, 7, 18, 0), 300, 8)
], self.storage.get_measures(self.metric))
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 6),
(utils.datetime_utc(2016, 7, 18), 86400, 9),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 6),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 9),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6),
(utils.datetime_utc(2016, 7, 18, 0), 300, 9)
], self.storage.get_measures(self.metric, aggregation='max'))
self.upgrade()
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 5),
(utils.datetime_utc(2016, 7, 18), 86400, 8),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 5),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 8),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5),
(utils.datetime_utc(2016, 7, 18, 0), 300, 8)
], self.storage.get_measures(self.metric))
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 6),
(utils.datetime_utc(2016, 7, 18), 86400, 9),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 6),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 9),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6),
(utils.datetime_utc(2016, 7, 18, 0), 300, 9)
], self.storage.get_measures(self.metric, aggregation='max'))
with mock.patch.object(
self.storage, '_get_measures_and_unserialize',
side_effect=self.storage._get_measures_and_unserialize_v2):
self.assertRaises(
storage.AggregationDoesNotExist,
self.storage.get_measures, self.metric)
self.assertRaises(
storage.AggregationDoesNotExist,
self.storage.get_measures, self.metric, aggregation='max')
self.storage.incoming.add_measures(self.metric, [
storage.Measure(utils.dt_to_unix_ns(2016, 7, 18), 69),
storage.Measure(utils.dt_to_unix_ns(2016, 7, 18, 1, 1), 64),
])
with mock.patch.object(self.index, 'list_metrics') as f:
f.side_effect = [[self.metric], []]
self.storage.process_background_tasks(
self.index, [str(self.metric.id)], sync=True)
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 6),
(utils.datetime_utc(2016, 7, 18), 86400, 69),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 6),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 69),
(utils.datetime_utc(2016, 7, 18, 1), 3600, 64),
(utils.datetime_utc(2016, 7, 18, 0), 300, 69),
(utils.datetime_utc(2016, 7, 18, 1), 300, 64)
], self.storage.get_measures(self.metric, aggregation='max'))
def test_upgrade_upgraded_storage(self):
with mock.patch.object(
self.storage, '_get_measures_and_unserialize',
side_effect=self.storage._get_measures_and_unserialize_v2):
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 5),
(utils.datetime_utc(2016, 7, 18), 86400, 8),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 5),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 8),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5),
(utils.datetime_utc(2016, 7, 18, 0), 300, 8)
], self.storage.get_measures(self.metric))
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 6),
(utils.datetime_utc(2016, 7, 18), 86400, 9),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 6),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 9),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6),
(utils.datetime_utc(2016, 7, 18, 0), 300, 9)
], self.storage.get_measures(self.metric, aggregation='max'))
self.upgrade()
self.upgrade()
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 5),
(utils.datetime_utc(2016, 7, 18), 86400, 8),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 5),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 8),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5),
(utils.datetime_utc(2016, 7, 18, 0), 300, 8)
], self.storage.get_measures(self.metric))
self.assertEqual([
(utils.datetime_utc(2016, 7, 17), 86400, 6),
(utils.datetime_utc(2016, 7, 18), 86400, 9),
(utils.datetime_utc(2016, 7, 17, 23), 3600, 6),
(utils.datetime_utc(2016, 7, 18, 0), 3600, 9),
(utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6),
(utils.datetime_utc(2016, 7, 18, 0), 300, 9)
], self.storage.get_measures(self.metric, aggregation='max'))
def test_delete_metric_not_upgraded(self):
# Make sure that we delete everything (e.g. objects + container)
# correctly even if the metric has not been upgraded.
self.storage.delete_metric(self.metric)
self.assertEqual([], self.storage.get_measures(self.metric))


@@ -0,0 +1,4 @@
---
upgrade:
- |
The storage upgrade is only supported from version 3.1.
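
Since direct storage upgrades from 2.2 and 3.0 are gone, a deployment on an older version has to pass through 3.1 first. A small, hypothetical pre-flight check an operator might run before upgrading (not part of Gnocchi itself; it assumes the package is installed under the name "gnocchi"):

import pkg_resources

# Per the release note above: storage upgrades are supported from 3.1 on.
MINIMUM_UPGRADABLE = pkg_resources.parse_version("3.1")

installed = pkg_resources.get_distribution("gnocchi").version
if pkg_resources.parse_version(installed) >= MINIMUM_UPGRADABLE:
    print("storage upgrade supported from %s" % installed)
else:
    print("upgrade to 3.1 first; direct storage upgrade from %s "
          "is not supported" % installed)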

tox.ini

@@ -1,6 +1,6 @@
[tox]
minversion = 1.8
envlist = py{35,27}-{postgresql,mysql}{,-file,-swift,-ceph,-s3},pep8,bashate,py35-postgresql-file-upgrade-from-2.2,py27-mysql-ceph-upgrade-from-2.2
envlist = py{35,27}-{postgresql,mysql}{,-file,-swift,-ceph,-s3},pep8,bashate
[testenv]
usedevelop = True
@@ -62,34 +62,6 @@ deps = gnocchi[{env:GNOCCHI_VARIANT}]>=3.1,<3.2
pifpaf>=0.13
commands = pifpaf --env-prefix INDEXER run mysql -- pifpaf --env-prefix STORAGE run ceph {toxinidir}/run-upgrade-tests.sh {posargs}
[testenv:py35-postgresql-file-upgrade-from-2.2]
# We should always recreate since the script upgrades
# Gnocchi; we can't reuse the virtualenv
envdir = upgrade
recreate = True
skip_install = True
usedevelop = False
setenv = GNOCCHI_VARIANT=test,postgresql,file
deps = gnocchi[{env:GNOCCHI_VARIANT}]>=2.2,<2.3
pifpaf>=0.13
gnocchiclient>=2.8.0
commands = pifpaf --env-prefix INDEXER run postgresql {toxinidir}/run-upgrade-tests.sh {posargs}
[testenv:py27-mysql-ceph-upgrade-from-2.2]
# We should always recreate since the script upgrades
# Gnocchi; we can't reuse the virtualenv
envdir = upgrade
recreate = True
skip_install = True
usedevelop = False
setenv = GNOCCHI_VARIANT=test,mysql,ceph,ceph_recommended_lib
deps = gnocchi[{env:GNOCCHI_VARIANT}]>=2.2,<2.3
gnocchiclient>=2.8.0
pifpaf>=0.13
cradox
# cradox is required because 2.2 extra names are incorrect
commands = pifpaf --env-prefix INDEXER run mysql -- pifpaf --env-prefix STORAGE run ceph {toxinidir}/run-upgrade-tests.sh {posargs}
[testenv:bashate]
deps = bashate
commands = bashate -v devstack/plugin.sh devstack/gate/gate_hook.sh devstack/gate/post_test_hook.sh