diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index fc074cc83..1bb29e109 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -64,6 +64,9 @@ class DefaultEngine(base.Engine): params ) + # Checking a case when all tasks are completed immediately. + wf_handler.check_and_complete(wf_ex.id) + return wf_ex.get_clone() except exceptions.DBDuplicateEntryError: @@ -143,7 +146,23 @@ class DefaultEngine(base.Engine): action_handler.on_action_complete(action_ex, result) - return action_ex.get_clone() + result = action_ex.get_clone() + + # Need to see if checking workflow completion makes sense. + wf_ex_id = None + + if (action_ex.task_execution_id + and states.is_completed(action_ex.task_execution.state)): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + # Note: We must do this check in a new transaction to make sure + # that at least one of the parallel transactions will do a consistent + # read from the DB. + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + + return result @db_utils.retry_on_db_error @action_queue.process @@ -158,7 +177,22 @@ class DefaultEngine(base.Engine): action_handler.on_action_update(action_ex, state) - return action_ex.get_clone() + result = action_ex.get_clone() + + wf_ex_id = None + + if (action_ex.task_execution_id + and states.is_completed(action_ex.task_execution.state)): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + # Note: We must do this check in a new transaction to make sure + # that at least one of the parallel transactions will do a consistent + # read from the DB. + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + + return result @db_utils.retry_on_db_error @action_queue.process diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 844714bf0..a0dd790d1 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -525,11 +525,14 @@ def _complete_task(task_ex_id, state, state_info): from mistral.engine import task_handler with db_api.transaction(): - task_handler.complete_task( - db_api.get_task_execution(task_ex_id), - state, - state_info - ) + task_ex = db_api.get_task_execution(task_ex_id) + + wf_ex_id = task_ex.workflow_execution_id + + task_handler.complete_task(task_ex, state, state_info) + + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) @db_utils.retry_on_db_error @@ -540,11 +543,15 @@ def _fail_task_if_incomplete(task_ex_id, timeout): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) + wf_ex_id = None + if not states.is_completed(task_ex.state): msg = 'Task timed out [timeout(s)=%s].' % timeout - task_handler.complete_task( - db_api.get_task_execution(task_ex_id), - states.ERROR, - msg - ) + wf_ex_id = task_ex.workflow_execution_id + + task_handler.complete_task(task_ex, states.ERROR, msg) + + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index d3f431c64..3b9e3da03 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -418,6 +418,10 @@ def _refresh_task_state(task_ex_id): (task_ex_id, task_ex.name, state) ) + if states.is_completed(task_ex.state): + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex.id) + def _schedule_refresh_task_state(task_ex, delay=0): """Schedules task preconditions check. @@ -461,6 +465,15 @@ def _scheduled_on_action_complete(action_ex_id, wf_action): _on_action_complete(action_ex) + wf_ex_id = None + + if states.is_completed(action_ex.task_execution.state): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + def schedule_on_action_complete(action_ex, delay=0): """Schedules task completion check. @@ -507,6 +520,15 @@ def _scheduled_on_action_update(action_ex_id, wf_action): _on_action_update(action_ex) + wf_ex_id = None + + if states.is_completed(action_ex.task_execution.state): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + def schedule_on_action_update(action_ex, delay=0): """Schedules task update check. diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index f1919f1c1..1176267fe 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -19,9 +19,7 @@ from oslo_utils import timeutils from osprofiler import profiler import traceback as tb -from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api -from mistral.engine import action_queue from mistral.engine import workflows from mistral import exceptions as exc from mistral.services import scheduler @@ -31,8 +29,8 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF -_CHECK_AND_COMPLETE_PATH = ( - 'mistral.engine.workflow_handler._check_and_complete' +_CHECK_AND_FIX_INTEGRITY_PATH = ( + 'mistral.engine.workflow_handler._check_and_fix_integrity' ) @@ -54,7 +52,7 @@ def start_workflow(wf_identifier, wf_namespace, wf_ex_id, wf_input, desc, params=params ) - _schedule_check_and_complete(wf.wf_ex) + _schedule_check_and_fix_integrity(wf.wf_ex, delay=10) return wf.wf_ex @@ -88,54 +86,30 @@ def cancel_workflow(wf_ex, msg=None): stop_workflow(wf_ex, states.CANCELLED, msg) -@db_utils.retry_on_db_error -@action_queue.process @profiler.trace('workflow-handler-check-and-complete', hide_args=True) -def _check_and_complete(wf_ex_id): - # Note: This method can only be called via scheduler. - with db_api.transaction(): - wf_ex = db_api.load_workflow_execution(wf_ex_id) +def check_and_complete(wf_ex_id): + wf_ex = db_api.load_workflow_execution(wf_ex_id) - if not wf_ex or states.is_completed(wf_ex.state): - return + if not wf_ex or states.is_completed(wf_ex.state): + return - wf = workflows.Workflow(wf_ex=wf_ex) + wf = workflows.Workflow(wf_ex=wf_ex) - try: - check_and_fix_integrity(wf_ex) + try: + wf.check_and_complete() + except exc.MistralException as e: + msg = ( + "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:" + " %s\n%s" % (wf_ex_id, wf_ex.name, e, tb.format_exc()) + ) - num_incomplete_tasks = wf.check_and_complete() + LOG.error(msg) - if not states.is_completed(wf_ex.state): - delay = ( - 2 + int(num_incomplete_tasks * 0.1) if num_incomplete_tasks - else 4 - ) - - # Rescheduling this check may not happen if errors are - # raised in the business logic. If the error is DB related - # and not considered fatal (e.g. disconnect, deadlock), the - # retry annotation around the method will ensure that the - # whole method is retried in a new transaction. On fatal - # errors, the check should not be rescheduled as it could - # result in undesired consequences. - # In case there are some errors that should not be - # considered fatal, those should be handled explicitly. - _schedule_check_and_complete(wf_ex, delay) - - except exc.MistralException as e: - msg = ( - "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:" - " %s\n%s" % (wf_ex_id, wf_ex.name, e, tb.format_exc()) - ) - - LOG.error(msg) - - force_fail_workflow(wf.wf_ex, msg) + force_fail_workflow(wf.wf_ex, msg) @profiler.trace('workflow-handler-check-and-fix-integrity') -def check_and_fix_integrity(wf_ex): +def _check_and_fix_integrity(wf_ex_id): check_after_seconds = CONF.engine.execution_integrity_check_delay if check_after_seconds < 0: @@ -145,55 +119,75 @@ def check_and_fix_integrity(wf_ex): # To break cyclic dependency. from mistral.engine import task_handler - running_task_execs = db_api.get_task_executions( - workflow_execution_id=wf_ex.id, - state=states.RUNNING - ) + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex_id) - for t_ex in running_task_execs: - # The idea is that we take the latest known timestamp of the task - # execution and consider it eligible for checking and fixing only - # if some minimum period of time elapsed since the last update. - timestamp = t_ex.updated_at or t_ex.created_at + if states.is_completed(wf_ex.state): + return - delta = timeutils.delta_seconds(timestamp, timeutils.utcnow()) + _schedule_check_and_fix_integrity(wf_ex, delay=60) - if delta < check_after_seconds: - continue - - child_executions = t_ex.executions - - if not child_executions: - continue - - all_finished = all( - [states.is_completed(c_ex.state) for c_ex in child_executions] + running_task_execs = db_api.get_task_executions( + workflow_execution_id=wf_ex.id, + state=states.RUNNING ) - if all_finished: - # Find the timestamp of the most recently finished child. - most_recent_child_timestamp = max( - [c_ex.updated_at or c_ex.created_at for c_ex in - child_executions] - ) - interval = timeutils.delta_seconds( - most_recent_child_timestamp, - timeutils.utcnow() + any_completed = False + + for t_ex in running_task_execs: + # The idea is that we take the latest known timestamp of the task + # execution and consider it eligible for checking and fixing only + # if some minimum period of time elapsed since the last update. + timestamp = t_ex.updated_at or t_ex.created_at + + delta = timeutils.delta_seconds(timestamp, timeutils.utcnow()) + + if delta < check_after_seconds: + continue + + child_executions = t_ex.executions + + if not child_executions: + continue + + all_finished = all( + [states.is_completed(c_ex.state) for c_ex in child_executions] ) - if interval > check_after_seconds: - # We found a task execution in RUNNING state for which all - # child executions are finished. We need to call - # "schedule_on_action_complete" on the task handler for any of - # the child executions so that the task state is calculated and - # updated properly. - LOG.warning( - "Found a task execution that is likely stuck in RUNNING" - " state because all child executions are finished," - " will try to recover [task_execution=%s]", t_ex.id + if all_finished: + # Find the timestamp of the most recently finished child. + most_recent_child_timestamp = max( + [c_ex.updated_at or c_ex.created_at for c_ex in + child_executions] + ) + interval = timeutils.delta_seconds( + most_recent_child_timestamp, + timeutils.utcnow() ) - task_handler.schedule_on_action_complete(child_executions[-1]) + if interval > check_after_seconds: + # We found a task execution in RUNNING state for which all + # child executions are finished. We need to call + # "schedule_on_action_complete" on the task handler for + # any of the child executions so that the task state is + # calculated and updated properly. + LOG.warning( + "Found a task execution that is likely stuck in" + " RUNNING state because all child executions are" + " finished, will try to recover [task_execution=%s]", + t_ex.id + ) + + task_handler.schedule_on_action_complete( + child_executions[-1] + ) + + if states.is_completed(t_ex.state): + any_completed = True + + if any_completed: + with db_api.transaction(): + check_and_complete(wf_ex_id) def pause_workflow(wf_ex, msg=None): @@ -222,6 +216,17 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): wf.rerun(task_ex, reset=reset, env=env) + _schedule_check_and_fix_integrity( + wf_ex, + delay=CONF.engine.execution_integrity_check_delay + ) + + if wf_ex.task_execution_id: + _schedule_check_and_fix_integrity( + wf_ex.task_execution.workflow_execution, + delay=CONF.engine.execution_integrity_check_delay + ) + def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): @@ -256,31 +261,22 @@ def set_workflow_state(wf_ex, state, msg=None): ) -def _get_completion_check_key(wf_ex): - return 'wfh_on_c_a_c-%s' % wf_ex.id +def _get_integrity_check_key(wf_ex): + return 'wfh_c_a_f_i-%s' % wf_ex.id @profiler.trace('workflow-handler-schedule-check-and-complete', hide_args=True) -def _schedule_check_and_complete(wf_ex, delay=0): - """Schedules workflow completion check. - - This method provides transactional decoupling of task completion from - workflow completion check. It's needed in non-locking model in order to - avoid 'phantom read' phenomena when reading state of multiple tasks - to see if a workflow is completed. Just starting a separate transaction - without using scheduler is not safe due to concurrency window that we'll - have in this case (time between transactions) whereas scheduler is a - special component that is designed to be resistant to failures. +def _schedule_check_and_fix_integrity(wf_ex, delay=0): + """Schedules workflow integrity check. :param wf_ex: Workflow execution. - :param delay: Minimum amount of time before task completion check - should be made. + :param delay: Minimum amount of time before the check should be made. """ - key = _get_completion_check_key(wf_ex) + key = _get_integrity_check_key(wf_ex) scheduler.schedule_call( None, - _CHECK_AND_COMPLETE_PATH, + _CHECK_AND_FIX_INTEGRITY_PATH, delay, key=key, wf_ex_id=wf_ex.id diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 0653f8733..6a64376ad 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -257,7 +257,12 @@ class Workflow(object): from mistral.engine import workflow_handler self.set_state(states.RUNNING) - workflow_handler._schedule_check_and_complete(self.wf_ex) + + # TODO(rakhmerov): We call a internal method of a module here. + # The simplest way is to make it public, however, I believe + # it's another "bad smell" that tells that some refactoring + # of the architecture should be made. + workflow_handler._schedule_check_and_fix_integrity(self.wf_ex) if self.wf_ex.task_execution_id: parent_task_ex = db_api.get_task_execution( diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 8bc514759..9004410fc 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -539,10 +539,7 @@ class DefaultEngineTest(base.DbTestCase): task_execs = wf_ex.task_executions - # Workflow completion check is done separate with scheduler - # but scheduler doesn't start in this test (in fact, it's just - # a DB test)so the workflow is expected to be in running state. - self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(states.SUCCESS, wf_ex.state) self.assertIsInstance(task2_action_ex, models.ActionExecution) self.assertEqual('std.echo', task2_action_ex.name) diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 364542fef..e6141b6d0 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -753,7 +753,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): calls = db_api.get_delayed_calls() - mtd_name = 'mistral.engine.workflow_handler._check_and_complete' + mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity' self._assert_single_item(calls, target_method_name=mtd_name) @@ -780,7 +780,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): calls = db_api.get_delayed_calls() - mtd_name = 'mistral.engine.workflow_handler._check_and_complete' + mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity' self._assert_single_item(calls, target_method_name=mtd_name) diff --git a/mistral/tests/unit/engine/test_integrity_check.py b/mistral/tests/unit/engine/test_integrity_check.py index 42eea5455..cc1c0fe8e 100644 --- a/mistral/tests/unit/engine/test_integrity_check.py +++ b/mistral/tests/unit/engine/test_integrity_check.py @@ -30,6 +30,8 @@ class IntegrityCheckTest(base.EngineTestCase): ) def test_task_execution_integrity(self): + self.override_config('execution_integrity_check_delay', 1, 'engine') + # The idea of the test is that we use the no-op asynchronous action # so that action and task execution state is not automatically set # to SUCCESS after we start the workflow. We'll update the action diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index 4b6cac7d2..314abf7cf 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -235,7 +235,6 @@ class JoinEngineTest(base.EngineTestCase): # uncertainty of its running in parallel with task3. self.await_task_success(task4.id) - self.assertEqual(states.RUNNING, wf_ex.state) self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task2.state)