Fail-on policy

Fail-on policy allows to fail success tasks by condition. It is useful
in cases we have to fail task if its result is unacceptable and it makes
workflow definition more readable.

Change-Id: I57b4f3d1533982d3b9b7063925f8d70f044aefea
Implements: blueprint fail-on-policy
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2019-06-07 00:30:20 +03:00 committed by Eyal
parent ee94087c89
commit 4e926a1f13
9 changed files with 218 additions and 7 deletions

View File

@ -86,6 +86,7 @@ YAML example
pause-before: true
wait-before: 2
wait-after: 4
fail-on: <% $.some_value < 4 %>
timeout: 30
retry:
count: 10
@ -110,6 +111,11 @@ There are different types of policies in Mistral.
has completed before starting the tasks specified in *'on-success'*,
*'on-error'* or *'on-complete'*.
4. **fail-on**
Specifies a condition under which the task will fail, even if
the action was completed successfully.
4. **timeout**
Specifies a period of time in seconds after which a task will be failed

View File

@ -162,6 +162,7 @@ Common workflow attributes
- **pause-before** - Configures pause-before policy. *Optional*.
- **wait-before** - Configures wait-before policy. *Optional*.
- **wait-after** - Configures wait-after policy. *Optional*.
- **fail-on** - Configures fail-on policy. *Optional*.
- **timeout** - Configures timeout policy. *Optional*.
- **retry** - Configures retry policy. *Optional*.
- **concurrency** - Configures concurrency policy. *Optional*.
@ -270,6 +271,7 @@ attributes:
- **pause-before** - Configures pause-before policy. *Optional*.
- **wait-before** - Configures wait-before policy. *Optional*.
- **wait-after** - Configures wait-after policy. *Optional*.
- **fail-on** - Configures fail-on policy. *Optional*.
- **timeout** - Configures timeout policy. *Optional*.
- **retry** - Configures retry policy. *Optional*.
- **concurrency** - Configures concurrency policy. *Optional*.
@ -355,6 +357,7 @@ YAML example
  pause-before: true
  wait-before: 2
  wait-after: 4
  fail-on: <% $.some_value < 4 %>
  timeout: 30
  retry:
    count: 10
@ -381,6 +384,12 @@ has completed before starting next tasks defined in *on-success*,
*on-error* or *on-complete*.
**fail-on**
Defines a condition under which the task will fail, even if
the action was completed successfully.
**timeout**
Defines a period of time in seconds after which a task will be failed

View File

@ -131,7 +131,7 @@ definitions:
So the call chain looks like this:
.. code-block::
.. code-block:: console
wf1 -> wf2 -> wf3
@ -154,7 +154,7 @@ these namespaces:
And we create a workflow execution like this via API:
.. code-block::
.. code-block:: console
POST /v2/executions
@ -175,7 +175,7 @@ In this case, Mistral will:
However, if we launch a workflow like this:
.. code-block::
.. code-block:: console
POST /v2/executions
@ -186,7 +186,7 @@ However, if we launch a workflow like this:
We'll get the call chain
.. code-block::
.. code-block:: console
wf2 -> wf3

View File

@ -56,6 +56,7 @@ def get_policy_factories():
build_pause_before_policy,
build_wait_before_policy,
build_wait_after_policy,
build_fail_on_policy,
build_retry_policy,
build_timeout_policy,
build_concurrency_policy
@ -150,6 +151,16 @@ def build_concurrency_policy(policies_spec):
if concurrency_policy else None)
def build_fail_on_policy(policies_spec):
if not policies_spec:
return None
fail_on_policy = policies_spec.get_fail_on()
return (FailOnPolicy(fail_on_policy)
if fail_on_policy else None)
def _ensure_context_has_key(runtime_context, key):
if not runtime_context:
runtime_context = {}
@ -542,6 +553,30 @@ class ConcurrencyPolicy(base.TaskPolicy):
task_ex.runtime_context = runtime_context
class FailOnPolicy(base.TaskPolicy):
_schema = {
"properties": {
"fail-on": {"type": "boolean"},
}
}
def __init__(self, fail_on):
self.fail_on = fail_on
def before_task_start(self, task_ex, task_spec):
pass
def after_task_complete(self, task_ex, task_spec):
if task_ex.state != states.SUCCESS:
return
super(FailOnPolicy, self).after_task_complete(task_ex, task_spec)
if self.fail_on:
task_ex.state = states.ERROR
task_ex.state_info = 'Failed by fail-on policy'
@db_utils.retry_on_db_error
@post_tx_queue.run
def _continue_task(task_ex_id):

View File

@ -29,6 +29,7 @@ class PoliciesSpec(base.BaseSpec):
"timeout": types.EXPRESSION_OR_POSITIVE_INTEGER,
"pause-before": types.EXPRESSION_OR_BOOLEAN,
"concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER,
"fail-on": types.EXPRESSION_OR_BOOLEAN
},
"additionalProperties": False
}
@ -46,6 +47,7 @@ class PoliciesSpec(base.BaseSpec):
self._timeout = data.get('timeout', 0)
self._pause_before = data.get('pause-before', False)
self._concurrency = data.get('concurrency', 0)
self._fail_on = data.get('fail-on', False)
def validate_schema(self):
super(PoliciesSpec, self).validate_schema()
@ -56,6 +58,7 @@ class PoliciesSpec(base.BaseSpec):
self.validate_expr(self._data.get('timeout', 0))
self.validate_expr(self._data.get('pause-before', False))
self.validate_expr(self._data.get('concurrency', 0))
self.validate_expr(self._data.get('fail-on', False))
def get_retry(self):
return self._retry
@ -74,3 +77,6 @@ class PoliciesSpec(base.BaseSpec):
def get_concurrency(self):
return self._concurrency
def get_fail_on(self):
return self._fail_on

View File

@ -38,6 +38,7 @@ class TaskDefaultsSpec(base.BaseSpec):
"timeout": types.EXPRESSION_OR_POSITIVE_INTEGER,
"pause-before": types.EXPRESSION_OR_BOOLEAN,
"concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER,
"fail-on": types.EXPRESSION_OR_BOOLEAN,
"on-complete": on_clause.OnClauseSpec.get_schema(),
"on-success": on_clause.OnClauseSpec.get_schema(),
"on-error": on_clause.OnClauseSpec.get_schema(),
@ -63,7 +64,8 @@ class TaskDefaultsSpec(base.BaseSpec):
'wait-after',
'timeout',
'pause-before',
'concurrency'
'concurrency',
'fail-on'
)
on_spec_cls = on_clause.OnClauseSpec

View File

@ -75,6 +75,7 @@ class TaskSpec(base.BaseSpec):
"timeout": types.EXPRESSION_OR_POSITIVE_INTEGER,
"pause-before": types.EXPRESSION_OR_BOOLEAN,
"concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER,
"fail-on": types.EXPRESSION_OR_BOOLEAN,
"target": types.NONEMPTY_STRING,
"keep-result": types.EXPRESSION_OR_BOOLEAN,
"safe-rerun": types.EXPRESSION_OR_BOOLEAN
@ -120,7 +121,8 @@ class TaskSpec(base.BaseSpec):
'wait-after',
'timeout',
'pause-before',
'concurrency'
'concurrency',
'fail-on'
)
self._target = data.get('target')
self._keep_result = data.get('keep-result', True)

View File

@ -1874,3 +1874,154 @@ class PoliciesTest(base.EngineTestCase):
{},
fail_task_ex.runtime_context["retry_task_policy"]
)
def test_fail_on_true_condition(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
action: std.echo output=4
fail-on: <% task(task1).result <= 4 %>
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(task_ex.state, states.ERROR, "Check task state")
def test_fail_on_false_condition(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
action: std.echo output=4
fail-on: <% task(task1).result != 4 %>
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(task_ex.state, states.SUCCESS, "Check task state")
def test_fail_on_true_condition_task_defaults(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
task-defaults:
fail-on: <% task().result <= 4 %>
tasks:
task1:
action: std.echo output=4
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(task_ex.state, states.ERROR, "Check task state")
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.Mock(side_effect=[1, 2, 3, 4])
)
def test_fail_on_with_retry(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
action: std.echo output="mocked"
fail-on: <% task(task1).result <= 2 %>
retry:
count: 3
delay: 0
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(task_ex.state, states.SUCCESS, "Check task state")
self.assertEqual(
2,
task_ex.runtime_context['retry_task_policy']['retry_no']
)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.Mock(side_effect=[1, 2, 3, 4])
)
def test_fail_on_with_retry_and_with_items(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
with-items: x in [1, 2]
action: std.echo output="mocked"
fail-on: <% not task(task1).result.contains(4) %>
retry:
count: 3
delay: 0
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(task_ex.state, states.SUCCESS, "Check task state")
self.assertEqual(
1,
task_ex.runtime_context['retry_task_policy']['retry_no']
)

View File

@ -34,6 +34,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
pecan>=1.2.1 # BSD
python-barbicanclient>=4.5.2 # Apache-2.0
python-cinderclient!=4.0.0,>=3.3.0 # Apache-2.0
python-zaqarclient>=1.0.0 # Apache-2.0
python-designateclient>=2.7.0 # Apache-2.0
python-glanceclient>=2.8.0 # Apache-2.0
python-glareclient>=0.3.0 # Apache-2.0
@ -52,7 +53,6 @@ python-troveclient>=2.2.0 # Apache-2.0
python-ironicclient!=2.7.1,>=2.7.0 # Apache-2.0
python-ironic-inspector-client>=1.5.0 # Apache-2.0
python-vitrageclient>=2.0.0 # Apache-2.0
python-zaqarclient>=1.0.0 # Apache-2.0
python-zunclient>=1.0.0 # Apache-2.0
python-qinlingclient>=1.0.0 # Apache-2.0
PyJWT>=1.0.1 # MIT