diff --git a/mistral/tests/unit/engine/test_tasks_function.py b/mistral/tests/unit/engine/test_tasks_function.py new file mode 100644 index 000000000..54ee12e3b --- /dev/null +++ b/mistral/tests/unit/engine/test_tasks_function.py @@ -0,0 +1,440 @@ +# Copyright 2016 - Nokia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.services import workbooks as wb_service +from mistral.tests.unit.engine import base +from mistral.workflow import states + + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +WORKBOOK_WITH_EXPRESSIONS = """ +--- +version: '2.0' + +name: wb + +workflows: + test_tasks_function: + type: direct + input: + - wf1_wx_id + - wf2_wx_id + - wf3_wx_id + - wf4_wx_id + - wf5_wx_id + + tasks: + main_task: + action: std.noop + publish: + all_tasks_yaql: <% tasks() %> + all_tasks_jinja: "{{ tasks() }}" + + wf1_tasks_yaql: <% tasks($.wf1_wx_id) %> + wf1_tasks_jinja: "{{ tasks(_.wf1_wx_id) }}" + wf1_recursive_tasks_yaql: <% tasks($.wf1_wx_id, true) %> + wf1_recursive_tasks_jinja: "{{ tasks(_.wf1_wx_id, true) }}" + wf1_recursive_error_tasks_yaql: <% tasks($.wf1_wx_id, true, ERROR) %> + wf1_recursive_error_tasks_jinja: + "{{ tasks(_.wf1_wx_id, True, 'ERROR') }}" + wf1_not_recursive_error_tasks_yaql: + <% tasks($.wf1_wx_id, false, ERROR) %> + wf1_not_recursive_error_tasks_jinja: + "{{ tasks(_.wf1_wx_id, False, 'ERROR') }}" + wf1_recursive_success_flat_tasks_yaql: + <% tasks($.wf1_wx_id, true, SUCCESS, true) %> + wf1_recursive_success_flat_tasks_jinja: + "{{ tasks(_.wf1_wx_id, True, 'SUCCESS', True) }}" + + wf2_recursive_tasks_yaql: <% tasks($.wf2_wx_id, true) %> + wf2_recursive_tasks_jinja: "{{ tasks(_.wf2_wx_id, true) }}" + + wf3_recursive_error_tasks_yaql: <% tasks($.wf3_wx_id, true, ERROR) %> + wf3_recursive_error_tasks_jinja: + "{{ tasks(_.wf3_wx_id, True, 'ERROR') }}" + wf3_recursive_error_flat_tasks_yaql: + <% tasks($.wf3_wx_id, true, ERROR, true) %> + wf3_recursive_error_flat_tasks_jinja: + "{{ tasks(_.wf3_wx_id, True, 'ERROR', True) }}" + + wf4_recursive_error_flat_tasks_yaql: + <% tasks($.wf4_wx_id, true, ERROR, true) %> + wf4_recursive_error_flat_tasks_jinja: + "{{ tasks(_.wf4_wx_id, True, 'ERROR', True) }}" + + + wf5_recursive_error_flat_tasks_yaql: + <% tasks($.wf5_wx_id, true, ERROR, true) %> + wf5_recursive_error_flat_tasks_jinja: + "{{ tasks(_.wf5_wx_id, True, 'ERROR', True) }}" + + + wf1_top_lvl: + type: direct + tasks: + top_lvl_wf1_task_1: + workflow: wf1_second_lvl + top_lvl_wf1_task_2: + action: std.noop + + wf1_second_lvl: + type: direct + tasks: + second_lvl_wf1_task_1: + workflow: wf1_third_lvl_fail + on-error: + - second_lvl_wf1_task_2 + second_lvl_wf1_task_2: + action: std.noop + second_lvl_wf1_task_3: + action: std.noop + + wf1_third_lvl_fail: + type: direct + tasks: + third_lvl_wf1_task_1: + action: std.noop + on-success: + - third_lvl_wf1_task_2_fail + third_lvl_wf1_task_2_fail: + action: std.fail + third_lvl_wf1_task_3: + action: std.noop + + + wf2_top_lvl: + type: direct + tasks: + top_lvl_wf2_task_1: + action: std.noop + top_lvl_wf2_task_2: + action: std.noop + + + wf3_top_lvl: + type: direct + tasks: + top_lvl_wf3_task_1_fail: + workflow: wf3_second_lvl_fail + top_lvl_wf3_task_2_fail: + action: std.fail + + wf3_second_lvl_fail: + type: direct + tasks: + second_lvl_wf3_task_1_fail: + workflow: wf3_third_lvl_fail + second_lvl_wf3_task_2: + action: std.noop + second_lvl_wf3_task_3: + action: std.noop + + wf3_third_lvl_fail: + type: direct + tasks: + third_lvl_wf3_task_1: + action: std.noop + on-success: + - third_lvl_wf3_task_2 + third_lvl_wf3_task_2: + action: std.noop + third_lvl_wf3_task_3_fail: + action: std.fail + + wf4_top_lvl: + type: direct + tasks: + top_lvl_wf4_task_1: + workflow: wf4_second_lvl + publish: + raise_error: <% $.invalid_yaql_expression %> + + wf4_second_lvl: + type: direct + tasks: + second_lvl_wf4_task_1: + action: std.noop + + wf5_top_lvl: + type: direct + tasks: + top_lvl_wf5_task_1: + workflow: wf4_second_lvl + input: + raise_error: <% $.invalid_yaql_expression2 %> + + wf5_second_lvl: + type: direct + tasks: + second_lvl_wf5_task_1: + workflow: wf5_third_lvl + + wf5_third_lvl: + type: direct + tasks: + third_lvl_wf5_task_1: + action: std.noop + +""" + + +class TasksFunctionTest(base.EngineTestCase): + def setUp(self): + super(TasksFunctionTest, self).setUp() + + def _assert_published_tasks(self, task, published_key, + expected_tasks_count=None, + expected_tasks_names=None): + published = task.published[published_key] + self.assertIsNotNone( + published, + "there is a problem with publishing '{}'".format(published_key) + ) + published_names = [t['name'] for t in published] + if expected_tasks_names: + for e in expected_tasks_names: + self.assertIn(e, published_names) + if not expected_tasks_count: + expected_tasks_count = len(expected_tasks_names) + if expected_tasks_count: + self.assertEqual(expected_tasks_count, len(published)) + + def test_tasks_function(self): + wb_service.create_workbook_v2(WORKBOOK_WITH_EXPRESSIONS) + # Start helping workflow executions. + wf1_ex = self.engine.start_workflow('wb.wf1_top_lvl', {}) + wf2_ex = self.engine.start_workflow('wb.wf2_top_lvl', {}) + wf3_ex = self.engine.start_workflow('wb.wf3_top_lvl', {}) + wf4_ex = self.engine.start_workflow('wb.wf4_top_lvl', {}) + wf5_ex = self.engine.start_workflow('wb.wf5_top_lvl', {}) + + self.await_workflow_success(wf1_ex.id) + self.await_workflow_success(wf2_ex.id) + self.await_workflow_error(wf3_ex.id) + self.await_workflow_error(wf4_ex.id) + self.await_workflow_error(wf5_ex.id) + + # Start test workflow execution + execution = self.engine.start_workflow( + 'wb.test_tasks_function', + {"wf1_wx_id": wf1_ex.id, + "wf2_wx_id": wf2_ex.id, + "wf3_wx_id": wf3_ex.id, + "wf4_wx_id": wf4_ex.id, + "wf5_wx_id": wf5_ex.id} + ) + + self.await_workflow_success(execution.id) + + with db_api.transaction(): + execution = db_api.get_workflow_execution(execution.id) + task_executions = execution.task_executions + + self.assertEqual(states.SUCCESS, execution.state) + self.assertEqual(1, len(task_executions)) + main_task = task_executions[0] + + self._assert_published_tasks( + main_task, + "all_tasks_yaql", + 22) + self._assert_published_tasks( + main_task, + "all_tasks_jinja", + 22) + + self._assert_published_tasks( + main_task, + "wf1_tasks_yaql", + 2, + ["top_lvl_wf1_task_1", "top_lvl_wf1_task_2"] + ) + + self._assert_published_tasks( + main_task, + "wf1_tasks_jinja", + 2, + ["top_lvl_wf1_task_1", "top_lvl_wf1_task_2"] + ) + + self._assert_published_tasks( + main_task, + "wf1_recursive_tasks_yaql", + 8, + [ + 'top_lvl_wf1_task_1', + 'top_lvl_wf1_task_2', + 'second_lvl_wf1_task_3', + 'second_lvl_wf1_task_1', + 'second_lvl_wf1_task_2', + 'third_lvl_wf1_task_3', + 'third_lvl_wf1_task_1', + 'third_lvl_wf1_task_2_fail' + ] + ) + + self._assert_published_tasks( + main_task, + "wf1_recursive_tasks_jinja", + 8, + [ + 'top_lvl_wf1_task_1', + 'top_lvl_wf1_task_2', + 'second_lvl_wf1_task_3', + 'second_lvl_wf1_task_1', + 'second_lvl_wf1_task_2', + 'third_lvl_wf1_task_3', + 'third_lvl_wf1_task_1', + 'third_lvl_wf1_task_2_fail' + ] + ) + + self._assert_published_tasks( + main_task, + "wf1_recursive_error_tasks_yaql", + 2, + ['second_lvl_wf1_task_1', 'third_lvl_wf1_task_2_fail'] + ) + + self._assert_published_tasks( + main_task, + "wf1_recursive_error_tasks_jinja", + 2, + ['second_lvl_wf1_task_1', 'third_lvl_wf1_task_2_fail'] + ) + + self._assert_published_tasks( + main_task, + "wf1_not_recursive_error_tasks_yaql", + 0 + ) + + self._assert_published_tasks( + main_task, + "wf1_not_recursive_error_tasks_jinja", + 0 + ) + + self._assert_published_tasks( + main_task, + "wf1_recursive_success_flat_tasks_yaql", + 5, + [ + 'top_lvl_wf1_task_2', + 'second_lvl_wf1_task_3', + 'second_lvl_wf1_task_2', + 'third_lvl_wf1_task_3', + 'third_lvl_wf1_task_1' + ] + ) + + self._assert_published_tasks( + main_task, + "wf1_recursive_success_flat_tasks_jinja", + 5, + [ + 'top_lvl_wf1_task_2', + 'second_lvl_wf1_task_3', + 'second_lvl_wf1_task_2', + 'third_lvl_wf1_task_3', + 'third_lvl_wf1_task_1' + ] + ) + + self._assert_published_tasks( + main_task, + "wf2_recursive_tasks_yaql", + 2, + ['top_lvl_wf2_task_2', 'top_lvl_wf2_task_1'] + ) + + self._assert_published_tasks( + main_task, + "wf2_recursive_tasks_jinja", + 2, + ['top_lvl_wf2_task_2', 'top_lvl_wf2_task_1'] + ) + + self._assert_published_tasks( + main_task, + "wf3_recursive_error_tasks_yaql", + 4, + [ + 'top_lvl_wf3_task_1_fail', + 'top_lvl_wf3_task_2_fail', + 'second_lvl_wf3_task_1_fail', + 'third_lvl_wf3_task_3_fail' + ] + ) + + self._assert_published_tasks( + main_task, + "wf3_recursive_error_tasks_jinja", + 4, + [ + 'top_lvl_wf3_task_1_fail', + 'top_lvl_wf3_task_2_fail', + 'second_lvl_wf3_task_1_fail', + 'third_lvl_wf3_task_3_fail' + ] + ) + + self._assert_published_tasks( + main_task, + "wf3_recursive_error_flat_tasks_yaql", + 2, + ['top_lvl_wf3_task_2_fail', 'third_lvl_wf3_task_3_fail'] + ) + + self._assert_published_tasks( + main_task, + "wf3_recursive_error_flat_tasks_jinja", + 2, + ['top_lvl_wf3_task_2_fail', 'third_lvl_wf3_task_3_fail'] + ) + + self._assert_published_tasks( + main_task, + "wf4_recursive_error_flat_tasks_yaql", + 1, + ['top_lvl_wf4_task_1'] + ) + + self._assert_published_tasks( + main_task, + "wf4_recursive_error_flat_tasks_jinja", + 1, + ['top_lvl_wf4_task_1'] + ) + + self._assert_published_tasks( + main_task, + "wf5_recursive_error_flat_tasks_yaql", + 1, + ['top_lvl_wf5_task_1'] + ) + + self._assert_published_tasks( + main_task, + "wf5_recursive_error_flat_tasks_jinja", + 1, + ['top_lvl_wf5_task_1'] + ) diff --git a/mistral/tests/unit/expressions/test_jinja_expression.py b/mistral/tests/unit/expressions/test_jinja_expression.py index 47956f00f..c84980208 100644 --- a/mistral/tests/unit/expressions/test_jinja_expression.py +++ b/mistral/tests/unit/expressions/test_jinja_expression.py @@ -183,8 +183,8 @@ class JinjaEvaluatorTest(base.BaseTest): @mock.patch('mistral.db.v2.api.get_task_executions') @mock.patch('mistral.workflow.data_flow.get_task_execution_result') - def test_filter_task_without_taskexecution(self, task_execution_result, - task_executions): + def test_filter_task_without_task_execution(self, task_execution_result, + task_executions): task = mock.MagicMock(return_value={}) task_executions.return_value = [task] ctx = { @@ -203,9 +203,38 @@ class JinjaEvaluatorTest(base.BaseTest): 'result': task_execution_result(), 'spec': task.spec, 'state': task.state, - 'state_info': task.state_info + 'state_info': task.state_info, + 'type': task.type, + 'workflow_execution_id': task.workflow_execution_id }, result) + @mock.patch('mistral.db.v2.api.get_task_executions') + @mock.patch('mistral.workflow.data_flow.get_task_execution_result') + def test_filter_tasks_without_task_execution(self, task_execution_result, + task_executions): + task = mock.MagicMock(return_value={}) + task_executions.return_value = [task] + ctx = { + '__task_execution': None, + '__execution': { + 'id': 'some' + } + } + + result = self._evaluator.evaluate('_|tasks()', ctx) + + self.assertEqual([{ + 'id': task.id, + 'name': task.name, + 'published': task.published, + 'result': task_execution_result(), + 'spec': task.spec, + 'state': task.state, + 'state_info': task.state_info, + 'type': task.type, + 'workflow_execution_id': task.workflow_execution_id + }], result) + @mock.patch('mistral.db.v2.api.get_task_execution') @mock.patch('mistral.workflow.data_flow.get_task_execution_result') def test_filter_task_with_taskexecution(self, task_execution_result, @@ -226,7 +255,9 @@ class JinjaEvaluatorTest(base.BaseTest): 'result': task_execution_result(), 'spec': task_execution().spec, 'state': task_execution().state, - 'state_info': task_execution().state_info + 'state_info': task_execution().state_info, + 'type': task_execution().type, + 'workflow_execution_id': task_execution().workflow_execution_id }, result) @mock.patch('mistral.db.v2.api.get_task_execution') @@ -248,7 +279,9 @@ class JinjaEvaluatorTest(base.BaseTest): 'result': task_execution_result(), 'spec': task_execution().spec, 'state': task_execution().state, - 'state_info': task_execution().state_info + 'state_info': task_execution().state_info, + 'type': task_execution().type, + 'workflow_execution_id': task_execution().workflow_execution_id }, result) @mock.patch('mistral.db.v2.api.get_workflow_execution') diff --git a/mistral/tests/unit/expressions/test_yaql_expression.py b/mistral/tests/unit/expressions/test_yaql_expression.py index c4cf17cda..26f0b64ec 100644 --- a/mistral/tests/unit/expressions/test_yaql_expression.py +++ b/mistral/tests/unit/expressions/test_yaql_expression.py @@ -18,6 +18,7 @@ from mistral import exceptions as exc from mistral.expressions import yaql_expression as expr from mistral.tests.unit import base from mistral import utils +import mock DATA = { "server": { @@ -136,6 +137,48 @@ class YaqlEvaluatorTest(base.BaseTest): self.assertTrue(utils.is_valid_uuid(uuid)) + @mock.patch('mistral.db.v2.api.get_task_executions') + @mock.patch('mistral.workflow.data_flow.get_task_execution_result') + def test_filter_tasks_without_task_execution(self, task_execution_result, + task_executions): + + task_execution_result.return_value = 'task_execution_result' + task = type("obj", (object,), { + 'id': 'id', + 'name': 'name', + 'published': 'published', + 'result': task_execution_result(), + 'spec': 'spec', + 'state': 'state', + 'state_info': 'state_info', + 'type': 'type', + 'workflow_execution_id': 'workflow_execution_id' + })() + + task_executions.return_value = [task] + + ctx = { + '__task_execution': None, + '__execution': { + 'id': 'some' + } + } + + result = self._evaluator.evaluate('tasks(some)', ctx) + + self.assertEqual(1, len(result)) + self.assertDictEqual({ + 'id': task.id, + 'name': task.name, + 'published': task.published, + 'result': task.result, + 'spec': task.spec, + 'state': task.state, + 'state_info': task.state_info, + 'type': task.type, + 'workflow_execution_id': task.workflow_execution_id + }, result[0]) + def test_function_env(self): ctx = {'__env': 'some'} diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index 250225488..0bab2d5ed 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -41,6 +41,8 @@ from mistral import version # Thread local storage. _th_loc_storage = threading.local() +ACTION_TASK_TYPE = 'ACTION' +WORKFLOW_TASK_TYPE = 'WORKFLOW' def generate_unicode_uuid(): diff --git a/mistral/utils/expression_utils.py b/mistral/utils/expression_utils.py index 0c7837d7d..1350619d5 100644 --- a/mistral/utils/expression_utils.py +++ b/mistral/utils/expression_utils.py @@ -118,8 +118,6 @@ def json_pp_(context, data=None): def task_(context, task_name): - # Importing data_flow in order to break cycle dependency between modules. - from mistral.workflow import data_flow # This section may not exist in a context if it's calculated not in # task scope. @@ -140,6 +138,105 @@ def task_(context, task_name): if not task_ex: return None + # We don't use to_dict() db model method because not all fields + # make sense for user. + return _convert_to_user_model(task_ex) + + +def _should_pass_filter(t, state, flat): + # Start from assuming all is true, check only if needed. + state_match = True + flat_match = True + + if state: + state_match = t['state'] == state + + if flat: + is_action = t['type'] == utils.ACTION_TASK_TYPE + + if not is_action: + nested_execs = db_api.get_workflow_executions( + task_execution_id=t.id + ) + + for n in nested_execs: + flat_match = flat_match and n.state != t.state + + return state_match and flat_match + + +def _get_tasks_from_db(workflow_execution_id=None, recursive=False, state=None, + flat=False): + task_execs = [] + nested_task_exs = [] + + kwargs = {} + + if workflow_execution_id: + kwargs['workflow_execution_id'] = workflow_execution_id + + # We can't add state to query if we want to filter by workflow_execution_id + # recursively. There might be a workflow_execution in one state with a + # nested workflow execution that has a task in the desired state until we + # have an optimization for queering all workflow executions under a given + # top level workflow execution, this is the way to go. + if state and not (workflow_execution_id and recursive): + kwargs['state'] = state + + task_execs.extend(db_api.get_task_executions(**kwargs)) + + # If it is not recursive no need to check nested workflows. + # If there is no workflow execution id, we already have all we need, and + # doing more queries will just create duplication in the results. + if recursive and workflow_execution_id: + for t in task_execs: + if t.type == utils.WORKFLOW_TASK_TYPE: + # Get nested workflow execution that matches the task. + nested_workflow_executions = db_api.get_workflow_executions( + task_execution_id=t.id + ) + + # There might be zero nested executions. + for nested_workflow_execution in nested_workflow_executions: + nested_task_exs.extend( + _get_tasks_from_db( + nested_workflow_execution.id, + recursive, + state, + flat + ) + ) + + if state or flat: + # Filter by state and flat. + task_execs = [ + t for t in task_execs if _should_pass_filter(t, state, flat) + ] + + # The nested tasks were already filtered, since this is a recursion. + task_execs.extend(nested_task_exs) + + return task_execs + + +def tasks_(context, workflow_execution_id=None, recursive=False, state=None, + flat=False): + + task_execs = _get_tasks_from_db( + workflow_execution_id, + recursive, + state, + flat + ) + + # Convert task_execs to user model and return. + return [_convert_to_user_model(t) for t in task_execs] + + +def _convert_to_user_model(task_ex): + # Importing data_flow in order to break cycle dependency between modules. + from mistral.workflow import data_flow + # We don't use to_dict() db model method because not all fields # make sense for user. return { @@ -149,7 +246,9 @@ def task_(context, task_name): 'state': task_ex.state, 'state_info': task_ex.state_info, 'result': data_flow.get_task_execution_result(task_ex), - 'published': task_ex.published + 'published': task_ex.published, + 'type': task_ex.type, + 'workflow_execution_id': task_ex.workflow_execution_id } diff --git a/mistral/workbook/v2/tasks.py b/mistral/workbook/v2/tasks.py index ab872f517..0342c46ce 100644 --- a/mistral/workbook/v2/tasks.py +++ b/mistral/workbook/v2/tasks.py @@ -36,9 +36,6 @@ RESERVED_TASK_NAMES = [ 'pause' ] -ACTION_TASK_TYPE = 'ACTION' -WORKFLOW_TASK_TYPE = 'WORKFLOW' - class TaskSpec(base.BaseSpec): # See http://json-schema.org @@ -225,8 +222,8 @@ class TaskSpec(base.BaseSpec): def get_type(self): if self._workflow: - return WORKFLOW_TASK_TYPE - return ACTION_TASK_TYPE + return utils.WORKFLOW_TASK_TYPE + return utils.ACTION_TASK_TYPE class DirectWorkflowTaskSpec(TaskSpec): diff --git a/setup.cfg b/setup.cfg index 21c7528f7..32de4f85b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,7 @@ mistral.actions = mistral.expression.functions = json_pp = mistral.utils.expression_utils:json_pp_ task = mistral.utils.expression_utils:task_ + tasks = mistral.utils.expression_utils:tasks_ execution = mistral.utils.expression_utils:execution_ env = mistral.utils.expression_utils:env_ uuid = mistral.utils.expression_utils:uuid_