diff --git a/doc/source/engines.rst b/doc/source/engines.rst
index 6aa2388f4..312c2e367 100644
--- a/doc/source/engines.rst
+++ b/doc/source/engines.rst
@@ -448,11 +448,13 @@ Components
 .. automodule:: taskflow.engines.action_engine.builder
 .. automodule:: taskflow.engines.action_engine.compiler
 .. automodule:: taskflow.engines.action_engine.completer
+.. automodule:: taskflow.engines.action_engine.deciders
 .. automodule:: taskflow.engines.action_engine.executor
 .. automodule:: taskflow.engines.action_engine.runtime
 .. automodule:: taskflow.engines.action_engine.scheduler
 .. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker
    :special-members: __iter__
+.. automodule:: taskflow.engines.action_engine.traversal

 Hierarchy
 =========
diff --git a/doc/source/patterns.rst b/doc/source/patterns.rst
index 8c8eb410e..fc546bb37 100644
--- a/doc/source/patterns.rst
+++ b/doc/source/patterns.rst
@@ -21,6 +21,7 @@ Graph flow
 ~~~~~~~~~~

 .. automodule:: taskflow.patterns.graph_flow
+.. automodule:: taskflow.deciders

 Hierarchy
 ~~~~~~~~~
diff --git a/taskflow/deciders.py b/taskflow/deciders.py
new file mode 100644
index 000000000..c96b32185
--- /dev/null
+++ b/taskflow/deciders.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import six
+
+from taskflow.utils import misc
+
+
+class Depth(misc.StrEnum):
+    """Enumeration of decider(s) *area of influence*."""
+
+    ALL = 'ALL'
+    """
+    **Default** decider depth that affects **all** successor atoms (including
+    ones that are in successor nested flows).
+    """
+
+    FLOW = 'FLOW'
+    """
+    Decider depth that affects **all** successor tasks in the **same**
+    flow (it will **not** affect tasks/retries that are in successor
+    nested flows).
+
+    .. warning::
+
+        While using this kind we are allowed to execute successors of
+        things that have been ignored (for example nested flows and the
+        tasks they contain), this may result in symbol lookup errors during
+        running, user beware.
+    """
+
+    NEIGHBORS = 'NEIGHBORS'
+    """
+    Decider depth that affects only **next** successor tasks (and does
+    not traverse past **one** level of successor tasks).
+
+    .. warning::
+
+        While using this kind we are allowed to execute successors of
+        things that have been ignored (for example nested flows and the
+        tasks they contain), this may result in symbol lookup errors during
+        running, user beware.
+    """
+
+    ATOM = 'ATOM'
+    """
+    Decider depth that affects only the **targeted** atom (and does
+    **not** traverse into **any** level of successor atoms).
+
+    .. warning::
+
+        While using this kind we are allowed to execute successors of
+        things that have been ignored (for example nested flows and the
+        tasks they contain), this may result in symbol lookup errors during
+        running, user beware.
+    """
+
+    @classmethod
+    def translate(cls, desired_depth):
+        """Translates a string into a depth enumeration."""
+        if isinstance(desired_depth, cls):
+            # Nothing to do in the first place...
+            return desired_depth
+        if not isinstance(desired_depth, six.string_types):
+            raise TypeError("Unexpected desired depth type, string type"
+                            " expected, not %s" % type(desired_depth))
+        try:
+            return cls(desired_depth.upper())
+        except ValueError:
+            pretty_depths = sorted([a_depth.name for a_depth in cls])
+            raise ValueError("Unexpected decider depth value, one of"
+                             " %s (case-insensitive) is expected and"
+                             " not '%s'" % (pretty_depths, desired_depth))
+
+
+# Depth area of influence order (from greater influence to least).
+#
+# Order very much matters here...
+_ORDERING = tuple([
+    Depth.ALL, Depth.FLOW, Depth.NEIGHBORS, Depth.ATOM,
+])
+
+
+def pick_widest(depths):
+    """Pick from many depths the one with the **widest** area of influence."""
+    return _ORDERING[min(_ORDERING.index(d) for d in depths)]
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py
index c0b625b64..e7047c607 100644
--- a/taskflow/engines/action_engine/analyzer.py
+++ b/taskflow/engines/action_engine/analyzer.py
@@ -14,108 +14,16 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.

-import abc
-import itertools
 import operator
 import weakref

-import six
-
 from taskflow.engines.action_engine import compiler as co
+from taskflow.engines.action_engine import deciders
+from taskflow.engines.action_engine import traversal
 from taskflow import states as st
 from taskflow.utils import iter_utils


-def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
-    """Iterates connected nodes in execution graph (from starting set).
-
-    Jumps over nodes with ``noop`` attribute (does not yield them back).
-    """
-    stack = list(initial_nodes_iter)
-    while stack:
-        node = stack.pop()
-        node_attrs = graph.node[node]
-        if not node_attrs.get('noop'):
-            yield node
-        try:
-            node_kind = node_attrs['kind']
-            connected_to_functor = connected_to_functors[node_kind]
-        except KeyError:
-            pass
-        else:
-            stack.extend(connected_to_functor(node))
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Decider(object):
-    """Base class for deciders.
-
-    Provides interface to be implemented by sub-classes
-    Decider checks whether next atom in flow should be executed or not
-    """
-
-    @abc.abstractmethod
-    def check(self, runtime):
-        """Returns bool of whether this decider should allow running."""
-
-    @abc.abstractmethod
-    def affect(self, runtime):
-        """If the :py:func:`~.check` returns false, affects associated atoms.
-
-        """
-
-    def check_and_affect(self, runtime):
-        """Handles :py:func:`~.check` + :py:func:`~.affect` in right order."""
-        proceed = self.check(runtime)
-        if not proceed:
-            self.affect(runtime)
-        return proceed
-
-
-class IgnoreDecider(Decider):
-    """Checks any provided edge-deciders and determines if ok to run."""
-
-    def __init__(self, atom, edge_deciders):
-        self._atom = atom
-        self._edge_deciders = edge_deciders
-
-    def check(self, runtime):
-        """Returns bool of whether this decider should allow running."""
-        # Gather all atoms results so that those results can be used
-        # by the decider(s) that are making a decision as to pass or
-        # not pass...
-        results = {}
-        for node, node_kind, _local_decider in self._edge_deciders:
-            if node_kind in co.ATOMS:
-                results[node.name] = runtime.storage.get(node.name)
-        for _node, _node_kind, local_decider in self._edge_deciders:
-            if not local_decider(history=results):
-                return False
-        return True
-
-    def affect(self, runtime):
-        """If the :py:func:`~.check` returns false, affects associated atoms.
-
-        This will alter the associated atom + successor atoms by setting there
-        state to ``IGNORE`` so that they are ignored in future runtime
-        activities.
-        """
-        successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
-        runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
-                            state=st.IGNORE, intention=st.IGNORE)
-
-
-class NoOpDecider(Decider):
-    """No-op decider that says it is always ok to run & has no effect(s)."""
-
-    def check(self, runtime):
-        """Always good to go."""
-        return True
-
-    def affect(self, runtime):
-        """Does nothing."""
-
-
 class Analyzer(object):
     """Analyzes a compilation and aids in execution processes.
@@ -142,7 +50,7 @@ class Analyzer(object):
         if state == st.SUCCESS:
             if intention == st.REVERT:
                 return iter([
-                    (atom, NoOpDecider()),
+                    (atom, deciders.NoOpDecider()),
                 ])
             elif intention == st.EXECUTE:
                 return self.browse_atoms_for_execute(atom=atom)
@@ -165,10 +73,15 @@ class Analyzer(object):
         if atom is None:
             atom_it = self.iterate_nodes(co.ATOMS)
         else:
-            successors_iter = self._execution_graph.successors_iter
-            atom_it = _depth_first_iterate(self._execution_graph,
-                                           {co.FLOW: successors_iter},
-                                           successors_iter(atom))
+            # NOTE(harlowja): the reason this uses breadth first is so that
+            # when deciders are applied that those deciders can be applied
+            # from top levels to lower levels since lower levels *may* be
+            # able to run even if top levels have deciders that decide to
+            # ignore some atoms... (going deeper first would make this
+            # problematic to determine as top levels can have their deciders
+            # applied **after** going deeper).
+            atom_it = traversal.breadth_first_iterate(
+                self._execution_graph, atom, traversal.Direction.FORWARD)
         for atom in atom_it:
             is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
             if is_ready:
@@ -185,18 +98,50 @@ class Analyzer(object):
         if atom is None:
             atom_it = self.iterate_nodes(co.ATOMS)
         else:
-            predecessors_iter = self._execution_graph.predecessors_iter
-            atom_it = _depth_first_iterate(self._execution_graph,
-                                           {co.FLOW: predecessors_iter},
-                                           predecessors_iter(atom))
+            atom_it = traversal.breadth_first_iterate(
+                self._execution_graph, atom, traversal.Direction.BACKWARD,
+                # Stop at the retry boundary (as retries 'control' their
+                # surrounding atoms, and we don't want to back track over
+                # them so that they can correctly affect their associated
+                # atoms); we do though need to jump through all tasks since
+                # if a predecessor Y was ignored and a predecessor Z before Y
+                # was not it should be eligible to now revert...
+                through_retries=False)
         for atom in atom_it:
             is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
             if is_ready:
                 yield (atom, late_decider)

     def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
-                         connected_fetcher, connected_checker,
+                         connected_fetcher, ready_checker,
                          decider_fetcher):
+        def iter_connected_states():
+            # Lazily iterate over connected states so that ready checkers
+            # can stop early (vs having to consume and check all the
+            # things...)
+            for atom in connected_fetcher():
+                # TODO(harlowja): make this storage api better, it's not
+                # especially clear what the following is doing (mainly
+                # to avoid two calls into storage).
+                atom_states = self._storage.get_atoms_states([atom.name])
+                yield (atom, atom_states[atom.name])
+        # NOTE(harlowja): How this works is the following...
+        #
+        # 1. First check if the current atom can even transition to the
+        #    desired state, if not this atom is definitely not ready to
+        #    execute or revert.
+        # 2. Check if the actual atom's intention is in one of the desired/ok
+        #    intentions, if it is not there we are still not ready to execute
+        #    or revert.
+        # 3. Iterate over (atom, atom_state, atom_intention) for all the
+        #    atoms the 'connected_fetcher' callback yields from underlying
+        #    storage and direct that iterator into the 'ready_checker'
+        #    callback, that callback should then iterate over these entries
+        #    and determine if it is ok to execute or revert.
+        # 4. If (and only if) 'ready_checker' returns true, then
+        #    the 'decider_fetcher' callback is called to get a late decider
+        #    which can (if it desires) affect this ready result (but does
+        #    so right before the atom is about to be scheduled).
        state = self._storage.get_atom_state(atom.name)
         ok_to_transition = self._runtime.check_atom_transition(atom, state,
                                                                transition_to)
@@ -205,59 +150,62 @@ class Analyzer(object):
         intention = self._storage.get_atom_intention(atom.name)
         if intention not in allowed_intentions:
             return (False, None)
-        connected_states = self._storage.get_atoms_states(
-            connected_atom.name for connected_atom in connected_fetcher(atom))
-        ok_to_run = connected_checker(six.itervalues(connected_states))
+        ok_to_run = ready_checker(iter_connected_states())
         if not ok_to_run:
             return (False, None)
         else:
-            return (True, decider_fetcher(atom))
+            return (True, decider_fetcher())

     def _get_maybe_ready_for_execute(self, atom):
         """Returns if an atom is *likely* ready to be executed."""
-        def decider_fetcher(atom):
-            edge_deciders = self._runtime.fetch_edge_deciders(atom)
-            if edge_deciders:
-                return IgnoreDecider(atom, edge_deciders)
-            else:
-                return NoOpDecider()
-        predecessors_iter = self._execution_graph.predecessors_iter
-        connected_fetcher = lambda atom: \
-            _depth_first_iterate(self._execution_graph,
-                                 {co.FLOW: predecessors_iter},
-                                 predecessors_iter(atom))
-        connected_checker = lambda connected_iter: \
-            all(state == st.SUCCESS and intention == st.EXECUTE
-                for state, intention in connected_iter)
+        def ready_checker(pred_connected_it):
+            for _atom, (atom_state, atom_intention) in pred_connected_it:
+                if (atom_state in (st.SUCCESS, st.IGNORE) and
+                        atom_intention in (st.EXECUTE, st.IGNORE)):
+                    continue
+                return False
+            return True
+        decider_fetcher = lambda: \
+            deciders.IgnoreDecider(
+                atom, self._runtime.fetch_edge_deciders(atom))
+        connected_fetcher = lambda: \
+            traversal.depth_first_iterate(self._execution_graph, atom,
+                                          # Whether the desired atom
+                                          # can execute is dependent on its
+                                          # predecessors' outcomes (thus why
+                                          # we look backwards).
+                                          traversal.Direction.BACKWARD)
+        # If this atom's current state is able to be transitioned to RUNNING
+        # and its intention is to EXECUTE and all of its predecessors executed
+        # successfully or were ignored then this atom is ready to execute.
         return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
-                                     connected_fetcher, connected_checker,
+                                     connected_fetcher, ready_checker,
                                      decider_fetcher)

     def _get_maybe_ready_for_revert(self, atom):
         """Returns if an atom is *likely* ready to be reverted."""
-        successors_iter = self._execution_graph.successors_iter
-        connected_fetcher = lambda atom: \
-            _depth_first_iterate(self._execution_graph,
-                                 {co.FLOW: successors_iter},
-                                 successors_iter(atom))
-        connected_checker = lambda connected_iter: \
-            all(state in (st.PENDING, st.REVERTED)
-                for state, _intention in connected_iter)
-        decider_fetcher = lambda atom: NoOpDecider()
+        def ready_checker(succ_connected_it):
+            for _atom, (atom_state, _atom_intention) in succ_connected_it:
+                if atom_state not in (st.PENDING, st.REVERTED, st.IGNORE):
+                    return False
+            return True
+        noop_decider = deciders.NoOpDecider()
+        connected_fetcher = lambda: \
+            traversal.depth_first_iterate(self._execution_graph, atom,
+                                          # Whether the desired atom
+                                          # can revert is dependent on its
+                                          # successors' states (thus why we
+                                          # look forwards).
+                                          traversal.Direction.FORWARD)
+        decider_fetcher = lambda: noop_decider
+        # If this atom's current state is able to be transitioned to REVERTING
+        # and its intention is either REVERT or RETRY and all of its
+        # successors are either PENDING or REVERTED then this atom is ready
+        # to revert.
         return self._get_maybe_ready(atom, st.REVERTING,
                                      [st.REVERT, st.RETRY],
-                                     connected_fetcher, connected_checker,
+                                     connected_fetcher, ready_checker,
                                      decider_fetcher)

-    def iterate_connected_atoms(self, atom):
-        """Iterates **all** successor atoms connected to given atom."""
-        successors_iter = self._execution_graph.successors_iter
-        return _depth_first_iterate(
-            self._execution_graph, {
-                co.FLOW: successors_iter,
-                co.TASK: successors_iter,
-                co.RETRY: successors_iter,
-            }, successors_iter(atom))
-
     def iterate_retries(self, state=None):
         """Iterates retry atoms that match the provided state.
@@ -268,7 +216,8 @@ class Analyzer(object):
             atom_states = self._storage.get_atoms_states(atom.name
                                                          for atom in atoms)
             for atom in atoms:
-                if atom_states[atom.name][0] == state:
+                atom_state, _atom_intention = atom_states[atom.name]
+                if atom_state == state:
                     yield atom
         else:
             for atom in self.iterate_nodes((co.RETRY,)):
@@ -290,7 +239,7 @@ class Analyzer(object):
         atom_states = self._storage.get_atoms_states(atom.name
                                                      for atom in atoms)
         for atom in atoms:
-            atom_state = atom_states[atom.name][0]
+            atom_state, _atom_intention = atom_states[atom.name]
             if atom_state == st.IGNORE:
                 continue
             if atom_state != st.SUCCESS:
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 72aa81c56..7ce9b301a 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -223,11 +223,15 @@ class MachineBuilder(object):
                                             atom, intention)
                 except Exception:
                     memory.failures.append(failure.Failure())
+                    LOG.exception("Engine '%s' atom post-completion"
+                                  " failed", atom)
                 else:
                     try:
                         more_work = set(iter_next_atoms(atom=atom))
                     except Exception:
                         memory.failures.append(failure.Failure())
+                        LOG.exception("Engine '%s' atom post-completion"
+                                      " next atom searching failed", atom)
                     else:
                         next_up.update(more_work)
             if is_runnable() and next_up and not memory.failures:
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index 561fc18ce..9933fd0cb 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -40,22 +40,55 @@ LOG = logging.getLogger(__name__)
 TASK = 'task'
 RETRY = 'retry'
 FLOW = 'flow'
+FLOW_END = 'flow_end'

 # Quite often used together, so make a tuple everyone can share...
 ATOMS = (TASK, RETRY)
+FLOWS = (FLOW, FLOW_END)
+
+
+class Terminator(object):
+    """Flow terminator class."""
+
+    def __init__(self, flow):
+        self._flow = flow
+        self._name = "%s[$]" % (self._flow.name,)
+
+    @property
+    def flow(self):
+        """The flow which this terminator signifies/marks the end of."""
+        return self._flow
+
+    @property
+    def name(self):
+        """Useful name this end terminator has (derived from flow name)."""
+        return self._name
+
+    def __str__(self):
+        return "%s[$]" % (self._flow,)


 class Compilation(object):
-    """The result of a compilers compile() is this *immutable* object."""
+    """The result of a compiler's ``compile()`` is this *immutable* object."""

-    #: Task nodes will have a ``kind`` attribute/metadata key with this value.
+    #: Task nodes will have a ``kind`` metadata key with this value.
     TASK = TASK

-    #: Retry nodes will have a ``kind`` attribute/metadata key with this value.
+    #: Retry nodes will have a ``kind`` metadata key with this value.
     RETRY = RETRY

-    #: Flow nodes will have a ``kind`` attribute/metadata key with this value.
     FLOW = FLOW
+    """
+    Flow **entry** nodes will have a ``kind`` metadata key with
+    this value.
+    """
+
+    FLOW_END = FLOW_END
+    """
+    Flow **exit** nodes will have a ``kind`` metadata key with this value
+    (only applicable to the compilation execution graph, not currently
+    used in the tree hierarchy).
+    """

     def __init__(self, execution_graph, hierarchy):
         self._execution_graph = execution_graph
@@ -141,6 +174,8 @@ class FlowCompiler(object):
             _add_update_edges(graph, u_graph.no_successors_iter(),
                               list(v_graph.no_predecessors_iter()),
                               attr_dict=attr_dict)
+        # Insert the flow(s) retry if needed, and always make sure it
+        # is the **immediate** successor of the flow node itself.
         if flow.retry is not None:
             graph.add_node(flow.retry, kind=RETRY)
             _add_update_edges(graph, [flow], [flow.retry],
@@ -149,20 +184,42 @@ class FlowCompiler(object):
                 if node is not flow.retry and node is not flow:
                     graph.node[node].setdefault(RETRY, flow.retry)
             from_nodes = [flow.retry]
-            connected_attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
+            attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
         else:
             from_nodes = [flow]
-            connected_attr_dict = {LINK_INVARIANT: True}
-        connected_to = [
-            node for node in graph.no_predecessors_iter() if node is not flow
-        ]
-        if connected_to:
-            # Ensure all nodes in this graph(s) that have no
-            # predecessors depend on this flow (or this flow's retry) so that
-            # we can depend on the flow being traversed before its
-            # children (even though at the current time it will be skipped).
-            _add_update_edges(graph, from_nodes, connected_to,
-                              attr_dict=connected_attr_dict)
+            attr_dict = {LINK_INVARIANT: True}
+        # Ensure all nodes with no predecessors are connected to this flow
+        # or its retry node (so that the invariant that the flow node is
+        # traversed through before its contents is maintained); this allows
+        # us to easily know when we have entered a flow (when running) and
+        # do special and/or smart things such as only traverse up to the
+        # start of a flow when looking for node deciders.
+        _add_update_edges(graph, from_nodes, [
+            node for node in graph.no_predecessors_iter()
+            if node is not flow
+        ], attr_dict=attr_dict)
+        # Connect all nodes with no successors into a special terminator
+        # that is used to identify the end of the flow and ensure that all
+        # execution traversals will traverse over this node before executing
+        # further work (this is especially useful for nesting and knowing
+        # when we have exited a nesting level); it allows us to do special
+        # and/or smart things such as applying deciders up to (but not
+        # beyond) a flow termination point.
+        #
+        # Do note that in an empty flow this will just connect itself to
+        # the flow node itself... and also note we cannot use the flow
+        # object itself (primarily because the underlying graph library
+        # uses hashing to identify node uniqueness and we can easily create
+        # a loop if we don't do this correctly, so avoid that by just
+        # creating this special node and tagging it with a special kind); we
+        # may be able to make this better in the future with a multidigraph
+        # that networkx provides??
+        flow_term = Terminator(flow)
+        graph.add_node(flow_term, kind=FLOW_END, noop=True)
+        _add_update_edges(graph, [
+            node for node in graph.no_successors_iter()
+            if node is not flow_term
+        ], [flow_term], attr_dict={LINK_INVARIANT: True})
         return graph, tree_node
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index d5c43a58b..02b63f761 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -128,7 +128,7 @@ class Completer(object):
         atom_states = self._storage.get_atoms_states(atom.name
                                                      for atom in atoms)
         for atom in atoms:
-            atom_state = atom_states[atom.name][0]
+            atom_state, _atom_intention = atom_states[atom.name]
             if atom_state == st.FAILURE:
                 self._process_atom_failure(atom, self._storage.get(atom.name))
         for retry in self._analyzer.iterate_retries(st.RETRYING):
@@ -137,7 +137,7 @@ class Completer(object):
             atom_states[atom.name] = (state, intention)
         unfinished_atoms = set()
         for atom in atoms:
-            atom_state = atom_states[atom.name][0]
+            atom_state, _atom_intention = atom_states[atom.name]
             if atom_state in (st.RUNNING, st.REVERTING):
                 unfinished_atoms.add(atom)
         return unfinished_atoms
diff --git a/taskflow/engines/action_engine/deciders.py b/taskflow/engines/action_engine/deciders.py
new file mode 100644
index 000000000..4c4ac9d6c
--- /dev/null
+++ b/taskflow/engines/action_engine/deciders.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+
+#    Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import itertools
+
+import six
+
+from taskflow import deciders
+from taskflow.engines.action_engine import compiler
+from taskflow.engines.action_engine import traversal
+from taskflow import states
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Decider(object):
+    """Base class for deciders.
+
+    Provides interface to be implemented by sub-classes.
+
+    Deciders check whether the next atom in a flow should be executed or not.
+    """
+
+    @abc.abstractmethod
+    def tally(self, runtime):
+        """Tally edge deciders on whether this decider should allow running.
+
+        The returned value is a list of edge deciders that voted
+        'nay' (do not allow running).
+        """
+
+    @abc.abstractmethod
+    def affect(self, runtime, nay_voters):
+        """Affects associated atoms due to at least one 'nay' edge decider.
+
+        This will alter the associated atom + some set of successor atoms by
+        setting their state and intention to ``IGNORE`` so that they are
+        ignored in future runtime activities.
+        """
+
+    def check_and_affect(self, runtime):
+        """Handles :py:func:`~.tally` + :py:func:`~.affect` in the right order.
+
+        NOTE(harlowja): If there are zero 'nay' edge deciders then it is
+        assumed this decider should allow running.
+
+        Returns a boolean of whether this decider allows for running (or not).
+ """ + nay_voters = self.tally(runtime) + if nay_voters: + self.affect(runtime, nay_voters) + return False + return True + + +def _affect_all_successors(atom, runtime): + execution_graph = runtime.compilation.execution_graph + successors_iter = traversal.depth_first_iterate( + execution_graph, atom, traversal.Direction.FORWARD) + runtime.reset_atoms(itertools.chain([atom], successors_iter), + state=states.IGNORE, intention=states.IGNORE) + + +def _affect_successor_tasks_in_same_flow(atom, runtime): + execution_graph = runtime.compilation.execution_graph + successors_iter = traversal.depth_first_iterate( + execution_graph, atom, traversal.Direction.FORWARD, + # Do not go through nested flows but do follow *all* tasks that + # are directly connected in this same flow (thus the reason this is + # called the same flow decider); retries are direct successors + # of flows, so they should also be not traversed through, but + # setting this explicitly ensures that. + through_flows=False, through_retries=False) + runtime.reset_atoms(itertools.chain([atom], successors_iter), + state=states.IGNORE, intention=states.IGNORE) + + +def _affect_atom(atom, runtime): + runtime.reset_atoms([atom], state=states.IGNORE, intention=states.IGNORE) + + +def _affect_direct_task_neighbors(atom, runtime): + def _walk_neighbors(): + execution_graph = runtime.compilation.execution_graph + for node in execution_graph.successors_iter(atom): + node_data = execution_graph.node[node] + if node_data['kind'] == compiler.TASK: + yield node + successors_iter = _walk_neighbors() + runtime.reset_atoms(itertools.chain([atom], successors_iter), + state=states.IGNORE, intention=states.IGNORE) + + +class IgnoreDecider(Decider): + """Checks any provided edge-deciders and determines if ok to run.""" + + _depth_strategies = { + deciders.Depth.ALL: _affect_all_successors, + deciders.Depth.ATOM: _affect_atom, + deciders.Depth.FLOW: _affect_successor_tasks_in_same_flow, + deciders.Depth.NEIGHBORS: _affect_direct_task_neighbors, + } + + def __init__(self, atom, edge_deciders): + self._atom = atom + self._edge_deciders = edge_deciders + + def tally(self, runtime): + if not self._edge_deciders: + return [] + # Gather all atoms (the ones that were not ignored) results so that + # those results can be used + # by the decider(s) that are making a decision as to pass or + # not pass... + states_intentions = runtime.storage.get_atoms_states( + ed.from_node.name for ed in self._edge_deciders + if ed.kind in compiler.ATOMS) + history = {} + for atom_name in six.iterkeys(states_intentions): + atom_state, _atom_intention = states_intentions[atom_name] + if atom_state != states.IGNORE: + history[atom_name] = runtime.storage.get(atom_name) + nay_voters = [] + for ed in self._edge_deciders: + if ed.kind in compiler.ATOMS and ed.from_node.name not in history: + continue + if not ed.decider(history=history): + nay_voters.append(ed) + return nay_voters + + def affect(self, runtime, nay_voters): + # If there were many 'nay' edge deciders that were targeted + # at this atom, then we need to pick the one which has the widest + # impact and respect that one as the decider depth that will + # actually affect things. 
+        widest_depth = deciders.pick_widest(ed.depth for ed in nay_voters)
+        affector = self._depth_strategies[widest_depth]
+        return affector(self._atom, runtime)
+
+
+class NoOpDecider(Decider):
+    """No-op decider that says it is always ok to run & has no effect(s)."""
+
+    def tally(self, runtime):
+        """Always good to go."""
+        return []
+
+    def affect(self, runtime, nay_voters):
+        """Does nothing."""
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index c65da3429..7084f51b9 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -99,37 +99,37 @@ class ActionEngine(base.Engine):

     **Engine options:**

-    +-------------------+-----------------------+------+-----------+
-    | Name/key          | Description           | Type | Default   |
-    +===================+=======================+======+===========+
-    | defer_reverts     | This option lets you  | bool | ``False`` |
-    |                   | safely nest flows     |      |           |
-    |                   | with retries inside   |      |           |
-    |                   | flows without retries |      |           |
-    |                   | and it still behaves  |      |           |
-    |                   | as a user would       |      |           |
-    |                   | expect (for example   |      |           |
-    |                   | if the retry gets     |      |           |
-    |                   | exhausted it reverts  |      |           |
-    |                   | the outer flow unless |      |           |
-    |                   | the outer flow has a  |      |           |
-    |                   | has a separate retry  |      |           |
-    |                   | behavior).            |      |           |
-    +-------------------+-----------------------+------+-----------+
-    | inject_transient  | When true, values     | bool | ``True``  |
-    |                   | that are local to     |      |           |
-    |                   | each atoms scope      |      |           |
-    |                   | are injected into     |      |           |
-    |                   | storage into a        |      |           |
-    |                   | transient location    |      |           |
-    |                   | (typically a local    |      |           |
-    |                   | dictionary), when     |      |           |
-    |                   | false those values    |      |           |
-    |                   | are instead persisted |      |           |
-    |                   | into atom details     |      |           |
-    |                   | (and saved in a non-  |      |           |
-    |                   | transient manner).    |      |           |
-    +-------------------+-----------------------+------+-----------+
+    +----------------------+-----------------------+------+------------+
+    | Name/key             | Description           | Type | Default    |
+    +======================+=======================+======+============+
+    | ``defer_reverts``    | This option lets you  | bool | ``False``  |
+    |                      | safely nest flows     |      |            |
+    |                      | with retries inside   |      |            |
+    |                      | flows without retries |      |            |
+    |                      | and it still behaves  |      |            |
+    |                      | as a user would       |      |            |
+    |                      | expect (for example   |      |            |
+    |                      | if the retry gets     |      |            |
+    |                      | exhausted it reverts  |      |            |
+    |                      | the outer flow unless |      |            |
+    |                      | the outer flow has a  |      |            |
+    |                      | separate retry        |      |            |
+    |                      | behavior).            |      |            |
+    +----------------------+-----------------------+------+------------+
+    | ``inject_transient`` | When true, values     | bool | ``True``   |
+    |                      | that are local to     |      |            |
+    |                      | each atom's scope     |      |            |
+    |                      | are injected into     |      |            |
+    |                      | storage into a        |      |            |
+    |                      | transient location    |      |            |
+    |                      | (typically a local    |      |            |
+    |                      | dictionary), when     |      |            |
+    |                      | false those values    |      |            |
+    |                      | are instead persisted |      |            |
+    |                      | into atom details     |      |            |
+    |                      | (and saved in a non-  |      |            |
+    |                      | transient manner).    |      |            |
+    +----------------------+-----------------------+------+------------+
     """

     NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS])
@@ -148,6 +148,12 @@ class ActionEngine(base.Engine):
     end-users when doing execution iterations via :py:meth:`.run_iter`.
     """

+    MAX_MACHINE_STATES_RETAINED = 10
+    """
+    During :py:meth:`~.run_iter` the last X state machine transitions will
+    be recorded (typically only useful on failure).
+ """ + def __init__(self, flow, flow_detail, backend, options): super(ActionEngine, self).__init__(flow, flow_detail, backend, options) self._runtime = None @@ -242,16 +248,21 @@ class ActionEngine(base.Engine): self.compile() self.prepare() self.validate() - last_state = None + # Keep track of the last X state changes, which if a failure happens + # are quite useful to log (and the performance of tracking this + # should be negligible). + last_transitions = collections.deque( + maxlen=max(1, self.MAX_MACHINE_STATES_RETAINED)) with _start_stop(self._task_executor, self._retry_executor): self._change_state(states.RUNNING) try: closed = False machine, memory = self._runtime.builder.build(timeout=timeout) r = runners.FiniteRunner(machine) - for (_prior_state, new_state) in r.run_iter(builder.START): - last_state = new_state - # NOTE(harlowja): skip over meta-states. + for transition in r.run_iter(builder.START): + last_transitions.append(transition) + _prior_state, new_state = transition + # NOTE(harlowja): skip over meta-states if new_state in builder.META_STATES: continue if new_state == states.FAILURE: @@ -271,15 +282,24 @@ class ActionEngine(base.Engine): self.suspend() except Exception: with excutils.save_and_reraise_exception(): + LOG.exception("Engine execution has failed, something" + " bad must of happened (last" + " %s machine transitions were %s)", + last_transitions.maxlen, + list(last_transitions)) self._change_state(states.FAILURE) else: - if last_state and last_state not in self.IGNORABLE_STATES: - self._change_state(new_state) - if last_state not in self.NO_RERAISING_STATES: - it = itertools.chain( - six.itervalues(self.storage.get_failures()), - six.itervalues(self.storage.get_revert_failures())) - failure.Failure.reraise_if_any(it) + if last_transitions: + _prior_state, new_state = last_transitions[-1] + if new_state not in self.IGNORABLE_STATES: + self._change_state(new_state) + if new_state not in self.NO_RERAISING_STATES: + failures = self.storage.get_failures() + more_failures = self.storage.get_revert_failures() + fails = itertools.chain( + six.itervalues(failures), + six.itervalues(more_failures)) + failure.Failure.reraise_if_any(fails) @staticmethod def _check_compilation(compilation): diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 41dbd7798..f6fc9c15a 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -19,6 +19,7 @@ import functools from futurist import waiters +from taskflow import deciders as de from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an @@ -27,11 +28,17 @@ from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc +from taskflow.engines.action_engine import traversal as tr from taskflow import exceptions as exc -from taskflow.flow import LINK_DECIDER from taskflow import states as st from taskflow.utils import misc +from taskflow.flow import (LINK_DECIDER, LINK_DECIDER_DEPTH) # noqa + +# Small helper to make the edge decider tuples more easily useable... +_EdgeDecider = collections.namedtuple('_EdgeDecider', + 'from_node,kind,decider,depth') + class Runtime(object): """A aggregate of runtime objects, properties, ... used during execution. 
@@ -42,7 +49,8 @@ class Runtime(object):
     """

     def __init__(self, compilation, storage, atom_notifier,
-                 task_executor, retry_executor, options=None):
+                 task_executor, retry_executor,
+                 options=None):
         self._atom_notifier = atom_notifier
         self._task_executor = task_executor
         self._retry_executor = retry_executor
@@ -51,11 +59,10 @@ class Runtime(object):
         self._atom_cache = {}
         self._options = misc.safe_copy_dict(options)

-    @staticmethod
-    def _walk_edge_deciders(graph, atom):
+    def _walk_edge_deciders(self, graph, atom):
         """Iterates through all nodes, deciders that alter atoms execution."""
         # This is basically a reverse breadth first exploration, with
-        # special logic to further traverse down flow nodes...
+        # special logic to further traverse down flow nodes as needed...
         predecessors_iter = graph.predecessors_iter
         nodes = collections.deque((u_node, atom)
                                   for u_node in predecessors_iter(atom))
@@ -63,14 +70,19 @@ class Runtime(object):
         while nodes:
             u_node, v_node = nodes.popleft()
             u_node_kind = graph.node[u_node]['kind']
+            u_v_data = graph.adj[u_node][v_node]
             try:
-                yield (u_node, u_node_kind,
-                       graph.adj[u_node][v_node][LINK_DECIDER])
+                decider = u_v_data[LINK_DECIDER]
+                decider_depth = u_v_data.get(LINK_DECIDER_DEPTH)
+                if decider_depth is None:
+                    decider_depth = de.Depth.ALL
+                yield _EdgeDecider(u_node, u_node_kind,
+                                   decider, decider_depth)
             except KeyError:
                 pass
             if u_node_kind == com.FLOW and u_node not in visited:
-                # Avoid re-exploring the same flow if we get to this
-                # same flow by a different *future* path...
+                # Avoid re-exploring the same flow if we get to this same
+                # flow by a different *future* path...
                 visited.add(u_node)
                 # Since we *currently* jump over flow node(s), we need to make
                 # sure that any prior decider that was directed at this flow
@@ -108,7 +120,7 @@ class Runtime(object):
         graph = self._compilation.execution_graph
         for node, node_data in graph.nodes_iter(data=True):
             node_kind = node_data['kind']
-            if node_kind == com.FLOW:
+            if node_kind in com.FLOWS:
                 continue
             elif node_kind in com.ATOMS:
                 check_transition_handler = check_transition_handlers[node_kind]
@@ -128,6 +140,10 @@ class Runtime(object):
             metadata['edge_deciders'] = tuple(deciders_it)
             metadata['action'] = action
             self._atom_cache[node.name] = metadata
+        # TODO(harlowja): optimize the different decider depths to avoid
+        # repeated full successor searching; this can be done by searching
+        # for the widest depth of parent(s), and limiting the search of
+        # children to that depth.

     @property
     def compilation(self):
@@ -246,11 +262,12 @@ class Runtime(object):
     def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
         """Resets a atoms subgraph to the given state and intention.

-        The subgraph is contained of all of the atoms successors.
+        The subgraph contains **all** of the atom's successors.
         """
-        return self.reset_atoms(
-            self.analyzer.iterate_connected_atoms(atom),
-            state=state, intention=intention)
+        execution_graph = self._compilation.execution_graph
+        atoms_it = tr.depth_first_iterate(execution_graph, atom,
+                                          tr.Direction.FORWARD)
+        return self.reset_atoms(atoms_it, state=state, intention=intention)

     def retry_subflow(self, retry):
         """Prepares a retrys + its subgraph for execution.
diff --git a/taskflow/engines/action_engine/scopes.py b/taskflow/engines/action_engine/scopes.py
index 4558ddda2..01a7546ec 100644
--- a/taskflow/engines/action_engine/scopes.py
+++ b/taskflow/engines/action_engine/scopes.py
@@ -15,33 +15,12 @@
 #    under the License.

 from taskflow.engines.action_engine import compiler as co
+from taskflow.engines.action_engine import traversal as tr
 from taskflow import logging

 LOG = logging.getLogger(__name__)


-def _depth_first_reverse_iterate(node, idx=-1):
-    """Iterates connected (in reverse) nodes (from starting node).
-
-    Jumps through nodes with ``FLOW`` ``kind`` attribute (does not yield
-    them back).
-    """
-    # Always go left to right, since right to left is the pattern order
-    # and we want to go backwards and not forwards through that ordering...
-    if idx == -1:
-        children_iter = node.reverse_iter()
-    else:
-        children_iter = reversed(node[0:idx])
-    for child in children_iter:
-        if child.metadata['kind'] == co.FLOW:
-            # Jump through these...
-            for child_child in child.dfs_iter(right_to_left=False):
-                if child_child.metadata['kind'] in co.ATOMS:
-                    yield child_child.item
-        else:
-            yield child.item
-
-
 class ScopeWalker(object):
     """Walks through the scopes of a atom using a engines compilation.
@@ -117,7 +96,9 @@ class ScopeWalker(object):
             except KeyError:
                 visible = []
                 removals = set()
-                for atom in _depth_first_reverse_iterate(parent, idx=last_idx):
+                atom_it = tr.depth_first_reverse_iterate(
+                    parent, start_from_idx=last_idx)
+                for atom in atom_it:
                     if atom in predecessors:
                         predecessors.remove(atom)
                         removals.add(atom)
diff --git a/taskflow/engines/action_engine/traversal.py b/taskflow/engines/action_engine/traversal.py
new file mode 100644
index 000000000..ad0b794ed
--- /dev/null
+++ b/taskflow/engines/action_engine/traversal.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+
+#    Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import enum
+
+from taskflow.engines.action_engine import compiler as co
+
+
+class Direction(enum.Enum):
+    """Traversal direction enum."""
+
+    #: Go through successors.
+    FORWARD = 1
+
+    #: Go through predecessors.
+    BACKWARD = 2
+
+
+def _extract_connectors(execution_graph, starting_node, direction,
+                        through_flows=True, through_retries=True,
+                        through_tasks=True):
+    if direction == Direction.FORWARD:
+        connected_iter = execution_graph.successors_iter
+    else:
+        connected_iter = execution_graph.predecessors_iter
+    connected_to_functors = {}
+    if through_flows:
+        connected_to_functors[co.FLOW] = connected_iter
+        connected_to_functors[co.FLOW_END] = connected_iter
+    if through_retries:
+        connected_to_functors[co.RETRY] = connected_iter
+    if through_tasks:
+        connected_to_functors[co.TASK] = connected_iter
+    return connected_iter(starting_node), connected_to_functors
+
+
+def breadth_first_iterate(execution_graph, starting_node, direction,
+                          through_flows=True, through_retries=True,
+                          through_tasks=True):
+    """Iterates connected nodes in execution graph (from starting node).
+
+    Does so in a breadth first manner.
+
+    Jumps over nodes with ``noop`` attribute (does not yield them back).
+ """ + initial_nodes_iter, connected_to_functors = _extract_connectors( + execution_graph, starting_node, direction, + through_flows=through_flows, through_retries=through_retries, + through_tasks=through_tasks) + q = collections.deque(initial_nodes_iter) + while q: + node = q.popleft() + node_attrs = execution_graph.node[node] + if not node_attrs.get('noop'): + yield node + try: + node_kind = node_attrs['kind'] + connected_to_functor = connected_to_functors[node_kind] + except KeyError: + pass + else: + q.extend(connected_to_functor(node)) + + +def depth_first_iterate(execution_graph, starting_node, direction, + through_flows=True, through_retries=True, + through_tasks=True): + """Iterates connected nodes in execution graph (from starting node). + + Does so in a depth first manner. + + Jumps over nodes with ``noop`` attribute (does not yield them back). + """ + initial_nodes_iter, connected_to_functors = _extract_connectors( + execution_graph, starting_node, direction, + through_flows=through_flows, through_retries=through_retries, + through_tasks=through_tasks) + stack = list(initial_nodes_iter) + while stack: + node = stack.pop() + node_attrs = execution_graph.node[node] + if not node_attrs.get('noop'): + yield node + try: + node_kind = node_attrs['kind'] + connected_to_functor = connected_to_functors[node_kind] + except KeyError: + pass + else: + stack.extend(connected_to_functor(node)) + + +def depth_first_reverse_iterate(node, start_from_idx=-1): + """Iterates connected (in reverse) **tree** nodes (from starting node). + + Jumps through nodes with ``noop`` attribute (does not yield them back). + """ + # Always go left to right, since right to left is the pattern order + # and we want to go backwards and not forwards through that ordering... + if start_from_idx == -1: + # All of them... + children_iter = node.reverse_iter() + else: + children_iter = reversed(node[0:start_from_idx]) + for child in children_iter: + if child.metadata.get('noop'): + # Jump through these... + for grand_child in child.dfs_iter(right_to_left=False): + if grand_child.metadata['kind'] in co.ATOMS: + yield grand_child.item + else: + yield child.item diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 857d6634d..beb66e999 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -221,6 +221,14 @@ class InvalidFormat(TaskFlowException): """Raised when some object/entity is not in the expected format.""" +class DisallowedAccess(TaskFlowException): + """Raised when storage access is not possible due to state limitations.""" + + def __init__(self, message, cause=None, state=None): + super(DisallowedAccess, self).__init__(message, cause=cause) + self.state = state + + # Others. class NotImplementedError(NotImplementedError): diff --git a/taskflow/flow.py b/taskflow/flow.py index a0f6846bd..3b974f7b0 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -39,6 +39,9 @@ LINK_DECIDER = 'decider' _CHOP_PAT = "taskflow.patterns." _CHOP_PAT_LEN = len(_CHOP_PAT) +# This key denotes the depth the decider will apply (defaulting to all). 
+LINK_DECIDER_DEPTH = 'decider_depth'
+

 @six.add_metaclass(abc.ABCMeta)
 class Flow(object):
diff --git a/taskflow/formatters.py b/taskflow/formatters.py
index 41b409c98..af6c6a50f 100644
--- a/taskflow/formatters.py
+++ b/taskflow/formatters.py
@@ -42,16 +42,20 @@ def _fetch_predecessor_tree(graph, atom):
     """Creates a tree of predecessors, rooted at given atom."""
     root = tree.Node(atom)
     stack = [(root, atom)]
-    seen = set()
     while stack:
         parent, node = stack.pop()
         for pred_node in graph.predecessors_iter(node):
-            child = tree.Node(pred_node,
-                              **graph.node[pred_node])
-            parent.add(child)
-            stack.append((child, pred_node))
-            seen.add(pred_node)
-    return len(seen), root
+            pred_node_data = graph.node[pred_node]
+            if pred_node_data['kind'] == compiler.FLOW_END:
+                # Jump over and/or don't show flow end nodes...
+                for pred_pred_node in graph.predecessors_iter(pred_node):
+                    stack.append((parent, pred_pred_node))
+            else:
+                child = tree.Node(pred_node, **pred_node_data)
+                parent.add(child)
+                # And go further backwards...
+                stack.append((child, pred_node))
+    return root


 class FailureFormatter(object):
@@ -64,59 +68,51 @@ class FailureFormatter(object):
     def __init__(self, engine, hide_inputs_outputs_of=()):
         self._hide_inputs_outputs_of = hide_inputs_outputs_of
         self._engine = engine
-        self._formatter_funcs = {
-            compiler.FLOW: self._format_flow,
-        }
-        for kind in compiler.ATOMS:
-            self._formatter_funcs[kind] = self._format_atom
-
-    def _format_atom(self, storage, cache, node):
-        """Formats a single tree node (atom) into a string version."""
-        atom = node.item
-        atom_name = atom.name
-        atom_attrs = {}
-        intention, intention_found = _cached_get(cache, 'intentions',
-                                                 atom_name,
-                                                 storage.get_atom_intention,
-                                                 atom_name)
-        if intention_found:
-            atom_attrs['intention'] = intention
-        state, state_found = _cached_get(cache, 'states', atom_name,
-                                         storage.get_atom_state, atom_name)
-        if state_found:
-            atom_attrs['state'] = state
-        if atom_name not in self._hide_inputs_outputs_of:
-            # When the cache does not exist for this atom this
-            # will be called with the rest of these arguments
-            # used to populate the cache.
-            fetch_mapped_args = functools.partial(
-                storage.fetch_mapped_args, atom.rebind,
-                atom_name=atom_name, optional_args=atom.optional)
-            requires, requires_found = _cached_get(cache, 'requires',
-                                                   atom_name,
-                                                   fetch_mapped_args)
-            if requires_found:
-                atom_attrs['requires'] = requires
-            provides, provides_found = _cached_get(cache, 'provides',
-                                                   atom_name,
-                                                   storage.get_execute_result,
-                                                   atom_name)
-            if provides_found:
-                atom_attrs['provides'] = provides
-        if atom_attrs:
-            return "Atom '%s' %s" % (atom_name, atom_attrs)
-        else:
-            return "Atom '%s'" % (atom_name)
-
-    def _format_flow(self, storage, cache, node):
-        """Formats a single tree node (flow) into a string version."""
-        flow = node.item
-        return flow.name

     def _format_node(self, storage, cache, node):
         """Formats a single tree node into a string version."""
-        formatter_func = self._formatter_funcs[node.metadata['kind']]
-        return formatter_func(storage, cache, node)
+        if node.metadata['kind'] == compiler.FLOW:
+            flow = node.item
+            flow_name = flow.name
+            return "Flow '%s'" % (flow_name)
+        elif node.metadata['kind'] in compiler.ATOMS:
+            atom = node.item
+            atom_name = atom.name
+            atom_attrs = {}
+            intention, intention_found = _cached_get(
+                cache, 'intentions', atom_name, storage.get_atom_intention,
+                atom_name)
+            if intention_found:
+                atom_attrs['intention'] = intention
+            state, state_found = _cached_get(cache, 'states', atom_name,
+                                             storage.get_atom_state,
+                                             atom_name)
+            if state_found:
+                atom_attrs['state'] = state
+            if atom_name not in self._hide_inputs_outputs_of:
+                # When the cache does not exist for this atom this
+                # will be called with the rest of these arguments
+                # used to populate the cache.
+                fetch_mapped_args = functools.partial(
+                    storage.fetch_mapped_args, atom.rebind,
+                    atom_name=atom_name, optional_args=atom.optional)
+                requires, requires_found = _cached_get(cache, 'requires',
+                                                       atom_name,
+                                                       fetch_mapped_args)
+                if requires_found:
+                    atom_attrs['requires'] = requires
+                provides, provides_found = _cached_get(
+                    cache, 'provides', atom_name,
+                    storage.get_execute_result, atom_name)
+                if provides_found:
+                    atom_attrs['provides'] = provides
+            if atom_attrs:
+                return "Atom '%s' %s" % (atom_name, atom_attrs)
+            else:
+                return "Atom '%s'" % (atom_name)
+        else:
+            raise TypeError("Unable to format node, unknown node"
+                            " kind '%s' encountered" % node.metadata['kind'])

     def format(self, fail, atom_matcher):
         """Returns a (exc_info, details) tuple about the failure.
@@ -143,13 +139,11 @@ class FailureFormatter(object):
         graph = compilation.execution_graph
         atom_node = hierarchy.find_first_match(atom_matcher)
         atom = None
-        priors = 0
         atom_intention = None
         if atom_node is not None:
             atom = atom_node.item
             atom_intention = storage.get_atom_intention(atom.name)
-            priors = sum(c for (_n, c) in graph.in_degree_iter([atom]))
-        if atom is not None and priors and atom_intention in self._BUILDERS:
+        if atom is not None and atom_intention in self._BUILDERS:
             # Cache as much as we can, since the path of various atoms
             # may cause the same atom to be seen repeatedly depending on
             # the graph structure...
@@ -160,12 +154,13 @@ class FailureFormatter(object):
                 'states': {},
             }
             builder, kind = self._BUILDERS[atom_intention]
-            count, rooted_tree = builder(graph, atom)
-            buff.write_nl('%s %s (most recent atoms first):' % (count, kind))
+            rooted_tree = builder(graph, atom)
+            child_count = rooted_tree.child_count(only_direct=False)
+            buff.write_nl('%s %s (most recent first):' % (child_count, kind))
             formatter = functools.partial(self._format_node, storage, cache)
-            child_count = rooted_tree.child_count()
+            direct_child_count = rooted_tree.child_count(only_direct=True)
             for i, child in enumerate(rooted_tree, 1):
-                if i == child_count:
+                if i == direct_child_count:
                     buff.write(child.pformat(stringify_node=formatter,
                                              starting_prefix="   "))
                 else:
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index c769124f0..e3d9d9b26 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -18,6 +18,7 @@ import collections

 import six

+from taskflow import deciders as de
 from taskflow import exceptions as exc
 from taskflow import flow
 from taskflow.types import graph as gr
@@ -73,7 +74,7 @@ class Flow(flow.Flow):
     #: Extracts the unsatisified symbol requirements of a single node.
     _unsatisfied_requires = staticmethod(_unsatisfied_requires)

-    def link(self, u, v, decider=None):
+    def link(self, u, v, decider=None, decider_depth=None):
         """Link existing node u as a runtime dependency of existing node v.

         Note that if the addition of these edges creates a `cyclic`_ graph
@@ -93,6 +94,13 @@ class Flow(flow.Flow):
                         links that have ``v`` as a target. It is expected to
                         return a single boolean (``True`` to allow ``v``
                         execution or ``False`` to not).
+        :param decider_depth: One of the :py:class:`~taskflow.deciders.Depth`
+                              enumerations (or a string version of) that will
+                              be used to influence what atoms are ignored
+                              when the provided decider returns false. If
+                              not provided (and a valid decider is provided)
+                              then this defaults to
+                              :py:attr:`~taskflow.deciders.Depth.ALL`.

         .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph
         """
@@ -103,11 +111,13 @@ class Flow(flow.Flow):
         if decider is not None:
             if not six.callable(decider):
                 raise ValueError("Decider boolean callback must be callable")
-        self._swap(self._link(u, v, manual=True, decider=decider))
+        self._swap(self._link(u, v, manual=True,
+                              decider=decider, decider_depth=decider_depth))
         return self

     def _link(self, u, v, graph=None,
-              reason=None, manual=False, decider=None):
+              reason=None, manual=False, decider=None,
+              decider_depth=None):
         mutable_graph = True
         if graph is None:
             graph = self._graph
@@ -119,6 +129,18 @@ class Flow(flow.Flow):
         attrs = {}
         if decider is not None:
             attrs[flow.LINK_DECIDER] = decider
+            try:
+                # Remove existing decider depth, if one existed.
+                del attrs[flow.LINK_DECIDER_DEPTH]
+            except KeyError:
+                pass
+        if decider_depth is not None:
+            if decider is None:
+                raise ValueError("Decider depth requires a decider to be"
+                                 " provided along with it")
+            else:
+                decider_depth = de.Depth.translate(decider_depth)
+                attrs[flow.LINK_DECIDER_DEPTH] = decider_depth
         if manual:
             attrs[flow.LINK_MANUAL] = True
         if reason is not None:
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 1784ff749..5c348f350 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -15,6 +15,7 @@
 #    under the License.

 import contextlib
+import functools

 import fasteners
 from oslo_utils import reflection
@@ -84,6 +85,127 @@ META_PROGRESS = 'progress'
 META_PROGRESS_DETAILS = 'progress_details'


+class _ProviderLocator(object):
+    """Helper to start to better decouple the finding logic from storage.
+
+    WIP: part of the larger effort to cleanup/refactor the finding of named
+    arguments so that the code can be more unified and easy to
+    follow...
+    """
+
+    def __init__(self, transient_results,
+                 providers_fetcher, result_fetcher):
+        self.result_fetcher = result_fetcher
+        self.providers_fetcher = providers_fetcher
+        self.transient_results = transient_results
+
+    def _try_get_results(self, looking_for, provider,
+                         look_into_results=True, find_potentials=False):
+        if provider.name is _TRANSIENT_PROVIDER:
+            # TODO(harlowja): This 'is' check still sucks, do this
+            # better in the future...
+            results = self.transient_results
+        else:
+            try:
+                results = self.result_fetcher(provider.name)
+            except (exceptions.NotFound, exceptions.DisallowedAccess):
+                if not find_potentials:
+                    raise
+                else:
+                    # Ok, likely hasn't produced a result yet, but
+                    # at a future point it hopefully will, so stub
+                    # out the *expected* result.
+                    results = {}
+        if look_into_results:
+            _item_from_single(provider, results, looking_for)
+        return results
+
+    def _find(self, looking_for, scope_walker=None,
+              short_circuit=True, find_potentials=False):
+        if scope_walker is None:
+            scope_walker = []
+        default_providers, atom_providers = self.providers_fetcher(looking_for)
+        searched_providers = set()
+        providers_and_results = []
+        if default_providers:
+            for p in default_providers:
+                searched_providers.add(p)
+                try:
+                    provider_results = self._try_get_results(
+                        looking_for, p, find_potentials=find_potentials,
+                        # For default providers always look into their
+                        # results as default providers are statically setup
+                        # and therefore looking into their provided results
+                        # should fail early.
+                        look_into_results=True)
+                except exceptions.NotFound:
+                    if not find_potentials:
+                        raise
+                else:
+                    providers_and_results.append((p, provider_results))
+            if short_circuit:
+                return (searched_providers, providers_and_results)
+        if not atom_providers:
+            return (searched_providers, providers_and_results)
+        atom_providers_by_name = dict((p.name, p) for p in atom_providers)
+        for accessible_atom_names in iter(scope_walker):
+            # *Always* retain the scope ordering (if any matches
+            # happen); instead of retaining the possible provider match
+            # order (which isn't that important and may be different from
+            # the scope requested ordering).
+            maybe_atom_providers = [atom_providers_by_name[atom_name]
+                                    for atom_name in accessible_atom_names
+                                    if atom_name in atom_providers_by_name]
+            tmp_providers_and_results = []
+            if find_potentials:
+                for p in maybe_atom_providers:
+                    searched_providers.add(p)
+                    tmp_providers_and_results.append((p, {}))
+            else:
+                for p in maybe_atom_providers:
+                    searched_providers.add(p)
+                    try:
+                        # Don't at this point look into the provider results
+                        # as calling code will grab all providers, and then
+                        # get the result from the *first* provider that
+                        # actually provided it (or die).
+                        provider_results = self._try_get_results(
+                            looking_for, p, find_potentials=find_potentials,
+                            look_into_results=False)
+                    except exceptions.DisallowedAccess as e:
+                        if e.state != states.IGNORE:
+                            exceptions.raise_with_cause(
+                                exceptions.NotFound,
+                                "Expected to be able to find output %r"
+                                " produced by %s but was unable to get at"
+                                " that provider's results" % (looking_for, p))
+                        else:
+                            LOG.blather("Avoiding using the results of"
+                                        " %r (from %s) for name %r because"
+                                        " it was ignored", p.name, p,
+                                        looking_for)
+                    else:
+                        tmp_providers_and_results.append((p, provider_results))
+            if tmp_providers_and_results and short_circuit:
+                return (searched_providers, tmp_providers_and_results)
+            else:
+                providers_and_results.extend(tmp_providers_and_results)
+        return (searched_providers, providers_and_results)
+
+    def find_potentials(self, looking_for, scope_walker=None):
+        """Returns the accessible **potential** providers."""
+        _searched_providers, providers_and_results = self._find(
+            looking_for, scope_walker=scope_walker,
+            short_circuit=False, find_potentials=True)
+        return set(p for (p, _provider_results) in providers_and_results)
+
+    def find(self, looking_for, scope_walker=None, short_circuit=True):
+        """Returns the accessible providers."""
+        return self._find(looking_for, scope_walker=scope_walker,
+                          short_circuit=short_circuit,
+                          find_potentials=False)
+
+
 class _Provider(object):
     """A named symbol provider that produces a output at the given index."""

@@ -326,13 +448,13 @@ class Storage(object):
             ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name])
         except KeyError:
             exceptions.raise_with_cause(exceptions.NotFound,
-                                        "Unknown atom name: %s" % atom_name)
+                                        "Unknown atom name '%s'" % atom_name)
         else:
             # TODO(harlowja): we need to figure out how to get away from doing
             # these kinds of type checks in general (since they likely mean
             # we aren't doing something right).
             if expected_type and not isinstance(ad, expected_type):
-                raise TypeError("Atom %s is not of the expected type: %s"
+                raise TypeError("Atom '%s' is not of the expected type: %s"
                                 % (atom_name,
                                    reflection.get_class_name(expected_type)))
             if clone:
@@ -479,8 +601,9 @@ class Storage(object):
                 try:
                     _item_from(container, index)
                 except _EXTRACTION_EXCEPTIONS:
-                    LOG.warning("Atom %s did not supply result "
-                                "with index %r (name %s)", atom_name, index, name)
+                    LOG.warning("Atom '%s' did not supply result "
+                                "with index %r (name '%s')", atom_name, index,
+                                name)

     @fasteners.write_locked
     def save(self, atom_name, result, state=states.SUCCESS):
@@ -545,17 +668,34 @@ class Storage(object):
             except KeyError:
                 pass
             return failure
-        # TODO(harlowja): this seems like it should be checked before fetching
-        # the potential failure, instead of after, fix this soon...
- if source.state not in allowed_states: - raise exceptions.NotFound("Result for atom %s is not currently" - " known" % atom_name) - return getattr(source, results_attr_name) + else: + if source.state not in allowed_states: + raise exceptions.DisallowedAccess( + "Result for atom '%s' is not known/accessible" + " due to it being in %s state when result access" + " is restricted to %s states" % (atom_name, + source.state, + allowed_states), + state=source.state) + return getattr(source, results_attr_name) def get_execute_result(self, atom_name): """Gets the ``execute`` results for an atom from storage.""" - return self._get(atom_name, 'results', 'failure', - _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) + try: + results = self._get(atom_name, 'results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) + except exceptions.DisallowedAccess as e: + if e.state == states.IGNORE: + exceptions.raise_with_cause(exceptions.NotFound, + "Result for atom '%s' execution" + " is not known (as it was" + " ignored)" % atom_name) + else: + exceptions.raise_with_cause(exceptions.NotFound, + "Result for atom '%s' execution" + " is not known" % atom_name) + else: + return results @fasteners.read_locked def _get_failures(self, fail_cache_key): @@ -577,8 +717,21 @@ class Storage(object): def get_revert_result(self, atom_name): """Gets the ``revert`` results for an atom from storage.""" - return self._get(atom_name, 'revert_results', 'revert_failure', - _REVERT_STATES_WITH_RESULTS, states.REVERT) + try: + results = self._get(atom_name, 'revert_results', 'revert_failure', + _REVERT_STATES_WITH_RESULTS, states.REVERT) + except exceptions.DisallowedAccess as e: + if e.state == states.IGNORE: + exceptions.raise_with_cause(exceptions.NotFound, + "Result for atom '%s' revert is" + " not known (as it was" + " ignored)" % atom_name) + else: + exceptions.raise_with_cause(exceptions.NotFound, + "Result for atom '%s' revert is" + " not known" % atom_name) + else: + return results def get_revert_failures(self): """Get all ``revert`` failures that happened with this flow.""" @@ -639,7 +792,7 @@ class Storage(object): be serializable). """ if atom_name not in self._atom_name_to_uuid: - raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + raise exceptions.NotFound("Unknown atom name '%s'" % atom_name) def save_transient(): self._injected_args.setdefault(atom_name, {}) @@ -728,6 +881,19 @@ class Storage(object): self._set_result_mapping(provider_name, dict((name, name) for name in names)) + def _fetch_providers(self, looking_for, providers=None): + """Return pair of (default providers, atom providers).""" + if providers is None: + providers = self._reverse_mapping.get(looking_for, []) + default_providers = [] + atom_providers = [] + for p in providers: + if p.name in (_TRANSIENT_PROVIDER, self.injector_name): + default_providers.append(p) + else: + atom_providers.append(p) + return default_providers, atom_providers + def _set_result_mapping(self, provider_name, mapping): """Sets the result mapping for a given producer. 
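The ``_get`` rework above changes how restricted results surface: an atom in a
non-result state (for example ``IGNORE``) now raises ``DisallowedAccess``
internally, carrying that state, and the public ``get_execute_result`` and
``get_revert_result`` wrappers translate it into a ``NotFound`` with a more
descriptive message. A minimal sketch of the resulting calling pattern (the
``result_or_none`` helper is illustrative only, not part of this change):

    from taskflow import exceptions

    def result_or_none(storage, atom_name):
        # Ignored atoms and atoms that have not (yet) produced a result
        # both surface as NotFound at this public boundary; the internal
        # DisallowedAccess (and the state it carries) never escapes.
        try:
            return storage.get_execute_result(atom_name)
        except exceptions.NotFound:
            return None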
@@ -757,30 +923,30 @@ class Storage(object): if many_handler is None: many_handler = _many_handler try: - providers = self._reverse_mapping[name] + maybe_providers = self._reverse_mapping[name] except KeyError: - exceptions.raise_with_cause(exceptions.NotFound, - "Name %r is not mapped as a produced" - " output by any providers" % name) + raise exceptions.NotFound("Name %r is not mapped as a produced" + " output by any providers" % name) + locator = _ProviderLocator( + self._transients, + functools.partial(self._fetch_providers, + providers=maybe_providers), + lambda atom_name: + self._get(atom_name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)) values = [] - for provider in providers: - if provider.name is _TRANSIENT_PROVIDER: - values.append(_item_from_single(provider, - self._transients, name)) - else: - try: - container = self._get(provider.name, - 'last_results', 'failure', - _EXECUTE_STATES_WITH_RESULTS, - states.EXECUTE) - except exceptions.NotFound: - pass - else: - values.append(_item_from_single(provider, - container, name)) + searched_providers, providers = locator.find( + name, short_circuit=False, + # NOTE(harlowja): There are no scopes used here (as of now), so + # we just return all known providers as if it was one large + # scope. + scope_walker=[[p.name for p in maybe_providers]]) + for provider, results in providers: + values.append(_item_from_single(provider, results, name)) if not values: - raise exceptions.NotFound("Unable to find result %r," - " searched %s" % (name, providers)) + raise exceptions.NotFound( + "Unable to find result %r, searched %s providers" + % (name, len(searched_providers))) else: return many_handler(values) @@ -796,49 +962,6 @@ class Storage(object): needed values; it just checks that they are registered to produce it in the future. 
""" - - def _fetch_providers(name): - """Fetchs pair of (default providers, non-default providers).""" - default_providers = [] - non_default_providers = [] - for p in self._reverse_mapping.get(name, []): - if p.name in (_TRANSIENT_PROVIDER, self.injector_name): - default_providers.append(p) - else: - non_default_providers.append(p) - return default_providers, non_default_providers - - def _locate_providers(name, scope_walker=None): - """Finds the accessible *potential* providers.""" - default_providers, non_default_providers = _fetch_providers(name) - providers = [] - if non_default_providers: - if scope_walker is not None: - scope_iter = iter(scope_walker) - else: - scope_iter = iter([]) - for names in scope_iter: - for p in non_default_providers: - if p.name in names: - providers.append(p) - for p in default_providers: - if p.name is _TRANSIENT_PROVIDER: - results = self._transients - else: - try: - results = self._get(p.name, 'last_results', 'failure', - _EXECUTE_STATES_WITH_RESULTS, - states.EXECUTE) - except exceptions.NotFound: - results = {} - try: - _item_from_single(p, results, name) - except exceptions.NotFound: - pass - else: - providers.append(p) - return providers - source, _clone = self._atomdetail_by_name(atom_name) if scope_walker is None: scope_walker = self._scope_fetcher(atom_name) @@ -849,6 +972,11 @@ class Storage(object): source.meta.get(META_INJECTED, {}), ] missing = set(six.iterkeys(args_mapping)) + locator = _ProviderLocator( + self._transients, self._fetch_providers, + lambda atom_name: + self._get(atom_name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)) for (bound_name, name) in six.iteritems(args_mapping): if LOG.isEnabledFor(logging.TRACE): LOG.trace("Looking for %r <= %r for atom '%s'", @@ -863,8 +991,8 @@ class Storage(object): continue if name in source: maybe_providers += 1 - providers = _locate_providers(name, scope_walker=scope_walker) - maybe_providers += len(providers) + maybe_providers += len( + locator.find_potentials(name, scope_walker=scope_walker)) if maybe_providers: LOG.trace("Atom '%s' will have %s potential providers" " of %r <= %r", atom_name, maybe_providers, @@ -894,7 +1022,6 @@ class Storage(object): atom_name=None, scope_walker=None, optional_args=None): """Fetch ``execute`` arguments for an atom using its args mapping.""" - def _extract_first_from(name, sources): """Extracts/returns first occurence of key in list of dicts.""" for i, source in enumerate(sources): @@ -903,49 +1030,6 @@ class Storage(object): if name in source: return (i, source[name]) raise KeyError(name) - - def _get_results(looking_for, provider): - """Gets the results saved for a given provider.""" - try: - return self._get(provider.name, 'last_results', 'failure', - _EXECUTE_STATES_WITH_RESULTS, - states.EXECUTE) - except exceptions.NotFound: - exceptions.raise_with_cause(exceptions.NotFound, - "Expected to be able to find" - " output %r produced by %s but was" - " unable to get at that providers" - " results" % (looking_for, - provider)) - - def _locate_providers(looking_for, possible_providers, - scope_walker=None): - """Finds the accessible providers.""" - default_providers = [] - for p in possible_providers: - if p.name is _TRANSIENT_PROVIDER: - default_providers.append((p, self._transients)) - if p.name == self.injector_name: - default_providers.append((p, _get_results(looking_for, p))) - if default_providers: - return default_providers - if scope_walker is not None: - scope_iter = iter(scope_walker) - else: - scope_iter = iter([]) - 
extractor = lambda p: p.name - for names in scope_iter: - # *Always* retain the scope ordering (if any matches - # happen); instead of retaining the possible provider match - # order (which isn't that important and may be different from - # the scope requested ordering). - providers = misc.look_for(names, possible_providers, - extractor=extractor) - if providers: - return [(p, _get_results(looking_for, p)) - for p in providers] - return [] - if optional_args is None: optional_args = [] if atom_name: @@ -960,6 +1044,9 @@ class Storage(object): injected_sources = [] if not args_mapping: return {} + get_results = lambda atom_name: \ + self._get(atom_name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) mapped_args = {} for (bound_name, name) in six.iteritems(args_mapping): if LOG.isEnabledFor(logging.TRACE): @@ -969,8 +1056,8 @@ class Storage(object): else: LOG.trace("Looking for %r <= %r", bound_name, name) try: - source_index, value = _extract_first_from(name, - injected_sources) + source_index, value = _extract_first_from( + name, injected_sources) mapped_args[bound_name] = value if LOG.isEnabledFor(logging.TRACE): if source_index == 0: @@ -983,7 +1070,7 @@ class Storage(object): " values)", bound_name, name, value) except KeyError: try: - possible_providers = self._reverse_mapping[name] + maybe_providers = self._reverse_mapping[name] except KeyError: if bound_name in optional_args: LOG.trace("Argument %r is optional, skipping", @@ -992,15 +1079,18 @@ class Storage(object): raise exceptions.NotFound("Name %r is not mapped as a" " produced output by any" " providers" % name) - # Reduce the possible providers to one that are allowed. - providers = _locate_providers(name, possible_providers, - scope_walker=scope_walker) + locator = _ProviderLocator( + self._transients, + functools.partial(self._fetch_providers, + providers=maybe_providers), get_results) + searched_providers, providers = locator.find( + name, scope_walker=scope_walker) if not providers: raise exceptions.NotFound( "Mapped argument %r <= %r was not produced" " by any accessible provider (%s possible" " providers were scanned)" - % (bound_name, name, len(possible_providers))) + % (bound_name, name, len(searched_providers))) provider, value = _item_from_first_of(providers, name) mapped_args[bound_name] = value LOG.trace("Matched %r <= %r to %r (from %s)", diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py index 6ccf35889..757bde78d 100644 --- a/taskflow/tests/unit/action_engine/test_compile.py +++ b/taskflow/tests/unit/action_engine/test_compile.py @@ -25,12 +25,27 @@ from taskflow import test from taskflow.tests import utils as test_utils +def _replicate_graph_with_names(compilation): + # Turn a graph of nodes into a graph of names only so that + # testing can use those names instead of having to use the exact + # node objects themselves (which is problematic for any end nodes that + # are added into the graph *dynamically*, and are not there in the + # original/source flow). 
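+    # (End nodes are named after their owning flow plus a '[$]' suffix,
+    # e.g. 'test[$]' or 'sub-test[$]', which is why the expected node and
+    # edge listings in the tests below reference such names.)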
+ g = compilation.execution_graph + n_g = g.__class__(name=g.name) + for node, node_data in g.nodes_iter(data=True): + n_g.add_node(node.name, attr_dict=node_data) + for u, v, u_v_data in g.edges_iter(data=True): + n_g.add_edge(u.name, v.name, attr_dict=u_v_data) + return n_g + + class PatternCompileTest(test.TestCase): def test_task(self): task = test_utils.DummyTask(name='a') - compilation = compiler.PatternCompiler(task).compile() - g = compilation.execution_graph - self.assertEqual([task], list(g.nodes())) + g = _replicate_graph_with_names( + compiler.PatternCompiler(task).compile()) + self.assertEqual(['a'], list(g.nodes())) self.assertEqual([], list(g.edges())) def test_retry(self): @@ -54,19 +69,20 @@ class PatternCompileTest(test.TestCase): inner_flo.add(d) flo.add(inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(6, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(8, len(g)) order = g.topological_sort() - self.assertEqual([flo, a, b, c, inner_flo, d], order) - self.assertTrue(g.has_edge(c, inner_flo)) - self.assertTrue(g.has_edge(inner_flo, d)) + self.assertEqual(['test', 'a', 'b', 'c', + "sub-test", 'd', "sub-test[$]", + 'test[$]'], order) + self.assertTrue(g.has_edge('c', "sub-test")) + self.assertTrue(g.has_edge("sub-test", 'd')) self.assertEqual({'invariant': True}, - g.get_edge_data(inner_flo, d)) - - self.assertEqual([d], list(g.no_successors_iter())) - self.assertEqual([flo], list(g.no_predecessors_iter())) + g.get_edge_data("sub-test", 'd')) + self.assertEqual(['test[$]'], list(g.no_successors_iter())) + self.assertEqual(['test'], list(g.no_predecessors_iter())) def test_invalid(self): a, b, c = test_utils.make_many(3) @@ -80,19 +96,21 @@ class PatternCompileTest(test.TestCase): a, b, c, d = test_utils.make_many(4) flo = uf.Flow("test") flo.add(a, b, c, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) + + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(), [ - (flo, a), - (flo, b), - (flo, c), - (flo, d), + ('test', 'a'), + ('test', 'b'), + ('test', 'c'), + ('test', 'd'), + ('a', 'test[$]'), + ('b', 'test[$]'), + ('c', 'test[$]'), + ('d', 'test[$]'), ]) - self.assertEqual(set([a, b, c, d]), - set(g.no_successors_iter())) - self.assertEqual(set([flo]), - set(g.no_predecessors_iter())) + self.assertEqual(set(['test']), set(g.no_predecessors_iter())) def test_linear_nested(self): a, b, c, d = test_utils.make_many(4) @@ -102,22 +120,22 @@ class PatternCompileTest(test.TestCase): inner_flo.add(c, d) flo.add(inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(6, len(graph)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(8, len(g)) - lb = graph.subgraph([a, b]) - self.assertFalse(lb.has_edge(b, a)) - self.assertTrue(lb.has_edge(a, b)) - self.assertEqual({'invariant': True}, graph.get_edge_data(a, b)) + sub_g = g.subgraph(['a', 'b']) + self.assertFalse(sub_g.has_edge('b', 'a')) + self.assertTrue(sub_g.has_edge('a', 'b')) + self.assertEqual({'invariant': True}, sub_g.get_edge_data("a", "b")) - ub = graph.subgraph([c, d]) - self.assertEqual(0, ub.number_of_edges()) + sub_g = g.subgraph(['c', 'd']) + self.assertEqual(0, sub_g.number_of_edges()) # This ensures that c and d do not 
start executing until after b.
-        self.assertTrue(graph.has_edge(b, inner_flo))
-        self.assertTrue(graph.has_edge(inner_flo, c))
-        self.assertTrue(graph.has_edge(inner_flo, d))
+        self.assertTrue(g.has_edge('b', 'test2'))
+        self.assertTrue(g.has_edge('test2', 'c'))
+        self.assertTrue(g.has_edge('test2', 'd'))

     def test_unordered_nested(self):
         a, b, c, d = test_utils.make_many(4)
         flo = uf.Flow("test")
         flo2 = lf.Flow("test2")
         flo2.add(c, d)
         flo.add(flo2)

-        compilation = compiler.PatternCompiler(flo).compile()
-        g = compilation.execution_graph
-        self.assertEqual(6, len(g))
+        g = _replicate_graph_with_names(
+            compiler.PatternCompiler(flo).compile())
+        self.assertEqual(8, len(g))
         self.assertItemsEqual(g.edges(), [
-            (flo, a),
-            (flo, b),
-            (flo, flo2),
-            (flo2, c),
-            (c, d)
+            ('test', 'a'),
+            ('test', 'b'),
+            ('test', 'test2'),
+            ('test2', 'c'),
+            ('c', 'd'),
+            ('d', 'test2[$]'),
+            ('test2[$]', 'test[$]'),
+            ('a', 'test[$]'),
+            ('b', 'test[$]'),
         ])

     def test_unordered_nested_in_linear(self):
         a, b, c, d = test_utils.make_many(4)
         inner_flo = uf.Flow('ut').add(b, c)
         flo = lf.Flow('lt').add(a, inner_flo, d)

-        compilation = compiler.PatternCompiler(flo).compile()
-        g = compilation.execution_graph
-        self.assertEqual(6, len(g))
+        g = _replicate_graph_with_names(
+            compiler.PatternCompiler(flo).compile())
+        self.assertEqual(8, len(g))
         self.assertItemsEqual(g.edges(), [
-            (flo, a),
-            (a, inner_flo),
-            (inner_flo, b),
-            (inner_flo, c),
-            (b, d),
-            (c, d),
+            ('lt', 'a'),
+            ('a', 'ut'),
+            ('ut', 'b'),
+            ('ut', 'c'),
+            ('b', 'ut[$]'),
+            ('c', 'ut[$]'),
+            ('ut[$]', 'd'),
+            ('d', 'lt[$]'),
         ])

     def test_graph(self):
         a, b, c, d = test_utils.make_many(4)
         flo = gf.Flow("test")
         flo.add(a, b, c, d)
-        compilation = compiler.PatternCompiler(flo).compile()
-        g = compilation.execution_graph
-        self.assertEqual(5, len(g))
-        self.assertEqual(4, g.number_of_edges())
+        compilation = compiler.PatternCompiler(flo).compile()
+        self.assertEqual(6, len(compilation.execution_graph))
+        self.assertEqual(8, compilation.execution_graph.number_of_edges())

     def test_graph_nested(self):
         a, b, c, d, e, f, g = test_utils.make_many(7)
         flo = gf.Flow("test")
         flo.add(a, b, c, d)

         flo2 = lf.Flow("test2")
         flo2.add(e, f, g)
         flo.add(flo2)

-        compilation = compiler.PatternCompiler(flo).compile()
-        graph = compilation.execution_graph
-        self.assertEqual(9, len(graph))
-        self.assertItemsEqual(graph.edges(), [
-            (flo, a),
-            (flo, b),
-            (flo, c),
-            (flo, d),
-            (flo, flo2),
+        g = _replicate_graph_with_names(
+            compiler.PatternCompiler(flo).compile())
+        self.assertEqual(11, len(g))
+        self.assertItemsEqual(g.edges(), [
+            ('test', 'a'),
+            ('test', 'b'),
+            ('test', 'c'),
+            ('test', 'd'),
+            ('a', 'test[$]'),
+            ('b', 'test[$]'),
+            ('c', 'test[$]'),
+            ('d', 'test[$]'),

-            (flo2, e),
-            (e, f),
-            (f, g),
+            ('test', 'test2'),
+            ('test2', 'e'),
+            ('e', 'f'),
+            ('f', 'g'),
+
+            ('g', 'test2[$]'),
+            ('test2[$]', 'test[$]'),
         ])

     def test_graph_nested_graph(self):
         a, b, c, d, e, f, g = test_utils.make_many(7)
         flo = gf.Flow("test")
         flo.add(a, b, c, d)

         flo2 = gf.Flow("test2")
         flo2.add(e, f, g)
         flo.add(flo2)

-        compilation = compiler.PatternCompiler(flo).compile()
-        graph = compilation.execution_graph
-        self.assertEqual(9, len(graph))
-        self.assertItemsEqual(graph.edges(), [
-            (flo, a),
-            (flo, b),
-            (flo, c),
-            (flo, d),
-            (flo, flo2),
+        g = _replicate_graph_with_names(
+            compiler.PatternCompiler(flo).compile())
+        self.assertEqual(11, len(g))
+        self.assertItemsEqual(g.edges(), [
+            ('test', 'a'),
+            ('test', 'b'),
+            ('test', 'c'),
+            ('test', 'd'),
+            ('test', 'test2'),

-            (flo2, e),
-            (flo2, f),
+            ('test2', 'e'),
+            ('test2', 'f'),
+            
('test2', 'g'), + + ('e', 'test2[$]'), + ('f', 'test2[$]'), + ('g', 'test2[$]'), + + ('test2[$]', 'test[$]'), + ('a', 'test[$]'), + ('b', 'test[$]'), + ('c', 'test[$]'), + ('d', 'test[$]'), ]) def test_graph_links(self): @@ -221,33 +260,34 @@ class PatternCompileTest(test.TestCase): flo.link(b, c) flo.link(c, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, a, {'invariant': True}), - - (a, b, {'manual': True}), - (b, c, {'manual': True}), - (c, d, {'manual': True}), + ('test', 'a', {'invariant': True}), + ('a', 'b', {'manual': True}), + ('b', 'c', {'manual': True}), + ('c', 'd', {'manual': True}), + ('d', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([d], g.no_successors_iter()) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) def test_graph_dependencies(self): a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x']) flo = gf.Flow("test").add(a, b) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(3, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(4, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, a, {'invariant': True}), - (a, b, {'reasons': set(['x'])}) + ('test', 'a', {'invariant': True}), + ('a', 'b', {'reasons': set(['x'])}), + ('b', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([b], g.no_successors_iter()) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) def test_graph_nested_requires(self): a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) @@ -256,17 +296,19 @@ class PatternCompileTest(test.TestCase): inner_flo = lf.Flow("test2").add(b, c) flo = gf.Flow("test").add(a, inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(5, len(graph)) - self.assertItemsEqual(graph.edges(data=True), [ - (flo, a, {'invariant': True}), - (inner_flo, b, {'invariant': True}), - (a, inner_flo, {'reasons': set(['x'])}), - (b, c, {'invariant': True}), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(7, len(g)) + self.assertItemsEqual(g.edges(data=True), [ + ('test', 'a', {'invariant': True}), + ('test2', 'b', {'invariant': True}), + ('a', 'test2', {'reasons': set(['x'])}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), + ('test2[$]', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], graph.no_predecessors_iter()) - self.assertItemsEqual([c], graph.no_successors_iter()) + self.assertItemsEqual(['test'], list(g.no_predecessors_iter())) + self.assertItemsEqual(['test[$]'], list(g.no_successors_iter())) def test_graph_nested_provides(self): a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x']) @@ -275,18 +317,22 @@ class PatternCompileTest(test.TestCase): inner_flo = lf.Flow("test2").add(b, c) flo = gf.Flow("test").add(a, inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - graph = 
compilation.execution_graph - self.assertEqual(5, len(graph)) - self.assertItemsEqual(graph.edges(data=True), [ - (flo, inner_flo, {'invariant': True}), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(7, len(g)) + self.assertItemsEqual(g.edges(data=True), [ + ('test', 'test2', {'invariant': True}), + ('a', 'test[$]', {'invariant': True}), - (inner_flo, b, {'invariant': True}), - (b, c, {'invariant': True}), - (c, a, {'reasons': set(['x'])}), + # The 'x' requirement is produced out of test2... + ('test2[$]', 'a', {'reasons': set(['x'])}), + + ('test2', 'b', {'invariant': True}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], graph.no_predecessors_iter()) - self.assertItemsEqual([a], graph.no_successors_iter()) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) def test_empty_flow_in_linear_flow(self): flo = lf.Flow('lf') @@ -295,12 +341,14 @@ class PatternCompileTest(test.TestCase): empty_flo = gf.Flow("empty") flo.add(a, empty_flo, b) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertItemsEqual(graph.edges(), [ - (flo, a), - (a, empty_flo), - (empty_flo, b), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertItemsEqual(g.edges(), [ + ("lf", "a"), + ("a", "empty"), + ("empty", "empty[$]"), + ("empty[$]", "b"), + ("b", "lf[$]"), ]) def test_many_empty_in_graph_flow(self): @@ -331,22 +379,24 @@ class PatternCompileTest(test.TestCase): flo.link(a, d) flo.link(c, d) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) - self.assertTrue(graph.has_edge(flo, a)) + self.assertTrue(g.has_edge('root', 'a')) + self.assertTrue(g.has_edge('root', 'b')) + self.assertTrue(g.has_edge('root', 'c')) - self.assertTrue(graph.has_edge(flo, b)) - self.assertTrue(graph.has_edge(b_0, b_1)) - self.assertTrue(graph.has_edge(b_1, b_2)) - self.assertTrue(graph.has_edge(b_2, b_3)) + self.assertTrue(g.has_edge('b.0', 'b.1')) + self.assertTrue(g.has_edge('b.1[$]', 'b.2')) + self.assertTrue(g.has_edge('b.2[$]', 'b.3')) - self.assertTrue(graph.has_edge(flo, c)) - self.assertTrue(graph.has_edge(c_0, c_1)) - self.assertTrue(graph.has_edge(c_1, c_2)) + self.assertTrue(g.has_edge('c.0[$]', 'c.1')) + self.assertTrue(g.has_edge('c.1[$]', 'c.2')) - self.assertTrue(graph.has_edge(b_3, d)) - self.assertEqual(12, len(graph)) + self.assertTrue(g.has_edge('a', 'd')) + self.assertTrue(g.has_edge('b[$]', 'd')) + self.assertTrue(g.has_edge('c[$]', 'd')) + self.assertEqual(20, len(g)) def test_empty_flow_in_nested_flow(self): flow = lf.Flow('lf') @@ -360,13 +410,13 @@ class PatternCompileTest(test.TestCase): flow2.add(c, empty_flow, d) flow.add(a, flow2, b) - compilation = compiler.PatternCompiler(flow).compile() - g = compilation.execution_graph - - for source, target in [(flow, a), (a, flow2), - (flow2, c), (c, empty_flow), - (empty_flow, d), (d, b)]: - self.assertTrue(g.has_edge(source, target)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flow).compile()) + for u, v in [('lf', 'a'), ('a', 'lf-2'), + ('lf-2', 'c'), ('c', 'empty'), + ('empty[$]', 'd'), ('d', 'lf-2[$]'), + ('lf-2[$]', 'b'), ('b', 'lf[$]')]: + self.assertTrue(g.has_edge(u, v)) def test_empty_flow_in_graph_flow(self): flow = lf.Flow('lf') @@ -379,7 +429,14 @@ class 
PatternCompileTest(test.TestCase): g = compilation.execution_graph self.assertTrue(g.has_edge(flow, a)) self.assertTrue(g.has_edge(a, empty_flow)) - self.assertTrue(g.has_edge(empty_flow, b)) + + empty_flow_successors = g.successors(empty_flow) + self.assertEqual(1, len(empty_flow_successors)) + empty_flow_terminal = empty_flow_successors[0] + self.assertIs(empty_flow, empty_flow_terminal.flow) + self.assertEqual(compiler.FLOW_END, + g.node[empty_flow_terminal]['kind']) + self.assertTrue(g.has_edge(empty_flow_terminal, b)) def test_empty_flow_in_graph_flow_linkage(self): flow = gf.Flow('lf') @@ -417,146 +474,154 @@ class PatternCompileTest(test.TestCase): def test_retry_in_linear_flow(self): flo = lf.Flow("test", retry.AlwaysRevert("c")) compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(2, len(g)) - self.assertEqual(1, g.number_of_edges()) + self.assertEqual(3, len(compilation.execution_graph)) + self.assertEqual(2, compilation.execution_graph.number_of_edges()) def test_retry_in_unordered_flow(self): flo = uf.Flow("test", retry.AlwaysRevert("c")) compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(2, len(g)) - self.assertEqual(1, g.number_of_edges()) + self.assertEqual(3, len(compilation.execution_graph)) + self.assertEqual(2, compilation.execution_graph.number_of_edges()) def test_retry_in_graph_flow(self): flo = gf.Flow("test", retry.AlwaysRevert("c")) compilation = compiler.PatternCompiler(flo).compile() g = compilation.execution_graph - self.assertEqual(2, len(g)) - self.assertEqual(1, g.number_of_edges()) + self.assertEqual(3, len(g)) + self.assertEqual(2, g.number_of_edges()) def test_retry_in_nested_flows(self): c1 = retry.AlwaysRevert("c1") c2 = retry.AlwaysRevert("c2") inner_flo = lf.Flow("test2", c2) flo = lf.Flow("test", c1).add(inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(4, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c1, {'invariant': True}), - (c1, inner_flo, {'invariant': True, 'retry': True}), - (inner_flo, c2, {'invariant': True}), + ('test', 'c1', {'invariant': True}), + ('c1', 'test2', {'invariant': True, 'retry': True}), + ('test2', 'c2', {'invariant': True}), + ('c2', 'test2[$]', {'invariant': True}), + ('test2[$]', 'test[$]', {'invariant': True}), ]) - self.assertIs(c1, g.node[c2]['retry']) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([c2], g.no_successors_iter()) + self.assertIs(c1, g.node['c2']['retry']) + self.assertItemsEqual(['test'], list(g.no_predecessors_iter())) + self.assertItemsEqual(['test[$]'], list(g.no_successors_iter())) def test_retry_in_linear_flow_with_tasks(self): c = retry.AlwaysRevert("c") a, b = test_utils.make_many(2) flo = lf.Flow("test", c).add(a, b) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(4, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(5, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c, {'invariant': True}), - (a, b, {'invariant': True}), - (c, a, {'invariant': True, 'retry': True}) + ('test', 'c', {'invariant': True}), + ('a', 'b', {'invariant': True}), + ('c', 'a', {'invariant': True, 'retry': True}), + ('b', 'test[$]', {'invariant': True}), ]) - 
self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([b], g.no_successors_iter()) - self.assertIs(c, g.node[a]['retry']) - self.assertIs(c, g.node[b]['retry']) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) + self.assertIs(c, g.node['a']['retry']) + self.assertIs(c, g.node['b']['retry']) def test_retry_in_unordered_flow_with_tasks(self): c = retry.AlwaysRevert("c") a, b = test_utils.make_many(2) flo = uf.Flow("test", c).add(a, b) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(4, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(5, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c, {'invariant': True}), - (c, a, {'invariant': True, 'retry': True}), - (c, b, {'invariant': True, 'retry': True}), + ('test', 'c', {'invariant': True}), + ('c', 'a', {'invariant': True, 'retry': True}), + ('c', 'b', {'invariant': True, 'retry': True}), + ('b', 'test[$]', {'invariant': True}), + ('a', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([a, b], g.no_successors_iter()) - self.assertIs(c, g.node[a]['retry']) - self.assertIs(c, g.node[b]['retry']) + self.assertItemsEqual(['test'], list(g.no_predecessors_iter())) + self.assertItemsEqual(['test[$]'], list(g.no_successors_iter())) + self.assertIs(c, g.node['a']['retry']) + self.assertIs(c, g.node['b']['retry']) def test_retry_in_graph_flow_with_tasks(self): - r = retry.AlwaysRevert("cp") + r = retry.AlwaysRevert("r") a, b, c = test_utils.make_many(3) flo = gf.Flow("test", r).add(a, b, c).link(b, c) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) self.assertItemsEqual(g.edges(data=True), [ - (flo, r, {'invariant': True}), - (r, a, {'invariant': True, 'retry': True}), - (r, b, {'invariant': True, 'retry': True}), - (b, c, {'manual': True}) + ('test', 'r', {'invariant': True}), + ('r', 'a', {'invariant': True, 'retry': True}), + ('r', 'b', {'invariant': True, 'retry': True}), + ('b', 'c', {'manual': True}), + ('a', 'test[$]', {'invariant': True}), + ('c', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([a, c], g.no_successors_iter()) - self.assertIs(r, g.node[a]['retry']) - self.assertIs(r, g.node[b]['retry']) - self.assertIs(r, g.node[c]['retry']) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) + self.assertIs(r, g.node['a']['retry']) + self.assertIs(r, g.node['b']['retry']) + self.assertIs(r, g.node['c']['retry']) def test_retries_hierarchy(self): - c1 = retry.AlwaysRevert("cp1") - c2 = retry.AlwaysRevert("cp2") + c1 = retry.AlwaysRevert("c1") + c2 = retry.AlwaysRevert("c2") a, b, c, d = test_utils.make_many(4) - inner_flo = lf.Flow("test", c2).add(b, c) + inner_flo = lf.Flow("test2", c2).add(b, c) flo = lf.Flow("test", c1).add(a, inner_flo, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(8, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(10, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c1, {'invariant': True}), - (c1, a, {'invariant': True, 
'retry': True}), - (a, inner_flo, {'invariant': True}), - (inner_flo, c2, {'invariant': True}), - (c2, b, {'invariant': True, 'retry': True}), - (b, c, {'invariant': True}), - (c, d, {'invariant': True}), + ('test', 'c1', {'invariant': True}), + ('c1', 'a', {'invariant': True, 'retry': True}), + ('a', 'test2', {'invariant': True}), + ('test2', 'c2', {'invariant': True}), + ('c2', 'b', {'invariant': True, 'retry': True}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), + ('test2[$]', 'd', {'invariant': True}), + ('d', 'test[$]', {'invariant': True}), ]) - self.assertIs(c1, g.node[a]['retry']) - self.assertIs(c1, g.node[d]['retry']) - self.assertIs(c2, g.node[b]['retry']) - self.assertIs(c2, g.node[c]['retry']) - self.assertIs(c1, g.node[c2]['retry']) - self.assertIs(None, g.node[c1].get('retry')) + self.assertIs(c1, g.node['a']['retry']) + self.assertIs(c1, g.node['d']['retry']) + self.assertIs(c2, g.node['b']['retry']) + self.assertIs(c2, g.node['c']['retry']) + self.assertIs(c1, g.node['c2']['retry']) + self.assertIs(None, g.node['c1'].get('retry')) def test_retry_subflows_hierarchy(self): - c1 = retry.AlwaysRevert("cp1") + c1 = retry.AlwaysRevert("c1") a, b, c, d = test_utils.make_many(4) - inner_flo = lf.Flow("test").add(b, c) + inner_flo = lf.Flow("test2").add(b, c) flo = lf.Flow("test", c1).add(a, inner_flo, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(7, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(9, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c1, {'invariant': True}), - (c1, a, {'invariant': True, 'retry': True}), - (a, inner_flo, {'invariant': True}), - (inner_flo, b, {'invariant': True}), - (b, c, {'invariant': True}), - (c, d, {'invariant': True}), + ('test', 'c1', {'invariant': True}), + ('c1', 'a', {'invariant': True, 'retry': True}), + ('a', 'test2', {'invariant': True}), + ('test2', 'b', {'invariant': True}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), + ('test2[$]', 'd', {'invariant': True}), + ('d', 'test[$]', {'invariant': True}), ]) - self.assertIs(c1, g.node[a]['retry']) - self.assertIs(c1, g.node[d]['retry']) - self.assertIs(c1, g.node[b]['retry']) - self.assertIs(c1, g.node[c]['retry']) - self.assertIs(None, g.node[c1].get('retry')) + self.assertIs(c1, g.node['a']['retry']) + self.assertIs(c1, g.node['d']['retry']) + self.assertIs(c1, g.node['b']['retry']) + self.assertIs(c1, g.node['c']['retry']) + self.assertIs(None, g.node['c1'].get('retry')) diff --git a/taskflow/tests/unit/patterns/test_graph_flow.py b/taskflow/tests/unit/patterns/test_graph_flow.py index 7197dc721..429f68fdb 100644 --- a/taskflow/tests/unit/patterns/test_graph_flow.py +++ b/taskflow/tests/unit/patterns/test_graph_flow.py @@ -26,6 +26,16 @@ def _task(name, provides=None, requires=None): class GraphFlowTest(test.TestCase): + def test_invalid_decider_depth(self): + g_1 = utils.ProgressingTask(name='g-1') + g_2 = utils.ProgressingTask(name='g-2') + for not_a_depth in ['not-a-depth', object(), 2, 3.4, False]: + flow = gf.Flow('g') + flow.add(g_1, g_2) + self.assertRaises((ValueError, TypeError), + flow.link, g_1, g_2, + decider=lambda history: False, + decider_depth=not_a_depth) def test_graph_flow_stringy(self): f = gf.Flow('test') diff --git a/taskflow/tests/unit/test_deciders.py b/taskflow/tests/unit/test_deciders.py new file mode 100644 index 000000000..8bfc154d6 --- /dev/null +++ 
b/taskflow/tests/unit/test_deciders.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import deciders +from taskflow import test + + +class TestDeciders(test.TestCase): + def test_translate(self): + for val in ['all', 'ALL', 'aLL', deciders.Depth.ALL]: + self.assertEqual(deciders.Depth.ALL, + deciders.Depth.translate(val)) + for val in ['atom', 'ATOM', 'atOM', deciders.Depth.ATOM]: + self.assertEqual(deciders.Depth.ATOM, + deciders.Depth.translate(val)) + for val in ['neighbors', 'Neighbors', + 'NEIGHBORS', deciders.Depth.NEIGHBORS]: + self.assertEqual(deciders.Depth.NEIGHBORS, + deciders.Depth.translate(val)) + for val in ['flow', 'FLOW', 'flOW', deciders.Depth.FLOW]: + self.assertEqual(deciders.Depth.FLOW, + deciders.Depth.translate(val)) + + def test_bad_translate(self): + self.assertRaises(TypeError, deciders.Depth.translate, 3) + self.assertRaises(TypeError, deciders.Depth.translate, object()) + self.assertRaises(ValueError, deciders.Depth.translate, "stuff") + + def test_pick_widest(self): + choices = [deciders.Depth.ATOM, deciders.Depth.FLOW] + self.assertEqual(deciders.Depth.FLOW, deciders.pick_widest(choices)) + choices = [deciders.Depth.ATOM, deciders.Depth.FLOW, + deciders.Depth.ALL] + self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices)) + choices = [deciders.Depth.ATOM, deciders.Depth.FLOW, + deciders.Depth.ALL, deciders.Depth.NEIGHBORS] + self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices)) + choices = [deciders.Depth.ATOM, deciders.Depth.NEIGHBORS] + self.assertEqual(deciders.Depth.NEIGHBORS, + deciders.pick_widest(choices)) + + def test_bad_pick_widest(self): + self.assertRaises(ValueError, deciders.pick_widest, []) + self.assertRaises(ValueError, deciders.pick_widest, ["a"]) + self.assertRaises(ValueError, deciders.pick_widest, set(['b'])) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 288afb182..cb59a3e0e 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -669,6 +669,94 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): self.assertNotIn('task2.t REVERTED(None)', capturer.values) +class EngineDeciderDepthTest(utils.EngineTestBase): + + def test_run_graph_flow_decider_various_depths(self): + sub_flow_1 = gf.Flow('g_1') + g_1_1 = utils.ProgressingTask(name='g_1-1') + sub_flow_1.add(g_1_1) + g_1 = utils.ProgressingTask(name='g-1') + g_2 = utils.ProgressingTask(name='g-2') + g_3 = utils.ProgressingTask(name='g-3') + g_4 = utils.ProgressingTask(name='g-4') + for a_depth, ran_how_many in [('all', 1), + ('atom', 4), + ('flow', 2), + ('neighbors', 3)]: + flow = gf.Flow('g') + flow.add(g_1, g_2, sub_flow_1, g_3, g_4) + flow.link(g_1, g_2, + decider=lambda history: False, + decider_depth=a_depth) + flow.link(g_2, sub_flow_1) + flow.link(g_2, g_3) + flow.link(g_3, g_4) + flow.link(g_1, sub_flow_1, + decider=lambda history: True, + 
decider_depth=a_depth) + e = self._make_engine(flow) + with utils.CaptureListener(e, capture_flow=False) as capturer: + e.run() + ran_tasks = 0 + for outcome in capturer.values: + if outcome.endswith("RUNNING"): + ran_tasks += 1 + self.assertEqual(ran_how_many, ran_tasks) + + def test_run_graph_flow_decider_jump_over_atom(self): + flow = gf.Flow('g') + a = utils.AddOneSameProvidesRequires("a", inject={'value': 0}) + b = utils.AddOneSameProvidesRequires("b") + c = utils.AddOneSameProvidesRequires("c") + flow.add(a, b, c, resolve_requires=False) + flow.link(a, b, decider=lambda history: False, + decider_depth='atom') + flow.link(b, c) + e = self._make_engine(flow) + e.run() + self.assertEqual(2, e.storage.get('c')) + self.assertEqual(states.IGNORE, e.storage.get_atom_state('b')) + + def test_run_graph_flow_decider_jump_over_bad_atom(self): + flow = gf.Flow('g') + a = utils.NoopTask("a") + b = utils.FailingTask("b") + c = utils.NoopTask("c") + flow.add(a, b, c) + flow.link(a, b, decider=lambda history: False, + decider_depth='atom') + flow.link(b, c) + e = self._make_engine(flow) + e.run() + + def test_run_graph_flow_decider_revert(self): + flow = gf.Flow('g') + a = utils.NoopTask("a") + b = utils.NoopTask("b") + c = utils.FailingTask("c") + flow.add(a, b, c) + flow.link(a, b, decider=lambda history: False, + decider_depth='atom') + flow.link(b, c) + e = self._make_engine(flow) + with utils.CaptureListener(e, capture_flow=False) as capturer: + # Wrapped failure here for WBE engine, make this better in + # the future, perhaps via a custom testtools matcher?? + self.assertRaises((RuntimeError, exc.WrappedFailure), e.run) + expected = [ + 'a.t RUNNING', + 'a.t SUCCESS(None)', + 'b.t IGNORE', + 'c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'c.t REVERTING', + 'c.t REVERTED(None)', + 'a.t REVERTING', + 'a.t REVERTED(None)', + ] + self.assertEqual(expected, capturer.values) + + class EngineGraphFlowTest(utils.EngineTestBase): def test_run_empty_graph_flow(self): @@ -1043,9 +1131,10 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase): 'task4.t IGNORE', ] self.assertEqual(expected, capturer.values) - self.assertEqual(1, len(histories)) - self.assertIn('task1', histories[0]) - self.assertIn('task2', histories[0]) + self.assertEqual(2, len(histories)) + for i in range(0, 2): + self.assertIn('task1', histories[i]) + self.assertIn('task2', histories[i]) def test_graph_flow_conditional(self): flow = gf.Flow('root') @@ -1249,14 +1338,15 @@ class SerialEngineTest(EngineTaskTest, EngineResetTests, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, + EngineDeciderDepthTest, test.TestCase): def _make_engine(self, flow, - flow_detail=None, store=None): + flow_detail=None, store=None, **kwargs): return taskflow.engines.load(flow, flow_detail=flow_detail, engine='serial', backend=self.backend, - store=store) + store=store, **kwargs) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -1278,11 +1368,13 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, + EngineDeciderDepthTest, test.TestCase): _EXECUTOR_WORKERS = 2 def _make_engine(self, flow, - flow_detail=None, executor=None, store=None): + flow_detail=None, executor=None, store=None, + **kwargs): if executor is None: executor = 'threads' return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -1290,7 +1382,8 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, executor=executor, engine='parallel', 
store=store, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + **kwargs) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -1319,17 +1412,19 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, + EngineDeciderDepthTest, test.TestCase): def _make_engine(self, flow, - flow_detail=None, executor=None, store=None): + flow_detail=None, executor=None, store=None, + **kwargs): if executor is None: executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor, - store=store) + store=store, **kwargs) class ParallelEngineWithProcessTest(EngineTaskTest, @@ -1342,6 +1437,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, + EngineDeciderDepthTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -1350,7 +1446,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest, self.assertIsInstance(engine, eng.ParallelActionEngine) def _make_engine(self, flow, - flow_detail=None, executor=None, store=None): + flow_detail=None, executor=None, store=None, + **kwargs): if executor is None: executor = 'processes' return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -1358,7 +1455,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest, engine='parallel', executor=executor, store=store, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + **kwargs) class WorkerBasedEngineTest(EngineTaskTest, @@ -1371,6 +1469,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, + EngineDeciderDepthTest, test.TestCase): def setUp(self): super(WorkerBasedEngineTest, self).setUp() @@ -1415,10 +1514,11 @@ class WorkerBasedEngineTest(EngineTaskTest, super(WorkerBasedEngineTest, self).tearDown() def _make_engine(self, flow, - flow_detail=None, store=None): + flow_detail=None, store=None, **kwargs): + kwargs.update(self.engine_conf) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, - store=store, **self.engine_conf) + store=store, **kwargs) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index cf435003b..effc54b50 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -244,24 +244,6 @@ class TestCountdownIter(test.TestCase): self.assertRaises(ValueError, six.next, it) -class TestLookFor(test.TestCase): - def test_no_matches(self): - hay = [9, 10, 11] - self.assertEqual([], misc.look_for(hay, [1, 2, 3])) - - def test_match_order(self): - hay = [6, 5, 4, 3, 2, 1] - priors = [] - for i in range(0, 6): - priors.append(i + 1) - matches = misc.look_for(hay, priors) - self.assertGreater(0, len(matches)) - self.assertIsSuperAndSubsequence(hay, matches) - hay = [10, 1, 15, 3, 5, 8, 44] - self.assertEqual([1, 15], misc.look_for(hay, [15, 1])) - self.assertEqual([10, 44], misc.look_for(hay, [44, 10])) - - class TestMergeUri(test.TestCase): def test_merge(self): url = "http://www.yahoo.com/?a=b&c=d" diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 1a516d839..5c2bf4fa5 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -226,37 +226,6 @@ def parse_uri(uri): return 
netutils.urlsplit(uri) -def look_for(haystack, needles, extractor=None): - """Find items in haystack and returns matches found (in haystack order). - - Given a list of items (the haystack) and a list of items to look for (the - needles) this will look for the needles in the haystack and returns - the found needles (if any). The ordering of the returned needles is in the - order they are located in the haystack. - - Example input and output: - - >>> from taskflow.utils import misc - >>> hay = [3, 2, 1] - >>> misc.look_for(hay, [1, 2]) - [2, 1] - """ - if not haystack: - return [] - if extractor is None: - extractor = lambda v: v - matches = [] - for i, v in enumerate(needles): - try: - matches.append((haystack.index(extractor(v)), i)) - except ValueError: - pass - if not matches: - return [] - else: - return [needles[i] for (_hay_i, i) in sorted(matches)] - - def disallow_when_frozen(excp_cls): """Frozen checking/raising method decorator."""
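
Taken together, the decider depth plumbing above lets a single edge decider
bound how far an ignore propagates through a graph flow. A minimal usage
sketch (the ``Noop`` task and all flow/task names here are illustrative only;
the ``link`` keywords mirror those exercised by the engine tests above):

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import graph_flow as gf


    class Noop(task.Task):
        def execute(self):
            pass


    a, b, c = Noop(name='a'), Noop(name='b'), Noop(name='c')
    flow = gf.Flow('demo')
    flow.add(a, b, c)
    flow.link(b, c)
    # With decider_depth='atom' only 'b' is IGNOREd when the decider
    # returns False; 'c' still runs. Under the default depth ('all')
    # everything downstream of the a->b edge would be ignored too.
    flow.link(a, b, decider=lambda history: False, decider_depth='atom')
    engines.run(flow)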