Modify use case example

* Yaml config: actions, transports, tasks
 * Find only one node and subgraph connected
   with graph root and this node

Change-Id: I29935f66152869e169bbe1f0fba7b52cf228b439
This commit is contained in:
Nikolay Mahotkin 2013-11-20 15:13:42 +04:00
parent a399e31f8d
commit a32cb61b9d
5 changed files with 116 additions and 84 deletions

View File

@ -0,0 +1,3 @@
{
"executeTask": "format_volumes"
}

View File

@ -1,66 +0,0 @@
{
"config": {
"tasks": [
{
"name": "Create Environment",
"provides": ["env"]
},
{
"name": "Build Image",
"provides": ["image"],
"requires": ["env"]
},
{
"name": "CreateVM1",
"provides": ["vm1"],
"requires": ["image"]
},
{
"name": "CreateVM2",
"provides": ["vm2"],
"requires": ["image"]
},
{
"name": "Install Server",
"provides": ["server"],
"requires": ["vm2"]
},
{
"name": "Install Agent",
"provides": ["agent"],
"requires": ["vm1"]
},
{
"name": "Install Dependencies for Agent",
"provides": ["agentDeps"],
"requires": ["vm1"]
},
{
"name": "Install MySQL",
"provides": ["sql"],
"requires": ["vm2"]
},
{
"name": "Configure Agent",
"provides": ["configuredAgent"],
"requires": ["agent"]
},
{
"name": "Start Server",
"provides": ["serverStart"],
"requires": ["server", "sql"]
},
{
"name": "Start Agent",
"provides": ["agentStart"],
"requires": ["agentDeps", "configuredAgent"]
},
{
"name": "Show Agent's log",
"provides": ["log"],
"requires": ["serverStart", "agentStart"]
}
]
},
"flowName": "Service"
}

View File

@ -14,7 +14,10 @@
# limitations under the License.
import json
import networkx as nx
import yaml
from time import sleep
from yaml import composer
from taskflow import task
from taskflow import engines
@ -23,38 +26,87 @@ from taskflow.patterns import graph_flow
class BaseServiceTask(task.Task):
def __init__(self, provides=None, requires=None,
execute=None, name=None):
execute=None, name=None, config=None):
super(BaseServiceTask, self).__init__(provides=provides,
requires=requires,
name=name)
self.func = execute
self.config = config
def revert(self, *args, **kwargs):
print ("Task '%s' is REVERTING" % self.name)
#TODO (nmakhotkin) here should be a really action
def do_action(self, *args, **kwargs):
pass
def execute(self, *args, **kwargs):
print (self.name)
sleep(2)
return self.name
return self.do_action(args, kwargs)
def get_task(task_config):
return BaseServiceTask(
provides=task_config.get("provides", []),
requires=task_config.get("requires", []),
name=task_config["name"]
class ServiceTask(BaseServiceTask):
def do_action(self, *args, **kwargs):
action = self.config["actions"][self.name]
transport_name = action.get("transport", None)
print("Action executing: " + self.name + ",")
print("Doing " + str(action))
if transport_name:
transport = self.config["transports"][transport_name]
print("transport: " + str(transport))
print("")
sleep(0.5)
def get_task(task_name, task_data, config):
return ServiceTask(
provides=task_name,
requires=task_data.get("requires", []),
name=task_name,
config=config
)
def load_flow(config_path):
config = json.loads(open(config_path).read())
tasks = config["config"]["tasks"]
flow = graph_flow.Flow(config["flowName"])
for task in tasks:
flow.add(get_task(task))
def get_stream(file_name):
return open(file_name).read()
def load_flow(cfg_stream):
try:
config = yaml.load(cfg_stream)
except composer.ComposerError:
config = json.loads(cfg_stream)
except ValueError:
raise RuntimeError("Config could not be parsed.")
tasks = config["tasks"]
name = tasks.items()[-1][0]
flow = graph_flow.Flow(name)
for name, data in tasks.items():
flow.add(get_task(name, data, config))
return flow
def get_by_name(graph, name):
for node in graph:
if node.name == name:
return node
return None
def get_root(graph):
for node in graph:
if len(graph.predecessors(node)) == 0:
return node
if __name__ == "__main__":
flow = load_flow("use_case_example.json")
engines.run(flow, engine_conf="parallel")
flow = load_flow(get_stream("concepts/use_case_example.yaml"))
graph = nx.DiGraph(flow._graph.copy())
ex_cfg = json.load(open("concepts/execute_config.json"))
all_paths = nx.all_simple_paths(graph,
get_root(graph),
get_by_name(graph,
ex_cfg["executeTask"]))
nodes_set = set([node for path in all_paths for node in path])
sub_graph = graph.subgraph(nodes_set)
our_flow = graph_flow.Flow(name=flow.name)
our_flow._swap(sub_graph)
engines.run(our_flow, engine_conf="parallel")

View File

@ -0,0 +1,42 @@
actions:
create_vm:
transport: my_amqp
attach_volumes:
transport: my_amqp
format_volumes:
transport: my_amqp
transports:
my_amqp:
type: amqp
host: my_host
port: 5672
exchange: amqp_direct
routing-key: my_queue
user: guest
password: guest
content-type: application/json
tasks:
create_vm:
action: create_vm
args:
- (my_image_id, my_flavor_id)
attach_volumes:
requires: [create_vm]
action: attach_volume
args:
- (ctx.vm_id, 10GB, /mnt/my_vol1)
- (ctx.vm_id, 7GB, /mnt/my_vol2)
- (ctx.vm_id, 15GB, /mnt/my_vol3)
format_volumes:
requires: [attach_volumes]
action: format_volume
args:
- (ctx.vm_id, /mnt/my_vol1)
- (ctx.vm_id, /mnt/my_vol2)
- (ctx.vm_id, /mnt/my_vol3)

View File

@ -1,2 +1,3 @@
pbr>=0.5.21,<1.0
taskflow
pyyaml