Graph action refactoring
* get rid of dependency counter in graph action: analyze graph directly instead when we need to determine nodes we need to execute or revert next; * move graph interpreting to separate class, GraphAnalyzer: instead of inheriting this code from base graph action we now use composition; * get rid of direct dependency of graph action on engine. Relates to blueprint smart-revert Co-authored-by: Ivan A. Melnikov <imelnikov@griddynamics.com> Change-Id: Ib6499d2f2d5b568d7f2a45af3c5ed6d8d9ace39b
This commit is contained in:
parent
5cbfcba760
commit
9632fe6392
|
@ -20,6 +20,7 @@ import threading
|
|||
|
||||
from taskflow.engines.action_engine import executor
|
||||
from taskflow.engines.action_engine import graph_action
|
||||
from taskflow.engines.action_engine import graph_analyzer
|
||||
from taskflow.engines.action_engine import task_action
|
||||
from taskflow.engines import base
|
||||
|
||||
|
@ -47,12 +48,14 @@ class ActionEngine(base.EngineBase):
|
|||
reversion to commence. See the valid states in the states module to learn
|
||||
more about what other states the tasks & flow being ran can go through.
|
||||
"""
|
||||
_graph_action_cls = None
|
||||
_graph_action_cls = graph_action.FutureGraphAction
|
||||
_graph_analyzer_cls = graph_analyzer.GraphAnalyzer
|
||||
_task_action_cls = task_action.TaskAction
|
||||
_task_executor_cls = executor.SerialTaskExecutor
|
||||
|
||||
def __init__(self, flow, flow_detail, backend, conf):
|
||||
super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
|
||||
self._analyzer = None
|
||||
self._root = None
|
||||
self._lock = threading.RLock()
|
||||
self._state_lock = threading.RLock()
|
||||
|
@ -66,7 +69,7 @@ class ActionEngine(base.EngineBase):
|
|||
def _revert(self, current_failure=None):
|
||||
self._change_state(states.REVERTING)
|
||||
try:
|
||||
state = self._root.revert(self)
|
||||
state = self._root.revert()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self._change_state(states.FAILURE)
|
||||
|
@ -95,7 +98,7 @@ class ActionEngine(base.EngineBase):
|
|||
@property
|
||||
def execution_graph(self):
|
||||
self.compile()
|
||||
return self._root.graph
|
||||
return self._analyzer.graph
|
||||
|
||||
@lock_utils.locked
|
||||
def run(self):
|
||||
|
@ -119,7 +122,7 @@ class ActionEngine(base.EngineBase):
|
|||
def _run(self):
|
||||
self._change_state(states.RUNNING)
|
||||
try:
|
||||
state = self._root.execute(self)
|
||||
state = self._root.execute()
|
||||
except Exception:
|
||||
self._change_state(states.FAILURE)
|
||||
self._revert(misc.Failure())
|
||||
|
@ -165,37 +168,30 @@ class ActionEngine(base.EngineBase):
|
|||
if self._root is not None:
|
||||
return
|
||||
|
||||
assert self._graph_action_cls is not None, (
|
||||
'Graph action class must be specified')
|
||||
self._change_state(states.RESUMING) # does nothing in PENDING state
|
||||
task_graph = flow_utils.flatten(self._flow)
|
||||
if task_graph.number_of_nodes() == 0:
|
||||
raise exc.EmptyFlow("Flow %s is empty." % self._flow.name)
|
||||
self._root = self._graph_action_cls(task_graph)
|
||||
self._analyzer = self._graph_analyzer_cls(task_graph,
|
||||
self.storage)
|
||||
self._root = self._graph_action_cls(self._analyzer,
|
||||
self.storage,
|
||||
self.task_action)
|
||||
for task in task_graph.nodes_iter():
|
||||
task_version = misc.get_version_string(task)
|
||||
self.storage.ensure_task(task.name, task_version, task.save_as)
|
||||
|
||||
self._change_state(states.SUSPENDED) # does nothing in PENDING state
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
return self.storage.get_flow_state() == states.RUNNING
|
||||
|
||||
@property
|
||||
def is_reverting(self):
|
||||
return self.storage.get_flow_state() == states.REVERTING
|
||||
|
||||
|
||||
class SingleThreadedActionEngine(ActionEngine):
|
||||
# NOTE(harlowja): This one attempts to run in a serial manner.
|
||||
_graph_action_cls = graph_action.FutureGraphAction
|
||||
"""Engine that runs tasks in serial manner"""
|
||||
_storage_cls = t_storage.Storage
|
||||
|
||||
|
||||
class MultiThreadedActionEngine(ActionEngine):
|
||||
# NOTE(harlowja): This one attempts to run in a parallel manner.
|
||||
_graph_action_cls = graph_action.FutureGraphAction
|
||||
"""Engine that runs tasks in parallel manner"""
|
||||
|
||||
_storage_cls = t_storage.ThreadSafeStorage
|
||||
|
||||
def _task_executor_cls(self):
|
||||
|
|
|
@ -16,57 +16,14 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from taskflow import states as st
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GraphAction(object):
|
||||
|
||||
def __init__(self, graph):
|
||||
self._graph = graph
|
||||
|
||||
@property
|
||||
def graph(self):
|
||||
return self._graph
|
||||
|
||||
def _succ(self, node):
|
||||
return self._graph.successors(node)
|
||||
|
||||
def _pred(self, node):
|
||||
return self._graph.predecessors(node)
|
||||
|
||||
def _resolve_dependencies(self, node, deps_counter, revert=False):
|
||||
to_execute = []
|
||||
nodes = self._pred(node) if revert else self._succ(node)
|
||||
for next_node in nodes:
|
||||
deps_counter[next_node] -= 1
|
||||
if not deps_counter[next_node]:
|
||||
to_execute.append(next_node)
|
||||
return to_execute
|
||||
|
||||
def _browse_nodes_to_execute(self, deps_counter):
|
||||
to_execute = []
|
||||
for node, deps in deps_counter.items():
|
||||
if not deps:
|
||||
to_execute.append(node)
|
||||
return to_execute
|
||||
|
||||
def _get_nodes_dependencies_count(self, revert=False):
|
||||
deps_counter = {}
|
||||
for node in self._graph.nodes_iter():
|
||||
nodes = self._succ(node) if revert else self._pred(node)
|
||||
deps_counter[node] = len(nodes)
|
||||
return deps_counter
|
||||
|
||||
|
||||
_WAITING_TIMEOUT = 60 # in seconds
|
||||
|
||||
|
||||
class FutureGraphAction(GraphAction):
|
||||
class FutureGraphAction(object):
|
||||
"""Graph action build around futures returned by task action.
|
||||
|
||||
This graph action schedules all task it can for execution and than
|
||||
|
@ -74,23 +31,34 @@ class FutureGraphAction(GraphAction):
|
|||
in parallel, this enables parallel flow run and reversion.
|
||||
"""
|
||||
|
||||
def execute(self, engine):
|
||||
was_suspended = self._run(engine, lambda: engine.is_running,
|
||||
engine.task_action.schedule_execution,
|
||||
engine.task_action.complete_execution,
|
||||
revert=False)
|
||||
def __init__(self, analyzer, storage, task_action):
|
||||
self._analyzer = analyzer
|
||||
self._storage = storage
|
||||
self._task_action = task_action
|
||||
|
||||
def is_running(self):
|
||||
return self._storage.get_flow_state() == st.RUNNING
|
||||
|
||||
def is_reverting(self):
|
||||
return self._storage.get_flow_state() == st.REVERTING
|
||||
|
||||
def execute(self):
|
||||
was_suspended = self._run(
|
||||
self.is_running,
|
||||
self._task_action.schedule_execution,
|
||||
self._task_action.complete_execution,
|
||||
self._analyzer.browse_nodes_for_execute)
|
||||
return st.SUSPENDED if was_suspended else st.SUCCESS
|
||||
|
||||
def revert(self, engine):
|
||||
was_suspended = self._run(engine, lambda: engine.is_reverting,
|
||||
engine.task_action.schedule_reversion,
|
||||
engine.task_action.complete_reversion,
|
||||
revert=True)
|
||||
def revert(self):
|
||||
was_suspended = self._run(
|
||||
self.is_reverting,
|
||||
self._task_action.schedule_reversion,
|
||||
self._task_action.complete_reversion,
|
||||
self._analyzer.browse_nodes_for_revert)
|
||||
return st.SUSPENDED if was_suspended else st.REVERTED
|
||||
|
||||
def _run(self, engine, running, schedule_node, complete_node, revert):
|
||||
deps_counter = self._get_nodes_dependencies_count(revert)
|
||||
def _run(self, running, schedule_node, complete_node, get_next_nodes):
|
||||
not_done = []
|
||||
|
||||
def schedule(nodes):
|
||||
|
@ -99,10 +67,10 @@ class FutureGraphAction(GraphAction):
|
|||
if future is not None:
|
||||
not_done.append(future)
|
||||
else:
|
||||
schedule(self._resolve_dependencies(
|
||||
node, deps_counter, revert))
|
||||
schedule(get_next_nodes(node))
|
||||
|
||||
schedule(get_next_nodes())
|
||||
|
||||
schedule(self._browse_nodes_to_execute(deps_counter))
|
||||
failures = []
|
||||
|
||||
was_suspended = False
|
||||
|
@ -110,7 +78,7 @@ class FutureGraphAction(GraphAction):
|
|||
# NOTE(imelnikov): if timeout occurs before any of futures
|
||||
# completes, done list will be empty and we'll just go
|
||||
# for next iteration
|
||||
done, not_done = engine.task_action.wait_for_any(
|
||||
done, not_done = self._task_action.wait_for_any(
|
||||
not_done, _WAITING_TIMEOUT)
|
||||
|
||||
not_done = list(not_done)
|
||||
|
@ -121,8 +89,7 @@ class FutureGraphAction(GraphAction):
|
|||
if isinstance(result, misc.Failure):
|
||||
failures.append(result)
|
||||
else:
|
||||
next_nodes.extend(self._resolve_dependencies(
|
||||
node, deps_counter, revert))
|
||||
next_nodes.extend(get_next_nodes(node))
|
||||
|
||||
if next_nodes:
|
||||
if running() and not failures:
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import states as st
|
||||
|
||||
|
||||
class GraphAnalyzer(object):
|
||||
"""Analyzes graph to get next nodes for execution or reversion"""
|
||||
|
||||
def __init__(self, graph, storage):
|
||||
self._graph = graph
|
||||
self._storage = storage
|
||||
|
||||
@property
|
||||
def graph(self):
|
||||
return self._graph
|
||||
|
||||
def browse_nodes_for_execute(self, node=None):
|
||||
"""Browse next nodes to execute for given node if
|
||||
specified and for whole graph otherwise.
|
||||
"""
|
||||
if node:
|
||||
nodes = self._graph.successors(node)
|
||||
else:
|
||||
nodes = self._graph.nodes_iter()
|
||||
|
||||
available_nodes = []
|
||||
for node in nodes:
|
||||
if self._is_ready_for_execute(node):
|
||||
available_nodes.append(node)
|
||||
return available_nodes
|
||||
|
||||
def browse_nodes_for_revert(self, node=None):
|
||||
"""Browse next nodes to revert for given node if
|
||||
specified and for whole graph otherwise.
|
||||
"""
|
||||
if node:
|
||||
nodes = self._graph.predecessors(node)
|
||||
else:
|
||||
nodes = self._graph.nodes_iter()
|
||||
|
||||
available_nodes = []
|
||||
for node in nodes:
|
||||
if self._is_ready_for_revert(node):
|
||||
available_nodes.append(node)
|
||||
return available_nodes
|
||||
|
||||
def _is_ready_for_execute(self, task):
|
||||
"""Checks if task is ready to be executed"""
|
||||
|
||||
state = self._storage.get_task_state(task.name)
|
||||
if not st.check_task_transition(state, st.RUNNING):
|
||||
return False
|
||||
|
||||
task_names = []
|
||||
for prev_task in self._graph.predecessors(task):
|
||||
task_names.append(prev_task.name)
|
||||
|
||||
task_states = self._storage.get_tasks_states(task_names)
|
||||
return all(state == st.SUCCESS
|
||||
for state in six.itervalues(task_states))
|
||||
|
||||
def _is_ready_for_revert(self, task):
|
||||
"""Checks if task is ready to be reverted"""
|
||||
|
||||
state = self._storage.get_task_state(task.name)
|
||||
if not st.check_task_transition(state, st.REVERTING):
|
||||
return False
|
||||
|
||||
task_names = []
|
||||
for prev_task in self._graph.successors(task):
|
||||
task_names.append(prev_task.name)
|
||||
|
||||
task_states = self._storage.get_tasks_states(task_names)
|
||||
return all(state in (st.PENDING, st.REVERTED)
|
||||
for state in six.itervalues(task_states))
|
|
@ -175,6 +175,10 @@ class Storage(object):
|
|||
"""Get state of task with given name"""
|
||||
return self._taskdetail_by_name(task_name).state
|
||||
|
||||
def get_tasks_states(self, task_names):
|
||||
return dict((name, self.get_task_state(name))
|
||||
for name in task_names)
|
||||
|
||||
def update_task_metadata(self, task_name, update_with):
|
||||
if not update_with:
|
||||
return
|
||||
|
|
|
@ -65,6 +65,17 @@ class StorageTest(test.TestCase):
|
|||
self.assertTrue(
|
||||
uuidutils.is_uuid_like(s.get_task_uuid('my task')))
|
||||
|
||||
def test_get_tasks_states(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_task('my task2')
|
||||
s.save('my task', 'foo')
|
||||
expected = {
|
||||
'my task': states.SUCCESS,
|
||||
'my task2': states.PENDING,
|
||||
}
|
||||
self.assertEqual(s.get_tasks_states(['my task', 'my task2']), expected)
|
||||
|
||||
def test_ensure_task_fd(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
|
||||
s = storage.Storage(backend=self.backend, flow_detail=flow_detail)
|
||||
|
|
Loading…
Reference in New Issue