Allow for alterations in decider 'area of influence'

Christmas came early.

Closes-Bug: #1479466

Change-Id: I931d826690c925f022dbfffe9afb7bf41345b1d0
Joshua Harlow 2015-11-16 16:27:42 -08:00 committed by Joshua Harlow
parent f555a35f30
commit 8e8156c488
23 changed files with 1452 additions and 732 deletions


@@ -448,11 +448,13 @@ Components
.. automodule:: taskflow.engines.action_engine.builder
.. automodule:: taskflow.engines.action_engine.compiler
.. automodule:: taskflow.engines.action_engine.completer
.. automodule:: taskflow.engines.action_engine.deciders
.. automodule:: taskflow.engines.action_engine.executor
.. automodule:: taskflow.engines.action_engine.runtime
.. automodule:: taskflow.engines.action_engine.scheduler
.. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker
:special-members: __iter__
.. automodule:: taskflow.engines.action_engine.traversal
Hierarchy
=========


@@ -21,6 +21,7 @@ Graph flow
~~~~~~~~~~
.. automodule:: taskflow.patterns.graph_flow
.. automodule:: taskflow.deciders
Hierarchy
~~~~~~~~~

taskflow/deciders.py (new file, 99 lines)

@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from taskflow.utils import misc
class Depth(misc.StrEnum):
"""Enumeration of decider(s) *area of influence*."""
ALL = 'ALL'
"""
**Default** decider depth that affects **all** successor atoms (including
ones that are in successor nested flows).
"""
FLOW = 'FLOW'
"""
Decider depth that affects **all** successor tasks in the **same**
flow (it will **not** affect tasks/retries that are in successor
nested flows).
.. warning::
While using this kind we are allowed to execute successors of
things that have been ignored (for example nested flows and the
tasks they contain); this may result in symbol lookup errors during
running, so user beware.
"""
NEIGHBORS = 'NEIGHBORS'
"""
Decider depth that affects only **next** successor tasks (and does
not traverse past **one** level of successor tasks).
.. warning::
While using this kind we are allowed to execute successors of
things that have been ignored (for example nested flows and the
tasks they contain); this may result in symbol lookup errors during
running, so user beware.
"""
ATOM = 'ATOM'
"""
Decider depth that affects only **targeted** atom (and does
**not** traverse into **any** level of successor atoms).
.. warning::
While using this kind we are allowed to execute successors of
things that have been ignored (for example nested flows and the
tasks they contain); this may result in symbol lookup errors during
running, so user beware.
"""
@classmethod
def translate(cls, desired_depth):
"""Translates a string into a depth enumeration."""
if isinstance(desired_depth, cls):
# Nothing to do in the first place...
return desired_depth
if not isinstance(desired_depth, six.string_types):
raise TypeError("Unexpected desired depth type, string type"
" expected, not %s" % type(desired_depth))
try:
return cls(desired_depth.upper())
except ValueError:
pretty_depths = sorted([a_depth.name for a_depth in cls])
raise ValueError("Unexpected decider depth value, one of"
" %s (case-insensitive) is expected and"
" not '%s'" % (pretty_depths, desired_depth))
# Depth area of influence order (from greater influence to least).
#
# Order very much matters here...
_ORDERING = tuple([
Depth.ALL, Depth.FLOW, Depth.NEIGHBORS, Depth.ATOM,
])
def pick_widest(depths):
"""Pick from many depths which has the **widest** area of influence."""
return _ORDERING[min(_ORDERING.index(d) for d in depths)]
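A quick sanity check of how these two helpers compose (a sketch that only assumes the module shown above):

from taskflow import deciders

# Strings are translated case-insensitively into Depth members...
assert deciders.Depth.translate('flow') is deciders.Depth.FLOW
# ...and pick_widest() picks the member with the largest area of
# influence, per _ORDERING (ALL > FLOW > NEIGHBORS > ATOM).
chosen = deciders.pick_widest([deciders.Depth.ATOM, deciders.Depth.NEIGHBORS])
assert chosen is deciders.Depth.NEIGHBORS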


@@ -14,108 +14,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import operator
import weakref
import six
from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import deciders
from taskflow.engines.action_engine import traversal
from taskflow import states as st
from taskflow.utils import iter_utils
def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
"""Iterates connected nodes in execution graph (from starting set).
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
stack = list(initial_nodes_iter)
while stack:
node = stack.pop()
node_attrs = graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
stack.extend(connected_to_functor(node))
@six.add_metaclass(abc.ABCMeta)
class Decider(object):
"""Base class for deciders.
Provides an interface to be implemented by sub-classes.
A decider checks whether the next atom in a flow should be executed or not.
"""
@abc.abstractmethod
def check(self, runtime):
"""Returns bool of whether this decider should allow running."""
@abc.abstractmethod
def affect(self, runtime):
"""If the :py:func:`~.check` returns false, affects associated atoms.
"""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order."""
proceed = self.check(runtime)
if not proceed:
self.affect(runtime)
return proceed
class IgnoreDecider(Decider):
"""Checks any provided edge-deciders and determines if ok to run."""
def __init__(self, atom, edge_deciders):
self._atom = atom
self._edge_deciders = edge_deciders
def check(self, runtime):
"""Returns bool of whether this decider should allow running."""
# Gather all atoms results so that those results can be used
# by the decider(s) that are making a decision as to pass or
# not pass...
results = {}
for node, node_kind, _local_decider in self._edge_deciders:
if node_kind in co.ATOMS:
results[node.name] = runtime.storage.get(node.name)
for _node, _node_kind, local_decider in self._edge_deciders:
if not local_decider(history=results):
return False
return True
def affect(self, runtime):
"""If the :py:func:`~.check` returns false, affects associated atoms.
This will alter the associated atom + successor atoms by setting their
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
class NoOpDecider(Decider):
"""No-op decider that says it is always ok to run & has no effect(s)."""
def check(self, runtime):
"""Always good to go."""
return True
def affect(self, runtime):
"""Does nothing."""
class Analyzer(object):
"""Analyzes a compilation and aids in execution processes.
@@ -142,7 +50,7 @@ class Analyzer(object):
if state == st.SUCCESS:
if intention == st.REVERT:
return iter([
(atom, NoOpDecider()),
(atom, deciders.NoOpDecider()),
])
elif intention == st.EXECUTE:
return self.browse_atoms_for_execute(atom=atom)
@@ -165,10 +73,15 @@ class Analyzer(object):
if atom is None:
atom_it = self.iterate_nodes(co.ATOMS)
else:
successors_iter = self._execution_graph.successors_iter
atom_it = _depth_first_iterate(self._execution_graph,
{co.FLOW: successors_iter},
successors_iter(atom))
# NOTE(harlowja): the reason this uses breadth first is so that
# when deciders are applied that those deciders can be applied
# from top levels to lower levels since lower levels *may* be
# able to run even if top levels have deciders that decide to
# ignore some atoms... (going deeper first would make this
# problematic to determine as top levels can have their deciders
# applied **after** going deeper).
atom_it = traversal.breadth_first_iterate(
self._execution_graph, atom, traversal.Direction.FORWARD)
for atom in atom_it:
is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
if is_ready:
@@ -185,18 +98,50 @@ class Analyzer(object):
if atom is None:
atom_it = self.iterate_nodes(co.ATOMS)
else:
predecessors_iter = self._execution_graph.predecessors_iter
atom_it = _depth_first_iterate(self._execution_graph,
{co.FLOW: predecessors_iter},
predecessors_iter(atom))
atom_it = traversal.breadth_first_iterate(
self._execution_graph, atom, traversal.Direction.BACKWARD,
# Stop at the retry boundary (as retries 'control' their
# surrounding atoms, and we don't want to backtrack over
# them so that they can correctly affect their associated
# atoms); we do though need to jump through all tasks since
# if a predecessor Y was ignored and a predecessor Z before Y
# was not it should be eligible to now revert...
through_retries=False)
for atom in atom_it:
is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
if is_ready:
yield (atom, late_decider)
def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
connected_fetcher, connected_checker,
connected_fetcher, ready_checker,
decider_fetcher):
def iter_connected_states():
# Lazily iterate over connected states so that ready checkers
# can stop early (vs having to consume and check all the
# things...)
for atom in connected_fetcher():
# TODO(harlowja): make this storage API better, it's not
# especially clear what the following is doing (mainly
# to avoid two calls into storage).
atom_states = self._storage.get_atoms_states([atom.name])
yield (atom, atom_states[atom.name])
# NOTE(harlowja): How this works is the following...
#
# 1. First check if the current atom can even transition to the
# desired state, if not this atom is definitely not ready to
# execute or revert.
# 2. Check if the actual atom's intention is in one of the desired/ok
# intentions, if it is not there we are still not ready to execute
# or revert.
# 3. Iterate over (atom, atom_state, atom_intention) for all the
# atoms the 'connected_fetcher' callback yields from underlying
# storage and direct that iterator into the 'ready_checker'
# callback, that callback should then iterate over these entries
# and determine if it is ok to execute or revert.
# 4. If (and only if) 'ready_checker' returns true, then
# the 'decider_fetcher' callback is called to get a late decider
# which can (if it desires) affect this ready result (but does
# so right before the atom is about to be scheduled).
state = self._storage.get_atom_state(atom.name)
ok_to_transition = self._runtime.check_atom_transition(atom, state,
transition_to)
@@ -205,59 +150,62 @@ class Analyzer(object):
intention = self._storage.get_atom_intention(atom.name)
if intention not in allowed_intentions:
return (False, None)
connected_states = self._storage.get_atoms_states(
connected_atom.name for connected_atom in connected_fetcher(atom))
ok_to_run = connected_checker(six.itervalues(connected_states))
ok_to_run = ready_checker(iter_connected_states())
if not ok_to_run:
return (False, None)
else:
return (True, decider_fetcher(atom))
return (True, decider_fetcher())
def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed."""
def decider_fetcher(atom):
edge_deciders = self._runtime.fetch_edge_deciders(atom)
if edge_deciders:
return IgnoreDecider(atom, edge_deciders)
else:
return NoOpDecider()
predecessors_iter = self._execution_graph.predecessors_iter
connected_fetcher = lambda atom: \
_depth_first_iterate(self._execution_graph,
{co.FLOW: predecessors_iter},
predecessors_iter(atom))
connected_checker = lambda connected_iter: \
all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in connected_iter)
def ready_checker(pred_connected_it):
for _atom, (atom_state, atom_intention) in pred_connected_it:
if (atom_state in (st.SUCCESS, st.IGNORE) and
atom_intention in (st.EXECUTE, st.IGNORE)):
continue
return False
return True
decider_fetcher = lambda: \
deciders.IgnoreDecider(
atom, self._runtime.fetch_edge_deciders(atom))
connected_fetcher = lambda: \
traversal.depth_first_iterate(self._execution_graph, atom,
# Whether the desired atom
# can execute is dependent on its
# predecessors' outcomes (thus why
# we look backwards).
traversal.Direction.BACKWARD)
# If this atom's current state is able to be transitioned to RUNNING
# and its intention is to EXECUTE and all of its predecessors executed
# successfully or were ignored then this atom is ready to execute.
return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
connected_fetcher, connected_checker,
connected_fetcher, ready_checker,
decider_fetcher)
def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted."""
successors_iter = self._execution_graph.successors_iter
connected_fetcher = lambda atom: \
_depth_first_iterate(self._execution_graph,
{co.FLOW: successors_iter},
successors_iter(atom))
connected_checker = lambda connected_iter: \
all(state in (st.PENDING, st.REVERTED)
for state, _intention in connected_iter)
decider_fetcher = lambda atom: NoOpDecider()
def ready_checker(succ_connected_it):
for _atom, (atom_state, _atom_intention) in succ_connected_it:
if atom_state not in (st.PENDING, st.REVERTED, st.IGNORE):
return False
return True
noop_decider = deciders.NoOpDecider()
connected_fetcher = lambda: \
traversal.depth_first_iterate(self._execution_graph, atom,
# Whether the desired atom
# can revert is dependent on its
# successors' states (thus why we
# look forwards).
traversal.Direction.FORWARD)
decider_fetcher = lambda: noop_decider
# If this atom's current state is able to be transitioned to REVERTING
# and its intention is either REVERT or RETRY and all of its
# successors are either PENDING or REVERTED then this atom is ready
# to revert.
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, connected_checker,
connected_fetcher, ready_checker,
decider_fetcher)
def iterate_connected_atoms(self, atom):
"""Iterates **all** successor atoms connected to given atom."""
successors_iter = self._execution_graph.successors_iter
return _depth_first_iterate(
self._execution_graph, {
co.FLOW: successors_iter,
co.TASK: successors_iter,
co.RETRY: successors_iter,
}, successors_iter(atom))
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
@@ -268,7 +216,8 @@ class Analyzer(object):
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
if atom_states[atom.name][0] == state:
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == state:
yield atom
else:
for atom in self.iterate_nodes((co.RETRY,)):
@@ -290,7 +239,7 @@ class Analyzer(object):
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
atom_state = atom_states[atom.name][0]
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == st.IGNORE:
continue
if atom_state != st.SUCCESS:
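The execute-readiness rule implemented by ready_checker above can be restated as a tiny standalone sketch (plain Python, not the taskflow API):

# Every predecessor must have finished successfully or been ignored
# (with a matching intention) before the atom may be scheduled.
SUCCESS, IGNORE, EXECUTE, RUNNING = 'SUCCESS', 'IGNORE', 'EXECUTE', 'RUNNING'

def ready_for_execute(predecessor_states):
    return all(state in (SUCCESS, IGNORE) and intention in (EXECUTE, IGNORE)
               for state, intention in predecessor_states)

assert ready_for_execute([(SUCCESS, EXECUTE), (IGNORE, IGNORE)])
assert not ready_for_execute([(RUNNING, EXECUTE)])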


@@ -223,11 +223,15 @@ class MachineBuilder(object):
atom, intention)
except Exception:
memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion"
" failed", atom)
else:
try:
more_work = set(iter_next_atoms(atom=atom))
except Exception:
memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion"
" next atom searching failed", atom)
else:
next_up.update(more_work)
if is_runnable() and next_up and not memory.failures:


@@ -40,22 +40,55 @@ LOG = logging.getLogger(__name__)
TASK = 'task'
RETRY = 'retry'
FLOW = 'flow'
FLOW_END = 'flow_end'
# Quite often used together, so make a tuple everyone can share...
ATOMS = (TASK, RETRY)
FLOWS = (FLOW, FLOW_END)
class Terminator(object):
"""Flow terminator class."""
def __init__(self, flow):
self._flow = flow
self._name = "%s[$]" % (self._flow.name,)
@property
def flow(self):
"""The flow which this terminator signifies/marks the end of."""
return self._flow
@property
def name(self):
"""Useful name this end terminator has (derived from flow name)."""
return self._name
def __str__(self):
return "%s[$]" % (self._flow,)
class Compilation(object):
"""The result of a compilers compile() is this *immutable* object."""
"""The result of a compilers ``compile()`` is this *immutable* object."""
#: Task nodes will have a ``kind`` attribute/metadata key with this value.
#: Task nodes will have a ``kind`` metadata key with this value.
TASK = TASK
#: Retry nodes will have a ``kind`` attribute/metadata key with this value.
#: Retry nodes will have a ``kind`` metadata key with this value.
RETRY = RETRY
#: Flow nodes will have a ``kind`` attribute/metadata key with this value.
FLOW = FLOW
"""
Flow **entry** nodes will have a ``kind`` metadata key with
this value.
"""
FLOW_END = FLOW_END
"""
Flow **exit** nodes will have a ``kind`` metadata key with
this value (only applicable for compilation execution graph, not currently
used in tree hierarchy).
"""
def __init__(self, execution_graph, hierarchy):
self._execution_graph = execution_graph
@@ -141,6 +174,8 @@ class FlowCompiler(object):
_add_update_edges(graph, u_graph.no_successors_iter(),
list(v_graph.no_predecessors_iter()),
attr_dict=attr_dict)
# Insert the flow's retry if needed, and always make sure it
# is the **immediate** successor of the flow node itself.
if flow.retry is not None:
graph.add_node(flow.retry, kind=RETRY)
_add_update_edges(graph, [flow], [flow.retry],
@@ -149,20 +184,42 @@ class FlowCompiler(object):
if node is not flow.retry and node is not flow:
graph.node[node].setdefault(RETRY, flow.retry)
from_nodes = [flow.retry]
connected_attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
else:
from_nodes = [flow]
connected_attr_dict = {LINK_INVARIANT: True}
connected_to = [
node for node in graph.no_predecessors_iter() if node is not flow
]
if connected_to:
# Ensure all nodes in this graph(s) that have no
# predecessors depend on this flow (or this flow's retry) so that
# we can depend on the flow being traversed before its
# children (even though at the current time it will be skipped).
_add_update_edges(graph, from_nodes, connected_to,
attr_dict=connected_attr_dict)
attr_dict = {LINK_INVARIANT: True}
# Ensure all nodes with no predecessors are connected to this flow
# or its retry node (so that the invariant that the flow node is
# traversed through before its contents is maintained); this allows
# us to easily know when we have entered a flow (when running) and
# do special and/or smart things such as only traverse up to the
# start of a flow when looking for node deciders.
_add_update_edges(graph, from_nodes, [
node for node in graph.no_predecessors_iter()
if node is not flow
], attr_dict=attr_dict)
# Connect all nodes with no successors into a special terminator
# that is used to identify the end of the flow and ensure that all
# execution traversals will traverse over this node before executing
# further work (this is especially useful for nesting and knowing
# when we have exited a nesting level); it allows us to do special
# and/or smart things such as applying deciders up to (but not
# beyond) a flow termination point.
#
# Do note that in an empty flow this will just connect itself to
# the flow node itself... and also note we cannot use the flow
# object itself (primarily because the underlying graph library
# uses hashing to identify node uniqueness and we can easily create
# a loop if we don't do this correctly, so avoid that by just
# creating this special node and tagging it with a special kind); we
# may be able to make this better in the future with a multidigraph
# that networkx provides??
flow_term = Terminator(flow)
graph.add_node(flow_term, kind=FLOW_END, noop=True)
_add_update_edges(graph, [
node for node in graph.no_successors_iter()
if node is not flow_term
], [flow_term], attr_dict={LINK_INVARIANT: True})
return graph, tree_node
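To make the terminator bookkeeping concrete, a small sketch (the _FakeFlow stand-in is hypothetical; only the Terminator class above is real):

from taskflow.engines.action_engine.compiler import Terminator


class _FakeFlow(object):  # hypothetical stand-in for a real flow
    name = 'f'


term = Terminator(_FakeFlow())
# The terminator's name is derived from the flow it closes.
assert term.name == 'f[$]'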


@@ -128,7 +128,7 @@ class Completer(object):
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
atom_state = atom_states[atom.name][0]
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == st.FAILURE:
self._process_atom_failure(atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING):
@@ -137,7 +137,7 @@ class Completer(object):
atom_states[atom.name] = (state, intention)
unfinished_atoms = set()
for atom in atoms:
atom_state = atom_states[atom.name][0]
atom_state, _atom_intention = atom_states[atom.name]
if atom_state in (st.RUNNING, st.REVERTING):
unfinished_atoms.add(atom)
return unfinished_atoms


@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import six
from taskflow import deciders
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import traversal
from taskflow import states
@six.add_metaclass(abc.ABCMeta)
class Decider(object):
"""Base class for deciders.
Provides an interface to be implemented by sub-classes.
Deciders check whether the next atom in a flow should be executed or not.
"""
@abc.abstractmethod
def tally(self, runtime):
"""Tally edge deciders on whether this decider should allow running.
The returned value is a list of edge deciders that voted
'nay' (do not allow running).
"""
@abc.abstractmethod
def affect(self, runtime, nay_voters):
"""Affects associated atoms due to at least one 'nay' edge decider.
This will alter the associated atom + some set of successor atoms by
setting their state and intention to ``IGNORE`` so that they are
ignored in future runtime activities.
"""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.tally` + :py:func:`~.affect` in right order.
NOTE(harlowja): If there are zero 'nay' edge deciders then it is
assumed this decider should allow running.
Returns boolean of whether this decider allows for running (or not).
"""
nay_voters = self.tally(runtime)
if nay_voters:
self.affect(runtime, nay_voters)
return False
return True
def _affect_all_successors(atom, runtime):
execution_graph = runtime.compilation.execution_graph
successors_iter = traversal.depth_first_iterate(
execution_graph, atom, traversal.Direction.FORWARD)
runtime.reset_atoms(itertools.chain([atom], successors_iter),
state=states.IGNORE, intention=states.IGNORE)
def _affect_successor_tasks_in_same_flow(atom, runtime):
execution_graph = runtime.compilation.execution_graph
successors_iter = traversal.depth_first_iterate(
execution_graph, atom, traversal.Direction.FORWARD,
# Do not go through nested flows but do follow *all* tasks that
# are directly connected in this same flow (thus the reason this is
# called the same flow decider); retries are direct successors
# of flows, so they should also not be traversed through, but
# setting this explicitly ensures that.
through_flows=False, through_retries=False)
runtime.reset_atoms(itertools.chain([atom], successors_iter),
state=states.IGNORE, intention=states.IGNORE)
def _affect_atom(atom, runtime):
runtime.reset_atoms([atom], state=states.IGNORE, intention=states.IGNORE)
def _affect_direct_task_neighbors(atom, runtime):
def _walk_neighbors():
execution_graph = runtime.compilation.execution_graph
for node in execution_graph.successors_iter(atom):
node_data = execution_graph.node[node]
if node_data['kind'] == compiler.TASK:
yield node
successors_iter = _walk_neighbors()
runtime.reset_atoms(itertools.chain([atom], successors_iter),
state=states.IGNORE, intention=states.IGNORE)
class IgnoreDecider(Decider):
"""Checks any provided edge-deciders and determines if ok to run."""
_depth_strategies = {
deciders.Depth.ALL: _affect_all_successors,
deciders.Depth.ATOM: _affect_atom,
deciders.Depth.FLOW: _affect_successor_tasks_in_same_flow,
deciders.Depth.NEIGHBORS: _affect_direct_task_neighbors,
}
def __init__(self, atom, edge_deciders):
self._atom = atom
self._edge_deciders = edge_deciders
def tally(self, runtime):
if not self._edge_deciders:
return []
# Gather the results of all atoms (the ones that were not ignored)
# so that those results can be used by the decider(s) that are
# making a decision as to pass or not pass...
states_intentions = runtime.storage.get_atoms_states(
ed.from_node.name for ed in self._edge_deciders
if ed.kind in compiler.ATOMS)
history = {}
for atom_name in six.iterkeys(states_intentions):
atom_state, _atom_intention = states_intentions[atom_name]
if atom_state != states.IGNORE:
history[atom_name] = runtime.storage.get(atom_name)
nay_voters = []
for ed in self._edge_deciders:
if ed.kind in compiler.ATOMS and ed.from_node.name not in history:
continue
if not ed.decider(history=history):
nay_voters.append(ed)
return nay_voters
def affect(self, runtime, nay_voters):
# If there were many 'nay' edge deciders that were targeted
# at this atom, then we need to pick the one which has the widest
# impact and respect that one as the decider depth that will
# actually affect things.
widest_depth = deciders.pick_widest(ed.depth for ed in nay_voters)
affector = self._depth_strategies[widest_depth]
return affector(self._atom, runtime)
class NoOpDecider(Decider):
"""No-op decider that says it is always ok to run & has no effect(s)."""
def tally(self, runtime):
"""Always good to go."""
return []
def affect(self, runtime, nay_voters):
"""Does nothing."""


@@ -99,37 +99,37 @@ class ActionEngine(base.Engine):
**Engine options:**
+-------------------+-----------------------+------+-----------+
| Name/key | Description | Type | Default |
+===================+=======================+======+===========+
| defer_reverts | This option lets you | bool | ``False`` |
| | safely nest flows | | |
| | with retries inside | | |
| | flows without retries | | |
| | and it still behaves | | |
| | as a user would | | |
| | expect (for example | | |
| | if the retry gets | | |
| | exhausted it reverts | | |
| | the outer flow unless | | |
| | the outer flow has a | | |
| | separate retry | | |
| | behavior). | | |
+-------------------+-----------------------+------+-----------+
| inject_transient | When true, values | bool | ``True`` |
| | that are local to | | |
| | each atom's scope | | |
| | are injected into | | |
| | storage into a | | |
| | transient location | | |
| | (typically a local | | |
| | dictionary), when | | |
| | false those values | | |
| | are instead persisted | | |
| | into atom details | | |
| | (and saved in a non- | | |
| | transient manner). | | |
+-------------------+-----------------------+------+-----------+
+----------------------+-----------------------+------+------------+
| Name/key | Description | Type | Default |
+======================+=======================+======+============+
| ``defer_reverts`` | This option lets you | bool | ``False`` |
| | safely nest flows | | |
| | with retries inside | | |
| | flows without retries | | |
| | and it still behaves | | |
| | as a user would | | |
| | expect (for example | | |
| | if the retry gets | | |
| | exhausted it reverts | | |
| | the outer flow unless | | |
| | the outer flow has a | | |
| | separate retry | | |
| | behavior). | | |
+----------------------+-----------------------+------+------------+
| ``inject_transient`` | When true, values | bool | ``True`` |
| | that are local to | | |
| | each atom's scope | | |
| | are injected into | | |
| | storage into a | | |
| | transient location | | |
| | (typically a local | | |
| | dictionary), when | | |
| | false those values | | |
| | are instead persisted | | |
| | into atom details | | |
| | (and saved in a non- | | |
| | transient manner). | | |
+----------------------+-----------------------+------+------------+
"""
NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS])
@@ -148,6 +148,12 @@ class ActionEngine(base.Engine):
end-users when doing execution iterations via :py:meth:`.run_iter`.
"""
MAX_MACHINE_STATES_RETAINED = 10
"""
During :py:meth:`~.run_iter` the last X state machine transitions will
be recorded (typically only useful on failure).
"""
def __init__(self, flow, flow_detail, backend, options):
super(ActionEngine, self).__init__(flow, flow_detail, backend, options)
self._runtime = None
@@ -242,16 +248,21 @@ class ActionEngine(base.Engine):
self.compile()
self.prepare()
self.validate()
last_state = None
# Keep track of the last X state changes, which if a failure happens
# are quite useful to log (and the performance of tracking this
# should be negligible).
last_transitions = collections.deque(
maxlen=max(1, self.MAX_MACHINE_STATES_RETAINED))
with _start_stop(self._task_executor, self._retry_executor):
self._change_state(states.RUNNING)
try:
closed = False
machine, memory = self._runtime.builder.build(timeout=timeout)
r = runners.FiniteRunner(machine)
for (_prior_state, new_state) in r.run_iter(builder.START):
last_state = new_state
# NOTE(harlowja): skip over meta-states.
for transition in r.run_iter(builder.START):
last_transitions.append(transition)
_prior_state, new_state = transition
# NOTE(harlowja): skip over meta-states
if new_state in builder.META_STATES:
continue
if new_state == states.FAILURE:
@@ -271,15 +282,24 @@ class ActionEngine(base.Engine):
self.suspend()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception("Engine execution has failed, something"
" bad must of happened (last"
" %s machine transitions were %s)",
last_transitions.maxlen,
list(last_transitions))
self._change_state(states.FAILURE)
else:
if last_state and last_state not in self.IGNORABLE_STATES:
self._change_state(new_state)
if last_state not in self.NO_RERAISING_STATES:
it = itertools.chain(
six.itervalues(self.storage.get_failures()),
six.itervalues(self.storage.get_revert_failures()))
failure.Failure.reraise_if_any(it)
if last_transitions:
_prior_state, new_state = last_transitions[-1]
if new_state not in self.IGNORABLE_STATES:
self._change_state(new_state)
if new_state not in self.NO_RERAISING_STATES:
failures = self.storage.get_failures()
more_failures = self.storage.get_revert_failures()
fails = itertools.chain(
six.itervalues(failures),
six.itervalues(more_failures))
failure.Failure.reraise_if_any(fails)
@staticmethod
def _check_compilation(compilation):
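The bounded transition history above leans on collections.deque semantics; a quick standalone illustration:

import collections

# With maxlen set, appends silently discard the oldest entries, so only
# the last N machine transitions are kept around for failure logging.
last_transitions = collections.deque(maxlen=3)
for transition in [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E')]:
    last_transitions.append(transition)
assert list(last_transitions) == [('B', 'C'), ('C', 'D'), ('D', 'E')]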


@@ -19,6 +19,7 @@ import functools
from futurist import waiters
from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
@@ -27,11 +28,17 @@ from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc
from taskflow.flow import LINK_DECIDER
from taskflow import states as st
from taskflow.utils import misc
from taskflow.flow import (LINK_DECIDER, LINK_DECIDER_DEPTH) # noqa
# Small helper to make the edge decider tuples more easily usable...
_EdgeDecider = collections.namedtuple('_EdgeDecider',
'from_node,kind,decider,depth')
class Runtime(object):
"""A aggregate of runtime objects, properties, ... used during execution.
@@ -42,7 +49,8 @@ class Runtime(object):
"""
def __init__(self, compilation, storage, atom_notifier,
task_executor, retry_executor, options=None):
task_executor, retry_executor,
options=None):
self._atom_notifier = atom_notifier
self._task_executor = task_executor
self._retry_executor = retry_executor
@@ -51,11 +59,10 @@ class Runtime(object):
self._atom_cache = {}
self._options = misc.safe_copy_dict(options)
@staticmethod
def _walk_edge_deciders(graph, atom):
def _walk_edge_deciders(self, graph, atom):
"""Iterates through all nodes, deciders that alter atoms execution."""
# This is basically a reverse breadth first exploration, with
# special logic to further traverse down flow nodes...
# special logic to further traverse down flow nodes as needed...
predecessors_iter = graph.predecessors_iter
nodes = collections.deque((u_node, atom)
for u_node in predecessors_iter(atom))
@@ -63,14 +70,19 @@ class Runtime(object):
while nodes:
u_node, v_node = nodes.popleft()
u_node_kind = graph.node[u_node]['kind']
u_v_data = graph.adj[u_node][v_node]
try:
yield (u_node, u_node_kind,
graph.adj[u_node][v_node][LINK_DECIDER])
decider = u_v_data[LINK_DECIDER]
decider_depth = u_v_data.get(LINK_DECIDER_DEPTH)
if decider_depth is None:
decider_depth = de.Depth.ALL
yield _EdgeDecider(u_node, u_node_kind,
decider, decider_depth)
except KeyError:
pass
if u_node_kind == com.FLOW and u_node not in visited:
# Avoid re-exploring the same flow if we get to this
# same flow by a different *future* path...
# Avoid re-exploring the same flow if we get to this same
# flow by a different *future* path...
visited.add(u_node)
# Since we *currently* jump over flow node(s), we need to make
# sure that any prior decider that was directed at this flow
@@ -108,7 +120,7 @@ class Runtime(object):
graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True):
node_kind = node_data['kind']
if node_kind == com.FLOW:
if node_kind in com.FLOWS:
continue
elif node_kind in com.ATOMS:
check_transition_handler = check_transition_handlers[node_kind]
@@ -128,6 +140,10 @@ class Runtime(object):
metadata['edge_deciders'] = tuple(deciders_it)
metadata['action'] = action
self._atom_cache[node.name] = metadata
# TODO(harlowja): optimize the different decider depths to avoid
# repeated full successor searching; this can be done by searching
# for the widest depth of parent(s), and limiting the search of
# children by that depth.
@property
def compilation(self):
@@ -246,11 +262,12 @@ class Runtime(object):
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
"""Resets a atoms subgraph to the given state and intention.
The subgraph is contained of all of the atoms successors.
The subgraph is contained of **all** of the atoms successors.
"""
return self.reset_atoms(
self.analyzer.iterate_connected_atoms(atom),
state=state, intention=intention)
execution_graph = self._compilation.execution_graph
atoms_it = tr.depth_first_iterate(execution_graph, atom,
tr.Direction.FORWARD)
return self.reset_atoms(atoms_it, state=state, intention=intention)
def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution.


@@ -15,33 +15,12 @@
# under the License.
from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import traversal as tr
from taskflow import logging
LOG = logging.getLogger(__name__)
def _depth_first_reverse_iterate(node, idx=-1):
"""Iterates connected (in reverse) nodes (from starting node).
Jumps through nodes with ``FLOW`` ``kind`` attribute (does not yield
them back).
"""
# Always go left to right, since right to left is the pattern order
# and we want to go backwards and not forwards through that ordering...
if idx == -1:
children_iter = node.reverse_iter()
else:
children_iter = reversed(node[0:idx])
for child in children_iter:
if child.metadata['kind'] == co.FLOW:
# Jump through these...
for child_child in child.dfs_iter(right_to_left=False):
if child_child.metadata['kind'] in co.ATOMS:
yield child_child.item
else:
yield child.item
class ScopeWalker(object):
"""Walks through the scopes of a atom using a engines compilation.
@@ -117,7 +96,9 @@ class ScopeWalker(object):
except KeyError:
visible = []
removals = set()
for atom in _depth_first_reverse_iterate(parent, idx=last_idx):
atom_it = tr.depth_first_reverse_iterate(
parent, start_from_idx=last_idx)
for atom in atom_it:
if atom in predecessors:
predecessors.remove(atom)
removals.add(atom)


@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import enum
from taskflow.engines.action_engine import compiler as co
class Direction(enum.Enum):
"""Traversal direction enum."""
#: Go through successors.
FORWARD = 1
#: Go through predecessors.
BACKWARD = 2
def _extract_connectors(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
if direction == Direction.FORWARD:
connected_iter = execution_graph.successors_iter
else:
connected_iter = execution_graph.predecessors_iter
connected_to_functors = {}
if through_flows:
connected_to_functors[co.FLOW] = connected_iter
connected_to_functors[co.FLOW_END] = connected_iter
if through_retries:
connected_to_functors[co.RETRY] = connected_iter
if through_tasks:
connected_to_functors[co.TASK] = connected_iter
return connected_iter(starting_node), connected_to_functors
def breadth_first_iterate(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
"""Iterates connected nodes in execution graph (from starting node).
Does so in a breadth first manner.
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
initial_nodes_iter, connected_to_functors = _extract_connectors(
execution_graph, starting_node, direction,
through_flows=through_flows, through_retries=through_retries,
through_tasks=through_tasks)
q = collections.deque(initial_nodes_iter)
while q:
node = q.popleft()
node_attrs = execution_graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
q.extend(connected_to_functor(node))
def depth_first_iterate(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
"""Iterates connected nodes in execution graph (from starting node).
Does so in a depth first manner.
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
initial_nodes_iter, connected_to_functors = _extract_connectors(
execution_graph, starting_node, direction,
through_flows=through_flows, through_retries=through_retries,
through_tasks=through_tasks)
stack = list(initial_nodes_iter)
while stack:
node = stack.pop()
node_attrs = execution_graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
stack.extend(connected_to_functor(node))
def depth_first_reverse_iterate(node, start_from_idx=-1):
"""Iterates connected (in reverse) **tree** nodes (from starting node).
Jumps through nodes with ``noop`` attribute (does not yield them back).
"""
# Always go left to right, since right to left is the pattern order
# and we want to go backwards and not forwards through that ordering...
if start_from_idx == -1:
# All of them...
children_iter = node.reverse_iter()
else:
children_iter = reversed(node[0:start_from_idx])
for child in children_iter:
if child.metadata.get('noop'):
# Jump through these...
for grand_child in child.dfs_iter(right_to_left=False):
if grand_child.metadata['kind'] in co.ATOMS:
yield grand_child.item
else:
yield child.item
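To see the noop jumping in action, a toy sketch (the _StubGraph class is a hypothetical stand-in for the compiler's networkx graph; only the traversal module above is real):

from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import traversal


class _StubGraph(object):  # hypothetical stand-in, not the real graph type
    def __init__(self, nodes, edges):
        self.node = nodes    # node -> metadata dict
        self._edges = edges  # node -> list of successors

    def successors_iter(self, n):
        return iter(self._edges.get(n, []))

    def predecessors_iter(self, n):
        return iter(m for m, succs in self._edges.items() if n in succs)


g = _StubGraph(
    nodes={'flow': {'kind': co.FLOW},
           'a': {'kind': co.TASK},
           'end': {'kind': co.FLOW_END, 'noop': True}},
    edges={'flow': ['a'], 'a': ['end']},
)
# The flow-end node is marked noop, so it is traversed but never yielded.
assert list(traversal.depth_first_iterate(
    g, 'flow', traversal.Direction.FORWARD)) == ['a']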


@@ -221,6 +221,14 @@ class InvalidFormat(TaskFlowException):
"""Raised when some object/entity is not in the expected format."""
class DisallowedAccess(TaskFlowException):
"""Raised when storage access is not possible due to state limitations."""
def __init__(self, message, cause=None, state=None):
super(DisallowedAccess, self).__init__(message, cause=cause)
self.state = state
# Others.
class NotImplementedError(NotImplementedError):
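The new state attribute lets callers distinguish ignored atoms from other access failures, for example:

from taskflow import exceptions
from taskflow import states

e = exceptions.DisallowedAccess("result not accessible",
                                state=states.IGNORE)
assert e.state == states.IGNORE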


@@ -39,6 +39,9 @@ LINK_DECIDER = 'decider'
_CHOP_PAT = "taskflow.patterns."
_CHOP_PAT_LEN = len(_CHOP_PAT)
# This key denotes the depth the decider will apply (defaulting to all).
LINK_DECIDER_DEPTH = 'decider_depth'
@six.add_metaclass(abc.ABCMeta)
class Flow(object):


@@ -42,16 +42,20 @@ def _fetch_predecessor_tree(graph, atom):
"""Creates a tree of predecessors, rooted at given atom."""
root = tree.Node(atom)
stack = [(root, atom)]
seen = set()
while stack:
parent, node = stack.pop()
for pred_node in graph.predecessors_iter(node):
child = tree.Node(pred_node,
**graph.node[pred_node])
parent.add(child)
stack.append((child, pred_node))
seen.add(pred_node)
return len(seen), root
pred_node_data = graph.node[pred_node]
if pred_node_data['kind'] == compiler.FLOW_END:
# Jump over and/or don't show flow end nodes...
for pred_pred_node in graph.predecessors_iter(pred_node):
stack.append((parent, pred_pred_node))
else:
child = tree.Node(pred_node, **pred_node_data)
parent.add(child)
# And go further backwards...
stack.append((child, pred_node))
return root
class FailureFormatter(object):
@@ -64,59 +68,51 @@ class FailureFormatter(object):
def __init__(self, engine, hide_inputs_outputs_of=()):
self._hide_inputs_outputs_of = hide_inputs_outputs_of
self._engine = engine
self._formatter_funcs = {
compiler.FLOW: self._format_flow,
}
for kind in compiler.ATOMS:
self._formatter_funcs[kind] = self._format_atom
def _format_atom(self, storage, cache, node):
"""Formats a single tree node (atom) into a string version."""
atom = node.item
atom_name = atom.name
atom_attrs = {}
intention, intention_found = _cached_get(cache, 'intentions',
atom_name,
storage.get_atom_intention,
atom_name)
if intention_found:
atom_attrs['intention'] = intention
state, state_found = _cached_get(cache, 'states', atom_name,
storage.get_atom_state, atom_name)
if state_found:
atom_attrs['state'] = state
if atom_name not in self._hide_inputs_outputs_of:
# When the cache does not exist for this atom this
# will be called with the rest of these arguments
# used to populate the cache.
fetch_mapped_args = functools.partial(
storage.fetch_mapped_args, atom.rebind,
atom_name=atom_name, optional_args=atom.optional)
requires, requires_found = _cached_get(cache, 'requires',
atom_name,
fetch_mapped_args)
if requires_found:
atom_attrs['requires'] = requires
provides, provides_found = _cached_get(cache, 'provides',
atom_name,
storage.get_execute_result,
atom_name)
if provides_found:
atom_attrs['provides'] = provides
if atom_attrs:
return "Atom '%s' %s" % (atom_name, atom_attrs)
else:
return "Atom '%s'" % (atom_name)
def _format_flow(self, storage, cache, node):
"""Formats a single tree node (flow) into a string version."""
flow = node.item
return flow.name
def _format_node(self, storage, cache, node):
"""Formats a single tree node into a string version."""
formatter_func = self._formatter_funcs[node.metadata['kind']]
return formatter_func(storage, cache, node)
if node.metadata['kind'] == compiler.FLOW:
flow = node.item
flow_name = flow.name
return "Flow '%s'" % (flow_name)
elif node.metadata['kind'] in compiler.ATOMS:
atom = node.item
atom_name = atom.name
atom_attrs = {}
intention, intention_found = _cached_get(
cache, 'intentions', atom_name, storage.get_atom_intention,
atom_name)
if intention_found:
atom_attrs['intention'] = intention
state, state_found = _cached_get(cache, 'states', atom_name,
storage.get_atom_state,
atom_name)
if state_found:
atom_attrs['state'] = state
if atom_name not in self._hide_inputs_outputs_of:
# When the cache does not exist for this atom this
# will be called with the rest of these arguments
# used to populate the cache.
fetch_mapped_args = functools.partial(
storage.fetch_mapped_args, atom.rebind,
atom_name=atom_name, optional_args=atom.optional)
requires, requires_found = _cached_get(cache, 'requires',
atom_name,
fetch_mapped_args)
if requires_found:
atom_attrs['requires'] = requires
provides, provides_found = _cached_get(
cache, 'provides', atom_name,
storage.get_execute_result, atom_name)
if provides_found:
atom_attrs['provides'] = provides
if atom_attrs:
return "Atom '%s' %s" % (atom_name, atom_attrs)
else:
return "Atom '%s'" % (atom_name)
else:
raise TypeError("Unable to format node, unknown node"
" kind '%s' encountered" % node.metadata['kind'])
def format(self, fail, atom_matcher):
"""Returns a (exc_info, details) tuple about the failure.
@@ -143,13 +139,11 @@ class FailureFormatter(object):
graph = compilation.execution_graph
atom_node = hierarchy.find_first_match(atom_matcher)
atom = None
priors = 0
atom_intention = None
if atom_node is not None:
atom = atom_node.item
atom_intention = storage.get_atom_intention(atom.name)
priors = sum(c for (_n, c) in graph.in_degree_iter([atom]))
if atom is not None and priors and atom_intention in self._BUILDERS:
if atom is not None and atom_intention in self._BUILDERS:
# Cache as much as we can, since the path of various atoms
# may cause the same atom to be seen repeatedly depending on
# the graph structure...
@@ -160,12 +154,13 @@ class FailureFormatter(object):
'states': {},
}
builder, kind = self._BUILDERS[atom_intention]
count, rooted_tree = builder(graph, atom)
buff.write_nl('%s %s (most recent atoms first):' % (count, kind))
rooted_tree = builder(graph, atom)
child_count = rooted_tree.child_count(only_direct=False)
buff.write_nl('%s %s (most recent first):' % (child_count, kind))
formatter = functools.partial(self._format_node, storage, cache)
child_count = rooted_tree.child_count()
direct_child_count = rooted_tree.child_count(only_direct=True)
for i, child in enumerate(rooted_tree, 1):
if i == child_count:
if i == direct_child_count:
buff.write(child.pformat(stringify_node=formatter,
starting_prefix=" "))
else:
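The only_direct flag used above distinguishes direct children from the whole subtree; a small sketch (assuming the child_count() signature used by this commit):

from taskflow.types import tree

root = tree.Node('root')
child = tree.Node('child')
child.add(tree.Node('grandchild'))
root.add(child)
# Direct children only vs. all transitive descendants.
assert root.child_count(only_direct=True) == 1
assert root.child_count(only_direct=False) == 2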


@@ -18,6 +18,7 @@ import collections
import six
from taskflow import deciders as de
from taskflow import exceptions as exc
from taskflow import flow
from taskflow.types import graph as gr
@@ -73,7 +74,7 @@ class Flow(flow.Flow):
#: Extracts the unsatisfied symbol requirements of a single node.
_unsatisfied_requires = staticmethod(_unsatisfied_requires)
def link(self, u, v, decider=None):
def link(self, u, v, decider=None, decider_depth=None):
"""Link existing node u as a runtime dependency of existing node v.
Note that if the addition of these edges creates a `cyclic`_ graph
@@ -93,6 +94,13 @@ class Flow(flow.Flow):
links that have ``v`` as a target. It is expected to
return a single boolean (``True`` to allow ``v``
execution or ``False`` to not).
:param decider_depth: One of the :py:class:`~taskflow.deciders.Depth`
enumerations (or a string version of one) that
will be used to influence what atoms are ignored
when the provided decider returns false. If not
provided (and a valid decider is provided) then
this defaults to
:py:attr:`~taskflow.deciders.Depth.ALL`.
.. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph
"""
@@ -103,11 +111,13 @@ class Flow(flow.Flow):
if decider is not None:
if not six.callable(decider):
raise ValueError("Decider boolean callback must be callable")
self._swap(self._link(u, v, manual=True, decider=decider))
self._swap(self._link(u, v, manual=True,
decider=decider, decider_depth=decider_depth))
return self
def _link(self, u, v, graph=None,
reason=None, manual=False, decider=None):
reason=None, manual=False, decider=None,
decider_depth=None):
mutable_graph = True
if graph is None:
graph = self._graph
@@ -119,6 +129,18 @@ class Flow(flow.Flow):
attrs = {}
if decider is not None:
attrs[flow.LINK_DECIDER] = decider
try:
# Remove existing decider depth, if one existed.
del attrs[flow.LINK_DECIDER_DEPTH]
except KeyError:
pass
if decider_depth is not None:
if decider is None:
raise ValueError("Decider depth requires a decider to be"
" provided along with it")
else:
decider_depth = de.Depth.translate(decider_depth)
attrs[flow.LINK_DECIDER_DEPTH] = decider_depth
if manual:
attrs[flow.LINK_MANUAL] = True
if reason is not None:
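End to end, the new parameter is used like this (a hedged sketch; the _Noop task is hypothetical):

from taskflow.patterns import graph_flow
from taskflow import task


class _Noop(task.Task):  # hypothetical trivial task for illustration
    def execute(self):
        pass


a, b = _Noop(name='a'), _Noop(name='b')
f = graph_flow.Flow('demo')
f.add(a, b)
# Depth members or (case-insensitive) strings are accepted; with 'atom'
# a 'nay' vote ignores only atom 'b' itself, nothing past it.
f.link(a, b, decider=lambda history: False, decider_depth='atom')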


@@ -15,6 +15,7 @@
# under the License.
import contextlib
import functools
import fasteners
from oslo_utils import reflection
@@ -84,6 +85,127 @@ META_PROGRESS = 'progress'
META_PROGRESS_DETAILS = 'progress_details'
class _ProviderLocator(object):
"""Helper to start to better decouple the finding logic from storage.
WIP: part of the larger effort to cleanup/refactor the finding of named
arguments so that the code can be more unified and easy to
follow...
"""
def __init__(self, transient_results,
providers_fetcher, result_fetcher):
self.result_fetcher = result_fetcher
self.providers_fetcher = providers_fetcher
self.transient_results = transient_results
def _try_get_results(self, looking_for, provider,
look_into_results=True, find_potentials=False):
if provider.name is _TRANSIENT_PROVIDER:
# TODO(harlowja): This 'is' check still sucks, do this
# better in the future...
results = self.transient_results
else:
try:
results = self.result_fetcher(provider.name)
except (exceptions.NotFound, exceptions.DisallowedAccess):
if not find_potentials:
raise
else:
# Ok, likely hasn't produced a result yet, but
# at a future point it hopefully will, so stub
# out the *expected* result.
results = {}
if look_into_results:
_item_from_single(provider, results, looking_for)
return results
def _find(self, looking_for, scope_walker=None,
short_circuit=True, find_potentials=False):
if scope_walker is None:
scope_walker = []
default_providers, atom_providers = self.providers_fetcher(looking_for)
searched_providers = set()
providers_and_results = []
if default_providers:
for p in default_providers:
searched_providers.add(p)
try:
provider_results = self._try_get_results(
looking_for, p, find_potentials=find_potentials,
# For default providers always look into their
# results as default providers are statically set up
# and therefore looking into their provided results
# should fail early.
look_into_results=True)
except exceptions.NotFound:
if not find_potentials:
raise
else:
providers_and_results.append((p, provider_results))
if short_circuit:
return (searched_providers, providers_and_results)
if not atom_providers:
return (searched_providers, providers_and_results)
atom_providers_by_name = dict((p.name, p) for p in atom_providers)
for accessible_atom_names in iter(scope_walker):
# *Always* retain the scope ordering (if any matches
# happen) instead of retaining the possible provider match
# order (which isn't that important and may be different from
# the scope requested ordering).
maybe_atom_providers = [atom_providers_by_name[atom_name]
for atom_name in accessible_atom_names
if atom_name in atom_providers_by_name]
tmp_providers_and_results = []
if find_potentials:
for p in maybe_atom_providers:
searched_providers.add(p)
tmp_providers_and_results.append((p, {}))
else:
for p in maybe_atom_providers:
searched_providers.add(p)
try:
# Don't at this point look into the provider results
# as calling code will grab all providers, and then
# get the result from the *first* provider that
# actually provided it (or die).
provider_results = self._try_get_results(
looking_for, p, find_potentials=find_potentials,
look_into_results=False)
except exceptions.DisallowedAccess as e:
if e.state != states.IGNORE:
exceptions.raise_with_cause(
exceptions.NotFound,
"Expected to be able to find output %r"
" produced by %s but was unable to get at"
" that providers results" % (looking_for, p))
else:
LOG.blather("Avoiding using the results of"
" %r (from %s) for name %r because"
" it was ignored", p.name, p,
looking_for)
else:
tmp_providers_and_results.append((p, provider_results))
if tmp_providers_and_results and short_circuit:
return (searched_providers, tmp_providers_and_results)
else:
providers_and_results.extend(tmp_providers_and_results)
return (searched_providers, providers_and_results)
def find_potentials(self, looking_for, scope_walker=None):
"""Returns the accessible **potential** providers."""
_searched_providers, providers_and_results = self._find(
looking_for, scope_walker=scope_walker,
short_circuit=False, find_potentials=True)
return set(p for (p, _provider_results) in providers_and_results)
def find(self, looking_for, scope_walker=None, short_circuit=True):
"""Returns the accessible providers."""
return self._find(looking_for, scope_walker=scope_walker,
short_circuit=short_circuit,
find_potentials=False)
class _Provider(object):
"""A named symbol provider that produces a output at the given index."""
@@ -326,13 +448,13 @@ class Storage(object):
ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name])
except KeyError:
exceptions.raise_with_cause(exceptions.NotFound,
"Unknown atom name: %s" % atom_name)
"Unknown atom name '%s'" % atom_name)
else:
# TODO(harlowja): we need to figure out how to get away from doing
# these kinds of type checks in general (since they likely mean
# we aren't doing something right).
if expected_type and not isinstance(ad, expected_type):
raise TypeError("Atom %s is not of the expected type: %s"
raise TypeError("Atom '%s' is not of the expected type: %s"
% (atom_name,
reflection.get_class_name(expected_type)))
if clone:
@@ -479,8 +601,9 @@ class Storage(object):
try:
_item_from(container, index)
except _EXTRACTION_EXCEPTIONS:
LOG.warning("Atom %s did not supply result "
"with index %r (name %s)", atom_name, index, name)
LOG.warning("Atom '%s' did not supply result "
"with index %r (name '%s')", atom_name, index,
name)
@fasteners.write_locked
def save(self, atom_name, result, state=states.SUCCESS):
@@ -545,17 +668,34 @@ class Storage(object):
except KeyError:
pass
return failure
# TODO(harlowja): this seems like it should be checked before fetching
# the potential failure, instead of after, fix this soon...
if source.state not in allowed_states:
raise exceptions.NotFound("Result for atom %s is not currently"
" known" % atom_name)
return getattr(source, results_attr_name)
else:
if source.state not in allowed_states:
raise exceptions.DisallowedAccess(
"Result for atom '%s' is not known/accessible"
" due to it being in %s state when result access"
" is restricted to %s states" % (atom_name,
source.state,
allowed_states),
state=source.state)
return getattr(source, results_attr_name)
def get_execute_result(self, atom_name):
"""Gets the ``execute`` results for an atom from storage."""
return self._get(atom_name, 'results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
try:
results = self._get(atom_name, 'results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
except exceptions.DisallowedAccess as e:
if e.state == states.IGNORE:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' execution"
" is not known (as it was"
" ignored)" % atom_name)
else:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' execution"
" is not known" % atom_name)
else:
return results
@fasteners.read_locked
def _get_failures(self, fail_cache_key):
@@ -577,8 +717,21 @@ class Storage(object):
def get_revert_result(self, atom_name):
"""Gets the ``revert`` results for an atom from storage."""
return self._get(atom_name, 'revert_results', 'revert_failure',
_REVERT_STATES_WITH_RESULTS, states.REVERT)
try:
results = self._get(atom_name, 'revert_results', 'revert_failure',
_REVERT_STATES_WITH_RESULTS, states.REVERT)
except exceptions.DisallowedAccess as e:
if e.state == states.IGNORE:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' revert is"
" not known (as it was"
" ignored)" % atom_name)
else:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' revert is"
" not known" % atom_name)
else:
return results
def get_revert_failures(self):
"""Get all ``revert`` failures that happened with this flow."""
@@ -639,7 +792,7 @@ class Storage(object):
be serializable).
"""
if atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
raise exceptions.NotFound("Unknown atom name '%s'" % atom_name)
def save_transient():
self._injected_args.setdefault(atom_name, {})
@@ -728,6 +881,19 @@ class Storage(object):
self._set_result_mapping(provider_name,
dict((name, name) for name in names))
def _fetch_providers(self, looking_for, providers=None):
"""Return pair of (default providers, atom providers)."""
if providers is None:
providers = self._reverse_mapping.get(looking_for, [])
default_providers = []
atom_providers = []
for p in providers:
if p.name in (_TRANSIENT_PROVIDER, self.injector_name):
default_providers.append(p)
else:
atom_providers.append(p)
return default_providers, atom_providers
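The partitioning above can be stated standalone; a small sketch (function and argument names invented) of the same split, where the 'special' names are the transient provider marker and the injector name:
# Standalone sketch of the default vs. atom provider split (illustrative).
def split_providers(providers, special_names):
    defaults, atoms = [], []
    for p in providers:
        if p.name in special_names:
            defaults.append(p)
        else:
            atoms.append(p)
    return defaults, atoms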
def _set_result_mapping(self, provider_name, mapping):
"""Sets the result mapping for a given producer.
@ -757,30 +923,30 @@ class Storage(object):
if many_handler is None:
many_handler = _many_handler
try:
maybe_providers = self._reverse_mapping[name]
except KeyError:
raise exceptions.NotFound("Name %r is not mapped as a produced"
" output by any providers" % name)
locator = _ProviderLocator(
self._transients,
functools.partial(self._fetch_providers,
providers=maybe_providers),
lambda atom_name:
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE))
values = []
searched_providers, providers = locator.find(
name, short_circuit=False,
# NOTE(harlowja): There are no scopes used here (as of now), so
# we just return all known providers as if they were in one
# large scope.
scope_walker=[[p.name for p in maybe_providers]])
for provider, results in providers:
values.append(_item_from_single(provider, results, name))
if not values:
raise exceptions.NotFound(
"Unable to find result %r, searched %s providers"
% (name, len(searched_providers)))
else:
return many_handler(values)
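Although ``fetch`` now funnels through ``_ProviderLocator`` with every known provider treated as one flat scope, its public behavior is unchanged; a hedged end-to-end sketch using only public engine APIs (flow/task names invented):
# End-to-end sketch of named result resolution (illustrative names).
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

class Provider(task.Task):
    default_provides = 'x'

    def execute(self):
        return 42

engine = taskflow.engines.load(lf.Flow('f').add(Provider('p')))
engine.run()
# Every provider of 'x' is searched as one flat scope; here just 'p'.
assert engine.storage.fetch('x') == 42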
@ -796,49 +962,6 @@ class Storage(object):
needed values; it just checks that they are registered to produce
it in the future.
"""
source, _clone = self._atomdetail_by_name(atom_name)
if scope_walker is None:
scope_walker = self._scope_fetcher(atom_name)
@ -849,6 +972,11 @@ class Storage(object):
source.meta.get(META_INJECTED, {}),
]
missing = set(six.iterkeys(args_mapping))
locator = _ProviderLocator(
self._transients, self._fetch_providers,
lambda atom_name:
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE))
for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.TRACE):
LOG.trace("Looking for %r <= %r for atom '%s'",
@ -863,8 +991,8 @@ class Storage(object):
continue
if name in source:
maybe_providers += 1
maybe_providers += len(
locator.find_potentials(name, scope_walker=scope_walker))
if maybe_providers:
LOG.trace("Atom '%s' will have %s potential providers"
" of %r <= %r", atom_name, maybe_providers,
@ -894,7 +1022,6 @@ class Storage(object):
atom_name=None, scope_walker=None,
optional_args=None):
"""Fetch ``execute`` arguments for an atom using its args mapping."""
def _extract_first_from(name, sources):
"""Extracts/returns first occurrence of key in list of dicts."""
for i, source in enumerate(sources):
if name in source:
return (i, source[name])
raise KeyError(name)
@ -903,49 +1030,6 @@
if optional_args is None:
optional_args = []
if atom_name:
@ -960,6 +1044,9 @@ class Storage(object):
injected_sources = []
if not args_mapping:
return {}
get_results = lambda atom_name: \
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
mapped_args = {}
for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.TRACE):
@ -969,8 +1056,8 @@ class Storage(object):
else:
LOG.trace("Looking for %r <= %r", bound_name, name)
try:
source_index, value = _extract_first_from(
name, injected_sources)
mapped_args[bound_name] = value
if LOG.isEnabledFor(logging.TRACE):
if source_index == 0:
@ -983,7 +1070,7 @@ class Storage(object):
" values)", bound_name, name, value)
except KeyError:
try:
maybe_providers = self._reverse_mapping[name]
except KeyError:
if bound_name in optional_args:
LOG.trace("Argument %r is optional, skipping",
@ -992,15 +1079,18 @@ class Storage(object):
raise exceptions.NotFound("Name %r is not mapped as a"
" produced output by any"
" providers" % name)
locator = _ProviderLocator(
self._transients,
functools.partial(self._fetch_providers,
providers=maybe_providers), get_results)
searched_providers, providers = locator.find(
name, scope_walker=scope_walker)
if not providers:
raise exceptions.NotFound(
"Mapped argument %r <= %r was not produced"
" by any accessible provider (%s possible"
" providers were scanned)"
% (bound_name, name, len(searched_providers)))
provider, value = _item_from_first_of(providers, name)
mapped_args[bound_name] = value
LOG.trace("Matched %r <= %r to %r (from %s)",
View File
@ -25,12 +25,27 @@ from taskflow import test
from taskflow.tests import utils as test_utils
def _replicate_graph_with_names(compilation):
# Turn a graph of nodes into a graph of names only so that
# testing can use those names instead of having to use the exact
# node objects themselves (which is problematic for any end nodes that
# are added into the graph *dynamically*, and are not there in the
# original/source flow).
g = compilation.execution_graph
n_g = g.__class__(name=g.name)
for node, node_data in g.nodes_iter(data=True):
n_g.add_node(node.name, attr_dict=node_data)
for u, v, u_v_data in g.edges_iter(data=True):
n_g.add_edge(u.name, v.name, attr_dict=u_v_data)
return n_g
class PatternCompileTest(test.TestCase):
def test_task(self):
task = test_utils.DummyTask(name='a')
g = _replicate_graph_with_names(
compiler.PatternCompiler(task).compile())
self.assertEqual(['a'], list(g.nodes()))
self.assertEqual([], list(g.edges()))
def test_retry(self):
@ -54,19 +69,20 @@ class PatternCompileTest(test.TestCase):
inner_flo.add(d)
flo.add(inner_flo)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(8, len(g))
order = g.topological_sort()
self.assertEqual(['test', 'a', 'b', 'c',
"sub-test", 'd', "sub-test[$]",
'test[$]'], order)
self.assertTrue(g.has_edge('c', "sub-test"))
self.assertTrue(g.has_edge("sub-test", 'd'))
self.assertEqual({'invariant': True},
g.get_edge_data("sub-test", 'd'))
self.assertEqual(['test[$]'], list(g.no_successors_iter()))
self.assertEqual(['test'], list(g.no_predecessors_iter()))
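The expected names above rely on a convention these compiler changes introduce: each flow compiles to a begin node (named after the flow) plus a synthetic terminal node whose name carries a ``[$]`` suffix. A tiny sketch of that naming assumption (the helper below is illustrative, not taskflow API):
# Illustrative helper matching the node-name assertions in these tests.
def terminal_node_name(flow_name):
    return '%s[$]' % flow_name

assert terminal_node_name('sub-test') == 'sub-test[$]'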
def test_invalid(self):
a, b, c = test_utils.make_many(3)
@ -80,19 +96,21 @@ class PatternCompileTest(test.TestCase):
a, b, c, d = test_utils.make_many(4)
flo = uf.Flow("test")
flo.add(a, b, c, d)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [
('test', 'a'),
('test', 'b'),
('test', 'c'),
('test', 'd'),
('a', 'test[$]'),
('b', 'test[$]'),
('c', 'test[$]'),
('d', 'test[$]'),
])
self.assertEqual(set(['test']), set(g.no_predecessors_iter()))
def test_linear_nested(self):
a, b, c, d = test_utils.make_many(4)
@ -102,22 +120,22 @@ class PatternCompileTest(test.TestCase):
inner_flo.add(c, d)
flo.add(inner_flo)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(8, len(g))
sub_g = g.subgraph(['a', 'b'])
self.assertFalse(sub_g.has_edge('b', 'a'))
self.assertTrue(sub_g.has_edge('a', 'b'))
self.assertEqual({'invariant': True}, sub_g.get_edge_data("a", "b"))
sub_g = g.subgraph(['c', 'd'])
self.assertEqual(0, sub_g.number_of_edges())
# This ensures that c and d do not start executing until after b.
self.assertTrue(g.has_edge('b', 'test2'))
self.assertTrue(g.has_edge('test2', 'c'))
self.assertTrue(g.has_edge('test2', 'd'))
def test_unordered_nested(self):
a, b, c, d = test_utils.make_many(4)
@ -127,15 +145,19 @@ class PatternCompileTest(test.TestCase):
flo2.add(c, d)
flo.add(flo2)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(), [
('test', 'a'),
('test', 'b'),
('test', 'test2'),
('test2', 'c'),
('c', 'd'),
('d', 'test2[$]'),
('test2[$]', 'test[$]'),
('a', 'test[$]'),
('b', 'test[$]'),
])
def test_unordered_nested_in_linear(self):
@ -143,27 +165,27 @@ class PatternCompileTest(test.TestCase):
inner_flo = uf.Flow('ut').add(b, c)
flo = lf.Flow('lt').add(a, inner_flo, d)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(), [
('lt', 'a'),
('a', 'ut'),
('ut', 'b'),
('ut', 'c'),
('b', 'ut[$]'),
('c', 'ut[$]'),
('ut[$]', 'd'),
('d', 'lt[$]'),
])
def test_graph(self):
a, b, c, d = test_utils.make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
compilation = compiler.PatternCompiler(flo).compile()
self.assertEqual(6, len(compilation.execution_graph))
self.assertEqual(8, compilation.execution_graph.number_of_edges())
def test_graph_nested(self):
a, b, c, d, e, f, g = test_utils.make_many(7)
@ -174,19 +196,26 @@ class PatternCompileTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(11, len(g))
self.assertItemsEqual(g.edges(), [
('test', 'a'),
('test', 'b'),
('test', 'c'),
('test', 'd'),
('a', 'test[$]'),
('b', 'test[$]'),
('c', 'test[$]'),
('d', 'test[$]'),
('test', 'test2'),
('test2', 'e'),
('e', 'f'),
('f', 'g'),
('g', 'test2[$]'),
('test2[$]', 'test[$]'),
])
def test_graph_nested_graph(self):
@ -198,19 +227,29 @@ class PatternCompileTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(11, len(g))
self.assertItemsEqual(g.edges(), [
('test', 'a'),
('test', 'b'),
('test', 'c'),
('test', 'd'),
('test', 'test2'),
('test2', 'e'),
('test2', 'f'),
('test2', 'g'),
('e', 'test2[$]'),
('f', 'test2[$]'),
('g', 'test2[$]'),
('test2[$]', 'test[$]'),
('a', 'test[$]'),
('b', 'test[$]'),
('c', 'test[$]'),
('d', 'test[$]'),
])
def test_graph_links(self):
@ -221,33 +260,34 @@ class PatternCompileTest(test.TestCase):
flo.link(b, c)
flo.link(c, d)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'a', {'invariant': True}),
('a', 'b', {'manual': True}),
('b', 'c', {'manual': True}),
('c', 'd', {'manual': True}),
('d', 'test[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_graph_dependencies(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
flo = gf.Flow("test").add(a, b)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'a', {'invariant': True}),
('a', 'b', {'reasons': set(['x'])}),
('b', 'test[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_graph_nested_requires(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
@ -256,17 +296,19 @@ class PatternCompileTest(test.TestCase):
inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(7, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'a', {'invariant': True}),
('test2', 'b', {'invariant': True}),
('a', 'test2', {'reasons': set(['x'])}),
('b', 'c', {'invariant': True}),
('c', 'test2[$]', {'invariant': True}),
('test2[$]', 'test[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
def test_graph_nested_provides(self):
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
@ -275,18 +317,22 @@ class PatternCompileTest(test.TestCase):
inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(7, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'test2', {'invariant': True}),
('a', 'test[$]', {'invariant': True}),
# The 'x' requirement is produced out of test2...
('test2[$]', 'a', {'reasons': set(['x'])}),
('test2', 'b', {'invariant': True}),
('b', 'c', {'invariant': True}),
('c', 'test2[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_empty_flow_in_linear_flow(self):
flo = lf.Flow('lf')
@ -295,12 +341,14 @@ class PatternCompileTest(test.TestCase):
empty_flo = gf.Flow("empty")
flo.add(a, empty_flo, b)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertItemsEqual(g.edges(), [
("lf", "a"),
("a", "empty"),
("empty", "empty[$]"),
("empty[$]", "b"),
("b", "lf[$]"),
])
def test_many_empty_in_graph_flow(self):
@ -331,22 +379,24 @@ class PatternCompileTest(test.TestCase):
flo.link(a, d)
flo.link(c, d)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertTrue(g.has_edge('root', 'a'))
self.assertTrue(g.has_edge('root', 'b'))
self.assertTrue(g.has_edge('root', 'c'))
self.assertTrue(g.has_edge('b.0', 'b.1'))
self.assertTrue(g.has_edge('b.1[$]', 'b.2'))
self.assertTrue(g.has_edge('b.2[$]', 'b.3'))
self.assertTrue(g.has_edge('c.0[$]', 'c.1'))
self.assertTrue(g.has_edge('c.1[$]', 'c.2'))
self.assertTrue(g.has_edge('a', 'd'))
self.assertTrue(g.has_edge('b[$]', 'd'))
self.assertTrue(g.has_edge('c[$]', 'd'))
self.assertEqual(20, len(g))
def test_empty_flow_in_nested_flow(self):
flow = lf.Flow('lf')
@ -360,13 +410,13 @@ class PatternCompileTest(test.TestCase):
flow2.add(c, empty_flow, d)
flow.add(a, flow2, b)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flow).compile())
for u, v in [('lf', 'a'), ('a', 'lf-2'),
('lf-2', 'c'), ('c', 'empty'),
('empty[$]', 'd'), ('d', 'lf-2[$]'),
('lf-2[$]', 'b'), ('b', 'lf[$]')]:
self.assertTrue(g.has_edge(u, v))
def test_empty_flow_in_graph_flow(self):
flow = lf.Flow('lf')
@ -379,7 +429,14 @@ class PatternCompileTest(test.TestCase):
g = compilation.execution_graph
self.assertTrue(g.has_edge(flow, a))
self.assertTrue(g.has_edge(a, empty_flow))
empty_flow_successors = g.successors(empty_flow)
self.assertEqual(1, len(empty_flow_successors))
empty_flow_terminal = empty_flow_successors[0]
self.assertIs(empty_flow, empty_flow_terminal.flow)
self.assertEqual(compiler.FLOW_END,
g.node[empty_flow_terminal]['kind'])
self.assertTrue(g.has_edge(empty_flow_terminal, b))
def test_empty_flow_in_graph_flow_linkage(self):
flow = gf.Flow('lf')
@ -417,146 +474,154 @@ class PatternCompileTest(test.TestCase):
def test_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
self.assertEqual(3, len(compilation.execution_graph))
self.assertEqual(2, compilation.execution_graph.number_of_edges())
def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
self.assertEqual(3, len(compilation.execution_graph))
self.assertEqual(2, compilation.execution_graph.number_of_edges())
def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertEqual(2, g.number_of_edges())
def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
inner_flo = lf.Flow("test2", c2)
flo = lf.Flow("test", c1).add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'c1', {'invariant': True}),
('c1', 'test2', {'invariant': True, 'retry': True}),
('test2', 'c2', {'invariant': True}),
('c2', 'test2[$]', {'invariant': True}),
('test2[$]', 'test[$]', {'invariant': True}),
])
self.assertIs(c1, g.node['c2']['retry'])
self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
def test_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = test_utils.make_many(2)
flo = lf.Flow("test", c).add(a, b)
compilation = compiler.PatternCompiler(flo).compile()
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'c', {'invariant': True}),
('a', 'b', {'invariant': True}),
('c', 'a', {'invariant': True, 'retry': True}),
('b', 'test[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
self.assertIs(c, g.node['a']['retry'])
self.assertIs(c, g.node['b']['retry'])
def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = test_utils.make_many(2)
flo = uf.Flow("test", c).add(a, b)
compilation = compiler.PatternCompiler(flo).compile()
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'c', {'invariant': True}),
('c', 'a', {'invariant': True, 'retry': True}),
('c', 'b', {'invariant': True, 'retry': True}),
('b', 'test[$]', {'invariant': True}),
('a', 'test[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
self.assertIs(c, g.node['a']['retry'])
self.assertIs(c, g.node['b']['retry'])
def test_retry_in_graph_flow_with_tasks(self):
r = retry.AlwaysRevert("r")
a, b, c = test_utils.make_many(3)
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
compilation = compiler.PatternCompiler(flo).compile()
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertItemsEqual(g.edges(data=True), [
('test', 'r', {'invariant': True}),
('r', 'a', {'invariant': True, 'retry': True}),
('r', 'b', {'invariant': True, 'retry': True}),
('b', 'c', {'manual': True}),
('a', 'test[$]', {'invariant': True}),
('c', 'test[$]', {'invariant': True}),
])
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
self.assertIs(r, g.node['a']['retry'])
self.assertIs(r, g.node['b']['retry'])
self.assertIs(r, g.node['c']['retry'])
def test_retries_hierarchy(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
a, b, c, d = test_utils.make_many(4)
inner_flo = lf.Flow("test2", c2).add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(10, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'c1', {'invariant': True}),
('c1', 'a', {'invariant': True, 'retry': True}),
('a', 'test2', {'invariant': True}),
('test2', 'c2', {'invariant': True}),
('c2', 'b', {'invariant': True, 'retry': True}),
('b', 'c', {'invariant': True}),
('c', 'test2[$]', {'invariant': True}),
('test2[$]', 'd', {'invariant': True}),
('d', 'test[$]', {'invariant': True}),
])
self.assertIs(c1, g.node['a']['retry'])
self.assertIs(c1, g.node['d']['retry'])
self.assertIs(c2, g.node['b']['retry'])
self.assertIs(c2, g.node['c']['retry'])
self.assertIs(c1, g.node['c2']['retry'])
self.assertIs(None, g.node['c1'].get('retry'))
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("c1")
a, b, c, d = test_utils.make_many(4)
inner_flo = lf.Flow("test2").add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d)
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(9, len(g))
self.assertItemsEqual(g.edges(data=True), [
('test', 'c1', {'invariant': True}),
('c1', 'a', {'invariant': True, 'retry': True}),
('a', 'test2', {'invariant': True}),
('test2', 'b', {'invariant': True}),
('b', 'c', {'invariant': True}),
('c', 'test2[$]', {'invariant': True}),
('test2[$]', 'd', {'invariant': True}),
('d', 'test[$]', {'invariant': True}),
])
self.assertIs(c1, g.node['a']['retry'])
self.assertIs(c1, g.node['d']['retry'])
self.assertIs(c1, g.node['b']['retry'])
self.assertIs(c1, g.node['c']['retry'])
self.assertIs(None, g.node['c1'].get('retry'))
View File
@ -26,6 +26,16 @@ def _task(name, provides=None, requires=None):
class GraphFlowTest(test.TestCase):
def test_invalid_decider_depth(self):
g_1 = utils.ProgressingTask(name='g-1')
g_2 = utils.ProgressingTask(name='g-2')
for not_a_depth in ['not-a-depth', object(), 2, 3.4, False]:
flow = gf.Flow('g')
flow.add(g_1, g_2)
self.assertRaises((ValueError, TypeError),
flow.link, g_1, g_2,
decider=lambda history: False,
decider_depth=not_a_depth)
def test_graph_flow_stringy(self):
f = gf.Flow('test')
View File
@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import deciders
from taskflow import test
class TestDeciders(test.TestCase):
def test_translate(self):
for val in ['all', 'ALL', 'aLL', deciders.Depth.ALL]:
self.assertEqual(deciders.Depth.ALL,
deciders.Depth.translate(val))
for val in ['atom', 'ATOM', 'atOM', deciders.Depth.ATOM]:
self.assertEqual(deciders.Depth.ATOM,
deciders.Depth.translate(val))
for val in ['neighbors', 'Neighbors',
'NEIGHBORS', deciders.Depth.NEIGHBORS]:
self.assertEqual(deciders.Depth.NEIGHBORS,
deciders.Depth.translate(val))
for val in ['flow', 'FLOW', 'flOW', deciders.Depth.FLOW]:
self.assertEqual(deciders.Depth.FLOW,
deciders.Depth.translate(val))
def test_bad_translate(self):
self.assertRaises(TypeError, deciders.Depth.translate, 3)
self.assertRaises(TypeError, deciders.Depth.translate, object())
self.assertRaises(ValueError, deciders.Depth.translate, "stuff")
def test_pick_widest(self):
choices = [deciders.Depth.ATOM, deciders.Depth.FLOW]
self.assertEqual(deciders.Depth.FLOW, deciders.pick_widest(choices))
choices = [deciders.Depth.ATOM, deciders.Depth.FLOW,
deciders.Depth.ALL]
self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices))
choices = [deciders.Depth.ATOM, deciders.Depth.FLOW,
deciders.Depth.ALL, deciders.Depth.NEIGHBORS]
self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices))
choices = [deciders.Depth.ATOM, deciders.Depth.NEIGHBORS]
self.assertEqual(deciders.Depth.NEIGHBORS,
deciders.pick_widest(choices))
def test_bad_pick_widest(self):
self.assertRaises(ValueError, deciders.pick_widest, [])
self.assertRaises(ValueError, deciders.pick_widest, ["a"])
self.assertRaises(ValueError, deciders.pick_widest, set(['b']))
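Condensed into a usage sketch, the helpers exercised above behave as follows (assertions mirror the tests; nothing new is assumed):
# Usage sketch of the two public decider helpers.
from taskflow import deciders

assert deciders.Depth.translate('neighbors') == deciders.Depth.NEIGHBORS
assert deciders.pick_widest([deciders.Depth.ATOM,
                             deciders.Depth.FLOW]) == deciders.Depth.FLOW
# Unknown strings raise ValueError; non-strings raise TypeError.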
View File
@ -669,6 +669,94 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
self.assertNotIn('task2.t REVERTED(None)', capturer.values)
class EngineDeciderDepthTest(utils.EngineTestBase):
def test_run_graph_flow_decider_various_depths(self):
sub_flow_1 = gf.Flow('g_1')
g_1_1 = utils.ProgressingTask(name='g_1-1')
sub_flow_1.add(g_1_1)
g_1 = utils.ProgressingTask(name='g-1')
g_2 = utils.ProgressingTask(name='g-2')
g_3 = utils.ProgressingTask(name='g-3')
g_4 = utils.ProgressingTask(name='g-4')
for a_depth, ran_how_many in [('all', 1),
('atom', 4),
('flow', 2),
('neighbors', 3)]:
flow = gf.Flow('g')
flow.add(g_1, g_2, sub_flow_1, g_3, g_4)
flow.link(g_1, g_2,
decider=lambda history: False,
decider_depth=a_depth)
flow.link(g_2, sub_flow_1)
flow.link(g_2, g_3)
flow.link(g_3, g_4)
flow.link(g_1, sub_flow_1,
decider=lambda history: True,
decider_depth=a_depth)
e = self._make_engine(flow)
with utils.CaptureListener(e, capture_flow=False) as capturer:
e.run()
ran_tasks = 0
for outcome in capturer.values:
if outcome.endswith("RUNNING"):
ran_tasks += 1
self.assertEqual(ran_how_many, ran_tasks)
def test_run_graph_flow_decider_jump_over_atom(self):
flow = gf.Flow('g')
a = utils.AddOneSameProvidesRequires("a", inject={'value': 0})
b = utils.AddOneSameProvidesRequires("b")
c = utils.AddOneSameProvidesRequires("c")
flow.add(a, b, c, resolve_requires=False)
flow.link(a, b, decider=lambda history: False,
decider_depth='atom')
flow.link(b, c)
e = self._make_engine(flow)
e.run()
self.assertEqual(2, e.storage.get('c'))
self.assertEqual(states.IGNORE, e.storage.get_atom_state('b'))
def test_run_graph_flow_decider_jump_over_bad_atom(self):
flow = gf.Flow('g')
a = utils.NoopTask("a")
b = utils.FailingTask("b")
c = utils.NoopTask("c")
flow.add(a, b, c)
flow.link(a, b, decider=lambda history: False,
decider_depth='atom')
flow.link(b, c)
e = self._make_engine(flow)
e.run()
def test_run_graph_flow_decider_revert(self):
flow = gf.Flow('g')
a = utils.NoopTask("a")
b = utils.NoopTask("b")
c = utils.FailingTask("c")
flow.add(a, b, c)
flow.link(a, b, decider=lambda history: False,
decider_depth='atom')
flow.link(b, c)
e = self._make_engine(flow)
with utils.CaptureListener(e, capture_flow=False) as capturer:
# A wrapped failure is raised here for the WBE engine; make this
# better in the future, perhaps via a custom testtools matcher?
self.assertRaises((RuntimeError, exc.WrappedFailure), e.run)
expected = [
'a.t RUNNING',
'a.t SUCCESS(None)',
'b.t IGNORE',
'c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'c.t REVERTING',
'c.t REVERTED(None)',
'a.t REVERTING',
'a.t REVERTED(None)',
]
self.assertEqual(expected, capturer.values)
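For readers skimming these tests, the same 'atom' depth behavior in self-contained form (the task class and names are invented; this mirrors test_run_graph_flow_decider_jump_over_atom in spirit):
# Self-contained sketch of an 'atom'-depth decider (illustrative).
import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow import task

class Noop(task.Task):
    def execute(self):
        print('ran %s' % self.name)

a, b, c = Noop('a'), Noop('b'), Noop('c')
flow = gf.Flow('g').add(a, b, c)
# Only 'b' is ignored; with depth 'atom' its successor 'c' still runs.
flow.link(a, b, decider=lambda history: False, decider_depth='atom')
flow.link(b, c)
taskflow.engines.run(flow)  # prints 'ran a' then 'ran c'; 'b' is IGNORE'd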
class EngineGraphFlowTest(utils.EngineTestBase):
def test_run_empty_graph_flow(self):
@ -1043,9 +1131,10 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase):
'task4.t IGNORE',
]
self.assertEqual(expected, capturer.values)
self.assertEqual(2, len(histories))
for i in range(0, 2):
self.assertIn('task1', histories[i])
self.assertIn('task2', histories[i])
def test_graph_flow_conditional(self):
flow = gf.Flow('root')
@ -1249,14 +1338,15 @@ class SerialEngineTest(EngineTaskTest,
EngineResetTests,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
EngineDeciderDepthTest,
test.TestCase):
def _make_engine(self, flow,
flow_detail=None, store=None, **kwargs):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='serial',
backend=self.backend,
store=store, **kwargs)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@ -1278,11 +1368,13 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
EngineDeciderDepthTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
def _make_engine(self, flow,
flow_detail=None, executor=None, store=None,
**kwargs):
if executor is None:
executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail,
@ -1290,7 +1382,8 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
executor=executor,
engine='parallel',
store=store,
max_workers=self._EXECUTOR_WORKERS,
**kwargs)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@ -1319,17 +1412,19 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
EngineDeciderDepthTest,
test.TestCase):
def _make_engine(self, flow,
flow_detail=None, executor=None, store=None,
**kwargs):
if executor is None:
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',
executor=executor,
store=store, **kwargs)
class ParallelEngineWithProcessTest(EngineTaskTest,
@ -1342,6 +1437,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
EngineResetTests,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineDeciderDepthTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
@ -1350,7 +1446,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
self.assertIsInstance(engine, eng.ParallelActionEngine)
def _make_engine(self, flow,
flow_detail=None, executor=None, store=None,
**kwargs):
if executor is None:
executor = 'processes'
return taskflow.engines.load(flow, flow_detail=flow_detail,
@ -1358,7 +1455,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
engine='parallel',
executor=executor,
store=store,
max_workers=self._EXECUTOR_WORKERS,
**kwargs)
class WorkerBasedEngineTest(EngineTaskTest,
@ -1371,6 +1469,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
EngineResetTests,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineDeciderDepthTest,
test.TestCase):
def setUp(self):
super(WorkerBasedEngineTest, self).setUp()
@ -1415,10 +1514,11 @@ class WorkerBasedEngineTest(EngineTaskTest,
super(WorkerBasedEngineTest, self).tearDown()
def _make_engine(self, flow,
flow_detail=None, store=None, **kwargs):
kwargs.update(self.engine_conf)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend,
store=store, **kwargs)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
View File
@ -244,24 +244,6 @@ class TestCountdownIter(test.TestCase):
self.assertRaises(ValueError, six.next, it)
class TestMergeUri(test.TestCase):
def test_merge(self):
url = "http://www.yahoo.com/?a=b&c=d"
View File
@ -226,37 +226,6 @@ def parse_uri(uri):
return netutils.urlsplit(uri)
def disallow_when_frozen(excp_cls):
"""Frozen checking/raising method decorator."""