Add a workflow execution report endpoint

* This patch adds the first version of the REST endpoint that
  can generate a report for a workflow execution. Without any
  query parameters, the GET method of the endpoint returns a
  tree-like structure that includes information about all execution
  objects associated with the specified workflow execution. The
  root object is the workflow execution itself, its children are
  task executions, each task execution in turn has either action
  executions or workflow executions, and so on. So the structure
  describes the entire set of execution objects regardless of how
  deep they are from the root object. This data alone can be used
  to better understand and visualise the process related to a
  parent workflow and to see what paths were taken while the
  workflow was running.
  If additional parameters are provided in the query string, the
  endpoint returns only a subset of the entire tree. Currently,
  the filters are (a request sketch using them is included at the
  end of these notes):
    1. "errors_only" (boolean) to retain only execution objects
       in the error state, meaning that only error paths are
       present in the report. It is useful when we need to do a
       root cause analysis of a workflow failure. False by default.
    2. "max_depth" (integer) to limit how deep the algorithm can
       go into nested workflows. If set to 0, only the root
       workflow execution will be in the report. If set to 1, the
       report will have only the root workflow execution and its
       direct children, and so on. If negative (the default), no
       limit is set.
  Additionally, the report contains statistics about the task
  executions examined while the report was being generated, such
  as the number of tasks in the error state, the number of tasks
  that finished successfully and so on.
* Added the main tests for the endpoint. Note that although these
  tests verify a REST API endpoint, unlike the other API tests they
  run a real Mistral engine to execute workflows. This is done to
  simplify the test implementation so that we don't have to mock
  everything with huge data structures as we do in other API tests.

Possible changes that may be made based on feedback:
* Statistics can contain more than just the number of tasks in
  certain states. We can also add things like the number of actions,
  the depth of the tree, the number of nested workflows, the average
  task/action/workflow execution time, etc.
* Additional query parameters to configure a generated report. For
  example, "statistics_only" to get just general information about
  the workflow execution tree, not the tree itself. Another example
  is "running_only" to retain only the paths that have not finished.

Implements blueprint: mistral-error-analysis

Change-Id: Id3e17821e04b7a1b84dfea5126d223d90ad8e3c2
Renat Akhmerov 2019-01-15 17:13:42 +07:00
parent 81af1b4838
commit ae2c5fdbbb
5 changed files with 736 additions and 2 deletions


@@ -23,6 +23,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan

from mistral.api import access_control as acl
from mistral.api.controllers.v2 import execution_report
from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import types

@@ -82,6 +83,7 @@ def _get_workflow_execution(id, must_exist=True):
class ExecutionsController(rest.RestController):
    tasks = task.ExecutionTasksController()
    report = execution_report.ExecutionReportController()

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(resources.Execution, wtypes.text)


@@ -0,0 +1,164 @@
# Copyright 2019 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo_log import log as logging
from pecan import rest
import wsmeext.pecan as wsme_pecan

from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import types
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.utils import rest_utils
from mistral.workflow import states


LOG = logging.getLogger(__name__)


def create_workflow_execution_entry(wf_ex):
    return resources.WorkflowExecutionReportEntry.from_db_model(wf_ex)


def create_task_execution_entry(task_ex):
    return resources.TaskExecutionReportEntry.from_db_model(task_ex)


def create_action_execution_entry(action_ex):
    return resources.ActionExecutionReportEntry.from_db_model(action_ex)


def update_statistics_with_task(stat, task_ex):
    if task_ex.state == states.RUNNING:
        stat.increment_running()
    elif task_ex.state == states.SUCCESS:
        stat.increment_success()
    elif task_ex.state == states.ERROR:
        stat.increment_error()
    elif task_ex.state == states.IDLE:
        stat.increment_idle()
    elif task_ex.state == states.PAUSED:
        stat.increment_paused()


def analyse_task_execution(task_ex_id, stat, filters, cur_depth):
    with db_api.transaction():
        task_ex = db_api.get_task_execution(task_ex_id)

        if filters['errors_only'] and task_ex.state != states.ERROR:
            return None

        update_statistics_with_task(stat, task_ex)

        entry = create_task_execution_entry(task_ex)

        child_executions = task_ex.executions

        entry.action_executions = []
        entry.workflow_executions = []

        for c_ex in child_executions:
            if isinstance(c_ex, db_models.ActionExecution):
                entry.action_executions.append(
                    create_action_execution_entry(c_ex)
                )
            else:
                entry.workflow_executions.append(
                    analyse_workflow_execution(c_ex.id, stat, filters, cur_depth)
                )

        return entry


def analyse_workflow_execution(wf_ex_id, stat, filters, cur_depth):
    with db_api.transaction():
        wf_ex = db_api.get_workflow_execution(wf_ex_id)

        entry = create_workflow_execution_entry(wf_ex)

        max_depth = filters['max_depth']

        # Don't get deeper into the workflow task executions if
        # the maximum depth is defined and the current depth
        # exceeds it.
        if 0 <= max_depth < cur_depth:
            return entry

        task_execs = wf_ex.task_executions

        entry.task_executions = []

        for t_ex in task_execs:
            task_exec_entry = analyse_task_execution(
                t_ex.id,
                stat,
                filters,
                cur_depth + 1
            )

            if task_exec_entry:
                entry.task_executions.append(task_exec_entry)

        return entry


def build_report(wf_ex_id, filters):
    report = resources.ExecutionReport()

    stat = resources.ExecutionReportStatistics()

    report.statistics = stat

    report.root_workflow_execution = analyse_workflow_execution(
        wf_ex_id,
        stat,
        filters,
        0
    )

    return report


class ExecutionReportController(rest.RestController):
    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(resources.ExecutionReport, types.uuid, bool, int)
    def get(self, workflow_execution_id, errors_only=False, max_depth=-1):
        """Return workflow execution report.

        :param workflow_execution_id: The ID of the workflow execution to
            generate a report for.
        :param errors_only: Optional. If True, only error paths of the
            execution tree are included in the report. The root execution
            (with the specified id) is always included, but its tasks may
            or may not be included depending on this flag's value.
        :param max_depth: Optional. Limits the depth of recursion while
            obtaining the execution tree, i.e. the maximum depth at which
            subworkflows are still included in the report. If the value
            of the flag is a negative number then no limit is set.
            The root execution has depth 0, so if the flag is 0 then only
            the root execution, its tasks and their actions will be
            included. If some of the tasks in turn run workflows, then
            these subworkflows will also be included, but without their
            tasks. The algorithm will fully analyse their tasks only if
            max_depth is greater than zero.
        """
        LOG.info(
            "Fetch execution report [workflow_execution_id=%s]",
            workflow_execution_id
        )

        filters = {
            'errors_only': errors_only,
            'max_depth': max_depth
        }

        return build_report(workflow_execution_id, filters)
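
A quick sanity check of the depth semantics implemented by the guard
in analyse_workflow_execution() (a standalone sketch for
illustration, not part of the patch):

    def should_stop(max_depth, cur_depth):
        # Mirrors the guard in analyse_workflow_execution(): a
        # negative max_depth disables the limit entirely.
        return 0 <= max_depth < cur_depth

    # The root workflow is analysed at depth 0; tasks pass
    # cur_depth + 1 down to the subworkflows they launch.
    assert not should_stop(-1, 100)  # negative: no limit at all
    assert not should_stop(0, 0)     # the root itself is always analysed
    assert should_stop(0, 1)         # max_depth=0: a subworkflow entry is
                                     # still created but its tasks are
                                     # not expanded
    assert not should_stop(1, 1)     # max_depth=1: one nesting level is
                                     # fully expanded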


@@ -721,3 +721,158 @@ class EventTriggers(resource.ResourceList):
                                "marker=123e4567-e89b-12d3-a456-426655440000")
        return triggers_sample


class BaseExecutionReportEntry(resource.Resource):
    """Execution report entry resource."""

    id = wtypes.text
    name = wtypes.text
    created_at = wtypes.text
    updated_at = wtypes.text
    state = wtypes.text
    state_info = wtypes.text

    @classmethod
    def sample(cls):
        # TODO(rakhmerov): complete
        return cls(
            id='123e4567-e89b-12d3-a456-426655441414',
            created_at='2019-01-30T00:00:00.000000',
            updated_at='2019-01-30T00:00:00.000000',
            state=states.SUCCESS
        )


class ActionExecutionReportEntry(BaseExecutionReportEntry):
    """Action execution report entry resource."""

    accepted = bool
    last_heartbeat = wtypes.text

    @classmethod
    def sample(cls):
        sample = super(ActionExecutionReportEntry, cls).sample()

        sample.accepted = True
        sample.last_heartbeat = '2019-01-30T00:00:00.000000'

        return sample


class WorkflowExecutionReportEntry(BaseExecutionReportEntry):
    """Workflow execution report entry resource."""

    # NOTE(rakhmerov): task_executions has to be declared below,
    # after we declare a class for the task execution entry resource.

    @classmethod
    def sample(cls):
        sample = super(WorkflowExecutionReportEntry, cls).sample()

        # We can't define a non-empty list of task executions here
        # because the needed class is not defined yet. Since this is
        # just a sample, we can sacrifice it.
        sample.task_executions = []

        return sample


class TaskExecutionReportEntry(BaseExecutionReportEntry):
    """Task execution report entry resource."""

    action_executions = [ActionExecutionReportEntry]
    workflow_executions = [WorkflowExecutionReportEntry]

    @classmethod
    def sample(cls):
        sample = super(TaskExecutionReportEntry, cls).sample()

        sample.action_executions = [ActionExecutionReportEntry.sample()]
        sample.workflow_executions = []

        return sample


# We have to declare this field later because of the dynamic binding.
# It can't be within WorkflowExecutionReportEntry before
# TaskExecutionReportEntry is declared.
WorkflowExecutionReportEntry.task_executions = [TaskExecutionReportEntry]

wtypes.registry.reregister(WorkflowExecutionReportEntry)


class ExecutionReportStatistics(resource.Resource):
    """Execution report statistics.

    TODO(rakhmerov): There's much more we can add here. For example,
    information about actions, average (and also min and max) task
    execution run time, etc.
    """

    total_tasks_count = wtypes.IntegerType(minimum=0)
    running_tasks_count = wtypes.IntegerType(minimum=0)
    success_tasks_count = wtypes.IntegerType(minimum=0)
    error_tasks_count = wtypes.IntegerType(minimum=0)
    idle_tasks_count = wtypes.IntegerType(minimum=0)
    paused_tasks_count = wtypes.IntegerType(minimum=0)

    def __init__(self, **kw):
        self.total_tasks_count = 0
        self.running_tasks_count = 0
        self.success_tasks_count = 0
        self.error_tasks_count = 0
        self.idle_tasks_count = 0
        self.paused_tasks_count = 0

        super(ExecutionReportStatistics, self).__init__(**kw)

    def increment_running(self):
        self.running_tasks_count += 1
        self.total_tasks_count += 1

    def increment_success(self):
        self.success_tasks_count += 1
        self.total_tasks_count += 1

    def increment_error(self):
        self.error_tasks_count += 1
        self.total_tasks_count += 1

    def increment_idle(self):
        self.idle_tasks_count += 1
        self.total_tasks_count += 1

    def increment_paused(self):
        self.paused_tasks_count += 1
        self.total_tasks_count += 1

    @classmethod
    def sample(cls):
        return cls(
            total_tasks_count=10,
            running_tasks_count=3,
            success_tasks_count=5,
            error_tasks_count=2,
            idle_tasks_count=0,
            paused_tasks_count=0
        )


class ExecutionReport(resource.Resource):
    """Execution report resource."""

    statistics = ExecutionReportStatistics
    """General statistics about the workflow execution hierarchy."""

    root_workflow_execution = WorkflowExecutionReportEntry
    """Root entry of the report associated with a workflow execution."""

    @classmethod
    def sample(cls):
        sample = cls()

        sample.statistics = ExecutionReportStatistics.sample()
        sample.root_workflow_execution = WorkflowExecutionReportEntry.sample()

        return sample
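
Put together, the sample() methods above imply a report payload
roughly shaped like this (a hand-assembled sketch based on the
samples, not actual API output):

    sample_report = {
        'statistics': {
            'total_tasks_count': 10,
            'running_tasks_count': 3,
            'success_tasks_count': 5,
            'error_tasks_count': 2,
            'idle_tasks_count': 0,
            'paused_tasks_count': 0
        },
        'root_workflow_execution': {
            'id': '123e4567-e89b-12d3-a456-426655441414',
            'state': 'SUCCESS',
            'task_executions': [
                {
                    'name': 'task1',
                    'state': 'SUCCESS',
                    'action_executions': [
                        {'name': 'std.noop', 'state': 'SUCCESS'}
                    ],
                    'workflow_executions': []
                }
            ]
        }
    }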


@@ -36,8 +36,15 @@ from mistral.workflow import states

LOG = logging.getLogger(__name__)

-STATE_TYPES = wtypes.Enum(str, states.IDLE, states.RUNNING, states.SUCCESS,
-                          states.ERROR, states.RUNNING_DELAYED)
+STATE_TYPES = wtypes.Enum(
+    str,
+    states.IDLE,
+    states.RUNNING,
+    states.SUCCESS,
+    states.ERROR,
+    states.RUNNING_DELAYED
+)


def _get_task_resource_with_result(task_ex):

@@ -0,0 +1,406 @@
# Copyright 2019 - Nokia Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit.api import base
from mistral.tests.unit.engine import base as engine_base
from mistral.workflow import states


class TestExecutionReportController(base.APITest, engine_base.EngineTestCase):
    def test_simple_sequence_wf(self):
        wf_text = """---
        version: '2.0'

        wf:
          tasks:
            task1:
              action: std.noop
              on-success: task2

            task2:
              action: std.fail
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_error(wf_ex.id)

        resp = self.app.get('/v2/executions/%s/report' % wf_ex.id)

        self.assertEqual(200, resp.status_int)

        # Now let's verify the response structure.
        self.assertIn('root_workflow_execution', resp.json)

        root_wf_ex = resp.json['root_workflow_execution']

        self.assertIsInstance(root_wf_ex, dict)
        self.assertEqual(wf_ex.id, root_wf_ex['id'])
        self.assertEqual(wf_ex.name, root_wf_ex['name'])
        self.assertEqual(states.ERROR, root_wf_ex['state'])
        self.assertGreater(len(root_wf_ex['state_info']), 0)

        tasks = root_wf_ex['task_executions']

        self.assertIsInstance(tasks, list)
        self.assertEqual(2, len(tasks))

        # Verify task1 info.
        task1 = self._assert_single_item(
            tasks,
            name='task1',
            state=states.SUCCESS
        )

        self.assertEqual(0, len(task1['workflow_executions']))
        self.assertEqual(1, len(task1['action_executions']))

        task1_action = task1['action_executions'][0]

        self.assertEqual(states.SUCCESS, task1_action['state'])
        self.assertEqual('std.noop', task1_action['name'])

        # Verify task2 info.
        task2 = self._assert_single_item(
            tasks,
            name='task2',
            state=states.ERROR
        )

        self.assertEqual(1, len(task2['action_executions']))

        task2_action = task2['action_executions'][0]

        self.assertEqual(0, len(task2['workflow_executions']))
        self.assertEqual(states.ERROR, task2_action['state'])

        # Verify statistics.
        stat = resp.json['statistics']

        self.assertEqual(1, stat['error_tasks_count'])
        self.assertEqual(0, stat['idle_tasks_count'])
        self.assertEqual(0, stat['paused_tasks_count'])
        self.assertEqual(0, stat['running_tasks_count'])
        self.assertEqual(1, stat['success_tasks_count'])
        self.assertEqual(2, stat['total_tasks_count'])

    def test_nested_wf(self):
        wb_text = """---
        version: '2.0'

        name: wb

        workflows:
          parent_wf:
            tasks:
              task1:
                action: std.noop
                on-success: task2

              task2:
                workflow: sub_wf
                on-success: task3

              task3:
                action: std.fail

          sub_wf:
            tasks:
              task1:
                action: std.noop
                on-success: task2

              task2:
                action: std.fail
        """

        wb_service.create_workbook_v2(wb_text)

        wf_ex = self.engine.start_workflow('wb.parent_wf')

        self.await_workflow_error(wf_ex.id)

        resp = self.app.get('/v2/executions/%s/report' % wf_ex.id)

        self.assertEqual(200, resp.status_int)

        # Now let's verify the response structure.
        self.assertIn('root_workflow_execution', resp.json)

        root_wf_ex = resp.json['root_workflow_execution']

        self.assertIsInstance(root_wf_ex, dict)
        self.assertEqual('wb.parent_wf', root_wf_ex['name'])
        self.assertEqual(states.ERROR, root_wf_ex['state'])
        self.assertGreater(len(root_wf_ex['state_info']), 0)

        tasks = root_wf_ex['task_executions']

        self.assertIsInstance(tasks, list)
        self.assertEqual(2, len(tasks))

        # Verify task1 info.
        task1 = self._assert_single_item(tasks, name='task1')

        self.assertEqual(states.SUCCESS, task1['state'])
        self.assertEqual(0, len(task1['workflow_executions']))
        self.assertEqual(1, len(task1['action_executions']))

        task1_action = task1['action_executions'][0]

        self.assertEqual(states.SUCCESS, task1_action['state'])
        self.assertEqual('std.noop', task1_action['name'])

        # Verify task2 info.
        task2 = self._assert_single_item(tasks, name='task2')

        self.assertEqual(states.ERROR, task2['state'])
        self.assertEqual(0, len(task2['action_executions']))
        self.assertEqual(1, len(task2['workflow_executions']))

        sub_wf_entry = task2['workflow_executions'][0]

        self.assertEqual(states.ERROR, sub_wf_entry['state'])

        sub_wf_tasks = sub_wf_entry['task_executions']

        self.assertEqual(2, len(sub_wf_tasks))

        sub_wf_task1 = self._assert_single_item(
            sub_wf_tasks,
            name='task1',
            state=states.SUCCESS
        )
        sub_wf_task2 = self._assert_single_item(
            sub_wf_tasks,
            name='task2',
            state=states.ERROR
        )

        self.assertEqual(1, len(sub_wf_task1['action_executions']))
        self.assertEqual(
            states.SUCCESS,
            sub_wf_task1['action_executions'][0]['state']
        )

        self.assertEqual(1, len(sub_wf_task2['action_executions']))
        self.assertEqual(
            states.ERROR,
            sub_wf_task2['action_executions'][0]['state']
        )

        # Verify statistics.
        stat = resp.json['statistics']

        self.assertEqual(2, stat['error_tasks_count'])
        self.assertEqual(0, stat['idle_tasks_count'])
        self.assertEqual(0, stat['paused_tasks_count'])
        self.assertEqual(0, stat['running_tasks_count'])
        self.assertEqual(2, stat['success_tasks_count'])
        self.assertEqual(4, stat['total_tasks_count'])

    def test_nested_wf_errors_only(self):
        wb_text = """---
        version: '2.0'

        name: wb

        workflows:
          parent_wf:
            tasks:
              task1:
                action: std.noop
                on-success: task2

              task2:
                workflow: sub_wf
                on-success: task3

              task3:
                action: std.fail

          sub_wf:
            tasks:
              task1:
                action: std.noop
                on-success: task2

              task2:
                action: std.fail
        """

        wb_service.create_workbook_v2(wb_text)

        wf_ex = self.engine.start_workflow('wb.parent_wf')

        self.await_workflow_error(wf_ex.id)

        resp = self.app.get(
            '/v2/executions/%s/report?errors_only=true' % wf_ex.id
        )

        self.assertEqual(200, resp.status_int)

        # Now let's verify the response structure.
        self.assertIn('root_workflow_execution', resp.json)

        root_wf_ex = resp.json['root_workflow_execution']

        self.assertIsInstance(root_wf_ex, dict)
        self.assertEqual('wb.parent_wf', root_wf_ex['name'])
        self.assertEqual(states.ERROR, root_wf_ex['state'])
        self.assertGreater(len(root_wf_ex['state_info']), 0)

        tasks = root_wf_ex['task_executions']

        self.assertIsInstance(tasks, list)
        self.assertEqual(1, len(tasks))

        # There must be only task2 in the response.
        # Verify task2 info.
        task2 = self._assert_single_item(tasks, name='task2')

        self.assertEqual(states.ERROR, task2['state'])
        self.assertEqual(0, len(task2['action_executions']))
        self.assertEqual(1, len(task2['workflow_executions']))

        sub_wf_entry = task2['workflow_executions'][0]

        self.assertEqual(states.ERROR, sub_wf_entry['state'])

        sub_wf_tasks = sub_wf_entry['task_executions']

        self.assertEqual(1, len(sub_wf_tasks))

        sub_wf_task2 = self._assert_single_item(
            sub_wf_tasks,
            name='task2',
            state=states.ERROR
        )

        self.assertEqual(1, len(sub_wf_task2['action_executions']))
        self.assertEqual(
            states.ERROR,
            sub_wf_task2['action_executions'][0]['state']
        )

        # Verify statistics.
        stat = resp.json['statistics']

        self.assertEqual(2, stat['error_tasks_count'])
        self.assertEqual(0, stat['idle_tasks_count'])
        self.assertEqual(0, stat['paused_tasks_count'])
        self.assertEqual(0, stat['running_tasks_count'])
        self.assertEqual(0, stat['success_tasks_count'])
        self.assertEqual(2, stat['total_tasks_count'])

    def test_nested_wf_max_depth(self):
        wb_text = """---
        version: '2.0'

        name: wb

        workflows:
          parent_wf:
            tasks:
              task1:
                action: std.noop
                on-success: task2

              task2:
                workflow: sub_wf
                on-success: task3

              task3:
                action: std.fail

          sub_wf:
            tasks:
              task1:
                action: std.noop
                on-success: task2

              task2:
                action: std.fail
        """

        wb_service.create_workbook_v2(wb_text)

        wf_ex = self.engine.start_workflow('wb.parent_wf')

        self.await_workflow_error(wf_ex.id)

        resp = self.app.get('/v2/executions/%s/report?max_depth=0' % wf_ex.id)

        self.assertEqual(200, resp.status_int)

        # Now let's verify the response structure.
        self.assertIn('root_workflow_execution', resp.json)

        root_wf_ex = resp.json['root_workflow_execution']

        self.assertIsInstance(root_wf_ex, dict)
        self.assertEqual('wb.parent_wf', root_wf_ex['name'])
        self.assertEqual(states.ERROR, root_wf_ex['state'])
        self.assertGreater(len(root_wf_ex['state_info']), 0)

        tasks = root_wf_ex['task_executions']

        self.assertIsInstance(tasks, list)
        self.assertEqual(2, len(tasks))

        # Verify task1 info.
        task1 = self._assert_single_item(tasks, name='task1')

        self.assertEqual(states.SUCCESS, task1['state'])
        self.assertEqual(0, len(task1['workflow_executions']))
        self.assertEqual(1, len(task1['action_executions']))

        task1_action = task1['action_executions'][0]

        self.assertEqual(states.SUCCESS, task1_action['state'])
        self.assertEqual('std.noop', task1_action['name'])

        # Verify task2 info.
        task2 = self._assert_single_item(tasks, name='task2')

        self.assertEqual(states.ERROR, task2['state'])
        self.assertEqual(0, len(task2['action_executions']))
        self.assertEqual(1, len(task2['workflow_executions']))

        sub_wf_entry = task2['workflow_executions'][0]

        self.assertEqual(states.ERROR, sub_wf_entry['state'])

        # We still must have an entry for the subworkflow itself
        # but it must not have info about task executions because
        # we've now limited max depth.
        self.assertNotIn('task_executions', sub_wf_entry)

        # Verify statistics.
        stat = resp.json['statistics']

        self.assertEqual(1, stat['error_tasks_count'])
        self.assertEqual(0, stat['idle_tasks_count'])
        self.assertEqual(0, stat['paused_tasks_count'])
        self.assertEqual(0, stat['running_tasks_count'])
        self.assertEqual(1, stat['success_tasks_count'])
        self.assertEqual(2, stat['total_tasks_count'])