From 3d7acd3957a75457da4ca87ae9ebd5cc61d28149 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Thu, 4 Oct 2018 11:50:03 +0700 Subject: [PATCH] Improve workflow completion logic by removing periodic jobs * Workflow completion algorithm use periodic scheduled jobs to poll DB and determine when a workflow is finished. The problem with this approach is that if Mistral runs another iteration of such job too soon then running such jobs will create a big load on the system. If too late, then a workflow may be in RUNNING state for too long after all its tasks are completed. The current implementation tries to predict a delay with which the next job should run, based on a number of incompleted tasks. This approach was initially taken because we switched to a non-blocking transactional model (previously we locked the entire workflow execution graph in order to change a state of anything) and in this architecture, when we have parallel branches, i.e. parallel DB transactions, we can't make a consistent read from DB from neither of these transactions to make a reliable decision about whether the workflow is completed or not. Using periodic jobs was a solution. However, this approach has been proven to work unreliably because such a prediction about delay before the next job iteration doesn't work well on all variety of use cases that we have. This patch removes using periodic jobs in favor of using the "two transactions" approach when in the first transaction we handle action completion event (and task completion if it causes it) and in the second transaction, if a task is completed, we check if the workflow is completed. This approach guarantees that at least one of the "second" transactions in parallel branches will make needed consistent read from DB (i.e. will see the actuall state of all needed objects) to make the right decision. Closes-Bug: #1799382 Change-Id: I2333507503b3b8226c184beb0bd783e1dcfa397f --- mistral/engine/default_engine.py | 38 +++- mistral/engine/policies.py | 27 ++- mistral/engine/task_handler.py | 22 ++ mistral/engine/workflow_handler.py | 196 +++++++++--------- mistral/engine/workflows.py | 7 +- .../tests/unit/engine/test_default_engine.py | 5 +- .../tests/unit/engine/test_direct_workflow.py | 4 +- .../tests/unit/engine/test_integrity_check.py | 2 + mistral/tests/unit/engine/test_join.py | 1 - 9 files changed, 182 insertions(+), 120 deletions(-) diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index fc074cc83..1bb29e109 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -64,6 +64,9 @@ class DefaultEngine(base.Engine): params ) + # Checking a case when all tasks are completed immediately. + wf_handler.check_and_complete(wf_ex.id) + return wf_ex.get_clone() except exceptions.DBDuplicateEntryError: @@ -143,7 +146,23 @@ class DefaultEngine(base.Engine): action_handler.on_action_complete(action_ex, result) - return action_ex.get_clone() + result = action_ex.get_clone() + + # Need to see if checking workflow completion makes sense. + wf_ex_id = None + + if (action_ex.task_execution_id + and states.is_completed(action_ex.task_execution.state)): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + # Note: We must do this check in a new transaction to make sure + # that at least one of the parallel transactions will do a consistent + # read from the DB. + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + + return result @db_utils.retry_on_db_error @action_queue.process @@ -158,7 +177,22 @@ class DefaultEngine(base.Engine): action_handler.on_action_update(action_ex, state) - return action_ex.get_clone() + result = action_ex.get_clone() + + wf_ex_id = None + + if (action_ex.task_execution_id + and states.is_completed(action_ex.task_execution.state)): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + # Note: We must do this check in a new transaction to make sure + # that at least one of the parallel transactions will do a consistent + # read from the DB. + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + + return result @db_utils.retry_on_db_error @action_queue.process diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 844714bf0..a0dd790d1 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -525,11 +525,14 @@ def _complete_task(task_ex_id, state, state_info): from mistral.engine import task_handler with db_api.transaction(): - task_handler.complete_task( - db_api.get_task_execution(task_ex_id), - state, - state_info - ) + task_ex = db_api.get_task_execution(task_ex_id) + + wf_ex_id = task_ex.workflow_execution_id + + task_handler.complete_task(task_ex, state, state_info) + + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) @db_utils.retry_on_db_error @@ -540,11 +543,15 @@ def _fail_task_if_incomplete(task_ex_id, timeout): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) + wf_ex_id = None + if not states.is_completed(task_ex.state): msg = 'Task timed out [timeout(s)=%s].' % timeout - task_handler.complete_task( - db_api.get_task_execution(task_ex_id), - states.ERROR, - msg - ) + wf_ex_id = task_ex.workflow_execution_id + + task_handler.complete_task(task_ex, states.ERROR, msg) + + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index d3f431c64..3b9e3da03 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -418,6 +418,10 @@ def _refresh_task_state(task_ex_id): (task_ex_id, task_ex.name, state) ) + if states.is_completed(task_ex.state): + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex.id) + def _schedule_refresh_task_state(task_ex, delay=0): """Schedules task preconditions check. @@ -461,6 +465,15 @@ def _scheduled_on_action_complete(action_ex_id, wf_action): _on_action_complete(action_ex) + wf_ex_id = None + + if states.is_completed(action_ex.task_execution.state): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + def schedule_on_action_complete(action_ex, delay=0): """Schedules task completion check. @@ -507,6 +520,15 @@ def _scheduled_on_action_update(action_ex_id, wf_action): _on_action_update(action_ex) + wf_ex_id = None + + if states.is_completed(action_ex.task_execution.state): + wf_ex_id = action_ex.task_execution.workflow_execution_id + + if wf_ex_id: + with db_api.transaction(): + wf_handler.check_and_complete(wf_ex_id) + def schedule_on_action_update(action_ex, delay=0): """Schedules task update check. diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index f1919f1c1..1176267fe 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -19,9 +19,7 @@ from oslo_utils import timeutils from osprofiler import profiler import traceback as tb -from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api -from mistral.engine import action_queue from mistral.engine import workflows from mistral import exceptions as exc from mistral.services import scheduler @@ -31,8 +29,8 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF -_CHECK_AND_COMPLETE_PATH = ( - 'mistral.engine.workflow_handler._check_and_complete' +_CHECK_AND_FIX_INTEGRITY_PATH = ( + 'mistral.engine.workflow_handler._check_and_fix_integrity' ) @@ -54,7 +52,7 @@ def start_workflow(wf_identifier, wf_namespace, wf_ex_id, wf_input, desc, params=params ) - _schedule_check_and_complete(wf.wf_ex) + _schedule_check_and_fix_integrity(wf.wf_ex, delay=10) return wf.wf_ex @@ -88,54 +86,30 @@ def cancel_workflow(wf_ex, msg=None): stop_workflow(wf_ex, states.CANCELLED, msg) -@db_utils.retry_on_db_error -@action_queue.process @profiler.trace('workflow-handler-check-and-complete', hide_args=True) -def _check_and_complete(wf_ex_id): - # Note: This method can only be called via scheduler. - with db_api.transaction(): - wf_ex = db_api.load_workflow_execution(wf_ex_id) +def check_and_complete(wf_ex_id): + wf_ex = db_api.load_workflow_execution(wf_ex_id) - if not wf_ex or states.is_completed(wf_ex.state): - return + if not wf_ex or states.is_completed(wf_ex.state): + return - wf = workflows.Workflow(wf_ex=wf_ex) + wf = workflows.Workflow(wf_ex=wf_ex) - try: - check_and_fix_integrity(wf_ex) + try: + wf.check_and_complete() + except exc.MistralException as e: + msg = ( + "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:" + " %s\n%s" % (wf_ex_id, wf_ex.name, e, tb.format_exc()) + ) - num_incomplete_tasks = wf.check_and_complete() + LOG.error(msg) - if not states.is_completed(wf_ex.state): - delay = ( - 2 + int(num_incomplete_tasks * 0.1) if num_incomplete_tasks - else 4 - ) - - # Rescheduling this check may not happen if errors are - # raised in the business logic. If the error is DB related - # and not considered fatal (e.g. disconnect, deadlock), the - # retry annotation around the method will ensure that the - # whole method is retried in a new transaction. On fatal - # errors, the check should not be rescheduled as it could - # result in undesired consequences. - # In case there are some errors that should not be - # considered fatal, those should be handled explicitly. - _schedule_check_and_complete(wf_ex, delay) - - except exc.MistralException as e: - msg = ( - "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:" - " %s\n%s" % (wf_ex_id, wf_ex.name, e, tb.format_exc()) - ) - - LOG.error(msg) - - force_fail_workflow(wf.wf_ex, msg) + force_fail_workflow(wf.wf_ex, msg) @profiler.trace('workflow-handler-check-and-fix-integrity') -def check_and_fix_integrity(wf_ex): +def _check_and_fix_integrity(wf_ex_id): check_after_seconds = CONF.engine.execution_integrity_check_delay if check_after_seconds < 0: @@ -145,55 +119,75 @@ def check_and_fix_integrity(wf_ex): # To break cyclic dependency. from mistral.engine import task_handler - running_task_execs = db_api.get_task_executions( - workflow_execution_id=wf_ex.id, - state=states.RUNNING - ) + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex_id) - for t_ex in running_task_execs: - # The idea is that we take the latest known timestamp of the task - # execution and consider it eligible for checking and fixing only - # if some minimum period of time elapsed since the last update. - timestamp = t_ex.updated_at or t_ex.created_at + if states.is_completed(wf_ex.state): + return - delta = timeutils.delta_seconds(timestamp, timeutils.utcnow()) + _schedule_check_and_fix_integrity(wf_ex, delay=60) - if delta < check_after_seconds: - continue - - child_executions = t_ex.executions - - if not child_executions: - continue - - all_finished = all( - [states.is_completed(c_ex.state) for c_ex in child_executions] + running_task_execs = db_api.get_task_executions( + workflow_execution_id=wf_ex.id, + state=states.RUNNING ) - if all_finished: - # Find the timestamp of the most recently finished child. - most_recent_child_timestamp = max( - [c_ex.updated_at or c_ex.created_at for c_ex in - child_executions] - ) - interval = timeutils.delta_seconds( - most_recent_child_timestamp, - timeutils.utcnow() + any_completed = False + + for t_ex in running_task_execs: + # The idea is that we take the latest known timestamp of the task + # execution and consider it eligible for checking and fixing only + # if some minimum period of time elapsed since the last update. + timestamp = t_ex.updated_at or t_ex.created_at + + delta = timeutils.delta_seconds(timestamp, timeutils.utcnow()) + + if delta < check_after_seconds: + continue + + child_executions = t_ex.executions + + if not child_executions: + continue + + all_finished = all( + [states.is_completed(c_ex.state) for c_ex in child_executions] ) - if interval > check_after_seconds: - # We found a task execution in RUNNING state for which all - # child executions are finished. We need to call - # "schedule_on_action_complete" on the task handler for any of - # the child executions so that the task state is calculated and - # updated properly. - LOG.warning( - "Found a task execution that is likely stuck in RUNNING" - " state because all child executions are finished," - " will try to recover [task_execution=%s]", t_ex.id + if all_finished: + # Find the timestamp of the most recently finished child. + most_recent_child_timestamp = max( + [c_ex.updated_at or c_ex.created_at for c_ex in + child_executions] + ) + interval = timeutils.delta_seconds( + most_recent_child_timestamp, + timeutils.utcnow() ) - task_handler.schedule_on_action_complete(child_executions[-1]) + if interval > check_after_seconds: + # We found a task execution in RUNNING state for which all + # child executions are finished. We need to call + # "schedule_on_action_complete" on the task handler for + # any of the child executions so that the task state is + # calculated and updated properly. + LOG.warning( + "Found a task execution that is likely stuck in" + " RUNNING state because all child executions are" + " finished, will try to recover [task_execution=%s]", + t_ex.id + ) + + task_handler.schedule_on_action_complete( + child_executions[-1] + ) + + if states.is_completed(t_ex.state): + any_completed = True + + if any_completed: + with db_api.transaction(): + check_and_complete(wf_ex_id) def pause_workflow(wf_ex, msg=None): @@ -222,6 +216,17 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): wf.rerun(task_ex, reset=reset, env=env) + _schedule_check_and_fix_integrity( + wf_ex, + delay=CONF.engine.execution_integrity_check_delay + ) + + if wf_ex.task_execution_id: + _schedule_check_and_fix_integrity( + wf_ex.task_execution.workflow_execution, + delay=CONF.engine.execution_integrity_check_delay + ) + def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): @@ -256,31 +261,22 @@ def set_workflow_state(wf_ex, state, msg=None): ) -def _get_completion_check_key(wf_ex): - return 'wfh_on_c_a_c-%s' % wf_ex.id +def _get_integrity_check_key(wf_ex): + return 'wfh_c_a_f_i-%s' % wf_ex.id @profiler.trace('workflow-handler-schedule-check-and-complete', hide_args=True) -def _schedule_check_and_complete(wf_ex, delay=0): - """Schedules workflow completion check. - - This method provides transactional decoupling of task completion from - workflow completion check. It's needed in non-locking model in order to - avoid 'phantom read' phenomena when reading state of multiple tasks - to see if a workflow is completed. Just starting a separate transaction - without using scheduler is not safe due to concurrency window that we'll - have in this case (time between transactions) whereas scheduler is a - special component that is designed to be resistant to failures. +def _schedule_check_and_fix_integrity(wf_ex, delay=0): + """Schedules workflow integrity check. :param wf_ex: Workflow execution. - :param delay: Minimum amount of time before task completion check - should be made. + :param delay: Minimum amount of time before the check should be made. """ - key = _get_completion_check_key(wf_ex) + key = _get_integrity_check_key(wf_ex) scheduler.schedule_call( None, - _CHECK_AND_COMPLETE_PATH, + _CHECK_AND_FIX_INTEGRITY_PATH, delay, key=key, wf_ex_id=wf_ex.id diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 0653f8733..6a64376ad 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -257,7 +257,12 @@ class Workflow(object): from mistral.engine import workflow_handler self.set_state(states.RUNNING) - workflow_handler._schedule_check_and_complete(self.wf_ex) + + # TODO(rakhmerov): We call a internal method of a module here. + # The simplest way is to make it public, however, I believe + # it's another "bad smell" that tells that some refactoring + # of the architecture should be made. + workflow_handler._schedule_check_and_fix_integrity(self.wf_ex) if self.wf_ex.task_execution_id: parent_task_ex = db_api.get_task_execution( diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 8bc514759..9004410fc 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -539,10 +539,7 @@ class DefaultEngineTest(base.DbTestCase): task_execs = wf_ex.task_executions - # Workflow completion check is done separate with scheduler - # but scheduler doesn't start in this test (in fact, it's just - # a DB test)so the workflow is expected to be in running state. - self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(states.SUCCESS, wf_ex.state) self.assertIsInstance(task2_action_ex, models.ActionExecution) self.assertEqual('std.echo', task2_action_ex.name) diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 364542fef..e6141b6d0 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -753,7 +753,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): calls = db_api.get_delayed_calls() - mtd_name = 'mistral.engine.workflow_handler._check_and_complete' + mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity' self._assert_single_item(calls, target_method_name=mtd_name) @@ -780,7 +780,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): calls = db_api.get_delayed_calls() - mtd_name = 'mistral.engine.workflow_handler._check_and_complete' + mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity' self._assert_single_item(calls, target_method_name=mtd_name) diff --git a/mistral/tests/unit/engine/test_integrity_check.py b/mistral/tests/unit/engine/test_integrity_check.py index 42eea5455..cc1c0fe8e 100644 --- a/mistral/tests/unit/engine/test_integrity_check.py +++ b/mistral/tests/unit/engine/test_integrity_check.py @@ -30,6 +30,8 @@ class IntegrityCheckTest(base.EngineTestCase): ) def test_task_execution_integrity(self): + self.override_config('execution_integrity_check_delay', 1, 'engine') + # The idea of the test is that we use the no-op asynchronous action # so that action and task execution state is not automatically set # to SUCCESS after we start the workflow. We'll update the action diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index 4b6cac7d2..314abf7cf 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -235,7 +235,6 @@ class JoinEngineTest(base.EngineTestCase): # uncertainty of its running in parallel with task3. self.await_task_success(task4.id) - self.assertEqual(states.RUNNING, wf_ex.state) self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task2.state)