Merge "Fix for taskflow reduce error when monitoring cluster of size one"

This commit is contained in:
Jenkins 2015-11-03 20:47:43 +00:00 committed by Gerrit Code Review
commit ad2460d470
2 changed files with 70 additions and 1 deletions

View File

@ -50,10 +50,14 @@ def check_cluster_status(cluster_id, node_ids):
node_id))
flow.add(sub_flow)
node_status_list = ["%s%d" % ("node_status_", i)
for i in range(len(node_ids))]
# this is used as second arg of lambda in case of cluster size one
node_status_list.append('node_status_0')
get_cluster_status = os_common.Reduce(
lambda a, b: a if (a == 'OK') else b,
provides='cluster_status',
requires=["%s%d" % ("node_status_", i) for i in range(len(node_ids))],
requires=node_status_list,
)
flow.add(get_cluster_status)

View File

@ -213,6 +213,71 @@ class CheckClusterStatusTests(base.FunctionalTestCase):
self.assertEqual(models.Status.DOWN, node.status,
"Invalid status for node %d" % i)
def test_check_cluster_status_size_one(self):
flow_store_create = {
"image": self.valid_image.id,
"flavor": self.valid_flavor.id,
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
"default_rabbit_user": 'rabbit',
"default_rabbit_pass": str(uuid.uuid4()),
}
flow_store_check = {
"context": self.context.to_dict(),
"default_rabbit_user": "user",
"default_rabbit_pass": "pass"
}
cluster_values = {
"project_id": self.context.tenant_id,
"name": "RabbitCluster",
"network_id": str(uuid.uuid4()),
"flavor": "1",
"size": 1,
}
new_cluster = objects.Cluster(**cluster_values)
new_cluster.create(self.context)
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
node_ids = []
for node in nodes:
node_ids.append(str(node.id))
flow_create = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
engines.run(flow_create, store=flow_store_create)
cluster_before = objects.Cluster.get_cluster_by_id(self.context,
new_cluster.id)
self.assertEqual(models.Status.ACTIVE, cluster_before.status,
"Invalid status for cluster")
# mock cluster status returned by each node in cluster
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
['{"status": "ok"}',
'[{"name": "/"}]',
]
)
flow_check_status = check_cluster_status(str(new_cluster.id), node_ids)
result = engines.run(flow_check_status, store=flow_store_check)
cluster_after = objects.Cluster.get_cluster_by_id(self.context,
new_cluster.id)
self.assertEqual(models.Status.ACTIVE, cluster_after.status,
"Invalid status for cluster")
nodes_after = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
self.assertEqual(models.Status.ACTIVE,
result["node_values_0"]["status"])
self.assertEqual(models.Status.ACTIVE, nodes_after[0].status,
"Invalid status for node")
def tearDown(self):
for vm_id in self.new_vm_list:
self.nova_client.servers.delete(vm_id)