Fix an issue with the database migration

We did not migrate all old atomic actions to the new format; we only
converted them on the fly while reading data from the database and
updating it, so an error may occur while merging atomic actions at
line #104. It is time to migrate all old atomic actions to the new
format and remove the lazy migration.
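
For reference, a minimal sketch of the two formats, with field names
taken from the diff below (the sample values match the test data that
this commit removes):

    # Old format: a mapping of atomic action name to its duration.
    old_atomic_actions = {"foo": 1, "bar": 2}

    # New format: an ordered list with explicit timestamps; durations
    # are chained starting from the iteration's timestamp (here, 1).
    new_atomic_actions = [
        {"name": "foo", "children": [], "started_at": 1, "finished_at": 2},
        {"name": "bar", "children": [], "started_at": 2, "finished_at": 4},
    ]
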
NOTE:
It is OK to modify the existing migration, since there are two cases:
1) The database doesn't include results in the old format. In this case
   the old migration succeeded, and changing it doesn't affect anything.
2) The database includes results in the old format. In this case the old
   migration fails and is not actually applied, so it is possible to
   apply it again (with the new code).
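
A hypothetical helper (not part of this commit) illustrating the
dichotomy above, built on the same isinstance check the migration uses:

    def has_old_format(chunks):
        # True when at least one iteration still stores atomic_actions
        # in the old dict format, i.e. the old migration never applied.
        return any(isinstance(c["atomic_actions"], dict) for c in chunks)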

Change-Id: Idb2a0ffd161163537e62ced320f8da033f4e04c9
chenhb 2017-07-13 10:49:09 +08:00
parent 19b85d4e6d
commit 7d457c2540
4 changed files with 15 additions and 86 deletions

View File

@@ -17,7 +17,6 @@ SQLAlchemy implementation for DB.API
"""
import collections
import copy
import datetime as dt
import os
import time
@@ -251,27 +250,6 @@ class Connection(object):
results = (self.model_query(models.WorkloadData, session=session).
filter_by(workload_uuid=workload_uuid).
order_by(models.WorkloadData.chunk_order.asc()))
if results.first() and results[0].chunk_data["raw"] and isinstance(
results[0].chunk_data["raw"][0]["atomic_actions"], dict):
# NOTE(andreykurilin): It is an old format of atomic actions.
# We do not have migration yet, since it can take too much
# time on the big databases. Let's lazy-migrate results which
# user greps and force a migration after several releases.
for workload_data in results:
chunk_data = copy.deepcopy(workload_data.chunk_data)
for chunk in chunk_data["raw"]:
new_atomic_actions = []
started_at = chunk["timestamp"]
for name, d in chunk["atomic_actions"].items():
finished_at = started_at + d
new_atomic_actions.append(
{"name": name, "children": [],
"started_at": started_at,
"finished_at": finished_at})
started_at = finished_at
chunk["atomic_actions"] = new_atomic_actions
workload_data.update({"chunk_data": chunk_data})
return sorted([raw for workload_data in results
for raw in workload_data.chunk_data["raw"]],

View File

@@ -77,6 +77,18 @@ def upgrade():
"chart": "OutputStackedAreaChart"})
del itr["scenario_output"]
require_updating = True
if isinstance(itr["atomic_actions"], dict):
new_atomic_actions = []
started_at = itr["timestamp"]
for name, d in itr["atomic_actions"].items():
finished_at = started_at + d
new_atomic_actions.append(
{"name": name, "children": [],
"started_at": started_at,
"finished_at": finished_at})
started_at = finished_at
itr["atomic_actions"] = new_atomic_actions
require_updating = True
if require_updating:
connection.execute(workload_data_helper.update().where(
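
For clarity, a standalone sketch of the conversion this migration
applies to each iteration (sample values taken from the tests below);
finished_at of one action becomes started_at of the next:

    import collections

    itr = {"timestamp": 1,
           "atomic_actions": collections.OrderedDict([("foo", 1),
                                                      ("bar", 2)])}

    new_atomic_actions = []
    started_at = itr["timestamp"]
    for name, duration in itr["atomic_actions"].items():
        finished_at = started_at + duration
        new_atomic_actions.append({"name": name, "children": [],
                                   "started_at": started_at,
                                   "finished_at": finished_at})
        started_at = finished_at
    # new_atomic_actions ->
    # [{"name": "foo", "children": [], "started_at": 1, "finished_at": 2},
    #  {"name": "bar", "children": [], "started_at": 2, "finished_at": 4}]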

View File

@@ -15,7 +15,6 @@
"""Tests for db.api layer."""
import collections
import copy
import datetime as dt
@@ -182,65 +181,6 @@ class TasksTestCase(test.DBTestCase):
"fcd0483f-a405-44c4-b712-99c9e52254eb",
status=consts.TaskStatus.FINISHED)
def test_task_get_detailed__transform_atomics(self):
task_id = self._create_task()["uuid"]
subtask = db.subtask_create(task_id, title="foo")
workload = db.workload_create(
task_id, subtask["uuid"], name="atata", description="foo",
position=0, args={}, context={}, sla={}, runner={},
runner_type="r", hooks=[])
workloads_data = [
{"raw": [
{"duration": 1, "timestamp": 1, "idle_duration": 1,
"error": None, "output": None,
"atomic_actions": collections.OrderedDict(
[("foo", 1), ("bar", 2)])},
{"duration": 1, "timestamp": 1, "idle_duration": 1,
"error": None, "output": None,
"atomic_actions": collections.OrderedDict(
[("xxx", 1), ("yyy", 2)])},
]},
{"raw": [
{"duration": 1, "timestamp": 1, "idle_duration": 1,
"error": None, "output": None,
"atomic_actions": collections.OrderedDict(
[("xxx", 1), ("yyy", 2), ("zzz", 2)])}
]}
]
for i, data in enumerate(workloads_data):
db.workload_data_create(task_id, workload["uuid"], i, data)
db.workload_set_results(workload_uuid=workload["uuid"],
subtask_uuid=workload["subtask_uuid"],
task_uuid=workload["task_uuid"],
sla_results=[{"success": True}],
load_duration=13, full_duration=42,
start_time=77.33)
task = db.task_get(task_id, detailed=True)
self.assertEqual(1, len(task["subtasks"]))
self.assertEqual(1, len(task["subtasks"][0]["workloads"]))
workload = task["subtasks"][0]["workloads"][0]
self.assertEqual(
[[{"started_at": 1, "finished_at": 2, "children": [],
"name": "foo"},
{"started_at": 2, "finished_at": 4, "children": [],
"name": "bar"}],
[{"started_at": 1, "finished_at": 2, "children": [],
"name": "xxx"},
{"started_at": 2, "finished_at": 4, "children": [],
"name": "yyy"}],
[{"started_at": 1, "finished_at": 2, "children": [],
"name": "xxx"},
{"started_at": 2, "finished_at": 4, "children": [],
"name": "yyy"},
{"started_at": 4, "finished_at": 6, "children": [],
"name": "zzz"}]],
[w["atomic_actions"] for w in workload["data"]])
def test_task_create_and_get_detailed(self):
validation_result = {
"etype": "FooError",

View File

@@ -1566,10 +1566,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"data": [{"timestamp": 0,
"scenario_output": {"data": {1: 2}},
"duration": 3, "error": None,
"atomic_actions": [
{"name": "foo", "started_at": 0,
"finished_at": 3}]
}],
"atomic_actions": {
"foo": 3}}],
"statistics": {"durations": {
"rows": [["foo", 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, "100.0%", 1],
["total", 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, "100.0%", 1]
@@ -1719,6 +1717,7 @@
for iter in json.loads(wdata.chunk_data)["raw"]:
self.assertNotIn("scenario_output", iter)
self.assertIn("output", iter)
self.assertIsInstance(iter["atomic_actions"], list)
conn.execute(
wdata_table.delete().where(