Merge "Fixing engine transaction model and error handling"
This commit is contained in:
commit
ea935a0f5f
28
AUTHORS
28
AUTHORS
|
@ -1,43 +1,71 @@
|
|||
Abhishek Chanda <abhishek@cloudscaling.com>
|
||||
Alexander Kuznetsov <akuznetsov@mirantis.com>
|
||||
Anastasia Kuznetsova <akuznetsova@mirantis.com>
|
||||
Andreas Jaeger <aj@suse.com>
|
||||
Angus Salkeld <angus.salkeld@rackspace.com>
|
||||
Ankita Wagh <ankita_wagh@symmactoolkit-c02lr80ufd57.symc.symantec.com>
|
||||
Antoine Musso <hashar@free.fr>
|
||||
Bertrand Lallau <bertrand.lallau@gmail.com>
|
||||
Bhaskar Duvvuri <dbasu84@gmail.com>
|
||||
Boris Pavlovic <boris@pavlovic.me>
|
||||
Bryan Havenstein <bryan.havenstein@ericsson.com>
|
||||
Chaozhe.Chen <chaozhe.chen@easystack.cn>
|
||||
Christian Berendt <berendt@b1-systems.de>
|
||||
Claudiu Belu <cbelu@cloudbasesolutions.com>
|
||||
Dan Prince <dprince@redhat.com>
|
||||
Daryl Mowrer <dmowrer@us.ibm.com>
|
||||
David C Kennedy <david.c.kennedy@hp.com>
|
||||
David Charles Kennedy <dkennedy@hp.com>
|
||||
Dawid Deja <dawid.deja@intel.com>
|
||||
Dmitri Zimine <dz@stackstorm.com>
|
||||
Doug Hellmann <doug@doughellmann.com>
|
||||
Ed Cranford <ed.cranford@rackspace.com>
|
||||
Gal Margalit <gal.margalit@alcatel-lucent.com>
|
||||
Guy Paz <guy.paz@alcatel-lucent.com>
|
||||
Jeremy Stanley <fungi@yuggoth.org>
|
||||
Jiri Tomasek <jtomasek@redhat.com>
|
||||
Kevin Pouget <kpouget@altair.com>
|
||||
Kirill Izotov <enykeev@stackstorm.com>
|
||||
Lakshmi Kannan <lakshmi@stackstorm.com>
|
||||
Limor <limor.bortman@nokia.com>
|
||||
Limor Stotland <limor.bortman@alcatel-lucent.com>
|
||||
Lingxian Kong <konglingxian@huawei.com>
|
||||
Liu Sheng <liusheng@huawei.com>
|
||||
LiuNanke <nanke.liu@easystack.cn>
|
||||
Manas Kelshikar <manas@stackstorm.com>
|
||||
Michael Krotscheck <krotscheck@gmail.com>
|
||||
Michal Gershenzon <michal.gershenzon@alcatel-lucent.com>
|
||||
Monty Taylor <mordred@inaugust.com>
|
||||
Moshe Elisha <moshe.elisha@alcatel-lucent.com>
|
||||
Nikolay Mahotkin <nmakhotkin@mirantis.com>
|
||||
Noa Koffman <noa.koffman@alcatel-lucent.com>
|
||||
Oleksii Chuprykov <ochuprykov@mirantis.com>
|
||||
Pierre-Arthur MATHIEU <pierre-arthur.mathieu@hp.com>
|
||||
Ray Chen <chenrano2002@gmail.com>
|
||||
Renat Akhmerov <rakhmerov@mirantis.com>
|
||||
Renat Akhmerov <renat.akhmerov@gmail.com>
|
||||
Rico Lin <rico.lin.guanyu@gmail.com>
|
||||
Rinat Sabitov <rinat.sabitov@gmail.com>
|
||||
Sergey Kolekonov <skolekonov@mirantis.com>
|
||||
Sergey Murashov <smurashov@mirantis.com>
|
||||
Shuquan Huang <huang.shuquan@99cloud.net>
|
||||
Thierry Carrez <thierry@openstack.org>
|
||||
Thomas Herve <therve@redhat.com>
|
||||
Timur Nurlygayanov <tnurlygayanov@mirantis.com>
|
||||
Venkata Mahesh Kotha <venkatamaheshkotha@gmail.com>
|
||||
Winson Chan <wcchan@stackstorm.com>
|
||||
Yaroslav Lobankov <ylobankov@mirantis.com>
|
||||
Zhao Lei <zhaolei@cn.fujitsu.com>
|
||||
Zhenguo Niu <niuzhenguo@huawei.com>
|
||||
ZhiQiang Fan <aji.zqfan@gmail.com>
|
||||
ZhiQiang Fan <zhiqiang.fan@huawei.com>
|
||||
Zhu Rong <zhu.rong@99cloud.net>
|
||||
caoyue <yue.cao@easystack.cn>
|
||||
cheneydc <dongc@neunn.com>
|
||||
hardik <hardik.parekh@nectechnologies.in>
|
||||
hparekh <hardik.parekh@nectechnologies.in>
|
||||
keliang <ke.liang@easystack.cn>
|
||||
syed ahsan shamim zaidi <ahsanmohsin04@yahoo.com>
|
||||
tengqm <tengqim@cn.ibm.com>
|
||||
wangzhh <wangzhh@awcloud.com>
|
||||
zhangguoqing <zhang.guoqing@99cloud.net>
|
||||
|
|
|
@ -29,7 +29,7 @@ from mistral.workflow import utils as wf_utils
|
|||
|
||||
def create_action_execution(action_def, action_input, task_ex=None,
|
||||
index=0, description=''):
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling things like
|
||||
# create_action_execution(), these operations can be just done using
|
||||
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
|
||||
# send necessary SQL queries to DB. Currently, session flush happens
|
||||
|
|
|
@ -24,6 +24,7 @@ from mistral.engine import action_handler
|
|||
from mistral.engine import base
|
||||
from mistral.engine import task_handler
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import executions as wf_ex_service
|
||||
from mistral.services import workflows as wf_service
|
||||
|
@ -55,6 +56,9 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
wf_ex_id = None
|
||||
|
||||
try:
|
||||
# Create a persistent workflow execution in a separate transaction
|
||||
# so that we can return it even in case of unexpected errors that
|
||||
# lead to transaction rollback.
|
||||
with db_api.transaction():
|
||||
# The new workflow execution will be in an IDLE
|
||||
# state on initial record creation.
|
||||
|
@ -65,10 +69,6 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
params
|
||||
)
|
||||
|
||||
# Separate workflow execution creation and dispatching command
|
||||
# transactions in order to be able to return workflow execution
|
||||
# with corresponding error message in state_info when error occurs
|
||||
# at dispatching commands.
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
wf_handler.set_execution_state(wf_ex, states.RUNNING)
|
||||
|
@ -161,14 +161,10 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
|
||||
self._on_task_state_change(task_ex, wf_ex, wf_spec)
|
||||
|
||||
def _on_task_state_change(self, task_ex, wf_ex, wf_spec,
|
||||
task_state=states.SUCCESS):
|
||||
def _on_task_state_change(self, task_ex, wf_ex, wf_spec):
|
||||
task_spec = wf_spec.get_tasks()[task_ex.name]
|
||||
|
||||
# We must be sure that if task is completed,
|
||||
# it was also completed in previous transaction.
|
||||
if (task_handler.is_task_completed(task_ex, task_spec)
|
||||
and states.is_completed(task_state)):
|
||||
if task_handler.is_task_completed(task_ex, task_spec):
|
||||
task_handler.after_task_complete(task_ex, task_spec, wf_spec)
|
||||
|
||||
# Ignore DELAYED state.
|
||||
|
@ -178,8 +174,21 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
|
||||
|
||||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
try:
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
except exc.YaqlEvaluationException as e:
|
||||
LOG.error(
|
||||
'YAQL error occurred while calculating next workflow '
|
||||
'commands [wf_ex_id=%s, task_ex_id=%s]: %s',
|
||||
wf_ex.id, task_ex.id, e
|
||||
)
|
||||
|
||||
wf_handler.fail_workflow(wf_ex, str(e))
|
||||
|
||||
return
|
||||
|
||||
# Mark task as processed after all decisions have been made
|
||||
# upon its completion.
|
||||
task_ex.processed = True
|
||||
|
||||
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
|
||||
|
@ -235,6 +244,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
task_ex = task_handler.on_action_complete(
|
||||
|
@ -248,30 +258,13 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return action_ex.get_clone()
|
||||
|
||||
prev_task_state = task_ex.state
|
||||
|
||||
# Separate the task transition in a separate transaction. The task
|
||||
# has already completed for better or worst. The task state should
|
||||
# not be affected by errors during transition on conditions such as
|
||||
# on-success and on-error.
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
task_ex = action_ex.task_execution
|
||||
|
||||
self._on_task_state_change(
|
||||
task_ex,
|
||||
wf_ex,
|
||||
wf_spec,
|
||||
task_state=prev_task_state
|
||||
)
|
||||
self._on_task_state_change(task_ex, wf_ex, wf_spec)
|
||||
|
||||
return action_ex.get_clone()
|
||||
except Exception as e:
|
||||
# TODO(dzimine): try to find out which command caused failure.
|
||||
# TODO(rakhmerov): Need to refactor logging in a more elegant way.
|
||||
LOG.error(
|
||||
"Failed to handle action execution result [id=%s]: %s\n%s",
|
||||
'Failed to handle action execution result [id=%s]: %s\n%s',
|
||||
action_ex_id, e, traceback.format_exc()
|
||||
)
|
||||
|
||||
|
@ -301,12 +294,13 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
|
||||
wf_ctrl = wf_base.get_controller(wf_ex)
|
||||
|
||||
# TODO(rakhmerov): Add YAQL error handling.
|
||||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
|
||||
|
||||
# When resuming a workflow we need to ignore all 'pause'
|
||||
# commands because workflow controller takes tasks that
|
||||
# completed within the period when the workflow was pause.
|
||||
# completed within the period when the workflow was paused.
|
||||
cmds = list(
|
||||
filter(
|
||||
lambda c: not isinstance(c, commands.PauseWorkflow),
|
||||
|
@ -323,6 +317,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
t_ex.processed = True
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
|
||||
|
||||
if not cmds:
|
||||
|
@ -378,9 +373,9 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
raise e
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def stop_workflow(self, execution_id, state, message=None):
|
||||
def stop_workflow(self, wf_ex_id, state, message=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(execution_id)
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
return self._stop_workflow(wf_ex, state, message)
|
||||
|
||||
|
@ -390,13 +385,16 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
wf_ctrl = wf_base.get_controller(wf_ex)
|
||||
|
||||
final_context = {}
|
||||
|
||||
try:
|
||||
final_context = wf_ctrl.evaluate_workflow_final_context()
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Failed to get final context for %s: %s" % (wf_ex, e)
|
||||
'Failed to get final context for %s: %s' % (wf_ex, e)
|
||||
)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
return wf_handler.succeed_workflow(
|
||||
wf_ex,
|
||||
final_context,
|
||||
|
@ -409,7 +407,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
return wf_ex
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def rollback_workflow(self, execution_id):
|
||||
def rollback_workflow(self, wf_ex_id):
|
||||
# TODO(rakhmerov): Implement.
|
||||
raise NotImplementedError
|
||||
|
||||
|
@ -421,12 +419,26 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
|
||||
task_handler.defer_task(cmd)
|
||||
elif isinstance(cmd, commands.RunTask):
|
||||
task_handler.run_new_task(cmd, wf_spec)
|
||||
task_ex = task_handler.run_new_task(cmd, wf_spec)
|
||||
|
||||
if task_ex.state == states.ERROR:
|
||||
wf_handler.fail_workflow(
|
||||
wf_ex,
|
||||
'Failed to start task [task_ex=%s]: %s' %
|
||||
(task_ex, task_ex.state_info)
|
||||
)
|
||||
elif isinstance(cmd, commands.RunExistingTask):
|
||||
task_handler.run_existing_task(
|
||||
task_ex = task_handler.run_existing_task(
|
||||
cmd.task_ex.id,
|
||||
reset=cmd.reset
|
||||
)
|
||||
|
||||
if task_ex.state == states.ERROR:
|
||||
wf_handler.fail_workflow(
|
||||
wf_ex,
|
||||
'Failed to start task [task_ex=%s]: %s' %
|
||||
(task_ex, task_ex.state_info)
|
||||
)
|
||||
elif isinstance(cmd, commands.SetWorkflowState):
|
||||
if states.is_completed(cmd.new_state):
|
||||
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
|
||||
|
@ -441,33 +453,28 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
if wf_ex.state != states.RUNNING:
|
||||
break
|
||||
|
||||
# TODO(rakhmerov): This method may not be needed at all because error
|
||||
# handling is now implemented too roughly w/o distinguishing different
|
||||
# errors. On most errors (like YAQLException) we shouldn't rollback
|
||||
# transactions, we just need to fail corresponding execution objects
|
||||
# where a problem happened (action, task or workflow).
|
||||
@staticmethod
|
||||
def _fail_workflow(wf_ex_id, err, action_ex_id=None):
|
||||
def _fail_workflow(wf_ex_id, exc):
|
||||
"""Private helper to fail workflow on exceptions."""
|
||||
err_msg = str(err)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.load_workflow_execution(wf_ex_id)
|
||||
|
||||
if wf_ex is None:
|
||||
LOG.error(
|
||||
"Cant fail workflow execution with id='%s': not found.",
|
||||
"Can't fail workflow execution with id='%s': not found.",
|
||||
wf_ex_id
|
||||
)
|
||||
return
|
||||
return None
|
||||
|
||||
wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg)
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
if action_ex_id:
|
||||
# Note(dzimine): Don't call self.engine_client:
|
||||
# 1) to avoid computing and triggering next tasks
|
||||
# 2) to avoid a loop in case of error in transport
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
|
||||
task_handler.on_action_complete(
|
||||
action_ex,
|
||||
spec_parser.get_workflow_spec(wf_ex.spec),
|
||||
wf_utils.Result(error=err_msg)
|
||||
)
|
||||
if not states.is_paused_or_completed(wf_ex.state):
|
||||
wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc))
|
||||
|
||||
return wf_ex
|
||||
|
|
|
@ -19,7 +19,6 @@ from oslo_log import log as logging
|
|||
from mistral.actions import action_factory as a_f
|
||||
from mistral import coordination
|
||||
from mistral.engine import base
|
||||
from mistral import exceptions as exc
|
||||
from mistral.utils import inspect_utils as i_u
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
@ -37,7 +36,7 @@ class DefaultExecutor(base.Executor, coordination.Service):
|
|||
action_params):
|
||||
"""Runs action.
|
||||
|
||||
:param action_ex_id: Corresponding task id.
|
||||
:param action_ex_id: Action execution id.
|
||||
:param action_class_str: Path to action class in dot notation.
|
||||
:param attributes: Attributes of action class which will be set to.
|
||||
:param action_params: Action parameters.
|
||||
|
@ -51,14 +50,29 @@ class DefaultExecutor(base.Executor, coordination.Service):
|
|||
action_ex_id,
|
||||
error_result
|
||||
)
|
||||
else:
|
||||
return error_result
|
||||
|
||||
return None
|
||||
|
||||
return error_result
|
||||
|
||||
action_cls = a_f.construct_action_class(action_class_str, attributes)
|
||||
|
||||
# Instantiate action.
|
||||
|
||||
try:
|
||||
action = action_cls(**action_params)
|
||||
except Exception as e:
|
||||
msg = ("Failed to initialize action %s. Action init params = %s."
|
||||
" Actual init params = %s. More info: %s"
|
||||
% (action_class_str, i_u.get_arg_list(action_cls.__init__),
|
||||
action_params.keys(), e))
|
||||
LOG.warning(msg)
|
||||
|
||||
return send_error_back(msg)
|
||||
|
||||
# Run action.
|
||||
|
||||
try:
|
||||
result = action.run()
|
||||
|
||||
# Note: it's made for backwards compatibility with already
|
||||
|
@ -67,24 +81,25 @@ class DefaultExecutor(base.Executor, coordination.Service):
|
|||
if not isinstance(result, wf_utils.Result):
|
||||
result = wf_utils.Result(data=result)
|
||||
|
||||
if action_ex_id and (action.is_sync() or result.is_error()):
|
||||
self._engine_client.on_action_complete(action_ex_id, result)
|
||||
|
||||
return result
|
||||
except TypeError as e:
|
||||
msg = ("Failed to initialize action %s. Action init params = %s."
|
||||
" Actual init params = %s. More info: %s"
|
||||
% (action_class_str, i_u.get_arg_list(action_cls.__init__),
|
||||
action_params.keys(), e))
|
||||
LOG.warning(msg)
|
||||
|
||||
except exc.ActionException as e:
|
||||
except Exception as e:
|
||||
msg = ("Failed to run action [action_ex_id=%s, action_cls='%s',"
|
||||
" attributes='%s', params='%s']\n %s"
|
||||
% (action_ex_id, action_cls, attributes, action_params, e))
|
||||
LOG.exception(msg)
|
||||
except Exception as e:
|
||||
msg = str(e)
|
||||
|
||||
# Send error info to engine.
|
||||
return send_error_back(msg)
|
||||
return send_error_back(msg)
|
||||
|
||||
# Send action result.
|
||||
|
||||
try:
|
||||
if action_ex_id and (action.is_sync() or result.is_error()):
|
||||
self._engine_client.on_action_complete(action_ex_id, result)
|
||||
|
||||
except Exception as e:
|
||||
msg = ("Exception occurred when calling engine on_action_complete"
|
||||
" [action_ex_id=%s, action_cls='%s',"
|
||||
" attributes='%s', params='%s']\n %s"
|
||||
% (action_ex_id, action_cls, attributes, action_params, e))
|
||||
LOG.exception(msg)
|
||||
|
||||
return result
|
||||
|
|
|
@ -24,6 +24,7 @@ from mistral.engine import base
|
|||
from mistral import exceptions as exc
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -164,6 +165,8 @@ class EngineServer(object):
|
|||
|
||||
:param rpc_ctx: RPC request context.
|
||||
:param action_ex_id: Action execution id.
|
||||
:param result_data: Action result data.
|
||||
:param result_error: Action result error.
|
||||
:return: Action execution.
|
||||
"""
|
||||
|
||||
|
|
|
@ -46,6 +46,9 @@ def run_existing_task(task_ex_id, reset=True):
|
|||
"""This function runs existing task execution.
|
||||
|
||||
It is needed mostly by scheduler.
|
||||
|
||||
:param task_ex_id: Task execution id.
|
||||
:param reset: Reset action executions for the task.
|
||||
"""
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
task_spec = spec_parser.get_task_spec(task_ex.spec)
|
||||
|
@ -54,15 +57,16 @@ def run_existing_task(task_ex_id, reset=True):
|
|||
|
||||
# Throw exception if the existing task already succeeded.
|
||||
if task_ex.state == states.SUCCESS:
|
||||
raise exc.EngineException('Reruning existing task that already '
|
||||
'succeeded is not supported.')
|
||||
raise exc.EngineException(
|
||||
'Rerunning existing task that already succeeded is not supported.'
|
||||
)
|
||||
|
||||
# Exit if the existing task failed and reset is not instructed.
|
||||
# For a with-items task without reset, re-running the existing
|
||||
# task will re-run the failed and unstarted items.
|
||||
if (task_ex.state == states.ERROR and not reset and
|
||||
not task_spec.get_with_items()):
|
||||
return
|
||||
return task_ex
|
||||
|
||||
# Reset nested executions only if task is not already RUNNING.
|
||||
if task_ex.state != states.RUNNING:
|
||||
|
@ -84,14 +88,27 @@ def run_existing_task(task_ex_id, reset=True):
|
|||
|
||||
_run_existing_task(task_ex, task_spec, wf_spec)
|
||||
|
||||
return task_ex
|
||||
|
||||
|
||||
def _run_existing_task(task_ex, task_spec, wf_spec):
|
||||
input_dicts = _get_input_dictionaries(
|
||||
wf_spec,
|
||||
task_ex,
|
||||
task_spec,
|
||||
task_ex.in_context
|
||||
)
|
||||
try:
|
||||
input_dicts = _get_input_dictionaries(
|
||||
wf_spec,
|
||||
task_ex,
|
||||
task_spec,
|
||||
task_ex.in_context
|
||||
)
|
||||
except exc.MistralException as e:
|
||||
LOG.error(
|
||||
'An error while calculating task action inputs'
|
||||
' [task_execution_id=%s]: %s',
|
||||
task_ex.id, e
|
||||
)
|
||||
|
||||
set_task_state(task_ex, states.ERROR, str(e))
|
||||
|
||||
return
|
||||
|
||||
# In some cases we can have no input, e.g. in case of 'with-items'.
|
||||
if input_dicts:
|
||||
|
@ -113,8 +130,15 @@ def defer_task(wf_cmd):
|
|||
wf_ex = wf_cmd.wf_ex
|
||||
task_spec = wf_cmd.task_spec
|
||||
|
||||
if not wf_utils.find_task_executions_by_spec(wf_ex, task_spec):
|
||||
_create_task_execution(wf_ex, task_spec, ctx, state=states.WAITING)
|
||||
if wf_utils.find_task_executions_by_spec(wf_ex, task_spec):
|
||||
return None
|
||||
|
||||
return _create_task_execution(
|
||||
wf_ex,
|
||||
task_spec,
|
||||
ctx,
|
||||
state=states.WAITING
|
||||
)
|
||||
|
||||
|
||||
def run_new_task(wf_cmd, wf_spec):
|
||||
|
@ -149,23 +173,25 @@ def run_new_task(wf_cmd, wf_spec):
|
|||
|
||||
# Policies could possibly change task state.
|
||||
if task_ex.state != states.RUNNING:
|
||||
return
|
||||
return task_ex
|
||||
|
||||
_run_existing_task(task_ex, task_spec, wf_spec)
|
||||
|
||||
return task_ex
|
||||
|
||||
|
||||
def on_action_complete(action_ex, wf_spec, result):
|
||||
"""Handles event of action result arrival.
|
||||
|
||||
Given action result this method performs analysis of the workflow
|
||||
execution and identifies commands (including tasks) that can be
|
||||
scheduled for execution.
|
||||
Given action result this method changes corresponding task execution
|
||||
object. This method must never be called for the case of individual
|
||||
action which is not associated with any tasks.
|
||||
|
||||
:param action_ex: Action execution objects the result belongs to.
|
||||
:param wf_spec: Workflow specification.
|
||||
:param result: Task action/workflow output wrapped into
|
||||
mistral.workflow.utils.Result instance.
|
||||
:return List of engine commands that need to be performed.
|
||||
:return Task execution object.
|
||||
"""
|
||||
|
||||
task_ex = action_ex.task_execution
|
||||
|
@ -177,7 +203,18 @@ def on_action_complete(action_ex, wf_spec, result):
|
|||
|
||||
task_spec = wf_spec.get_tasks()[task_ex.name]
|
||||
|
||||
result = action_handler.transform_result(result, task_ex, task_spec)
|
||||
try:
|
||||
result = action_handler.transform_result(result, task_ex, task_spec)
|
||||
except exc.YaqlEvaluationException as e:
|
||||
err_msg = str(e)
|
||||
|
||||
LOG.error(
|
||||
'YAQL error while transforming action result'
|
||||
' [action_execution_id=%s, result=%s]: %s',
|
||||
action_ex.id, result, err_msg
|
||||
)
|
||||
|
||||
result = wf_utils.Result(error=err_msg)
|
||||
|
||||
# Ignore workflow executions because they're handled during
|
||||
# workflow completion.
|
||||
|
@ -195,6 +232,7 @@ def on_action_complete(action_ex, wf_spec, result):
|
|||
_complete_task(task_ex, task_spec, task_state, task_state_info)
|
||||
else:
|
||||
with_items.increase_capacity(task_ex)
|
||||
|
||||
if with_items.is_completed(task_ex):
|
||||
_complete_task(
|
||||
task_ex,
|
||||
|
@ -405,7 +443,10 @@ def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec):
|
|||
)
|
||||
|
||||
action_ex = action_handler.create_action_execution(
|
||||
action_def, action_input, task_ex, index
|
||||
action_def,
|
||||
action_input,
|
||||
task_ex,
|
||||
index
|
||||
)
|
||||
|
||||
target = expr.evaluate_recursively(
|
||||
|
@ -506,11 +547,14 @@ def _complete_task(task_ex, task_spec, state, state_info=None):
|
|||
set_task_state(task_ex, state, state_info)
|
||||
|
||||
try:
|
||||
data_flow.publish_variables(
|
||||
task_ex,
|
||||
task_spec
|
||||
data_flow.publish_variables(task_ex, task_spec)
|
||||
except exc.MistralException as e:
|
||||
LOG.error(
|
||||
'An error while publishing task variables'
|
||||
' [task_execution_id=%s]: %s',
|
||||
task_ex.id, str(e)
|
||||
)
|
||||
except Exception as e:
|
||||
|
||||
set_task_state(task_ex, states.ERROR, str(e))
|
||||
|
||||
if not task_spec.get_keep_result():
|
||||
|
@ -518,7 +562,6 @@ def _complete_task(task_ex, task_spec, state, state_info=None):
|
|||
|
||||
|
||||
def set_task_state(task_ex, state, state_info, processed=None):
|
||||
# TODO(rakhmerov): How do we log task result?
|
||||
wf_trace.info(
|
||||
task_ex.workflow_execution,
|
||||
"Task execution '%s' [%s -> %s]" %
|
||||
|
|
|
@ -146,17 +146,17 @@ class EngineTestCase(base.DbTestCase):
|
|||
def is_task_in_state(self, task_ex_id, state):
|
||||
return db_api.get_task_execution(task_ex_id).state == state
|
||||
|
||||
def is_execution_in_state(self, wf_ex_id, state):
|
||||
return db_api.get_workflow_execution(wf_ex_id).state == state
|
||||
def is_execution_in_state(self, ex_id, state):
|
||||
return db_api.get_workflow_execution(ex_id).state == state
|
||||
|
||||
def is_execution_success(self, wf_ex_id):
|
||||
return self.is_execution_in_state(wf_ex_id, states.SUCCESS)
|
||||
def is_execution_success(self, ex_id):
|
||||
return self.is_execution_in_state(ex_id, states.SUCCESS)
|
||||
|
||||
def is_execution_error(self, wf_ex_id):
|
||||
return self.is_execution_in_state(wf_ex_id, states.ERROR)
|
||||
def is_execution_error(self, ex_id):
|
||||
return self.is_execution_in_state(ex_id, states.ERROR)
|
||||
|
||||
def is_execution_paused(self, wf_ex_id):
|
||||
return self.is_execution_in_state(wf_ex_id, states.PAUSED)
|
||||
def is_execution_paused(self, ex_id):
|
||||
return self.is_execution_in_state(ex_id, states.PAUSED)
|
||||
|
||||
def is_task_success(self, task_ex_id):
|
||||
return self.is_task_in_state(task_ex_id, states.SUCCESS)
|
||||
|
|
|
@ -29,12 +29,14 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
|
|||
|
||||
|
||||
class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
def _run_workflow(self, workflow_yaml, state=states.ERROR):
|
||||
wf_service.create_workflows(workflow_yaml)
|
||||
def _run_workflow(self, wf_text, expected_state=states.ERROR):
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self._await(lambda: self.is_execution_in_state(wf_ex.id, state))
|
||||
self._await(
|
||||
lambda: self.is_execution_in_state(wf_ex.id, expected_state)
|
||||
)
|
||||
|
||||
return db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
|
@ -274,17 +276,18 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
|
||||
|
||||
# Assert that there is only one task execution and it's SUCCESS.
|
||||
self.assertEqual(1, len(wf_ex.task_executions))
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(2, len(task_execs))
|
||||
|
||||
# 'task1' should be in SUCCESS.
|
||||
task_1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
task_execs,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
|
||||
# Assert that there is only one action execution and it's SUCCESS.
|
||||
# 'task1' should have exactly one action execution (in SUCCESS).
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id
|
||||
)
|
||||
|
@ -292,6 +295,19 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(1, len(task_1_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
|
||||
|
||||
# 'task2' should exist but in ERROR.
|
||||
task_2_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='task2',
|
||||
state=states.ERROR
|
||||
)
|
||||
|
||||
# 'task2' must not have action executions.
|
||||
self.assertEqual(
|
||||
0,
|
||||
len(db_api.get_action_executions(task_execution_id=task_2_ex.id))
|
||||
)
|
||||
|
||||
def test_async_next_task_with_input_yaql_error(self):
|
||||
wf_text = """
|
||||
version: '2.0'
|
||||
|
@ -331,29 +347,28 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.RUNNING, task_1_action_exs[0].state)
|
||||
|
||||
# Update async action execution result.
|
||||
result = wf_utils.Result(data='foobar')
|
||||
|
||||
self.assertRaises(
|
||||
exc.YaqlEvaluationException,
|
||||
self.engine.on_action_complete,
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
result
|
||||
wf_utils.Result(data='foobar')
|
||||
)
|
||||
|
||||
# Assert that task1 is SUCCESS and workflow is ERROR.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
|
||||
self.assertEqual(1, len(wf_ex.task_executions))
|
||||
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(2, len(task_execs))
|
||||
|
||||
# 'task1' must be in SUCCESS.
|
||||
task_1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
task_execs,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
|
||||
# 'task1' must have exactly one action execution (in SUCCESS).
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id
|
||||
)
|
||||
|
@ -361,6 +376,19 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(1, len(task_1_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
|
||||
|
||||
# 'task2' must be in ERROR.
|
||||
task_2_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='task2',
|
||||
state=states.ERROR
|
||||
)
|
||||
|
||||
# 'task2' must not have action executions.
|
||||
self.assertEqual(
|
||||
0,
|
||||
len(db_api.get_action_executions(task_execution_id=task_2_ex.id))
|
||||
)
|
||||
|
||||
def test_messed_yaql_in_first_task(self):
|
||||
wf_text = """
|
||||
version: '2.0'
|
||||
|
@ -511,13 +539,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.RUNNING, task_1_action_exs[0].state)
|
||||
|
||||
# Update async action execution result.
|
||||
result = wf_utils.Result(data='foobar')
|
||||
|
||||
self.assertRaises(
|
||||
exc.YaqlEvaluationException,
|
||||
self.engine.on_action_complete,
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
result
|
||||
wf_utils.Result(data='foobar')
|
||||
)
|
||||
|
||||
# Assert that task1 is SUCCESS and workflow is ERROR.
|
||||
|
|
|
@ -37,6 +37,7 @@ wf:
|
|||
input:
|
||||
- workflow_input: '__WORKFLOW_INPUT__'
|
||||
- action_output_length: 0
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: my_action
|
||||
|
@ -79,8 +80,10 @@ def expect_size_limit_exception(field_name):
|
|||
def generate_workflow(tokens):
|
||||
new_wf = WF
|
||||
long_string = ''.join('A' for _ in range(1024))
|
||||
|
||||
for token in tokens:
|
||||
new_wf = new_wf.replace(token, long_string)
|
||||
|
||||
return new_wf
|
||||
|
||||
|
||||
|
@ -136,11 +139,11 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
|
|||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIn(
|
||||
"Size of 'input' is 1KB which exceeds the limit of 0KB",
|
||||
wf_ex.state_info
|
||||
)
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
|
||||
def test_action_output_limit(self):
|
||||
wf_service.create_workflows(WF)
|
||||
|
@ -175,7 +178,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
|
|||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertIn(
|
||||
"Failure caused by error in tasks: task1",
|
||||
'Failure caused by error in tasks: task1',
|
||||
wf_ex.state_info
|
||||
)
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ from mistral.workflow import utils as wf_utils
|
|||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
WORKBOOK = """
|
||||
WB = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
|
@ -55,7 +55,7 @@ workflows:
|
|||
|
||||
"""
|
||||
|
||||
WORKBOOK_WITH_STATIC_VAR = """
|
||||
WB_WITH_STATIC_VAR = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
|
@ -78,7 +78,7 @@ workflows:
|
|||
"""
|
||||
|
||||
|
||||
WORKBOOK_MULTI_ARRAY = """
|
||||
WB_MULTI_ARRAY = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
|
@ -104,7 +104,7 @@ workflows:
|
|||
"""
|
||||
|
||||
|
||||
WORKBOOK_ACTION_CONTEXT = """
|
||||
WB_ACTION_CONTEXT = """
|
||||
---
|
||||
version: "2.0"
|
||||
name: wb1
|
||||
|
@ -123,7 +123,7 @@ workflows:
|
|||
"""
|
||||
|
||||
|
||||
WORKFLOW_INPUT = {
|
||||
WF_INPUT = {
|
||||
'names_info': [
|
||||
{'name': 'John'},
|
||||
{'name': 'Ivan'},
|
||||
|
@ -140,7 +140,7 @@ WF_INPUT_URLS = {
|
|||
]
|
||||
}
|
||||
|
||||
WORKFLOW_INPUT_ONE_ITEM = {
|
||||
WF_INPUT_ONE_ITEM = {
|
||||
'names_info': [
|
||||
{'name': 'Guy'}
|
||||
]
|
||||
|
@ -153,6 +153,7 @@ class RandomSleepEchoAction(action_base.Action):
|
|||
|
||||
def run(self):
|
||||
utils.random_sleep(1)
|
||||
|
||||
return self.output
|
||||
|
||||
def test(self):
|
||||
|
@ -176,10 +177,10 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
if ex.state == states.RUNNING])
|
||||
|
||||
def test_with_items_simple(self):
|
||||
wb_service.create_workbook_v2(WORKBOOK)
|
||||
wb_service.create_workbook_v2(WB)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', WORKFLOW_INPUT)
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
|
@ -188,15 +189,17 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
with_items_context = task1.runtime_context['with_items_context']
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(3, with_items_context['count'])
|
||||
task1_ex = self._assert_single_item(task_execs, name='task1')
|
||||
|
||||
with_items_ctx = task1_ex.runtime_context['with_items_context']
|
||||
|
||||
self.assertEqual(3, with_items_ctx['count'])
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
result = data_flow.get_task_execution_result(task1)
|
||||
result = data_flow.get_task_execution_result(task1_ex)
|
||||
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
|
@ -204,15 +207,15 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertIn('Ivan', result)
|
||||
self.assertIn('Mistral', result)
|
||||
|
||||
published = task1.published
|
||||
published = task1_ex.published
|
||||
|
||||
self.assertIn(published['result'], ['John', 'Ivan', 'Mistral'])
|
||||
|
||||
self.assertEqual(1, len(tasks))
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
self.assertEqual(1, len(task_execs))
|
||||
self.assertEqual(states.SUCCESS, task1_ex.state)
|
||||
|
||||
def test_with_items_fail(self):
|
||||
workflow = """---
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
with_items:
|
||||
|
@ -227,23 +230,21 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
task2:
|
||||
action: std.echo output="With-items failed"
|
||||
"""
|
||||
wf_service.create_workflows(workflow)
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('with_items', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
self.assertEqual(2, len(tasks))
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
def test_with_items_sub_workflow_fail(self):
|
||||
workbook = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
||||
name: wb1
|
||||
|
@ -263,36 +264,34 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
|
||||
subworkflow:
|
||||
type: direct
|
||||
|
||||
tasks:
|
||||
fail:
|
||||
action: std.fail
|
||||
"""
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
|
||||
wb_service.create_workbook_v2(wb_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
self.assertEqual(2, len(tasks))
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
def test_with_items_static_var(self):
|
||||
wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR)
|
||||
wb_service.create_workbook_v2(WB_WITH_STATIC_VAR)
|
||||
|
||||
wf_input = copy.deepcopy(WORKFLOW_INPUT)
|
||||
wf_input = copy.deepcopy(WF_INPUT)
|
||||
wf_input.update({'greeting': 'Hello'})
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
@ -311,26 +310,25 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
|
||||
def test_with_items_multi_array(self):
|
||||
wb_service.create_workbook_v2(WORKBOOK_MULTI_ARRAY)
|
||||
wb_service.create_workbook_v2(WB_MULTI_ARRAY)
|
||||
|
||||
wf_input = {'arrayI': ['a', 'b', 'c'], 'arrayJ': [1, 2, 3]}
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
task1_ex = self._assert_single_item(task_execs, name='task1')
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
result = data_flow.get_task_execution_result(task1)
|
||||
result = data_flow.get_task_execution_result(task1_ex)
|
||||
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
|
@ -338,30 +336,28 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertIn('b 2', result)
|
||||
self.assertIn('c 3', result)
|
||||
|
||||
self.assertEqual(1, len(tasks))
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
self.assertEqual(1, len(task_execs))
|
||||
self.assertEqual(states.SUCCESS, task1_ex.state)
|
||||
|
||||
def test_with_items_action_context(self):
|
||||
wb_service.create_workbook_v2(WORKBOOK_ACTION_CONTEXT)
|
||||
wb_service.create_workbook_v2(WB_ACTION_CONTEXT)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'wb1.wf1_with_items', WF_INPUT_URLS
|
||||
)
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1_with_items', WF_INPUT_URLS)
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
act_exs = task_ex.executions
|
||||
|
||||
self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan"))
|
||||
self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John"))
|
||||
self.engine.on_action_complete(
|
||||
act_exs[2].id, wf_utils.Result("Mistral")
|
||||
act_exs[2].id,
|
||||
wf_utils.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
@ -378,7 +374,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
def test_with_items_empty_list(self):
|
||||
workbook = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
||||
name: wb1
|
||||
|
@ -400,29 +396,29 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
task2:
|
||||
action: std.echo output="Hi!"
|
||||
"""
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
|
||||
wb_service.create_workbook_v2(wb_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_input = {'names_info': []}
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
task2 = self._assert_single_item(tasks, name='task2')
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(2, len(tasks))
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
self.assertEqual(states.SUCCESS, task2.state)
|
||||
task1_ex = self._assert_single_item(task_execs, name='task1')
|
||||
task2_ex = self._assert_single_item(task_execs, name='task2')
|
||||
|
||||
self.assertEqual(2, len(task_execs))
|
||||
self.assertEqual(states.SUCCESS, task1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, task2_ex.state)
|
||||
|
||||
def test_with_items_plain_list(self):
|
||||
workbook = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
||||
name: wb1
|
||||
|
@ -436,7 +432,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
with-items: i in [1, 2, 3]
|
||||
action: std.echo output=<% $.i %>
|
||||
"""
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
|
||||
wb_service.create_workbook_v2(wb_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', {})
|
||||
|
@ -446,11 +443,13 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
task1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
result = data_flow.get_task_execution_result(task1)
|
||||
result = data_flow.get_task_execution_result(task1_ex)
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
|
@ -459,7 +458,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertIn(3, result)
|
||||
|
||||
def test_with_items_plain_list_wrong(self):
|
||||
workbook = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
||||
name: wb1
|
||||
|
@ -477,13 +476,13 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
|
||||
exception = self.assertRaises(
|
||||
exc.InvalidModelException,
|
||||
wb_service.create_workbook_v2, workbook
|
||||
wb_service.create_workbook_v2, wb_text
|
||||
)
|
||||
|
||||
self.assertIn("Invalid array in 'with-items'", exception.message)
|
||||
|
||||
def test_with_items_results_order(self):
|
||||
workbook = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
||||
name: wb1
|
||||
|
@ -502,60 +501,57 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
# Register random sleep action in the DB.
|
||||
test_base.register_action_class('sleep_echo', RandomSleepEchoAction)
|
||||
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
wb_service.create_workbook_v2(wb_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
task1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
|
||||
published = task1.published
|
||||
published = task1_ex.published
|
||||
|
||||
# Now we can check order of results explicitly.
|
||||
self.assertEqual([1, 2, 3], published['one_two_three'])
|
||||
|
||||
def test_with_items_results_one_item_as_list(self):
|
||||
wb_service.create_workbook_v2(WORKBOOK)
|
||||
wb_service.create_workbook_v2(WB)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items',
|
||||
WORKFLOW_INPUT_ONE_ITEM)
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT_ONE_ITEM)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
result = data_flow.get_task_execution_result(task1)
|
||||
self.assertEqual(1, len(task_execs))
|
||||
|
||||
task1_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
result = data_flow.get_task_execution_result(task1_ex)
|
||||
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
self.assertIn('Guy', result)
|
||||
|
||||
published = task1.published
|
||||
|
||||
self.assertIn(published['result'], ['Guy'])
|
||||
|
||||
self.assertEqual(1, len(tasks))
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
self.assertIn(task1_ex.published['result'], ['Guy'])
|
||||
|
||||
def test_with_items_concurrency_1(self):
|
||||
workflow_with_concurrency_1 = """---
|
||||
wf_with_concurrency_1 = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
|
@ -569,16 +565,18 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
action: std.async_noop
|
||||
with-items: name in <% $.names %>
|
||||
concurrency: 1
|
||||
|
||||
"""
|
||||
wf_service.create_workflows(workflow_with_concurrency_1)
|
||||
|
||||
wf_service.create_workflows(wf_with_concurrency_1)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test', {})
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(0, task_ex)
|
||||
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
|
||||
|
||||
|
@ -589,6 +587,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(0, task_ex)
|
||||
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
|
||||
|
||||
|
@ -599,6 +598,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(0, task_ex)
|
||||
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
|
||||
|
||||
|
@ -609,13 +609,13 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(1, task_ex)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
result = data_flow.get_task_execution_result(task_ex)
|
||||
|
@ -629,7 +629,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
def test_with_items_concurrency_yaql(self):
|
||||
workflow_with_concurrency_yaql = """---
|
||||
wf_with_concurrency_yaql = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
|
@ -644,9 +644,9 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
action: std.echo output=<% $.name %>
|
||||
with-items: name in <% $.names %>
|
||||
concurrency: <% $.concurrency %>
|
||||
|
||||
"""
|
||||
wf_service.create_workflows(workflow_with_concurrency_yaql)
|
||||
|
||||
wf_service.create_workflows(wf_with_concurrency_yaql)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
|
@ -654,13 +654,14 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
{'concurrency': 2}
|
||||
)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
result = data_flow.get_task_execution_result(task_ex)
|
||||
|
@ -671,10 +672,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertIn('Ivan', result)
|
||||
self.assertIn('Mistral', result)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
def test_with_items_concurrency_yaql_wrong_type(self):
|
||||
workflow_with_concurrency_yaql = """---
|
||||
wf_with_concurrency_yaql = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
|
@ -689,9 +688,9 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
action: std.echo output=<% $.name %>
|
||||
with-items: name in <% $.names %>
|
||||
concurrency: <% $.concurrency %>
|
||||
|
||||
"""
|
||||
wf_service.create_workflows(workflow_with_concurrency_yaql)
|
||||
|
||||
wf_service.create_workflows(wf_with_concurrency_yaql)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
|
@ -700,13 +699,13 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
self.assertIn(
|
||||
"Invalid data type in ConcurrencyPolicy",
|
||||
'Invalid data type in ConcurrencyPolicy',
|
||||
wf_ex.state_info
|
||||
)
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
|
||||
def test_with_items_concurrency_2(self):
|
||||
workflow_with_concurrency_2 = """---
|
||||
wf_with_concurrency_2 = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
|
@ -722,10 +721,11 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
concurrency: 2
|
||||
|
||||
"""
|
||||
wf_service.create_workflows(workflow_with_concurrency_2)
|
||||
wf_service.create_workflows(wf_with_concurrency_2)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test', {})
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
|
@ -739,6 +739,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(0, task_ex)
|
||||
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
|
||||
|
||||
|
@ -749,6 +750,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(0, task_ex)
|
||||
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
|
||||
|
||||
|
@ -759,6 +761,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(1, task_ex)
|
||||
|
||||
# 4th iteration complete.
|
||||
|
@ -768,16 +771,17 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(2, task_ex)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
result = data_flow.get_task_execution_result(task_ex)
|
||||
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
self.assertIn('John', result)
|
||||
|
@ -788,7 +792,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
def test_with_items_concurrency_2_fail(self):
|
||||
workflow_with_concurrency_2_fail = """---
|
||||
wf_with_concurrency_2_fail = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test_fail:
|
||||
|
@ -805,14 +809,13 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
action: std.echo output="With-items failed"
|
||||
|
||||
"""
|
||||
wf_service.create_workflows(workflow_with_concurrency_2_fail)
|
||||
wf_service.create_workflows(wf_with_concurrency_2_fail)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test_fail', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
|
||||
task_exs = wf_ex.task_executions
|
||||
|
@ -822,12 +825,12 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
task_2 = self._assert_single_item(task_exs, name='task2')
|
||||
|
||||
self.assertEqual(
|
||||
"With-items failed",
|
||||
'With-items failed',
|
||||
data_flow.get_task_execution_result(task_2)
|
||||
)
|
||||
|
||||
def test_with_items_concurrency_3(self):
|
||||
workflow_with_concurrency_3 = """---
|
||||
wf_with_concurrency_3 = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
|
@ -843,10 +846,12 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
concurrency: 3
|
||||
|
||||
"""
|
||||
wf_service.create_workflows(workflow_with_concurrency_3)
|
||||
|
||||
wf_service.create_workflows(wf_with_concurrency_3)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test', {})
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
|
@ -860,6 +865,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(1, task_ex)
|
||||
|
||||
# 2nd iteration complete.
|
||||
|
@ -869,6 +875,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(2, task_ex)
|
||||
|
||||
# 3rd iteration complete.
|
||||
|
@ -878,26 +885,27 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assert_capacity(3, task_ex)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
result = data_flow.get_task_execution_result(task_ex)
|
||||
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
self.assertIn('John', result)
|
||||
self.assertIn('Ivan', result)
|
||||
self.assertIn('Mistral', result)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
def test_with_items_concurrency_gt_list_length(self):
|
||||
workflow_definition = """---
|
||||
wf_definition = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
|
@ -913,26 +921,29 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
concurrency: 3
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(workflow_definition)
|
||||
wf_service.create_workflows(wf_definition)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
|
||||
task_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
result = data_flow.get_task_execution_result(task_ex)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
self.assertIsInstance(result, list)
|
||||
self.assertIn('John', result)
|
||||
self.assertIn('Ivan', result)
|
||||
|
||||
def test_with_items_retry_policy(self):
|
||||
workflow = """---
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
with_items_retry:
|
||||
|
@ -948,32 +959,32 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
task2:
|
||||
action: std.echo output="With-items failed"
|
||||
"""
|
||||
wf_service.create_workflows(workflow)
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('with_items_retry', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id)
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
self.assertEqual(2, len(tasks))
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
self.assertEqual(2, len(task_execs))
|
||||
|
||||
task1_ex = self._assert_single_item(task_execs, name='task1')
|
||||
|
||||
self.assertEqual(
|
||||
2,
|
||||
task1.runtime_context['retry_task_policy']['retry_no']
|
||||
task1_ex.runtime_context['retry_task_policy']['retry_no']
|
||||
)
|
||||
self.assertEqual(9, len(task1.executions))
|
||||
self._assert_multiple_items(task1.executions, 3, accepted=True)
|
||||
self.assertEqual(9, len(task1_ex.executions))
|
||||
self._assert_multiple_items(task1_ex.executions, 3, accepted=True)
|
||||
|
||||
def test_with_items_retry_policy_concurrency(self):
|
||||
workflow = """---
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
with_items_retry_concurrency:
|
||||
|
@ -990,31 +1001,28 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
task2:
|
||||
action: std.echo output="With-items failed"
|
||||
"""
|
||||
wf_service.create_workflows(workflow)
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'with_items_retry_concurrency',
|
||||
{}
|
||||
)
|
||||
wf_ex = self.engine.start_workflow('with_items_retry_concurrency', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
self.assertEqual(2, len(tasks))
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
self.assertEqual(2, len(task_execs))
|
||||
|
||||
self.assertEqual(12, len(task1.executions))
|
||||
self._assert_multiple_items(task1.executions, 4, accepted=True)
|
||||
task1_ex = self._assert_single_item(task_execs, name='task1')
|
||||
|
||||
self.assertEqual(12, len(task1_ex.executions))
|
||||
self._assert_multiple_items(task1_ex.executions, 4, accepted=True)
|
||||
|
||||
def test_with_items_env(self):
|
||||
workflow = """---
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
with_items_env:
|
||||
|
@ -1023,19 +1031,17 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
with-items: i in [1, 2, 3, 4]
|
||||
action: std.echo output="<% $.i %>.<% env().name %>"
|
||||
"""
|
||||
wf_service.create_workflows(workflow)
|
||||
env = {'name': 'Mistral'}
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'with_items_env',
|
||||
{},
|
||||
env=env
|
||||
env={'name': 'Mistral'}
|
||||
)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
@ -1058,7 +1064,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
|
||||
def test_with_items_two_tasks_second_starts_on_success(self):
|
||||
workbook = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
||||
name: wb1
|
||||
|
@ -1076,7 +1082,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
with-items: i in [3, 4]
|
||||
action: std.echo output=<% $.i %>
|
||||
"""
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
|
||||
wb_service.create_workbook_v2(wb_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb1.with_items', {})
|
||||
|
@ -1086,14 +1093,21 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
task2 = self._assert_single_item(tasks, name='task2')
|
||||
self.assertEqual(states.SUCCESS, task1.state)
|
||||
self.assertEqual(states.SUCCESS, task2.state)
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
result_task1 = data_flow.get_task_execution_result(task1)
|
||||
result_task2 = data_flow.get_task_execution_result(task2)
|
||||
task1_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
task2_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='task2',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
result_task1 = data_flow.get_task_execution_result(task1_ex)
|
||||
result_task2 = data_flow.get_task_execution_result(task2_ex)
|
||||
|
||||
# Since we know that we can receive results in random order,
|
||||
# check is not depend on order of items.
|
||||
|
@ -1103,45 +1117,51 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||
self.assertIn(4, result_task2)
|
||||
|
||||
def test_with_items_subflow_concurrency_gt_list_length(self):
|
||||
workbook_definition = """---
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
name: wb1
|
||||
|
||||
workflows:
|
||||
main:
|
||||
type: direct
|
||||
|
||||
input:
|
||||
- names
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
with-items: name in <% $.names %>
|
||||
workflow: subflow1 name=<% $.name %>
|
||||
concurrency: 3
|
||||
|
||||
subflow1:
|
||||
type: direct
|
||||
|
||||
input:
|
||||
- name
|
||||
output:
|
||||
result: <% task(task1).result %>
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output=<% $.name %>
|
||||
"""
|
||||
|
||||
wb_service.create_workbook_v2(workbook_definition)
|
||||
wb_service.create_workbook_v2(wb_text)
|
||||
|
||||
# Start workflow.
|
||||
names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"]
|
||||
wf_ex = self.engine.start_workflow('wb1.main', {'names': names})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
task_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
|
||||
result = [
|
||||
item['result']
|
||||
|
|
|
@ -63,15 +63,15 @@ class RunExistingTask(WorkflowCommand):
|
|||
"""Command for running already existent task."""
|
||||
|
||||
def __init__(self, task_ex, reset=True):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
task_spec = spec_parser.get_task_spec(task_ex.spec)
|
||||
super(RunExistingTask, self).__init__(
|
||||
task_ex.workflow_execution,
|
||||
spec_parser.get_task_spec(task_ex.spec),
|
||||
task_ex.in_context
|
||||
)
|
||||
|
||||
self.task_ex = task_ex
|
||||
self.reset = reset
|
||||
|
||||
super(RunExistingTask, self).__init__(
|
||||
wf_ex, task_spec, task_ex.in_context
|
||||
)
|
||||
|
||||
|
||||
class SetWorkflowState(WorkflowCommand):
|
||||
"""Instruction to change a workflow state."""
|
||||
|
|
Loading…
Reference in New Issue