Unify constant runner logic
Change-Id: I7f9c34e0bc146ac3a8223aff5a42498341a36a88 Related-Bug: #1800447
This commit is contained in:
parent
a19cc57391
commit
cfde22978a
|
@ -24,15 +24,16 @@ from rally.common import utils
|
|||
from rally.common import validation
|
||||
from rally import consts
|
||||
from rally.task import runner
|
||||
from rally.task import utils as butils
|
||||
|
||||
|
||||
def _worker_process(queue, iteration_gen, timeout, concurrency, times,
|
||||
context, cls, method_name, args, event_queue, aborted,
|
||||
info):
|
||||
duration, context, cls, method_name, args, event_queue,
|
||||
aborted, info):
|
||||
"""Start the scenario within threads.
|
||||
|
||||
Spawn threads to support scenario execution for a fixed number of times.
|
||||
Spawn threads to support scenario execution.
|
||||
Scenario is ran for a fixed number of times if times is specified
|
||||
Scenario is ran for fixed duration if duration is specified.
|
||||
This generates a constant load on the cloud under test by executing each
|
||||
scenario iteration without pausing between iterations. Each thread runs
|
||||
the scenario method once with passed scenario arguments and context.
|
||||
|
@ -43,6 +44,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
|
|||
:param timeout: operation's timeout
|
||||
:param concurrency: number of concurrently running scenario iterations
|
||||
:param times: total number of scenario iterations to be run
|
||||
:param duration: total duration in seconds of the run
|
||||
:param context: scenario context object
|
||||
:param cls: scenario class
|
||||
:param method_name: scenario method name
|
||||
|
@ -52,14 +54,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
|
|||
the flag is set
|
||||
:param info: info about all processes count and counter of launched process
|
||||
"""
|
||||
def _to_be_continued(iteration, current_duration, aborted, times=None,
|
||||
duration=None):
|
||||
if times is not None:
|
||||
return iteration < times and not aborted.is_set()
|
||||
elif duration is not None:
|
||||
return current_duration < duration and not aborted.is_set()
|
||||
else:
|
||||
return False
|
||||
|
||||
if times is None and duration is None:
|
||||
raise ValueError("times or duration must be specified")
|
||||
|
||||
pool = collections.deque()
|
||||
alive_threads_in_pool = 0
|
||||
finished_threads_in_pool = 0
|
||||
|
||||
runner._log_worker_info(times=times, concurrency=concurrency,
|
||||
timeout=timeout, cls=cls, method_name=method_name,
|
||||
args=args)
|
||||
runner._log_worker_info(times=times, duration=duration,
|
||||
concurrency=concurrency, timeout=timeout, cls=cls,
|
||||
method_name=method_name, args=args)
|
||||
|
||||
if timeout:
|
||||
timeout_queue = Queue.Queue()
|
||||
|
@ -70,7 +83,13 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
|
|||
collector_thr_by_timeout.start()
|
||||
|
||||
iteration = next(iteration_gen)
|
||||
while iteration < times and not aborted.is_set():
|
||||
start_time = time.time()
|
||||
# NOTE(msimonin): keep the previous behaviour
|
||||
# > when duration is 0, scenario executes exactly 1 time
|
||||
current_duration = -1
|
||||
while _to_be_continued(iteration, current_duration, aborted,
|
||||
times=times, duration=duration):
|
||||
|
||||
scenario_context = runner._get_scenario_context(iteration, context)
|
||||
worker_args = (
|
||||
queue, cls, method_name, scenario_context, args, event_queue)
|
||||
|
@ -106,6 +125,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
|
|||
# we should wait to not create big noise with these checks
|
||||
time.sleep(0.001)
|
||||
iteration = next(iteration_gen)
|
||||
current_duration = time.time() - start_time
|
||||
|
||||
# Wait until all threads are done
|
||||
while pool:
|
||||
|
@ -217,8 +237,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
|
|||
while True:
|
||||
yield (result_queue, iteration_gen, timeout,
|
||||
concurrency_per_worker + (concurrency_overhead and 1),
|
||||
times, context, cls, method_name, args, event_queue,
|
||||
self.aborted)
|
||||
times, None, context, cls, method_name, args,
|
||||
event_queue, self.aborted)
|
||||
if concurrency_overhead:
|
||||
concurrency_overhead -= 1
|
||||
|
||||
|
@ -228,18 +248,6 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
|
|||
self._join_processes(process_pool, result_queue, event_queue)
|
||||
|
||||
|
||||
def _run_scenario_once_with_unpack_args(args):
|
||||
# NOTE(andreykurilin): `pool.imap` is used in
|
||||
# ConstantForDurationScenarioRunner. It does not want to work with
|
||||
# instance-methods, class-methods and static-methods. Also, it can't
|
||||
# transmit positional or keyword arguments to destination function.
|
||||
# While original `rally.task.runner._run_scenario_once` accepts
|
||||
# multiple arguments instead of one big tuple with all arguments, we
|
||||
# need to hardcode unpacking here(all other runners are able to
|
||||
# transmit arguments in proper way).
|
||||
return runner._run_scenario_once(*args)
|
||||
|
||||
|
||||
@runner.configure(name="constant_for_duration")
|
||||
class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
|
||||
"""Creates constant load executing a scenario for an interval of time.
|
||||
|
@ -267,7 +275,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
|
|||
"type": "number",
|
||||
"minimum": 0.0,
|
||||
"description": "The number of seconds during which to generate"
|
||||
" a load."
|
||||
" a load. If the duration is 0, the scenario"
|
||||
" will run once per parallel execution."
|
||||
},
|
||||
"timeout": {
|
||||
"type": "number",
|
||||
|
@ -279,22 +288,18 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
|
|||
"additionalProperties": False
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _iter_scenario_args(cls, method, ctx, args, event_queue, aborted):
|
||||
def _scenario_args(i):
|
||||
if aborted.is_set():
|
||||
raise StopIteration()
|
||||
return (cls, method, runner._get_scenario_context(i, ctx), args,
|
||||
event_queue)
|
||||
return _scenario_args
|
||||
|
||||
def _run_scenario(self, cls, method, context, args):
|
||||
def _run_scenario(self, cls, method_name, context, args):
|
||||
"""Runs the specified scenario with given arguments.
|
||||
|
||||
This method generates a constant load on the cloud under test by
|
||||
executing each scenario iteration using a pool of processes without
|
||||
pausing between iterations up to the number of times specified
|
||||
in the scenario config.
|
||||
|
||||
:param cls: The Scenario class where the scenario is implemented
|
||||
:param method: Name of the method that implements the scenario
|
||||
:param method_name: Name of the method that implements the scenario
|
||||
:param context: context that contains users, admin & other
|
||||
information, that was created before scenario is
|
||||
information, that was created before scenario
|
||||
execution starts.
|
||||
:param args: Arguments to call the scenario method with
|
||||
|
||||
|
@ -302,49 +307,37 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
|
|||
where each result is a dictionary
|
||||
"""
|
||||
timeout = self.config.get("timeout", 600)
|
||||
duration = self.config.get("duration", 0)
|
||||
concurrency = self.config.get("concurrency", 1)
|
||||
duration = self.config.get("duration")
|
||||
iteration_gen = utils.RAMInt()
|
||||
|
||||
# FIXME(andreykurilin): unify `_worker_process`, use it here and remove
|
||||
# usage of `multiprocessing.Pool`(usage of separate process for
|
||||
# each concurrent iteration is redundant).
|
||||
pool = multiprocessing.Pool(concurrency)
|
||||
manager = multiprocessing.Manager()
|
||||
event_queue = manager.Queue()
|
||||
stop_event_listener = threading.Event()
|
||||
cpu_count = multiprocessing.cpu_count()
|
||||
max_cpu_used = min(cpu_count,
|
||||
self.config.get("max_cpu_count", cpu_count))
|
||||
|
||||
def event_listener():
|
||||
while not stop_event_listener.isSet():
|
||||
while not event_queue.empty():
|
||||
self.send_event(**event_queue.get())
|
||||
else:
|
||||
time.sleep(0.01)
|
||||
processes_to_start = min(max_cpu_used, concurrency)
|
||||
concurrency_per_worker, concurrency_overhead = divmod(
|
||||
concurrency, processes_to_start)
|
||||
|
||||
event_listener_thread = threading.Thread(target=event_listener)
|
||||
event_listener_thread.start()
|
||||
self._log_debug_info(duration=duration, concurrency=concurrency,
|
||||
timeout=timeout, max_cpu_used=max_cpu_used,
|
||||
processes_to_start=processes_to_start,
|
||||
concurrency_per_worker=concurrency_per_worker,
|
||||
concurrency_overhead=concurrency_overhead)
|
||||
|
||||
run_args = butils.infinite_run_args_generator(
|
||||
self._iter_scenario_args(
|
||||
cls, method, context, args, event_queue, self.aborted))
|
||||
iter_result = pool.imap(_run_scenario_once_with_unpack_args, run_args)
|
||||
result_queue = multiprocessing.Queue()
|
||||
event_queue = multiprocessing.Queue()
|
||||
|
||||
start = time.time()
|
||||
try:
|
||||
def worker_args_gen(concurrency_overhead):
|
||||
while True:
|
||||
try:
|
||||
result = iter_result.next(timeout)
|
||||
except multiprocessing.TimeoutError as e:
|
||||
result = runner.format_result_on_timeout(e, timeout)
|
||||
except StopIteration:
|
||||
break
|
||||
yield (result_queue, iteration_gen, timeout,
|
||||
concurrency_per_worker + (concurrency_overhead and 1),
|
||||
None, duration, context, cls, method_name, args,
|
||||
event_queue, self.aborted)
|
||||
if concurrency_overhead:
|
||||
concurrency_overhead -= 1
|
||||
|
||||
self._send_result(result)
|
||||
|
||||
if time.time() - start > duration:
|
||||
break
|
||||
finally:
|
||||
stop_event_listener.set()
|
||||
event_listener_thread.join()
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
self._flush_results()
|
||||
process_pool = self._create_process_pool(
|
||||
processes_to_start, _worker_process,
|
||||
worker_args_gen(concurrency_overhead))
|
||||
self._join_processes(process_pool, result_queue, event_queue)
|
||||
|
|
|
@ -56,15 +56,6 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
|
|||
else:
|
||||
self.assertGreater(len(results), 0)
|
||||
|
||||
@mock.patch(RUNNERS + "constant.runner")
|
||||
def test__run_scenario_once_with_unpack_args(self, mock_runner):
|
||||
result = constant._run_scenario_once_with_unpack_args(
|
||||
("FOO", ("BAR", "QUUZ")))
|
||||
|
||||
self.assertEqual(result, mock_runner._run_scenario_once.return_value)
|
||||
mock_runner._run_scenario_once.assert_called_once_with(
|
||||
"FOO", ("BAR", "QUUZ"))
|
||||
|
||||
@mock.patch(RUNNERS + "constant.time")
|
||||
@mock.patch(RUNNERS + "constant.threading.Thread")
|
||||
@mock.patch(RUNNERS + "constant.multiprocessing.Queue")
|
||||
|
@ -89,7 +80,7 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
|
|||
"id": "uuid1"}]}
|
||||
info = {"processes_to_start": 1, "processes_counter": 1}
|
||||
|
||||
constant._worker_process(mock_queue, fake_ram_int, 1, 2, times,
|
||||
constant._worker_process(mock_queue, fake_ram_int, 1, 2, times, None,
|
||||
context, "Dummy", "dummy", (),
|
||||
mock_event_queue, mock_event, info)
|
||||
|
||||
|
@ -268,6 +259,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
|
|||
self.context = fakes.FakeContext({"task": {"uuid": "uuid"}}).context
|
||||
self.context["iteration"] = 14
|
||||
self.args = {"a": 1}
|
||||
self.task = mock.MagicMock()
|
||||
|
||||
@ddt.data(({"duration": 0,
|
||||
"concurrency": 2,
|
||||
|
@ -288,8 +280,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
|
|||
|
||||
runner_obj._run_scenario(fakes.FakeScenario, "do_it",
|
||||
self.context, self.args)
|
||||
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
|
||||
expected_times = 1
|
||||
# NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly
|
||||
# 1 time per unit of parrallelism
|
||||
expected_times = self.config["concurrency"]
|
||||
self.assertEqual(expected_times, len(runner_obj.result_queue))
|
||||
for result_batch in runner_obj.result_queue:
|
||||
for result in result_batch:
|
||||
|
@ -301,8 +294,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
|
|||
|
||||
runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong",
|
||||
self.context, self.args)
|
||||
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
|
||||
expected_times = 1
|
||||
# NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly
|
||||
# 1 time per unit of parrallelism
|
||||
expected_times = self.config["concurrency"]
|
||||
self.assertEqual(expected_times, len(runner_obj.result_queue))
|
||||
for result_batch in runner_obj.result_queue:
|
||||
for result in result_batch:
|
||||
|
@ -315,8 +309,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
|
|||
|
||||
runner_obj._run_scenario(fakes.FakeScenario, "raise_timeout",
|
||||
self.context, self.args)
|
||||
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
|
||||
expected_times = 1
|
||||
# NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly
|
||||
# 1 time per unit of parrallelism
|
||||
expected_times = self.config["concurrency"]
|
||||
self.assertEqual(expected_times, len(runner_obj.result_queue))
|
||||
for result_batch in runner_obj.result_queue:
|
||||
for result in result_batch:
|
||||
|
@ -324,7 +319,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
|
|||
self.assertIn("error", runner_obj.result_queue[0][0])
|
||||
|
||||
def test__run_scenario_constantly_aborted(self):
|
||||
runner_obj = constant.ConstantForDurationScenarioRunner(None,
|
||||
runner_obj = constant.ConstantForDurationScenarioRunner(self.task,
|
||||
self.config)
|
||||
|
||||
runner_obj.abort()
|
||||
|
|
Loading…
Reference in New Issue