Merge "Starting tasks via RPC"

This commit is contained in:
Zuul 2023-05-17 09:20:40 +00:00 committed by Gerrit Code Review
commit d6b7f7305d
25 changed files with 576 additions and 197 deletions

View File

@ -46,6 +46,14 @@ class Engine(object, metaclass=abc.ABCMeta):
"""
raise NotImplementedError
@abc.abstractmethod
def start_task(self, task_ex_id, first_run, waiting,
triggered_by, rerun, reset, **params):
"""Starts task sending a request to engine over RPC.
"""
raise NotImplementedError
@abc.abstractmethod
def start_action(self, action_name, action_input,
description=None, namespace='', **params):

View File

@ -28,6 +28,7 @@ from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler
from mistral.engine import base
from mistral.engine import post_tx_queue
from mistral.engine import task_handler
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions
from mistral.workflow import states
@ -81,6 +82,13 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone()
@post_tx_queue.run
def start_task(self, task_ex_id, first_run, waiting,
triggered_by, rerun, reset, **params):
with db_api.transaction():
task_handler.run_task(task_ex_id, waiting,
triggered_by, rerun, reset, first_run)
@db_utils.retry_on_db_error
@post_tx_queue.run
def start_action(self, action_name, action_input,

View File

@ -16,7 +16,9 @@
import functools
from osprofiler import profiler
from mistral.engine import post_tx_queue
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.workflow import commands
from mistral.workflow import states
@ -146,7 +148,28 @@ def _process_commands(wf_ex, cmds):
continue
if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
task_handler.run_task(cmd)
if isinstance(cmd, commands.RunExistingTask):
first_run = False
reset = cmd.reset
else:
first_run = True
reset = False
task = task_handler.create_task(cmd, first_run)
def _start_task(_task=task, _first_run=first_run, _reset=reset):
rpc.get_engine_client().start_task(
wf_spec=_task.wf_spec,
task_ex_id=_task.task_ex.id,
first_run=_first_run,
waiting=_task.waiting,
triggered_by=_task.triggered_by,
rerun=_task.rerun,
reset=_reset
)
post_tx_queue.register_operation(_start_task)
elif isinstance(cmd, commands.SkipTask):
task_handler.skip_task(cmd)
elif isinstance(cmd, commands.SetWorkflowState):

View File

@ -167,6 +167,26 @@ class EngineServer(service_base.MistralService):
**params
)
def start_task(self, rpc_ctx, task_ex_id, first_run, waiting,
triggered_by, rerun, **params):
"""Receives calls over RPC to start tasks on engine.
"""
LOG.info(
"Received RPC request 'start_task'[task_ex_id=%s, first_run=%s]",
task_ex_id,
first_run
)
return self.engine.start_task(
task_ex_id,
first_run,
waiting,
triggered_by,
rerun,
**params
)
def start_action(self, rpc_ctx, action_name,
action_input, description, namespace, params):
"""Receives calls over RPC to start actions on engine.

View File

@ -56,13 +56,23 @@ _SCHEDULED_ON_ACTION_UPDATE_PATH = (
@profiler.trace('task-handler-run-task', hide_args=True)
def run_task(wf_cmd):
def run_task(task_ex_id, waiting, triggered_by, rerun, reset, first_run=False):
"""Runs workflow task.
:param wf_cmd: Workflow command.
:param task_ex_id: Task Execution id
:param waiting: Task waiting param
:param triggered_by:
:param rerun:
:param reset:
:param first_run:
"""
task = _build_task_from_command(wf_cmd)
task_ex = db_api.get_task_execution(task_ex_id)
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
task_ex.workflow_execution_id
)
task = _build_task_after_rpc(wf_spec, task_ex, waiting, triggered_by,
rerun, reset)
try:
if task.waiting and task.rerun:
@ -70,18 +80,16 @@ def run_task(wf_cmd):
_schedule_refresh_task_state(task.task_ex.id)
task.run()
task.run(first_run)
except (exc.MistralException, mistral_lib_exc.MistralException) as e:
wf_ex = wf_cmd.wf_ex
task_spec = wf_cmd.task_spec
wf_ex = task_ex.workflow_execution
msg = (
"Failed to run task [error=%s, wf=%s, task=%s]:\n%s" %
(e, wf_ex.name, task_spec.get_name(), tb.format_exc())
(e, wf_ex.name, task_ex.name, tb.format_exc())
)
force_fail_task(task.task_ex, msg, task=task)
force_fail_task(task_ex, msg, task=task)
return
_check_affected_tasks(task)
@ -99,6 +107,26 @@ def skip_task(wf_cmd):
return
@profiler.trace('task-handler-create-task', hide_args=True)
def create_task(wf_cmd, first_run):
"""Creates workflow task.
:param wf_cmd: Workflow command.
"""
task = _build_task_from_command(wf_cmd)
if task.waiting and task.rerun:
task.set_state(states.WAITING, 'Task is waiting.')
_schedule_refresh_task_state(task.task_ex.id)
if first_run:
task.create_new()
return task
def mark_task_running(task_ex, wf_spec):
task = build_task_from_execution(wf_spec, task_ex)
@ -240,9 +268,9 @@ def continue_task(task_ex):
task = build_task_from_execution(wf_spec, task_ex)
try:
task.set_state(states.RUNNING, None)
task.run()
with db_api.named_lock('continue-task-%s' % task_ex.id):
task.set_state(states.RUNNING, None)
task.run()
except exc.MistralException as e:
wf_ex = task_ex.workflow_execution
@ -346,6 +374,25 @@ def build_task_from_execution(wf_spec, task_ex):
)
def _build_task_after_rpc(wf_spec, task_ex, waiting, triggered_by, rerun,
reset):
task = _create_task(
task_ex.workflow_execution,
wf_spec,
wf_spec.get_task(task_ex.name),
task_ex.in_context,
task_ex,
waiting=waiting == states.WAITING,
triggered_by=triggered_by,
rerun=rerun
)
if reset:
task.reset()
return task
@profiler.trace('task-handler-build-task-from-command', hide_args=True)
def _build_task_from_command(cmd):
if isinstance(cmd, wf_cmds.RunExistingTask):

View File

@ -289,7 +289,7 @@ class Task(object, metaclass=abc.ABCMeta):
self.reset_flag = True
@profiler.trace('task-set-state')
def set_state(self, state, state_info, processed=None):
def set_state(self, state, state_info, processed=None, first_run=False):
"""Sets task state without executing post completion logic.
:param state: New task state.
@ -323,8 +323,8 @@ class Task(object, metaclass=abc.ABCMeta):
# was WAITING (all preconditions are satisfied and it's
# ready to start) or IDLE, or the task is being rerun. So
# we treat all iterations of "retry" policy as one run.
if state == states.RUNNING and \
(cur_state in (None, states.WAITING) or self.rerun):
if state == states.RUNNING and (cur_state in (
None, states.WAITING, states.IDLE) or self.rerun):
self.task_ex.started_at = utils.utc_now_sec()
if states.is_completed(state):
@ -336,7 +336,10 @@ class Task(object, metaclass=abc.ABCMeta):
if processed is not None:
self.task_ex.processed = processed
self._notify(cur_state, state)
if first_run:
self._notify(None, state)
else:
self._notify(cur_state, state)
wf_trace.info(
self.task_ex.workflow_execution,
@ -569,41 +572,47 @@ class RegularTask(Task):
self.update(action_ex.state)
@profiler.trace('task-run')
def run(self):
if not self.task_ex:
def run(self, first_run=False):
if first_run:
self._run_new()
else:
self._run_existing()
@profiler.trace('task-run-new')
def _run_new(self):
if self.waiting:
return
if states.is_idle(self.task_ex.state):
# Set the RUNNING state and trigger all operations
# related to the state change.
self.set_state(
states.RUNNING, self.task_ex.state_info, first_run=True)
LOG.debug(
'Starting task [name=%s, init_state=%s, workflow_name=%s,'
' execution_id=%s]',
self.task_spec.get_name(),
self.task_ex.state,
self.wf_ex.name,
self.wf_ex.id
)
self._before_task_start()
# Policies could possibly change task state.
if self.task_ex.state != states.RUNNING:
return
self._schedule_actions()
@profiler.trace('task-create-new')
def create_new(self):
if self.waiting:
self.defer()
return
self._create_task_execution()
# Set the initial state and trigger all operations
# related to the state change.
self.set_state(states.RUNNING, None)
LOG.debug(
'Starting task [name=%s, init_state=%s, workflow_name=%s,'
' execution_id=%s]',
self.task_spec.get_name(),
self.task_ex.state,
self.wf_ex.name,
self.wf_ex.id
)
self._before_task_start()
# Policies could possibly change task state.
if self.task_ex.state != states.RUNNING:
return
self._schedule_actions()
self._create_task_execution(state=states.IDLE)
@profiler.trace('task-run-existing')
def _run_existing(self):

View File

@ -158,6 +158,25 @@ class EngineClient(eng.Engine):
params=params
)
@base.wrap_messaging_exception
def start_task(self, task_ex_id, first_run, waiting,
triggered_by, rerun, reset, **params):
"""Starts task sending a request to engine over RPC.
"""
return self._client.async_call(
auth_ctx.ctx(),
'start_task',
task_ex_id=task_ex_id,
first_run=first_run,
waiting=waiting,
triggered_by=triggered_by,
rerun=rerun,
reset=reset,
params=params
)
@base.wrap_messaging_exception
def start_action(self, action_name, action_input,
description=None, namespace='', **params):

View File

@ -208,7 +208,9 @@ class AdhocActionsTest(base.EngineTestCase):
'my_wb.wf3',
wf_input={'str1': 'a', 'str2': 'b'}
)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("Invalid input", wf_ex.state_info)
self.assertEqual(states.ERROR, wf_ex.state)
@ -306,11 +308,16 @@ class AdhocActionsTest(base.EngineTestCase):
self.await_workflow_running(wf_ex.id, timeout=4)
with db_api.transaction(read_only=True):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_ex_id = wf_ex.id
a_ex_id = wf_ex.task_executions[0].action_executions[0].id
task_execs = wf_ex.task_executions
task1_ex = task_execs[0]
self.engine.start_task(task1_ex.id, True, False, None, False, False)
action_execs = db_api.get_action_executions(
task_execution_id=task1_ex.id
)
a_ex_id = action_execs[0].id
self.engine.on_action_complete(a_ex_id, ml_actions.Result(data='Hi!'))

View File

@ -133,9 +133,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
self.engine.start_task(task_ex.id, True, False, None, False, False)
# Data Flow properties.
action_execs = db_api.get_action_executions(
@ -202,9 +203,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
self.engine.start_task(task_ex.id, True, False, None, False, False)
# Data Flow properties.
action_execs = db_api.get_action_executions(
@ -353,9 +355,8 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(1, len(task_execs))
task1_ex = task_execs[0]
self.engine.start_task(task1_ex.id, True, False, None, False, False)
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
action_execs = db_api.get_action_executions(
task_execution_id=task1_ex.id
@ -427,6 +428,9 @@ class DefaultEngineTest(base.DbTestCase):
task1_ex = task_execs[0]
self.engine.start_task(task1_ex.id, True, False, None, False, False)
task1_ex = db_api.get_task_execution(task1_ex.id)
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
@ -460,6 +464,19 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(wf_ex)
self.assertEqual(states.RUNNING, wf_ex.state)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task1_ex = task_execs[0]
self.engine.start_task(
task1_ex.id, True, False, None, False, False)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -515,6 +532,8 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(2, len(task_execs))
task2_ex = self._assert_single_item(task_execs, name='task2')
self.engine.start_task(task2_ex.id, True, False, None, False, False)
task2_ex = db_api.get_task_execution(task2_ex.id)
self.assertEqual(states.RUNNING, task2_ex.state)
@ -650,6 +669,7 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
self.engine.start_task(task_ex.id, True, False, None, False, False)
action_execs = db_api.get_action_executions(
task_execution_id=task_ex.id

View File

@ -282,7 +282,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn(
"Failed to find action [action_name=wrong.task, namespace=]",
wf_ex.state_info
@ -372,13 +374,11 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.RUNNING, task_1_ex.state)
self.await_task_running(task_1_ex.id)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
@ -393,6 +393,13 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
ml_actions.Result(data='foobar')
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.await_task_error(task_2_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -546,6 +553,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn(
"Can not evaluate YAQL expression [expression=wrong(yaql)",
@ -569,6 +579,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', wf_input={'var': 2})
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("Can not evaluate YAQL expression", wf_ex.state_info)
self.assertEqual(states.ERROR, wf_ex.state)
@ -676,10 +689,12 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.await_task_running(task_1_ex.id)
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_1_ex = db_api.get_task_execution(task_1_ex.id)
self.assertEqual(states.RUNNING, task_1_ex.state)
@ -1177,6 +1192,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
task_execs,
name='branch2'
)
self.engine.start_task(t_ex.id, True, False, None, False, False)
t_action_exs = db_api.get_action_executions(
task_execution_id=t_ex.id

View File

@ -461,56 +461,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
task_30_action_exs[0].input
)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 1', # Mock task1 success for initial run.
exc.ActionException() # Mock task2 exception for initial run.
]
)
)
def test_rerun_from_prev_step(self):
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(
task_execs,
name='t1',
state=states.SUCCESS
)
task_2_ex = self._assert_single_item(
task_execs,
name='t2',
state=states.ERROR
)
self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task.
e = self.assertRaises(
exc.MistralError,
self.engine.rerun_workflow,
task_1_ex.id
)
self.assertIn('not supported', str(e))
@mock.patch.object(
std_actions.EchoAction,
'run',

View File

@ -68,7 +68,15 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
wf1_ex = self.engine.start_workflow('wb1.wf1')
self.await_workflow_state(wf1_ex.id, states.RUNNING)
with db_api.transaction():
wf1_execs = db_api.get_workflow_executions()
wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1')
wf1_t1_ex = self._assert_single_item(
wf1_ex.task_executions,
name='t1'
)
self.await_task_running(wf1_t1_ex.id)
with db_api.transaction():
wf1_execs = db_api.get_workflow_executions()
@ -110,6 +118,14 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
# Resume workflow and re-run cancelled task.
self.engine.rerun_workflow(wf1_t1_ex.id)
with db_api.transaction():
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
wf1_task_execs = wf1_ex.task_executions
wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1')
self.await_task_running(wf1_t1_ex.id)
with db_api.transaction():
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
@ -573,6 +589,16 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
self.await_workflow_state(wf1_ex.id, states.RUNNING)
with db_api.transaction():
wf1_execs = db_api.get_workflow_executions()
wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1')
wf1_t1_ex = self._assert_single_item(
wf1_ex.task_executions,
name='t1'
)
self.await_task_running(wf1_t1_ex.id)
with db_api.transaction():
wf1_execs = db_api.get_workflow_executions()
@ -620,7 +646,15 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
wf1_ex.task_executions,
name='t1'
)
self.await_task_running(wf1_t1_ex.id)
with db_api.transaction():
wf1_execs = db_api.get_workflow_executions()
wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1')
wf1_t1_ex = self._assert_single_item(
wf1_ex.task_executions,
name='t1'
)
self.await_workflow_state(wf1_ex.id, states.RUNNING)
wf1_t1_action_exs = db_api.get_action_executions(

View File

@ -223,4 +223,4 @@ class ErrorResultTest(base.EngineTestCase):
self.assertEqual(1, len(tasks))
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.RUNNING, task1.state)
self.assertEqual(states.IDLE, task1.state)

View File

@ -169,7 +169,9 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
self.assertEqual(states.ERROR, wf_ex.state)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn(
"Field size limit exceeded"
" [class=TaskExecution, field=input, size=1KB, limit=0KB]",

View File

@ -428,7 +428,7 @@ class PoliciesTest(base.EngineTestCase):
self.assertIsInstance(p, policies.TimeoutPolicy)
def test_wait_before_policy(self):
wb_service.create_workbook_v2(WAIT_BEFORE_WB % 1)
wb_service.create_workbook_v2(WAIT_BEFORE_WB % 10)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
@ -439,7 +439,13 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING_DELAYED, task_ex.state)
self.await_task_delayed(task_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertDictEqual(
{'wait_before_policy': {'skip': True}},
task_ex.runtime_context
@ -452,15 +458,6 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.await_workflow_success(wf_ex.id)
def test_wait_before_policy_negative_number(self):
@ -476,7 +473,7 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow(
'wb.wf1',
wf_input={'wait_before': 1}
wf_input={'wait_before': 10}
)
with db_api.transaction():
@ -485,7 +482,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING_DELAYED, task_ex.state)
self.await_task_delayed(task_ex.id)
self.await_workflow_success(wf_ex.id)
@ -505,7 +502,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
# If wait_before is 0 start the task immediately without delay.
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.await_workflow_success(wf_ex.id)
@ -526,7 +523,7 @@ class PoliciesTest(base.EngineTestCase):
# If wait_before value is less than 0 the task should fail with
# InvalidModelException.
self.assertEqual(states.ERROR, task_ex.state)
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
@ -571,7 +568,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
self.await_task_delayed(task_ex.id, delay=0.5)
@ -589,7 +586,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
try:
@ -624,7 +621,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
self.await_task_delayed(task_ex.id, delay=0.5)
@ -645,7 +642,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
try:
@ -675,7 +672,7 @@ class PoliciesTest(base.EngineTestCase):
# If wait_after value is less than 0 the task should fail with
# InvalidModelException.
self.assertEqual(states.ERROR, task_ex.state)
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
@ -698,7 +695,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
self.await_task_delayed(task_ex.id, delay=0.5)
@ -733,7 +730,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
try:
@ -790,7 +787,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
self.await_task_delayed(task_ex.id, delay=0.5)
@ -828,7 +825,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.assertDictEqual({}, task_ex.runtime_context)
try:
@ -865,7 +862,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.ERROR, task_ex.state)
self.await_task_error(task_ex.id)
self.assertDictEqual({}, task_ex.runtime_context)
self.await_workflow_error(wf_ex.id)
@ -882,7 +879,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.ERROR, task_ex.state)
self.await_task_error(task_ex.id)
self.assertDictEqual({}, task_ex.runtime_context)
self.await_workflow_error(wf_ex.id)
@ -1384,6 +1381,11 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_running(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1437,8 +1439,6 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.await_task_error(task_ex.id)
with db_api.transaction():
@ -1476,7 +1476,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.await_task_success(task_ex.id)
@ -1502,7 +1502,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.await_task_error(task_ex.id)
@ -1531,7 +1531,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.await_task_error(task_ex.id)
@ -1567,7 +1567,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.assertEqual(states.IDLE, task_ex.state)
self.await_task_success(task_ex.id)
@ -1585,7 +1585,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.ERROR, task_ex.state)
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
@ -1648,6 +1648,11 @@ class PoliciesTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf1')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -1803,6 +1808,13 @@ class PoliciesTest(base.EngineTestCase):
wf_input={'concurrency': 4}
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
self.await_task_success(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1844,7 +1856,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.ERROR, task_ex.state)
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
@ -1869,7 +1881,10 @@ class PoliciesTest(base.EngineTestCase):
'wb.wf1',
wf_input={'wait_before': '1'}
)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn(
'Invalid data type in WaitBeforePolicy',
wf_ex.state_info

View File

@ -146,7 +146,12 @@ class EngineActionRaceConditionTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.await_task_running(task_execs[0].id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.RUNNING, task_execs[0].state)

View File

@ -323,46 +323,3 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
{'output': updated_env['var2']},
task_3_action_exs[0].input
)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 1', # Mock task1 success for initial run.
exc.ActionException() # Mock task2 exception for initial run.
]
)
)
def test_rerun_from_prev_step(self):
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', task_name='t3')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task.
e = self.assertRaises(
exc.MistralError,
self.engine.rerun_workflow,
task_1_ex.id
)
self.assertIn('not supported', str(e))

View File

@ -75,7 +75,16 @@ class ReverseWorkflowRerunCancelledTest(base.EngineTestCase):
wf1_ex.task_executions,
name='t1'
)
self.await_task_running(wf1_t1_ex.id)
with db_api.transaction():
wf1_execs = db_api.get_workflow_executions()
wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1')
wf1_t1_ex = self._assert_single_item(
wf1_ex.task_executions,
name='t1'
)
wf1_t1_action_exs = db_api.get_action_executions(
task_execution_id=wf1_t1_ex.id
)
@ -128,12 +137,12 @@ class ReverseWorkflowRerunCancelledTest(base.EngineTestCase):
# Mark async action execution complete.
wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1')
self.await_task_running(wf1_t1_ex.id)
wf1_t1_action_exs = db_api.get_action_executions(
task_execution_id=wf1_t1_ex.id
)
self.assertEqual(states.RUNNING, wf1_t1_ex.state)
self.assertEqual(2, len(wf1_t1_action_exs))
# Check there is exactly 1 action in Running and 1 in Cancelled state.
# Order doesn't matter.

View File

@ -67,6 +67,19 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.await_workflow_state(wf_1_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
self.await_task_running(wf_1_task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -394,6 +407,19 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.await_workflow_state(wf_1_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
self.await_task_running(wf_1_task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -836,6 +862,19 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.await_workflow_state(wf_1_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
self.await_task_running(wf_1_task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -1373,6 +1412,19 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.await_workflow_state(wf_1_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
self.await_task_running(wf_1_task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -2020,6 +2072,18 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.await_workflow_running(wf_2_ex.id)
self.await_workflow_running(wf_3_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
self.await_task_running(wf_1_task_1_ex.id)
self.await_task_running(wf_2_task_2_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()

View File

@ -56,6 +56,16 @@ class TaskCancelTest(base.EngineTestCase):
self.await_workflow_state(wf_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wf')
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_task_running(task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -147,6 +157,15 @@ class TaskCancelTest(base.EngineTestCase):
self.await_workflow_state(wf_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -223,6 +242,16 @@ class TaskCancelTest(base.EngineTestCase):
self.await_workflow_state(wf_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wf')
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_task_running(task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()

View File

@ -42,6 +42,18 @@ class TaskPauseResumeTest(base.EngineTestCase):
self.await_workflow_state(wf_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wf')
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_task_running(task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -162,6 +174,18 @@ class TaskPauseResumeTest(base.EngineTestCase):
self.await_workflow_state(wf_ex.id, states.RUNNING)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wf')
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_task_running(task_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()

View File

@ -114,15 +114,15 @@ class TaskStartedFinishedAtTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf')
task_ex = self._extract_task_ex(wf_ex.id)
created_at_before_retry = task_ex.created_at
self.await_workflow_error(wf_ex.id)
task_ex = self._extract_task_ex(wf_ex.id)
created_at_after_retry = task_ex.created_at
created_at = task_ex.created_at
started_at = self._get_started_finished(task_ex)[0]
delta = int((started_at - created_at).total_seconds())
self.assertLessEqual(delta, 1)
self.assertEqual(created_at_before_retry, created_at_after_retry)
def test_wait_before_after_are_included_to_duration(self):
wf_text = """

View File

@ -389,6 +389,12 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_input={'items': [1, 2, 3]}
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -627,6 +633,11 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -763,8 +774,24 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_with_concurrency_yaql)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', wf_input={'concurrency': '2'})
self.engine.start_workflow('wf', wf_input={'concurrency': '2'})
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_task_error(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wf')
self.assertIn(
'Invalid data type in ConcurrencyPolicy',
wf_ex.state_info
@ -793,6 +820,12 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -947,7 +980,12 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(3, self._get_running_actions_count(task_ex.id))
# 1st iteration complete.

View File

@ -202,6 +202,15 @@ class WorkflowCancelTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wb.wf')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='taskx')
self.engine.start_task(task_ex.id, True, False, None, False, False)
self.await_task_running(task_ex.id)
self.engine.stop_workflow(
wf_ex.id,
states.CANCELLED,
@ -261,13 +270,22 @@ class WorkflowCancelTest(base.EngineTestCase):
task2:
action: std.echo output="foo"
wait-before: 3
wait-before: 10
"""
wb_service.create_workbook_v2(workbook)
self.engine.start_workflow('wb.wf')
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -329,12 +347,19 @@ class WorkflowCancelTest(base.EngineTestCase):
task2:
action: std.echo output="foo"
wait-before: 1
wait-before: 10
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='taskx')
self.await_task_running(task_ex.id)
self.engine.stop_workflow(
wf_ex.id,
states.CANCELLED,
@ -398,13 +423,23 @@ class WorkflowCancelTest(base.EngineTestCase):
task2:
action: std.echo output="foo"
wait-before: 1
wait-before: 10
"""
wb_service.create_workbook_v2(workbook)
self.engine.start_workflow('wb.wf')
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -477,13 +512,23 @@ class WorkflowCancelTest(base.EngineTestCase):
task2:
action: std.echo output="foo"
wait-before: 1
wait-before: 10
"""
wb_service.create_workbook_v2(workbook)
self.engine.start_workflow('wb.wf')
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
@ -562,7 +607,7 @@ class WorkflowCancelTest(base.EngineTestCase):
task2:
action: std.echo output="foo"
wait-before: 1
wait-before: 10
"""
wb_service.create_workbook_v2(workbook)
@ -577,6 +622,13 @@ class WorkflowCancelTest(base.EngineTestCase):
wf_ex.task_executions,
name='taskx'
)
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
subwf_exs = self._assert_multiple_items(
wf_execs,
2,

View File

@ -729,6 +729,11 @@ class NotifyEventsTest(base.NotifierTestCase):
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
self.await_task_running(t1_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -859,6 +864,12 @@ class NotifyEventsTest(base.NotifierTestCase):
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
self.await_task_running(t1_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -983,6 +994,12 @@ class NotifyEventsTest(base.NotifierTestCase):
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
self.await_task_running(t1_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -1077,6 +1094,12 @@ class NotifyEventsTest(base.NotifierTestCase):
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
self.await_task_running(t1_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)