Merge "Fix for taskflow reduce error when monitoring cluster of size one"
This commit is contained in:
commit
ad2460d470
|
@ -50,10 +50,14 @@ def check_cluster_status(cluster_id, node_ids):
|
|||
node_id))
|
||||
flow.add(sub_flow)
|
||||
|
||||
node_status_list = ["%s%d" % ("node_status_", i)
|
||||
for i in range(len(node_ids))]
|
||||
# this is used as second arg of lambda in case of cluster size one
|
||||
node_status_list.append('node_status_0')
|
||||
get_cluster_status = os_common.Reduce(
|
||||
lambda a, b: a if (a == 'OK') else b,
|
||||
provides='cluster_status',
|
||||
requires=["%s%d" % ("node_status_", i) for i in range(len(node_ids))],
|
||||
requires=node_status_list,
|
||||
)
|
||||
flow.add(get_cluster_status)
|
||||
|
||||
|
|
|
@ -213,6 +213,71 @@ class CheckClusterStatusTests(base.FunctionalTestCase):
|
|||
self.assertEqual(models.Status.DOWN, node.status,
|
||||
"Invalid status for node %d" % i)
|
||||
|
||||
def test_check_cluster_status_size_one(self):
|
||||
flow_store_create = {
|
||||
"image": self.valid_image.id,
|
||||
"flavor": self.valid_flavor.id,
|
||||
"port": self.port,
|
||||
"context": self.context.to_dict(),
|
||||
"erlang_cookie": str(uuid.uuid4()),
|
||||
"default_rabbit_user": 'rabbit',
|
||||
"default_rabbit_pass": str(uuid.uuid4()),
|
||||
}
|
||||
flow_store_check = {
|
||||
"context": self.context.to_dict(),
|
||||
"default_rabbit_user": "user",
|
||||
"default_rabbit_pass": "pass"
|
||||
}
|
||||
|
||||
cluster_values = {
|
||||
"project_id": self.context.tenant_id,
|
||||
"name": "RabbitCluster",
|
||||
"network_id": str(uuid.uuid4()),
|
||||
"flavor": "1",
|
||||
"size": 1,
|
||||
}
|
||||
|
||||
new_cluster = objects.Cluster(**cluster_values)
|
||||
new_cluster.create(self.context)
|
||||
|
||||
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
|
||||
new_cluster.id)
|
||||
node_ids = []
|
||||
for node in nodes:
|
||||
node_ids.append(str(node.id))
|
||||
|
||||
flow_create = create_cluster(new_cluster.id,
|
||||
node_ids,
|
||||
self.valid_network['id'],
|
||||
self.management_network['id'])
|
||||
engines.run(flow_create, store=flow_store_create)
|
||||
|
||||
cluster_before = objects.Cluster.get_cluster_by_id(self.context,
|
||||
new_cluster.id)
|
||||
self.assertEqual(models.Status.ACTIVE, cluster_before.status,
|
||||
"Invalid status for cluster")
|
||||
|
||||
# mock cluster status returned by each node in cluster
|
||||
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
|
||||
['{"status": "ok"}',
|
||||
'[{"name": "/"}]',
|
||||
]
|
||||
)
|
||||
flow_check_status = check_cluster_status(str(new_cluster.id), node_ids)
|
||||
result = engines.run(flow_check_status, store=flow_store_check)
|
||||
|
||||
cluster_after = objects.Cluster.get_cluster_by_id(self.context,
|
||||
new_cluster.id)
|
||||
self.assertEqual(models.Status.ACTIVE, cluster_after.status,
|
||||
"Invalid status for cluster")
|
||||
|
||||
nodes_after = objects.Node.get_nodes_by_cluster_id(self.context,
|
||||
new_cluster.id)
|
||||
self.assertEqual(models.Status.ACTIVE,
|
||||
result["node_values_0"]["status"])
|
||||
self.assertEqual(models.Status.ACTIVE, nodes_after[0].status,
|
||||
"Invalid status for node")
|
||||
|
||||
def tearDown(self):
|
||||
for vm_id in self.new_vm_list:
|
||||
self.nova_client.servers.delete(vm_id)
|
||||
|
|
Loading…
Reference in New Issue