Dynamic workflow name evaluation.

Evaluate workflow names dynamically, so yaql or jinja expression
is allowed as sub-workflow name. Tasks names are not yet
dynamically evaluated.

Partially implements: blueprint mistral-dynamic-actions

Change-Id: Icfe591e27a4f45c2e3dcfa83512217f3b2122189
This commit is contained in:
Istvan Imre 2017-02-15 08:06:36 +01:00
parent 18bab73ed7
commit ff78d7f659
8 changed files with 298 additions and 25 deletions

View File

@ -191,14 +191,16 @@ attributes:
- **action** - Name of the action associated with the task.
*Mutually exclusive with* **workflow**. If neither action nor workflow are
provided then the action 'std.noop' will be used.
- **workflow** - Name of the workflow associated with the task.
- **workflow** - Name of the workflow associated with the task. Can be static
value or an expression (for example, "{{ _.subworkflow_name }}").
*Mutually exclusive with* **action**.
- **input** - Actual input parameter values of the task. *Optional*.
Value of each parameter is a JSON-compliant type such as number,
string etc, dictionary or list. It can also be an expression to
retrieve value from task context or any of the mentioned types
containing inline expressions (for example, string "<%
$.movie_name %> is a cool movie!")
$.movie_name %> is a cool movie!") Can be an expression that evaluates to
a JSON object.
- **publish** - Dictionary of variables to publish to the workflow
context. Any JSON-compatible data structure optionally containing
expression to select precisely what needs to be published.
@ -226,6 +228,66 @@ attributes:
during action execution. If set to 'true' task may be run twice.
*Optional*. By default set to 'false'.
Workflow
''''''''
Synchronously starts a sub-workflow with the given name.
Example static workflow call:
.. code-block:: mistral
my_task:
  workflow: name_of_my_workflow
Example dynamic workflow selection:
.. code-block:: mistral
---
version: '2.0'
name: weather_data_processing
workflows:
framework:
input:
- magic_workflow_name: show_weather
tasks:
weather_data:
action: std.echo
input:
output:
location: wherever
temperature: "22C"
publish:
weather_data: <% task(weather_data).result %>
on-success:
- do_magic
do_magic:
# reference workflow by parameter
workflow: <% $.magic_workflow_name %>
# expand dictionary to input parameters
input: <% $.weather_data %>
show_weather:
input:
- location
- temperature
tasks:
write_data:
action: std.echo
input:
output: "<% $.location %>: <% $.temperature %>"
Note: Typical use for the dynamic workflow selection is when parts of a
workflow can be customized. E.g. collect some weather data and then execute
some custom workflow on it.
Policies
''''''''
@ -307,8 +369,8 @@ Retry policy can also be configured on a single line as:
All parameter values for any policy can be defined as expressions.
Simplified input syntax
'''''''''''''''''''''''
Input syntax
''''''''''''
When describing a workflow task it's possible to specify its input
parameters in two ways:
@ -330,6 +392,25 @@ Simplified syntax:
my_task:
  action: std.http url="http://mywebsite.org" method="GET"
Syntax with dynamic input parameter map:
.. code-block:: mistral
---
version: '2.0'
example_workflow:
input:
- http_request_parameters:
url: http://mywebsite.org
method: GET
tasks:
setup_task:
action: std.http
input: <% $.http_request_parameters %>
The same rules apply to tasks associated with workflows.
Full syntax:
@ -349,6 +430,22 @@ Simplified syntax:
my_task:
  workflow: some_nested_workflow param1='val1' param2='val2'
Syntax with dynamic input parameter map:
.. code-block:: mistral
---
version: '2.0'
example_workflow:
input:
- nested_params: {"param1": "val1", "param2": "val2"}
tasks:
setup_task:
workflow: some_nested_workflow
input: <% $.nested_params %>
**NOTE**: It's also possible to merge these two approaches and specify a part
of parameters using simplified key-value pairs syntax and using keyword *input*.
In this case all the parameters will be effectively merged. If the same

View File

@ -82,7 +82,8 @@ def on_action_update(action_ex, state):
@profiler.trace('action-handler-build-action', hide_args=True)
def _build_action(action_ex):
if isinstance(action_ex, models.WorkflowExecution):
return actions.WorkflowAction(None, action_ex=action_ex)
return actions.WorkflowAction(wf_name=action_ex.name,
action_ex=action_ex)
wf_name = None
wf_spec_name = None

View File

@ -489,6 +489,10 @@ class AdHocAction(PythonAction):
class WorkflowAction(Action):
"""Workflow action."""
def __init__(self, wf_name, **kwargs):
super(WorkflowAction, self).__init__(None, **kwargs)
self.wf_name = wf_name
@profiler.trace('workflow-action-complete', hide_args=True)
def complete(self, result):
# No-op because in case of workflow result is already processed.
@ -503,15 +507,11 @@ class WorkflowAction(Action):
parent_wf_ex.id
)
task_spec = spec_parser.get_task_spec(self.task_ex.spec)
wf_spec_name = task_spec.get_workflow_name()
wf_def = engine_utils.resolve_workflow_definition(
parent_wf_ex.workflow_name,
parent_wf_spec.get_name(),
namespace=parent_wf_ex.params['namespace'],
wf_spec_name=wf_spec_name
wf_spec_name=self.wf_name
)
wf_spec = spec_parser.get_workflow_spec_by_definition_id(

View File

@ -416,18 +416,14 @@ class RegularTask(Task):
@profiler.trace('regular-task-get-action-input', hide_args=True)
def _get_action_input(self, ctx=None):
ctx = ctx or self.ctx
input_dict = self._evaluate_expression(self.task_spec.get_input(), ctx)
ctx_view = data_flow.ContextView(
ctx,
self.wf_ex.context,
self.wf_ex.input
)
input_dict = expr.evaluate_recursively(
self.task_spec.get_input(),
ctx_view
)
if not isinstance(input_dict, dict):
raise exc.InputException(
"Wrong dynamic input for task: %s. Dict type is expected. "
"Actual type: %s. Actual value: %s" %
(self.task_spec.get_name(), type(input_dict), str(input_dict))
)
return utils.merge_dicts(
input_dict,
@ -435,12 +431,28 @@ class RegularTask(Task):
overwrite=False
)
def _evaluate_expression(self, expression, ctx=None):
ctx = ctx or self.ctx
ctx_view = data_flow.ContextView(
ctx,
self.wf_ex.context,
self.wf_ex.input
)
input_dict = expr.evaluate_recursively(
expression,
ctx_view
)
return input_dict
def _build_action(self):
action_name = self.task_spec.get_action_name()
wf_name = self.task_spec.get_workflow_name()
if wf_name:
return actions.WorkflowAction(wf_name, task_ex=self.task_ex)
return actions.WorkflowAction(
wf_name=self._evaluate_expression(wf_name),
task_ex=self.task_ex
)
if not action_name:
action_name = 'std.noop'

View File

@ -27,6 +27,8 @@ from mistral.utils import expression_utils
LOG = logging.getLogger(__name__)
ANY_JINJA_REGEXP = "{{.*}}|{%.*%}"
JINJA_REGEXP = '({{(.*)}})'
JINJA_BLOCK_REGEXP = '({%(.*)%})'

View File

@ -21,11 +21,19 @@ import six
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.expressions.jinja_expression import ANY_JINJA_REGEXP
from mistral.expressions.yaql_expression import INLINE_YAQL_REGEXP
from mistral.lang import types
from mistral import utils
CMD_PTRN = re.compile("^[\w\.]+[^=\(\s\"]*")
ACTION_PATTRENS = {
"command": "[\w\.]+[^=\(\s\"]*",
"yaql_expression": INLINE_YAQL_REGEXP,
"jinja_expression": ANY_JINJA_REGEXP,
}
CMD_PTRN = re.compile(
"^({})".format("|".join(six.itervalues(ACTION_PATTRENS)))
)
EXPRESSION = '|'.join([expr.patterns[name] for name in expr.patterns])
_ALL_IN_BRACKETS = "\[.*\]\s*"

View File

@ -50,7 +50,12 @@ class TaskSpec(base.BaseSpec):
"type": types.WORKFLOW_TYPE,
"action": types.NONEMPTY_STRING,
"workflow": types.NONEMPTY_STRING,
"input": types.NONEMPTY_DICT,
"input": {
"oneOf": [
types.NONEMPTY_DICT,
types.NONEMPTY_STRING
]
},
"with-items": {
"oneOf": [
types.NONEMPTY_STRING,

View File

@ -96,6 +96,100 @@ workflows:
action: std.noop
"""
WB3 = """
---
version: '2.0'
name: wb3
workflows:
wf1:
input:
- wf_name
output:
sub_wf_out: <% $.sub_wf_out %>
tasks:
task1:
workflow: <% $.wf_name %>
publish:
sub_wf_out: <% task(task1).result.sub_wf_out %>
wf2:
output:
sub_wf_out: wf2_out
tasks:
task1:
action: std.noop
"""
WB4 = """
---
version: '2.0'
name: wb4
workflows:
wf1:
input:
- wf_name
- inp
output:
sub_wf_out: <% $.sub_wf_out %>
tasks:
task1:
workflow: <% $.wf_name %>
input: <% $.inp %>
publish:
sub_wf_out: <% task(task1).result.sub_wf_out %>
wf2:
input:
- inp
output:
sub_wf_out: <% $.inp %>
tasks:
task1:
action: std.noop
"""
WB5 = """
---
version: '2.0'
name: wb5
workflows:
wf1:
input:
- wf_name
- inp
output:
sub_wf_out: '{{ _.sub_wf_out }}'
tasks:
task1:
workflow: '{{ _.wf_name }}'
input: '{{ _.inp }}'
publish:
sub_wf_out: '{{ task("task1").result.sub_wf_out }}'
wf2:
input:
- inp
output:
sub_wf_out: '{{ _.inp }}'
tasks:
task1:
action: std.noop
"""
class SubworkflowsTest(base.EngineTestCase):
def setUp(self):
@ -103,6 +197,9 @@ class SubworkflowsTest(base.EngineTestCase):
wb_service.create_workbook_v2(WB1)
wb_service.create_workbook_v2(WB2)
wb_service.create_workbook_v2(WB3)
wb_service.create_workbook_v2(WB4)
wb_service.create_workbook_v2(WB5)
def test_subworkflow_success(self):
wf2_ex = self.engine.start_workflow('wb1.wf2', '', None)
@ -261,3 +358,54 @@ class SubworkflowsTest(base.EngineTestCase):
# Wait till workflow 'wf2' is completed.
self.await_workflow_success(wf2_ex.id)
def test_dynamic_subworkflow_wf2(self):
ex = self.engine.start_workflow(
wf_identifier='wb3.wf1',
wf_input={'wf_name': 'wf2'}
)
self.await_workflow_success(ex.id)
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertEqual({'sub_wf_out': 'wf2_out'}, ex.output)
def test_dynamic_subworkflow_call_failure(self):
ex = self.engine.start_workflow(
wf_identifier='wb3.wf1',
wf_input={'wf_name': 'not_existing_wf'}
)
self.await_workflow_error(ex.id)
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertIn('not_existing_wf', ex.state_info)
def test_dynamic_subworkflow_with_generic_input(self):
self._test_dynamic_workflow_with_dict_param('wb4.wf1')
def test_dynamic_subworkflow_with_jinja(self):
self._test_dynamic_workflow_with_dict_param('wb5.wf1')
def test_string_workflow_input_failure(self):
ex = self.engine.start_workflow(
wf_identifier='wb4.wf1',
wf_input={'wf_name': 'wf2', 'inp': 'invalid_string_input'}
)
self.await_workflow_error(ex.id)
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertIn('invalid_string_input', ex.state_info)
def _test_dynamic_workflow_with_dict_param(self, wf_identifier):
ex = self.engine.start_workflow(
wf_identifier=wf_identifier,
wf_input={'wf_name': 'wf2', 'inp': {'inp': 'abc'}}
)
self.await_workflow_success(ex.id)
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertEqual({'sub_wf_out': 'abc'}, ex.output)