From ae2c5fdbbbd118c371dfd6af238f1c32bb50a6db Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Tue, 15 Jan 2019 17:13:42 +0700 Subject: [PATCH] Add a workflow execution report endpoint * This patch adds the first version of the REST endpoint that can generate a report for a workflow execution. Without any query parameters, GET method of the endpoint returns a tree-like structure that includes information about all execution objects associated with the specified workflow execution. The root object is the workflow execution itself, its children are task executions, each task execution has either action executions or workflow executions and so on. So the structure describes the entire set of execution objects regardless of how deep there are from the root object. This kind of data itself can be used to better understand and visualise the process related with a parent workflow, see what paths were taken while the workflow was running. If additional parameters are provided in a query string, the endpoint can give only a subset of the entire tree. Currently, the filters are: 1. "errors_only" (boolean) to retain only all execution objects with the error state, meaning that only all error paths are present in the report. It is useful when we need to do a root cause analysis of the workflow failure. False by default. 2. "max_depth" (integer) to limit how deep the algorithm can go into nested workflows. If set to 0, only the root workflow execution will be in the report. If set to 1, then the report will have only the root workflow execution and its direct children. And so on. If negative (by default) then no limit is set. Additionally, the report contains statistics about task executions examined while the report was being generated, like the number of tasks in the error state, number of tasks that successfully finished and so on. * Added all main tests for the endpoint. Note that despite the fact that this test verifies a REST API endpoint, unlike the other API tests it runs a Mistral engine to run workflows. This is done to simplify the test implementation so that we don't have to mock everything with huge data structures like we do in other API tests. Possible changes that may be made based on the feedback: * Statistics can contain not only number of tasks in certain states. We can also add things like number of actions, depth of the tree, number of nested workflows, average task/action/workflow execution time etc. * Additional query parameters to configure a generated report. For example, "statistics_only" just to get a general information about the workflow execution tree, not the tree itself. Another example is "running_only" to retain only not finished workflow paths. Implements blueprint: mistral-error-analysis Change-Id: Id3e17821e04b7a1b84dfea5126d223d90ad8e3c2 --- mistral/api/controllers/v2/execution.py | 2 + .../api/controllers/v2/execution_report.py | 164 +++++++ mistral/api/controllers/v2/resources.py | 155 +++++++ mistral/api/controllers/v2/task.py | 11 +- .../unit/api/v2/test_execution_report.py | 406 ++++++++++++++++++ 5 files changed, 736 insertions(+), 2 deletions(-) create mode 100644 mistral/api/controllers/v2/execution_report.py create mode 100644 mistral/tests/unit/api/v2/test_execution_report.py diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index 945feb908..784a849f1 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -23,6 +23,7 @@ from wsme import types as wtypes import wsmeext.pecan as wsme_pecan from mistral.api import access_control as acl +from mistral.api.controllers.v2 import execution_report from mistral.api.controllers.v2 import resources from mistral.api.controllers.v2 import task from mistral.api.controllers.v2 import types @@ -82,6 +83,7 @@ def _get_workflow_execution(id, must_exist=True): class ExecutionsController(rest.RestController): tasks = task.ExecutionTasksController() + report = execution_report.ExecutionReportController() @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Execution, wtypes.text) diff --git a/mistral/api/controllers/v2/execution_report.py b/mistral/api/controllers/v2/execution_report.py new file mode 100644 index 000000000..d1750577e --- /dev/null +++ b/mistral/api/controllers/v2/execution_report.py @@ -0,0 +1,164 @@ +# Copyright 2019 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api.controllers.v2 import resources +from mistral.api.controllers.v2 import types +from mistral.db.v2 import api as db_api +from mistral.db.v2.sqlalchemy import models as db_models +from mistral.utils import rest_utils +from mistral.workflow import states + + +LOG = logging.getLogger(__name__) + + +def create_workflow_execution_entry(wf_ex): + return resources.WorkflowExecutionReportEntry.from_db_model(wf_ex) + + +def create_task_execution_entry(task_ex): + return resources.TaskExecutionReportEntry.from_db_model(task_ex) + + +def create_action_execution_entry(action_ex): + return resources.ActionExecutionReportEntry.from_db_model(action_ex) + + +def update_statistics_with_task(stat, task_ex): + if task_ex.state == states.RUNNING: + stat.increment_running() + elif task_ex.state == states.SUCCESS: + stat.increment_success() + elif task_ex.state == states.ERROR: + stat.increment_error() + elif task_ex.state == states.IDLE: + stat.increment_idle() + elif task_ex.state == states.PAUSED: + stat.increment_paused() + + +def analyse_task_execution(task_ex_id, stat, filters, cur_depth): + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex_id) + + if filters['errors_only'] and task_ex.state != states.ERROR: + return None + + update_statistics_with_task(stat, task_ex) + + entry = create_task_execution_entry(task_ex) + + child_executions = task_ex.executions + + entry.action_executions = [] + entry.workflow_executions = [] + + for c_ex in child_executions: + if isinstance(c_ex, db_models.ActionExecution): + entry.action_executions.append( + create_action_execution_entry(c_ex) + ) + else: + entry.workflow_executions.append( + analyse_workflow_execution(c_ex.id, stat, filters, cur_depth) + ) + + return entry + + +def analyse_workflow_execution(wf_ex_id, stat, filters, cur_depth): + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + entry = create_workflow_execution_entry(wf_ex) + + max_depth = filters['max_depth'] + + # Don't get deeper into the workflow task executions if + # maximum depth is defined and the current depth exceeds it. + if 0 <= max_depth < cur_depth: + return entry + + task_execs = wf_ex.task_executions + + entry.task_executions = [] + + for t_ex in task_execs: + task_exec_entry = analyse_task_execution( + t_ex.id, + stat, + filters, + cur_depth + 1 + ) + + if task_exec_entry: + entry.task_executions.append(task_exec_entry) + + return entry + + +def build_report(wf_ex_id, filters): + report = resources.ExecutionReport() + + stat = resources.ExecutionReportStatistics() + + report.statistics = stat + report.root_workflow_execution = analyse_workflow_execution( + wf_ex_id, + stat, + filters, + 0 + ) + + return report + + +class ExecutionReportController(rest.RestController): + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.ExecutionReport, types.uuid, bool, int) + def get(self, workflow_execution_id, errors_only=False, max_depth=-1): + """Return workflow execution report. + + :param workflow_execution_id: The ID of the workflow execution to + generate a report for. + :param errors_only: Optional. If True, only error paths of the + execution tree are included into the report. The root execution + (with the specified id) is always included, but its tasks may + or may not be included depending on this flag's value. + :param max_depth: Optional. Limits the depth of recursion while + obtaining the execution tree. That is, subworkflows of what + maximum depth will be included into the report. If a value of the + flag is a negative number then no limit is set. + The root execution has depth 0 so if the flag is 0 then only + the root execution, its tasks and their actions will be included. + If some of the tasks in turn run workflows then these subworkflows + will be also included but without their tasks. The algorithm will + fully analyse their tasks only if max_depth is greater than zero. + """ + + LOG.info( + "Fetch execution report [workflow_execution_id=%s]", + workflow_execution_id + ) + + filters = { + 'errors_only': errors_only, + 'max_depth': max_depth + } + + return build_report(workflow_execution_id, filters) diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index d19e62a0f..3d6eee051 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -721,3 +721,158 @@ class EventTriggers(resource.ResourceList): "marker=123e4567-e89b-12d3-a456-426655440000") return triggers_sample + + +class BaseExecutionReportEntry(resource.Resource): + """Execution report entry resource.""" + + id = wtypes.text + name = wtypes.text + created_at = wtypes.text + updated_at = wtypes.text + state = wtypes.text + state_info = wtypes.text + + @classmethod + def sample(cls): + # TODO(rakhmerov): complete + + return cls( + id='123e4567-e89b-12d3-a456-426655441414', + created_at='2019-01-30T00:00:00.000000', + updated_at='2019-01-30T00:00:00.000000', + state=states.SUCCESS + ) + + +class ActionExecutionReportEntry(BaseExecutionReportEntry): + """Action execution report entry resource.""" + + accepted = bool + last_heartbeat = wtypes.text + + @classmethod + def sample(cls): + sample = super(ActionExecutionReportEntry, cls).sample() + + sample.accepted = True + sample.last_heartbeat = '2019-01-30T00:00:00.000000' + + return sample + + +class WorkflowExecutionReportEntry(BaseExecutionReportEntry): + """Workflow execution report entry resource.""" + + # NOTE(rakhmerov): task_executions has to be declared below + # after we declare a class for task execution entry resource. + + @classmethod + def sample(cls): + sample = super(WorkflowExecutionReportEntry, cls).sample() + + # We can't define a non-empty list task executions here because + # the needed class is not defined yet. Since this is just a sample + # we can sacrifice it. + sample.task_executions = [] + + return sample + + +class TaskExecutionReportEntry(BaseExecutionReportEntry): + """Task execution report entity resource.""" + + action_executions = [ActionExecutionReportEntry] + workflow_executions = [WorkflowExecutionReportEntry] + + @classmethod + def sample(cls): + sample = super(TaskExecutionReportEntry, cls).sample() + + sample.action_executions = [ActionExecutionReportEntry.sample()] + sample.workflow_executions = [] + + return sample + + +# We have to declare this field later because of the dynamic binding. +# It can't be within WorkflowExecutionReportEntry before +# TaskExecutionReportEntry is declared. +WorkflowExecutionReportEntry.task_executions = [TaskExecutionReportEntry] +wtypes.registry.reregister(WorkflowExecutionReportEntry) + + +class ExecutionReportStatistics(resource.Resource): + """Execution report statistics. + + TODO(rakhmerov): There's much more we can add here. For example, + information about action, average (and also min and max) task execution + run time etc. + """ + + total_tasks_count = wtypes.IntegerType(minimum=0) + running_tasks_count = wtypes.IntegerType(minimum=0) + success_tasks_count = wtypes.IntegerType(minimum=0) + error_tasks_count = wtypes.IntegerType(minimum=0) + idle_tasks_count = wtypes.IntegerType(minimum=0) + paused_tasks_count = wtypes.IntegerType(minimum=0) + + def __init__(self, **kw): + self.total_tasks_count = 0 + self.running_tasks_count = 0 + self.success_tasks_count = 0 + self.error_tasks_count = 0 + self.idle_tasks_count = 0 + self.paused_tasks_count = 0 + + super(ExecutionReportStatistics, self).__init__(**kw) + + def increment_running(self): + self.running_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_success(self): + self.success_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_error(self): + self.error_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_idle(self): + self.idle_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_paused(self): + self.paused_tasks_count += 1 + self.total_tasks_count += 1 + + @classmethod + def sample(cls): + return cls( + total_tasks_count=10, + running_tasks_count=3, + success_tasks_count=5, + error_tasks_count=2, + idle_tasks_count=0, + paused_tasks_count=0 + ) + + +class ExecutionReport(resource.Resource): + """Execution report resource.""" + + statistics = ExecutionReportStatistics + """General statistics about the workflow execution hierarchy.""" + + root_workflow_execution = WorkflowExecutionReportEntry + """Root entry of the report associated with a workflow execution.""" + + @classmethod + def sample(cls): + sample = cls() + + sample.statistics = ExecutionReportStatistics.sample() + sample.root_workflow_execution = WorkflowExecutionReportEntry.sample() + + return sample diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 070b3b62d..5974da629 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -36,8 +36,15 @@ from mistral.workflow import states LOG = logging.getLogger(__name__) -STATE_TYPES = wtypes.Enum(str, states.IDLE, states.RUNNING, states.SUCCESS, - states.ERROR, states.RUNNING_DELAYED) + +STATE_TYPES = wtypes.Enum( + str, + states.IDLE, + states.RUNNING, + states.SUCCESS, + states.ERROR, + states.RUNNING_DELAYED +) def _get_task_resource_with_result(task_ex): diff --git a/mistral/tests/unit/api/v2/test_execution_report.py b/mistral/tests/unit/api/v2/test_execution_report.py new file mode 100644 index 000000000..295745ac2 --- /dev/null +++ b/mistral/tests/unit/api/v2/test_execution_report.py @@ -0,0 +1,406 @@ +# Copyright 2019 - Nokia Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.services import workbooks as wb_service +from mistral.services import workflows as wf_service +from mistral.tests.unit.api import base +from mistral.tests.unit.engine import base as engine_base +from mistral.workflow import states + + +class TestExecutionReportController(base.APITest, engine_base.EngineTestCase): + def test_simple_sequence_wf(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/report' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual(wf_ex.id, root_wf_ex['id']) + self.assertEqual(wf_ex.name, root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(2, len(tasks)) + + # Verify task1 info. + task1 = self._assert_single_item( + tasks, + name='task1', + state=states.SUCCESS + ) + + self.assertEqual(0, len(task1['workflow_executions'])) + self.assertEqual(1, len(task1['action_executions'])) + + task1_action = task1['action_executions'][0] + + self.assertEqual(states.SUCCESS, task1_action['state']) + self.assertEqual('std.noop', task1_action['name']) + + # Verify task2 info. + task2 = self._assert_single_item( + tasks, + name='task2', + state=states.ERROR + ) + + self.assertEqual(1, len(task2['action_executions'])) + + task2_action = task2['action_executions'][0] + + self.assertEqual(0, len(task2['workflow_executions'])) + self.assertEqual(states.ERROR, task2_action['state']) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(1, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(1, stat['success_tasks_count']) + self.assertEqual(2, stat['total_tasks_count']) + + def test_nested_wf(self): + wb_text = """--- + version: '2.0' + + name: wb + + workflows: + parent_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + workflow: sub_wf + on-success: task3 + + task3: + action: std.fail + + sub_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wb_service.create_workbook_v2(wb_text) + + wf_ex = self.engine.start_workflow('wb.parent_wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/report' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual('wb.parent_wf', root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(2, len(tasks)) + + # Verify task1 info. + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual(states.SUCCESS, task1['state']) + self.assertEqual(0, len(task1['workflow_executions'])) + self.assertEqual(1, len(task1['action_executions'])) + + task1_action = task1['action_executions'][0] + self.assertEqual(states.SUCCESS, task1_action['state']) + self.assertEqual('std.noop', task1_action['name']) + + # Verify task2 info. + task2 = self._assert_single_item(tasks, name='task2') + + self.assertEqual(states.ERROR, task2['state']) + self.assertEqual(0, len(task2['action_executions'])) + self.assertEqual(1, len(task2['workflow_executions'])) + + sub_wf_entry = task2['workflow_executions'][0] + self.assertEqual(states.ERROR, sub_wf_entry['state']) + + sub_wf_tasks = sub_wf_entry['task_executions'] + + self.assertEqual(2, len(sub_wf_tasks)) + + sub_wf_task1 = self._assert_single_item( + sub_wf_tasks, + name='task1', + state=states.SUCCESS + ) + sub_wf_task2 = self._assert_single_item( + sub_wf_tasks, + name='task2', + state=states.ERROR + ) + + self.assertEqual(1, len(sub_wf_task1['action_executions'])) + self.assertEqual( + states.SUCCESS, + sub_wf_task1['action_executions'][0]['state'] + ) + + self.assertEqual(1, len(sub_wf_task2['action_executions'])) + self.assertEqual( + states.ERROR, + sub_wf_task2['action_executions'][0]['state'] + ) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(2, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(2, stat['success_tasks_count']) + self.assertEqual(4, stat['total_tasks_count']) + + def test_nested_wf_errors_only(self): + wb_text = """--- + version: '2.0' + + name: wb + + workflows: + parent_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + workflow: sub_wf + on-success: task3 + + task3: + action: std.fail + + sub_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wb_service.create_workbook_v2(wb_text) + + wf_ex = self.engine.start_workflow('wb.parent_wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get( + '/v2/executions/%s/report?errors_only=true' % wf_ex.id + ) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual('wb.parent_wf', root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(1, len(tasks)) + + # There must be only task2 in the response. + # Verify task2 info. + task2 = self._assert_single_item(tasks, name='task2') + + self.assertEqual(states.ERROR, task2['state']) + self.assertEqual(0, len(task2['action_executions'])) + self.assertEqual(1, len(task2['workflow_executions'])) + + sub_wf_entry = task2['workflow_executions'][0] + self.assertEqual(states.ERROR, sub_wf_entry['state']) + + sub_wf_tasks = sub_wf_entry['task_executions'] + + self.assertEqual(1, len(sub_wf_tasks)) + + sub_wf_task2 = self._assert_single_item( + sub_wf_tasks, + name='task2', + state=states.ERROR + ) + + self.assertEqual(1, len(sub_wf_task2['action_executions'])) + self.assertEqual( + states.ERROR, + sub_wf_task2['action_executions'][0]['state'] + ) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(2, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(0, stat['success_tasks_count']) + self.assertEqual(2, stat['total_tasks_count']) + + def test_nested_wf_max_depth(self): + wb_text = """--- + version: '2.0' + + name: wb + + workflows: + parent_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + workflow: sub_wf + on-success: task3 + + task3: + action: std.fail + + sub_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wb_service.create_workbook_v2(wb_text) + + wf_ex = self.engine.start_workflow('wb.parent_wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/report?max_depth=0' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual('wb.parent_wf', root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(2, len(tasks)) + + # Verify task1 info. + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual(states.SUCCESS, task1['state']) + self.assertEqual(0, len(task1['workflow_executions'])) + self.assertEqual(1, len(task1['action_executions'])) + + task1_action = task1['action_executions'][0] + self.assertEqual(states.SUCCESS, task1_action['state']) + self.assertEqual('std.noop', task1_action['name']) + + # Verify task2 info. + task2 = self._assert_single_item(tasks, name='task2') + + self.assertEqual(states.ERROR, task2['state']) + self.assertEqual(0, len(task2['action_executions'])) + self.assertEqual(1, len(task2['workflow_executions'])) + + sub_wf_entry = task2['workflow_executions'][0] + self.assertEqual(states.ERROR, sub_wf_entry['state']) + + # We still must have an entry for the subworkflow itself + # but it must not have info about task executions because + # we've now limited max depth. + self.assertNotIn('task_executions', sub_wf_entry) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(1, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(1, stat['success_tasks_count']) + self.assertEqual(2, stat['total_tasks_count'])