Unify constant runner logic

Change-Id: I7f9c34e0bc146ac3a8223aff5a42498341a36a88
Related-Bug: #1800447
This commit is contained in:
msimonin 2018-10-31 16:18:48 +01:00
parent a19cc57391
commit cfde22978a
No known key found for this signature in database
GPG Key ID: F3D1D2B4FAABD959
2 changed files with 78 additions and 90 deletions

View File

@ -24,15 +24,16 @@ from rally.common import utils
from rally.common import validation
from rally import consts
from rally.task import runner
from rally.task import utils as butils
def _worker_process(queue, iteration_gen, timeout, concurrency, times,
context, cls, method_name, args, event_queue, aborted,
info):
duration, context, cls, method_name, args, event_queue,
aborted, info):
"""Start the scenario within threads.
Spawn threads to support scenario execution for a fixed number of times.
Spawn threads to support scenario execution.
Scenario is ran for a fixed number of times if times is specified
Scenario is ran for fixed duration if duration is specified.
This generates a constant load on the cloud under test by executing each
scenario iteration without pausing between iterations. Each thread runs
the scenario method once with passed scenario arguments and context.
@ -43,6 +44,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
:param timeout: operation's timeout
:param concurrency: number of concurrently running scenario iterations
:param times: total number of scenario iterations to be run
:param duration: total duration in seconds of the run
:param context: scenario context object
:param cls: scenario class
:param method_name: scenario method name
@ -52,14 +54,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
the flag is set
:param info: info about all processes count and counter of launched process
"""
def _to_be_continued(iteration, current_duration, aborted, times=None,
duration=None):
if times is not None:
return iteration < times and not aborted.is_set()
elif duration is not None:
return current_duration < duration and not aborted.is_set()
else:
return False
if times is None and duration is None:
raise ValueError("times or duration must be specified")
pool = collections.deque()
alive_threads_in_pool = 0
finished_threads_in_pool = 0
runner._log_worker_info(times=times, concurrency=concurrency,
timeout=timeout, cls=cls, method_name=method_name,
args=args)
runner._log_worker_info(times=times, duration=duration,
concurrency=concurrency, timeout=timeout, cls=cls,
method_name=method_name, args=args)
if timeout:
timeout_queue = Queue.Queue()
@ -70,7 +83,13 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
collector_thr_by_timeout.start()
iteration = next(iteration_gen)
while iteration < times and not aborted.is_set():
start_time = time.time()
# NOTE(msimonin): keep the previous behaviour
# > when duration is 0, scenario executes exactly 1 time
current_duration = -1
while _to_be_continued(iteration, current_duration, aborted,
times=times, duration=duration):
scenario_context = runner._get_scenario_context(iteration, context)
worker_args = (
queue, cls, method_name, scenario_context, args, event_queue)
@ -106,6 +125,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
# we should wait to not create big noise with these checks
time.sleep(0.001)
iteration = next(iteration_gen)
current_duration = time.time() - start_time
# Wait until all threads are done
while pool:
@ -217,8 +237,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
while True:
yield (result_queue, iteration_gen, timeout,
concurrency_per_worker + (concurrency_overhead and 1),
times, context, cls, method_name, args, event_queue,
self.aborted)
times, None, context, cls, method_name, args,
event_queue, self.aborted)
if concurrency_overhead:
concurrency_overhead -= 1
@ -228,18 +248,6 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
self._join_processes(process_pool, result_queue, event_queue)
def _run_scenario_once_with_unpack_args(args):
# NOTE(andreykurilin): `pool.imap` is used in
# ConstantForDurationScenarioRunner. It does not want to work with
# instance-methods, class-methods and static-methods. Also, it can't
# transmit positional or keyword arguments to destination function.
# While original `rally.task.runner._run_scenario_once` accepts
# multiple arguments instead of one big tuple with all arguments, we
# need to hardcode unpacking here(all other runners are able to
# transmit arguments in proper way).
return runner._run_scenario_once(*args)
@runner.configure(name="constant_for_duration")
class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
"""Creates constant load executing a scenario for an interval of time.
@ -267,7 +275,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
"type": "number",
"minimum": 0.0,
"description": "The number of seconds during which to generate"
" a load."
" a load. If the duration is 0, the scenario"
" will run once per parallel execution."
},
"timeout": {
"type": "number",
@ -279,22 +288,18 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
"additionalProperties": False
}
@staticmethod
def _iter_scenario_args(cls, method, ctx, args, event_queue, aborted):
def _scenario_args(i):
if aborted.is_set():
raise StopIteration()
return (cls, method, runner._get_scenario_context(i, ctx), args,
event_queue)
return _scenario_args
def _run_scenario(self, cls, method, context, args):
def _run_scenario(self, cls, method_name, context, args):
"""Runs the specified scenario with given arguments.
This method generates a constant load on the cloud under test by
executing each scenario iteration using a pool of processes without
pausing between iterations up to the number of times specified
in the scenario config.
:param cls: The Scenario class where the scenario is implemented
:param method: Name of the method that implements the scenario
:param method_name: Name of the method that implements the scenario
:param context: context that contains users, admin & other
information, that was created before scenario is
information, that was created before scenario
execution starts.
:param args: Arguments to call the scenario method with
@ -302,49 +307,37 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
where each result is a dictionary
"""
timeout = self.config.get("timeout", 600)
duration = self.config.get("duration", 0)
concurrency = self.config.get("concurrency", 1)
duration = self.config.get("duration")
iteration_gen = utils.RAMInt()
# FIXME(andreykurilin): unify `_worker_process`, use it here and remove
# usage of `multiprocessing.Pool`(usage of separate process for
# each concurrent iteration is redundant).
pool = multiprocessing.Pool(concurrency)
manager = multiprocessing.Manager()
event_queue = manager.Queue()
stop_event_listener = threading.Event()
cpu_count = multiprocessing.cpu_count()
max_cpu_used = min(cpu_count,
self.config.get("max_cpu_count", cpu_count))
def event_listener():
while not stop_event_listener.isSet():
while not event_queue.empty():
self.send_event(**event_queue.get())
else:
time.sleep(0.01)
processes_to_start = min(max_cpu_used, concurrency)
concurrency_per_worker, concurrency_overhead = divmod(
concurrency, processes_to_start)
event_listener_thread = threading.Thread(target=event_listener)
event_listener_thread.start()
self._log_debug_info(duration=duration, concurrency=concurrency,
timeout=timeout, max_cpu_used=max_cpu_used,
processes_to_start=processes_to_start,
concurrency_per_worker=concurrency_per_worker,
concurrency_overhead=concurrency_overhead)
run_args = butils.infinite_run_args_generator(
self._iter_scenario_args(
cls, method, context, args, event_queue, self.aborted))
iter_result = pool.imap(_run_scenario_once_with_unpack_args, run_args)
result_queue = multiprocessing.Queue()
event_queue = multiprocessing.Queue()
start = time.time()
try:
def worker_args_gen(concurrency_overhead):
while True:
try:
result = iter_result.next(timeout)
except multiprocessing.TimeoutError as e:
result = runner.format_result_on_timeout(e, timeout)
except StopIteration:
break
yield (result_queue, iteration_gen, timeout,
concurrency_per_worker + (concurrency_overhead and 1),
None, duration, context, cls, method_name, args,
event_queue, self.aborted)
if concurrency_overhead:
concurrency_overhead -= 1
self._send_result(result)
if time.time() - start > duration:
break
finally:
stop_event_listener.set()
event_listener_thread.join()
pool.terminate()
pool.join()
self._flush_results()
process_pool = self._create_process_pool(
processes_to_start, _worker_process,
worker_args_gen(concurrency_overhead))
self._join_processes(process_pool, result_queue, event_queue)

View File

@ -56,15 +56,6 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
else:
self.assertGreater(len(results), 0)
@mock.patch(RUNNERS + "constant.runner")
def test__run_scenario_once_with_unpack_args(self, mock_runner):
result = constant._run_scenario_once_with_unpack_args(
("FOO", ("BAR", "QUUZ")))
self.assertEqual(result, mock_runner._run_scenario_once.return_value)
mock_runner._run_scenario_once.assert_called_once_with(
"FOO", ("BAR", "QUUZ"))
@mock.patch(RUNNERS + "constant.time")
@mock.patch(RUNNERS + "constant.threading.Thread")
@mock.patch(RUNNERS + "constant.multiprocessing.Queue")
@ -89,7 +80,7 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
"id": "uuid1"}]}
info = {"processes_to_start": 1, "processes_counter": 1}
constant._worker_process(mock_queue, fake_ram_int, 1, 2, times,
constant._worker_process(mock_queue, fake_ram_int, 1, 2, times, None,
context, "Dummy", "dummy", (),
mock_event_queue, mock_event, info)
@ -268,6 +259,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
self.context = fakes.FakeContext({"task": {"uuid": "uuid"}}).context
self.context["iteration"] = 14
self.args = {"a": 1}
self.task = mock.MagicMock()
@ddt.data(({"duration": 0,
"concurrency": 2,
@ -288,8 +280,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
runner_obj._run_scenario(fakes.FakeScenario, "do_it",
self.context, self.args)
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
# NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly
# 1 time per unit of parrallelism
expected_times = self.config["concurrency"]
self.assertEqual(expected_times, len(runner_obj.result_queue))
for result_batch in runner_obj.result_queue:
for result in result_batch:
@ -301,8 +294,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong",
self.context, self.args)
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
# NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly
# 1 time per unit of parrallelism
expected_times = self.config["concurrency"]
self.assertEqual(expected_times, len(runner_obj.result_queue))
for result_batch in runner_obj.result_queue:
for result in result_batch:
@ -315,8 +309,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
runner_obj._run_scenario(fakes.FakeScenario, "raise_timeout",
self.context, self.args)
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
# NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly
# 1 time per unit of parrallelism
expected_times = self.config["concurrency"]
self.assertEqual(expected_times, len(runner_obj.result_queue))
for result_batch in runner_obj.result_queue:
for result in result_batch:
@ -324,7 +319,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
self.assertIn("error", runner_obj.result_queue[0][0])
def test__run_scenario_constantly_aborted(self):
runner_obj = constant.ConstantForDurationScenarioRunner(None,
runner_obj = constant.ConstantForDurationScenarioRunner(self.task,
self.config)
runner_obj.abort()