Merge "Prepare for distribured runner, part 1"

This commit is contained in:
Jenkins 2015-12-16 14:59:46 +00:00 committed by Gerrit Code Review
commit a003008a04
8 changed files with 69 additions and 35 deletions

View File

@ -295,3 +295,4 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
pool.terminate()
pool.join()
self._flush_results()

View File

@ -72,3 +72,5 @@ class SerialScenarioRunner(runner.ScenarioRunner):
runner._get_scenario_context(context), args)
result = runner._run_scenario_once(run_args)
self._send_result(result)
self._flush_results()

View File

@ -75,12 +75,13 @@ class ResultConsumer(object):
def _consume_results(self):
while True:
if self.runner.result_queue:
result = self.runner.result_queue.popleft()
self.results.append(result)
success = self.sla_checker.add_iteration(result)
if self.abort_on_sla_failure and not success:
self.sla_checker.set_aborted_on_sla()
self.runner.abort()
results = self.runner.result_queue.popleft()
self.results.extend(results)
for r in results:
success = self.sla_checker.add_iteration(r)
if self.abort_on_sla_failure and not success:
self.sla_checker.set_aborted_on_sla()
self.runner.abort()
elif self.is_done.isSet():
break
else:

View File

@ -163,7 +163,7 @@ class ScenarioRunner(plugin.Plugin):
CONFIG_SCHEMA = {}
def __init__(self, task, config):
def __init__(self, task, config, batch_size=0):
"""Runner constructor.
It sets task and config to local variables. Also initialize
@ -177,6 +177,8 @@ class ScenarioRunner(plugin.Plugin):
self.result_queue = collections.deque()
self.aborted = multiprocessing.Event()
self.run_duration = 0
self.batch_size = batch_size
self.result_batch = []
@staticmethod
def validate(config):
@ -254,16 +256,32 @@ class ScenarioRunner(plugin.Plugin):
while not result_queue.empty():
self._send_result(result_queue.get())
self._flush_results()
result_queue.close()
def _flush_results(self):
if self.result_batch:
sorted_batch = sorted(self.result_batch)
self.result_queue.append(sorted_batch)
self.result_batch = []
def _send_result(self, result):
"""Send partial result to consumer.
"""Store partial result to send it to consumer later.
:param result: Result dict to be sent. It should match the
ScenarioRunnerResult schema, otherwise
ValidationError is raised.
"""
self.result_queue.append(ScenarioRunnerResult(result))
r = ScenarioRunnerResult(result)
self.result_batch.append(r)
if len(self.result_batch) >= self.batch_size:
sorted_batch = sorted(self.result_batch,
key=lambda r: r["timestamp"])
self.result_queue.append(sorted_batch)
self.result_batch = []
def _log_debug_info(self, **info):
"""Log runner parameters for debugging.

View File

@ -107,8 +107,9 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
runner_obj._run_scenario(
fakes.FakeScenario, "do_it", self.context, self.args)
self.assertEqual(len(runner_obj.result_queue), self.config["times"])
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
def test__run_scenario_exception(self):
runner_obj = constant.ConstantScenarioRunner(self.task, self.config)
@ -116,9 +117,10 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong",
self.context, self.args)
self.assertEqual(len(runner_obj.result_queue), self.config["times"])
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
self.assertIn("error", runner_obj.result_queue[0])
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
self.assertIn("error", runner_obj.result_queue[0][0])
def test__run_scenario_aborted(self):
runner_obj = constant.ConstantScenarioRunner(self.task, self.config)
@ -258,8 +260,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
self.assertEqual(len(runner_obj.result_queue), expected_times)
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
def test_run_scenario_constantly_for_duration_exception(self):
runner_obj = constant.ConstantForDurationScenarioRunner(
@ -270,9 +273,10 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
self.assertEqual(len(runner_obj.result_queue), expected_times)
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
self.assertIn("error", runner_obj.result_queue[0])
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
self.assertIn("error", runner_obj.result_queue[0][0])
def test_run_scenario_constantly_for_duration_timeout(self):
runner_obj = constant.ConstantForDurationScenarioRunner(
@ -283,9 +287,10 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
self.assertEqual(len(runner_obj.result_queue), expected_times)
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
self.assertIn("error", runner_obj.result_queue[0])
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
self.assertIn("error", runner_obj.result_queue[0][0])
def test__run_scenario_constantly_aborted(self):
runner_obj = constant.ConstantForDurationScenarioRunner(None,

View File

@ -145,8 +145,9 @@ class RPSScenarioRunnerTestCase(test.TestCase):
self.assertEqual(len(runner_obj.result_queue), config["times"])
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
@mock.patch(RUNNERS + "rps.time.sleep")
def test__run_scenario_exception(self, mock_sleep):
@ -156,8 +157,9 @@ class RPSScenarioRunnerTestCase(test.TestCase):
runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong",
fakes.FakeContext({}).context, {})
self.assertEqual(len(runner_obj.result_queue), config["times"])
for result in runner_obj.result_queue:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
for result_batch in runner_obj.result_queue:
for result in result_batch:
self.assertIsNotNone(runner.ScenarioRunnerResult(result))
@mock.patch(RUNNERS + "rps.time.sleep")
def test__run_scenario_aborted(self, mock_sleep):

View File

@ -26,9 +26,10 @@ class SerialScenarioRunnerTestCase(test.TestCase):
def test__run_scenario(self, mock__run_scenario_once):
times = 5
result = {"duration": 10, "idle_duration": 0, "error": [],
"scenario_output": {}, "atomic_actions": {}}
"scenario_output": {}, "atomic_actions": {},
"timestamp": 1}
mock__run_scenario_once.return_value = result
expected_results = [result for i in range(times)]
expected_results = [[result] for i in range(times)]
runner = serial.SerialScenarioRunner(mock.MagicMock(),
{"times": times})

View File

@ -488,8 +488,8 @@ class ResultConsumerTestCase(test.TestCase):
runner = mock.MagicMock()
results = [
{"duration": 1, "timestamp": 3},
{"duration": 2, "timestamp": 2}
[{"duration": 1, "timestamp": 3}],
[{"duration": 2, "timestamp": 2}]
]
runner.result_queue = collections.deque(results)
@ -497,9 +497,12 @@ class ResultConsumerTestCase(test.TestCase):
key, task, runner, False) as consumer_obj:
pass
self.assertEqual(list(map(mock.call, results)),
mock_sla_instance.add_iteration.mock_calls)
self.assertEqual(sorted(results, key=lambda x: x["timestamp"]),
mock_sla_instance.add_iteration.assert_has_calls([
mock.call({"duration": 1, "timestamp": 3}),
mock.call({"duration": 2, "timestamp": 2})])
self.assertEqual([{"duration": 2, "timestamp": 2},
{"duration": 1, "timestamp": 3}],
consumer_obj.results)
@mock.patch("rally.common.objects.Task.get_status")
@ -517,7 +520,8 @@ class ResultConsumerTestCase(test.TestCase):
runner = mock.MagicMock()
runner.result_queue = collections.deque(
[{"duration": 1, "timestamp": 1}] * 4)
[[{"duration": 1, "timestamp": 1},
{"duration": 2, "timestamp": 2}]] * 4)
with engine.ResultConsumer(key, task, runner, True):
pass
@ -565,7 +569,7 @@ class ResultConsumerTestCase(test.TestCase):
task = mock.MagicMock()
runner = mock.MagicMock()
runner.result_queue = collections.deque(
[{"duration": 1, "timestamp": 4}] * 4)
[[{"duration": 1, "timestamp": 4}]] * 4)
with engine.ResultConsumer(key, task, runner, False):
pass