Merge "Allow engine commands as task name"

This commit is contained in:
Zuul 2018-07-30 15:39:01 +00:00 committed by Gerrit Code Review
commit c54c35c82d
11 changed files with 67 additions and 41 deletions

View File

@ -711,6 +711,24 @@ This illustrates that, while designing a workflow, it's important to know
precisely how Mistral processes 'on-success', 'on-error' and 'on-complete'
and engine commands.
Engine commands and tasks
'''''''''''''''''''''''''
The **on-*** clauses in direct workflows can refer both to tasks and engine
commands, as demonstrated earlier. It is possible to use the engine commands
as names for tasks. For example, one can create a task named `noop` or `fail`.
These tasks will override the engine commands, that is, the action defined
in these tasks will be executed instead of the engine commands. This is a
method to succinctly extend the default behavior of the Mistral engine or
provide side-effect free workflow examples.
The order in which task names are resolved is the following:
1. the task with the given name is searched
2. the engine command with the given name is selected
The first option that matches is executed.
Fork
''''

View File

@ -33,12 +33,7 @@ _expr_ptrns = [expressions.patterns[name] for name in expressions.patterns]
WITH_ITEMS_PTRN = re.compile(
"\s*([\w\d_\-]+)\s*in\s*(\[.+\]|%s)" % '|'.join(_expr_ptrns)
)
RESERVED_TASK_NAMES = [
'noop',
'fail',
'succeed',
'pause'
]
MAX_LENGTH_TASK_NAME = 255
# Length of a join task name must be less than or equal to maximum
# of task_executions unique_key and named_locks name. Their
@ -155,12 +150,6 @@ class TaskSpec(base.BaseSpec):
def _validate_name(self):
task_name = self._data.get('name')
if task_name in RESERVED_TASK_NAMES:
raise exc.InvalidModelException(
"Reserved keyword '%s' not allowed as task name." %
task_name
)
if len(task_name) > MAX_LENGTH_TASK_NAME:
raise exc.InvalidModelException(
"The length of a '{0}' task name must not exceed {1}"

View File

@ -23,6 +23,17 @@ from mistral.lang.v2 import task_defaults
from mistral.lang.v2 import tasks
from mistral import utils
NOOP_COMMAND = 'noop'
FAIL_COMMAND = 'fail'
SUCCEED_COMMAND = 'succeed'
PAUSE_COMMAND = 'pause'
ENGINE_COMMANDS = [
NOOP_COMMAND,
FAIL_COMMAND,
SUCCEED_COMMAND,
PAUSE_COMMAND
]
class WorkflowSpec(base.BaseSpec):
# See http://json-schema.org
@ -93,7 +104,7 @@ class WorkflowSpec(base.BaseSpec):
valid_task = self._task_exists(task_name)
if allow_engine_cmds:
valid_task |= task_name in tasks.RESERVED_TASK_NAMES
valid_task |= task_name in ENGINE_COMMANDS
if not valid_task:
raise exc.InvalidModelException(

View File

@ -197,7 +197,8 @@ class BaseTest(base.BaseTestCase):
self.fail(self._formatMessage(msg, standardMsg))
def _await(self, predicate, delay=1, timeout=60, fail_message="no detail"):
def _await(self, predicate, delay=1, timeout=60, fail_message="no detail",
fail_message_formatter=lambda x: x):
"""Awaits for predicate function to evaluate to True.
If within a configured timeout predicate function hasn't evaluated
@ -207,6 +208,7 @@ class BaseTest(base.BaseTestCase):
:param timeout: Maximum amount of time to wait for predication
function to evaluate to True.
:param fail_message: explains what was expected
:param fail_message_formatter: lambda that formats the fail_message
:return:
"""
end_time = time.time() + timeout
@ -217,7 +219,8 @@ class BaseTest(base.BaseTestCase):
if time.time() + delay > end_time:
raise AssertionError(
"Failed to wait for expected result: " + fail_message
"Failed to wait for expected result: " +
fail_message_formatter(fail_message)
)
time.sleep(delay)

View File

@ -270,7 +270,13 @@ class EngineTestCase(base.DbTestCase):
lambda: self.is_workflow_in_state(ex_id, state),
delay,
timeout,
"Execution {} to reach {} state".format(ex_id, state)
fail_message="Execution {ex_id} to reach {state} "
"state but is in {current}",
fail_message_formatter=lambda m: m.format(
ex_id=ex_id,
state=state,
current=db_api.get_workflow_execution(ex_id).state
)
)
def await_workflow_running(self, ex_id, delay=DEFAULT_DELAY,

View File

@ -175,10 +175,9 @@ class ProcessCronTriggerTest(base.EngineTestCase):
None
)
self.assertEqual(
first_time,
cron_trigger.next_execution_time
)
interval = (cron_trigger.next_execution_time - first_time)
self.assertLessEqual(interval.total_seconds(), 3.0)
periodic.process_cron_triggers_v2(None, None)

View File

@ -362,7 +362,7 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase):
'version': '2.0',
'wf1': {
'tasks': {
'noop-task': {
'noop': {
'action': 'std.noop'
}
}

View File

@ -21,6 +21,7 @@ from mistral.db.v2.sqlalchemy import api as db_api
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.lang.v2 import tasks
from mistral.lang.v2 import workflows
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral import utils
@ -85,7 +86,7 @@ WORKFLOW_WITH_VAR_TASK_NAME = """
---
version: '2.0'
list_servers:
engine_command_{task_name}:
tasks:
{task_name}:
@ -155,15 +156,10 @@ class WorkflowServiceTest(base.DbTestCase):
self.assertEqual('wf2', wf2_spec.get_name())
self.assertEqual('direct', wf2_spec.get_type())
def test_invalid_task_name(self):
for name in tasks.RESERVED_TASK_NAMES:
def test_engine_commands_are_valid_task_names(self):
for name in workflows.ENGINE_COMMANDS:
wf = WORKFLOW_WITH_VAR_TASK_NAME.format(task_name=name)
self.assertRaises(
exc.InvalidModelException,
wf_service.create_workflows,
wf
)
wf_service.create_workflows(wf)
def test_update_workflows(self):
db_wfs = wf_service.create_workflows(WORKFLOW_LIST)

View File

@ -13,8 +13,10 @@
# limitations under the License.
from mistral.lang import parser as spec_parser
from mistral.lang.v2 import workflows
from mistral.tests.unit import base
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import direct_workflow as direct_wf
from mistral.workflow import reverse_workflow as reverse_wf
@ -64,3 +66,7 @@ class WorkflowControllerTest(base.BaseTest):
wf_base.get_controller(wf_ex, wf_spec),
reverse_wf.ReverseWorkflowController
)
def test_all_engine_commands_have_implementation(self):
for command in workflows.ENGINE_COMMANDS:
self.assertIsNotNone(commands.get_command_class(command))

View File

@ -14,7 +14,7 @@
# limitations under the License.
from mistral.lang import parser as spec_parser
from mistral.lang.v2 import tasks
from mistral.lang.v2 import workflows
from mistral.workflow import states
@ -228,18 +228,16 @@ class PauseWorkflow(SetWorkflowState):
return d
RESERVED_CMDS = dict(zip(
tasks.RESERVED_TASK_NAMES, [
Noop,
FailWorkflow,
SucceedWorkflow,
PauseWorkflow
]
))
ENGINE_CMD_CLS = {
workflows.NOOP_COMMAND: Noop,
workflows.FAIL_COMMAND: FailWorkflow,
workflows.SUCCEED_COMMAND: SucceedWorkflow,
workflows.PAUSE_COMMAND: PauseWorkflow
}
def get_command_class(cmd_name):
return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None
return ENGINE_CMD_CLS[cmd_name] if cmd_name in ENGINE_CMD_CLS else None
# TODO(rakhmerov): IMO the way how we instantiate commands is weird.

View File

@ -120,7 +120,7 @@ class DirectWorkflowController(base.WorkflowController):
for t_n, params, event_name in self._find_next_tasks(task_ex, ctx=ctx):
t_s = self.wf_spec.get_tasks()[t_n]
if not (t_s or t_n in commands.RESERVED_CMDS):
if not (t_s or t_n in commands.ENGINE_CMD_CLS):
raise exc.WorkflowException("Task '%s' not found." % t_n)
elif not t_s:
t_s = self.wf_spec.get_tasks()[task_ex.name]