Allow for alterations in decider 'area of influence'

Christmas came early.

Closes-Bug: #1479466

Change-Id: I931d826690c925f022dbfffe9afb7bf41345b1d0
This commit is contained in:
Joshua Harlow 2015-11-16 16:27:42 -08:00 committed by Joshua Harlow
parent f555a35f30
commit 8e8156c488
23 changed files with 1452 additions and 732 deletions

View File

@ -448,11 +448,13 @@ Components
.. automodule:: taskflow.engines.action_engine.builder .. automodule:: taskflow.engines.action_engine.builder
.. automodule:: taskflow.engines.action_engine.compiler .. automodule:: taskflow.engines.action_engine.compiler
.. automodule:: taskflow.engines.action_engine.completer .. automodule:: taskflow.engines.action_engine.completer
.. automodule:: taskflow.engines.action_engine.deciders
.. automodule:: taskflow.engines.action_engine.executor .. automodule:: taskflow.engines.action_engine.executor
.. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.action_engine.runtime
.. automodule:: taskflow.engines.action_engine.scheduler .. automodule:: taskflow.engines.action_engine.scheduler
.. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker .. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker
:special-members: __iter__ :special-members: __iter__
.. automodule:: taskflow.engines.action_engine.traversal
Hierarchy Hierarchy
========= =========

View File

@ -21,6 +21,7 @@ Graph flow
~~~~~~~~~~ ~~~~~~~~~~
.. automodule:: taskflow.patterns.graph_flow .. automodule:: taskflow.patterns.graph_flow
.. automodule:: taskflow.deciders
Hierarchy Hierarchy
~~~~~~~~~ ~~~~~~~~~

99
taskflow/deciders.py Normal file
View File

@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from taskflow.utils import misc
class Depth(misc.StrEnum):
"""Enumeration of decider(s) *area of influence*."""
ALL = 'ALL'
"""
**Default** decider depth that affects **all** successor atoms (including
ones that are in successor nested flows).
"""
FLOW = 'FLOW'
"""
Decider depth that affects **all** successor tasks in the **same**
flow (it will **not** affect tasks/retries that are in successor
nested flows).
.. warning::
While using this kind we are allowed to execute successors of
things that have been ignored (for example nested flows and the
tasks they contain), this may result in symbol lookup errors during
running, user beware.
"""
NEIGHBORS = 'NEIGHBORS'
"""
Decider depth that affects only **next** successor tasks (and does
not traverse past **one** level of successor tasks).
.. warning::
While using this kind we are allowed to execute successors of
things that have been ignored (for example nested flows and the
tasks they contain), this may result in symbol lookup errors during
running, user beware.
"""
ATOM = 'ATOM'
"""
Decider depth that affects only **targeted** atom (and does
**not** traverse into **any** level of successor atoms).
.. warning::
While using this kind we are allowed to execute successors of
things that have been ignored (for example nested flows and the
tasks they contain), this may result in symbol lookup errors during
running, user beware.
"""
@classmethod
def translate(cls, desired_depth):
"""Translates a string into a depth enumeration."""
if isinstance(desired_depth, cls):
# Nothing to do in the first place...
return desired_depth
if not isinstance(desired_depth, six.string_types):
raise TypeError("Unexpected desired depth type, string type"
" expected, not %s" % type(desired_depth))
try:
return cls(desired_depth.upper())
except ValueError:
pretty_depths = sorted([a_depth.name for a_depth in cls])
raise ValueError("Unexpected decider depth value, one of"
" %s (case-insensitive) is expected and"
" not '%s'" % (pretty_depths, desired_depth))
# Depth area of influence order (from greater influence to least).
#
# Order very much matters here...
_ORDERING = tuple([
Depth.ALL, Depth.FLOW, Depth.NEIGHBORS, Depth.ATOM,
])
def pick_widest(depths):
"""Pick from many depths which has the **widest** area of influence."""
return _ORDERING[min(_ORDERING.index(d) for d in depths)]

View File

@ -14,108 +14,16 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import abc
import itertools
import operator import operator
import weakref import weakref
import six
from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import deciders
from taskflow.engines.action_engine import traversal
from taskflow import states as st from taskflow import states as st
from taskflow.utils import iter_utils from taskflow.utils import iter_utils
def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
"""Iterates connected nodes in execution graph (from starting set).
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
stack = list(initial_nodes_iter)
while stack:
node = stack.pop()
node_attrs = graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
stack.extend(connected_to_functor(node))
@six.add_metaclass(abc.ABCMeta)
class Decider(object):
"""Base class for deciders.
Provides interface to be implemented by sub-classes
Decider checks whether next atom in flow should be executed or not
"""
@abc.abstractmethod
def check(self, runtime):
"""Returns bool of whether this decider should allow running."""
@abc.abstractmethod
def affect(self, runtime):
"""If the :py:func:`~.check` returns false, affects associated atoms.
"""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order."""
proceed = self.check(runtime)
if not proceed:
self.affect(runtime)
return proceed
class IgnoreDecider(Decider):
"""Checks any provided edge-deciders and determines if ok to run."""
def __init__(self, atom, edge_deciders):
self._atom = atom
self._edge_deciders = edge_deciders
def check(self, runtime):
"""Returns bool of whether this decider should allow running."""
# Gather all atoms results so that those results can be used
# by the decider(s) that are making a decision as to pass or
# not pass...
results = {}
for node, node_kind, _local_decider in self._edge_deciders:
if node_kind in co.ATOMS:
results[node.name] = runtime.storage.get(node.name)
for _node, _node_kind, local_decider in self._edge_deciders:
if not local_decider(history=results):
return False
return True
def affect(self, runtime):
"""If the :py:func:`~.check` returns false, affects associated atoms.
This will alter the associated atom + successor atoms by setting there
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
class NoOpDecider(Decider):
"""No-op decider that says it is always ok to run & has no effect(s)."""
def check(self, runtime):
"""Always good to go."""
return True
def affect(self, runtime):
"""Does nothing."""
class Analyzer(object): class Analyzer(object):
"""Analyzes a compilation and aids in execution processes. """Analyzes a compilation and aids in execution processes.
@ -142,7 +50,7 @@ class Analyzer(object):
if state == st.SUCCESS: if state == st.SUCCESS:
if intention == st.REVERT: if intention == st.REVERT:
return iter([ return iter([
(atom, NoOpDecider()), (atom, deciders.NoOpDecider()),
]) ])
elif intention == st.EXECUTE: elif intention == st.EXECUTE:
return self.browse_atoms_for_execute(atom=atom) return self.browse_atoms_for_execute(atom=atom)
@ -165,10 +73,15 @@ class Analyzer(object):
if atom is None: if atom is None:
atom_it = self.iterate_nodes(co.ATOMS) atom_it = self.iterate_nodes(co.ATOMS)
else: else:
successors_iter = self._execution_graph.successors_iter # NOTE(harlowja): the reason this uses breadth first is so that
atom_it = _depth_first_iterate(self._execution_graph, # when deciders are applied that those deciders can be applied
{co.FLOW: successors_iter}, # from top levels to lower levels since lower levels *may* be
successors_iter(atom)) # able to run even if top levels have deciders that decide to
# ignore some atoms... (going deeper first would make this
# problematic to determine as top levels can have their deciders
# applied **after** going deeper).
atom_it = traversal.breadth_first_iterate(
self._execution_graph, atom, traversal.Direction.FORWARD)
for atom in atom_it: for atom in atom_it:
is_ready, late_decider = self._get_maybe_ready_for_execute(atom) is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
if is_ready: if is_ready:
@ -185,18 +98,50 @@ class Analyzer(object):
if atom is None: if atom is None:
atom_it = self.iterate_nodes(co.ATOMS) atom_it = self.iterate_nodes(co.ATOMS)
else: else:
predecessors_iter = self._execution_graph.predecessors_iter atom_it = traversal.breadth_first_iterate(
atom_it = _depth_first_iterate(self._execution_graph, self._execution_graph, atom, traversal.Direction.BACKWARD,
{co.FLOW: predecessors_iter}, # Stop at the retry boundary (as retries 'control' there
predecessors_iter(atom)) # surronding atoms, and we don't want to back track over
# them so that they can correctly affect there associated
# atoms); we do though need to jump through all tasks since
# if a predecessor Y was ignored and a predecessor Z before Y
# was not it should be eligible to now revert...
through_retries=False)
for atom in atom_it: for atom in atom_it:
is_ready, late_decider = self._get_maybe_ready_for_revert(atom) is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
if is_ready: if is_ready:
yield (atom, late_decider) yield (atom, late_decider)
def _get_maybe_ready(self, atom, transition_to, allowed_intentions, def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
connected_fetcher, connected_checker, connected_fetcher, ready_checker,
decider_fetcher): decider_fetcher):
def iter_connected_states():
# Lazily iterate over connected states so that ready checkers
# can stop early (vs having to consume and check all the
# things...)
for atom in connected_fetcher():
# TODO(harlowja): make this storage api better, its not
# especially clear what the following is doing (mainly
# to avoid two calls into storage).
atom_states = self._storage.get_atoms_states([atom.name])
yield (atom, atom_states[atom.name])
# NOTE(harlowja): How this works is the following...
#
# 1. First check if the current atom can even transition to the
# desired state, if not this atom is definitely not ready to
# execute or revert.
# 2. Check if the actual atoms intention is in one of the desired/ok
# intentions, if it is not there we are still not ready to execute
# or revert.
# 3. Iterate over (atom, atom_state, atom_intention) for all the
# atoms the 'connected_fetcher' callback yields from underlying
# storage and direct that iterator into the 'ready_checker'
# callback, that callback should then iterate over these entries
# and determine if it is ok to execute or revert.
# 4. If (and only if) 'ready_checker' returns true, then
# the 'decider_fetcher' callback is called to get a late decider
# which can (if it desires) affect this ready result (but does
# so right before the atom is about to be scheduled).
state = self._storage.get_atom_state(atom.name) state = self._storage.get_atom_state(atom.name)
ok_to_transition = self._runtime.check_atom_transition(atom, state, ok_to_transition = self._runtime.check_atom_transition(atom, state,
transition_to) transition_to)
@ -205,59 +150,62 @@ class Analyzer(object):
intention = self._storage.get_atom_intention(atom.name) intention = self._storage.get_atom_intention(atom.name)
if intention not in allowed_intentions: if intention not in allowed_intentions:
return (False, None) return (False, None)
connected_states = self._storage.get_atoms_states( ok_to_run = ready_checker(iter_connected_states())
connected_atom.name for connected_atom in connected_fetcher(atom))
ok_to_run = connected_checker(six.itervalues(connected_states))
if not ok_to_run: if not ok_to_run:
return (False, None) return (False, None)
else: else:
return (True, decider_fetcher(atom)) return (True, decider_fetcher())
def _get_maybe_ready_for_execute(self, atom): def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed.""" """Returns if an atom is *likely* ready to be executed."""
def decider_fetcher(atom): def ready_checker(pred_connected_it):
edge_deciders = self._runtime.fetch_edge_deciders(atom) for _atom, (atom_state, atom_intention) in pred_connected_it:
if edge_deciders: if (atom_state in (st.SUCCESS, st.IGNORE) and
return IgnoreDecider(atom, edge_deciders) atom_intention in (st.EXECUTE, st.IGNORE)):
else: continue
return NoOpDecider() return False
predecessors_iter = self._execution_graph.predecessors_iter return True
connected_fetcher = lambda atom: \ decider_fetcher = lambda: \
_depth_first_iterate(self._execution_graph, deciders.IgnoreDecider(
{co.FLOW: predecessors_iter}, atom, self._runtime.fetch_edge_deciders(atom))
predecessors_iter(atom)) connected_fetcher = lambda: \
connected_checker = lambda connected_iter: \ traversal.depth_first_iterate(self._execution_graph, atom,
all(state == st.SUCCESS and intention == st.EXECUTE # Whether the desired atom
for state, intention in connected_iter) # can execute is dependent on its
# predecessors outcomes (thus why
# we look backwards).
traversal.Direction.BACKWARD)
# If this atoms current state is able to be transitioned to RUNNING
# and its intention is to EXECUTE and all of its predecessors executed
# successfully or were ignored then this atom is ready to execute.
return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE], return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
connected_fetcher, connected_checker, connected_fetcher, ready_checker,
decider_fetcher) decider_fetcher)
def _get_maybe_ready_for_revert(self, atom): def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted.""" """Returns if an atom is *likely* ready to be reverted."""
successors_iter = self._execution_graph.successors_iter def ready_checker(succ_connected_it):
connected_fetcher = lambda atom: \ for _atom, (atom_state, _atom_intention) in succ_connected_it:
_depth_first_iterate(self._execution_graph, if atom_state not in (st.PENDING, st.REVERTED, st.IGNORE):
{co.FLOW: successors_iter}, return False
successors_iter(atom)) return True
connected_checker = lambda connected_iter: \ noop_decider = deciders.NoOpDecider()
all(state in (st.PENDING, st.REVERTED) connected_fetcher = lambda: \
for state, _intention in connected_iter) traversal.depth_first_iterate(self._execution_graph, atom,
decider_fetcher = lambda atom: NoOpDecider() # Whether the desired atom
# can revert is dependent on its
# successors states (thus why we
# look forwards).
traversal.Direction.FORWARD)
decider_fetcher = lambda: noop_decider
# If this atoms current state is able to be transitioned to REVERTING
# and its intention is either REVERT or RETRY and all of its
# successors are either PENDING or REVERTED then this atom is ready
# to revert.
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY], return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, connected_checker, connected_fetcher, ready_checker,
decider_fetcher) decider_fetcher)
def iterate_connected_atoms(self, atom):
"""Iterates **all** successor atoms connected to given atom."""
successors_iter = self._execution_graph.successors_iter
return _depth_first_iterate(
self._execution_graph, {
co.FLOW: successors_iter,
co.TASK: successors_iter,
co.RETRY: successors_iter,
}, successors_iter(atom))
def iterate_retries(self, state=None): def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state. """Iterates retry atoms that match the provided state.
@ -268,7 +216,8 @@ class Analyzer(object):
atom_states = self._storage.get_atoms_states(atom.name atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms) for atom in atoms)
for atom in atoms: for atom in atoms:
if atom_states[atom.name][0] == state: atom_state, _atom_intention = atom_states[atom.name]
if atom_state == state:
yield atom yield atom
else: else:
for atom in self.iterate_nodes((co.RETRY,)): for atom in self.iterate_nodes((co.RETRY,)):
@ -290,7 +239,7 @@ class Analyzer(object):
atom_states = self._storage.get_atoms_states(atom.name atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms) for atom in atoms)
for atom in atoms: for atom in atoms:
atom_state = atom_states[atom.name][0] atom_state, _atom_intention = atom_states[atom.name]
if atom_state == st.IGNORE: if atom_state == st.IGNORE:
continue continue
if atom_state != st.SUCCESS: if atom_state != st.SUCCESS:

View File

@ -223,11 +223,15 @@ class MachineBuilder(object):
atom, intention) atom, intention)
except Exception: except Exception:
memory.failures.append(failure.Failure()) memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion"
" failed", atom)
else: else:
try: try:
more_work = set(iter_next_atoms(atom=atom)) more_work = set(iter_next_atoms(atom=atom))
except Exception: except Exception:
memory.failures.append(failure.Failure()) memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion"
" next atom searching failed", atom)
else: else:
next_up.update(more_work) next_up.update(more_work)
if is_runnable() and next_up and not memory.failures: if is_runnable() and next_up and not memory.failures:

View File

@ -40,22 +40,55 @@ LOG = logging.getLogger(__name__)
TASK = 'task' TASK = 'task'
RETRY = 'retry' RETRY = 'retry'
FLOW = 'flow' FLOW = 'flow'
FLOW_END = 'flow_end'
# Quite often used together, so make a tuple everyone can share... # Quite often used together, so make a tuple everyone can share...
ATOMS = (TASK, RETRY) ATOMS = (TASK, RETRY)
FLOWS = (FLOW, FLOW_END)
class Terminator(object):
"""Flow terminator class."""
def __init__(self, flow):
self._flow = flow
self._name = "%s[$]" % (self._flow.name,)
@property
def flow(self):
"""The flow which this terminator signifies/marks the end of."""
return self._flow
@property
def name(self):
"""Useful name this end terminator has (derived from flow name)."""
return self._name
def __str__(self):
return "%s[$]" % (self._flow,)
class Compilation(object): class Compilation(object):
"""The result of a compilers compile() is this *immutable* object.""" """The result of a compilers ``compile()`` is this *immutable* object."""
#: Task nodes will have a ``kind`` attribute/metadata key with this value. #: Task nodes will have a ``kind`` metadata key with this value.
TASK = TASK TASK = TASK
#: Retry nodes will have a ``kind`` attribute/metadata key with this value. #: Retry nodes will have a ``kind`` metadata key with this value.
RETRY = RETRY RETRY = RETRY
#: Flow nodes will have a ``kind`` attribute/metadata key with this value.
FLOW = FLOW FLOW = FLOW
"""
Flow **entry** nodes will have a ``kind`` metadata key with
this value.
"""
FLOW_END = FLOW_END
"""
Flow **exit** nodes will have a ``kind`` metadata key with
this value (only applicable for compilation execution graph, not currently
used in tree hierarchy).
"""
def __init__(self, execution_graph, hierarchy): def __init__(self, execution_graph, hierarchy):
self._execution_graph = execution_graph self._execution_graph = execution_graph
@ -141,6 +174,8 @@ class FlowCompiler(object):
_add_update_edges(graph, u_graph.no_successors_iter(), _add_update_edges(graph, u_graph.no_successors_iter(),
list(v_graph.no_predecessors_iter()), list(v_graph.no_predecessors_iter()),
attr_dict=attr_dict) attr_dict=attr_dict)
# Insert the flow(s) retry if needed, and always make sure it
# is the **immediate** successor of the flow node itself.
if flow.retry is not None: if flow.retry is not None:
graph.add_node(flow.retry, kind=RETRY) graph.add_node(flow.retry, kind=RETRY)
_add_update_edges(graph, [flow], [flow.retry], _add_update_edges(graph, [flow], [flow.retry],
@ -149,20 +184,42 @@ class FlowCompiler(object):
if node is not flow.retry and node is not flow: if node is not flow.retry and node is not flow:
graph.node[node].setdefault(RETRY, flow.retry) graph.node[node].setdefault(RETRY, flow.retry)
from_nodes = [flow.retry] from_nodes = [flow.retry]
connected_attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True} attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
else: else:
from_nodes = [flow] from_nodes = [flow]
connected_attr_dict = {LINK_INVARIANT: True} attr_dict = {LINK_INVARIANT: True}
connected_to = [ # Ensure all nodes with no predecessors are connected to this flow
node for node in graph.no_predecessors_iter() if node is not flow # or its retry node (so that the invariant that the flow node is
] # traversed through before its contents is maintained); this allows
if connected_to: # us to easily know when we have entered a flow (when running) and
# Ensure all nodes in this graph(s) that have no # do special and/or smart things such as only traverse up to the
# predecessors depend on this flow (or this flow's retry) so that # start of a flow when looking for node deciders.
# we can depend on the flow being traversed before its _add_update_edges(graph, from_nodes, [
# children (even though at the current time it will be skipped). node for node in graph.no_predecessors_iter()
_add_update_edges(graph, from_nodes, connected_to, if node is not flow
attr_dict=connected_attr_dict) ], attr_dict=attr_dict)
# Connect all nodes with no successors into a special terminator
# that is used to identify the end of the flow and ensure that all
# execution traversals will traverse over this node before executing
# further work (this is especially useful for nesting and knowing
# when we have exited a nesting level); it allows us to do special
# and/or smart things such as applying deciders up to (but not
# beyond) a flow termination point.
#
# Do note that in a empty flow this will just connect itself to
# the flow node itself... and also note we can not use the flow
# object itself (primarily because the underlying graph library
# uses hashing to identify node uniqueness and we can easily create
# a loop if we don't do this correctly, so avoid that by just
# creating this special node and tagging it with a special kind); we
# may be able to make this better in the future with a multidigraph
# that networkx provides??
flow_term = Terminator(flow)
graph.add_node(flow_term, kind=FLOW_END, noop=True)
_add_update_edges(graph, [
node for node in graph.no_successors_iter()
if node is not flow_term
], [flow_term], attr_dict={LINK_INVARIANT: True})
return graph, tree_node return graph, tree_node

View File

@ -128,7 +128,7 @@ class Completer(object):
atom_states = self._storage.get_atoms_states(atom.name atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms) for atom in atoms)
for atom in atoms: for atom in atoms:
atom_state = atom_states[atom.name][0] atom_state, _atom_intention = atom_states[atom.name]
if atom_state == st.FAILURE: if atom_state == st.FAILURE:
self._process_atom_failure(atom, self._storage.get(atom.name)) self._process_atom_failure(atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING): for retry in self._analyzer.iterate_retries(st.RETRYING):
@ -137,7 +137,7 @@ class Completer(object):
atom_states[atom.name] = (state, intention) atom_states[atom.name] = (state, intention)
unfinished_atoms = set() unfinished_atoms = set()
for atom in atoms: for atom in atoms:
atom_state = atom_states[atom.name][0] atom_state, _atom_intention = atom_states[atom.name]
if atom_state in (st.RUNNING, st.REVERTING): if atom_state in (st.RUNNING, st.REVERTING):
unfinished_atoms.add(atom) unfinished_atoms.add(atom)
return unfinished_atoms return unfinished_atoms

View File

@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import six
from taskflow import deciders
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import traversal
from taskflow import states
@six.add_metaclass(abc.ABCMeta)
class Decider(object):
"""Base class for deciders.
Provides interface to be implemented by sub-classes.
Deciders check whether next atom in flow should be executed or not.
"""
@abc.abstractmethod
def tally(self, runtime):
"""Tally edge deciders on whether this decider should allow running.
The returned value is a list of edge deciders that voted
'nay' (do not allow running).
"""
@abc.abstractmethod
def affect(self, runtime, nay_voters):
"""Affects associated atoms due to at least one 'nay' edge decider.
This will alter the associated atom + some set of successor atoms by
setting there state and intention to ``IGNORE`` so that they are
ignored in future runtime activities.
"""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.tally` + :py:func:`~.affect` in right order.
NOTE(harlowja): If there are zero 'nay' edge deciders then it is
assumed this decider should allow running.
Returns boolean of whether this decider allows for running (or not).
"""
nay_voters = self.tally(runtime)
if nay_voters:
self.affect(runtime, nay_voters)
return False
return True
def _affect_all_successors(atom, runtime):
execution_graph = runtime.compilation.execution_graph
successors_iter = traversal.depth_first_iterate(
execution_graph, atom, traversal.Direction.FORWARD)
runtime.reset_atoms(itertools.chain([atom], successors_iter),
state=states.IGNORE, intention=states.IGNORE)
def _affect_successor_tasks_in_same_flow(atom, runtime):
execution_graph = runtime.compilation.execution_graph
successors_iter = traversal.depth_first_iterate(
execution_graph, atom, traversal.Direction.FORWARD,
# Do not go through nested flows but do follow *all* tasks that
# are directly connected in this same flow (thus the reason this is
# called the same flow decider); retries are direct successors
# of flows, so they should also be not traversed through, but
# setting this explicitly ensures that.
through_flows=False, through_retries=False)
runtime.reset_atoms(itertools.chain([atom], successors_iter),
state=states.IGNORE, intention=states.IGNORE)
def _affect_atom(atom, runtime):
runtime.reset_atoms([atom], state=states.IGNORE, intention=states.IGNORE)
def _affect_direct_task_neighbors(atom, runtime):
def _walk_neighbors():
execution_graph = runtime.compilation.execution_graph
for node in execution_graph.successors_iter(atom):
node_data = execution_graph.node[node]
if node_data['kind'] == compiler.TASK:
yield node
successors_iter = _walk_neighbors()
runtime.reset_atoms(itertools.chain([atom], successors_iter),
state=states.IGNORE, intention=states.IGNORE)
class IgnoreDecider(Decider):
"""Checks any provided edge-deciders and determines if ok to run."""
_depth_strategies = {
deciders.Depth.ALL: _affect_all_successors,
deciders.Depth.ATOM: _affect_atom,
deciders.Depth.FLOW: _affect_successor_tasks_in_same_flow,
deciders.Depth.NEIGHBORS: _affect_direct_task_neighbors,
}
def __init__(self, atom, edge_deciders):
self._atom = atom
self._edge_deciders = edge_deciders
def tally(self, runtime):
if not self._edge_deciders:
return []
# Gather all atoms (the ones that were not ignored) results so that
# those results can be used
# by the decider(s) that are making a decision as to pass or
# not pass...
states_intentions = runtime.storage.get_atoms_states(
ed.from_node.name for ed in self._edge_deciders
if ed.kind in compiler.ATOMS)
history = {}
for atom_name in six.iterkeys(states_intentions):
atom_state, _atom_intention = states_intentions[atom_name]
if atom_state != states.IGNORE:
history[atom_name] = runtime.storage.get(atom_name)
nay_voters = []
for ed in self._edge_deciders:
if ed.kind in compiler.ATOMS and ed.from_node.name not in history:
continue
if not ed.decider(history=history):
nay_voters.append(ed)
return nay_voters
def affect(self, runtime, nay_voters):
# If there were many 'nay' edge deciders that were targeted
# at this atom, then we need to pick the one which has the widest
# impact and respect that one as the decider depth that will
# actually affect things.
widest_depth = deciders.pick_widest(ed.depth for ed in nay_voters)
affector = self._depth_strategies[widest_depth]
return affector(self._atom, runtime)
class NoOpDecider(Decider):
"""No-op decider that says it is always ok to run & has no effect(s)."""
def tally(self, runtime):
"""Always good to go."""
return []
def affect(self, runtime, nay_voters):
"""Does nothing."""

View File

@ -99,37 +99,37 @@ class ActionEngine(base.Engine):
**Engine options:** **Engine options:**
+-------------------+-----------------------+------+-----------+ +----------------------+-----------------------+------+------------+
| Name/key | Description | Type | Default | | Name/key | Description | Type | Default |
+===================+=======================+======+===========+ +======================+=======================+======+============+
| defer_reverts | This option lets you | bool | ``False`` | | ``defer_reverts`` | This option lets you | bool | ``False`` |
| | safely nest flows | | | | | safely nest flows | | |
| | with retries inside | | | | | with retries inside | | |
| | flows without retries | | | | | flows without retries | | |
| | and it still behaves | | | | | and it still behaves | | |
| | as a user would | | | | | as a user would | | |
| | expect (for example | | | | | expect (for example | | |
| | if the retry gets | | | | | if the retry gets | | |
| | exhausted it reverts | | | | | exhausted it reverts | | |
| | the outer flow unless | | | | | the outer flow unless | | |
| | the outer flow has a | | | | | the outer flow has a | | |
| | has a separate retry | | | | | has a separate retry | | |
| | behavior). | | | | | behavior). | | |
+-------------------+-----------------------+------+-----------+ +----------------------+-----------------------+------+------------+
| inject_transient | When true, values | bool | ``True`` | | ``inject_transient`` | When true, values | bool | ``True`` |
| | that are local to | | | | | that are local to | | |
| | each atoms scope | | | | | each atoms scope | | |
| | are injected into | | | | | are injected into | | |
| | storage into a | | | | | storage into a | | |
| | transient location | | | | | transient location | | |
| | (typically a local | | | | | (typically a local | | |
| | dictionary), when | | | | | dictionary), when | | |
| | false those values | | | | | false those values | | |
| | are instead persisted | | | | | are instead persisted | | |
| | into atom details | | | | | into atom details | | |
| | (and saved in a non- | | | | | (and saved in a non- | | |
| | transient manner). | | | | | transient manner). | | |
+-------------------+-----------------------+------+-----------+ +----------------------+-----------------------+------+------------+
""" """
NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS]) NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS])
@ -148,6 +148,12 @@ class ActionEngine(base.Engine):
end-users when doing execution iterations via :py:meth:`.run_iter`. end-users when doing execution iterations via :py:meth:`.run_iter`.
""" """
MAX_MACHINE_STATES_RETAINED = 10
"""
During :py:meth:`~.run_iter` the last X state machine transitions will
be recorded (typically only useful on failure).
"""
def __init__(self, flow, flow_detail, backend, options): def __init__(self, flow, flow_detail, backend, options):
super(ActionEngine, self).__init__(flow, flow_detail, backend, options) super(ActionEngine, self).__init__(flow, flow_detail, backend, options)
self._runtime = None self._runtime = None
@ -242,16 +248,21 @@ class ActionEngine(base.Engine):
self.compile() self.compile()
self.prepare() self.prepare()
self.validate() self.validate()
last_state = None # Keep track of the last X state changes, which if a failure happens
# are quite useful to log (and the performance of tracking this
# should be negligible).
last_transitions = collections.deque(
maxlen=max(1, self.MAX_MACHINE_STATES_RETAINED))
with _start_stop(self._task_executor, self._retry_executor): with _start_stop(self._task_executor, self._retry_executor):
self._change_state(states.RUNNING) self._change_state(states.RUNNING)
try: try:
closed = False closed = False
machine, memory = self._runtime.builder.build(timeout=timeout) machine, memory = self._runtime.builder.build(timeout=timeout)
r = runners.FiniteRunner(machine) r = runners.FiniteRunner(machine)
for (_prior_state, new_state) in r.run_iter(builder.START): for transition in r.run_iter(builder.START):
last_state = new_state last_transitions.append(transition)
# NOTE(harlowja): skip over meta-states. _prior_state, new_state = transition
# NOTE(harlowja): skip over meta-states
if new_state in builder.META_STATES: if new_state in builder.META_STATES:
continue continue
if new_state == states.FAILURE: if new_state == states.FAILURE:
@ -271,15 +282,24 @@ class ActionEngine(base.Engine):
self.suspend() self.suspend()
except Exception: except Exception:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.exception("Engine execution has failed, something"
" bad must of happened (last"
" %s machine transitions were %s)",
last_transitions.maxlen,
list(last_transitions))
self._change_state(states.FAILURE) self._change_state(states.FAILURE)
else: else:
if last_state and last_state not in self.IGNORABLE_STATES: if last_transitions:
self._change_state(new_state) _prior_state, new_state = last_transitions[-1]
if last_state not in self.NO_RERAISING_STATES: if new_state not in self.IGNORABLE_STATES:
it = itertools.chain( self._change_state(new_state)
six.itervalues(self.storage.get_failures()), if new_state not in self.NO_RERAISING_STATES:
six.itervalues(self.storage.get_revert_failures())) failures = self.storage.get_failures()
failure.Failure.reraise_if_any(it) more_failures = self.storage.get_revert_failures()
fails = itertools.chain(
six.itervalues(failures),
six.itervalues(more_failures))
failure.Failure.reraise_if_any(fails)
@staticmethod @staticmethod
def _check_compilation(compilation): def _check_compilation(compilation):

View File

@ -19,6 +19,7 @@ import functools
from futurist import waiters from futurist import waiters
from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an from taskflow.engines.action_engine import analyzer as an
@ -27,11 +28,17 @@ from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc from taskflow.engines.action_engine import scopes as sc
from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow.flow import LINK_DECIDER
from taskflow import states as st from taskflow import states as st
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.flow import (LINK_DECIDER, LINK_DECIDER_DEPTH) # noqa
# Small helper to make the edge decider tuples more easily useable...
_EdgeDecider = collections.namedtuple('_EdgeDecider',
'from_node,kind,decider,depth')
class Runtime(object): class Runtime(object):
"""A aggregate of runtime objects, properties, ... used during execution. """A aggregate of runtime objects, properties, ... used during execution.
@ -42,7 +49,8 @@ class Runtime(object):
""" """
def __init__(self, compilation, storage, atom_notifier, def __init__(self, compilation, storage, atom_notifier,
task_executor, retry_executor, options=None): task_executor, retry_executor,
options=None):
self._atom_notifier = atom_notifier self._atom_notifier = atom_notifier
self._task_executor = task_executor self._task_executor = task_executor
self._retry_executor = retry_executor self._retry_executor = retry_executor
@ -51,11 +59,10 @@ class Runtime(object):
self._atom_cache = {} self._atom_cache = {}
self._options = misc.safe_copy_dict(options) self._options = misc.safe_copy_dict(options)
@staticmethod def _walk_edge_deciders(self, graph, atom):
def _walk_edge_deciders(graph, atom):
"""Iterates through all nodes, deciders that alter atoms execution.""" """Iterates through all nodes, deciders that alter atoms execution."""
# This is basically a reverse breadth first exploration, with # This is basically a reverse breadth first exploration, with
# special logic to further traverse down flow nodes... # special logic to further traverse down flow nodes as needed...
predecessors_iter = graph.predecessors_iter predecessors_iter = graph.predecessors_iter
nodes = collections.deque((u_node, atom) nodes = collections.deque((u_node, atom)
for u_node in predecessors_iter(atom)) for u_node in predecessors_iter(atom))
@ -63,14 +70,19 @@ class Runtime(object):
while nodes: while nodes:
u_node, v_node = nodes.popleft() u_node, v_node = nodes.popleft()
u_node_kind = graph.node[u_node]['kind'] u_node_kind = graph.node[u_node]['kind']
u_v_data = graph.adj[u_node][v_node]
try: try:
yield (u_node, u_node_kind, decider = u_v_data[LINK_DECIDER]
graph.adj[u_node][v_node][LINK_DECIDER]) decider_depth = u_v_data.get(LINK_DECIDER_DEPTH)
if decider_depth is None:
decider_depth = de.Depth.ALL
yield _EdgeDecider(u_node, u_node_kind,
decider, decider_depth)
except KeyError: except KeyError:
pass pass
if u_node_kind == com.FLOW and u_node not in visited: if u_node_kind == com.FLOW and u_node not in visited:
# Avoid re-exploring the same flow if we get to this # Avoid re-exploring the same flow if we get to this same
# same flow by a different *future* path... # flow by a different *future* path...
visited.add(u_node) visited.add(u_node)
# Since we *currently* jump over flow node(s), we need to make # Since we *currently* jump over flow node(s), we need to make
# sure that any prior decider that was directed at this flow # sure that any prior decider that was directed at this flow
@ -108,7 +120,7 @@ class Runtime(object):
graph = self._compilation.execution_graph graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True): for node, node_data in graph.nodes_iter(data=True):
node_kind = node_data['kind'] node_kind = node_data['kind']
if node_kind == com.FLOW: if node_kind in com.FLOWS:
continue continue
elif node_kind in com.ATOMS: elif node_kind in com.ATOMS:
check_transition_handler = check_transition_handlers[node_kind] check_transition_handler = check_transition_handlers[node_kind]
@ -128,6 +140,10 @@ class Runtime(object):
metadata['edge_deciders'] = tuple(deciders_it) metadata['edge_deciders'] = tuple(deciders_it)
metadata['action'] = action metadata['action'] = action
self._atom_cache[node.name] = metadata self._atom_cache[node.name] = metadata
# TODO(harlowja): optimize the different decider depths to avoid
# repeated full successor searching; this can be done by searching
# for the widest depth of parent(s), and limiting the search of
# children by the that depth.
@property @property
def compilation(self): def compilation(self):
@ -246,11 +262,12 @@ class Runtime(object):
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
"""Resets a atoms subgraph to the given state and intention. """Resets a atoms subgraph to the given state and intention.
The subgraph is contained of all of the atoms successors. The subgraph is contained of **all** of the atoms successors.
""" """
return self.reset_atoms( execution_graph = self._compilation.execution_graph
self.analyzer.iterate_connected_atoms(atom), atoms_it = tr.depth_first_iterate(execution_graph, atom,
state=state, intention=intention) tr.Direction.FORWARD)
return self.reset_atoms(atoms_it, state=state, intention=intention)
def retry_subflow(self, retry): def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution. """Prepares a retrys + its subgraph for execution.

View File

@ -15,33 +15,12 @@
# under the License. # under the License.
from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import traversal as tr
from taskflow import logging from taskflow import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _depth_first_reverse_iterate(node, idx=-1):
"""Iterates connected (in reverse) nodes (from starting node).
Jumps through nodes with ``FLOW`` ``kind`` attribute (does not yield
them back).
"""
# Always go left to right, since right to left is the pattern order
# and we want to go backwards and not forwards through that ordering...
if idx == -1:
children_iter = node.reverse_iter()
else:
children_iter = reversed(node[0:idx])
for child in children_iter:
if child.metadata['kind'] == co.FLOW:
# Jump through these...
for child_child in child.dfs_iter(right_to_left=False):
if child_child.metadata['kind'] in co.ATOMS:
yield child_child.item
else:
yield child.item
class ScopeWalker(object): class ScopeWalker(object):
"""Walks through the scopes of a atom using a engines compilation. """Walks through the scopes of a atom using a engines compilation.
@ -117,7 +96,9 @@ class ScopeWalker(object):
except KeyError: except KeyError:
visible = [] visible = []
removals = set() removals = set()
for atom in _depth_first_reverse_iterate(parent, idx=last_idx): atom_it = tr.depth_first_reverse_iterate(
parent, start_from_idx=last_idx)
for atom in atom_it:
if atom in predecessors: if atom in predecessors:
predecessors.remove(atom) predecessors.remove(atom)
removals.add(atom) removals.add(atom)

View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import enum
from taskflow.engines.action_engine import compiler as co
class Direction(enum.Enum):
"""Traversal direction enum."""
#: Go through successors.
FORWARD = 1
#: Go through predecessors.
BACKWARD = 2
def _extract_connectors(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
if direction == Direction.FORWARD:
connected_iter = execution_graph.successors_iter
else:
connected_iter = execution_graph.predecessors_iter
connected_to_functors = {}
if through_flows:
connected_to_functors[co.FLOW] = connected_iter
connected_to_functors[co.FLOW_END] = connected_iter
if through_retries:
connected_to_functors[co.RETRY] = connected_iter
if through_tasks:
connected_to_functors[co.TASK] = connected_iter
return connected_iter(starting_node), connected_to_functors
def breadth_first_iterate(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
"""Iterates connected nodes in execution graph (from starting node).
Does so in a breadth first manner.
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
initial_nodes_iter, connected_to_functors = _extract_connectors(
execution_graph, starting_node, direction,
through_flows=through_flows, through_retries=through_retries,
through_tasks=through_tasks)
q = collections.deque(initial_nodes_iter)
while q:
node = q.popleft()
node_attrs = execution_graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
q.extend(connected_to_functor(node))
def depth_first_iterate(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
"""Iterates connected nodes in execution graph (from starting node).
Does so in a depth first manner.
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
initial_nodes_iter, connected_to_functors = _extract_connectors(
execution_graph, starting_node, direction,
through_flows=through_flows, through_retries=through_retries,
through_tasks=through_tasks)
stack = list(initial_nodes_iter)
while stack:
node = stack.pop()
node_attrs = execution_graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
stack.extend(connected_to_functor(node))
def depth_first_reverse_iterate(node, start_from_idx=-1):
"""Iterates connected (in reverse) **tree** nodes (from starting node).
Jumps through nodes with ``noop`` attribute (does not yield them back).
"""
# Always go left to right, since right to left is the pattern order
# and we want to go backwards and not forwards through that ordering...
if start_from_idx == -1:
# All of them...
children_iter = node.reverse_iter()
else:
children_iter = reversed(node[0:start_from_idx])
for child in children_iter:
if child.metadata.get('noop'):
# Jump through these...
for grand_child in child.dfs_iter(right_to_left=False):
if grand_child.metadata['kind'] in co.ATOMS:
yield grand_child.item
else:
yield child.item

View File

@ -221,6 +221,14 @@ class InvalidFormat(TaskFlowException):
"""Raised when some object/entity is not in the expected format.""" """Raised when some object/entity is not in the expected format."""
class DisallowedAccess(TaskFlowException):
"""Raised when storage access is not possible due to state limitations."""
def __init__(self, message, cause=None, state=None):
super(DisallowedAccess, self).__init__(message, cause=cause)
self.state = state
# Others. # Others.
class NotImplementedError(NotImplementedError): class NotImplementedError(NotImplementedError):

View File

@ -39,6 +39,9 @@ LINK_DECIDER = 'decider'
_CHOP_PAT = "taskflow.patterns." _CHOP_PAT = "taskflow.patterns."
_CHOP_PAT_LEN = len(_CHOP_PAT) _CHOP_PAT_LEN = len(_CHOP_PAT)
# This key denotes the depth the decider will apply (defaulting to all).
LINK_DECIDER_DEPTH = 'decider_depth'
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Flow(object): class Flow(object):

View File

@ -42,16 +42,20 @@ def _fetch_predecessor_tree(graph, atom):
"""Creates a tree of predecessors, rooted at given atom.""" """Creates a tree of predecessors, rooted at given atom."""
root = tree.Node(atom) root = tree.Node(atom)
stack = [(root, atom)] stack = [(root, atom)]
seen = set()
while stack: while stack:
parent, node = stack.pop() parent, node = stack.pop()
for pred_node in graph.predecessors_iter(node): for pred_node in graph.predecessors_iter(node):
child = tree.Node(pred_node, pred_node_data = graph.node[pred_node]
**graph.node[pred_node]) if pred_node_data['kind'] == compiler.FLOW_END:
parent.add(child) # Jump over and/or don't show flow end nodes...
stack.append((child, pred_node)) for pred_pred_node in graph.predecessors_iter(pred_node):
seen.add(pred_node) stack.append((parent, pred_pred_node))
return len(seen), root else:
child = tree.Node(pred_node, **pred_node_data)
parent.add(child)
# And go further backwards...
stack.append((child, pred_node))
return root
class FailureFormatter(object): class FailureFormatter(object):
@ -64,59 +68,51 @@ class FailureFormatter(object):
def __init__(self, engine, hide_inputs_outputs_of=()): def __init__(self, engine, hide_inputs_outputs_of=()):
self._hide_inputs_outputs_of = hide_inputs_outputs_of self._hide_inputs_outputs_of = hide_inputs_outputs_of
self._engine = engine self._engine = engine
self._formatter_funcs = {
compiler.FLOW: self._format_flow,
}
for kind in compiler.ATOMS:
self._formatter_funcs[kind] = self._format_atom
def _format_atom(self, storage, cache, node):
"""Formats a single tree node (atom) into a string version."""
atom = node.item
atom_name = atom.name
atom_attrs = {}
intention, intention_found = _cached_get(cache, 'intentions',
atom_name,
storage.get_atom_intention,
atom_name)
if intention_found:
atom_attrs['intention'] = intention
state, state_found = _cached_get(cache, 'states', atom_name,
storage.get_atom_state, atom_name)
if state_found:
atom_attrs['state'] = state
if atom_name not in self._hide_inputs_outputs_of:
# When the cache does not exist for this atom this
# will be called with the rest of these arguments
# used to populate the cache.
fetch_mapped_args = functools.partial(
storage.fetch_mapped_args, atom.rebind,
atom_name=atom_name, optional_args=atom.optional)
requires, requires_found = _cached_get(cache, 'requires',
atom_name,
fetch_mapped_args)
if requires_found:
atom_attrs['requires'] = requires
provides, provides_found = _cached_get(cache, 'provides',
atom_name,
storage.get_execute_result,
atom_name)
if provides_found:
atom_attrs['provides'] = provides
if atom_attrs:
return "Atom '%s' %s" % (atom_name, atom_attrs)
else:
return "Atom '%s'" % (atom_name)
def _format_flow(self, storage, cache, node):
"""Formats a single tree node (flow) into a string version."""
flow = node.item
return flow.name
def _format_node(self, storage, cache, node): def _format_node(self, storage, cache, node):
"""Formats a single tree node into a string version.""" """Formats a single tree node into a string version."""
formatter_func = self. _formatter_funcs[node.metadata['kind']] if node.metadata['kind'] == compiler.FLOW:
return formatter_func(storage, cache, node) flow = node.item
flow_name = flow.name
return "Flow '%s'" % (flow_name)
elif node.metadata['kind'] in compiler.ATOMS:
atom = node.item
atom_name = atom.name
atom_attrs = {}
intention, intention_found = _cached_get(
cache, 'intentions', atom_name, storage.get_atom_intention,
atom_name)
if intention_found:
atom_attrs['intention'] = intention
state, state_found = _cached_get(cache, 'states', atom_name,
storage.get_atom_state,
atom_name)
if state_found:
atom_attrs['state'] = state
if atom_name not in self._hide_inputs_outputs_of:
# When the cache does not exist for this atom this
# will be called with the rest of these arguments
# used to populate the cache.
fetch_mapped_args = functools.partial(
storage.fetch_mapped_args, atom.rebind,
atom_name=atom_name, optional_args=atom.optional)
requires, requires_found = _cached_get(cache, 'requires',
atom_name,
fetch_mapped_args)
if requires_found:
atom_attrs['requires'] = requires
provides, provides_found = _cached_get(
cache, 'provides', atom_name,
storage.get_execute_result, atom_name)
if provides_found:
atom_attrs['provides'] = provides
if atom_attrs:
return "Atom '%s' %s" % (atom_name, atom_attrs)
else:
return "Atom '%s'" % (atom_name)
else:
raise TypeError("Unable to format node, unknown node"
" kind '%s' encountered" % node.metadata['kind'])
def format(self, fail, atom_matcher): def format(self, fail, atom_matcher):
"""Returns a (exc_info, details) tuple about the failure. """Returns a (exc_info, details) tuple about the failure.
@ -143,13 +139,11 @@ class FailureFormatter(object):
graph = compilation.execution_graph graph = compilation.execution_graph
atom_node = hierarchy.find_first_match(atom_matcher) atom_node = hierarchy.find_first_match(atom_matcher)
atom = None atom = None
priors = 0
atom_intention = None atom_intention = None
if atom_node is not None: if atom_node is not None:
atom = atom_node.item atom = atom_node.item
atom_intention = storage.get_atom_intention(atom.name) atom_intention = storage.get_atom_intention(atom.name)
priors = sum(c for (_n, c) in graph.in_degree_iter([atom])) if atom is not None and atom_intention in self._BUILDERS:
if atom is not None and priors and atom_intention in self._BUILDERS:
# Cache as much as we can, since the path of various atoms # Cache as much as we can, since the path of various atoms
# may cause the same atom to be seen repeatedly depending on # may cause the same atom to be seen repeatedly depending on
# the graph structure... # the graph structure...
@ -160,12 +154,13 @@ class FailureFormatter(object):
'states': {}, 'states': {},
} }
builder, kind = self._BUILDERS[atom_intention] builder, kind = self._BUILDERS[atom_intention]
count, rooted_tree = builder(graph, atom) rooted_tree = builder(graph, atom)
buff.write_nl('%s %s (most recent atoms first):' % (count, kind)) child_count = rooted_tree.child_count(only_direct=False)
buff.write_nl('%s %s (most recent first):' % (child_count, kind))
formatter = functools.partial(self._format_node, storage, cache) formatter = functools.partial(self._format_node, storage, cache)
child_count = rooted_tree.child_count() direct_child_count = rooted_tree.child_count(only_direct=True)
for i, child in enumerate(rooted_tree, 1): for i, child in enumerate(rooted_tree, 1):
if i == child_count: if i == direct_child_count:
buff.write(child.pformat(stringify_node=formatter, buff.write(child.pformat(stringify_node=formatter,
starting_prefix=" ")) starting_prefix=" "))
else: else:

View File

@ -18,6 +18,7 @@ import collections
import six import six
from taskflow import deciders as de
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow import flow from taskflow import flow
from taskflow.types import graph as gr from taskflow.types import graph as gr
@ -73,7 +74,7 @@ class Flow(flow.Flow):
#: Extracts the unsatisified symbol requirements of a single node. #: Extracts the unsatisified symbol requirements of a single node.
_unsatisfied_requires = staticmethod(_unsatisfied_requires) _unsatisfied_requires = staticmethod(_unsatisfied_requires)
def link(self, u, v, decider=None): def link(self, u, v, decider=None, decider_depth=None):
"""Link existing node u as a runtime dependency of existing node v. """Link existing node u as a runtime dependency of existing node v.
Note that if the addition of these edges creates a `cyclic`_ graph Note that if the addition of these edges creates a `cyclic`_ graph
@ -93,6 +94,13 @@ class Flow(flow.Flow):
links that have ``v`` as a target. It is expected to links that have ``v`` as a target. It is expected to
return a single boolean (``True`` to allow ``v`` return a single boolean (``True`` to allow ``v``
execution or ``False`` to not). execution or ``False`` to not).
:param decider_depth: One of the :py:class:`~taskflow.deciders.Depth`
enumerations (or a string version of) that will
be used to influence what atoms are ignored
when the decider provided results false. If
not provided (and a valid decider is provided
then this defaults to
:py:attr:`~taskflow.deciders.Depth.ALL`).
.. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph
""" """
@ -103,11 +111,13 @@ class Flow(flow.Flow):
if decider is not None: if decider is not None:
if not six.callable(decider): if not six.callable(decider):
raise ValueError("Decider boolean callback must be callable") raise ValueError("Decider boolean callback must be callable")
self._swap(self._link(u, v, manual=True, decider=decider)) self._swap(self._link(u, v, manual=True,
decider=decider, decider_depth=decider_depth))
return self return self
def _link(self, u, v, graph=None, def _link(self, u, v, graph=None,
reason=None, manual=False, decider=None): reason=None, manual=False, decider=None,
decider_depth=None):
mutable_graph = True mutable_graph = True
if graph is None: if graph is None:
graph = self._graph graph = self._graph
@ -119,6 +129,18 @@ class Flow(flow.Flow):
attrs = {} attrs = {}
if decider is not None: if decider is not None:
attrs[flow.LINK_DECIDER] = decider attrs[flow.LINK_DECIDER] = decider
try:
# Remove existing decider depth, if one existed.
del attrs[flow.LINK_DECIDER_DEPTH]
except KeyError:
pass
if decider_depth is not None:
if decider is None:
raise ValueError("Decider depth requires a decider to be"
" provided along with it")
else:
decider_depth = de.Depth.translate(decider_depth)
attrs[flow.LINK_DECIDER_DEPTH] = decider_depth
if manual: if manual:
attrs[flow.LINK_MANUAL] = True attrs[flow.LINK_MANUAL] = True
if reason is not None: if reason is not None:

View File

@ -15,6 +15,7 @@
# under the License. # under the License.
import contextlib import contextlib
import functools
import fasteners import fasteners
from oslo_utils import reflection from oslo_utils import reflection
@ -84,6 +85,127 @@ META_PROGRESS = 'progress'
META_PROGRESS_DETAILS = 'progress_details' META_PROGRESS_DETAILS = 'progress_details'
class _ProviderLocator(object):
"""Helper to start to better decouple the finding logic from storage.
WIP: part of the larger effort to cleanup/refactor the finding of named
arguments so that the code can be more unified and easy to
follow...
"""
def __init__(self, transient_results,
providers_fetcher, result_fetcher):
self.result_fetcher = result_fetcher
self.providers_fetcher = providers_fetcher
self.transient_results = transient_results
def _try_get_results(self, looking_for, provider,
look_into_results=True, find_potentials=False):
if provider.name is _TRANSIENT_PROVIDER:
# TODO(harlowja): This 'is' check still sucks, do this
# better in the future...
results = self.transient_results
else:
try:
results = self.result_fetcher(provider.name)
except (exceptions.NotFound, exceptions.DisallowedAccess):
if not find_potentials:
raise
else:
# Ok, likely hasn't produced a result yet, but
# at a future point it hopefully will, so stub
# out the *expected* result.
results = {}
if look_into_results:
_item_from_single(provider, results, looking_for)
return results
def _find(self, looking_for, scope_walker=None,
short_circuit=True, find_potentials=False):
if scope_walker is None:
scope_walker = []
default_providers, atom_providers = self.providers_fetcher(looking_for)
searched_providers = set()
providers_and_results = []
if default_providers:
for p in default_providers:
searched_providers.add(p)
try:
provider_results = self._try_get_results(
looking_for, p, find_potentials=find_potentials,
# For default providers always look into there
# results as default providers are statically setup
# and therefore looking into there provided results
# should fail early.
look_into_results=True)
except exceptions.NotFound:
if not find_potentials:
raise
else:
providers_and_results.append((p, provider_results))
if short_circuit:
return (searched_providers, providers_and_results)
if not atom_providers:
return (searched_providers, providers_and_results)
atom_providers_by_name = dict((p.name, p) for p in atom_providers)
for accessible_atom_names in iter(scope_walker):
# *Always* retain the scope ordering (if any matches
# happen); instead of retaining the possible provider match
# order (which isn't that important and may be different from
# the scope requested ordering).
maybe_atom_providers = [atom_providers_by_name[atom_name]
for atom_name in accessible_atom_names
if atom_name in atom_providers_by_name]
tmp_providers_and_results = []
if find_potentials:
for p in maybe_atom_providers:
searched_providers.add(p)
tmp_providers_and_results.append((p, {}))
else:
for p in maybe_atom_providers:
searched_providers.add(p)
try:
# Don't at this point look into the provider results
# as calling code will grab all providers, and then
# get the result from the *first* provider that
# actually provided it (or die).
provider_results = self._try_get_results(
looking_for, p, find_potentials=find_potentials,
look_into_results=False)
except exceptions.DisallowedAccess as e:
if e.state != states.IGNORE:
exceptions.raise_with_cause(
exceptions.NotFound,
"Expected to be able to find output %r"
" produced by %s but was unable to get at"
" that providers results" % (looking_for, p))
else:
LOG.blather("Avoiding using the results of"
" %r (from %s) for name %r because"
" it was ignored", p.name, p,
looking_for)
else:
tmp_providers_and_results.append((p, provider_results))
if tmp_providers_and_results and short_circuit:
return (searched_providers, tmp_providers_and_results)
else:
providers_and_results.extend(tmp_providers_and_results)
return (searched_providers, providers_and_results)
def find_potentials(self, looking_for, scope_walker=None):
"""Returns the accessible **potential** providers."""
_searched_providers, providers_and_results = self._find(
looking_for, scope_walker=scope_walker,
short_circuit=False, find_potentials=True)
return set(p for (p, _provider_results) in providers_and_results)
def find(self, looking_for, scope_walker=None, short_circuit=True):
"""Returns the accessible providers."""
return self._find(looking_for, scope_walker=scope_walker,
short_circuit=short_circuit,
find_potentials=False)
class _Provider(object): class _Provider(object):
"""A named symbol provider that produces a output at the given index.""" """A named symbol provider that produces a output at the given index."""
@ -326,13 +448,13 @@ class Storage(object):
ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name]) ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name])
except KeyError: except KeyError:
exceptions.raise_with_cause(exceptions.NotFound, exceptions.raise_with_cause(exceptions.NotFound,
"Unknown atom name: %s" % atom_name) "Unknown atom name '%s'" % atom_name)
else: else:
# TODO(harlowja): we need to figure out how to get away from doing # TODO(harlowja): we need to figure out how to get away from doing
# these kinds of type checks in general (since they likely mean # these kinds of type checks in general (since they likely mean
# we aren't doing something right). # we aren't doing something right).
if expected_type and not isinstance(ad, expected_type): if expected_type and not isinstance(ad, expected_type):
raise TypeError("Atom %s is not of the expected type: %s" raise TypeError("Atom '%s' is not of the expected type: %s"
% (atom_name, % (atom_name,
reflection.get_class_name(expected_type))) reflection.get_class_name(expected_type)))
if clone: if clone:
@ -479,8 +601,9 @@ class Storage(object):
try: try:
_item_from(container, index) _item_from(container, index)
except _EXTRACTION_EXCEPTIONS: except _EXTRACTION_EXCEPTIONS:
LOG.warning("Atom %s did not supply result " LOG.warning("Atom '%s' did not supply result "
"with index %r (name %s)", atom_name, index, name) "with index %r (name '%s')", atom_name, index,
name)
@fasteners.write_locked @fasteners.write_locked
def save(self, atom_name, result, state=states.SUCCESS): def save(self, atom_name, result, state=states.SUCCESS):
@ -545,17 +668,34 @@ class Storage(object):
except KeyError: except KeyError:
pass pass
return failure return failure
# TODO(harlowja): this seems like it should be checked before fetching else:
# the potential failure, instead of after, fix this soon... if source.state not in allowed_states:
if source.state not in allowed_states: raise exceptions.DisallowedAccess(
raise exceptions.NotFound("Result for atom %s is not currently" "Result for atom '%s' is not known/accessible"
" known" % atom_name) " due to it being in %s state when result access"
return getattr(source, results_attr_name) " is restricted to %s states" % (atom_name,
source.state,
allowed_states),
state=source.state)
return getattr(source, results_attr_name)
def get_execute_result(self, atom_name): def get_execute_result(self, atom_name):
"""Gets the ``execute`` results for an atom from storage.""" """Gets the ``execute`` results for an atom from storage."""
return self._get(atom_name, 'results', 'failure', try:
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) results = self._get(atom_name, 'results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
except exceptions.DisallowedAccess as e:
if e.state == states.IGNORE:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' execution"
" is not known (as it was"
" ignored)" % atom_name)
else:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' execution"
" is not known" % atom_name)
else:
return results
@fasteners.read_locked @fasteners.read_locked
def _get_failures(self, fail_cache_key): def _get_failures(self, fail_cache_key):
@ -577,8 +717,21 @@ class Storage(object):
def get_revert_result(self, atom_name): def get_revert_result(self, atom_name):
"""Gets the ``revert`` results for an atom from storage.""" """Gets the ``revert`` results for an atom from storage."""
return self._get(atom_name, 'revert_results', 'revert_failure', try:
_REVERT_STATES_WITH_RESULTS, states.REVERT) results = self._get(atom_name, 'revert_results', 'revert_failure',
_REVERT_STATES_WITH_RESULTS, states.REVERT)
except exceptions.DisallowedAccess as e:
if e.state == states.IGNORE:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' revert is"
" not known (as it was"
" ignored)" % atom_name)
else:
exceptions.raise_with_cause(exceptions.NotFound,
"Result for atom '%s' revert is"
" not known" % atom_name)
else:
return results
def get_revert_failures(self): def get_revert_failures(self):
"""Get all ``revert`` failures that happened with this flow.""" """Get all ``revert`` failures that happened with this flow."""
@ -639,7 +792,7 @@ class Storage(object):
be serializable). be serializable).
""" """
if atom_name not in self._atom_name_to_uuid: if atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name) raise exceptions.NotFound("Unknown atom name '%s'" % atom_name)
def save_transient(): def save_transient():
self._injected_args.setdefault(atom_name, {}) self._injected_args.setdefault(atom_name, {})
@ -728,6 +881,19 @@ class Storage(object):
self._set_result_mapping(provider_name, self._set_result_mapping(provider_name,
dict((name, name) for name in names)) dict((name, name) for name in names))
def _fetch_providers(self, looking_for, providers=None):
"""Return pair of (default providers, atom providers)."""
if providers is None:
providers = self._reverse_mapping.get(looking_for, [])
default_providers = []
atom_providers = []
for p in providers:
if p.name in (_TRANSIENT_PROVIDER, self.injector_name):
default_providers.append(p)
else:
atom_providers.append(p)
return default_providers, atom_providers
def _set_result_mapping(self, provider_name, mapping): def _set_result_mapping(self, provider_name, mapping):
"""Sets the result mapping for a given producer. """Sets the result mapping for a given producer.
@ -757,30 +923,30 @@ class Storage(object):
if many_handler is None: if many_handler is None:
many_handler = _many_handler many_handler = _many_handler
try: try:
providers = self._reverse_mapping[name] maybe_providers = self._reverse_mapping[name]
except KeyError: except KeyError:
exceptions.raise_with_cause(exceptions.NotFound, raise exceptions.NotFound("Name %r is not mapped as a produced"
"Name %r is not mapped as a produced" " output by any providers" % name)
" output by any providers" % name) locator = _ProviderLocator(
self._transients,
functools.partial(self._fetch_providers,
providers=maybe_providers),
lambda atom_name:
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE))
values = [] values = []
for provider in providers: searched_providers, providers = locator.find(
if provider.name is _TRANSIENT_PROVIDER: name, short_circuit=False,
values.append(_item_from_single(provider, # NOTE(harlowja): There are no scopes used here (as of now), so
self._transients, name)) # we just return all known providers as if it was one large
else: # scope.
try: scope_walker=[[p.name for p in maybe_providers]])
container = self._get(provider.name, for provider, results in providers:
'last_results', 'failure', values.append(_item_from_single(provider, results, name))
_EXECUTE_STATES_WITH_RESULTS,
states.EXECUTE)
except exceptions.NotFound:
pass
else:
values.append(_item_from_single(provider,
container, name))
if not values: if not values:
raise exceptions.NotFound("Unable to find result %r," raise exceptions.NotFound(
" searched %s" % (name, providers)) "Unable to find result %r, searched %s providers"
% (name, len(searched_providers)))
else: else:
return many_handler(values) return many_handler(values)
@ -796,49 +962,6 @@ class Storage(object):
needed values; it just checks that they are registered to produce needed values; it just checks that they are registered to produce
it in the future. it in the future.
""" """
def _fetch_providers(name):
"""Fetchs pair of (default providers, non-default providers)."""
default_providers = []
non_default_providers = []
for p in self._reverse_mapping.get(name, []):
if p.name in (_TRANSIENT_PROVIDER, self.injector_name):
default_providers.append(p)
else:
non_default_providers.append(p)
return default_providers, non_default_providers
def _locate_providers(name, scope_walker=None):
"""Finds the accessible *potential* providers."""
default_providers, non_default_providers = _fetch_providers(name)
providers = []
if non_default_providers:
if scope_walker is not None:
scope_iter = iter(scope_walker)
else:
scope_iter = iter([])
for names in scope_iter:
for p in non_default_providers:
if p.name in names:
providers.append(p)
for p in default_providers:
if p.name is _TRANSIENT_PROVIDER:
results = self._transients
else:
try:
results = self._get(p.name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS,
states.EXECUTE)
except exceptions.NotFound:
results = {}
try:
_item_from_single(p, results, name)
except exceptions.NotFound:
pass
else:
providers.append(p)
return providers
source, _clone = self._atomdetail_by_name(atom_name) source, _clone = self._atomdetail_by_name(atom_name)
if scope_walker is None: if scope_walker is None:
scope_walker = self._scope_fetcher(atom_name) scope_walker = self._scope_fetcher(atom_name)
@ -849,6 +972,11 @@ class Storage(object):
source.meta.get(META_INJECTED, {}), source.meta.get(META_INJECTED, {}),
] ]
missing = set(six.iterkeys(args_mapping)) missing = set(six.iterkeys(args_mapping))
locator = _ProviderLocator(
self._transients, self._fetch_providers,
lambda atom_name:
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE))
for (bound_name, name) in six.iteritems(args_mapping): for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.TRACE): if LOG.isEnabledFor(logging.TRACE):
LOG.trace("Looking for %r <= %r for atom '%s'", LOG.trace("Looking for %r <= %r for atom '%s'",
@ -863,8 +991,8 @@ class Storage(object):
continue continue
if name in source: if name in source:
maybe_providers += 1 maybe_providers += 1
providers = _locate_providers(name, scope_walker=scope_walker) maybe_providers += len(
maybe_providers += len(providers) locator.find_potentials(name, scope_walker=scope_walker))
if maybe_providers: if maybe_providers:
LOG.trace("Atom '%s' will have %s potential providers" LOG.trace("Atom '%s' will have %s potential providers"
" of %r <= %r", atom_name, maybe_providers, " of %r <= %r", atom_name, maybe_providers,
@ -894,7 +1022,6 @@ class Storage(object):
atom_name=None, scope_walker=None, atom_name=None, scope_walker=None,
optional_args=None): optional_args=None):
"""Fetch ``execute`` arguments for an atom using its args mapping.""" """Fetch ``execute`` arguments for an atom using its args mapping."""
def _extract_first_from(name, sources): def _extract_first_from(name, sources):
"""Extracts/returns first occurence of key in list of dicts.""" """Extracts/returns first occurence of key in list of dicts."""
for i, source in enumerate(sources): for i, source in enumerate(sources):
@ -903,49 +1030,6 @@ class Storage(object):
if name in source: if name in source:
return (i, source[name]) return (i, source[name])
raise KeyError(name) raise KeyError(name)
def _get_results(looking_for, provider):
"""Gets the results saved for a given provider."""
try:
return self._get(provider.name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS,
states.EXECUTE)
except exceptions.NotFound:
exceptions.raise_with_cause(exceptions.NotFound,
"Expected to be able to find"
" output %r produced by %s but was"
" unable to get at that providers"
" results" % (looking_for,
provider))
def _locate_providers(looking_for, possible_providers,
scope_walker=None):
"""Finds the accessible providers."""
default_providers = []
for p in possible_providers:
if p.name is _TRANSIENT_PROVIDER:
default_providers.append((p, self._transients))
if p.name == self.injector_name:
default_providers.append((p, _get_results(looking_for, p)))
if default_providers:
return default_providers
if scope_walker is not None:
scope_iter = iter(scope_walker)
else:
scope_iter = iter([])
extractor = lambda p: p.name
for names in scope_iter:
# *Always* retain the scope ordering (if any matches
# happen); instead of retaining the possible provider match
# order (which isn't that important and may be different from
# the scope requested ordering).
providers = misc.look_for(names, possible_providers,
extractor=extractor)
if providers:
return [(p, _get_results(looking_for, p))
for p in providers]
return []
if optional_args is None: if optional_args is None:
optional_args = [] optional_args = []
if atom_name: if atom_name:
@ -960,6 +1044,9 @@ class Storage(object):
injected_sources = [] injected_sources = []
if not args_mapping: if not args_mapping:
return {} return {}
get_results = lambda atom_name: \
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
mapped_args = {} mapped_args = {}
for (bound_name, name) in six.iteritems(args_mapping): for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.TRACE): if LOG.isEnabledFor(logging.TRACE):
@ -969,8 +1056,8 @@ class Storage(object):
else: else:
LOG.trace("Looking for %r <= %r", bound_name, name) LOG.trace("Looking for %r <= %r", bound_name, name)
try: try:
source_index, value = _extract_first_from(name, source_index, value = _extract_first_from(
injected_sources) name, injected_sources)
mapped_args[bound_name] = value mapped_args[bound_name] = value
if LOG.isEnabledFor(logging.TRACE): if LOG.isEnabledFor(logging.TRACE):
if source_index == 0: if source_index == 0:
@ -983,7 +1070,7 @@ class Storage(object):
" values)", bound_name, name, value) " values)", bound_name, name, value)
except KeyError: except KeyError:
try: try:
possible_providers = self._reverse_mapping[name] maybe_providers = self._reverse_mapping[name]
except KeyError: except KeyError:
if bound_name in optional_args: if bound_name in optional_args:
LOG.trace("Argument %r is optional, skipping", LOG.trace("Argument %r is optional, skipping",
@ -992,15 +1079,18 @@ class Storage(object):
raise exceptions.NotFound("Name %r is not mapped as a" raise exceptions.NotFound("Name %r is not mapped as a"
" produced output by any" " produced output by any"
" providers" % name) " providers" % name)
# Reduce the possible providers to one that are allowed. locator = _ProviderLocator(
providers = _locate_providers(name, possible_providers, self._transients,
scope_walker=scope_walker) functools.partial(self._fetch_providers,
providers=maybe_providers), get_results)
searched_providers, providers = locator.find(
name, scope_walker=scope_walker)
if not providers: if not providers:
raise exceptions.NotFound( raise exceptions.NotFound(
"Mapped argument %r <= %r was not produced" "Mapped argument %r <= %r was not produced"
" by any accessible provider (%s possible" " by any accessible provider (%s possible"
" providers were scanned)" " providers were scanned)"
% (bound_name, name, len(possible_providers))) % (bound_name, name, len(searched_providers)))
provider, value = _item_from_first_of(providers, name) provider, value = _item_from_first_of(providers, name)
mapped_args[bound_name] = value mapped_args[bound_name] = value
LOG.trace("Matched %r <= %r to %r (from %s)", LOG.trace("Matched %r <= %r to %r (from %s)",

View File

@ -25,12 +25,27 @@ from taskflow import test
from taskflow.tests import utils as test_utils from taskflow.tests import utils as test_utils
def _replicate_graph_with_names(compilation):
# Turn a graph of nodes into a graph of names only so that
# testing can use those names instead of having to use the exact
# node objects themselves (which is problematic for any end nodes that
# are added into the graph *dynamically*, and are not there in the
# original/source flow).
g = compilation.execution_graph
n_g = g.__class__(name=g.name)
for node, node_data in g.nodes_iter(data=True):
n_g.add_node(node.name, attr_dict=node_data)
for u, v, u_v_data in g.edges_iter(data=True):
n_g.add_edge(u.name, v.name, attr_dict=u_v_data)
return n_g
class PatternCompileTest(test.TestCase): class PatternCompileTest(test.TestCase):
def test_task(self): def test_task(self):
task = test_utils.DummyTask(name='a') task = test_utils.DummyTask(name='a')
compilation = compiler.PatternCompiler(task).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(task).compile())
self.assertEqual([task], list(g.nodes())) self.assertEqual(['a'], list(g.nodes()))
self.assertEqual([], list(g.edges())) self.assertEqual([], list(g.edges()))
def test_retry(self): def test_retry(self):
@ -54,19 +69,20 @@ class PatternCompileTest(test.TestCase):
inner_flo.add(d) inner_flo.add(d)
flo.add(inner_flo) flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g)) self.assertEqual(8, len(g))
order = g.topological_sort() order = g.topological_sort()
self.assertEqual([flo, a, b, c, inner_flo, d], order) self.assertEqual(['test', 'a', 'b', 'c',
self.assertTrue(g.has_edge(c, inner_flo)) "sub-test", 'd', "sub-test[$]",
self.assertTrue(g.has_edge(inner_flo, d)) 'test[$]'], order)
self.assertTrue(g.has_edge('c', "sub-test"))
self.assertTrue(g.has_edge("sub-test", 'd'))
self.assertEqual({'invariant': True}, self.assertEqual({'invariant': True},
g.get_edge_data(inner_flo, d)) g.get_edge_data("sub-test", 'd'))
self.assertEqual(['test[$]'], list(g.no_successors_iter()))
self.assertEqual([d], list(g.no_successors_iter())) self.assertEqual(['test'], list(g.no_predecessors_iter()))
self.assertEqual([flo], list(g.no_predecessors_iter()))
def test_invalid(self): def test_invalid(self):
a, b, c = test_utils.make_many(3) a, b, c = test_utils.make_many(3)
@ -80,19 +96,21 @@ class PatternCompileTest(test.TestCase):
a, b, c, d = test_utils.make_many(4) a, b, c, d = test_utils.make_many(4)
flo = uf.Flow("test") flo = uf.Flow("test")
flo.add(a, b, c, d) flo.add(a, b, c, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph g = _replicate_graph_with_names(
self.assertEqual(5, len(g)) compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [ self.assertItemsEqual(g.edges(), [
(flo, a), ('test', 'a'),
(flo, b), ('test', 'b'),
(flo, c), ('test', 'c'),
(flo, d), ('test', 'd'),
('a', 'test[$]'),
('b', 'test[$]'),
('c', 'test[$]'),
('d', 'test[$]'),
]) ])
self.assertEqual(set([a, b, c, d]), self.assertEqual(set(['test']), set(g.no_predecessors_iter()))
set(g.no_successors_iter()))
self.assertEqual(set([flo]),
set(g.no_predecessors_iter()))
def test_linear_nested(self): def test_linear_nested(self):
a, b, c, d = test_utils.make_many(4) a, b, c, d = test_utils.make_many(4)
@ -102,22 +120,22 @@ class PatternCompileTest(test.TestCase):
inner_flo.add(c, d) inner_flo.add(c, d)
flo.add(inner_flo) flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(graph)) self.assertEqual(8, len(g))
lb = graph.subgraph([a, b]) sub_g = g.subgraph(['a', 'b'])
self.assertFalse(lb.has_edge(b, a)) self.assertFalse(sub_g.has_edge('b', 'a'))
self.assertTrue(lb.has_edge(a, b)) self.assertTrue(sub_g.has_edge('a', 'b'))
self.assertEqual({'invariant': True}, graph.get_edge_data(a, b)) self.assertEqual({'invariant': True}, sub_g.get_edge_data("a", "b"))
ub = graph.subgraph([c, d]) sub_g = g.subgraph(['c', 'd'])
self.assertEqual(0, ub.number_of_edges()) self.assertEqual(0, sub_g.number_of_edges())
# This ensures that c and d do not start executing until after b. # This ensures that c and d do not start executing until after b.
self.assertTrue(graph.has_edge(b, inner_flo)) self.assertTrue(g.has_edge('b', 'test2'))
self.assertTrue(graph.has_edge(inner_flo, c)) self.assertTrue(g.has_edge('test2', 'c'))
self.assertTrue(graph.has_edge(inner_flo, d)) self.assertTrue(g.has_edge('test2', 'd'))
def test_unordered_nested(self): def test_unordered_nested(self):
a, b, c, d = test_utils.make_many(4) a, b, c, d = test_utils.make_many(4)
@ -127,15 +145,19 @@ class PatternCompileTest(test.TestCase):
flo2.add(c, d) flo2.add(c, d)
flo.add(flo2) flo.add(flo2)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g)) self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(), [ self.assertItemsEqual(g.edges(), [
(flo, a), ('test', 'a'),
(flo, b), ('test', 'b'),
(flo, flo2), ('test', 'test2'),
(flo2, c), ('test2', 'c'),
(c, d) ('c', 'd'),
('d', 'test2[$]'),
('test2[$]', 'test[$]'),
('a', 'test[$]'),
('b', 'test[$]'),
]) ])
def test_unordered_nested_in_linear(self): def test_unordered_nested_in_linear(self):
@ -143,27 +165,27 @@ class PatternCompileTest(test.TestCase):
inner_flo = uf.Flow('ut').add(b, c) inner_flo = uf.Flow('ut').add(b, c)
flo = lf.Flow('lt').add(a, inner_flo, d) flo = lf.Flow('lt').add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g)) self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(), [ self.assertItemsEqual(g.edges(), [
(flo, a), ('lt', 'a'),
(a, inner_flo), ('a', 'ut'),
(inner_flo, b), ('ut', 'b'),
(inner_flo, c), ('ut', 'c'),
(b, d), ('b', 'ut[$]'),
(c, d), ('c', 'ut[$]'),
('ut[$]', 'd'),
('d', 'lt[$]'),
]) ])
def test_graph(self): def test_graph(self):
a, b, c, d = test_utils.make_many(4) a, b, c, d = test_utils.make_many(4)
flo = gf.Flow("test") flo = gf.Flow("test")
flo.add(a, b, c, d) flo.add(a, b, c, d)
compilation = compiler.PatternCompiler(flo).compile() compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph self.assertEqual(6, len(compilation.execution_graph))
self.assertEqual(5, len(g)) self.assertEqual(8, compilation.execution_graph.number_of_edges())
self.assertEqual(4, g.number_of_edges())
def test_graph_nested(self): def test_graph_nested(self):
a, b, c, d, e, f, g = test_utils.make_many(7) a, b, c, d, e, f, g = test_utils.make_many(7)
@ -174,19 +196,26 @@ class PatternCompileTest(test.TestCase):
flo2.add(e, f, g) flo2.add(e, f, g)
flo.add(flo2) flo.add(flo2)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(9, len(graph)) self.assertEqual(11, len(g))
self.assertItemsEqual(graph.edges(), [ self.assertItemsEqual(g.edges(), [
(flo, a), ('test', 'a'),
(flo, b), ('test', 'b'),
(flo, c), ('test', 'c'),
(flo, d), ('test', 'd'),
(flo, flo2), ('a', 'test[$]'),
('b', 'test[$]'),
('c', 'test[$]'),
('d', 'test[$]'),
(flo2, e), ('test', 'test2'),
(e, f), ('test2', 'e'),
(f, g), ('e', 'f'),
('f', 'g'),
('g', 'test2[$]'),
('test2[$]', 'test[$]'),
]) ])
def test_graph_nested_graph(self): def test_graph_nested_graph(self):
@ -198,19 +227,29 @@ class PatternCompileTest(test.TestCase):
flo2.add(e, f, g) flo2.add(e, f, g)
flo.add(flo2) flo.add(flo2)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(9, len(graph)) self.assertEqual(11, len(g))
self.assertItemsEqual(graph.edges(), [ self.assertItemsEqual(g.edges(), [
(flo, a), ('test', 'a'),
(flo, b), ('test', 'b'),
(flo, c), ('test', 'c'),
(flo, d), ('test', 'd'),
(flo, flo2), ('test', 'test2'),
(flo2, e), ('test2', 'e'),
(flo2, f), ('test2', 'f'),
(flo2, g), ('test2', 'g'),
('e', 'test2[$]'),
('f', 'test2[$]'),
('g', 'test2[$]'),
('test2[$]', 'test[$]'),
('a', 'test[$]'),
('b', 'test[$]'),
('c', 'test[$]'),
('d', 'test[$]'),
]) ])
def test_graph_links(self): def test_graph_links(self):
@ -221,33 +260,34 @@ class PatternCompileTest(test.TestCase):
flo.link(b, c) flo.link(b, c)
flo.link(c, d) flo.link(c, d)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(g)) self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, a, {'invariant': True}), ('test', 'a', {'invariant': True}),
('a', 'b', {'manual': True}),
(a, b, {'manual': True}), ('b', 'c', {'manual': True}),
(b, c, {'manual': True}), ('c', 'd', {'manual': True}),
(c, d, {'manual': True}), ('d', 'test[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], g.no_predecessors_iter()) self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual([d], g.no_successors_iter()) self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_graph_dependencies(self): def test_graph_dependencies(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x']) b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
flo = gf.Flow("test").add(a, b) flo = gf.Flow("test").add(a, b)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(3, len(g)) self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, a, {'invariant': True}), ('test', 'a', {'invariant': True}),
(a, b, {'reasons': set(['x'])}) ('a', 'b', {'reasons': set(['x'])}),
('b', 'test[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], g.no_predecessors_iter()) self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter()) self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_graph_nested_requires(self): def test_graph_nested_requires(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
@ -256,17 +296,19 @@ class PatternCompileTest(test.TestCase):
inner_flo = lf.Flow("test2").add(b, c) inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo) flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(graph)) self.assertEqual(7, len(g))
self.assertItemsEqual(graph.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, a, {'invariant': True}), ('test', 'a', {'invariant': True}),
(inner_flo, b, {'invariant': True}), ('test2', 'b', {'invariant': True}),
(a, inner_flo, {'reasons': set(['x'])}), ('a', 'test2', {'reasons': set(['x'])}),
(b, c, {'invariant': True}), ('b', 'c', {'invariant': True}),
('c', 'test2[$]', {'invariant': True}),
('test2[$]', 'test[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], graph.no_predecessors_iter()) self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual([c], graph.no_successors_iter()) self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
def test_graph_nested_provides(self): def test_graph_nested_provides(self):
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x']) a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
@ -275,18 +317,22 @@ class PatternCompileTest(test.TestCase):
inner_flo = lf.Flow("test2").add(b, c) inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo) flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(graph)) self.assertEqual(7, len(g))
self.assertItemsEqual(graph.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, inner_flo, {'invariant': True}), ('test', 'test2', {'invariant': True}),
('a', 'test[$]', {'invariant': True}),
(inner_flo, b, {'invariant': True}), # The 'x' requirement is produced out of test2...
(b, c, {'invariant': True}), ('test2[$]', 'a', {'reasons': set(['x'])}),
(c, a, {'reasons': set(['x'])}),
('test2', 'b', {'invariant': True}),
('b', 'c', {'invariant': True}),
('c', 'test2[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], graph.no_predecessors_iter()) self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual([a], graph.no_successors_iter()) self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_empty_flow_in_linear_flow(self): def test_empty_flow_in_linear_flow(self):
flo = lf.Flow('lf') flo = lf.Flow('lf')
@ -295,12 +341,14 @@ class PatternCompileTest(test.TestCase):
empty_flo = gf.Flow("empty") empty_flo = gf.Flow("empty")
flo.add(a, empty_flo, b) flo.add(a, empty_flo, b)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertItemsEqual(graph.edges(), [ self.assertItemsEqual(g.edges(), [
(flo, a), ("lf", "a"),
(a, empty_flo), ("a", "empty"),
(empty_flo, b), ("empty", "empty[$]"),
("empty[$]", "b"),
("b", "lf[$]"),
]) ])
def test_many_empty_in_graph_flow(self): def test_many_empty_in_graph_flow(self):
@ -331,22 +379,24 @@ class PatternCompileTest(test.TestCase):
flo.link(a, d) flo.link(a, d)
flo.link(c, d) flo.link(c, d)
compilation = compiler.PatternCompiler(flo).compile() g = _replicate_graph_with_names(
graph = compilation.execution_graph compiler.PatternCompiler(flo).compile())
self.assertTrue(graph.has_edge(flo, a)) self.assertTrue(g.has_edge('root', 'a'))
self.assertTrue(g.has_edge('root', 'b'))
self.assertTrue(g.has_edge('root', 'c'))
self.assertTrue(graph.has_edge(flo, b)) self.assertTrue(g.has_edge('b.0', 'b.1'))
self.assertTrue(graph.has_edge(b_0, b_1)) self.assertTrue(g.has_edge('b.1[$]', 'b.2'))
self.assertTrue(graph.has_edge(b_1, b_2)) self.assertTrue(g.has_edge('b.2[$]', 'b.3'))
self.assertTrue(graph.has_edge(b_2, b_3))
self.assertTrue(graph.has_edge(flo, c)) self.assertTrue(g.has_edge('c.0[$]', 'c.1'))
self.assertTrue(graph.has_edge(c_0, c_1)) self.assertTrue(g.has_edge('c.1[$]', 'c.2'))
self.assertTrue(graph.has_edge(c_1, c_2))
self.assertTrue(graph.has_edge(b_3, d)) self.assertTrue(g.has_edge('a', 'd'))
self.assertEqual(12, len(graph)) self.assertTrue(g.has_edge('b[$]', 'd'))
self.assertTrue(g.has_edge('c[$]', 'd'))
self.assertEqual(20, len(g))
def test_empty_flow_in_nested_flow(self): def test_empty_flow_in_nested_flow(self):
flow = lf.Flow('lf') flow = lf.Flow('lf')
@ -360,13 +410,13 @@ class PatternCompileTest(test.TestCase):
flow2.add(c, empty_flow, d) flow2.add(c, empty_flow, d)
flow.add(a, flow2, b) flow.add(a, flow2, b)
compilation = compiler.PatternCompiler(flow).compile() g = _replicate_graph_with_names(
g = compilation.execution_graph compiler.PatternCompiler(flow).compile())
for u, v in [('lf', 'a'), ('a', 'lf-2'),
for source, target in [(flow, a), (a, flow2), ('lf-2', 'c'), ('c', 'empty'),
(flow2, c), (c, empty_flow), ('empty[$]', 'd'), ('d', 'lf-2[$]'),
(empty_flow, d), (d, b)]: ('lf-2[$]', 'b'), ('b', 'lf[$]')]:
self.assertTrue(g.has_edge(source, target)) self.assertTrue(g.has_edge(u, v))
def test_empty_flow_in_graph_flow(self): def test_empty_flow_in_graph_flow(self):
flow = lf.Flow('lf') flow = lf.Flow('lf')
@ -379,7 +429,14 @@ class PatternCompileTest(test.TestCase):
g = compilation.execution_graph g = compilation.execution_graph
self.assertTrue(g.has_edge(flow, a)) self.assertTrue(g.has_edge(flow, a))
self.assertTrue(g.has_edge(a, empty_flow)) self.assertTrue(g.has_edge(a, empty_flow))
self.assertTrue(g.has_edge(empty_flow, b))
empty_flow_successors = g.successors(empty_flow)
self.assertEqual(1, len(empty_flow_successors))
empty_flow_terminal = empty_flow_successors[0]
self.assertIs(empty_flow, empty_flow_terminal.flow)
self.assertEqual(compiler.FLOW_END,
g.node[empty_flow_terminal]['kind'])
self.assertTrue(g.has_edge(empty_flow_terminal, b))
def test_empty_flow_in_graph_flow_linkage(self): def test_empty_flow_in_graph_flow_linkage(self):
flow = gf.Flow('lf') flow = gf.Flow('lf')
@ -417,146 +474,154 @@ class PatternCompileTest(test.TestCase):
def test_retry_in_linear_flow(self): def test_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c")) flo = lf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile() compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph self.assertEqual(3, len(compilation.execution_graph))
self.assertEqual(2, len(g)) self.assertEqual(2, compilation.execution_graph.number_of_edges())
self.assertEqual(1, g.number_of_edges())
def test_retry_in_unordered_flow(self): def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c")) flo = uf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile() compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph self.assertEqual(3, len(compilation.execution_graph))
self.assertEqual(2, len(g)) self.assertEqual(2, compilation.execution_graph.number_of_edges())
self.assertEqual(1, g.number_of_edges())
def test_retry_in_graph_flow(self): def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c")) flo = gf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile() compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph g = compilation.execution_graph
self.assertEqual(2, len(g)) self.assertEqual(3, len(g))
self.assertEqual(1, g.number_of_edges()) self.assertEqual(2, g.number_of_edges())
def test_retry_in_nested_flows(self): def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1") c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2") c2 = retry.AlwaysRevert("c2")
inner_flo = lf.Flow("test2", c2) inner_flo = lf.Flow("test2", c2)
flo = lf.Flow("test", c1).add(inner_flo) flo = lf.Flow("test", c1).add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g)) g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, c1, {'invariant': True}), ('test', 'c1', {'invariant': True}),
(c1, inner_flo, {'invariant': True, 'retry': True}), ('c1', 'test2', {'invariant': True, 'retry': True}),
(inner_flo, c2, {'invariant': True}), ('test2', 'c2', {'invariant': True}),
('c2', 'test2[$]', {'invariant': True}),
('test2[$]', 'test[$]', {'invariant': True}),
]) ])
self.assertIs(c1, g.node[c2]['retry']) self.assertIs(c1, g.node['c2']['retry'])
self.assertItemsEqual([flo], g.no_predecessors_iter()) self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual([c2], g.no_successors_iter()) self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
def test_retry_in_linear_flow_with_tasks(self): def test_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c") c = retry.AlwaysRevert("c")
a, b = test_utils.make_many(2) a, b = test_utils.make_many(2)
flo = lf.Flow("test", c).add(a, b) flo = lf.Flow("test", c).add(a, b)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g)) g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, c, {'invariant': True}), ('test', 'c', {'invariant': True}),
(a, b, {'invariant': True}), ('a', 'b', {'invariant': True}),
(c, a, {'invariant': True, 'retry': True}) ('c', 'a', {'invariant': True, 'retry': True}),
('b', 'test[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], g.no_predecessors_iter()) self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter()) self.assertItemsEqual(['test[$]'], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry']) self.assertIs(c, g.node['a']['retry'])
self.assertIs(c, g.node[b]['retry']) self.assertIs(c, g.node['b']['retry'])
def test_retry_in_unordered_flow_with_tasks(self): def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c") c = retry.AlwaysRevert("c")
a, b = test_utils.make_many(2) a, b = test_utils.make_many(2)
flo = uf.Flow("test", c).add(a, b) flo = uf.Flow("test", c).add(a, b)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g)) g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, c, {'invariant': True}), ('test', 'c', {'invariant': True}),
(c, a, {'invariant': True, 'retry': True}), ('c', 'a', {'invariant': True, 'retry': True}),
(c, b, {'invariant': True, 'retry': True}), ('c', 'b', {'invariant': True, 'retry': True}),
('b', 'test[$]', {'invariant': True}),
('a', 'test[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], g.no_predecessors_iter()) self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual([a, b], g.no_successors_iter()) self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
self.assertIs(c, g.node[a]['retry']) self.assertIs(c, g.node['a']['retry'])
self.assertIs(c, g.node[b]['retry']) self.assertIs(c, g.node['b']['retry'])
def test_retry_in_graph_flow_with_tasks(self): def test_retry_in_graph_flow_with_tasks(self):
r = retry.AlwaysRevert("cp") r = retry.AlwaysRevert("r")
a, b, c = test_utils.make_many(3) a, b, c = test_utils.make_many(3)
flo = gf.Flow("test", r).add(a, b, c).link(b, c) flo = gf.Flow("test", r).add(a, b, c).link(b, c)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(5, len(g))
g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, r, {'invariant': True}), ('test', 'r', {'invariant': True}),
(r, a, {'invariant': True, 'retry': True}), ('r', 'a', {'invariant': True, 'retry': True}),
(r, b, {'invariant': True, 'retry': True}), ('r', 'b', {'invariant': True, 'retry': True}),
(b, c, {'manual': True}) ('b', 'c', {'manual': True}),
('a', 'test[$]', {'invariant': True}),
('c', 'test[$]', {'invariant': True}),
]) ])
self.assertItemsEqual([flo], g.no_predecessors_iter()) self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter()) self.assertItemsEqual(['test[$]'], g.no_successors_iter())
self.assertIs(r, g.node[a]['retry']) self.assertIs(r, g.node['a']['retry'])
self.assertIs(r, g.node[b]['retry']) self.assertIs(r, g.node['b']['retry'])
self.assertIs(r, g.node[c]['retry']) self.assertIs(r, g.node['c']['retry'])
def test_retries_hierarchy(self): def test_retries_hierarchy(self):
c1 = retry.AlwaysRevert("cp1") c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("cp2") c2 = retry.AlwaysRevert("c2")
a, b, c, d = test_utils.make_many(4) a, b, c, d = test_utils.make_many(4)
inner_flo = lf.Flow("test", c2).add(b, c) inner_flo = lf.Flow("test2", c2).add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d) flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(8, len(g)) g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(10, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, c1, {'invariant': True}), ('test', 'c1', {'invariant': True}),
(c1, a, {'invariant': True, 'retry': True}), ('c1', 'a', {'invariant': True, 'retry': True}),
(a, inner_flo, {'invariant': True}), ('a', 'test2', {'invariant': True}),
(inner_flo, c2, {'invariant': True}), ('test2', 'c2', {'invariant': True}),
(c2, b, {'invariant': True, 'retry': True}), ('c2', 'b', {'invariant': True, 'retry': True}),
(b, c, {'invariant': True}), ('b', 'c', {'invariant': True}),
(c, d, {'invariant': True}), ('c', 'test2[$]', {'invariant': True}),
('test2[$]', 'd', {'invariant': True}),
('d', 'test[$]', {'invariant': True}),
]) ])
self.assertIs(c1, g.node[a]['retry']) self.assertIs(c1, g.node['a']['retry'])
self.assertIs(c1, g.node[d]['retry']) self.assertIs(c1, g.node['d']['retry'])
self.assertIs(c2, g.node[b]['retry']) self.assertIs(c2, g.node['b']['retry'])
self.assertIs(c2, g.node[c]['retry']) self.assertIs(c2, g.node['c']['retry'])
self.assertIs(c1, g.node[c2]['retry']) self.assertIs(c1, g.node['c2']['retry'])
self.assertIs(None, g.node[c1].get('retry')) self.assertIs(None, g.node['c1'].get('retry'))
def test_retry_subflows_hierarchy(self): def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1") c1 = retry.AlwaysRevert("c1")
a, b, c, d = test_utils.make_many(4) a, b, c, d = test_utils.make_many(4)
inner_flo = lf.Flow("test").add(b, c) inner_flo = lf.Flow("test2").add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d) flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(7, len(g)) g = _replicate_graph_with_names(
compiler.PatternCompiler(flo).compile())
self.assertEqual(9, len(g))
self.assertItemsEqual(g.edges(data=True), [ self.assertItemsEqual(g.edges(data=True), [
(flo, c1, {'invariant': True}), ('test', 'c1', {'invariant': True}),
(c1, a, {'invariant': True, 'retry': True}), ('c1', 'a', {'invariant': True, 'retry': True}),
(a, inner_flo, {'invariant': True}), ('a', 'test2', {'invariant': True}),
(inner_flo, b, {'invariant': True}), ('test2', 'b', {'invariant': True}),
(b, c, {'invariant': True}), ('b', 'c', {'invariant': True}),
(c, d, {'invariant': True}), ('c', 'test2[$]', {'invariant': True}),
('test2[$]', 'd', {'invariant': True}),
('d', 'test[$]', {'invariant': True}),
]) ])
self.assertIs(c1, g.node[a]['retry']) self.assertIs(c1, g.node['a']['retry'])
self.assertIs(c1, g.node[d]['retry']) self.assertIs(c1, g.node['d']['retry'])
self.assertIs(c1, g.node[b]['retry']) self.assertIs(c1, g.node['b']['retry'])
self.assertIs(c1, g.node[c]['retry']) self.assertIs(c1, g.node['c']['retry'])
self.assertIs(None, g.node[c1].get('retry')) self.assertIs(None, g.node['c1'].get('retry'))

View File

@ -26,6 +26,16 @@ def _task(name, provides=None, requires=None):
class GraphFlowTest(test.TestCase): class GraphFlowTest(test.TestCase):
def test_invalid_decider_depth(self):
g_1 = utils.ProgressingTask(name='g-1')
g_2 = utils.ProgressingTask(name='g-2')
for not_a_depth in ['not-a-depth', object(), 2, 3.4, False]:
flow = gf.Flow('g')
flow.add(g_1, g_2)
self.assertRaises((ValueError, TypeError),
flow.link, g_1, g_2,
decider=lambda history: False,
decider_depth=not_a_depth)
def test_graph_flow_stringy(self): def test_graph_flow_stringy(self):
f = gf.Flow('test') f = gf.Flow('test')

View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import deciders
from taskflow import test
class TestDeciders(test.TestCase):
def test_translate(self):
for val in ['all', 'ALL', 'aLL', deciders.Depth.ALL]:
self.assertEqual(deciders.Depth.ALL,
deciders.Depth.translate(val))
for val in ['atom', 'ATOM', 'atOM', deciders.Depth.ATOM]:
self.assertEqual(deciders.Depth.ATOM,
deciders.Depth.translate(val))
for val in ['neighbors', 'Neighbors',
'NEIGHBORS', deciders.Depth.NEIGHBORS]:
self.assertEqual(deciders.Depth.NEIGHBORS,
deciders.Depth.translate(val))
for val in ['flow', 'FLOW', 'flOW', deciders.Depth.FLOW]:
self.assertEqual(deciders.Depth.FLOW,
deciders.Depth.translate(val))
def test_bad_translate(self):
self.assertRaises(TypeError, deciders.Depth.translate, 3)
self.assertRaises(TypeError, deciders.Depth.translate, object())
self.assertRaises(ValueError, deciders.Depth.translate, "stuff")
def test_pick_widest(self):
choices = [deciders.Depth.ATOM, deciders.Depth.FLOW]
self.assertEqual(deciders.Depth.FLOW, deciders.pick_widest(choices))
choices = [deciders.Depth.ATOM, deciders.Depth.FLOW,
deciders.Depth.ALL]
self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices))
choices = [deciders.Depth.ATOM, deciders.Depth.FLOW,
deciders.Depth.ALL, deciders.Depth.NEIGHBORS]
self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices))
choices = [deciders.Depth.ATOM, deciders.Depth.NEIGHBORS]
self.assertEqual(deciders.Depth.NEIGHBORS,
deciders.pick_widest(choices))
def test_bad_pick_widest(self):
self.assertRaises(ValueError, deciders.pick_widest, [])
self.assertRaises(ValueError, deciders.pick_widest, ["a"])
self.assertRaises(ValueError, deciders.pick_widest, set(['b']))

View File

@ -669,6 +669,94 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
self.assertNotIn('task2.t REVERTED(None)', capturer.values) self.assertNotIn('task2.t REVERTED(None)', capturer.values)
class EngineDeciderDepthTest(utils.EngineTestBase):
def test_run_graph_flow_decider_various_depths(self):
sub_flow_1 = gf.Flow('g_1')
g_1_1 = utils.ProgressingTask(name='g_1-1')
sub_flow_1.add(g_1_1)
g_1 = utils.ProgressingTask(name='g-1')
g_2 = utils.ProgressingTask(name='g-2')
g_3 = utils.ProgressingTask(name='g-3')
g_4 = utils.ProgressingTask(name='g-4')
for a_depth, ran_how_many in [('all', 1),
('atom', 4),
('flow', 2),
('neighbors', 3)]:
flow = gf.Flow('g')
flow.add(g_1, g_2, sub_flow_1, g_3, g_4)
flow.link(g_1, g_2,
decider=lambda history: False,
decider_depth=a_depth)
flow.link(g_2, sub_flow_1)
flow.link(g_2, g_3)
flow.link(g_3, g_4)
flow.link(g_1, sub_flow_1,
decider=lambda history: True,
decider_depth=a_depth)
e = self._make_engine(flow)
with utils.CaptureListener(e, capture_flow=False) as capturer:
e.run()
ran_tasks = 0
for outcome in capturer.values:
if outcome.endswith("RUNNING"):
ran_tasks += 1
self.assertEqual(ran_how_many, ran_tasks)
def test_run_graph_flow_decider_jump_over_atom(self):
flow = gf.Flow('g')
a = utils.AddOneSameProvidesRequires("a", inject={'value': 0})
b = utils.AddOneSameProvidesRequires("b")
c = utils.AddOneSameProvidesRequires("c")
flow.add(a, b, c, resolve_requires=False)
flow.link(a, b, decider=lambda history: False,
decider_depth='atom')
flow.link(b, c)
e = self._make_engine(flow)
e.run()
self.assertEqual(2, e.storage.get('c'))
self.assertEqual(states.IGNORE, e.storage.get_atom_state('b'))
def test_run_graph_flow_decider_jump_over_bad_atom(self):
flow = gf.Flow('g')
a = utils.NoopTask("a")
b = utils.FailingTask("b")
c = utils.NoopTask("c")
flow.add(a, b, c)
flow.link(a, b, decider=lambda history: False,
decider_depth='atom')
flow.link(b, c)
e = self._make_engine(flow)
e.run()
def test_run_graph_flow_decider_revert(self):
flow = gf.Flow('g')
a = utils.NoopTask("a")
b = utils.NoopTask("b")
c = utils.FailingTask("c")
flow.add(a, b, c)
flow.link(a, b, decider=lambda history: False,
decider_depth='atom')
flow.link(b, c)
e = self._make_engine(flow)
with utils.CaptureListener(e, capture_flow=False) as capturer:
# Wrapped failure here for WBE engine, make this better in
# the future, perhaps via a custom testtools matcher??
self.assertRaises((RuntimeError, exc.WrappedFailure), e.run)
expected = [
'a.t RUNNING',
'a.t SUCCESS(None)',
'b.t IGNORE',
'c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'c.t REVERTING',
'c.t REVERTED(None)',
'a.t REVERTING',
'a.t REVERTED(None)',
]
self.assertEqual(expected, capturer.values)
class EngineGraphFlowTest(utils.EngineTestBase): class EngineGraphFlowTest(utils.EngineTestBase):
def test_run_empty_graph_flow(self): def test_run_empty_graph_flow(self):
@ -1043,9 +1131,10 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase):
'task4.t IGNORE', 'task4.t IGNORE',
] ]
self.assertEqual(expected, capturer.values) self.assertEqual(expected, capturer.values)
self.assertEqual(1, len(histories)) self.assertEqual(2, len(histories))
self.assertIn('task1', histories[0]) for i in range(0, 2):
self.assertIn('task2', histories[0]) self.assertIn('task1', histories[i])
self.assertIn('task2', histories[i])
def test_graph_flow_conditional(self): def test_graph_flow_conditional(self):
flow = gf.Flow('root') flow = gf.Flow('root')
@ -1249,14 +1338,15 @@ class SerialEngineTest(EngineTaskTest,
EngineResetTests, EngineResetTests,
EngineGraphConditionalFlowTest, EngineGraphConditionalFlowTest,
EngineCheckingTaskTest, EngineCheckingTaskTest,
EngineDeciderDepthTest,
test.TestCase): test.TestCase):
def _make_engine(self, flow, def _make_engine(self, flow,
flow_detail=None, store=None): flow_detail=None, store=None, **kwargs):
return taskflow.engines.load(flow, return taskflow.engines.load(flow,
flow_detail=flow_detail, flow_detail=flow_detail,
engine='serial', engine='serial',
backend=self.backend, backend=self.backend,
store=store) store=store, **kwargs)
def test_correct_load(self): def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns) engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@ -1278,11 +1368,13 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest, EngineGraphConditionalFlowTest,
EngineCheckingTaskTest, EngineCheckingTaskTest,
EngineDeciderDepthTest,
test.TestCase): test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2
def _make_engine(self, flow, def _make_engine(self, flow,
flow_detail=None, executor=None, store=None): flow_detail=None, executor=None, store=None,
**kwargs):
if executor is None: if executor is None:
executor = 'threads' executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
@ -1290,7 +1382,8 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
executor=executor, executor=executor,
engine='parallel', engine='parallel',
store=store, store=store,
max_workers=self._EXECUTOR_WORKERS) max_workers=self._EXECUTOR_WORKERS,
**kwargs)
def test_correct_load(self): def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns) engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@ -1319,17 +1412,19 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest, EngineGraphConditionalFlowTest,
EngineCheckingTaskTest, EngineCheckingTaskTest,
EngineDeciderDepthTest,
test.TestCase): test.TestCase):
def _make_engine(self, flow, def _make_engine(self, flow,
flow_detail=None, executor=None, store=None): flow_detail=None, executor=None, store=None,
**kwargs):
if executor is None: if executor is None:
executor = futurist.GreenThreadPoolExecutor() executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown) self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel', backend=self.backend, engine='parallel',
executor=executor, executor=executor,
store=store) store=store, **kwargs)
class ParallelEngineWithProcessTest(EngineTaskTest, class ParallelEngineWithProcessTest(EngineTaskTest,
@ -1342,6 +1437,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
EngineResetTests, EngineResetTests,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest, EngineGraphConditionalFlowTest,
EngineDeciderDepthTest,
test.TestCase): test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2
@ -1350,7 +1446,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
self.assertIsInstance(engine, eng.ParallelActionEngine) self.assertIsInstance(engine, eng.ParallelActionEngine)
def _make_engine(self, flow, def _make_engine(self, flow,
flow_detail=None, executor=None, store=None): flow_detail=None, executor=None, store=None,
**kwargs):
if executor is None: if executor is None:
executor = 'processes' executor = 'processes'
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
@ -1358,7 +1455,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
engine='parallel', engine='parallel',
executor=executor, executor=executor,
store=store, store=store,
max_workers=self._EXECUTOR_WORKERS) max_workers=self._EXECUTOR_WORKERS,
**kwargs)
class WorkerBasedEngineTest(EngineTaskTest, class WorkerBasedEngineTest(EngineTaskTest,
@ -1371,6 +1469,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
EngineResetTests, EngineResetTests,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest, EngineGraphConditionalFlowTest,
EngineDeciderDepthTest,
test.TestCase): test.TestCase):
def setUp(self): def setUp(self):
super(WorkerBasedEngineTest, self).setUp() super(WorkerBasedEngineTest, self).setUp()
@ -1415,10 +1514,11 @@ class WorkerBasedEngineTest(EngineTaskTest,
super(WorkerBasedEngineTest, self).tearDown() super(WorkerBasedEngineTest, self).tearDown()
def _make_engine(self, flow, def _make_engine(self, flow,
flow_detail=None, store=None): flow_detail=None, store=None, **kwargs):
kwargs.update(self.engine_conf)
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, backend=self.backend,
store=store, **self.engine_conf) store=store, **kwargs)
def test_correct_load(self): def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns) engine = self._make_engine(utils.TaskNoRequiresNoReturns)

View File

@ -244,24 +244,6 @@ class TestCountdownIter(test.TestCase):
self.assertRaises(ValueError, six.next, it) self.assertRaises(ValueError, six.next, it)
class TestLookFor(test.TestCase):
def test_no_matches(self):
hay = [9, 10, 11]
self.assertEqual([], misc.look_for(hay, [1, 2, 3]))
def test_match_order(self):
hay = [6, 5, 4, 3, 2, 1]
priors = []
for i in range(0, 6):
priors.append(i + 1)
matches = misc.look_for(hay, priors)
self.assertGreater(0, len(matches))
self.assertIsSuperAndSubsequence(hay, matches)
hay = [10, 1, 15, 3, 5, 8, 44]
self.assertEqual([1, 15], misc.look_for(hay, [15, 1]))
self.assertEqual([10, 44], misc.look_for(hay, [44, 10]))
class TestMergeUri(test.TestCase): class TestMergeUri(test.TestCase):
def test_merge(self): def test_merge(self):
url = "http://www.yahoo.com/?a=b&c=d" url = "http://www.yahoo.com/?a=b&c=d"

View File

@ -226,37 +226,6 @@ def parse_uri(uri):
return netutils.urlsplit(uri) return netutils.urlsplit(uri)
def look_for(haystack, needles, extractor=None):
"""Find items in haystack and returns matches found (in haystack order).
Given a list of items (the haystack) and a list of items to look for (the
needles) this will look for the needles in the haystack and returns
the found needles (if any). The ordering of the returned needles is in the
order they are located in the haystack.
Example input and output:
>>> from taskflow.utils import misc
>>> hay = [3, 2, 1]
>>> misc.look_for(hay, [1, 2])
[2, 1]
"""
if not haystack:
return []
if extractor is None:
extractor = lambda v: v
matches = []
for i, v in enumerate(needles):
try:
matches.append((haystack.index(extractor(v)), i))
except ValueError:
pass
if not matches:
return []
else:
return [needles[i] for (_hay_i, i) in sorted(matches)]
def disallow_when_frozen(excp_cls): def disallow_when_frozen(excp_cls):
"""Frozen checking/raising method decorator.""" """Frozen checking/raising method decorator."""