Improve workflow completion logic by removing periodic jobs

* The workflow completion algorithm used periodically scheduled jobs to
  poll the DB and determine when a workflow was finished. The problem
  with this approach is timing: if Mistral runs the next iteration of
  such a job too soon, the jobs themselves create a big load on the
  system; if too late, a workflow may stay in RUNNING state long after
  all its tasks have completed. The current implementation tries to
  predict the delay before the next job run based on the number of
  incomplete tasks. This approach was originally taken because we
  switched to a non-blocking transactional model (previously we locked
  the entire workflow execution graph in order to change the state of
  anything), and in this architecture, when we have parallel branches,
  i.e. parallel DB transactions, neither of these transactions can make
  a consistent read from the DB to reliably decide whether the workflow
  is completed. Periodic jobs were a workaround. However, they have
  proven unreliable because the predicted delay before the next job
  iteration doesn't work well across the variety of use cases we have.
  This patch removes the periodic jobs in favor of a "two transactions"
  approach (see the sketch below): in the first transaction we handle
  the action completion event (and task completion, if it causes one),
  and in the second transaction, if a task has completed, we check
  whether the workflow is completed. This approach guarantees that at
  least one of the "second" transactions among the parallel branches
  will make the needed consistent read from the DB (i.e. will see the
  actual state of all needed objects) and thus make the right
  decision.
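
  The shape of the fix, as a minimal self-contained sketch (illustration
  only: the lock and dicts below merely stand in for DB transaction
  isolation, and names such as on_task_complete are hypothetical, not
  Mistral's actual API):

      import contextlib
      import threading

      # Toy "database": task states of one workflow. The lock stands in
      # for transaction isolation: each "transaction" sees only data
      # committed by earlier transactions.
      _db_lock = threading.Lock()
      _tasks = {'task1': 'RUNNING', 'task2': 'RUNNING'}
      _wf_state = ['RUNNING']

      @contextlib.contextmanager
      def transaction():
          # Stand-in for db_api.transaction().
          with _db_lock:
              yield

      def check_and_complete():
          # Runs in its own transaction, so it reads the committed state
          # of all tasks. Whichever parallel branch runs this last is
          # guaranteed to see every task completed.
          if all(s == 'SUCCESS' for s in _tasks.values()):
              _wf_state[0] = 'SUCCESS'

      def on_task_complete(task_name):
          # First transaction: record the task result.
          with transaction():
              _tasks[task_name] = 'SUCCESS'

          # Second transaction: fresh snapshot, reliable completion check.
          with transaction():
              check_and_complete()

      # Two parallel branches completing concurrently.
      branches = [threading.Thread(target=on_task_complete, args=(t,))
                  for t in ('task1', 'task2')]
      for b in branches:
          b.start()
      for b in branches:
          b.join()

      assert _wf_state[0] == 'SUCCESS'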

Closes-Bug: #1799382
Change-Id: I2333507503b3b8226c184beb0bd783e1dcfa397f
Renat Akhmerov 2018-10-04 11:50:03 +07:00
parent ec3d14112c
commit 3d7acd3957
9 changed files with 182 additions and 120 deletions

View File

@@ -64,6 +64,9 @@ class DefaultEngine(base.Engine):
                     params
                 )

+                # Checking a case when all tasks are completed immediately.
+                wf_handler.check_and_complete(wf_ex.id)
+
                 return wf_ex.get_clone()
         except exceptions.DBDuplicateEntryError:
@@ -143,7 +146,23 @@ class DefaultEngine(base.Engine):

            action_handler.on_action_complete(action_ex, result)

-           return action_ex.get_clone()
+           result = action_ex.get_clone()
+
+           # Need to see if checking workflow completion makes sense.
+           wf_ex_id = None
+
+           if (action_ex.task_execution_id
+                   and states.is_completed(action_ex.task_execution.state)):
+               wf_ex_id = action_ex.task_execution.workflow_execution_id
+
+       # Note: We must do this check in a new transaction to make sure
+       # that at least one of the parallel transactions will do a consistent
+       # read from the DB.
+       if wf_ex_id:
+           with db_api.transaction():
+               wf_handler.check_and_complete(wf_ex_id)
+
+       return result

    @db_utils.retry_on_db_error
    @action_queue.process
@@ -158,7 +177,22 @@ class DefaultEngine(base.Engine):

            action_handler.on_action_update(action_ex, state)

-           return action_ex.get_clone()
+           result = action_ex.get_clone()
+
+           wf_ex_id = None
+
+           if (action_ex.task_execution_id
+                   and states.is_completed(action_ex.task_execution.state)):
+               wf_ex_id = action_ex.task_execution.workflow_execution_id
+
+       # Note: We must do this check in a new transaction to make sure
+       # that at least one of the parallel transactions will do a consistent
+       # read from the DB.
+       if wf_ex_id:
+           with db_api.transaction():
+               wf_handler.check_and_complete(wf_ex_id)
+
+       return result

    @db_utils.retry_on_db_error
    @action_queue.process

View File

@@ -525,11 +525,14 @@ def _complete_task(task_ex_id, state, state_info):
    from mistral.engine import task_handler

    with db_api.transaction():
-       task_handler.complete_task(
-           db_api.get_task_execution(task_ex_id),
-           state,
-           state_info
-       )
+       task_ex = db_api.get_task_execution(task_ex_id)
+
+       wf_ex_id = task_ex.workflow_execution_id
+
+       task_handler.complete_task(task_ex, state, state_info)
+
+   with db_api.transaction():
+       wf_handler.check_and_complete(wf_ex_id)


@db_utils.retry_on_db_error
@@ -540,11 +543,15 @@ def _fail_task_if_incomplete(task_ex_id, timeout):
    with db_api.transaction():
        task_ex = db_api.get_task_execution(task_ex_id)

+       wf_ex_id = None
+
        if not states.is_completed(task_ex.state):
            msg = 'Task timed out [timeout(s)=%s].' % timeout

-           task_handler.complete_task(
-               db_api.get_task_execution(task_ex_id),
-               states.ERROR,
-               msg
-           )
+           wf_ex_id = task_ex.workflow_execution_id
+
+           task_handler.complete_task(task_ex, states.ERROR, msg)
+
+   if wf_ex_id:
+       with db_api.transaction():
+           wf_handler.check_and_complete(wf_ex_id)
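
A detail worth noting in the hunk above: wf_ex_id is copied into a plain
variable while the first transaction is still open, and the second
transaction runs only if a task was actually completed. A hedged sketch of
that shape (all names here are stubs, not Mistral's API; one likely reason
for copying the id is that ORM objects can be expired or detached once
their session closes, while a plain string stays usable):

    import contextlib

    @contextlib.contextmanager
    def transaction():
        # Stub for db_api.transaction().
        yield

    def fail_task_if_incomplete(task_ex, timeout):
        wf_ex_id = None

        with transaction():
            if task_ex['state'] not in ('SUCCESS', 'ERROR'):
                task_ex['state'] = 'ERROR'
                task_ex['state_info'] = (
                    'Task timed out [timeout(s)=%s].' % timeout
                )

                # Capture the plain value while the transaction is open.
                wf_ex_id = task_ex['workflow_execution_id']

        # Re-check the workflow only if we actually completed a task.
        if wf_ex_id:
            with transaction():
                print('checking completion of workflow %s' % wf_ex_id)

    fail_task_if_incomplete(
        {'state': 'RUNNING', 'workflow_execution_id': 'wf-123'},
        60
    )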

View File

@@ -418,6 +418,10 @@ def _refresh_task_state(task_ex_id):
                    (task_ex_id, task_ex.name, state)
                )

+   if states.is_completed(task_ex.state):
+       with db_api.transaction():
+           wf_handler.check_and_complete(wf_ex.id)
+

def _schedule_refresh_task_state(task_ex, delay=0):
    """Schedules task preconditions check.
@@ -461,6 +465,15 @@ def _scheduled_on_action_complete(action_ex_id, wf_action):

        _on_action_complete(action_ex)

+       wf_ex_id = None
+
+       if states.is_completed(action_ex.task_execution.state):
+           wf_ex_id = action_ex.task_execution.workflow_execution_id
+
+   if wf_ex_id:
+       with db_api.transaction():
+           wf_handler.check_and_complete(wf_ex_id)
+

def schedule_on_action_complete(action_ex, delay=0):
    """Schedules task completion check.
@@ -507,6 +520,15 @@ def _scheduled_on_action_update(action_ex_id, wf_action):

        _on_action_update(action_ex)

+       wf_ex_id = None
+
+       if states.is_completed(action_ex.task_execution.state):
+           wf_ex_id = action_ex.task_execution.workflow_execution_id
+
+   if wf_ex_id:
+       with db_api.transaction():
+           wf_handler.check_and_complete(wf_ex_id)
+

def schedule_on_action_update(action_ex, delay=0):
    """Schedules task update check.

View File

@@ -19,9 +19,7 @@ from oslo_utils import timeutils
from osprofiler import profiler
import traceback as tb

-from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
-from mistral.engine import action_queue
from mistral.engine import workflows
from mistral import exceptions as exc
from mistral.services import scheduler
@@ -31,8 +29,8 @@ LOG = logging.getLogger(__name__)

CONF = cfg.CONF

-_CHECK_AND_COMPLETE_PATH = (
-    'mistral.engine.workflow_handler._check_and_complete'
+_CHECK_AND_FIX_INTEGRITY_PATH = (
+    'mistral.engine.workflow_handler._check_and_fix_integrity'
 )
@@ -54,7 +52,7 @@ def start_workflow(wf_identifier, wf_namespace, wf_ex_id, wf_input, desc,
        params=params
    )

-   _schedule_check_and_complete(wf.wf_ex)
+   _schedule_check_and_fix_integrity(wf.wf_ex, delay=10)

    return wf.wf_ex
@@ -88,54 +86,30 @@ def cancel_workflow(wf_ex, msg=None):
    stop_workflow(wf_ex, states.CANCELLED, msg)


-@db_utils.retry_on_db_error
-@action_queue.process
-@profiler.trace('workflow-handler-check-and-complete', hide_args=True)
-def _check_and_complete(wf_ex_id):
-   # Note: This method can only be called via scheduler.
-   with db_api.transaction():
-       wf_ex = db_api.load_workflow_execution(wf_ex_id)
+def check_and_complete(wf_ex_id):
+   wf_ex = db_api.load_workflow_execution(wf_ex_id)

-       if not wf_ex or states.is_completed(wf_ex.state):
-           return
+   if not wf_ex or states.is_completed(wf_ex.state):
+       return

-       wf = workflows.Workflow(wf_ex=wf_ex)
+   wf = workflows.Workflow(wf_ex=wf_ex)

-       try:
-           check_and_fix_integrity(wf_ex)
-
-           num_incomplete_tasks = wf.check_and_complete()
-
-           if not states.is_completed(wf_ex.state):
-               delay = (
-                   2 + int(num_incomplete_tasks * 0.1) if num_incomplete_tasks
-                   else 4
-               )
-
-               # Rescheduling this check may not happen if errors are
-               # raised in the business logic. If the error is DB related
-               # and not considered fatal (e.g. disconnect, deadlock), the
-               # retry annotation around the method will ensure that the
-               # whole method is retried in a new transaction. On fatal
-               # errors, the check should not be rescheduled as it could
-               # result in undesired consequences.
-               # In case there are some errors that should not be
-               # considered fatal, those should be handled explicitly.
-               _schedule_check_and_complete(wf_ex, delay)
-       except exc.MistralException as e:
-           msg = (
-               "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:"
-               " %s\n%s" % (wf_ex_id, wf_ex.name, e, tb.format_exc())
-           )
-
-           LOG.error(msg)
-
-           force_fail_workflow(wf.wf_ex, msg)
+   try:
+       wf.check_and_complete()
+   except exc.MistralException as e:
+       msg = (
+           "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:"
+           " %s\n%s" % (wf_ex_id, wf_ex.name, e, tb.format_exc())
+       )
+
+       LOG.error(msg)
+
+       force_fail_workflow(wf.wf_ex, msg)


 @profiler.trace('workflow-handler-check-and-fix-integrity')
-def check_and_fix_integrity(wf_ex):
+def _check_and_fix_integrity(wf_ex_id):
    check_after_seconds = CONF.engine.execution_integrity_check_delay

    if check_after_seconds < 0:
@@ -145,55 +119,75 @@ def check_and_fix_integrity(wf_ex):
    # To break cyclic dependency.
    from mistral.engine import task_handler

-   running_task_execs = db_api.get_task_executions(
-       workflow_execution_id=wf_ex.id,
-       state=states.RUNNING
-   )
+   with db_api.transaction():
+       wf_ex = db_api.get_workflow_execution(wf_ex_id)

-   for t_ex in running_task_execs:
-       # The idea is that we take the latest known timestamp of the task
-       # execution and consider it eligible for checking and fixing only
-       # if some minimum period of time elapsed since the last update.
-       timestamp = t_ex.updated_at or t_ex.created_at
+       if states.is_completed(wf_ex.state):
+           return

-       delta = timeutils.delta_seconds(timestamp, timeutils.utcnow())
+       _schedule_check_and_fix_integrity(wf_ex, delay=60)

-       if delta < check_after_seconds:
-           continue
-
-       child_executions = t_ex.executions
-
-       if not child_executions:
-           continue
-
-       all_finished = all(
-           [states.is_completed(c_ex.state) for c_ex in child_executions]
-       )
+       running_task_execs = db_api.get_task_executions(
+           workflow_execution_id=wf_ex.id,
+           state=states.RUNNING
+       )

-       if all_finished:
-           # Find the timestamp of the most recently finished child.
-           most_recent_child_timestamp = max(
-               [c_ex.updated_at or c_ex.created_at for c_ex in
-                child_executions]
-           )
-           interval = timeutils.delta_seconds(
-               most_recent_child_timestamp,
-               timeutils.utcnow()
-           )
+       any_completed = False
+
+       for t_ex in running_task_execs:
+           # The idea is that we take the latest known timestamp of the task
+           # execution and consider it eligible for checking and fixing only
+           # if some minimum period of time elapsed since the last update.
+           timestamp = t_ex.updated_at or t_ex.created_at
+
+           delta = timeutils.delta_seconds(timestamp, timeutils.utcnow())
+
+           if delta < check_after_seconds:
+               continue
+
+           child_executions = t_ex.executions
+
+           if not child_executions:
+               continue
+
+           all_finished = all(
+               [states.is_completed(c_ex.state) for c_ex in child_executions]
+           )

-           if interval > check_after_seconds:
-               # We found a task execution in RUNNING state for which all
-               # child executions are finished. We need to call
-               # "schedule_on_action_complete" on the task handler for any of
-               # the child executions so that the task state is calculated and
-               # updated properly.
-               LOG.warning(
-                   "Found a task execution that is likely stuck in RUNNING"
-                   " state because all child executions are finished,"
-                   " will try to recover [task_execution=%s]", t_ex.id
-               )
-
-               task_handler.schedule_on_action_complete(child_executions[-1])
+           if all_finished:
+               # Find the timestamp of the most recently finished child.
+               most_recent_child_timestamp = max(
+                   [c_ex.updated_at or c_ex.created_at for c_ex in
+                    child_executions]
+               )
+               interval = timeutils.delta_seconds(
+                   most_recent_child_timestamp,
+                   timeutils.utcnow()
+               )
+
+               if interval > check_after_seconds:
+                   # We found a task execution in RUNNING state for which all
+                   # child executions are finished. We need to call
+                   # "schedule_on_action_complete" on the task handler for
+                   # any of the child executions so that the task state is
+                   # calculated and updated properly.
+                   LOG.warning(
+                       "Found a task execution that is likely stuck in"
+                       " RUNNING state because all child executions are"
+                       " finished, will try to recover [task_execution=%s]",
+                       t_ex.id
+                   )
+
+                   task_handler.schedule_on_action_complete(
+                       child_executions[-1]
+                   )
+
+           if states.is_completed(t_ex.state):
+               any_completed = True
+
+   if any_completed:
+       with db_api.transaction():
+           check_and_complete(wf_ex_id)


def pause_workflow(wf_ex, msg=None):
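
Note how _check_and_fix_integrity above re-arms itself with
_schedule_check_and_fix_integrity(wf_ex, delay=60) and simply returns once
the workflow execution is completed. The only periodic work left is this
low-frequency recovery loop; workflow completion itself is decided on the
two-transaction path. A minimal sketch of the self-rescheduling pattern
(threading.Timer stands in here for Mistral's DB-backed scheduler):

    import threading

    def check_and_fix_integrity(wf):
        # Stop silently once the workflow is done: the loop dies with it.
        if wf['state'] in ('SUCCESS', 'ERROR', 'CANCELLED'):
            return

        # ... find RUNNING tasks whose child executions all finished and
        # re-trigger their completion handling here ...

        # Re-arm at a fixed, low frequency.
        threading.Timer(60, check_and_fix_integrity, args=(wf,)).start()

    check_and_fix_integrity({'state': 'SUCCESS'})  # returns immediately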
@@ -222,6 +216,17 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):

    wf.rerun(task_ex, reset=reset, env=env)

+   _schedule_check_and_fix_integrity(
+       wf_ex,
+       delay=CONF.engine.execution_integrity_check_delay
+   )
+
+   if wf_ex.task_execution_id:
+       _schedule_check_and_fix_integrity(
+           wf_ex.task_execution.workflow_execution,
+           delay=CONF.engine.execution_integrity_check_delay
+       )
+

def resume_workflow(wf_ex, env=None):
    if not states.is_paused_or_idle(wf_ex.state):
@@ -256,31 +261,22 @@ def set_workflow_state(wf_ex, state, msg=None):
    )


-def _get_completion_check_key(wf_ex):
-   return 'wfh_on_c_a_c-%s' % wf_ex.id
+def _get_integrity_check_key(wf_ex):
+   return 'wfh_c_a_f_i-%s' % wf_ex.id


-@profiler.trace('workflow-handler-schedule-check-and-complete', hide_args=True)
-def _schedule_check_and_complete(wf_ex, delay=0):
-   """Schedules workflow completion check.
-
-   This method provides transactional decoupling of task completion from
-   workflow completion check. It's needed in non-locking model in order to
-   avoid 'phantom read' phenomena when reading state of multiple tasks
-   to see if a workflow is completed. Just starting a separate transaction
-   without using scheduler is not safe due to concurrency window that we'll
-   have in this case (time between transactions) whereas scheduler is a
-   special component that is designed to be resistant to failures.
+def _schedule_check_and_fix_integrity(wf_ex, delay=0):
+   """Schedules workflow integrity check.

    :param wf_ex: Workflow execution.
-   :param delay: Minimum amount of time before task completion check
-       should be made.
+   :param delay: Minimum amount of time before the check should be made.
    """
-   key = _get_completion_check_key(wf_ex)
+   key = _get_integrity_check_key(wf_ex)

    scheduler.schedule_call(
        None,
-       _CHECK_AND_COMPLETE_PATH,
+       _CHECK_AND_FIX_INTEGRITY_PATH,
        delay,
        key=key,
        wf_ex_id=wf_ex.id
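
The key passed to scheduler.schedule_call() above identifies the pending
call for a given workflow execution; my assumption (worth verifying against
mistral.services.scheduler) is that this lets the engine avoid queueing, or
later find and cancel, duplicate checks for the same workflow. A toy model
of key-based deduplication only, not Mistral's real scheduler, which
persists delayed calls in the DB:

    _pending = {}

    def schedule_call(target_path, delay, key=None, **kwargs):
        if key is not None and key in _pending:
            return  # an identical delayed call is already queued
        _pending[key or object()] = (target_path, delay, kwargs)

    schedule_call('workflow_handler._check_and_fix_integrity', 60,
                  key='wfh_c_a_f_i-abc123', wf_ex_id='abc123')
    schedule_call('workflow_handler._check_and_fix_integrity', 60,
                  key='wfh_c_a_f_i-abc123', wf_ex_id='abc123')

    assert len(_pending) == 1  # the second call was deduplicated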

View File

@@ -257,7 +257,12 @@ class Workflow(object):
        from mistral.engine import workflow_handler

        self.set_state(states.RUNNING)

-       workflow_handler._schedule_check_and_complete(self.wf_ex)
+       # TODO(rakhmerov): We call an internal method of a module here.
+       # The simplest way is to make it public. However, I believe
+       # it's another "bad smell" telling us that some refactoring
+       # of the architecture is needed.
+       workflow_handler._schedule_check_and_fix_integrity(self.wf_ex)

        if self.wf_ex.task_execution_id:
            parent_task_ex = db_api.get_task_execution(

View File

@@ -539,10 +539,7 @@ class DefaultEngineTest(base.DbTestCase):

        task_execs = wf_ex.task_executions

-       # The workflow completion check is done separately by the scheduler,
-       # but the scheduler doesn't start in this test (in fact, it's just
-       # a DB test), so the workflow is expected to be in RUNNING state.
-       self.assertEqual(states.RUNNING, wf_ex.state)
+       self.assertEqual(states.SUCCESS, wf_ex.state)

        self.assertIsInstance(task2_action_ex, models.ActionExecution)
        self.assertEqual('std.echo', task2_action_ex.name)

View File

@@ -753,7 +753,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):

        calls = db_api.get_delayed_calls()

-       mtd_name = 'mistral.engine.workflow_handler._check_and_complete'
+       mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity'

        self._assert_single_item(calls, target_method_name=mtd_name)
@@ -780,7 +780,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):

        calls = db_api.get_delayed_calls()

-       mtd_name = 'mistral.engine.workflow_handler._check_and_complete'
+       mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity'

        self._assert_single_item(calls, target_method_name=mtd_name)

View File

@@ -30,6 +30,8 @@ class IntegrityCheckTest(base.EngineTestCase):
    )

    def test_task_execution_integrity(self):
+       self.override_config('execution_integrity_check_delay', 1, 'engine')
+
        # The idea of the test is that we use the no-op asynchronous action
        # so that action and task execution state is not automatically set
        # to SUCCESS after we start the workflow. We'll update the action

View File

@@ -235,7 +235,6 @@ class JoinEngineTest(base.EngineTestCase):
        # uncertainty of its running in parallel with task3.
        self.await_task_success(task4.id)

-       self.assertEqual(states.RUNNING, wf_ex.state)
        self.assertEqual(states.SUCCESS, task1.state)
        self.assertEqual(states.SUCCESS, task2.state)