Fail/Success/Pause transition message

Now user can provide customize message for
fail/success/pause transition.

Change-Id: I84b1fbc63aaf8186c81eea9c852f5b49db93f0ff
Implements: blueprint mistral-fail-transition-message
This commit is contained in:
hardik 2016-05-24 18:25:31 +05:30
parent 3641b46d15
commit 6b8f15385a
8 changed files with 234 additions and 33 deletions

View File

@ -35,7 +35,7 @@ def dispatch_workflow_commands(wf_ex, wf_cmds):
if states.is_completed(cmd.new_state):
wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
else:
wf_handler.set_workflow_state(wf_ex, cmd.new_state)
wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
elif isinstance(cmd, commands.Noop):
# Do nothing.
pass

View File

@ -313,3 +313,165 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self.await_task_error(task2_db.id)
self.await_execution_success(wf_ex.id)
WORKBOOK4 = """
---
version: '2.0'
name: my_wb
workflows:
wf:
type: direct
input:
- my_var
tasks:
task1:
action: std.echo output='1'
on-complete:
- fail(msg='my_var value is 1'): <% $.my_var = 1 %>
- succeed(msg='my_var value is 2'): <% $.my_var = 2 %>
- pause(msg='my_var value is 3'): <% $.my_var = 3 %>
- task2
task2:
action: std.echo output='2'
"""
class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
def setUp(self):
super(SimpleEngineCmdsWithMsgTest, self).setUp()
wb_service.create_workbook_v2(WORKBOOK4)
def test_fail(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self.await_execution_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertEqual('my_var value is 1', wf_ex.state_info)
def test_succeed(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self.await_execution_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual("my_var value is 2", wf_ex.state_info)
def test_pause(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self.await_execution_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual("my_var value is 3", wf_ex.state_info)
WORKBOOK5 = """
---
version: '2.0'
name: my_wb
workflows:
wf:
type: direct
input:
- my_var
task-defaults:
on-complete:
- fail(msg='my_var value is 1'): <% $.my_var = 1 %>
- succeed(msg='my_var value is <% $.my_var %>'): <% $.my_var = 2 %>
- pause(msg='my_var value is 3'): <% $.my_var = 3 %>
- task2: <% $.my_var = 4 %> # (Never happens in this test)
tasks:
task1:
action: std.echo output='1'
task2:
action: std.echo output='2'
"""
class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
def setUp(self):
super(SimpleEngineWorkflowLevelCmdsWithMsgTest, self).setUp()
wb_service.create_workbook_v2(WORKBOOK5)
def test_fail(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self.await_execution_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertEqual("my_var value is 1", wf_ex.state_info)
def test_succeed(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self.await_execution_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual("my_var value is 2", wf_ex.state_info)
def test_pause(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self.await_execution_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual("my_var value is 3", wf_ex.state_info)

View File

@ -118,15 +118,15 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase):
task_defaults_spec = wf2_spec.get_task_defaults()
self.assertListEqual(
[('fail', '<% $.my_val = 0 %>')],
[('fail', '<% $.my_val = 0 %>', {})],
task_defaults_spec.get_on_error()
)
self.assertListEqual(
[('pause', '')],
[('pause', '', {})],
task_defaults_spec.get_on_success()
)
self.assertListEqual(
[('succeed', '')],
[('succeed', '', {})],
task_defaults_spec.get_on_complete()
)
@ -147,15 +147,15 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase):
task3_spec.get_input()
)
self.assertListEqual(
[('task4', '<% $.my_val = 1 %>')],
[('task4', '<% $.my_val = 1 %>', {})],
task3_spec.get_on_error()
)
self.assertListEqual(
[('task5', '<% $.my_val = 2 %>')],
[('task5', '<% $.my_val = 2 %>', {})],
task3_spec.get_on_success()
)
self.assertListEqual(
[('task6', '<% $.my_val = 3 %>')],
[('task6', '<% $.my_val = 3 %>', {})],
task3_spec.get_on_complete()
)

View File

@ -25,7 +25,7 @@ from mistral import utils
from mistral.workbook import types
CMD_PTRN = re.compile("^[\w\.]+[^=\s\"]*")
CMD_PTRN = re.compile("^[\w\.]+[^=\(\s\"]*")
INLINE_YAQL = expr.INLINE_YAQL_REGEXP
_ALL_IN_BRACKETS = "\[.*\]\s*"

View File

@ -18,6 +18,10 @@ import six
from mistral.workbook import types
from mistral.workbook.v2 import base
from mistral.workbook.v2 import policies
from mistral.workbook.v2 import tasks
direct_wf_ts = tasks.DirectWorkflowTaskSpec
class TaskDefaultsSpec(base.BaseSpec):
@ -67,9 +71,15 @@ class TaskDefaultsSpec(base.BaseSpec):
'pause-before',
'concurrency'
)
self._on_complete = self._as_list_of_tuples("on-complete")
self._on_success = self._as_list_of_tuples("on-success")
self._on_error = self._as_list_of_tuples("on-error")
self._on_complete = direct_wf_ts.prepare_on_clause(
self._as_list_of_tuples('on-complete')
)
self._on_success = direct_wf_ts.prepare_on_clause(
self._as_list_of_tuples('on-success')
)
self._on_error = direct_wf_ts.prepare_on_clause(
self._as_list_of_tuples('on-error')
)
self._requires = data.get('requires', [])
def validate_schema(self):

View File

@ -244,9 +244,15 @@ class DirectWorkflowTaskSpec(TaskSpec):
super(DirectWorkflowTaskSpec, self).__init__(data)
self._join = data.get('join')
self._on_complete = self._as_list_of_tuples('on-complete')
self._on_success = self._as_list_of_tuples('on-success')
self._on_error = self._as_list_of_tuples('on-error')
self._on_complete = self.prepare_on_clause(
self._as_list_of_tuples('on-complete')
)
self._on_success = self.prepare_on_clause(
self._as_list_of_tuples('on-success')
)
self._on_error = self.prepare_on_clause(
self._as_list_of_tuples('on-error')
)
def validate_schema(self):
super(DirectWorkflowTaskSpec, self).validate_schema()
@ -262,6 +268,16 @@ class DirectWorkflowTaskSpec(TaskSpec):
[self.validate_yaql_expr(t)
for t in ([val] if isinstance(val, six.string_types) else val)]
@staticmethod
def prepare_on_clause(list_of_tuples):
for i, task in enumerate(list_of_tuples):
task_name, params = DirectWorkflowTaskSpec._parse_cmd_and_input(
task[0]
)
list_of_tuples[i] = (task_name, task[1], params)
return list_of_tuples
def get_join(self):
return self._join

View File

@ -146,7 +146,10 @@ def get_command_class(cmd_name):
return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None
def create_command(cmd_name, wf_ex, task_spec, ctx):
def create_command(cmd_name, wf_ex, task_spec, ctx, explicit_params=None):
cmd_cls = get_command_class(cmd_name) or RunTask
return cmd_cls(wf_ex, task_spec, ctx)
if issubclass(cmd_cls, SetWorkflowState):
return cmd_cls(wf_ex, task_spec, ctx, explicit_params.get('msg'))
else:
return cmd_cls(wf_ex, task_spec, ctx)

View File

@ -103,7 +103,7 @@ class DirectWorkflowController(base.WorkflowController):
cmds = []
for t_n in self._find_next_task_names(task_ex):
for t_n, params in self._find_next_tasks(task_ex):
t_s = self.wf_spec.get_tasks()[t_n]
if not (t_s or t_n in commands.RESERVED_CMDS):
@ -115,7 +115,8 @@ class DirectWorkflowController(base.WorkflowController):
t_n,
self.wf_ex,
t_s,
self._get_task_inbound_context(t_s)
self._get_task_inbound_context(t_s),
params
)
# NOTE(xylan): Decide whether or not a join task should run
@ -153,7 +154,7 @@ class DirectWorkflowController(base.WorkflowController):
def all_errors_handled(self):
for t_ex in wf_utils.find_error_task_executions(self.wf_ex):
tasks_on_error = self._find_next_task_names_for_clause(
tasks_on_error = self._find_next_tasks_for_clause(
self.wf_spec.get_on_error_clause(t_ex.name),
data_flow.evaluate_task_outbound_context(t_ex)
)
@ -182,35 +183,44 @@ class DirectWorkflowController(base.WorkflowController):
])
def _find_next_task_names(self, task_ex):
return [t[0] for t in self._find_next_tasks(task_ex)]
def _find_next_tasks(self, task_ex):
t_state = task_ex.state
t_name = task_ex.name
ctx = data_flow.evaluate_task_outbound_context(task_ex)
t_names = []
t_names_and_params = []
if states.is_completed(t_state):
t_names += self._find_next_task_names_for_clause(
self.wf_spec.get_on_complete_clause(t_name),
ctx
t_names_and_params += (
self._find_next_tasks_for_clause(
self.wf_spec.get_on_complete_clause(t_name),
ctx
)
)
if t_state == states.ERROR:
t_names += self._find_next_task_names_for_clause(
self.wf_spec.get_on_error_clause(t_name),
ctx
t_names_and_params += (
self._find_next_tasks_for_clause(
self.wf_spec.get_on_error_clause(t_name),
ctx
)
)
elif t_state == states.SUCCESS:
t_names += self._find_next_task_names_for_clause(
self.wf_spec.get_on_success_clause(t_name),
ctx
t_names_and_params += (
self._find_next_tasks_for_clause(
self.wf_spec.get_on_success_clause(t_name),
ctx
)
)
return t_names
return t_names_and_params
@staticmethod
def _find_next_task_names_for_clause(clause, ctx):
def _find_next_tasks_for_clause(clause, ctx):
"""Finds next tasks names.
This method finds next task(command) base on given {name: condition}
@ -226,8 +236,8 @@ class DirectWorkflowController(base.WorkflowController):
return []
return [
t_name
for t_name, condition in clause
(t_name, expr.evaluate_recursively(params, ctx))
for t_name, condition, params in clause
if not condition or expr.evaluate(condition, ctx)
]