Added integration test for cluster scaling

*cluster scaling via node addition to existing node group was checked;
*cluster scaling via new node group addition was checked.

Change-Id: I155e831b5411deb0e7f572a00230dff482d39d22
This commit is contained in:
Yaroslav Lobankov 2013-07-22 20:37:11 +04:00
parent 8379793851
commit d9a1265d68
4 changed files with 263 additions and 3 deletions

View File

@ -117,6 +117,12 @@ class ITestCase(unittest2.TestCase):
data = json.loads(post.content)
return data
def put_object(self, url, object_id, body, code):
data = self._put(url + object_id, json.dumps(body))
self.assertEquals(data.status_code, code)
data = json.loads(data.content)
return data
def get_object(self, url, obj_id, code, printing=False):
rv = self._get(url + obj_id, printing)
self.assertEquals(rv.status_code, code)
@ -479,7 +485,7 @@ class ITestCase(unittest2.TestCase):
}
def await_active_workers_for_namenode(self, data):
attempts_count = 10
attempts_count = 20
while True:
active_tasktrackers_count = int(
@ -501,7 +507,7 @@ class ITestCase(unittest2.TestCase):
if attempts_count == 0:
self.fail('tasktracker or datanode cannot be started '
'within 30 sec.')
'within 1 minute.')
time.sleep(3)

View File

@ -14,6 +14,7 @@ NODE_USERNAME = 'username' # username for master node
CLUSTER_NAME_CRUD = 'cluster-name-crud' # cluster name for crud operations
CLUSTER_NAME_HADOOP = 'cluster-name-hadoop' # cluster name for hadoop testing
CLUSTER_NAME_SWIFT = 'cluster-name-swift' # cluster name for swift testing
CLUSTER_NAME_SCALING = 'cluster-name-scaling' # cluster name for scaling testing
TIMEOUT = 15 # cluster creation timeout (in minutes)
@ -26,9 +27,18 @@ PATH_TO_SSH = '/home/user/.ssh/id_rsa' # path to folder where put id_rsa key
PLUGIN_NAME = 'vanilla'
NAMENODE_CONFIG = {'Name Node Heap Size': 1234}
JOBTRACKER_CONFIG = {'Job Tracker Heap Size': 1345}
DATANODE_CONFIG = {'Data Node Heap Size': 1456}
TASKTRACKER_CONFIG = {'Task Tracker Heap Size': 1567}
GENERAL_CONFIG = {'Enable Swift': True}
CLUSTER_HDFS_CONFIG = {'dfs.replication': 2}
CLUSTER_MAPREDUCE_CONFIG = {'mapred.map.tasks.speculative.execution': False,
'mapred.child.java.opts': '-Xmx100m'}
JT_PORT = 000001 # port for process 'jobtracker'
NN_PORT = 000002 # port for process 'namenode'
TT_PORT = 000003 # port for process 'tasktracker'
DN_PORT = 000004 # port for process 'datanode'
ENABLE_SWIFT_TESTS = True
ENABLE_SWIFT_TESTS = True # enable swift test

View File

@ -37,6 +37,7 @@ NODE_USERNAME = _get_conf('NODE_USERNAME', 'username')
CLUSTER_NAME_CRUD = _get_conf('CLUSTER_NAME_CRUD', 'cluster-crud')
CLUSTER_NAME_HADOOP = _get_conf('CLUSTER_NAME_HADOOP', 'cluster-hadoop')
CLUSTER_NAME_SWIFT = _get_conf('CLUSTER_NAME_SWIFT', 'cluster-swift')
CLUSTER_NAME_SCALING = _get_conf('CLUSTER_NAME_SCALING', 'cluster-scaling')
TIMEOUT = _get_conf('TIMEOUT', 15)

View File

@ -0,0 +1,243 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import telnetlib
import time
from savanna.tests.integration import base
import savanna.tests.integration.configs.parameters as param
class ClusterScalingTest(base.ITestCase):
def setUp(self):
super(ClusterScalingTest, self).setUp()
telnetlib.Telnet(self.host, self.port)
def create_cluster_for_scaling(self, node_processes):
cluster_body = self.make_cl_body_node_processes(node_processes)
cluster_body['name'] = param.CLUSTER_NAME_SCALING
cluster_id = self.create_cluster_and_get_id(cluster_body)
return cluster_id
def implement_scaling(self, cluster_id, scaling_body):
self.put_object(self.url_cluster_with_slash, cluster_id,
scaling_body, 202)
self.await_cluster_active(cluster_id)
def implement_scaling_new_node_group_addition(self, cluster_id,
scaling_map1,
scaling_map2='',
multi_scaling=False):
if multi_scaling:
self.implement_scaling(cluster_id, {
'add_node_groups': [
{
'node_group_template_id': scaling_map1['ngt_id'],
'count': scaling_map1['node_count'],
'name': scaling_map1['ng_name']
}, {
'node_group_template_id': scaling_map1['ngt_id'],
'count': scaling_map2['node_count'],
'name': scaling_map2['ng_name']
}
]
})
else:
self.implement_scaling(cluster_id, {
'add_node_groups': [
{
'node_group_template_id': scaling_map1['ngt_id'],
'count': scaling_map1['node_count'],
'name': scaling_map1['ng_name']
}
]
})
def implement_scaling_addition_to_existing_node_group(self, cluster_id,
scaling_map1,
scaling_map2='',
multi_scaling=False):
if multi_scaling:
self.implement_scaling(cluster_id, {
'resize_node_groups': [
{
'name': scaling_map1['ng_name'],
'count': scaling_map1['node_count']
}, {
'name': scaling_map2['ng_name'],
'count': scaling_map2['node_count']
}
]
})
else:
self.implement_scaling(cluster_id, {
'resize_node_groups': [
{
'name': scaling_map1['ng_name'],
'count': scaling_map1['node_count']
}
]
})
def check_cluster_worker_nodes(self, cluster_id):
ip_instances = self.get_instances_ip_and_node_processes_list(
cluster_id)
time.sleep(10)
try:
worker_map = self.get_namenode_ip_and_tt_dn_count(ip_instances)
self.await_active_workers_for_namenode(worker_map)
return worker_map
except Exception as e:
self.fail(str(e))
def compare_worker_node_count_after_scaling(self,
worker_map,
worker_type,
worker_node_count):
self.assertEquals(
worker_map[worker_type], worker_node_count,
'%s != %s after cluster scaling!' % (worker_type, worker_type))
def check_cluster_worker_nodes_after_scaling(self,
cluster_id,
worker_type,
scaling_worker_node_count):
worker_map = self.check_cluster_worker_nodes(cluster_id)
self.compare_worker_node_count_after_scaling(
worker_map, worker_type, scaling_worker_node_count)
def test_scaling_addition_to_existing_ng(self):
ng_name_for_tt = 'tt'
tt_count = 1
ng_name_for_dn = 'dn'
dn_count = 1
dn_replication_factor = 3
cluster_id = self.create_cluster_for_scaling(
{'JT': 1, 'NN': 1, 'TT': tt_count, 'DN': dn_count})
try:
self.implement_scaling_addition_to_existing_node_group(
cluster_id, {
'ng_name': ng_name_for_tt,
'node_count': tt_count + 1
})
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'tasktracker_count', tt_count + 1)
self.implement_scaling_addition_to_existing_node_group(
cluster_id, {
'ng_name': ng_name_for_dn,
'node_count': dn_count + dn_replication_factor
})
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'datanode_count', dn_count + dn_replication_factor)
multi_scaling = True
self.implement_scaling_addition_to_existing_node_group(
cluster_id, {
'ng_name': ng_name_for_tt,
'node_count': 0
},
{
'ng_name': ng_name_for_dn,
'node_count': dn_replication_factor
},
multi_scaling)
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'tasktracker_count', 0)
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'datanode_count', dn_replication_factor)
except Exception as e:
self.fail(str(e))
finally:
self.del_object(self.url_cluster_with_slash, cluster_id, 204)
def test_scaling_new_node_group_addition(self):
ng_name_for_tt = 'ng-tt'
added_tt_count = 2
ng_name_for_dn = 'ng-dn'
added_dn_count = 2
dn_replication_factor = 3
cluster_id = self.create_cluster_for_scaling(
{'JT+NN': 1, 'TT+DN': 1})
self.create_node_group_templates()
try:
self.implement_scaling_new_node_group_addition(
cluster_id, {
'ngt_id': self.id_tt,
'node_count': added_tt_count,
'ng_name': ng_name_for_tt,
})
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'tasktracker_count', added_tt_count + 1)
self.implement_scaling_new_node_group_addition(
cluster_id, {
'ngt_id': self.id_dn,
'node_count': added_dn_count + dn_replication_factor,
'ng_name': ng_name_for_dn,
})
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'datanode_count',
1 + added_dn_count + dn_replication_factor)
multi_scaling = True
self.implement_scaling_addition_to_existing_node_group(
cluster_id, {
'ngt_id': self.id_tt,
'node_count': 0,
'ng_name': ng_name_for_tt,
}, {
'ngt_id': self.id_dn,
'node_count': dn_replication_factor,
'ng_name': ng_name_for_dn,
}, multi_scaling)
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'tasktracker_count', 1)
self.check_cluster_worker_nodes_after_scaling(
cluster_id, 'datanode_count', 1 + dn_replication_factor)
except Exception as e:
self.fail(str(e))
finally:
self.del_object(self.url_cluster_with_slash, cluster_id, 204)
self.delete_node_group_templates()