From 6b8f15385a601864f42a9726232013888b80097f Mon Sep 17 00:00:00 2001 From: hardik Date: Tue, 24 May 2016 18:25:31 +0530 Subject: [PATCH] Fail/Success/Pause transition message Now user can provide customize message for fail/success/pause transition. Change-Id: I84b1fbc63aaf8186c81eea9c852f5b49db93f0ff Implements: blueprint mistral-fail-transition-message --- mistral/engine/dispatcher.py | 2 +- mistral/tests/unit/engine/test_commands.py | 162 ++++++++++++++++++ .../tests/unit/workbook/v2/test_workbook.py | 12 +- mistral/workbook/base.py | 2 +- mistral/workbook/v2/task_defaults.py | 16 +- mistral/workbook/v2/tasks.py | 22 ++- mistral/workflow/commands.py | 7 +- mistral/workflow/direct_workflow.py | 44 +++-- 8 files changed, 234 insertions(+), 33 deletions(-) diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py index e17233084..cd4c8ead1 100644 --- a/mistral/engine/dispatcher.py +++ b/mistral/engine/dispatcher.py @@ -35,7 +35,7 @@ def dispatch_workflow_commands(wf_ex, wf_cmds): if states.is_completed(cmd.new_state): wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) else: - wf_handler.set_workflow_state(wf_ex, cmd.new_state) + wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg) elif isinstance(cmd, commands.Noop): # Do nothing. pass diff --git a/mistral/tests/unit/engine/test_commands.py b/mistral/tests/unit/engine/test_commands.py index afcda505b..615c24a2f 100644 --- a/mistral/tests/unit/engine/test_commands.py +++ b/mistral/tests/unit/engine/test_commands.py @@ -313,3 +313,165 @@ class OrderEngineCommandsTest(base.EngineTestCase): self.await_task_error(task2_db.id) self.await_execution_success(wf_ex.id) + +WORKBOOK4 = """ +--- +version: '2.0' + +name: my_wb + +workflows: + wf: + type: direct + input: + - my_var + + tasks: + task1: + action: std.echo output='1' + on-complete: + - fail(msg='my_var value is 1'): <% $.my_var = 1 %> + - succeed(msg='my_var value is 2'): <% $.my_var = 2 %> + - pause(msg='my_var value is 3'): <% $.my_var = 3 %> + - task2 + + task2: + action: std.echo output='2' +""" + + +class SimpleEngineCmdsWithMsgTest(base.EngineTestCase): + def setUp(self): + super(SimpleEngineCmdsWithMsgTest, self).setUp() + + wb_service.create_workbook_v2(WORKBOOK4) + + def test_fail(self): + wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1}) + + self.await_execution_error(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(1, len(wf_ex.task_executions)) + self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + self.assertEqual(states.ERROR, wf_ex.state) + self.assertEqual('my_var value is 1', wf_ex.state_info) + + def test_succeed(self): + wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2}) + + self.await_execution_success(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(1, len(wf_ex.task_executions)) + self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertEqual("my_var value is 2", wf_ex.state_info) + + def test_pause(self): + wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3}) + + self.await_execution_paused(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(1, len(wf_ex.task_executions)) + self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual("my_var value is 3", wf_ex.state_info) + +WORKBOOK5 = """ +--- +version: '2.0' + +name: my_wb + +workflows: + wf: + type: direct + input: + - my_var + + task-defaults: + on-complete: + - fail(msg='my_var value is 1'): <% $.my_var = 1 %> + - succeed(msg='my_var value is <% $.my_var %>'): <% $.my_var = 2 %> + - pause(msg='my_var value is 3'): <% $.my_var = 3 %> + - task2: <% $.my_var = 4 %> # (Never happens in this test) + + tasks: + task1: + action: std.echo output='1' + + task2: + action: std.echo output='2' +""" + + +class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase): + def setUp(self): + super(SimpleEngineWorkflowLevelCmdsWithMsgTest, self).setUp() + + wb_service.create_workbook_v2(WORKBOOK5) + + def test_fail(self): + wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1}) + + self.await_execution_error(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(1, len(wf_ex.task_executions)) + self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + self.assertEqual(states.ERROR, wf_ex.state) + self.assertEqual("my_var value is 1", wf_ex.state_info) + + def test_succeed(self): + wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2}) + + self.await_execution_success(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(1, len(wf_ex.task_executions)) + self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertEqual("my_var value is 2", wf_ex.state_info) + + def test_pause(self): + wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3}) + + self.await_execution_paused(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(1, len(wf_ex.task_executions)) + self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual("my_var value is 3", wf_ex.state_info) diff --git a/mistral/tests/unit/workbook/v2/test_workbook.py b/mistral/tests/unit/workbook/v2/test_workbook.py index 78d084b94..b550869d1 100644 --- a/mistral/tests/unit/workbook/v2/test_workbook.py +++ b/mistral/tests/unit/workbook/v2/test_workbook.py @@ -118,15 +118,15 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase): task_defaults_spec = wf2_spec.get_task_defaults() self.assertListEqual( - [('fail', '<% $.my_val = 0 %>')], + [('fail', '<% $.my_val = 0 %>', {})], task_defaults_spec.get_on_error() ) self.assertListEqual( - [('pause', '')], + [('pause', '', {})], task_defaults_spec.get_on_success() ) self.assertListEqual( - [('succeed', '')], + [('succeed', '', {})], task_defaults_spec.get_on_complete() ) @@ -147,15 +147,15 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase): task3_spec.get_input() ) self.assertListEqual( - [('task4', '<% $.my_val = 1 %>')], + [('task4', '<% $.my_val = 1 %>', {})], task3_spec.get_on_error() ) self.assertListEqual( - [('task5', '<% $.my_val = 2 %>')], + [('task5', '<% $.my_val = 2 %>', {})], task3_spec.get_on_success() ) self.assertListEqual( - [('task6', '<% $.my_val = 3 %>')], + [('task6', '<% $.my_val = 3 %>', {})], task3_spec.get_on_complete() ) diff --git a/mistral/workbook/base.py b/mistral/workbook/base.py index 8bc6bef3e..62ed563d8 100644 --- a/mistral/workbook/base.py +++ b/mistral/workbook/base.py @@ -25,7 +25,7 @@ from mistral import utils from mistral.workbook import types -CMD_PTRN = re.compile("^[\w\.]+[^=\s\"]*") +CMD_PTRN = re.compile("^[\w\.]+[^=\(\s\"]*") INLINE_YAQL = expr.INLINE_YAQL_REGEXP _ALL_IN_BRACKETS = "\[.*\]\s*" diff --git a/mistral/workbook/v2/task_defaults.py b/mistral/workbook/v2/task_defaults.py index 86e06329a..18446a66f 100644 --- a/mistral/workbook/v2/task_defaults.py +++ b/mistral/workbook/v2/task_defaults.py @@ -18,6 +18,10 @@ import six from mistral.workbook import types from mistral.workbook.v2 import base from mistral.workbook.v2 import policies +from mistral.workbook.v2 import tasks + + +direct_wf_ts = tasks.DirectWorkflowTaskSpec class TaskDefaultsSpec(base.BaseSpec): @@ -67,9 +71,15 @@ class TaskDefaultsSpec(base.BaseSpec): 'pause-before', 'concurrency' ) - self._on_complete = self._as_list_of_tuples("on-complete") - self._on_success = self._as_list_of_tuples("on-success") - self._on_error = self._as_list_of_tuples("on-error") + self._on_complete = direct_wf_ts.prepare_on_clause( + self._as_list_of_tuples('on-complete') + ) + self._on_success = direct_wf_ts.prepare_on_clause( + self._as_list_of_tuples('on-success') + ) + self._on_error = direct_wf_ts.prepare_on_clause( + self._as_list_of_tuples('on-error') + ) self._requires = data.get('requires', []) def validate_schema(self): diff --git a/mistral/workbook/v2/tasks.py b/mistral/workbook/v2/tasks.py index 8029a3e6c..e7dd0729e 100644 --- a/mistral/workbook/v2/tasks.py +++ b/mistral/workbook/v2/tasks.py @@ -244,9 +244,15 @@ class DirectWorkflowTaskSpec(TaskSpec): super(DirectWorkflowTaskSpec, self).__init__(data) self._join = data.get('join') - self._on_complete = self._as_list_of_tuples('on-complete') - self._on_success = self._as_list_of_tuples('on-success') - self._on_error = self._as_list_of_tuples('on-error') + self._on_complete = self.prepare_on_clause( + self._as_list_of_tuples('on-complete') + ) + self._on_success = self.prepare_on_clause( + self._as_list_of_tuples('on-success') + ) + self._on_error = self.prepare_on_clause( + self._as_list_of_tuples('on-error') + ) def validate_schema(self): super(DirectWorkflowTaskSpec, self).validate_schema() @@ -262,6 +268,16 @@ class DirectWorkflowTaskSpec(TaskSpec): [self.validate_yaql_expr(t) for t in ([val] if isinstance(val, six.string_types) else val)] + @staticmethod + def prepare_on_clause(list_of_tuples): + for i, task in enumerate(list_of_tuples): + task_name, params = DirectWorkflowTaskSpec._parse_cmd_and_input( + task[0] + ) + list_of_tuples[i] = (task_name, task[1], params) + + return list_of_tuples + def get_join(self): return self._join diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index 739818cec..93f94a423 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -146,7 +146,10 @@ def get_command_class(cmd_name): return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None -def create_command(cmd_name, wf_ex, task_spec, ctx): +def create_command(cmd_name, wf_ex, task_spec, ctx, explicit_params=None): cmd_cls = get_command_class(cmd_name) or RunTask - return cmd_cls(wf_ex, task_spec, ctx) + if issubclass(cmd_cls, SetWorkflowState): + return cmd_cls(wf_ex, task_spec, ctx, explicit_params.get('msg')) + else: + return cmd_cls(wf_ex, task_spec, ctx) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index fad7fdabb..5bfdca6a1 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -103,7 +103,7 @@ class DirectWorkflowController(base.WorkflowController): cmds = [] - for t_n in self._find_next_task_names(task_ex): + for t_n, params in self._find_next_tasks(task_ex): t_s = self.wf_spec.get_tasks()[t_n] if not (t_s or t_n in commands.RESERVED_CMDS): @@ -115,7 +115,8 @@ class DirectWorkflowController(base.WorkflowController): t_n, self.wf_ex, t_s, - self._get_task_inbound_context(t_s) + self._get_task_inbound_context(t_s), + params ) # NOTE(xylan): Decide whether or not a join task should run @@ -153,7 +154,7 @@ class DirectWorkflowController(base.WorkflowController): def all_errors_handled(self): for t_ex in wf_utils.find_error_task_executions(self.wf_ex): - tasks_on_error = self._find_next_task_names_for_clause( + tasks_on_error = self._find_next_tasks_for_clause( self.wf_spec.get_on_error_clause(t_ex.name), data_flow.evaluate_task_outbound_context(t_ex) ) @@ -182,35 +183,44 @@ class DirectWorkflowController(base.WorkflowController): ]) def _find_next_task_names(self, task_ex): + return [t[0] for t in self._find_next_tasks(task_ex)] + + def _find_next_tasks(self, task_ex): t_state = task_ex.state t_name = task_ex.name ctx = data_flow.evaluate_task_outbound_context(task_ex) - t_names = [] + t_names_and_params = [] if states.is_completed(t_state): - t_names += self._find_next_task_names_for_clause( - self.wf_spec.get_on_complete_clause(t_name), - ctx + t_names_and_params += ( + self._find_next_tasks_for_clause( + self.wf_spec.get_on_complete_clause(t_name), + ctx + ) ) if t_state == states.ERROR: - t_names += self._find_next_task_names_for_clause( - self.wf_spec.get_on_error_clause(t_name), - ctx + t_names_and_params += ( + self._find_next_tasks_for_clause( + self.wf_spec.get_on_error_clause(t_name), + ctx + ) ) elif t_state == states.SUCCESS: - t_names += self._find_next_task_names_for_clause( - self.wf_spec.get_on_success_clause(t_name), - ctx + t_names_and_params += ( + self._find_next_tasks_for_clause( + self.wf_spec.get_on_success_clause(t_name), + ctx + ) ) - return t_names + return t_names_and_params @staticmethod - def _find_next_task_names_for_clause(clause, ctx): + def _find_next_tasks_for_clause(clause, ctx): """Finds next tasks names. This method finds next task(command) base on given {name: condition} @@ -226,8 +236,8 @@ class DirectWorkflowController(base.WorkflowController): return [] return [ - t_name - for t_name, condition in clause + (t_name, expr.evaluate_recursively(params, ctx)) + for t_name, condition, params in clause if not condition or expr.evaluate(condition, ctx) ]