Simplify workflow and join completion logic

* action_queue module is replaced with the more generic
  post_tx_queue module that allows registering operations that must
  run after the main DB transaction associated with processing a
  workflow event such as completing an action.
* Instead of calling the workflow completion check from all places
  where a task may possibly complete, Mistral now registers a post
  transactional operation that runs after the main DB transaction
  (to make sure that at least one of the parallel transactions does
  a consistent DB read) right inside the task completion logic. It
  reduces clutter significantly.
* Workflow completion check is now registered only if the just
  completed task may lead to workflow completion, i.e. if it's the
  last one in a workflow branch.
* Join now checks delayed calls to reduce the number of join
  completion checks created with the scheduler and also uses the
  post transactional queue for that.

Closes-Bug: #1801872
Change-Id: I90741d4121c48c42606dfa850cfe824557b095d0
This commit is contained in:
Renat Akhmerov 2018-10-26 17:13:52 +07:00
parent 3d7acd3957
commit 80a1bed67b
16 changed files with 338 additions and 286 deletions

View File

@ -382,6 +382,10 @@ def get_delayed_calls(**kwargs):
return IMPL.get_delayed_calls(**kwargs)
def get_delayed_calls_count(**kwargs):
    """Forward the keyword arguments to IMPL.get_delayed_calls_count().

    :param kwargs: Passed through unchanged.
    :return: Whatever IMPL.get_delayed_calls_count() returns.
    """
    count = IMPL.get_delayed_calls_count(**kwargs)

    return count
def delete_delayed_calls(**kwargs):
    """Forward the keyword arguments to IMPL.delete_delayed_calls().

    :param kwargs: Passed through unchanged.
    :return: Whatever IMPL.delete_delayed_calls() returns.
    """
    outcome = IMPL.delete_delayed_calls(**kwargs)

    return outcome

View File

@ -284,6 +284,14 @@ def _get_collection(model, insecure=False, limit=None, marker=None,
return query.all()
def _get_count(model, insecure=False, **filters):
    """Count the rows of the given model that match the filters.

    :param model: Model class to query.
    :param insecure: If True the query is built with b.model_query(),
        otherwise with _secure_query().
    :param filters: Keyword filters applied with db_filters.apply_filters().
    :return: Result of calling count() on the filtered query.
    """
    if insecure:
        base_query = b.model_query(model)
    else:
        base_query = _secure_query(model)

    filtered_query = db_filters.apply_filters(base_query, model, **filters)

    return filtered_query.count()
def _get_db_object_by_name(model, name, columns=()):
query = _secure_query(model, *columns)
@ -1134,6 +1142,11 @@ def get_delayed_calls(session=None, **kwargs):
return _get_collection(model=models.DelayedCall, **kwargs)
@b.session_aware()
def get_delayed_calls_count(session=None, **kwargs):
    """Count DelayedCall rows matching the given filters.

    :param session: Not referenced in the body.
        NOTE(review): presumably consumed by the session_aware
        decorator - confirm.
    :param kwargs: Forwarded to _get_count().
    :return: Value returned by _get_count().
    """
    count = _get_count(model=models.DelayedCall, **kwargs)

    return count
@b.session_aware()
def delete_delayed_calls(session=None, **kwargs):
    """Delete DelayedCall rows matching the given filters.

    :param session: Not referenced in the body.
        NOTE(review): presumably consumed by the session_aware
        decorator - confirm.
    :param kwargs: Forwarded to _delete_all().
    :return: Value returned by _delete_all().
    """
    outcome = _delete_all(models.DelayedCall, **kwargs)

    return outcome

View File

@ -1,133 +0,0 @@
# Copyright 2016 - Nokia Networks.
# Copyright 2016 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import functools
from oslo_config import cfg
from mistral import context
from mistral.executors import base as exe
from mistral.rpc import clients as rpc
from mistral import utils
_THREAD_LOCAL_NAME = "__action_queue_thread_local"
# Action queue operations.
_RUN_ACTION = "run_action"
_ON_ACTION_COMPLETE = "on_action_complete"
def _prepare():
    """Store a fresh empty list under the action queue thread-local name."""
    empty_queue = []

    utils.set_thread_local(_THREAD_LOCAL_NAME, empty_queue)
def _clear():
    """Reset the action queue thread-local value to None."""
    no_queue = None

    utils.set_thread_local(_THREAD_LOCAL_NAME, no_queue)
def _get_queue():
    """Return the action queue stored in the thread-local storage.

    :return: The object previously stored under the action queue
        thread-local name.
    :raises RuntimeError: If the stored value is None, i.e. the queue
        was never prepared or has already been cleared.
    """
    current = utils.get_thread_local(_THREAD_LOCAL_NAME)

    if current is not None:
        return current

    raise RuntimeError(
        'Action queue is not initialized for the current thread.'
        ' Most likely some transactional method is not decorated'
        ' with action_queue.process()'
    )
def _process_queue(queue):
    """Run every operation found in the given queue, in order.

    :param queue: Iterable of (operation, args) pairs where operation is
        one of the action queue operation constants and args is the tuple
        of arguments registered for it.
    """
    # The executor is resolved once, before iterating over the queue.
    executor = exe.get_executor(cfg.CONF.executor.type)

    for operation, args in queue:
        if operation == _ON_ACTION_COMPLETE:
            action_ex_id, result, wf_action = args

            engine_client = rpc.get_engine_client()

            engine_client.on_action_complete(action_ex_id, result, wf_action)

            continue

        if operation != _RUN_ACTION:
            # Entries with any other operation name are ignored.
            continue

        action_ex, action_def, target, execution_context, timeout = args

        executor.run_action(
            action_ex.id,
            action_def.action_class,
            action_def.attributes or {},
            action_ex.input,
            action_ex.runtime_context.get('safe_rerun', False),
            execution_context,
            target=target,
            timeout=timeout
        )
def process(func):
    """Decorator that runs all actions collected in the action queue.

    The wrapper prepares an empty queue, calls the decorated function,
    then hands the collected queue over to a function started with
    eventlet.spawn(). The thread-local queue is cleared in any case,
    whether the decorated function returns or raises.

    :param func: Function to decorate.
    :return: Wrapped function returning the result of func.
    """
    @functools.wraps(func)
    def wrapper(*args, **kw):
        _prepare()

        try:
            result = func(*args, **kw)

            pending = _get_queue()

            caller_ctx = context.ctx() if context.has_ctx() else None

            # NOTE(rakhmerov): Since we make RPC calls to the engine itself
            # we need to process the action queue asynchronously in a new
            # thread. Otherwise, if we have one engine process the engine
            # may send a request to itself while already processing
            # another one. In conjunction with blocking RPC it will lead
            # to a deadlock (and RPC timeout).
            def _drain_queue():
                if context.has_ctx():
                    saved_ctx = context.ctx()
                else:
                    saved_ctx = None

                # Run the queue under the context captured by the wrapper
                # and restore whatever context was set here afterwards.
                context.set_ctx(caller_ctx)

                try:
                    _process_queue(pending)
                finally:
                    context.set_ctx(saved_ctx)

            eventlet.spawn(_drain_queue)
        finally:
            _clear()

        return result

    return wrapper
def schedule_run_action(action_ex, action_def, target, execution_context,
                        timeout):
    """Append a "run action" operation to the current action queue.

    :param action_ex: Action execution object.
    :param action_def: Action definition object.
    :param target: Target value later passed to the executor.
    :param execution_context: Execution context later passed to the
        executor.
    :param timeout: Timeout value later passed to the executor.
    """
    queue = _get_queue()

    operation = (
        _RUN_ACTION,
        (action_ex, action_def, target, execution_context, timeout)
    )

    queue.append(operation)
def schedule_on_action_complete(action_ex_id, result, wf_action=False):
    """Append an "on action complete" operation to the current action queue.

    :param action_ex_id: Action execution ID.
    :param result: Action result.
    :param wf_action: Flag stored with the operation and later passed to
        the engine client's on_action_complete().
    """
    queue = _get_queue()

    operation = (_ON_ACTION_COMPLETE, (action_ex_id, result, wf_action))

    queue.append(operation)

View File

@ -21,7 +21,7 @@ from osprofiler import profiler
import six
from mistral.db.v2 import api as db_api
from mistral.engine import action_queue
from mistral.engine import post_tx_queue
from mistral.engine import utils as engine_utils
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
@ -254,13 +254,23 @@ class PythonAction(Action):
execution_context = self._prepare_execution_context()
action_queue.schedule_run_action(
self.action_ex,
self.action_def,
target,
execution_context,
timeout=timeout
)
# Register an asynchronous command to send the action to
# run on an executor outside of the main DB transaction.
def _run_action():
executor = exe.get_executor(cfg.CONF.executor.type)
executor.run_action(
self.action_ex.id,
self.action_def.action_class,
self.action_def.attributes or {},
self.action_ex.input,
self.action_ex.runtime_context.get('safe_rerun', False),
execution_context,
target=target,
timeout=timeout
)
post_tx_queue.register_operation(_run_action)
@profiler.trace('action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,

View File

@ -23,8 +23,8 @@ from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler
from mistral.engine import action_queue
from mistral.engine import base
from mistral.engine import post_tx_queue
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions
from mistral import utils as u
@ -40,7 +40,7 @@ LOG = logging.getLogger(__name__)
class DefaultEngine(base.Engine):
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
@profiler.trace('engine-start-workflow', hide_args=True)
def start_workflow(self, wf_identifier, wf_namespace='', wf_ex_id=None,
wf_input=None, description='', **params):
@ -79,7 +79,7 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone()
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def start_action(self, action_name, action_input,
description=None, **params):
with db_api.transaction():
@ -134,7 +134,7 @@ class DefaultEngine(base.Engine):
return db_api.create_action_execution(values)
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
@profiler.trace('engine-on-action-complete', hide_args=True)
def on_action_complete(self, action_ex_id, result, wf_action=False,
async_=False):
@ -146,26 +146,10 @@ class DefaultEngine(base.Engine):
action_handler.on_action_complete(action_ex, result)
result = action_ex.get_clone()
# Need to see if checking workflow completion makes sense.
wf_ex_id = None
if (action_ex.task_execution_id
and states.is_completed(action_ex.task_execution.state)):
wf_ex_id = action_ex.task_execution.workflow_execution_id
# Note: We must do this check in a new transaction to make sure
# that at least one of the parallel transactions will do a consistent
# read from the DB.
if wf_ex_id:
with db_api.transaction():
wf_handler.check_and_complete(wf_ex_id)
return result
return action_ex.get_clone()
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
@profiler.trace('engine-on-action-update', hide_args=True)
def on_action_update(self, action_ex_id, state, wf_action=False,
async_=False):
@ -177,25 +161,10 @@ class DefaultEngine(base.Engine):
action_handler.on_action_update(action_ex, state)
result = action_ex.get_clone()
wf_ex_id = None
if (action_ex.task_execution_id
and states.is_completed(action_ex.task_execution.state)):
wf_ex_id = action_ex.task_execution.workflow_execution_id
# Note: We must do this check in a new transaction to make sure
# that at least one of the parallel transactions will do a consistent
# read from the DB.
if wf_ex_id:
with db_api.transaction():
wf_handler.check_and_complete(wf_ex_id)
return result
return action_ex.get_clone()
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def pause_workflow(self, wf_ex_id):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -205,7 +174,7 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone()
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def rerun_workflow(self, task_ex_id, reset=True, env=None):
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
@ -217,7 +186,7 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone()
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def resume_workflow(self, wf_ex_id, env=None):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -227,7 +196,7 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone()
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -241,10 +210,11 @@ class DefaultEngine(base.Engine):
raise NotImplementedError
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def report_running_actions(self, action_ex_ids):
with db_api.transaction():
now = u.utc_now_sec()
for exec_id in action_ex_ids:
try:
db_api.update_action_execution(

View File

@ -15,8 +15,8 @@
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.engine import action_queue
from mistral.engine import base
from mistral.engine import post_tx_queue
from mistral.engine import workflow_handler as wf_handler
from mistral import expressions
from mistral.services import scheduler
@ -511,7 +511,7 @@ class ConcurrencyPolicy(base.TaskPolicy):
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def _continue_task(task_ex_id):
from mistral.engine import task_handler
@ -520,38 +520,25 @@ def _continue_task(task_ex_id):
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def _complete_task(task_ex_id, state, state_info):
from mistral.engine import task_handler
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
wf_ex_id = task_ex.workflow_execution_id
task_handler.complete_task(task_ex, state, state_info)
with db_api.transaction():
wf_handler.check_and_complete(wf_ex_id)
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def _fail_task_if_incomplete(task_ex_id, timeout):
from mistral.engine import task_handler
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
wf_ex_id = None
if not states.is_completed(task_ex.state):
msg = 'Task timed out [timeout(s)=%s].' % timeout
wf_ex_id = task_ex.workflow_execution_id
task_handler.complete_task(task_ex, states.ERROR, msg)
if wf_ex_id:
with db_api.transaction():
wf_handler.check_and_complete(wf_ex_id)

View File

@ -0,0 +1,131 @@
# Copyright 2015 - Mirantis, Inc.
# Copyright 2016 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import functools
from oslo_log import log as logging
from mistral import context
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral import utils
"""
This module contains a mini framework for scheduling operations while
performing transactional processing of a workflow event such as
completing a workflow action. The scheduled operations will run after
the main DB transaction, in a new transaction, if needed.
"""
LOG = logging.getLogger(__name__)
_THREAD_LOCAL_NAME = "__operation_queue_thread_local"
def _prepare():
    """Store a pair of empty queues in the thread-local storage.

    The first list holds transactional operations and the second one
    holds non-transactional operations.
    """
    tx_operations = []
    non_tx_operations = []

    utils.set_thread_local(
        _THREAD_LOCAL_NAME,
        (tx_operations, non_tx_operations)
    )
def _clear():
    """Reset the operation queues thread-local value to None."""
    no_queues = None

    utils.set_thread_local(_THREAD_LOCAL_NAME, no_queues)
def register_operation(func, args=None, in_tx=False):
    """Register an operation in one of the two operation queues.

    :param func: Callable to register.
    :param args: Positional arguments stored with the callable. An empty
        list is stored if the value is falsy.
    :param in_tx: If True the operation goes to the transactional queue,
        otherwise to the non-transactional one.
    """
    queues = _get_queues()

    # Index 0 is the transactional queue, index 1 the non-transactional.
    queue_index = 0 if in_tx else 1

    queues[queue_index].append((func, args or []))
def _get_queues():
    """Return the operation queues stored in the thread-local storage.

    :return: The object previously stored under the operation queue
        thread-local name (a pair of lists: transactional and
        non-transactional operations).
    :raises RuntimeError: If the stored value is None, i.e. the queues
        were never prepared or have already been cleared.
    """
    queues = utils.get_thread_local(_THREAD_LOCAL_NAME)

    if queues is None:
        # The message names the decorator the way callers actually use
        # it: the module is imported as "post_tx_queue".
        raise RuntimeError(
            'Operation queue is not initialized for the current thread.'
            ' Most likely some engine method is not decorated with'
            ' post_tx_queue.run()'
        )

    return queues
def run(func):
    """Decorator that runs all operations registered in the operation queue.

    The wrapper prepares empty queues, calls the decorated function and,
    if at least one operation was registered, starts a function with
    eventlet.spawn() that processes the transactional queue first and
    the non-transactional queue second. The thread-local queues are
    cleared in any case, whether the decorated function returns or raises.

    Various engine methods may register such operations. All such methods
    must be decorated with this decorator.

    :param func: Function to decorate.
    :return: Wrapped function returning the result of func.
    """
    @functools.wraps(func)
    def wrapper(*args, **kw):
        _prepare()

        try:
            result = func(*args, **kw)

            registered = _get_queues()

            in_tx_ops = registered[0]
            out_of_tx_ops = registered[1]

            # Nothing is spawned when both queues are empty.
            if in_tx_ops or out_of_tx_ops:
                caller_ctx = context.ctx() if context.has_ctx() else None

                def _run_registered():
                    if context.has_ctx():
                        saved_ctx = context.ctx()
                    else:
                        saved_ctx = None

                    # Run the operations under the context captured by
                    # the wrapper and restore the previous one afterwards.
                    context.set_ctx(caller_ctx)

                    try:
                        if in_tx_ops:
                            _process_tx_queue(in_tx_ops)

                        if out_of_tx_ops:
                            _process_non_tx_queue(out_of_tx_ops)
                    finally:
                        context.set_ctx(saved_ctx)

                eventlet.spawn(_run_registered)
        finally:
            _clear()

        return result

    return wrapper
@db_utils.retry_on_db_error
@run
def _process_tx_queue(queue):
    """Run all given operations inside one db_api.transaction() block.

    The first failing operation is logged and its exception is re-raised,
    so the remaining operations are not run.

    :param queue: Iterable of (func, args) pairs.
    """
    with db_api.transaction():
        try:
            for operation, op_args in queue:
                operation(*op_args)
        except Exception:
            LOG.exception("Failed to run transactional engine operation.")

            raise
def _process_non_tx_queue(queue):
    """Run all given operations one by one.

    A failing operation is logged and does not prevent the remaining
    operations from running.

    :param queue: Iterable of (func, args) pairs.
    """
    for entry in queue:
        operation, op_args = entry

        try:
            operation(*op_args)
        except Exception:
            LOG.exception("Failed to run non-transactional engine operation.")

View File

@ -22,7 +22,7 @@ import traceback as tb
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine import action_queue
from mistral.engine import post_tx_queue
from mistral.engine import tasks
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
@ -274,6 +274,7 @@ def complete_task(task_ex, state, state_info):
_check_affected_tasks(task)
@profiler.trace('task-handler-check-affected-tasks', hide_args=True)
def _check_affected_tasks(task):
if not task.is_completed():
return
@ -295,8 +296,28 @@ def _check_affected_tasks(task):
task_ex.name
)
def _schedule_if_needed(t_ex_id):
# NOTE(rakhmerov): we need to minimize the number of delayed calls
# that refresh state of "join" tasks. We'll check if corresponding
# calls are already scheduled. Note that we must ignore delayed calls
# that are currently being processed because of a possible race with
# the transaction that deletes delayed calls, i.e. the call may still
# exist in DB (the deleting transaction didn't commit yet) but it has
# already been processed and the task state hasn't changed.
cnt = db_api.get_delayed_calls_count(
key=_get_refresh_state_job_key(t_ex_id),
processing=False
)
if cnt == 0:
_schedule_refresh_task_state(t_ex_id)
for t_ex in affected_task_execs:
_schedule_refresh_task_state(t_ex)
post_tx_queue.register_operation(
_schedule_if_needed,
args=[t_ex.id],
in_tx=True
)
def _build_task_from_execution(wf_spec, task_ex):
@ -364,7 +385,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
@profiler.trace('task-handler-refresh-task-state', hide_args=True)
def _refresh_task_state(task_ex_id):
with db_api.transaction():
@ -373,6 +394,10 @@ def _refresh_task_state(task_ex_id):
if not task_ex:
return
if (states.is_completed(task_ex.state)
or task_ex.state == states.RUNNING):
return
wf_ex = task_ex.workflow_execution
if states.is_completed(wf_ex.state):
@ -384,46 +409,35 @@ def _refresh_task_state(task_ex_id):
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
with db_api.named_lock(task_ex.id):
db_api.refresh(task_ex)
log_state = wf_ctrl.get_logical_task_state(task_ex)
if (states.is_completed(task_ex.state)
or task_ex.state == states.RUNNING):
return
state = log_state.state
state_info = log_state.state_info
log_state = wf_ctrl.get_logical_task_state(task_ex)
# Update 'triggered_by' because it could have changed.
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
state = log_state.state
state_info = log_state.state_info
# Update 'triggered_by' because it could have changed.
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
if state == states.RUNNING:
continue_task(task_ex)
elif state == states.ERROR:
complete_task(task_ex, state, state_info)
elif state == states.WAITING:
LOG.info(
"Task execution is still in WAITING state"
" [task_ex_id=%s, task_name=%s]",
task_ex_id,
task_ex.name
)
else:
# Must never get here.
raise RuntimeError(
'Unexpected logical task state [task_ex_id=%s, '
'task_name=%s, state=%s]' %
(task_ex_id, task_ex.name, state)
)
if states.is_completed(task_ex.state):
with db_api.transaction():
wf_handler.check_and_complete(wf_ex.id)
if state == states.RUNNING:
continue_task(task_ex)
elif state == states.ERROR:
complete_task(task_ex, state, state_info)
elif state == states.WAITING:
LOG.info(
"Task execution is still in WAITING state"
" [task_ex_id=%s, task_name=%s]",
task_ex_id,
task_ex.name
)
else:
# Must never get here.
raise RuntimeError(
'Unexpected logical task state [task_ex_id=%s, '
'task_name=%s, state=%s]' %
(task_ex_id, task_ex.name, state)
)
def _schedule_refresh_task_state(task_ex, delay=0):
def _schedule_refresh_task_state(task_ex_id, delay=0):
"""Schedules task preconditions check.
This method provides transactional decoupling of task preconditions
@ -436,17 +450,17 @@ def _schedule_refresh_task_state(task_ex, delay=0):
we'll have in this case (time between transactions) whereas scheduler
is a special component that is designed to be resistant to failures.
:param task_ex: Task execution.
:param task_ex_id: Task execution ID.
:param delay: Delay.
"""
key = _get_refresh_state_job_key(task_ex.id)
key = _get_refresh_state_job_key(task_ex_id)
scheduler.schedule_call(
None,
_REFRESH_TASK_STATE_PATH,
delay,
key=key,
task_ex_id=task_ex.id
task_ex_id=task_ex_id
)
@ -455,7 +469,7 @@ def _get_refresh_state_job_key(task_ex_id):
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def _scheduled_on_action_complete(action_ex_id, wf_action):
with db_api.transaction():
if wf_action:
@ -465,15 +479,6 @@ def _scheduled_on_action_complete(action_ex_id, wf_action):
_on_action_complete(action_ex)
wf_ex_id = None
if states.is_completed(action_ex.task_execution.state):
wf_ex_id = action_ex.task_execution.workflow_execution_id
if wf_ex_id:
with db_api.transaction():
wf_handler.check_and_complete(wf_ex_id)
def schedule_on_action_complete(action_ex, delay=0):
"""Schedules task completion check.
@ -510,7 +515,7 @@ def schedule_on_action_complete(action_ex, delay=0):
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def _scheduled_on_action_update(action_ex_id, wf_action):
with db_api.transaction():
if wf_action:
@ -520,15 +525,6 @@ def _scheduled_on_action_update(action_ex_id, wf_action):
_on_action_update(action_ex)
wf_ex_id = None
if states.is_completed(action_ex.task_execution.state):
wf_ex_id = action_ex.task_execution.workflow_execution_id
if wf_ex_id:
with db_api.transaction():
wf_handler.check_and_complete(wf_ex_id)
def schedule_on_action_update(action_ex, delay=0):
"""Schedules task update check.

View File

@ -26,6 +26,8 @@ from mistral.db.v2 import api as db_api
from mistral.engine import actions
from mistral.engine import dispatcher
from mistral.engine import policies
from mistral.engine import post_tx_queue
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.notifiers import base as notif
@ -118,6 +120,23 @@ class Task(object):
This method puts task to a waiting state.
"""
# NOTE(rakhmerov): using named locks may cause problems under load
# with MySQL that raises a lot of deadlocks in case of high
# parallelism so it makes sense to do a fast check if the object
# already exists in DB outside of the lock.
if not self.task_ex:
t_execs = db_api.get_task_executions(
workflow_execution_id=self.wf_ex.id,
unique_key=self.unique_key,
state=states.WAITING
)
self.task_ex = t_execs[0] if t_execs else None
if self.task_ex:
return
with db_api.named_lock(self.unique_key):
if not self.task_ex:
t_execs = db_api.get_task_executions(
@ -249,11 +268,25 @@ class Task(object):
# upon its completion.
self.task_ex.processed = True
self.register_workflow_completion_check()
# Publish task event.
self.notify(old_task_state, self.task_ex.state)
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
def register_workflow_completion_check(self):
    """Register a workflow completion check if this task may cause it.

    Nothing is registered unless the workflow controller reports that
    this task execution may lead to workflow completion.
    """
    wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)

    if not wf_ctrl.may_complete_workflow(self.task_ex):
        return

    # The check is registered as a transactional operation in the post
    # transactional queue, so it runs in a separate transaction after
    # the main one.
    def _complete_if_needed():
        wf_handler.check_and_complete(self.wf_ex.id)

    post_tx_queue.register_operation(_complete_if_needed, in_tx=True)
@profiler.trace('task-update')
def update(self, state, state_info=None):
"""Update task and set specified state.
@ -290,6 +323,9 @@ class Task(object):
self.set_state(state, state_info)
if states.is_completed(self.task_ex.state):
self.register_workflow_completion_check()
# Publish event.
self.notify(old_task_state, self.task_ex.state)

View File

@ -20,6 +20,7 @@ from osprofiler import profiler
import traceback as tb
from mistral.db.v2 import api as db_api
from mistral.engine import post_tx_queue
from mistral.engine import workflows
from mistral import exceptions as exc
from mistral.services import scheduler
@ -108,6 +109,7 @@ def check_and_complete(wf_ex_id):
force_fail_workflow(wf.wf_ex, msg)
@post_tx_queue.run
@profiler.trace('workflow-handler-check-and-fix-integrity')
def _check_and_fix_integrity(wf_ex_id):
check_after_seconds = CONF.engine.execution_integrity_check_delay
@ -125,15 +127,13 @@ def _check_and_fix_integrity(wf_ex_id):
if states.is_completed(wf_ex.state):
return
_schedule_check_and_fix_integrity(wf_ex, delay=60)
_schedule_check_and_fix_integrity(wf_ex, delay=120)
running_task_execs = db_api.get_task_executions(
workflow_execution_id=wf_ex.id,
state=states.RUNNING
)
any_completed = False
for t_ex in running_task_execs:
# The idea is that we take the latest known timestamp of the task
# execution and consider it eligible for checking and fixing only
@ -182,13 +182,6 @@ def _check_and_fix_integrity(wf_ex_id):
child_executions[-1]
)
if states.is_completed(t_ex.state):
any_completed = True
if any_completed:
with db_api.transaction():
check_and_complete(wf_ex_id)
def pause_workflow(wf_ex, msg=None):
# Pause subworkflows first.
@ -272,6 +265,11 @@ def _schedule_check_and_fix_integrity(wf_ex, delay=0):
:param wf_ex: Workflow execution.
:param delay: Minimum amount of time before the check should be made.
"""
if CONF.engine.execution_integrity_check_delay < 0:
# Never check integrity if it's a negative value.
return
key = _get_integrity_check_key(wf_ex)
scheduler.schedule_call(

View File

@ -23,14 +23,15 @@ import six
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_queue
from mistral.engine import dispatcher
from mistral.engine import post_tx_queue
from mistral.engine import utils as engine_utils
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.lang import parser as spec_parser
from mistral.notifiers import base as notif
from mistral.notifiers import notification_events as events
from mistral.rpc import clients as rpc
from mistral.services import triggers
from mistral.services import workflows as wf_service
from mistral import utils
@ -374,7 +375,7 @@ class Workflow(object):
if wf_ex is None:
# Do nothing because the state was updated previously.
return
return False
self.wf_ex = wf_ex
self.wf_ex.state_info = json.dumps(state_info) \
@ -406,6 +407,8 @@ class Workflow(object):
triggers.on_workflow_complete(self.wf_ex)
return True
@profiler.trace('workflow-check-and-complete')
def check_and_complete(self):
"""Completes the workflow if it needs to be completed.
@ -457,14 +460,17 @@ class Workflow(object):
return 0
def _succeed_workflow(self, final_context, msg=None):
self.wf_ex.output = data_flow.evaluate_workflow_output(
output = data_flow.evaluate_workflow_output(
self.wf_ex,
self.wf_spec.get_output(),
final_context
)
# Set workflow execution to success until after output is evaluated.
self.set_state(states.SUCCESS, msg)
# Set workflow execution to success after output is evaluated.
if not self.set_state(states.SUCCESS, msg):
return
self.wf_ex.output = output
# Publish event.
self.notify(events.WORKFLOW_SUCCEEDED)
@ -492,7 +498,8 @@ class Workflow(object):
)
LOG.error(msg)
self.set_state(states.ERROR, state_info=msg)
if not self.set_state(states.ERROR, state_info=msg):
return
# When we set an ERROR state we should safely set output value getting
# w/o exceptions due to field size limitations.
@ -524,7 +531,8 @@ class Workflow(object):
if states.is_completed(self.wf_ex.state):
return
self.set_state(states.CANCELLED, state_info=msg)
if not self.set_state(states.CANCELLED, state_info=msg):
return
# When we set an ERROR state we should safely set output value getting
# w/o exceptions due to field size limitations.
@ -564,11 +572,16 @@ class Workflow(object):
" if a workflow is not in SUCCESS, ERROR or CANCELLED state."
)
action_queue.schedule_on_action_complete(
self.wf_ex.id,
result,
wf_action=True
)
# Register a command executed in a separate thread to send the result
# to the parent workflow outside of the main DB transaction.
def _send_result():
rpc.get_engine_client().on_action_complete(
self.wf_ex.id,
result,
wf_action=True
)
post_tx_queue.register_operation(_send_result)
def _get_environment(params):

View File

@ -17,7 +17,7 @@ import datetime
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.engine import action_handler
from mistral.engine import action_queue
from mistral.engine import post_tx_queue
from mistral.services import scheduler
from mistral import utils
from mistral_lib import actions as mistral_lib
@ -30,7 +30,7 @@ SCHEDULER_KEY = 'handle_expired_actions_key'
@db_utils.retry_on_db_error
@action_queue.process
@post_tx_queue.run
def handle_expired_actions():
LOG.debug("Running heartbeat checker...")

View File

@ -532,6 +532,11 @@ class DefaultEngineTest(base.DbTestCase):
ml_actions.Result(data='Hi')
)
self._await(
lambda:
db_api.get_workflow_execution(wf_ex.id).state == states.SUCCESS
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -229,6 +229,11 @@ class WorkflowController(object):
"""
raise NotImplementedError
def may_complete_workflow(self, task_ex):
    """Determines if the task execution may lead to workflow completion.

    This implementation only checks that the task execution state is
    a completed one.

    :param task_ex: Task execution.
    :return: Result of states.is_completed() for the task state.
    """
    task_state = task_ex.state

    return states.is_completed(task_state)
@abc.abstractmethod
def _find_next_commands(self, task_ex):
"""Finds commands that should run next.

View File

@ -236,6 +236,13 @@ class DirectWorkflowController(base.WorkflowController):
filter(is_end_task, batch)
)
def may_complete_workflow(self, task_ex):
    """Determines if the task execution may lead to workflow completion.

    On top of the parent class check, the task must have no outbound
    tasks.

    :param task_ex: Task execution.
    :return: The parent's result if it is falsy, otherwise True if the
        task has no outbound tasks and False if it has some.
    """
    base_verdict = super(
        DirectWorkflowController,
        self
    ).may_complete_workflow(task_ex)

    if not base_verdict:
        return base_verdict

    return not self._has_outbound_tasks(task_ex)
def _has_outbound_tasks(self, task_ex):
# In order to determine if there are outbound tasks we just need
# to calculate next task names (based on task outbound context)

View File

@ -0,0 +1,10 @@
---
fixes:
- |
Workflow and join completion check logic is now simplified by using a
post transactional queue of operations, which is a more generic version of
the action_queue module that previously served for scheduling action runs
outside of the main DB transaction. The workflow completion check is now
registered only once, when a task completes, which reduces clutter, and
only if the task may potentially lead to workflow completion.