Merge "Update TaskFlow for networkx 2.x"

This commit is contained in:
Zuul 2019-11-15 04:24:16 +00:00 committed by Gerrit Code Review
commit 0b352712af
17 changed files with 76 additions and 168 deletions

View File

@ -37,7 +37,7 @@ mox3==0.20.0
msgpack-python==0.4.0
netaddr==0.7.18
netifaces==0.10.4
networkx==1.10
networkx==2.1.0
os-client-config==1.28.0
oslo.i18n==3.15.3
oslo.serialization==2.18.0

View File

@ -20,8 +20,8 @@ futurist>=1.2.0 # Apache-2.0
fasteners>=0.7.0 # Apache-2.0
# Very nice graph library
networkx>=1.10,<2.3;python_version<'3.0' # BSD
networkx>=1.10;python_version>='3.4' # BSD
networkx>=2.1.0,<2.3;python_version<'3.0' # BSD
networkx>=2.1.0;python_version>='3.4' # BSD
# For contextlib new additions/compatibility for <= python 3.3
contextlib2>=0.4.0;python_version<'3.0' # PSF License

View File

@ -107,7 +107,7 @@ class Compilation(object):
def _overlap_occurrence_detector(to_graph, from_graph):
"""Returns how many nodes in 'from' graph are in 'to' graph (if any)."""
return iter_utils.count(node for node in from_graph.nodes_iter()
return iter_utils.count(node for node in from_graph.nodes
if node in to_graph)
@ -180,9 +180,9 @@ class FlowCompiler(object):
graph.add_node(flow.retry, kind=RETRY)
_add_update_edges(graph, [flow], [flow.retry],
attr_dict={LINK_INVARIANT: True})
for node in graph.nodes_iter():
for node in graph.nodes:
if node is not flow.retry and node is not flow:
graph.node[node].setdefault(RETRY, flow.retry)
graph.nodes[node].setdefault(RETRY, flow.retry)
from_nodes = [flow.retry]
attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
else:

View File

@ -98,8 +98,8 @@ def _affect_atom(atom, runtime):
def _affect_direct_task_neighbors(atom, runtime):
def _walk_neighbors():
execution_graph = runtime.compilation.execution_graph
for node in execution_graph.successors_iter(atom):
node_data = execution_graph.node[node]
for node in execution_graph.successors(atom):
node_data = execution_graph.nodes[node]
if node_data['kind'] == compiler.TASK:
yield node
successors_iter = _walk_neighbors()

View File

@ -349,7 +349,7 @@ class ActionEngine(base.Engine):
seen = set()
dups = set()
execution_graph = compilation.execution_graph
for node, node_attrs in execution_graph.nodes_iter(data=True):
for node, node_attrs in execution_graph.nodes(data=True):
if node_attrs['kind'] in compiler.ATOMS:
atom_name = node.name
if atom_name in seen:

View File

@ -66,13 +66,13 @@ class Runtime(object):
"""Iterates through all nodes, deciders that alter atoms execution."""
# This is basically a reverse breadth first exploration, with
# special logic to further traverse down flow nodes as needed...
predecessors_iter = graph.predecessors_iter
predecessors_iter = graph.predecessors
nodes = collections.deque((u_node, atom)
for u_node in predecessors_iter(atom))
visited = set()
while nodes:
u_node, v_node = nodes.popleft()
u_node_kind = graph.node[u_node]['kind']
u_node_kind = graph.nodes[u_node]['kind']
u_v_data = graph.adj[u_node][v_node]
try:
decider = u_v_data[LINK_DECIDER]
@ -121,7 +121,7 @@ class Runtime(object):
com.RETRY: self.retry_action,
}
graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True):
for node, node_data in graph.nodes(data=True):
node_kind = node_data['kind']
if node_kind in com.FLOWS:
continue
@ -265,7 +265,7 @@ class Runtime(object):
def iterate_nodes(self, allowed_kinds):
"""Yields back all nodes of specified kinds in the execution graph."""
graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True):
for node, node_data in graph.nodes(data=True):
if node_data['kind'] in allowed_kinds:
yield node
@ -285,7 +285,7 @@ class Runtime(object):
def find_retry(self, node):
"""Returns the retry atom associated to the given node (or none)."""
graph = self._compilation.execution_graph
return graph.node[node].get(com.RETRY)
return graph.nodes[node].get(com.RETRY)
def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""

View File

@ -81,7 +81,7 @@ class ScopeWalker(object):
if self._predecessors is None:
predecessors = set(
node for node in graph.bfs_predecessors_iter(self._atom)
if graph.node[node]['kind'] in co.ATOMS)
if graph.nodes[node]['kind'] in co.ATOMS)
self._predecessors = predecessors.copy()
else:
predecessors = self._predecessors.copy()

View File

@ -34,9 +34,9 @@ def _extract_connectors(execution_graph, starting_node, direction,
through_flows=True, through_retries=True,
through_tasks=True):
if direction == Direction.FORWARD:
connected_iter = execution_graph.successors_iter
connected_iter = execution_graph.successors
else:
connected_iter = execution_graph.predecessors_iter
connected_iter = execution_graph.predecessors
connected_to_functors = {}
if through_flows:
connected_to_functors[co.FLOW] = connected_iter
@ -64,7 +64,7 @@ def breadth_first_iterate(execution_graph, starting_node, direction,
q = collections.deque(initial_nodes_iter)
while q:
node = q.popleft()
node_attrs = execution_graph.node[node]
node_attrs = execution_graph.nodes[node]
if not node_attrs.get('noop'):
yield node
try:
@ -92,7 +92,7 @@ def depth_first_iterate(execution_graph, starting_node, direction,
stack = list(initial_nodes_iter)
while stack:
node = stack.pop()
node_attrs = execution_graph.node[node]
node_attrs = execution_graph.nodes[node]
if not node_attrs.get('noop'):
yield node
try:

View File

@ -44,11 +44,11 @@ def _fetch_predecessor_tree(graph, atom):
stack = [(root, atom)]
while stack:
parent, node = stack.pop()
for pred_node in graph.predecessors_iter(node):
pred_node_data = graph.node[pred_node]
for pred_node in graph.predecessors(node):
pred_node_data = graph.nodes[pred_node]
if pred_node_data['kind'] == compiler.FLOW_END:
# Jump over and/or don't show flow end nodes...
for pred_pred_node in graph.predecessors_iter(pred_node):
for pred_pred_node in graph.predecessors(pred_node):
stack.append((parent, pred_pred_node))
else:
child = tree.Node(pred_node, **pred_node_data)

View File

@ -224,7 +224,7 @@ class Flow(flow.Flow):
retry_provides.add(value)
provided[value].append(self._retry)
for node in self._graph.nodes_iter():
for node in self._graph.nodes:
for value in self._unsatisfied_requires(node, self._graph,
retry_provides):
required[value].append(node)
@ -292,12 +292,12 @@ class Flow(flow.Flow):
yield n
def iter_links(self):
return self._get_subgraph().edges_iter(data=True)
return self._get_subgraph().edges(data=True)
def iter_nodes(self):
g = self._get_subgraph()
for n in g.topological_sort():
yield n, g.node[n]
yield n, g.nodes[n]
@property
def requires(self):
@ -307,7 +307,7 @@ class Flow(flow.Flow):
requires.update(self._retry.requires)
retry_provides.update(self._retry.provides)
g = self._get_subgraph()
for node in g.nodes_iter():
for node in g.nodes:
requires.update(self._unsatisfied_requires(node, g,
retry_provides))
return frozenset(requires)
@ -367,6 +367,7 @@ class TargetedFlow(Flow):
return self._graph
nodes = [self._target]
nodes.extend(self._graph.bfs_predecessors_iter(self._target))
self._subgraph = gr.DiGraph(data=self._graph.subgraph(nodes))
self._subgraph = gr.DiGraph(
incoming_graph_data=self._graph.subgraph(nodes))
self._subgraph.freeze()
return self._subgraph

View File

@ -55,7 +55,7 @@ class Flow(flow.Flow):
return len(self._graph)
def __iter__(self):
for item in self._graph.nodes_iter():
for item in self._graph.nodes:
yield item
@property
@ -71,9 +71,9 @@ class Flow(flow.Flow):
return frozenset(requires)
def iter_nodes(self):
for (n, n_data) in self._graph.nodes_iter(data=True):
for (n, n_data) in self._graph.nodes(data=True):
yield (n, n_data)
def iter_links(self):
for (u, v, e_data) in self._graph.edges_iter(data=True):
for (u, v, e_data) in self._graph.edges(data=True):
yield (u, v, e_data)

View File

@ -44,11 +44,11 @@ class Flow(flow.Flow):
yield item
def iter_links(self):
for (u, v, e_data) in self._graph.edges_iter(data=True):
for (u, v, e_data) in self._graph.edges(data=True):
yield (u, v, e_data)
def iter_nodes(self):
for n, n_data in self._graph.nodes_iter(data=True):
for n, n_data in self._graph.nodes(data=True):
yield (n, n_data)
@property

View File

@ -37,7 +37,7 @@ class BuildersTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
flow_detail = pu.create_flow_detail(flow)
store = storage.Storage(flow_detail)
nodes_iter = compilation.execution_graph.nodes_iter(data=True)
nodes_iter = compilation.execution_graph.nodes(data=True)
for node, node_attrs in nodes_iter:
if node_attrs['kind'] in ('task', 'retry'):
store.ensure_atom(node)

View File

@ -33,9 +33,9 @@ def _replicate_graph_with_names(compilation):
# original/source flow).
g = compilation.execution_graph
n_g = g.__class__(name=g.name)
for node, node_data in g.nodes_iter(data=True):
for node, node_data in g.nodes(data=True):
n_g.add_node(node.name, attr_dict=node_data)
for u, v, u_v_data in g.edges_iter(data=True):
for u, v, u_v_data in g.edges(data=True):
n_g.add_edge(u.name, v.name, attr_dict=u_v_data)
return n_g
@ -435,7 +435,7 @@ class PatternCompileTest(test.TestCase):
empty_flow_terminal = empty_flow_successors[0]
self.assertIs(empty_flow, empty_flow_terminal.flow)
self.assertEqual(compiler.FLOW_END,
g.node[empty_flow_terminal]['kind'])
g.nodes[empty_flow_terminal]['kind'])
self.assertTrue(g.has_edge(empty_flow_terminal, b))
def test_empty_flow_in_graph_flow_linkage(self):
@ -506,7 +506,7 @@ class PatternCompileTest(test.TestCase):
('c2', 'test2[$]', {'invariant': True}),
('test2[$]', 'test[$]', {'invariant': True}),
])
self.assertIs(c1, g.node['c2']['retry'])
self.assertIs(c1, g.nodes['c2']['retry'])
self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
@ -527,8 +527,8 @@ class PatternCompileTest(test.TestCase):
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
self.assertIs(c, g.node['a']['retry'])
self.assertIs(c, g.node['b']['retry'])
self.assertIs(c, g.nodes['a']['retry'])
self.assertIs(c, g.nodes['b']['retry'])
def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
@ -548,8 +548,8 @@ class PatternCompileTest(test.TestCase):
self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
self.assertIs(c, g.node['a']['retry'])
self.assertIs(c, g.node['b']['retry'])
self.assertIs(c, g.nodes['a']['retry'])
self.assertIs(c, g.nodes['b']['retry'])
def test_retry_in_graph_flow_with_tasks(self):
r = retry.AlwaysRevert("r")
@ -569,9 +569,9 @@ class PatternCompileTest(test.TestCase):
self.assertItemsEqual(['test'], g.no_predecessors_iter())
self.assertItemsEqual(['test[$]'], g.no_successors_iter())
self.assertIs(r, g.node['a']['retry'])
self.assertIs(r, g.node['b']['retry'])
self.assertIs(r, g.node['c']['retry'])
self.assertIs(r, g.nodes['a']['retry'])
self.assertIs(r, g.nodes['b']['retry'])
self.assertIs(r, g.nodes['c']['retry'])
def test_retries_hierarchy(self):
c1 = retry.AlwaysRevert("c1")
@ -594,12 +594,12 @@ class PatternCompileTest(test.TestCase):
('test2[$]', 'd', {'invariant': True}),
('d', 'test[$]', {'invariant': True}),
])
self.assertIs(c1, g.node['a']['retry'])
self.assertIs(c1, g.node['d']['retry'])
self.assertIs(c2, g.node['b']['retry'])
self.assertIs(c2, g.node['c']['retry'])
self.assertIs(c1, g.node['c2']['retry'])
self.assertIsNone(g.node['c1'].get('retry'))
self.assertIs(c1, g.nodes['a']['retry'])
self.assertIs(c1, g.nodes['d']['retry'])
self.assertIs(c2, g.nodes['b']['retry'])
self.assertIs(c2, g.nodes['c']['retry'])
self.assertIs(c1, g.nodes['c2']['retry'])
self.assertIsNone(g.nodes['c1'].get('retry'))
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("c1")
@ -620,8 +620,8 @@ class PatternCompileTest(test.TestCase):
('test2[$]', 'd', {'invariant': True}),
('d', 'test[$]', {'invariant': True}),
])
self.assertIs(c1, g.node['a']['retry'])
self.assertIs(c1, g.node['d']['retry'])
self.assertIs(c1, g.node['b']['retry'])
self.assertIs(c1, g.node['c']['retry'])
self.assertIsNone(g.node['c1'].get('retry'))
self.assertIs(c1, g.nodes['a']['retry'])
self.assertIs(c1, g.nodes['d']['retry'])
self.assertIs(c1, g.nodes['b']['retry'])
self.assertIs(c1, g.nodes['c']['retry'])
self.assertIsNone(g.nodes['c1'].get('retry'))

View File

@ -152,8 +152,7 @@ b %(edge)s c;
graph.merge_graphs, g, g2)
def occurrence_detector(to_graph, from_graph):
return sum(1 for node in from_graph.nodes_iter()
if node in to_graph)
return sum(1 for node in from_graph.nodes if node in to_graph)
self.assertRaises(ValueError,
graph.merge_graphs, g, g2,
@ -588,8 +587,8 @@ CEO
dead_chicken = tree.Node("chicken.1", alive=False)
root.add(dead_chicken)
g = root.to_digraph()
self.assertEqual(g.node['chickens'], {'alive': True})
self.assertEqual(g.node['chicken.1'], {'alive': False})
self.assertEqual(g.nodes['chickens'], {'alive': True})
self.assertEqual(g.nodes['chicken.1'], {'alive': False})
class OrderedSetTest(test.TestCase):

View File

@ -21,8 +21,6 @@ import networkx as nx
from networkx.drawing import nx_pydot
import six
from taskflow.utils import misc
def _common_format(g, edge_notation):
lines = []
@ -31,13 +29,13 @@ def _common_format(g, edge_notation):
lines.append("Frozen: %s" % nx.is_frozen(g))
lines.append("Density: %0.3f" % nx.density(g))
lines.append("Nodes: %s" % g.number_of_nodes())
for n, n_data in g.nodes_iter(data=True):
for n, n_data in g.nodes(data=True):
if n_data:
lines.append(" - %s (%s)" % (n, n_data))
else:
lines.append(" - %s" % n)
lines.append("Edges: %s" % g.number_of_edges())
for (u, v, e_data) in g.edges_iter(data=True):
for (u, v, e_data) in g.edges(data=True):
if e_data:
lines.append(" %s %s %s (%s)" % (u, edge_notation, v, e_data))
else:
@ -48,11 +46,9 @@ def _common_format(g, edge_notation):
class Graph(nx.Graph):
"""A graph subclass with useful utility functions."""
def __init__(self, data=None, name=''):
if misc.nx_version() == '1':
super(Graph, self).__init__(name=name, data=data)
else:
super(Graph, self).__init__(name=name, incoming_graph_data=data)
def __init__(self, incoming_graph_data=None, name=''):
super(Graph, self).__init__(incoming_graph_data=incoming_graph_data,
name=name)
self.frozen = False
def freeze(self):
@ -69,45 +65,14 @@ class Graph(nx.Graph):
"""Pretty formats your graph into a string."""
return os.linesep.join(_common_format(self, "<->"))
def nodes_iter(self, data=False):
"""Returns an iterable object over the nodes.
Type of iterable returned object depends on which version
of networkx is used. When networkx < 2.0 is used , method
returns an iterator, but if networkx > 2.0 is used, it returns
NodeView of the Graph which is also iterable.
"""
if misc.nx_version() == '1':
return super(Graph, self).nodes_iter(data=data)
return super(Graph, self).nodes(data=data)
def edges_iter(self, nbunch=None, data=False, default=None):
"""Returns an iterable object over the edges.
Type of iterable returned object depends on which version
of networkx is used. When networkx < 2.0 is used , method
returns an iterator, but if networkx > 2.0 is used, it returns
EdgeView of the Graph which is also iterable.
"""
if misc.nx_version() == '1':
return super(Graph, self).edges_iter(nbunch=nbunch, data=data,
default=default)
return super(Graph, self).edges(nbunch=nbunch, data=data,
default=default)
def add_edge(self, u, v, attr_dict=None, **attr):
"""Add an edge between u and v."""
if misc.nx_version() == '1':
return super(Graph, self).add_edge(u, v, attr_dict=attr_dict,
**attr)
if attr_dict is not None:
return super(Graph, self).add_edge(u, v, **attr_dict)
return super(Graph, self).add_edge(u, v, **attr)
def add_node(self, n, attr_dict=None, **attr):
"""Add a single node n and update node attributes."""
if misc.nx_version() == '1':
return super(Graph, self).add_node(n, attr_dict=attr_dict, **attr)
if attr_dict is not None:
return super(Graph, self).add_node(n, **attr_dict)
return super(Graph, self).add_node(n, **attr)
@ -125,11 +90,9 @@ class Graph(nx.Graph):
class DiGraph(nx.DiGraph):
"""A directed graph subclass with useful utility functions."""
def __init__(self, data=None, name=''):
if misc.nx_version() == '1':
super(DiGraph, self).__init__(name=name, data=data)
else:
super(DiGraph, self).__init__(name=name, incoming_graph_data=data)
def __init__(self, incoming_graph_data=None, name=''):
super(DiGraph, self).__init__(incoming_graph_data=incoming_graph_data,
name=name)
self.frozen = False
def freeze(self):
@ -183,13 +146,13 @@ class DiGraph(nx.DiGraph):
def no_successors_iter(self):
"""Returns an iterator for all nodes with no successors."""
for n in self.nodes_iter():
for n in self.nodes:
if not len(list(self.successors(n))):
yield n
def no_predecessors_iter(self):
"""Returns an iterator for all nodes with no predecessors."""
for n in self.nodes_iter():
for n in self.nodes:
if not len(list(self.predecessors(n))):
yield n
@ -203,72 +166,28 @@ class DiGraph(nx.DiGraph):
over more than once (this prevents infinite iteration).
"""
visited = set([n])
queue = collections.deque(self.predecessors_iter(n))
queue = collections.deque(self.predecessors(n))
while queue:
pred = queue.popleft()
if pred not in visited:
yield pred
visited.add(pred)
for pred_pred in self.predecessors_iter(pred):
for pred_pred in self.predecessors(pred):
if pred_pred not in visited:
queue.append(pred_pred)
def add_edge(self, u, v, attr_dict=None, **attr):
"""Add an edge between u and v."""
if misc.nx_version() == '1':
return super(DiGraph, self).add_edge(u, v, attr_dict=attr_dict,
**attr)
if attr_dict is not None:
return super(DiGraph, self).add_edge(u, v, **attr_dict)
return super(DiGraph, self).add_edge(u, v, **attr)
def add_node(self, n, attr_dict=None, **attr):
"""Add a single node n and update node attributes."""
if misc.nx_version() == '1':
return super(DiGraph, self).add_node(n, attr_dict=attr_dict,
**attr)
if attr_dict is not None:
return super(DiGraph, self).add_node(n, **attr_dict)
return super(DiGraph, self).add_node(n, **attr)
def successors_iter(self, n):
"""Returns an iterator over successor nodes of n."""
if misc.nx_version() == '1':
return super(DiGraph, self).successors_iter(n)
return super(DiGraph, self).successors(n)
def predecessors_iter(self, n):
"""Return an iterator over predecessor nodes of n."""
if misc.nx_version() == '1':
return super(DiGraph, self).predecessors_iter(n)
return super(DiGraph, self).predecessors(n)
def nodes_iter(self, data=False):
"""Returns an iterable object over the nodes.
Type of iterable returned object depends on which version
of networkx is used. When networkx < 2.0 is used , method
returns an iterator, but if networkx > 2.0 is used, it returns
NodeView of the Graph which is also iterable.
"""
if misc.nx_version() == '1':
return super(DiGraph, self).nodes_iter(data=data)
return super(DiGraph, self).nodes(data=data)
def edges_iter(self, nbunch=None, data=False, default=None):
"""Returns an iterable object over the edges.
Type of iterable returned object depends on which version
of networkx is used. When networkx < 2.0 is used , method
returns an iterator, but if networkx > 2.0 is used, it returns
EdgeView of the Graph which is also iterable.
"""
if misc.nx_version() == '1':
return super(DiGraph, self).edges_iter(nbunch=nbunch, data=data,
default=default)
return super(DiGraph, self).edges(nbunch=nbunch, data=data,
default=default)
def fresh_copy(self):
"""Return a fresh copy graph with the same data structure.
@ -287,11 +206,8 @@ class OrderedDiGraph(DiGraph):
order).
"""
node_dict_factory = collections.OrderedDict
if misc.nx_version() == '1':
adjlist_dict_factory = collections.OrderedDict
else:
adjlist_outer_dict_factory = collections.OrderedDict
adjlist_inner_dict_factory = collections.OrderedDict
adjlist_outer_dict_factory = collections.OrderedDict
adjlist_inner_dict_factory = collections.OrderedDict
edge_attr_dict_factory = collections.OrderedDict
def fresh_copy(self):
@ -312,11 +228,8 @@ class OrderedGraph(Graph):
order).
"""
node_dict_factory = collections.OrderedDict
if misc.nx_version() == '1':
adjlist_dict_factory = collections.OrderedDict
else:
adjlist_outer_dict_factory = collections.OrderedDict
adjlist_inner_dict_factory = collections.OrderedDict
adjlist_outer_dict_factory = collections.OrderedDict
adjlist_inner_dict_factory = collections.OrderedDict
edge_attr_dict_factory = collections.OrderedDict
def fresh_copy(self):
@ -342,7 +255,7 @@ def merge_graphs(graph, *graphs, **kwargs):
raise ValueError("Overlap detection callback expected to be callable")
elif overlap_detector is None:
overlap_detector = (lambda to_graph, from_graph:
len(to_graph.subgraph(from_graph.nodes_iter())))
len(to_graph.subgraph(from_graph.nodes)))
for g in graphs:
# This should ensure that the nodes to be merged do not already exist
# in the graph that is to be merged into. This could be problematic if

View File

@ -27,7 +27,6 @@ import threading
import types
import enum
import networkx as nx
from oslo_serialization import jsonutils
from oslo_serialization import msgpackutils
from oslo_utils import encodeutils
@ -540,7 +539,3 @@ def safe_copy_dict(obj):
return {}
# default to a shallow copy to avoid most ownership issues
return dict(obj)
def nx_version():
return nx.__version__.split('.')[0]