Adding "requires" to "task-defaults" clause

* Reverse workflows can now use "requires" declared in
  "task-defaults", for example, to have all tasks
  depending on a configured task (except this task itself)
* Fixed a bug with "processed" flag, it shouldn't be taken
  into account for reverse workflows because if say task3
  depends on task2 then these tasks can't depend on a task1
  at the same time

Change-Id: Iad113e405f43125b37d16b91fcf448c21cd354e6
(cherry picked from commit fa0102354b)
This commit is contained in:
Renat Akhmerov 2015-04-30 16:56:18 +06:00
parent a72fddfb1f
commit 229f7164ff
5 changed files with 155 additions and 100 deletions

View File

@ -29,35 +29,34 @@ LOG = logging.getLogger(__name__)
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
DIRECT_WF_ON_ERROR = """
---
version: '2.0'
wf:
type: direct
task-defaults:
on-error:
- task3
tasks:
task1:
description: That should lead to transition to task3.
action: std.http url="http://some_url"
on-success:
- task2
task2:
action: std.echo output="Morpheus"
task3:
action: std.echo output="output"
"""
class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
def test_task_defaults_on_error(self):
wf_service.create_workflows(DIRECT_WF_ON_ERROR)
wf_text = """---
version: '2.0'
wf:
type: direct
task-defaults:
on-error:
- task3
tasks:
task1:
description: That should lead to transition to task3.
action: std.http url="http://some_url"
on-success:
- task2
task2:
action: std.echo output="Morpheus"
task3:
action: std.echo output="output"
"""
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
@ -77,64 +76,6 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task3.state)
REVERSE_WF_RETRY = """
---
version: '2.0'
wf:
type: reverse
task-defaults:
retry:
count: 2
delay: 1
tasks:
task1:
action: std.fail
task2:
action: std.echo output=2
requires: [task1]
"""
REVERSE_WF_TIMEOUT = """
---
version: '2.0'
wf:
type: reverse
task-defaults:
timeout: 1
tasks:
task1:
action: std.async_noop
task2:
action: std.echo output=2
requires: [task1]
"""
REVERSE_WF_WAIT = """
---
version: '2.0'
wf:
type: reverse
task-defaults:
wait-before: 1
wait-after: 1
tasks:
task1:
action: std.echo output=1
"""
class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
def setUp(self):
super(TaskDefaultsReverseWorkflowEngineTest, self).setUp()
@ -144,7 +85,27 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self.addCleanup(thread_group.stop)
def test_task_defaults_retry_policy(self):
wf_service.create_workflows(REVERSE_WF_RETRY)
wf_text = """---
version: '2.0'
wf:
type: reverse
task-defaults:
retry:
count: 2
delay: 1
tasks:
task1:
action: std.fail
task2:
action: std.echo output=2
requires: [task1]
"""
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
@ -170,7 +131,25 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
@testtools.skip("Fix 'timeout' policy.")
def test_task_defaults_timeout_policy(self):
wf_service.create_workflows(REVERSE_WF_TIMEOUT)
wf_text = """---
version: '2.0'
wf:
type: reverse
task-defaults:
timeout: 1
tasks:
task1:
action: std.async_noop
task2:
action: std.echo output=2
requires: [task1]
"""
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
@ -187,7 +166,22 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self._assert_single_item(tasks, name='task1', state=states.ERROR)
def test_task_defaults_wait_policies(self):
wf_service.create_workflows(REVERSE_WF_WAIT)
wf_text = """---
version: '2.0'
wf:
type: reverse
task-defaults:
wait-before: 1
wait-after: 1
tasks:
task1:
action: std.echo output=1
"""
wf_service.create_workflows(wf_text)
time_before = dt.datetime.now()
@ -210,3 +204,44 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(1, len(tasks))
self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
def test_task_defaults_requires(self):
wf_text = """---
version: '2.0'
wf:
type: reverse
task-defaults:
requires: [always_do]
tasks:
task1:
action: std.echo output=1
task2:
action: std.echo output=1
requires: [task1]
always_do:
action: std.echo output="Do something"
"""
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(3, len(tasks))
self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
self._assert_single_item(tasks, name='task2', state=states.SUCCESS)
self._assert_single_item(tasks, name='always_do', state=states.SUCCESS)

View File

@ -218,6 +218,13 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
({'on-complete': []}, True),
({'on-complete': ['email', 'email']}, True),
({'on-complete': ['email', 12345]}, True),
({'requires': ''}, True),
({'requires': []}, True),
({'requires': ['']}, True),
({'requires': None}, True),
({'requires': 12345}, True),
({'requires': ['echo']}, False),
({'requires': ['echo', 'get']}, False),
({'retry': {'count': 3, 'delay': 1}}, False),
({'retry': {'count': '<% 3 %>', 'delay': 1}}, False),
({'retry': {'count': '<% * %>', 'delay': 1}}, True),

View File

@ -43,7 +43,8 @@ class TaskDefaultsSpec(base.BaseSpec):
"concurrency": policies.CONCURRENCY_SCHEMA,
"on-complete": _on_clause_type,
"on-success": _on_clause_type,
"on-error": _on_clause_type
"on-error": _on_clause_type,
"requires": types.UNIQUE_STRING_LIST
},
"additionalProperties": False
}
@ -67,6 +68,7 @@ class TaskDefaultsSpec(base.BaseSpec):
self._on_complete = self._as_list_of_tuples("on-complete")
self._on_success = self._as_list_of_tuples("on-success")
self._on_error = self._as_list_of_tuples("on-error")
self._requires = data.get('requires', [])
def validate(self):
super(TaskDefaultsSpec, self).validate()
@ -93,3 +95,6 @@ class TaskDefaultsSpec(base.BaseSpec):
def get_on_error(self):
return self._on_error
def get_requires(self):
return self._requires

View File

@ -289,9 +289,6 @@ class ReverseWorkflowTaskSpec(TaskSpec):
self._requires = data.get('requires', [])
def get_requires(self):
if isinstance(self._requires, six.string_types):
return [self._requires]
return self._requires

View File

@ -73,7 +73,7 @@ class ReverseWorkflowController(base.WorkflowController):
def _get_upstream_task_executions(self, task_spec):
t_specs = [
self.wf_spec.get_tasks()[t_name]
for t_name in task_spec.get_requires()
for t_name in self._get_task_requires(task_spec)
or []
]
@ -119,16 +119,16 @@ class ReverseWorkflowController(base.WorkflowController):
if task_ex:
return False
if not task_spec.get_requires():
if not self._get_task_requires(task_spec):
return True
success_task_names = set()
success_t_names = set()
for t_ex in self.wf_ex.task_executions:
if t_ex.state == states.SUCCESS and not t_ex.processed:
success_task_names.add(t_ex.name)
if t_ex.state == states.SUCCESS:
success_t_names.add(t_ex.name)
return not (set(task_spec.get_requires()) - success_task_names)
return not (set(self._get_task_requires(task_spec)) - success_t_names)
def _build_graph(self, tasks_spec):
graph = nx.DiGraph()
@ -144,9 +144,8 @@ class ReverseWorkflowController(base.WorkflowController):
return graph
@staticmethod
def _get_dependency_tasks(tasks_spec, task_spec):
dep_task_names = tasks_spec[task_spec.get_name()].get_requires()
def _get_dependency_tasks(self, tasks_spec, task_spec):
dep_task_names = self._get_task_requires(task_spec)
if len(dep_task_names) == 0:
return []
@ -159,3 +158,15 @@ class ReverseWorkflowController(base.WorkflowController):
dep_t_specs.add(t_spec)
return dep_t_specs
def _get_task_requires(self, task_spec):
requires = set(task_spec.get_requires())
task_defaults = self.wf_spec.get_task_defaults()
if task_defaults:
requires |= set(task_defaults.get_requires())
requires.discard(task_spec.get_name())
return list(requires)