diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py index 1a92d46ba..96dcd6f3c 100644 --- a/mistral/engine/action_queue.py +++ b/mistral/engine/action_queue.py @@ -99,7 +99,7 @@ def process(func): # NOTE(rakhmerov): Since we make RPC calls to the engine itself # we need to process the action queue asynchronously in a new # thread. Otherwise, if we have one engine process the engine - # will may send a request to itself while already processing + # may send a request to itself while already processing # another one. In conjunction with blocking RPC it will lead # to a deadlock (and RPC timeout). def _within_new_thread(): diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index fb7c8e921..d3f431c64 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -78,8 +78,7 @@ def run_task(wf_cmd): return - if task.is_waiting() and (task.is_created() or task.is_state_changed()): - _schedule_refresh_task_state(task.task_ex, 1) + _check_affected_tasks(task) def rerun_task(task_ex, wf_spec): @@ -129,6 +128,10 @@ def _on_action_complete(action_ex): wf_handler.force_fail_workflow(wf_ex, msg) + return + + _check_affected_tasks(task) + @profiler.trace('task-handler-on-action-update', hide_args=True) def _on_action_update(action_ex): @@ -186,6 +189,8 @@ def _on_action_update(action_ex): return + _check_affected_tasks(task) + def force_fail_task(task_ex, msg): """Forces the given task to fail. 
@@ -238,6 +243,8 @@ def continue_task(task_ex): return + _check_affected_tasks(task) + def complete_task(task_ex, state, state_info): wf_spec = spec_parser.get_workflow_spec_by_execution_id( @@ -264,6 +271,33 @@ def complete_task(task_ex, state, state_info): return + _check_affected_tasks(task) + + +def _check_affected_tasks(task): + if not task.is_completed(): + return + + task_ex = task.task_ex + + wf_ex = task_ex.workflow_execution + + if states.is_completed(wf_ex.state): + return + + wf_spec = spec_parser.get_workflow_spec_by_execution_id( + task_ex.workflow_execution_id + ) + + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) + + affected_task_execs = wf_ctrl.find_indirectly_affected_task_executions( + task_ex.name + ) + + for t_ex in affected_task_execs: + _schedule_refresh_task_state(t_ex) + def _build_task_from_execution(wf_spec, task_ex): return _create_task( @@ -350,39 +384,39 @@ def _refresh_task_state(task_ex_id): wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - log_state = wf_ctrl.get_logical_task_state( - task_ex - ) + with db_api.named_lock(task_ex.id): + db_api.refresh(task_ex) - state = log_state.state - state_info = log_state.state_info + if (states.is_completed(task_ex.state) + or task_ex.state == states.RUNNING): + return - # Update 'triggered_by' because it could have changed. - task_ex.runtime_context['triggered_by'] = log_state.triggered_by + log_state = wf_ctrl.get_logical_task_state(task_ex) - if state == states.RUNNING: - continue_task(task_ex) - elif state == states.ERROR: - complete_task(task_ex, state, state_info) - elif state == states.WAITING: - # Let's assume that a task takes 0.01 sec in average to complete - # and based on this assumption calculate a time of the next check. - # The estimation is very rough, of course, but this delay will be - # decreasing as task preconditions will be completing which will - # give a decent asymptotic approximation. 
- # For example, if a 'join' task has 100 inbound incomplete tasks - # then the next 'refresh_task_state' call will happen in 10 - # seconds. For 500 tasks it will be 50 seconds. The larger the - # workflow is, the more beneficial this mechanism will be. - delay = int(log_state.cardinality * 0.01) + state = log_state.state + state_info = log_state.state_info - _schedule_refresh_task_state(task_ex, max(1, delay)) - else: - # Must never get here. - raise RuntimeError( - 'Unexpected logical task state [task_ex_id=%s, task_name=%s, ' - 'state=%s]' % (task_ex_id, task_ex.name, state) - ) + # Update 'triggered_by' because it could have changed. + task_ex.runtime_context['triggered_by'] = log_state.triggered_by + + if state == states.RUNNING: + continue_task(task_ex) + elif state == states.ERROR: + complete_task(task_ex, state, state_info) + elif state == states.WAITING: + LOG.info( + "Task execution is still in WAITING state" + " [task_ex_id=%s, task_name=%s]", + task_ex_id, + task_ex.name + ) + else: + # Must never get here. + raise RuntimeError( + 'Unexpected logical task state [task_ex_id=%s, ' + 'task_name=%s, state=%s]' % + (task_ex_id, task_ex.name, state) + ) def _schedule_refresh_task_state(task_ex, delay=0): @@ -401,7 +435,7 @@ def _schedule_refresh_task_state(task_ex, delay=0): :param task_ex: Task execution. :param delay: Delay. 
""" - key = 'th_c_t_s_a-%s' % task_ex.id + key = _get_refresh_state_job_key(task_ex.id) scheduler.schedule_call( None, @@ -412,6 +446,10 @@ def _schedule_refresh_task_state(task_ex, delay=0): ) +def _get_refresh_state_job_key(task_ex_id): + return 'th_r_t_s-%s' % task_ex_id + + @db_utils.retry_on_db_error @action_queue.process def _scheduled_on_action_complete(action_ex_id, wf_action): @@ -492,7 +530,7 @@ def schedule_on_action_update(action_ex, delay=0): return - key = 'th_on_a_c-%s' % action_ex.task_execution_id + key = 'th_on_a_u-%s' % action_ex.task_execution_id scheduler.schedule_call( None, diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index daffd60ad..1f1062b34 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -140,13 +140,14 @@ class EngineTestCase(base.DbTestCase): for t in w.task_executions: print( "\t%s [id=%s, state=%s, state_info=%s, processed=%s," - " published=%s]" % + " published=%s, runtime_context=%s]" % (t.name, t.id, t.state, t.state_info, t.processed, - t.published) + t.published, + t.runtime_context) ) child_execs = t.executions diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index c19f7e921..4b6cac7d2 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -873,18 +873,11 @@ class JoinEngineTest(base.EngineTestCase): state=states.WAITING ) - calls = db_api.get_delayed_calls() - - mtd_name = 'mistral.engine.task_handler._refresh_task_state' - - cnt = sum([1 for c in calls if c.target_method_name == mtd_name]) - - # There can be 2 calls with different value of 'processing' flag. - self.assertTrue(cnt == 1 or cnt == 2) - # Stop the workflow. 
self.engine.stop_workflow(wf_ex.id, state=states.CANCELLED) + mtd_name = 'mistral.engine.task_handler._refresh_task_state' + self._await( lambda: len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0 @@ -931,18 +924,11 @@ class JoinEngineTest(base.EngineTestCase): state=states.WAITING ) - calls = db_api.get_delayed_calls() - - mtd_name = 'mistral.engine.task_handler._refresh_task_state' - - cnt = sum([1 for c in calls if c.target_method_name == mtd_name]) - - # There can be 2 calls with different value of 'processing' flag. - self.assertTrue(cnt == 1 or cnt == 2) - # Stop the workflow. db_api.delete_workflow_execution(wf_ex.id) + mtd_name = 'mistral.engine.task_handler._refresh_task_state' + self._await( lambda: len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0 diff --git a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py index 4cf6e4cb2..d3e4112b3 100644 --- a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py +++ b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py @@ -1025,8 +1025,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): # Get objects for the parent workflow execution. 
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - wf_1_task_execs = wf_1_ex.task_executions - wf_1_task_1_ex = self._assert_single_item( wf_1_ex.task_executions, name='task1' @@ -1049,8 +1047,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[0].id ) - wf_2_ex_1_task_execs = wf_2_ex_1.task_executions - wf_2_ex_1_task_1_ex = self._assert_single_item( wf_2_ex_1.task_executions, name='task1' @@ -1064,8 +1060,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[1].id ) - wf_2_ex_2_task_execs = wf_2_ex_2.task_executions - wf_2_ex_2_task_1_ex = self._assert_single_item( wf_2_ex_2.task_executions, name='task1' @@ -1079,8 +1073,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[2].id ) - wf_2_ex_3_task_execs = wf_2_ex_3.task_executions - wf_2_ex_3_task_1_ex = self._assert_single_item( wf_2_ex_3.task_executions, name='task1' @@ -1093,8 +1085,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): # Get objects for the wf3 subworkflow execution. wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - wf_3_task_execs = wf_3_ex.task_executions - wf_3_task_1_ex = self._assert_single_item( wf_3_ex.task_executions, name='task1' @@ -1149,8 +1139,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): # Get objects for the parent workflow execution. 
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - wf_1_task_execs = wf_1_ex.task_executions - wf_1_task_1_ex = self._assert_single_item( wf_1_ex.task_executions, name='task1' @@ -1173,8 +1161,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[0].id ) - wf_2_ex_1_task_execs = wf_2_ex_1.task_executions - wf_2_ex_1_task_1_ex = self._assert_single_item( wf_2_ex_1.task_executions, name='task1' @@ -1188,8 +1174,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[1].id ) - wf_2_ex_2_task_execs = wf_2_ex_2.task_executions - wf_2_ex_2_task_1_ex = self._assert_single_item( wf_2_ex_2.task_executions, name='task1' @@ -1203,8 +1187,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[2].id ) - wf_2_ex_3_task_execs = wf_2_ex_3.task_executions - wf_2_ex_3_task_1_ex = self._assert_single_item( wf_2_ex_3.task_executions, name='task1' @@ -1217,8 +1199,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): # Get objects for the wf3 subworkflow execution. wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - wf_3_task_execs = wf_3_ex.task_executions - wf_3_task_1_ex = self._assert_single_item( wf_3_ex.task_executions, name='task1' @@ -1292,8 +1272,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): # Get objects for the parent workflow execution. 
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - wf_1_task_execs = wf_1_ex.task_executions - wf_1_task_1_ex = self._assert_single_item( wf_1_ex.task_executions, name='task1' @@ -1316,8 +1294,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[0].id ) - wf_2_ex_1_task_execs = wf_2_ex_1.task_executions - wf_2_ex_1_task_1_ex = self._assert_single_item( wf_2_ex_1.task_executions, name='task1' @@ -1331,8 +1307,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[1].id ) - wf_2_ex_2_task_execs = wf_2_ex_2.task_executions - wf_2_ex_2_task_1_ex = self._assert_single_item( wf_2_ex_2.task_executions, name='task1' @@ -1346,8 +1320,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf_1_task_1_action_exs[2].id ) - wf_2_ex_3_task_execs = wf_2_ex_3.task_executions - wf_2_ex_3_task_1_ex = self._assert_single_item( wf_2_ex_3.task_executions, name='task1' @@ -1360,8 +1332,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): # Get objects for the wf3 subworkflow execution. wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - wf_3_task_execs = wf_3_ex.task_executions - wf_3_task_1_ex = self._assert_single_item( wf_3_ex.task_executions, name='task1' diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 0d8cbdecd..ebc576e78 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -167,6 +167,16 @@ class WorkflowController(object): """ raise NotImplementedError + @abc.abstractmethod + def find_indirectly_affected_task_executions(self, task_name): + """Get a set of task executions indirectly affected by the given task. + + :param task_name: Task name. + :return: Task executions that can be indirectly affected by a task + identified by the given name. + """ + raise NotImplementedError + @abc.abstractmethod def is_error_handled_for(self, task_ex): """Determines if error is handled for specific task. 
diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index fac22e36a..ca1aae2fe 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -191,6 +191,9 @@ class DirectWorkflowController(base.WorkflowController): return self._get_join_logical_state(task_spec) + def find_indirectly_affected_task_executions(self, task_name): + return self._find_indirectly_affected_created_joins(task_name) + def is_error_handled_for(self, task_ex): return bool(self.wf_spec.get_on_error_clause(task_ex.name)) @@ -308,6 +311,54 @@ class DirectWorkflowController(base.WorkflowController): if not condition or expr.evaluate(condition, ctx) ] + @profiler.trace('direct-wf-controller-find-downstream-joins') + def _find_indirectly_affected_created_joins(self, task_name, result=None, + visited_task_names=None): + visited_task_names = visited_task_names or set() + + if task_name in visited_task_names: + return + + visited_task_names.add(task_name) + + result = result or set() + + def _process_clause(clause): + for t_name, condition, params in clause: + t_spec = self.wf_spec.get_tasks()[t_name] + + # Encountered an engine command. + if not t_spec: + continue + + if t_spec.get_join(): + # TODO(rakhmerov): This is a fundamental limitation + # that prevents us having cycles within workflows + # that contain joins because we assume that there + # can be only one "join" task with a given name. + t_ex = self._find_task_execution_by_name(t_name) + + if t_ex: + result.add(t_ex) + + # If we found a "join" we don't need to go further + # because completion of the found join will handle + # other deeper joins. + continue + + # Recursion. 
+ self._find_indirectly_affected_created_joins( + t_name, + result=result, + visited_task_names=visited_task_names + ) + + _process_clause(self.wf_spec.get_on_success_clause(task_name)) + _process_clause(self.wf_spec.get_on_error_clause(task_name)) + _process_clause(self.wf_spec.get_on_complete_clause(task_name)) + + return result + @profiler.trace('direct-wf-controller-get-join-logical-state') def _get_join_logical_state(self, task_spec): """Evaluates logical state of 'join' task. diff --git a/mistral/workflow/lookup_utils.py b/mistral/workflow/lookup_utils.py index ea145ee5e..7f78672b2 100644 --- a/mistral/workflow/lookup_utils.py +++ b/mistral/workflow/lookup_utils.py @@ -88,7 +88,8 @@ def find_task_executions_by_name(wf_ex_id, task_name): :param wf_ex_id: Workflow execution id. :param task_name: Task name. - :return: Task executions (possibly a cached value). + :return: Task executions (possibly a cached value). The returned list + may contain task execution clones not bound to the DB session. """ with _TASK_EX_CACHE_LOCK: t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name) @@ -102,6 +103,8 @@ def find_task_executions_by_name(wf_ex_id, task_name): sort_keys=[] # disable sorting ) + t_execs = [t_ex.get_clone() for t_ex in t_execs] + # We can cache only finished tasks because they won't change. all_finished = ( t_execs and diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index f233f3c9b..8411bbf7c 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -118,6 +118,9 @@ class ReverseWorkflowController(base.WorkflowController): # TODO(rakhmerov): Implement. 
return base.TaskLogicalState(task_ex.state, task_ex.state_info) + def find_indirectly_affected_task_executions(self, task_name): + return set() + def is_error_handled_for(self, task_ex): return task_ex.state != states.ERROR diff --git a/releasenotes/notes/remove_polling_from_join-3a7921c4af741822.yaml b/releasenotes/notes/remove_polling_from_join-3a7921c4af741822.yaml new file mode 100644 index 000000000..3a339f007 --- /dev/null +++ b/releasenotes/notes/remove_polling_from_join-3a7921c4af741822.yaml @@ -0,0 +1,9 @@ +--- +fixes: + - | + Removed DB polling from the logic that checks readiness of a "join" task. + Previously, the CPU could be mostly occupied by the scheduler running the + corresponding periodic jobs, which didn't let the workflow move forward + at a proper speed. That happened if a workflow had lots of "join" tasks + with many dependencies. It's fixed now. +