Add atom priority ability

In situations where many atoms can execute at the same
time it is sometimes useful to denote that when this situation
happens that certain atoms should execute/revert before other
atoms (or at least an attempt should be made to do this) instead
of being nearly arbitrary.

This adds a priority class attribute to the atom class (which
can be overridden or changed as needed) which is then used in
the runtime state machine to sort on so that atoms with higher
priority get submitted (and therefore executed/reverted) first.

Closes-Bug: #1507755

Change-Id: I3dcc705959085cba167883c85278e394b5cb1d2b
This commit is contained in:
Joshua Harlow 2015-08-07 14:46:24 -07:00
parent 4677fdb30e
commit 70e58977c9
3 changed files with 59 additions and 1 deletions

View File

@ -194,6 +194,33 @@ class Atom(object):
this atom produces.
"""
priority = 0
"""A numeric priority that instances of this class will have when running,
used when there are multiple *parallel* candidates to execute and/or
revert. During this situation the candidate list will be stably sorted
based on this priority attribute which will result in atoms with higher
priorities executing (or reverting) before atoms with lower
priorities (higher being defined as a number bigger, or greater tha
an atom with a lower priority number). By default all atoms have the same
priority (zero).
For example when the following is combined into a
graph (where each node in the denoted graph is some task)::
a -> b
b -> c
b -> e
b -> f
When ``b`` finishes there will then be three candidates that can run
``(c, e, f)`` and they may run in any order. What this priority does is
sort those three by their priority before submitting them to be
worked on (so that instead of say a random run order they will now be
ran by there sorted order). This is also true when reverting (in that the
sort order of the potential nodes will be used to determine the
submission order).
"""
def __init__(self, name=None, provides=None, inject=None):
self.name = name
self.version = (1, 0)

View File

@ -108,9 +108,14 @@ class MachineBuilder(object):
timeout = WAITING_TIMEOUT
# Cache some local functions/methods...
do_schedule = self._scheduler.schedule
do_complete = self._completer.complete
def do_schedule(next_nodes):
return self._scheduler.schedule(
sorted(next_nodes,
key=lambda node: getattr(node, 'priority', 0),
reverse=True))
def is_runnable():
# Checks if the storage says the flow is still runnable...
return self._storage.get_flow_state() == st.RUNNING

View File

@ -461,6 +461,32 @@ class EngineParallelFlowTest(utils.EngineTestBase):
engine = self._make_engine(flow)
self.assertRaises(exc.Empty, engine.run)
def test_parallel_flow_with_priority(self):
flow = uf.Flow('p-1')
for i in range(0, 10):
t = utils.ProgressingTask(name='task%s' % i)
t.priority = i
flow.add(t)
engine = self._make_engine(flow)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
engine.run()
expected = [
'task9.t RUNNING',
'task8.t RUNNING',
'task7.t RUNNING',
'task6.t RUNNING',
'task5.t RUNNING',
'task4.t RUNNING',
'task3.t RUNNING',
'task2.t RUNNING',
'task1.t RUNNING',
'task0.t RUNNING',
]
# NOTE(harlowja): chop off the gathering of SUCCESS states, since we
# don't care if thats in order...
gotten = capturer.values[0:10]
self.assertEqual(expected, gotten)
def test_parallel_flow_one_task(self):
flow = uf.Flow('p-1').add(
utils.ProgressingTask(name='task1', provides='a')