diff --git a/rally/plugins/common/runners/constant.py b/rally/plugins/common/runners/constant.py index 37bc5da948..962fd53dcb 100644 --- a/rally/plugins/common/runners/constant.py +++ b/rally/plugins/common/runners/constant.py @@ -24,15 +24,16 @@ from rally.common import utils from rally.common import validation from rally import consts from rally.task import runner -from rally.task import utils as butils def _worker_process(queue, iteration_gen, timeout, concurrency, times, - context, cls, method_name, args, event_queue, aborted, - info): + duration, context, cls, method_name, args, event_queue, + aborted, info): """Start the scenario within threads. - Spawn threads to support scenario execution for a fixed number of times. + Spawn threads to support scenario execution. + Scenario is ran for a fixed number of times if times is specified + Scenario is ran for fixed duration if duration is specified. This generates a constant load on the cloud under test by executing each scenario iteration without pausing between iterations. Each thread runs the scenario method once with passed scenario arguments and context. @@ -43,6 +44,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, :param timeout: operation's timeout :param concurrency: number of concurrently running scenario iterations :param times: total number of scenario iterations to be run + :param duration: total duration in seconds of the run :param context: scenario context object :param cls: scenario class :param method_name: scenario method name @@ -52,14 +54,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, the flag is set :param info: info about all processes count and counter of launched process """ + def _to_be_continued(iteration, current_duration, aborted, times=None, + duration=None): + if times is not None: + return iteration < times and not aborted.is_set() + elif duration is not None: + return current_duration < duration and not aborted.is_set() + else: + return False + + if times is None and duration is None: + raise ValueError("times or duration must be specified") pool = collections.deque() alive_threads_in_pool = 0 finished_threads_in_pool = 0 - runner._log_worker_info(times=times, concurrency=concurrency, - timeout=timeout, cls=cls, method_name=method_name, - args=args) + runner._log_worker_info(times=times, duration=duration, + concurrency=concurrency, timeout=timeout, cls=cls, + method_name=method_name, args=args) if timeout: timeout_queue = Queue.Queue() @@ -70,7 +83,13 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, collector_thr_by_timeout.start() iteration = next(iteration_gen) - while iteration < times and not aborted.is_set(): + start_time = time.time() + # NOTE(msimonin): keep the previous behaviour + # > when duration is 0, scenario executes exactly 1 time + current_duration = -1 + while _to_be_continued(iteration, current_duration, aborted, + times=times, duration=duration): + scenario_context = runner._get_scenario_context(iteration, context) worker_args = ( queue, cls, method_name, scenario_context, args, event_queue) @@ -106,6 +125,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, # we should wait to not create big noise with these checks time.sleep(0.001) iteration = next(iteration_gen) + current_duration = time.time() - start_time # Wait until all threads are done while pool: @@ -217,8 +237,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner): while True: yield (result_queue, iteration_gen, timeout, concurrency_per_worker + (concurrency_overhead and 1), - times, context, cls, method_name, args, event_queue, - self.aborted) + times, None, context, cls, method_name, args, + event_queue, self.aborted) if concurrency_overhead: concurrency_overhead -= 1 @@ -228,18 +248,6 @@ class ConstantScenarioRunner(runner.ScenarioRunner): self._join_processes(process_pool, result_queue, event_queue) -def _run_scenario_once_with_unpack_args(args): - # NOTE(andreykurilin): `pool.imap` is used in - # ConstantForDurationScenarioRunner. It does not want to work with - # instance-methods, class-methods and static-methods. Also, it can't - # transmit positional or keyword arguments to destination function. - # While original `rally.task.runner._run_scenario_once` accepts - # multiple arguments instead of one big tuple with all arguments, we - # need to hardcode unpacking here(all other runners are able to - # transmit arguments in proper way). - return runner._run_scenario_once(*args) - - @runner.configure(name="constant_for_duration") class ConstantForDurationScenarioRunner(runner.ScenarioRunner): """Creates constant load executing a scenario for an interval of time. @@ -267,7 +275,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): "type": "number", "minimum": 0.0, "description": "The number of seconds during which to generate" - " a load." + " a load. If the duration is 0, the scenario" + " will run once per parallel execution." }, "timeout": { "type": "number", @@ -279,22 +288,18 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): "additionalProperties": False } - @staticmethod - def _iter_scenario_args(cls, method, ctx, args, event_queue, aborted): - def _scenario_args(i): - if aborted.is_set(): - raise StopIteration() - return (cls, method, runner._get_scenario_context(i, ctx), args, - event_queue) - return _scenario_args - - def _run_scenario(self, cls, method, context, args): + def _run_scenario(self, cls, method_name, context, args): """Runs the specified scenario with given arguments. + This method generates a constant load on the cloud under test by + executing each scenario iteration using a pool of processes without + pausing between iterations up to the number of times specified + in the scenario config. + :param cls: The Scenario class where the scenario is implemented - :param method: Name of the method that implements the scenario + :param method_name: Name of the method that implements the scenario :param context: context that contains users, admin & other - information, that was created before scenario is + information, that was created before scenario execution starts. :param args: Arguments to call the scenario method with @@ -302,49 +307,37 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): where each result is a dictionary """ timeout = self.config.get("timeout", 600) + duration = self.config.get("duration", 0) concurrency = self.config.get("concurrency", 1) - duration = self.config.get("duration") + iteration_gen = utils.RAMInt() - # FIXME(andreykurilin): unify `_worker_process`, use it here and remove - # usage of `multiprocessing.Pool`(usage of separate process for - # each concurrent iteration is redundant). - pool = multiprocessing.Pool(concurrency) - manager = multiprocessing.Manager() - event_queue = manager.Queue() - stop_event_listener = threading.Event() + cpu_count = multiprocessing.cpu_count() + max_cpu_used = min(cpu_count, + self.config.get("max_cpu_count", cpu_count)) - def event_listener(): - while not stop_event_listener.isSet(): - while not event_queue.empty(): - self.send_event(**event_queue.get()) - else: - time.sleep(0.01) + processes_to_start = min(max_cpu_used, concurrency) + concurrency_per_worker, concurrency_overhead = divmod( + concurrency, processes_to_start) - event_listener_thread = threading.Thread(target=event_listener) - event_listener_thread.start() + self._log_debug_info(duration=duration, concurrency=concurrency, + timeout=timeout, max_cpu_used=max_cpu_used, + processes_to_start=processes_to_start, + concurrency_per_worker=concurrency_per_worker, + concurrency_overhead=concurrency_overhead) - run_args = butils.infinite_run_args_generator( - self._iter_scenario_args( - cls, method, context, args, event_queue, self.aborted)) - iter_result = pool.imap(_run_scenario_once_with_unpack_args, run_args) + result_queue = multiprocessing.Queue() + event_queue = multiprocessing.Queue() - start = time.time() - try: + def worker_args_gen(concurrency_overhead): while True: - try: - result = iter_result.next(timeout) - except multiprocessing.TimeoutError as e: - result = runner.format_result_on_timeout(e, timeout) - except StopIteration: - break + yield (result_queue, iteration_gen, timeout, + concurrency_per_worker + (concurrency_overhead and 1), + None, duration, context, cls, method_name, args, + event_queue, self.aborted) + if concurrency_overhead: + concurrency_overhead -= 1 - self._send_result(result) - - if time.time() - start > duration: - break - finally: - stop_event_listener.set() - event_listener_thread.join() - pool.terminate() - pool.join() - self._flush_results() + process_pool = self._create_process_pool( + processes_to_start, _worker_process, + worker_args_gen(concurrency_overhead)) + self._join_processes(process_pool, result_queue, event_queue) diff --git a/tests/unit/plugins/common/runners/test_constant.py b/tests/unit/plugins/common/runners/test_constant.py index aa49d39853..1c6a031eba 100644 --- a/tests/unit/plugins/common/runners/test_constant.py +++ b/tests/unit/plugins/common/runners/test_constant.py @@ -56,15 +56,6 @@ class ConstantScenarioRunnerTestCase(test.TestCase): else: self.assertGreater(len(results), 0) - @mock.patch(RUNNERS + "constant.runner") - def test__run_scenario_once_with_unpack_args(self, mock_runner): - result = constant._run_scenario_once_with_unpack_args( - ("FOO", ("BAR", "QUUZ"))) - - self.assertEqual(result, mock_runner._run_scenario_once.return_value) - mock_runner._run_scenario_once.assert_called_once_with( - "FOO", ("BAR", "QUUZ")) - @mock.patch(RUNNERS + "constant.time") @mock.patch(RUNNERS + "constant.threading.Thread") @mock.patch(RUNNERS + "constant.multiprocessing.Queue") @@ -89,7 +80,7 @@ class ConstantScenarioRunnerTestCase(test.TestCase): "id": "uuid1"}]} info = {"processes_to_start": 1, "processes_counter": 1} - constant._worker_process(mock_queue, fake_ram_int, 1, 2, times, + constant._worker_process(mock_queue, fake_ram_int, 1, 2, times, None, context, "Dummy", "dummy", (), mock_event_queue, mock_event, info) @@ -268,6 +259,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): self.context = fakes.FakeContext({"task": {"uuid": "uuid"}}).context self.context["iteration"] = 14 self.args = {"a": 1} + self.task = mock.MagicMock() @ddt.data(({"duration": 0, "concurrency": 2, @@ -288,8 +280,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): runner_obj._run_scenario(fakes.FakeScenario, "do_it", self.context, self.args) - # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time - expected_times = 1 + # NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly + # 1 time per unit of parrallelism + expected_times = self.config["concurrency"] self.assertEqual(expected_times, len(runner_obj.result_queue)) for result_batch in runner_obj.result_queue: for result in result_batch: @@ -301,8 +294,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong", self.context, self.args) - # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time - expected_times = 1 + # NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly + # 1 time per unit of parrallelism + expected_times = self.config["concurrency"] self.assertEqual(expected_times, len(runner_obj.result_queue)) for result_batch in runner_obj.result_queue: for result in result_batch: @@ -315,8 +309,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): runner_obj._run_scenario(fakes.FakeScenario, "raise_timeout", self.context, self.args) - # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time - expected_times = 1 + # NOTE(mmorais/msimonin): when duration is 0, scenario executes exactly + # 1 time per unit of parrallelism + expected_times = self.config["concurrency"] self.assertEqual(expected_times, len(runner_obj.result_queue)) for result_batch in runner_obj.result_queue: for result in result_batch: @@ -324,7 +319,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): self.assertIn("error", runner_obj.result_queue[0][0]) def test__run_scenario_constantly_aborted(self): - runner_obj = constant.ConstantForDurationScenarioRunner(None, + runner_obj = constant.ConstantForDurationScenarioRunner(self.task, self.config) runner_obj.abort()