Merge "Fix an issue of database migration"
This commit is contained in:
commit
17532bba9d
|
@ -17,7 +17,6 @@ SQLAlchemy implementation for DB.API
|
|||
"""
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import datetime as dt
|
||||
import os
|
||||
import time
|
||||
|
@ -251,27 +250,6 @@ class Connection(object):
|
|||
results = (self.model_query(models.WorkloadData, session=session).
|
||||
filter_by(workload_uuid=workload_uuid).
|
||||
order_by(models.WorkloadData.chunk_order.asc()))
|
||||
if results.first() and results[0].chunk_data["raw"] and isinstance(
|
||||
results[0].chunk_data["raw"][0]["atomic_actions"], dict):
|
||||
# NOTE(andreykurilin): It is an old format of atomic actions.
|
||||
# We do not have migration yet, since it can take too much
|
||||
# time on the big databases. Let's lazy-migrate results which
|
||||
# user greps and force a migration after several releases.
|
||||
|
||||
for workload_data in results:
|
||||
chunk_data = copy.deepcopy(workload_data.chunk_data)
|
||||
for chunk in chunk_data["raw"]:
|
||||
new_atomic_actions = []
|
||||
started_at = chunk["timestamp"]
|
||||
for name, d in chunk["atomic_actions"].items():
|
||||
finished_at = started_at + d
|
||||
new_atomic_actions.append(
|
||||
{"name": name, "children": [],
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at})
|
||||
started_at = finished_at
|
||||
chunk["atomic_actions"] = new_atomic_actions
|
||||
workload_data.update({"chunk_data": chunk_data})
|
||||
|
||||
return sorted([raw for workload_data in results
|
||||
for raw in workload_data.chunk_data["raw"]],
|
||||
|
|
|
@ -77,6 +77,18 @@ def upgrade():
|
|||
"chart": "OutputStackedAreaChart"})
|
||||
del itr["scenario_output"]
|
||||
require_updating = True
|
||||
if isinstance(itr["atomic_actions"], dict):
|
||||
new_atomic_actions = []
|
||||
started_at = itr["timestamp"]
|
||||
for name, d in itr["atomic_actions"].items():
|
||||
finished_at = started_at + d
|
||||
new_atomic_actions.append(
|
||||
{"name": name, "children": [],
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at})
|
||||
started_at = finished_at
|
||||
itr["atomic_actions"] = new_atomic_actions
|
||||
require_updating = True
|
||||
|
||||
if require_updating:
|
||||
connection.execute(workload_data_helper.update().where(
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
"""Tests for db.api layer."""
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import datetime as dt
|
||||
|
||||
|
@ -182,65 +181,6 @@ class TasksTestCase(test.DBTestCase):
|
|||
"fcd0483f-a405-44c4-b712-99c9e52254eb",
|
||||
status=consts.TaskStatus.FINISHED)
|
||||
|
||||
def test_task_get_detailed__transform_atomics(self):
|
||||
task_id = self._create_task()["uuid"]
|
||||
subtask = db.subtask_create(task_id, title="foo")
|
||||
|
||||
workload = db.workload_create(
|
||||
task_id, subtask["uuid"], name="atata", description="foo",
|
||||
position=0, args={}, context={}, sla={}, runner={},
|
||||
runner_type="r", hooks=[])
|
||||
|
||||
workloads_data = [
|
||||
{"raw": [
|
||||
{"duration": 1, "timestamp": 1, "idle_duration": 1,
|
||||
"error": None, "output": None,
|
||||
"atomic_actions": collections.OrderedDict(
|
||||
[("foo", 1), ("bar", 2)])},
|
||||
{"duration": 1, "timestamp": 1, "idle_duration": 1,
|
||||
"error": None, "output": None,
|
||||
"atomic_actions": collections.OrderedDict(
|
||||
[("xxx", 1), ("yyy", 2)])},
|
||||
]},
|
||||
{"raw": [
|
||||
{"duration": 1, "timestamp": 1, "idle_duration": 1,
|
||||
"error": None, "output": None,
|
||||
"atomic_actions": collections.OrderedDict(
|
||||
[("xxx", 1), ("yyy", 2), ("zzz", 2)])}
|
||||
]}
|
||||
]
|
||||
|
||||
for i, data in enumerate(workloads_data):
|
||||
db.workload_data_create(task_id, workload["uuid"], i, data)
|
||||
|
||||
db.workload_set_results(workload_uuid=workload["uuid"],
|
||||
subtask_uuid=workload["subtask_uuid"],
|
||||
task_uuid=workload["task_uuid"],
|
||||
sla_results=[{"success": True}],
|
||||
load_duration=13, full_duration=42,
|
||||
start_time=77.33)
|
||||
|
||||
task = db.task_get(task_id, detailed=True)
|
||||
self.assertEqual(1, len(task["subtasks"]))
|
||||
self.assertEqual(1, len(task["subtasks"][0]["workloads"]))
|
||||
workload = task["subtasks"][0]["workloads"][0]
|
||||
self.assertEqual(
|
||||
[[{"started_at": 1, "finished_at": 2, "children": [],
|
||||
"name": "foo"},
|
||||
{"started_at": 2, "finished_at": 4, "children": [],
|
||||
"name": "bar"}],
|
||||
[{"started_at": 1, "finished_at": 2, "children": [],
|
||||
"name": "xxx"},
|
||||
{"started_at": 2, "finished_at": 4, "children": [],
|
||||
"name": "yyy"}],
|
||||
[{"started_at": 1, "finished_at": 2, "children": [],
|
||||
"name": "xxx"},
|
||||
{"started_at": 2, "finished_at": 4, "children": [],
|
||||
"name": "yyy"},
|
||||
{"started_at": 4, "finished_at": 6, "children": [],
|
||||
"name": "zzz"}]],
|
||||
[w["atomic_actions"] for w in workload["data"]])
|
||||
|
||||
def test_task_create_and_get_detailed(self):
|
||||
validation_result = {
|
||||
"etype": "FooError",
|
||||
|
|
|
@ -1566,10 +1566,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
|
|||
"data": [{"timestamp": 0,
|
||||
"scenario_output": {"data": {1: 2}},
|
||||
"duration": 3, "error": None,
|
||||
"atomic_actions": [
|
||||
{"name": "foo", "started_at": 0,
|
||||
"finished_at": 3}]
|
||||
}],
|
||||
"atomic_actions": {
|
||||
"foo": 3}}],
|
||||
"statistics": {"durations": {
|
||||
"rows": [["foo", 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, "100.0%", 1],
|
||||
["total", 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, "100.0%", 1]
|
||||
|
@ -1719,6 +1717,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
|
|||
for iter in json.loads(wdata.chunk_data)["raw"]:
|
||||
self.assertNotIn("scenario_output", iter)
|
||||
self.assertIn("output", iter)
|
||||
self.assertIsInstance(iter["atomic_actions"], list)
|
||||
|
||||
conn.execute(
|
||||
wdata_table.delete().where(
|
||||
|
|
Loading…
Reference in New Issue