Merge "Allow engine commands as task name"
This commit is contained in:
commit
c54c35c82d
|
@ -711,6 +711,24 @@ This illustrates that, while designing a workflow, it's important to know
|
|||
precisely how Mistral processes 'on-success', 'on-error' and 'on-complete'
|
||||
and engine commands.
|
||||
|
||||
Engine commands and tasks
|
||||
'''''''''''''''''''''''''
|
||||
|
||||
The **on-*** clauses in direct workflows can refer both to tasks and engine
|
||||
commands, as demonstrated earlier. It is possible to use the engine commands
|
||||
as names for tasks. For example, one can create a task named `noop` or `fail`.
|
||||
These tasks will override the engine commands, that is, the action defined
|
||||
in these tasks will be executed instead of the engine commands. This is a
|
||||
method to succinctly extend the default behavior of the Mistral engine or
|
||||
provide side-effect free workflow examples.
|
||||
|
||||
The order in which task names are resolved is the following:
|
||||
|
||||
1. the task with the given name is searched
|
||||
2. the engine command with the given name is selected
|
||||
|
||||
The first option that matches is executed.
|
||||
|
||||
Fork
|
||||
''''
|
||||
|
||||
|
|
|
@ -33,12 +33,7 @@ _expr_ptrns = [expressions.patterns[name] for name in expressions.patterns]
|
|||
WITH_ITEMS_PTRN = re.compile(
|
||||
"\s*([\w\d_\-]+)\s*in\s*(\[.+\]|%s)" % '|'.join(_expr_ptrns)
|
||||
)
|
||||
RESERVED_TASK_NAMES = [
|
||||
'noop',
|
||||
'fail',
|
||||
'succeed',
|
||||
'pause'
|
||||
]
|
||||
|
||||
MAX_LENGTH_TASK_NAME = 255
|
||||
# Length of a join task name must be less than or equal to maximum
|
||||
# of task_executions unique_key and named_locks name. Their
|
||||
|
@ -155,12 +150,6 @@ class TaskSpec(base.BaseSpec):
|
|||
def _validate_name(self):
|
||||
task_name = self._data.get('name')
|
||||
|
||||
if task_name in RESERVED_TASK_NAMES:
|
||||
raise exc.InvalidModelException(
|
||||
"Reserved keyword '%s' not allowed as task name." %
|
||||
task_name
|
||||
)
|
||||
|
||||
if len(task_name) > MAX_LENGTH_TASK_NAME:
|
||||
raise exc.InvalidModelException(
|
||||
"The length of a '{0}' task name must not exceed {1}"
|
||||
|
|
|
@ -23,6 +23,17 @@ from mistral.lang.v2 import task_defaults
|
|||
from mistral.lang.v2 import tasks
|
||||
from mistral import utils
|
||||
|
||||
NOOP_COMMAND = 'noop'
|
||||
FAIL_COMMAND = 'fail'
|
||||
SUCCEED_COMMAND = 'succeed'
|
||||
PAUSE_COMMAND = 'pause'
|
||||
ENGINE_COMMANDS = [
|
||||
NOOP_COMMAND,
|
||||
FAIL_COMMAND,
|
||||
SUCCEED_COMMAND,
|
||||
PAUSE_COMMAND
|
||||
]
|
||||
|
||||
|
||||
class WorkflowSpec(base.BaseSpec):
|
||||
# See http://json-schema.org
|
||||
|
@ -93,7 +104,7 @@ class WorkflowSpec(base.BaseSpec):
|
|||
valid_task = self._task_exists(task_name)
|
||||
|
||||
if allow_engine_cmds:
|
||||
valid_task |= task_name in tasks.RESERVED_TASK_NAMES
|
||||
valid_task |= task_name in ENGINE_COMMANDS
|
||||
|
||||
if not valid_task:
|
||||
raise exc.InvalidModelException(
|
||||
|
|
|
@ -197,7 +197,8 @@ class BaseTest(base.BaseTestCase):
|
|||
|
||||
self.fail(self._formatMessage(msg, standardMsg))
|
||||
|
||||
def _await(self, predicate, delay=1, timeout=60, fail_message="no detail"):
|
||||
def _await(self, predicate, delay=1, timeout=60, fail_message="no detail",
|
||||
fail_message_formatter=lambda x: x):
|
||||
"""Awaits for predicate function to evaluate to True.
|
||||
|
||||
If within a configured timeout predicate function hasn't evaluated
|
||||
|
@ -207,6 +208,7 @@ class BaseTest(base.BaseTestCase):
|
|||
:param timeout: Maximum amount of time to wait for predication
|
||||
function to evaluate to True.
|
||||
:param fail_message: explains what was expected
|
||||
:param fail_message_formatter: lambda that formats the fail_message
|
||||
:return:
|
||||
"""
|
||||
end_time = time.time() + timeout
|
||||
|
@ -217,7 +219,8 @@ class BaseTest(base.BaseTestCase):
|
|||
|
||||
if time.time() + delay > end_time:
|
||||
raise AssertionError(
|
||||
"Failed to wait for expected result: " + fail_message
|
||||
"Failed to wait for expected result: " +
|
||||
fail_message_formatter(fail_message)
|
||||
)
|
||||
|
||||
time.sleep(delay)
|
||||
|
|
|
@ -270,7 +270,13 @@ class EngineTestCase(base.DbTestCase):
|
|||
lambda: self.is_workflow_in_state(ex_id, state),
|
||||
delay,
|
||||
timeout,
|
||||
"Execution {} to reach {} state".format(ex_id, state)
|
||||
fail_message="Execution {ex_id} to reach {state} "
|
||||
"state but is in {current}",
|
||||
fail_message_formatter=lambda m: m.format(
|
||||
ex_id=ex_id,
|
||||
state=state,
|
||||
current=db_api.get_workflow_execution(ex_id).state
|
||||
)
|
||||
)
|
||||
|
||||
def await_workflow_running(self, ex_id, delay=DEFAULT_DELAY,
|
||||
|
|
|
@ -175,10 +175,9 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
|||
None
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
first_time,
|
||||
cron_trigger.next_execution_time
|
||||
)
|
||||
interval = (cron_trigger.next_execution_time - first_time)
|
||||
|
||||
self.assertLessEqual(interval.total_seconds(), 3.0)
|
||||
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
|
|
|
@ -362,7 +362,7 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase):
|
|||
'version': '2.0',
|
||||
'wf1': {
|
||||
'tasks': {
|
||||
'noop-task': {
|
||||
'noop': {
|
||||
'action': 'std.noop'
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ from mistral.db.v2.sqlalchemy import api as db_api
|
|||
from mistral import exceptions as exc
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.lang.v2 import tasks
|
||||
from mistral.lang.v2 import workflows
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit import base
|
||||
from mistral import utils
|
||||
|
@ -85,7 +86,7 @@ WORKFLOW_WITH_VAR_TASK_NAME = """
|
|||
---
|
||||
version: '2.0'
|
||||
|
||||
list_servers:
|
||||
engine_command_{task_name}:
|
||||
|
||||
tasks:
|
||||
{task_name}:
|
||||
|
@ -155,15 +156,10 @@ class WorkflowServiceTest(base.DbTestCase):
|
|||
self.assertEqual('wf2', wf2_spec.get_name())
|
||||
self.assertEqual('direct', wf2_spec.get_type())
|
||||
|
||||
def test_invalid_task_name(self):
|
||||
for name in tasks.RESERVED_TASK_NAMES:
|
||||
def test_engine_commands_are_valid_task_names(self):
|
||||
for name in workflows.ENGINE_COMMANDS:
|
||||
wf = WORKFLOW_WITH_VAR_TASK_NAME.format(task_name=name)
|
||||
|
||||
self.assertRaises(
|
||||
exc.InvalidModelException,
|
||||
wf_service.create_workflows,
|
||||
wf
|
||||
)
|
||||
wf_service.create_workflows(wf)
|
||||
|
||||
def test_update_workflows(self):
|
||||
db_wfs = wf_service.create_workflows(WORKFLOW_LIST)
|
||||
|
|
|
@ -13,8 +13,10 @@
|
|||
# limitations under the License.
|
||||
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.lang.v2 import workflows
|
||||
from mistral.tests.unit import base
|
||||
from mistral.workflow import base as wf_base
|
||||
from mistral.workflow import commands
|
||||
from mistral.workflow import direct_workflow as direct_wf
|
||||
from mistral.workflow import reverse_workflow as reverse_wf
|
||||
|
||||
|
@ -64,3 +66,7 @@ class WorkflowControllerTest(base.BaseTest):
|
|||
wf_base.get_controller(wf_ex, wf_spec),
|
||||
reverse_wf.ReverseWorkflowController
|
||||
)
|
||||
|
||||
def test_all_engine_commands_have_implementation(self):
|
||||
for command in workflows.ENGINE_COMMANDS:
|
||||
self.assertIsNotNone(commands.get_command_class(command))
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.lang.v2 import tasks
|
||||
from mistral.lang.v2 import workflows
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
|
@ -228,18 +228,16 @@ class PauseWorkflow(SetWorkflowState):
|
|||
return d
|
||||
|
||||
|
||||
RESERVED_CMDS = dict(zip(
|
||||
tasks.RESERVED_TASK_NAMES, [
|
||||
Noop,
|
||||
FailWorkflow,
|
||||
SucceedWorkflow,
|
||||
PauseWorkflow
|
||||
]
|
||||
))
|
||||
ENGINE_CMD_CLS = {
|
||||
workflows.NOOP_COMMAND: Noop,
|
||||
workflows.FAIL_COMMAND: FailWorkflow,
|
||||
workflows.SUCCEED_COMMAND: SucceedWorkflow,
|
||||
workflows.PAUSE_COMMAND: PauseWorkflow
|
||||
}
|
||||
|
||||
|
||||
def get_command_class(cmd_name):
|
||||
return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None
|
||||
return ENGINE_CMD_CLS[cmd_name] if cmd_name in ENGINE_CMD_CLS else None
|
||||
|
||||
|
||||
# TODO(rakhmerov): IMO the way how we instantiate commands is weird.
|
||||
|
|
|
@ -120,7 +120,7 @@ class DirectWorkflowController(base.WorkflowController):
|
|||
for t_n, params, event_name in self._find_next_tasks(task_ex, ctx=ctx):
|
||||
t_s = self.wf_spec.get_tasks()[t_n]
|
||||
|
||||
if not (t_s or t_n in commands.RESERVED_CMDS):
|
||||
if not (t_s or t_n in commands.ENGINE_CMD_CLS):
|
||||
raise exc.WorkflowException("Task '%s' not found." % t_n)
|
||||
elif not t_s:
|
||||
t_s = self.wf_spec.get_tasks()[task_ex.name]
|
||||
|
|
Loading…
Reference in New Issue