diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index 347bb096c..99b79518b 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -26,6 +26,7 @@ import wsmeext.pecan as wsme_pecan from mistral.api import access_control as acl from mistral.api.controllers.v2 import execution_report from mistral.api.controllers.v2 import resources +from mistral.api.controllers.v2 import sub_execution from mistral.api.controllers.v2 import task from mistral.api.controllers.v2 import types from mistral import context @@ -86,6 +87,7 @@ def _get_workflow_execution(id, must_exist=True): class ExecutionsController(rest.RestController): tasks = task.ExecutionTasksController() report = execution_report.ExecutionReportController() + executions = sub_execution.SubExecutionsController() @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Execution, wtypes.text) diff --git a/mistral/api/controllers/v2/sub_execution.py b/mistral/api/controllers/v2/sub_execution.py new file mode 100644 index 000000000..f47d53ba4 --- /dev/null +++ b/mistral/api/controllers/v2/sub_execution.py @@ -0,0 +1,134 @@ +# Copyright 2020 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from pecan import request +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api.controllers.v2 import resources +from mistral.api.controllers.v2 import types +from mistral.db.v2 import api as db_api +from mistral.utils import rest_utils +from mistral.workflow import states + +LOG = logging.getLogger(__name__) + + +def get_task_sub_executions_list(task_ex_id, filters, cur_depth): + task_sub_execs = [] + + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex_id) + + if filters['errors_only'] and task_ex.state != states.ERROR: + return [] + + child_wf_executions = task_ex.workflow_executions + + for c_ex in child_wf_executions: + task_sub_execs.extend( + get_execution_sub_executions_list( + c_ex.id, + filters, + cur_depth + ) + ) + + return task_sub_execs + + +def get_execution_sub_executions_list(wf_ex_id, filters, cur_depth): + max_depth = filters['max_depth'] + include_output = filters['include_output'] + ex_sub_execs = [] + + if 0 <= max_depth < cur_depth: + return [] + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + wf_resource = _get_wf_resource_from_db_model( + wf_ex, + include_output) + + ex_sub_execs.append(wf_resource) + + task_execs = wf_ex.task_executions + + for t_ex in task_execs: + task_sub_executions = get_task_sub_executions_list( + t_ex.id, + filters, + cur_depth + 1 + ) + ex_sub_execs.extend(task_sub_executions) + + return ex_sub_execs + + +def _get_wf_resource_from_db_model(wf_ex, include_output): + if include_output: + rest_utils.load_deferred_fields(wf_ex, ['params', 'input', 'output']) + else: + rest_utils.load_deferred_fields(wf_ex, ['params', 'input']) + + return resources.Execution.from_db_model(wf_ex) + + +def _get_sub_executions(origin, id, filters): + if origin == 'execution': + return get_execution_sub_executions_list(id, filters, cur_depth=0) + else: + return get_task_sub_executions_list(id, filters, cur_depth=0) + + +class SubExecutionsController(rest.RestController): + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Executions, types.uuid, bool, int, bool) + def get(self, id, errors_only=False, max_depth=-1, include_output=False): + """Return workflow execution report. + + :param id: The ID of the workflow execution or task execution + to get the sub-executions of. + :param errors_only: Optional. If True, only error paths of the + execution tree are returned . + :param max_depth: Optional. Limits the depth of recursion while + obtaining the execution tree. If a value of the + flag is a negative number then no limit is set. + :param include_output: Optional. Include the output for all executions + in the list. + """ + origin = 'execution' if request.path.startswith('/v2/executions') \ + else 'task' + + LOG.info( + "Fetching sub executions of %s [id=%s]", + origin, + id + ) + + filters = { + 'errors_only': errors_only, + 'max_depth': max_depth, + 'include_output': include_output + } + + sub_executions_resource = _get_sub_executions(origin, id, filters) + + return resources.Executions.convert_with_links( + sub_executions_resource, + request.application_url, + ) diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 28157b118..7c9e4ea50 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -24,6 +24,7 @@ import wsmeext.pecan as wsme_pecan from mistral.api import access_control as acl from mistral.api.controllers.v2 import action_execution from mistral.api.controllers.v2 import resources +from mistral.api.controllers.v2 import sub_execution from mistral.api.controllers.v2 import types from mistral import context from mistral.db.v2 import api as db_api @@ -200,6 +201,7 @@ class TaskExecutionsController(rest.RestController): class TasksController(rest.RestController): action_executions = action_execution.TasksActionExecutionController() workflow_executions = TaskExecutionsController() + executions = sub_execution.SubExecutionsController() @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Task, wtypes.text) diff --git a/mistral/tests/unit/api/v2/test_sub_execution.py b/mistral/tests/unit/api/v2/test_sub_execution.py new file mode 100644 index 000000000..8a21ab43b --- /dev/null +++ b/mistral/tests/unit/api/v2/test_sub_execution.py @@ -0,0 +1,171 @@ +# Copyright 2019 - Nokia Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.services import workflows as wf_service +from mistral.tests.unit.api import base +from mistral.tests.unit.engine import base as engine_base + +WF_TEXT = """--- + version: "2.0" + action_wf: + tasks: + action_task: + action: std.noop + fail_wf: + tasks: + fail_task: + action: std.fail + middle_wf: + tasks: + middle_task: + workflow: action_wf + fail_task: + workflow: fail_wf + main_wf: + tasks: + main_task: + workflow: middle_wf + """ + + +class TestSubExecutionsController(base.APITest, engine_base.EngineTestCase): + + def setUp(self): + super(TestSubExecutionsController, self).setUp() + + wf_service.create_workflows(WF_TEXT) + + def test_sub_executions_wf_ex_id(self): + wf_ex = self.engine.start_workflow('main_wf') + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/executions' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + main_wf_ex_list = resp.json['executions'] + + self.assertEqual(4, len(main_wf_ex_list)) + self._assert_single_item(main_wf_ex_list, workflow_name='main_wf') + self._assert_single_item(main_wf_ex_list, workflow_name='action_wf') + self._assert_single_item(main_wf_ex_list, workflow_name='fail_wf') + + middle_wf = self._assert_single_item( + main_wf_ex_list, + workflow_name='middle_wf' + ) + + # check the sub execs of a sub-ex + resp = self.app.get('/v2/executions/%s/executions' % middle_wf['id']) + + self.assertEqual(200, resp.status_int) + + middle_wf_ex_list = resp.json['executions'] + + self.assertEqual(3, len(middle_wf_ex_list)) + self._assert_single_item(middle_wf_ex_list, workflow_name='middle_wf') + self._assert_single_item(middle_wf_ex_list, workflow_name='action_wf') + self._assert_single_item(middle_wf_ex_list, workflow_name='fail_wf') + + def test_sub_executions_errors_only(self): + wf_ex = self.engine.start_workflow('main_wf') + self.await_workflow_error(wf_ex.id) + + resp = self.app.get( + '/v2/executions/%s/executions?errors_only=True' + % wf_ex.id + ) + + self.assertEqual(200, resp.status_int) + + main_wf_ex_list = resp.json['executions'] + + self.assertEqual(3, len(main_wf_ex_list)) + self._assert_single_item(main_wf_ex_list, workflow_name='middle_wf') + self._assert_single_item(main_wf_ex_list, workflow_name='fail_wf') + self._assert_no_item(main_wf_ex_list, workflow_name='action_wf') + + def test_sub_executions_with_max_depth(self): + wf_ex = self.engine.start_workflow('main_wf') + self.await_workflow_error(wf_ex.id) + + resp = self.app.get( + '/v2/executions/%s/executions?max_depth=1' + % wf_ex.id + ) + + self.assertEqual(200, resp.status_int) + + main_wf_ex_list = resp.json['executions'] + + self.assertEqual(2, len(main_wf_ex_list)) + self._assert_single_item(main_wf_ex_list, workflow_name='middle_wf') + self._assert_single_item(main_wf_ex_list, workflow_name='main_wf') + + def test_sub_executions_task_id(self): + wf_ex = self.engine.start_workflow('main_wf') + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/executions' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + main_wf_ex_list = resp.json['executions'] + + self.assertEqual(4, len(main_wf_ex_list)) + middle_wf = self._assert_single_item( + main_wf_ex_list, + workflow_name='middle_wf' + ) + + resp = self.app.get( + '/v2/tasks/%s/executions' + % middle_wf['task_execution_id'] + ) + + self.assertEqual(200, resp.status_int) + + main_task_ex_list = resp.json['executions'] + + self.assertEqual(3, len(main_task_ex_list)) + self._assert_single_item(main_task_ex_list, workflow_name='fail_wf') + self._assert_single_item(main_task_ex_list, workflow_name='middle_wf') + self._assert_single_item(main_task_ex_list, workflow_name='action_wf') + + def test_sub_executions_with_include_output(self): + wf_ex = self.engine.start_workflow('main_wf') + self.await_workflow_error(wf_ex.id) + + resp = self.app.get( + '/v2/executions/%s/executions?include_output=true' + % wf_ex.id + ) + + self.assertEqual(200, resp.status_int) + main_wf = self._assert_single_item( + resp.json['executions'], + workflow_name='main_wf' + ) + + self.assertIsNotNone(main_wf.get('output')) + + resp = self.app.get('/v2/executions/%s/executions' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + main_wf = self._assert_single_item( + resp.json['executions'], + workflow_name='main_wf' + ) + self.assertIsNone(main_wf.get('output')) diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/base.py index 525b8f80e..f64bbf150 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/base.py @@ -139,6 +139,9 @@ class BaseTest(base.BaseTestCase): def _assert_single_item(self, items, **props): return self._assert_multiple_items(items, 1, **props)[0] + def _assert_no_item(self, items, **props): + self._assert_multiple_items(items, 0, **props) + def _assert_multiple_items(self, items, count, **props): def _matches(item, **props): for prop_name, prop_val in props.items(): diff --git a/releasenotes/notes/sub_execution_api.yaml b/releasenotes/notes/sub_execution_api.yaml new file mode 100644 index 000000000..c90aff9eb --- /dev/null +++ b/releasenotes/notes/sub_execution_api.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Added a new API to fetch sub-executions of an execution or a task.