Sending TASK_FAILED event in case of MistralException

If task was failed by unhandled exceptions, Mistral was not sending
TASK_FAILED event, which could be critical for Mistral's clients, who
use notifications mechanism to handle issues.

Change-Id: I460686c2852e3eb642506049ad5c33705697ecd8
Closes-Bug: #1803746
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2018-11-21 12:59:45 +03:00
parent 81af1b4838
commit 87200f6aea
2 changed files with 60 additions and 35 deletions

View File

@ -70,12 +70,7 @@ def run_task(wf_cmd):
(e, wf_ex.name, task_spec.get_name(), tb.format_exc())
)
LOG.error(msg)
task.set_state(states.ERROR, msg)
task.save_finished_time()
wf_handler.force_fail_workflow(wf_ex, msg)
force_fail_task(task.task_ex, msg, task=task)
return
@ -123,12 +118,7 @@ def _on_action_complete(action_ex):
" action=%s]:\n%s" %
(e, wf_ex.name, task_ex.name, action_ex.name, tb.format_exc()))
LOG.error(msg)
task.set_state(states.ERROR, msg)
task.save_finished_time()
wf_handler.force_fail_workflow(wf_ex, msg)
force_fail_task(task_ex, msg, task=task)
return
@ -183,19 +173,14 @@ def _on_action_update(action_ex):
" action=%s]:\n%s" %
(e, wf_ex.name, task_ex.name, action_ex.name, tb.format_exc()))
LOG.error(msg)
task.set_state(states.ERROR, msg)
task.save_finished_time()
wf_handler.force_fail_workflow(wf_ex, msg)
force_fail_task(task_ex, msg, task=task)
return
_check_affected_tasks(task)
def force_fail_task(task_ex, msg):
def force_fail_task(task_ex, msg, task=None):
"""Forces the given task to fail.
This method implements the 'forced' task fail without giving a chance
@ -207,14 +192,22 @@ def force_fail_task(task_ex, msg):
:param task_ex: Task execution.
:param msg: Error message.
:param task: Task object. Optional.
"""
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
task_ex.workflow_execution_id
)
task = _build_task_from_execution(wf_spec, task_ex)
LOG.error(msg)
if not task:
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
task_ex.workflow_execution_id
)
task = _build_task_from_execution(wf_spec, task_ex)
old_task_state = task_ex.state
task.set_state(states.ERROR, msg)
task.notify(old_task_state, states.ERROR)
task.save_finished_time()
wf_handler.force_fail_workflow(task_ex.workflow_execution, msg)
@ -239,12 +232,7 @@ def continue_task(task_ex):
(e, wf_ex.name, task_ex.name, tb.format_exc())
)
LOG.error(msg)
task.set_state(states.ERROR, msg)
task.save_finished_time()
wf_handler.force_fail_workflow(wf_ex, msg)
force_fail_task(task_ex, msg, task=task)
return
@ -268,12 +256,7 @@ def complete_task(task_ex, state, state_info):
(e, wf_ex.name, task_ex.name, tb.format_exc())
)
LOG.error(msg)
task.set_state(states.ERROR, msg)
task.save_finished_time()
wf_handler.force_fail_workflow(wf_ex, msg)
force_fail_task(task_ex, msg, task=task)
return

View File

@ -1035,3 +1035,45 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertTrue(self.publishers['wbhk'].publish.called)
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_task_input_error(self):
wf_def = """---
version: '2.0'
wf:
tasks:
task1:
input:
url: <% $.ItWillBeError %>
action: std.http
on-error: task2
task2:
action: std.noop
"""
wf_svc.create_workflows(wf_def)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
wf_ex = self.engine.start_workflow('wf', '', **params)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
self.assertEqual(1, len(task_exs))
t1_ex = self._assert_single_item(task_exs, name='task1')
self.assertEqual(states.ERROR, t1_ex.state)
expected_order = [
(wf_ex.id, events.WORKFLOW_LAUNCHED),
(t1_ex.id, events.TASK_LAUNCHED),
(t1_ex.id, events.TASK_FAILED),
(wf_ex.id, events.WORKFLOW_FAILED)
]
self.assertTrue(self.publishers['wbhk'].publish.called)
self.assertListEqual(expected_order, EVENT_LOGS)