Integration test refactoring

* Test for cluster configs was changed;
* Helper script for cluster config testing was changed;
* Test for Map Reduce was changed;
* Helper script for Map Reduce testing was changed;
* Test for Swift was changed;
* Added helper script for Swift testing;
* Test for scaling was changed;
* File base.py was changed;
* test-requirements.txt was changed;
* tox.ini was changed;
* Import of configs was migrated to oslo.config.

Implements blueprint itests-refactoring

Change-Id: I157e8aaf8a6542a79dd336650c978d096d40000b
Yaroslav Lobankov 2013-08-27 21:14:08 +04:00
parent dfc58ef1e4
commit 233f21d82e
20 changed files with 2535 additions and 1 deletion


@@ -0,0 +1,28 @@
Integration tests for Savanna project
=====================================
How to run
----------
Create a config file for the integration tests: `/savanna/tests/integration_new/configs/itest.conf`.
You can take a look at the sample config files `/savanna/tests/integration_new/configs/itest.conf.sample`
and `/savanna/tests/integration_new/configs/itest.conf.sample-full`.
All values defined in the `/savanna/tests/integration_new/configs/config.py` file are
defaults, so if they are applicable to your environment you can skip
creating the config file.
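For example, a minimal `itest.conf` might override only the values that are specific
to your environment (the values below are just placeholders):

[COMMON]
OS_AUTH_URL = 'http://<keystone-host>:35357/v2.0/'
USER_KEYPAIR_ID = '<your-keypair>'
PATH_TO_SSH_KEY = '/home/<user>/.ssh/id_rsa'

[VANILLA]
IMAGE_ID = '<id-of-image-registered-for-the-vanilla-plugin>'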
To run all integration tests, use the corresponding tox env: `tox -e integration`.
In this case all tests will be launched except the disabled ones.
Tests may be disabled in the `/savanna/tests/integration_new/configs/config.py` file
or in the created config file `/savanna/tests/integration_new/configs/itest.conf`.
If you want to run integration tests for one plugin or several plugins, pass
the corresponding tags to the tox env: `tox -e integration -- -a tags=<plugin_name>` or
`tox -e integration -- -a tags=<plugin_name_1>,<plugin_name_2>`.
For example: `tox -e integration -- -a tags=vanilla` or `tox -e integration -- -a tags=vanilla,hdp`.
Contents
--------
TBD


@@ -0,0 +1,251 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
from oslo.config import cfg
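# Decorator that caches a single instance of the decorated class, so the test
# configuration is parsed only once and shared by all tests.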
def singleton(cls):
instances = {}
def getinstance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return getinstance
COMMON_CONFIG_GROUP = cfg.OptGroup(name='COMMON')
COMMON_CONFIG_OPTS = [
cfg.StrOpt('OS_USERNAME',
default='admin', help='Username for OpenStack'),
cfg.StrOpt('OS_PASSWORD',
default='admin', help='Password for OpenStack'),
cfg.StrOpt('OS_TENANT_NAME',
default='admin', help='Tenant name for OpenStack'),
cfg.StrOpt('OS_AUTH_URL',
default='http://127.0.0.1:35357/v2.0/',
help='URL for OpenStack'),
cfg.StrOpt('SAVANNA_HOST',
default='127.0.0.1',
help='Host for Savanna'),
cfg.IntOpt('SAVANNA_PORT',
default=8386,
help='Port for Savanna'),
cfg.StrOpt('SAVANNA_API_VERSION',
default='v1.1',
help='Api version for Savanna'),
cfg.IntOpt('FLAVOR_ID',
default=2,
help='OpenStack flavor ID for image'),
cfg.IntOpt('CLUSTER_CREATION_TIMEOUT',
default=30,
help='Cluster creation timeout (in minutes); '
'minimal value is 1'),
cfg.IntOpt('TELNET_TIMEOUT',
default=3,
help='Timeout for node process deployment on cluster '
'nodes (in minutes); minimal value is 1'),
cfg.IntOpt('HDFS_INITIALIZATION_TIMEOUT',
default=3,
help='Timeout for HDFS initialization (in minutes); '
'minimal value is 1'),
cfg.StrOpt('CLUSTER_NAME',
default='test-cluster', help='Name for cluster'),
cfg.StrOpt('USER_KEYPAIR_ID',
default='jenkins',
help='OpenStack keypair ID of the SSH public key which '
'Savanna transfers to cluster nodes so that users can '
'access virtual machines via SSH using this key'),
cfg.StrOpt('PATH_TO_SSH_KEY',
default='/home/ubuntu/.ssh/id_rsa',
help='Path to the id_rsa private key which is used for '
'remote command execution; if the wrong path to the key '
'is specified, you will get the error "Private key '
'file is encrypted", so please make sure the right '
'path to the key is specified')
]
VANILLA_CONFIG_GROUP = cfg.OptGroup(name='VANILLA')
VANILLA_CONFIG_OPTS = [
cfg.StrOpt('PLUGIN_NAME',
default='vanilla',
help='Name of plugin'),
cfg.StrOpt('IMAGE_ID',
default='e9691262-e286-46f7-aea5-9f40461b5eea',
help='ID for image which is used for cluster creation'),
cfg.StrOpt('NODE_USERNAME',
default='ubuntu',
help='Username which is used for connecting to cluster nodes '
'via SSH'),
cfg.StrOpt('HADOOP_VERSION',
default='1.2.1', help='Version of Hadoop'),
cfg.StrOpt('HADOOP_USER',
default='hadoop',
help='Username which is used for access to Hadoop services'),
cfg.StrOpt('HADOOP_DIRECTORY',
default='/usr/share/hadoop',
help='Directory where Hadoop jar files are located'),
cfg.StrOpt('HADOOP_LOG_DIRECTORY',
default='/mnt/log/hadoop/hadoop/userlogs',
help='Directory where logs of completed jobs are located'),
cfg.DictOpt('HADOOP_PROCESSES_WITH_PORTS',
default={'jobtracker': 50030,
'namenode': 50070,
'tasktracker': 50060,
'datanode': 50075,
'secondarynamenode': 50090},
help='Hadoop process map with ports for Vanilla plugin'),
cfg.DictOpt('PROCESS_NAMES',
default={'nn': 'namenode',
'tt': 'tasktracker',
'dn': 'datanode'},
help='Names for namenode, tasktracker and datanode processes'),
cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN',
default=False,
help='If this variable is True then tests for Vanilla plugin '
'will be skipped'),
cfg.BoolOpt('SKIP_CLUSTER_CONFIG_TEST', default=False),
cfg.BoolOpt('SKIP_MAP_REDUCE_TEST', default=False),
cfg.BoolOpt('SKIP_SWIFT_TEST', default=False),
cfg.BoolOpt('SKIP_SCALING_TEST', default=False)
]
HDP_CONFIG_GROUP = cfg.OptGroup(name='HDP')
HDP_CONFIG_OPTS = [
cfg.StrOpt('PLUGIN_NAME',
default='hdp', help='Name of plugin'),
cfg.StrOpt('IMAGE_ID',
default='cd63f719-006e-4541-a523-1fed7b91fa8c',
help='ID for image which is used for cluster creation'),
cfg.StrOpt('NODE_USERNAME',
default='cloud-user',
help='Username which is used for connecting to cluster nodes '
'via SSH'),
cfg.StrOpt('HADOOP_VERSION',
default='1.3.0', help='Version of Hadoop'),
cfg.StrOpt('HADOOP_USER',
default='hdfs',
help='Username which is used for access to Hadoop services'),
cfg.StrOpt('HADOOP_DIRECTORY',
default='/usr/lib/hadoop',
help='Directory where Hadoop jar files are located'),
cfg.StrOpt('HADOOP_LOG_DIRECTORY',
default='/hadoop/mapred/userlogs',
help='Directory where logs of completed jobs are located'),
cfg.DictOpt('HADOOP_PROCESSES_WITH_PORTS',
default={
'JOBTRACKER': 50030,
'NAMENODE': 50070,
'TASKTRACKER': 50060,
'DATANODE': 50075,
'SECONDARY_NAMENODE': 50090
},
help='Hadoop process map with ports for HDP plugin'
),
cfg.DictOpt('PROCESS_NAMES',
default={'nn': 'NAMENODE',
'tt': 'TASKTRACKER',
'dn': 'DATANODE'},
help='Names for namenode, tasktracker and datanode processes'),
cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN',
default=True,
help='If this variable is True then tests for HDP plugin '
'will be skipped'),
cfg.BoolOpt('SKIP_MAP_REDUCE_TEST', default=False),
cfg.BoolOpt('SKIP_SWIFT_TEST', default=False),
cfg.BoolOpt('SKIP_SCALING_TEST', default=False)
]
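# Attach an option group and its options to the given oslo.config object.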
def register_config(config, config_group, config_opts):
config.register_group(config_group)
config.register_opts(config_opts, config_group)
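# Holder of all test settings. If '<cwd>/integration_new/configs/itest.conf'
# exists, it is loaded on top of the defaults defined above; otherwise a message
# is printed to stderr and the defaults are used. The parsed COMMON, VANILLA and
# HDP groups are exposed as attributes of the single ITConfig instance.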
@singleton
class ITConfig:
def __init__(self):
config = 'itest.conf'
config_files = []
if not os.path.exists('%s/integration_new/configs/'
'%s' % (os.getcwd(), config)):
message = '*************************************\n' \
'Config file "%s" not found *\n' \
'*************************************' % config
print(RuntimeError(message), file=sys.stderr)
else:
config = os.path.join(
'%s/integration_new/configs/%s' % (os.getcwd(), config))
config_files.append(config)
register_config(cfg.CONF, COMMON_CONFIG_GROUP, COMMON_CONFIG_OPTS)
register_config(cfg.CONF, VANILLA_CONFIG_GROUP, VANILLA_CONFIG_OPTS)
register_config(cfg.CONF, HDP_CONFIG_GROUP, HDP_CONFIG_OPTS)
cfg.CONF([], project='integration_tests',
default_config_files=config_files)
self.COMMON = cfg.CONF.COMMON
self.VANILLA = cfg.CONF.VANILLA
self.HDP = cfg.CONF.HDP


@@ -0,0 +1,32 @@
[COMMON]
# Username for OpenStack
OS_USERNAME = 'admin'
# Password for OpenStack
OS_PASSWORD = 'admin'
# Tenant name for OpenStack
OS_TENANT_NAME = 'admin'
# URL for OpenStack
OS_AUTH_URL = 'http://127.0.0.1:35357/v2.0/'
# Host for Savanna API
SAVANNA_HOST = '127.0.0.1'
# OpenStack keypair ID of the SSH public key which Savanna transfers
# to cluster nodes so that users can access virtual machines via SSH
# using this key
USER_KEYPAIR_ID = 'jsmit'
# Path to the id_rsa private key which is used for
# remote command execution
PATH_TO_SSH_KEY = '/home/user/.ssh/id_rsa'
[VANILLA]
# ID for image which is used for cluster creation
IMAGE_ID = '123456-qwerty'
[HDP]
# ID for image which is used for cluster creation
IMAGE_ID = '123456-qwerty'


@@ -0,0 +1,101 @@
[COMMON]
# Username for OpenStack
OS_USERNAME = 'admin'
# Password for OpenStack
OS_PASSWORD = 'admin'
# Tenant name for OpenStack
OS_TENANT_NAME = 'admin'
# URL for OpenStack
OS_AUTH_URL = 'http://127.0.0.1:35357/v2.0/'
# Host for Savanna API
SAVANNA_HOST = '127.0.0.1'
# Port for Savanna API
SAVANNA_PORT = '8386'
# Api version for Savanna
SAVANNA_API_VERSION = 'v1.1'
# Flavor ID for image
FLAVOR_ID = '2'
# Cluster creation timeout (in minutes); minimal value is 1
CLUSTER_CREATION_TIMEOUT = 30
# Timeout for node process deployment on cluster nodes (in minutes);
# minimal value is 1
TELNET_TIMEOUT = 5
# Timeout for HDFS initialization (in minutes); minimal value is 1
HDFS_INITIALIZATION_TIMEOUT = 5
# Name for cluster
CLUSTER_NAME = 'test-cluster'
# OpenStack keypair ID of the SSH public key which Savanna transfers
# to cluster nodes so that users can access virtual machines via SSH
# using this key
USER_KEYPAIR_ID = 'jsmit'
# Path to the id_rsa private key which is used for
# remote command execution
PATH_TO_SSH_KEY = '/home/ubuntu/.ssh/id_rsa'
[VANILLA]
# Name of plugin
PLUGIN_NAME = 'vanilla'
# ID for image which is used for cluster creation
IMAGE_ID = '123456-qwerty'
# Username which is used for connecting to cluster nodes via SSH
NODE_USERNAME = 'ubuntu'
# Version of Hadoop
HADOOP_VERSION = '1.2.1'
# Username which is used for access to Hadoop services
HADOOP_USER = 'hadoop'
# Directory where Hadoop jar files are located
HADOOP_DIRECTORY = '/usr/share/hadoop'
# Directory where logs of completed jobs are located
HADOOP_LOG_DIRECTORY = '/mnt/log/hadoop/hadoop/userlogs'
HADOOP_PROCESSES_WITH_PORTS = jobtracker: 50030, namenode: 50070, tasktracker: 50060, datanode: 50075, secondarynamenode: 50090
PROCESS_NAMES = nn: namenode, tt: tasktracker, dn: datanode
SKIP_ALL_TESTS_FOR_PLUGIN = False
SKIP_CLUSTER_CONFIG_TEST = False
SKIP_MAP_REDUCE_TEST = False
SKIP_SWIFT_TEST = False
SKIP_SCALING_TEST = False
[HDP]
# Name of plugin
PLUGIN_NAME = 'hdp'
# ID for image which is used for cluster creation
IMAGE_ID = '123456-qwerty'
# Username which is used for connecting to cluster nodes via SSH
NODE_USERNAME = 'cloud-user'
# Version of Hadoop
HADOOP_VERSION = '1.3.0'
# Username which is used for access to Hadoop services
HADOOP_USER = 'hdfs'
# Directory where Hadoop jar files are located
HADOOP_DIRECTORY = '/usr/lib/hadoop'
# Directory where logs of completed jobs are located
HADOOP_LOG_DIRECTORY = '/hadoop/mapred/userlogs'
HADOOP_PROCESSES_WITH_PORTS = JOBTRACKER: 50030, NAMENODE: 50070, TASKTRACKER: 50060, DATANODE: 50075, SECONDARY_NAMENODE: 50090
PROCESS_NAMES = nn: NAMENODE,tt: TASKTRACKER,dn: DATANODE
SKIP_ALL_TESTS_FOR_PLUGIN = False
SKIP_MAP_REDUCE_TEST = False
SKIP_SWIFT_TEST = False
SKIP_SCALING_TEST = False


@@ -0,0 +1,481 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import telnetlib
import time
import savannaclient.api.client as savanna_client
import unittest2
from savanna.openstack.common import excutils
from savanna.tests.integration_new.configs.config import ITConfig as cfg
from savanna.utils import remote
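# Decorator for test methods: if the boolean attribute named by 'config_name'
# is set on the test class, the wrapped test is skipped and 'message' is
# printed instead of running it.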
def skip_test(config_name, message=''):
def handle(func):
def call(self, *args, **kwargs):
if getattr(self, config_name):
print('======================================================')
print(message)
print('======================================================')
else:
return func(self, *args, **kwargs)
return call
return handle
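# Base class for the integration tests. setUp() checks that the Savanna REST
# API is reachable via telnet and creates a savannaclient Client that is used
# by all helper methods below.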
class ITestCase(unittest2.TestCase):
def setUp(self):
self.COMMON = cfg().COMMON
self.VANILLA = cfg().VANILLA
self.HDP = cfg().HDP
telnetlib.Telnet(self.COMMON.SAVANNA_HOST,
self.COMMON.SAVANNA_PORT)
self.savanna = savanna_client.Client(
username=self.COMMON.OS_USERNAME,
api_key=self.COMMON.OS_PASSWORD,
project_name=self.COMMON.OS_TENANT_NAME,
auth_url=self.COMMON.OS_AUTH_URL,
savanna_url='http://%s:%s/%s' % (
self.COMMON.SAVANNA_HOST,
self.COMMON.SAVANNA_PORT,
self.COMMON.SAVANNA_API_VERSION))
#-------------------------Methods for object creation--------------------------
def create_node_group_template(self, name, plugin, description,
volumes_per_node, volume_size,
node_processes, node_configs,
hadoop_version=None):
if not hadoop_version:
hadoop_version = plugin.HADOOP_VERSION
data = self.savanna.node_group_templates.create(
name, plugin.PLUGIN_NAME, hadoop_version,
self.COMMON.FLAVOR_ID, description, volumes_per_node,
volume_size, node_processes, node_configs)
node_group_template_id = data.id
return node_group_template_id
def create_cluster_template(self, name, plugin, description,
cluster_configs, node_groups, anti_affinity,
hadoop_version=None):
if not hadoop_version:
hadoop_version = plugin.HADOOP_VERSION
data = self.savanna.cluster_templates.create(
name, plugin.PLUGIN_NAME, hadoop_version, description,
cluster_configs, node_groups, anti_affinity)
cluster_template_id = data.id
return cluster_template_id
def create_cluster_and_get_info(self, plugin, cluster_template_id,
description, cluster_configs, node_groups,
anti_affinity, hadoop_version=None,
image_id=None):
if not hadoop_version:
hadoop_version = plugin.HADOOP_VERSION
if not image_id:
image_id = plugin.IMAGE_ID
self.cluster_id = None
data = self.savanna.clusters.create(
self.COMMON.CLUSTER_NAME, plugin.PLUGIN_NAME, hadoop_version,
cluster_template_id, image_id, description, cluster_configs,
node_groups, self.COMMON.USER_KEYPAIR_ID, anti_affinity
)
self.cluster_id = data.id
self.poll_cluster_state(self.cluster_id)
node_ip_list_with_node_processes = \
self.get_cluster_node_ip_list_with_node_processes(self.cluster_id)
try:
node_info = self.get_node_info(node_ip_list_with_node_processes,
plugin)
except Exception as e:
with excutils.save_and_reraise_exception():
self.savanna.clusters.delete(self.cluster_id)
print(
'Failure during check of node process deployment '
'on cluster node: ' + str(e)
)
try:
self.await_active_workers_for_namenode(node_info, plugin)
except Exception as e:
with excutils.save_and_reraise_exception():
self.savanna.clusters.delete(self.cluster_id)
print(
'Failure while waiting for active workers of the namenode: '
+ str(e)
)
# For example, the method "create_cluster_and_get_info" returns
# {
# 'node_info': {
# 'tasktracker_count': 3,
# 'node_count': 6,
# 'namenode_ip': '172.18.168.242',
# 'datanode_count': 3
# },
# 'cluster_id': 'bee5c6a1-411a-4e88-95fc-d1fbdff2bb9d',
# 'node_ip_list': {
# '172.18.168.153': ['tasktracker', 'datanode'],
# '172.18.168.208': ['secondarynamenode'],
# '172.18.168.93': ['tasktracker'],
# '172.18.168.101': ['tasktracker', 'datanode'],
# '172.18.168.242': ['namenode', 'jobtracker'],
# '172.18.168.167': ['datanode']
# },
# 'plugin_name': 'vanilla'
# }
return {
'cluster_id': self.cluster_id,
'node_ip_list': node_ip_list_with_node_processes,
'node_info': node_info,
'plugin': plugin
}
#---------Helper methods for cluster info obtaining and its processing---------
def poll_cluster_state(self, cluster_id):
data = self.savanna.clusters.get(cluster_id)
timeout = self.COMMON.CLUSTER_CREATION_TIMEOUT * 60
while str(data.status) != 'Active':
print('CLUSTER STATUS: ' + str(data.status))
if str(data.status) == 'Error':
print('\n' + str(data) + '\n')
self.fail('Cluster state == \'Error\'.')
if timeout <= 0:
print('\n' + str(data) + '\n')
self.fail(
'Cluster did not return to \'Active\' state '
'within %d minutes.' % self.COMMON.CLUSTER_CREATION_TIMEOUT
)
data = self.savanna.clusters.get(cluster_id)
time.sleep(10)
timeout -= 10
return str(data.status)
def get_cluster_node_ip_list_with_node_processes(self, cluster_id):
data = self.savanna.clusters.get(cluster_id)
node_groups = data.node_groups
node_ip_list_with_node_processes = {}
for node_group in node_groups:
instances = node_group['instances']
for instance in instances:
node_ip = instance['management_ip']
node_ip_list_with_node_processes[node_ip] = node_group[
'node_processes']
# For example:
# node_ip_list_with_node_processes = {
# '172.18.168.181': ['tasktracker'],
# '172.18.168.94': ['secondarynamenode'],
# '172.18.168.208': ['namenode', 'jobtracker'],
# '172.18.168.93': ['tasktracker', 'datanode'],
# '172.18.168.44': ['tasktracker', 'datanode'],
# '172.18.168.233': ['datanode']
# }
return node_ip_list_with_node_processes
def try_telnet(self, host, port):
try:
telnetlib.Telnet(host, port)
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Telnet has failed: ' + str(e) +
' NODE IP: %s, PORT: %s. Passed %s minute(s).'
% (host, port, self.COMMON.TELNET_TIMEOUT)
)
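# Telnet to port 22 of every node and then to the port of every Hadoop process
# expected on that node (retrying for up to TELNET_TIMEOUT minutes), counting
# tasktracker and datanode processes and remembering the namenode IP.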
def get_node_info(self, node_ip_list_with_node_processes, plugin):
tasktracker_count = 0
datanode_count = 0
node_count = 0
for node_ip, processes in node_ip_list_with_node_processes.items():
self.try_telnet(node_ip, '22')
node_count += 1
for process in processes:
if process in plugin.HADOOP_PROCESSES_WITH_PORTS:
for i in range(self.COMMON.TELNET_TIMEOUT * 60):
try:
time.sleep(1)
telnetlib.Telnet(
node_ip,
plugin.HADOOP_PROCESSES_WITH_PORTS[process]
)
break
except socket.error:
print(
'Connection attempt. NODE PROCESS: %s, '
'PORT: %s.'
% (process,
plugin.HADOOP_PROCESSES_WITH_PORTS[process])
)
else:
self.try_telnet(
node_ip,
plugin.HADOOP_PROCESSES_WITH_PORTS[process]
)
if plugin.PROCESS_NAMES['tt'] in processes:
tasktracker_count += 1
if plugin.PROCESS_NAMES['dn'] in processes:
datanode_count += 1
if plugin.PROCESS_NAMES['nn'] in processes:
namenode_ip = node_ip
return {
'namenode_ip': namenode_ip,
'tasktracker_count': tasktracker_count,
'datanode_count': datanode_count,
'node_count': node_count
}
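# Poll the namenode over SSH ('hadoop job -list-active-trackers' and
# 'hadoop dfsadmin -report') until the numbers of active tasktrackers and
# datanodes match node_info, failing after HDFS_INITIALIZATION_TIMEOUT minutes.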
def await_active_workers_for_namenode(self, node_info, plugin):
self.open_ssh_connection(node_info['namenode_ip'],
plugin.NODE_USERNAME)
for i in range(self.COMMON.HDFS_INITIALIZATION_TIMEOUT * 6):
time.sleep(10)
active_tasktracker_count = self.execute_command(
'sudo su -c "hadoop job -list-active-trackers" %s'
% plugin.HADOOP_USER)[1]
active_datanode_count = int(
self.execute_command(
'sudo su -c "hadoop dfsadmin -report" %s \
| grep "Datanodes available:.*" | awk \'{print $3}\''
% plugin.HADOOP_USER)[1]
)
if not active_tasktracker_count:
active_tasktracker_count = 0
else:
active_tasktracker_count = len(
active_tasktracker_count[:-1].split('\n')
)
if (
active_tasktracker_count == node_info['tasktracker_count']
) and (
active_datanode_count == node_info['datanode_count']
):
break
else:
self.fail(
'Tasktrackers or datanodes did not become active within '
'%s minute(s) for the namenode.'
% self.COMMON.HDFS_INITIALIZATION_TIMEOUT
)
self.close_ssh_connection()
#---------------------------------Remote---------------------------------------
def open_ssh_connection(self, host, node_username):
remote._connect(
host, node_username, open(self.COMMON.PATH_TO_SSH_KEY).read()
)
def execute_command(self, cmd):
return remote._execute_command(cmd, get_stderr=True)
def write_file_to(self, remote_file, data):
remote._write_file_to(remote_file, data)
def read_file_from(self, remote_file):
return remote._read_file_from(remote_file)
def close_ssh_connection(self):
remote._cleanup()
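# Read the helper script from integration_new/tests/helper_scripts/, replace
# every empty '<PARAMETER>=""' assignment with the value from parameter_list,
# upload the result to the node as script.sh and make it executable.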
def transfer_helper_script_to_node(self, script_name, parameter_list):
script = open('integration_new/tests/helper_scripts/%s'
% script_name).read()
if parameter_list:
for parameter, value in parameter_list.iteritems():
script = script.replace(
'%s=""' % parameter, '%s=%s' % (parameter, value))
try:
self.write_file_to('script.sh', script)
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Failure while transferring helper script '
'to cluster node: ' + str(e)
)
self.execute_command('chmod 777 script.sh')
def transfer_helper_script_to_nodes(self, node_ip_list, node_username,
script_name, parameter_list=None):
for node_ip in node_ip_list:
self.open_ssh_connection(node_ip, node_username)
self.transfer_helper_script_to_node(script_name, parameter_list)
self.close_ssh_connection()
#--------------------------------Helper methods--------------------------------
def delete_objects(self, cluster_id=None,
cluster_template_id=None,
node_group_template_id_list=None):
if cluster_id:
self.savanna.clusters.delete(cluster_id)
if cluster_template_id:
self.savanna.cluster_templates.delete(cluster_template_id)
if node_group_template_id_list:
for node_group_template_id in node_group_template_id_list:
self.savanna.node_group_templates.delete(
node_group_template_id
)
def delete_swift_container(self, swift, container):
objects = [obj['name'] for obj in swift.get_container(container)[1]]
for obj in objects:
swift.delete_object(container, obj)
swift.delete_container(container)
def print_error_log(self, message, exception=None):
print(
'********************************************************** ERROR '
'LOG ***********************************************************\n'
)
print(message + str(exception))
print(
'*****************************************************************'
'*****************************************************************'
)


@@ -0,0 +1,203 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swiftclient import client as swift_client
from savanna.openstack.common import excutils
from savanna.tests.integration_new.tests import base
#TODO(ylobankov): add secondary nn config when bug #1217245 is fixed
NN_CONFIG = {'Name Node Heap Size': 512}
JT_CONFIG = {'Job Tracker Heap Size': 514}
DN_CONFIG = {'Data Node Heap Size': 513}
TT_CONFIG = {'Task Tracker Heap Size': 515}
CLUSTER_GENERAL_CONFIG = {'Enable Swift': True}
CLUSTER_HDFS_CONFIG = {'dfs.replication': 2}
CLUSTER_MR_CONFIG = {'mapred.map.tasks.speculative.execution': False,
'mapred.child.java.opts': '-Xmx500m'}
CONFIG_MAP = {
'namenode': {
'service': 'HDFS',
'config': NN_CONFIG
},
'jobtracker': {
'service': 'MapReduce',
'config': JT_CONFIG
},
'datanode': {
'service': 'HDFS',
'config': DN_CONFIG
},
'tasktracker': {
'service': 'MapReduce',
'config': TT_CONFIG
}
}
class ClusterConfigTest(base.ITestCase):
def __get_node_configs(self, node_group, process):
return node_group['node_configs'][CONFIG_MAP[process]['service']]
def __get_config_from_config_map(self, process):
return CONFIG_MAP[process]['config']
def __compare_configs(self, obtained_config, config):
self.assertEqual(
obtained_config, config,
'Failure while comparing configs: configs are not equal. \n'
'Config obtained with \'get\' request: %s. \n'
'Config passed at cluster creation: %s.'
% (str(obtained_config), str(config))
)
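# Run the previously transferred script.sh on the node with the config name
# (spaces stripped) and the expected value; the script exits with an error
# if the value actually applied on the node differs.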
def __compare_configs_on_cluster_node(self, config, value):
config = config.replace(' ', '')
try:
self.execute_command('./script.sh %s -value %s' % (config, value))
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Failure while comparing configs on cluster node: '
+ str(e)
)
print(
self.read_file_from('/tmp/config-test-log.txt')
)
def __check_configs_for_node_groups(self, node_groups):
for node_group in node_groups:
for process in node_group['node_processes']:
if process in CONFIG_MAP:
self.__compare_configs(
self.__get_node_configs(node_group, process),
self.__get_config_from_config_map(process)
)
def __check_config_application_on_cluster_nodes(
self, node_ip_list_with_node_processes):
for node_ip, processes in node_ip_list_with_node_processes.items():
self.open_ssh_connection(node_ip, self.VANILLA.NODE_USERNAME)
for config, value in CLUSTER_MR_CONFIG.items():
self.__compare_configs_on_cluster_node(config, value)
for config, value in CLUSTER_HDFS_CONFIG.items():
self.__compare_configs_on_cluster_node(config, value)
if 'namenode' in processes:
swift = swift_client.Connection(
authurl=self.COMMON.OS_AUTH_URL,
user=self.COMMON.OS_USERNAME,
key=self.COMMON.OS_PASSWORD,
tenant_name=self.COMMON.OS_TENANT_NAME,
auth_version='2') # TODO(ylobankov): delete hard code
swift.put_container('Swift-config-test')
try:
for config, value in CLUSTER_GENERAL_CONFIG.items():
self.__compare_configs_on_cluster_node(config, value)
finally:
self.delete_swift_container(swift, 'Swift-config-test')
#TODO(ylobankov): add check for secondary nn when bug #1217245 is fixed
for process in processes:
if process in CONFIG_MAP:
for config, value in self.__get_config_from_config_map(
process).items():
self.__compare_configs_on_cluster_node(config, value)
self.close_ssh_connection()
@base.skip_test('SKIP_CLUSTER_CONFIG_TEST',
message='Test for cluster configs was skipped.')
def _cluster_config_testing(self, cluster_info):
cluster_id = cluster_info['cluster_id']
data = self.savanna.clusters.get(cluster_id)
self.__compare_configs(
data.cluster_configs['general'], CLUSTER_GENERAL_CONFIG
)
self.__compare_configs(
data.cluster_configs['HDFS'], CLUSTER_HDFS_CONFIG
)
self.__compare_configs(
data.cluster_configs['MapReduce'], CLUSTER_MR_CONFIG
)
node_groups = data.node_groups
self.__check_configs_for_node_groups(node_groups)
node_ip_list_with_node_processes = \
self.get_cluster_node_ip_list_with_node_processes(cluster_id)
extra_script_parameters = {
'OS_TENANT_NAME': self.COMMON.OS_TENANT_NAME,
'OS_USERNAME': self.COMMON.OS_USERNAME,
'OS_PASSWORD': self.COMMON.OS_PASSWORD,
'HADOOP_USER': self.VANILLA.HADOOP_USER
}
try:
self.transfer_helper_script_to_nodes(
node_ip_list_with_node_processes, self.VANILLA.NODE_USERNAME,
'cluster_config_test_script.sh', extra_script_parameters
)
except Exception as e:
with excutils.save_and_reraise_exception():
print(str(e))
self.__check_config_application_on_cluster_nodes(
node_ip_list_with_node_processes
)


@@ -0,0 +1,270 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import nose.plugins.attrib as attrib
import unittest2
from savanna.openstack.common import excutils
from savanna.tests.integration_new.configs.config import ITConfig as cfg
from savanna.tests.integration_new.tests import map_reduce
from savanna.tests.integration_new.tests import scaling
from savanna.tests.integration_new.tests import swift
class HDPGatingTest(map_reduce.MapReduceTest, swift.SwiftTest,
scaling.ScalingTest):
SKIP_MAP_REDUCE_TEST = cfg().HDP.SKIP_MAP_REDUCE_TEST
SKIP_SWIFT_TEST = cfg().HDP.SKIP_SWIFT_TEST
SKIP_SCALING_TEST = cfg().HDP.SKIP_SCALING_TEST
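# Single gating scenario for the HDP plugin: create node group and cluster
# templates, launch a cluster, run the Map Reduce and Swift checks, scale the
# cluster and repeat the checks, deleting all created objects on failure and
# at the end.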
@attrib.attr(tags='hdp')
@unittest2.skipIf(cfg().HDP.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for HDP plugin were skipped')
def test_hdp_plugin_gating(self):
node_group_template_id_list = []
#-------------------------------CLUSTER CREATION-------------------------------
#---------------------"jt-nn" node group template creation---------------------
try:
node_group_template_jt_nn_id = self.create_node_group_template(
'jt-nn',
self.HDP,
description='test node group template',
volumes_per_node=0,
volume_size=0,
node_processes=['JOBTRACKER', 'NAMENODE', 'SECONDARY_NAMENODE',
'GANGLIA_SERVER', 'GANGLIA_MONITOR',
'NAGIOS_SERVER', 'AMBARI_SERVER',
'AMBARI_AGENT'],
node_configs={}
)
node_group_template_id_list.append(node_group_template_jt_nn_id)
except Exception as e:
with excutils.save_and_reraise_exception():
message = 'Failure while \'jt-nn\' node group ' \
'template creation: '
self.print_error_log(message, e)
#-----------------------"tt-dn" node group template creation-------------------
try:
node_group_template_tt_dn_id = self.create_node_group_template(
'tt-dn',
self.HDP,
description='test node group template',
volumes_per_node=0,
volume_size=0,
node_processes=['TASKTRACKER', 'DATANODE', 'GANGLIA_MONITOR',
'HDFS_CLIENT', 'MAPREDUCE_CLIENT',
'AMBARI_AGENT'],
node_configs={}
)
node_group_template_id_list.append(node_group_template_tt_dn_id)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
node_group_template_id_list=node_group_template_id_list
)
message = 'Failure while \'tt-dn\' node group ' \
'template creation: '
self.print_error_log(message, e)
#---------------------------Cluster template creation--------------------------
try:
cluster_template_id = self.create_cluster_template(
'test-cluster-template',
self.HDP,
description='test cluster template',
cluster_configs={},
node_groups=[
dict(
name='master-node-jt-nn',
node_group_template_id=node_group_template_jt_nn_id,
count=1),
dict(
name='worker-node-tt-dn',
node_group_template_id=node_group_template_tt_dn_id,
count=3)
],
anti_affinity=[]
)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
node_group_template_id_list=node_group_template_id_list
)
message = 'Failure while cluster template creation: '
self.print_error_log(message, e)
#-------------------------------Cluster creation-------------------------------
try:
cluster_info = self.create_cluster_and_get_info(
self.HDP,
cluster_template_id,
description='test cluster',
cluster_configs={},
node_groups=None,
anti_affinity=[]
)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
self.cluster_id, cluster_template_id,
node_group_template_id_list
)
message = 'Failure while cluster creation: '
self.print_error_log(message, e)
#------------------------------MAP REDUCE TESTING------------------------------
try:
self._map_reduce_testing(cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while Map Reduce testing: '
self.print_error_log(message, e)
#---------------------------CHECK SWIFT AVAILABILITY---------------------------
try:
self._check_swift_availability(cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure during check of Swift availability: '
self.print_error_log(message, e)
#--------------------------------CLUSTER SCALING-------------------------------
change_list = [
{
'operation': 'resize',
'info': ['worker-node-tt-dn', 4]
},
{
'operation': 'add',
'info': [
'new-worker-node-tt-dn', 1, '%s'
% node_group_template_tt_dn_id
]
}
]
try:
new_cluster_info = self._cluster_scaling(cluster_info, change_list)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while cluster scaling: '
self.print_error_log(message, e)
if not self.HDP.SKIP_SCALING_TEST:
#-----------------------MAP REDUCE TESTING AFTER SCALING-----------------------
try:
self._map_reduce_testing(new_cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
new_cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while Map Reduce testing after ' \
'cluster scaling: '
self.print_error_log(message, e)
#--------------------CHECK SWIFT AVAILABILITY AFTER SCALING--------------------
try:
self._check_swift_availability(new_cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
new_cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure during check of Swift availability ' \
'after cluster scaling: '
self.print_error_log(message, e)
#----------------------------DELETE CREATED OBJECTS----------------------------
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)


@@ -0,0 +1,372 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import nose.plugins.attrib as attrib
import unittest2
from savanna.openstack.common import excutils
from savanna.tests.integration_new.configs.config import ITConfig as cfg
from savanna.tests.integration_new.tests import cluster_configs
from savanna.tests.integration_new.tests import map_reduce
from savanna.tests.integration_new.tests import scaling
from savanna.tests.integration_new.tests import swift
class VanillaGatingTest(cluster_configs.ClusterConfigTest,
map_reduce.MapReduceTest, swift.SwiftTest,
scaling.ScalingTest):
SKIP_CLUSTER_CONFIG_TEST = cfg().VANILLA.SKIP_CLUSTER_CONFIG_TEST
SKIP_MAP_REDUCE_TEST = cfg().VANILLA.SKIP_MAP_REDUCE_TEST
SKIP_SWIFT_TEST = cfg().VANILLA.SKIP_SWIFT_TEST
SKIP_SCALING_TEST = cfg().VANILLA.SKIP_SCALING_TEST
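# Single gating scenario for the Vanilla plugin: create node group and cluster
# templates, launch a cluster, run the cluster config, Map Reduce and Swift
# checks, scale the cluster and repeat the checks, deleting all created
# objects on failure and at the end.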
@attrib.attr(tags='vanilla')
@unittest2.skipIf(cfg().VANILLA.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for Vanilla plugin were skipped')
def test_vanilla_plugin_gating(self):
node_group_template_id_list = []
#-------------------------------CLUSTER CREATION-------------------------------
#---------------------"tt-dn" node group template creation---------------------
try:
node_group_template_tt_dn_id = self.create_node_group_template(
'tt-dn',
self.VANILLA,
description='test node group template',
volumes_per_node=0,
volume_size=0,
node_processes=['tasktracker', 'datanode'],
node_configs={
'HDFS': cluster_configs.DN_CONFIG,
'MapReduce': cluster_configs.TT_CONFIG
}
)
node_group_template_id_list.append(node_group_template_tt_dn_id)
except Exception as e:
with excutils.save_and_reraise_exception():
message = 'Failure while \'tt-dn\' node group ' \
'template creation: '
self.print_error_log(message, e)
#-----------------------"tt" node group template creation----------------------
try:
node_group_template_tt_id = self.create_node_group_template(
'tt',
self.VANILLA,
description='test node group template',
volumes_per_node=0,
volume_size=0,
node_processes=['tasktracker'],
node_configs={
'MapReduce': cluster_configs.TT_CONFIG
}
)
node_group_template_id_list.append(node_group_template_tt_id)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
node_group_template_id_list=node_group_template_id_list
)
message = 'Failure while \'tt\' node group template creation: '
self.print_error_log(message, e)
#----------------------"dn" node group template creation-----------------------
try:
node_group_template_dn_id = self.create_node_group_template(
'dn',
self.VANILLA,
description='test node group template',
volumes_per_node=0,
volume_size=0,
node_processes=['datanode'],
node_configs={
'HDFS': cluster_configs.DN_CONFIG
}
)
node_group_template_id_list.append(node_group_template_dn_id)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
node_group_template_id_list=node_group_template_id_list
)
message = 'Failure while \'dn\' node group template creation: '
self.print_error_log(message, e)
#---------------------------Cluster template creation--------------------------
try:
cluster_template_id = self.create_cluster_template(
'test-cluster-template',
self.VANILLA,
description='test cluster template',
cluster_configs={
'HDFS': cluster_configs.CLUSTER_HDFS_CONFIG,
'MapReduce': cluster_configs.CLUSTER_MR_CONFIG,
'general': cluster_configs.CLUSTER_GENERAL_CONFIG
},
node_groups=[
dict(
name='master-node-jt-nn',
flavor_id=self.COMMON.FLAVOR_ID,
node_processes=['namenode', 'jobtracker'],
node_configs={
'HDFS': cluster_configs.NN_CONFIG,
'MapReduce': cluster_configs.JT_CONFIG
},
count=1),
dict(
name='master-node-sec-nn',
flavor_id=self.COMMON.FLAVOR_ID,
node_processes=['secondarynamenode'],
node_configs={},
count=1),
dict(
name='worker-node-tt-dn',
node_group_template_id=node_group_template_tt_dn_id,
count=3),
dict(
name='worker-node-dn',
node_group_template_id=node_group_template_dn_id,
count=1),
dict(
name='worker-node-tt',
node_group_template_id=node_group_template_tt_id,
count=1)
],
anti_affinity=[]
)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
node_group_template_id_list=node_group_template_id_list
)
message = 'Failure while cluster template creation: '
self.print_error_log(message, e)
#-------------------------------Cluster creation-------------------------------
try:
cluster_info = self.create_cluster_and_get_info(
self.VANILLA,
cluster_template_id,
description='test cluster',
cluster_configs={},
node_groups=None,
anti_affinity=[]
)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
self.cluster_id, cluster_template_id,
node_group_template_id_list
)
message = 'Failure while cluster creation: '
self.print_error_log(message, e)
#----------------------------CLUSTER CONFIG TESTING----------------------------
try:
self._cluster_config_testing(cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while cluster config testing: '
self.print_error_log(message, e)
#------------------------------MAP REDUCE TESTING------------------------------
try:
self._map_reduce_testing(cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while Map Reduce testing: '
self.print_error_log(message, e)
#---------------------------CHECK SWIFT AVAILABILITY---------------------------
try:
self._check_swift_availability(cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure during check of Swift availability: '
self.print_error_log(message, e)
#--------------------------------CLUSTER SCALING-------------------------------
change_list = [
{
'operation': 'resize',
'info': ['worker-node-tt-dn', 4]
},
{
'operation': 'resize',
'info': ['worker-node-dn', 0]
},
{
'operation': 'resize',
'info': ['worker-node-tt', 0]
},
{
'operation': 'add',
'info': [
'new-worker-node-tt', 1, '%s' % node_group_template_tt_id
]
},
{
'operation': 'add',
'info': [
'new-worker-node-dn', 1, '%s' % node_group_template_dn_id
]
}
]
try:
new_cluster_info = self._cluster_scaling(cluster_info, change_list)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while cluster scaling: '
self.print_error_log(message, e)
if not self.VANILLA.SKIP_SCALING_TEST:
#---------------------CLUSTER CONFIG TESTING AFTER SCALING---------------------
try:
self._cluster_config_testing(new_cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
new_cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while cluster config testing after ' \
'cluster scaling: '
self.print_error_log(message, e)
#-----------------------MAP REDUCE TESTING AFTER SCALING-----------------------
try:
self._map_reduce_testing(new_cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
new_cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while Map Reduce testing after ' \
'cluster scaling: '
self.print_error_log(message, e)
#--------------------CHECK SWIFT AVAILABILITY AFTER SCALING--------------------
try:
self._check_swift_availability(new_cluster_info)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
new_cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure during check of Swift availability ' \
'after cluster scaling: '
self.print_error_log(message, e)
#----------------------------DELETE CREATED OBJECTS----------------------------
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)


@@ -0,0 +1,175 @@
#!/bin/bash -x
log=/tmp/config-test-log.txt
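# The first argument selects which check to run and '-value <expected>'
# supplies the value that was set at cluster creation; results are appended
# to the log file above. The empty 'NAME=""' assignments later in the script
# are placeholders filled in from the test's parameter list before upload.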
case $1 in
NameNodeHeapSize)
FUNC="check_nn_heap_size"
;;
JobTrackerHeapSize)
FUNC="check_jt_heap_size"
;;
DataNodeHeapSize)
FUNC="check_dn_heap_size"
;;
TaskTrackerHeapSize)
FUNC="check_tt_heap_size"
;;
EnableSwift)
FUNC="check_swift_availability"
;;
dfs.replication)
FUNC="check_dfs_replication"
;;
mapred.map.tasks.speculative.execution)
FUNC="check_mapred_map_tasks_speculative_execution"
;;
mapred.child.java.opts)
FUNC="check_mapred_child_java_opts"
;;
esac
shift
if [ "$1" = "-value" ]
then
VALUE="$2"
fi
shift
check_submitted_parameter() {
case "$1" in
config_value)
if [ -z "$VALUE" ]
then
echo "Config value is not specified" >> $log
exit 1
fi
;;
esac
}
compare_config_values() {
check_submitted_parameter config_value
if [ "$VALUE" = "$1" ]
then
echo -e "CHECK IS SUCCESSFUL \n\n" >> $log && exit 0
else
echo -e "Config value while cluster creation request: $VALUE \n" >> $log
echo -e "Actual config value on node: $1 \n" >> $log
echo "$VALUE != $1" >> $log && exit 1
fi
}
check_heap_size() {
heap_size=`ps aux | grep java | grep $1 | grep -o 'Xmx[0-9]\{1,10\}m' | tail -n 1 | grep -o '[0-9]\{1,100\}'`
compare_config_values $heap_size
}
check_nn_heap_size() {
echo -e "*******************NAME NODE HEAP SIZE******************\n" >> $log
check_heap_size "namenode"
}
check_jt_heap_size() {
echo -e "******************JOB TRACKER HEAP SIZE*****************\n" >> $log
check_heap_size "jobtracker"
}
check_dn_heap_size() {
echo -e "*******************DATA NODE HEAP SIZE******************\n" >> $log
check_heap_size "datanode"
}
check_tt_heap_size() {
echo -e "*****************TASK TRACKER HEAP SIZE*****************\n" >> $log
check_heap_size "tasktracker"
}
OS_URL=""
OS_TENANT_NAME=""
OS_USERNAME=""
OS_PASSWORD=""
HADOOP_USER=""
check_swift_availability() {
echo -e "**************************SWIFT*************************\n" >> $log
check_submitted_parameter config_value
echo "Swift config test -- Enable Swift" > /tmp/swift-config-test-file.txt
sudo su -c "hadoop dfs -mkdir /swift-config-test/" $HADOOP_USER
if [ `echo "$?"` -ne 0 ]
then
exit 1
fi
sudo su -c "hadoop dfs -copyFromLocal /tmp/swift-config-test-file.txt /swift-config-test/" $HADOOP_USER
if [ `echo "$?"` -ne 0 ]
then
sudo su -c "hadoop dfs -rmr /swift-config-test" $HADOOP_USER && exit 1
fi
sudo su -c "hadoop distcp -D fs.swift.service.savanna.username=$OS_USERNAME -D fs.swift.service.savanna.tenant=$OS_TENANT_NAME -D fs.swift.service.savanna.password=$OS_PASSWORD /swift-config-test/swift-config-test-file.txt swift://Swift-config-test.savanna/" $HADOOP_USER
if [ `echo "$?"` -ne 0 ]
then
swift_availability="False"
else
swift_availability="True"
fi
sudo su -c "hadoop dfs -rmr /swift-config-test" $HADOOP_USER
compare_config_values $swift_availability
}
check_dfs_replication() {
echo -e "*********************DFS.REPLICATION********************\n" >> $log
value=`cat /etc/hadoop/hdfs-site.xml | grep -A 1 '.*dfs.replication.*' | tail -n 1 | grep -o "[0-9]\{1,10\}"`
compare_config_values $value
}
check_mapred_map_tasks_speculative_execution() {
echo -e "*********MAPRED.MAP.TASKS.SPECULATIVE.EXECUTION*********\n" >> $log
value=`cat /etc/hadoop/mapred-site.xml | grep -A 1 '.*mapred.map.tasks.speculative.execution.*' | tail -n 1 | grep -o "[a-z,A-Z]\{4,5\}" | grep -v "value"`
compare_config_values $value
}
check_mapred_child_java_opts() {
echo -e "*****************MAPRED.CHILD.JAVA.OPTS*****************\n" >> $log
value=`cat /etc/hadoop/mapred-site.xml | grep -A 1 '.*mapred.child.java.opts.*' | tail -n 1 | grep -o "\-Xmx[0-9]\{1,10\}m"`
compare_config_values $value
}
$FUNC


@@ -0,0 +1,157 @@
#!/bin/bash -x
dir=/tmp/MapReduceTestOutput
log=$dir/log.txt
HADOOP_VERSION=""
HADOOP_DIRECTORY=""
HADOOP_LOG_DIRECTORY=""
HADOOP_USER=""
NODE_COUNT=""
PLUGIN_NAME=""
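# The empty assignments above are placeholders filled in from the test's
# parameter list before the script is uploaded; the first argument selects the
# action: run the PI job, get its name, check its log directory or run the
# wordcount job.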
case $1 in
run_pi_job)
FUNC="run_pi_job"
;;
get_pi_job_name)
FUNC="get_pi_job_name"
;;
check_directory)
FUNC="check_job_directory_existence"
;;
run_wordcount_job)
FUNC="run_wordcount_job"
;;
esac
shift
if [ "$1" = "-job_name" ]
then
JOB_NAME="$2"
fi
shift
check_submitted_parameter() {
case "$1" in
job_name)
if [ -z "$JOB_NAME" ]
then
echo "Job name not specified"
exit 1
fi
;;
esac
}
check_job_directory_existence() {
check_submitted_parameter job_name
if ! [ -d $HADOOP_LOG_DIRECTORY/$JOB_NAME ]
then
echo "Log file of \"PI\" job not found"
exit 1
fi
}
create_log_directory() {
if ! [ -d $dir ]
then
mkdir $dir
chmod -R 777 $dir
touch $log
fi
}
run_pi_job() {
create_log_directory
echo -e "<********************************DPKG*****************************> \n`dpkg --get-selections | grep hadoop` \n\n\n" >> $log
echo -e "<******************************NETSTAT****************************> \n`sudo netstat -plten | grep java` \n\n\n" >> $log
hadoop_version=-$HADOOP_VERSION
if [ "$PLUGIN_NAME" = "hdp" ]
then
hadoop_version=""
fi
echo -e "<************************START OF \"PI\" JOB**********************> \n" >> $log
echo -e "`sudo su -c \"cd $HADOOP_DIRECTORY && hadoop jar hadoop-examples$hadoop_version.jar pi $[$NODE_COUNT*10] $[$NODE_COUNT*1000]\" $HADOOP_USER` \n" >> $log
echo -e "<************************END OF \"PI\" JOB************************> \n\n\n" >> $log
}
get_pi_job_name() {
job_name=`hadoop job -list all | tail -n1 | awk '{print $1}'`
if [ $job_name = "JobId" ]
then
echo "\"PI\" job name has not been obtained since \"PI\" job was not launched" >> $log
exit 1
fi
echo "$job_name"
}
check_return_code_after_command_execution() {
if [ "$1" = "-exit" ]
then
if [ "$2" -ne 0 ]
then
exit 1
fi
fi
if [ "$1" = "-clean_hdfs" ]
then
if [ "$2" -ne 0 ]
then
sudo su -c "hadoop dfs -rmr /map-reduce-test" $HADOOP_USER && exit 1
fi
fi
}
run_wordcount_job() {
create_log_directory
`dmesg > $dir/input`
sudo su -c "hadoop dfs -ls /" $HADOOP_USER
check_return_code_after_command_execution -exit `echo "$?"`
sudo su -c "hadoop dfs -mkdir /map-reduce-test" $HADOOP_USER
check_return_code_after_command_execution -exit `echo "$?"`
sudo su -c "hadoop dfs -copyFromLocal $dir/input /map-reduce-test/mydata" $HADOOP_USER
check_return_code_after_command_execution -clean_hdfs `echo "$?"`
hadoop_version=-$HADOOP_VERSION
if [ "$PLUGIN_NAME" = "hdp" ]
then
hadoop_version=""
fi
sudo su -c "cd $HADOOP_DIRECTORY && hadoop jar hadoop-examples$hadoop_version.jar wordcount /map-reduce-test/mydata /map-reduce-test/output" $HADOOP_USER
check_return_code_after_command_execution -clean_hdfs `echo "$?"`
sudo su -c "hadoop dfs -copyToLocal /map-reduce-test/output/ $dir/output/" $HADOOP_USER
check_return_code_after_command_execution -exit `echo "$?"`
sudo su -c "hadoop dfs -rmr /map-reduce-test" $HADOOP_USER
check_return_code_after_command_execution -exit `echo "$?"`
}
$FUNC


@@ -0,0 +1,70 @@
#!/bin/bash -x
OS_TENANT_NAME=""
OS_USERNAME=""
OS_PASSWORD=""
HADOOP_USER=""
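# Round-trip check: a 1 MB random file is copied into HDFS, pushed to Swift and
# pulled back with distcp, and the md5 sums of the original and the returned
# file are compared. The empty assignments above are placeholders filled in by
# the test before the script is uploaded.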
compare_files() {
a=`md5sum $1 | awk {'print \$1'}`
b=`md5sum $2 | awk {'print \$1'}`
if [ "$a" = "$b" ]
then
echo "md5-sums of files $1 and $2 are equal"
else
echo -e "\nUpload file to Swift: $1 \n"
echo -e "Download file from Swift: $2 \n"
echo -e "md5-sums of files $1 and $2 are not equal \n"
echo "$1 != $2" && exit 1
fi
}
check_return_code_after_command_execution() {
if [ "$1" = "-exit" ]
then
if [ "$2" -ne 0 ]
then
exit 1
fi
fi
if [ "$1" = "-clean_hdfs" ]
then
if [ "$2" -ne 0 ]
then
sudo su -c "hadoop dfs -rmr /swift-test" $HADOOP_USER && exit 1
fi
fi
}
check_swift_availability() {
dd if=/dev/urandom of=/tmp/test-file bs=1048576 count=1
sudo su -c "hadoop dfs -mkdir /swift-test/" $HADOOP_USER
check_return_code_after_command_execution -exit `echo "$?"`
sudo su -c "hadoop dfs -copyFromLocal /tmp/test-file /swift-test/" $HADOOP_USER
check_return_code_after_command_execution -clean_hdfs `echo "$?"`
sudo su -c "hadoop distcp -D fs.swift.service.savanna.username=$OS_USERNAME -D fs.swift.service.savanna.tenant=$OS_TENANT_NAME -D fs.swift.service.savanna.password=$OS_PASSWORD /swift-test/test-file swift://Swift-test.savanna/" $HADOOP_USER
check_return_code_after_command_execution -clean_hdfs `echo "$?"`
sudo su -c "hadoop distcp -D fs.swift.service.savanna.username=$OS_USERNAME -D fs.swift.service.savanna.tenant=$OS_TENANT_NAME -D fs.swift.service.savanna.password=$OS_PASSWORD swift://Swift-test.savanna/test-file /swift-test/swift-test-file" $HADOOP_USER
check_return_code_after_command_execution -clean_hdfs `echo "$?"`
sudo su -c "hadoop dfs -copyToLocal /swift-test/swift-test-file /tmp/swift-test-file" $HADOOP_USER
check_return_code_after_command_execution -clean_hdfs `echo "$?"`
sudo su -c "hadoop dfs -rmr /swift-test" $HADOOP_USER
compare_files /tmp/test-file /tmp/swift-test-file
sudo rm /tmp/test-file /tmp/swift-test-file
}
check_swift_availability


@@ -0,0 +1,143 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.openstack.common import excutils
from savanna.tests.integration_new.tests import base
class MapReduceTest(base.ITestCase):
def __run_pi_job(self):
self.execute_command('./script.sh run_pi_job')
def __get_name_of_completed_pi_job(self):
try:
job_name = self.execute_command('./script.sh get_pi_job_name')
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Failure while obtaining the name of the completed \'PI\' job: ' +
str(e)
)
print(
self.read_file_from('/tmp/MapReduceTestOutput/log.txt')
)
return job_name[1][:-1]
def __run_wordcount_job(self):
try:
self.execute_command('./script.sh run_wordcount_job')
except Exception as e:
with excutils.save_and_reraise_exception():
print('Failure while \'Wordcount\' job launch: ' + str(e))
print(
self.read_file_from('/tmp/MapReduceTestOutput/log.txt')
)
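# Transfer the Map Reduce helper script to every cluster node, run the PI job
# from the namenode, check that every tasktracker node has the log directory
# of the completed job, and finally run the wordcount job.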
@base.skip_test('SKIP_MAP_REDUCE_TEST',
message='Test for Map Reduce was skipped.')
def _map_reduce_testing(self, cluster_info, hadoop_version=None):
plugin = cluster_info['plugin']
if not hadoop_version:
hadoop_version = plugin.HADOOP_VERSION
node_count = cluster_info['node_info']['node_count']
extra_script_parameters = {
'HADOOP_VERSION': hadoop_version,
'HADOOP_DIRECTORY': plugin.HADOOP_DIRECTORY,
'HADOOP_LOG_DIRECTORY': plugin.HADOOP_LOG_DIRECTORY,
'HADOOP_USER': plugin.HADOOP_USER,
'NODE_COUNT': node_count,
'PLUGIN_NAME': plugin.PLUGIN_NAME
}
node_ip_and_process_list = cluster_info['node_ip_list']
try:
self.transfer_helper_script_to_nodes(
node_ip_and_process_list, plugin.NODE_USERNAME,
'map_reduce_test_script.sh',
parameter_list=extra_script_parameters)
except Exception as e:
with excutils.save_and_reraise_exception():
print(str(e))
namenode_ip = cluster_info['node_info']['namenode_ip']
self.open_ssh_connection(namenode_ip, plugin.NODE_USERNAME)
self.__run_pi_job()
job_name = self.__get_name_of_completed_pi_job()
self.close_ssh_connection()
# Check that the cluster used every "tasktracker" node while running the PI
# job. The numbers of map and reduce tasks in the helper script guarantee that
# every such node is used during the job.
try:
for node_ip, process_list in node_ip_and_process_list.items():
if plugin.PROCESS_NAMES['tt'] in process_list:
self.open_ssh_connection(node_ip, plugin.NODE_USERNAME)
self.execute_command(
'./script.sh check_directory -job_name %s' % job_name
)
self.close_ssh_connection()
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Log file of completed \'PI\' job on \'tasktracker\' '
'cluster node not found: ' + str(e)
)
self.close_ssh_connection()
self.open_ssh_connection(namenode_ip, plugin.NODE_USERNAME)
print(
self.read_file_from('/tmp/MapReduceTestOutput/log.txt')
)
self.open_ssh_connection(namenode_ip, plugin.NODE_USERNAME)
self.__run_wordcount_job()
self.close_ssh_connection()


@@ -0,0 +1,169 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.openstack.common import excutils
from savanna.tests.integration_new.tests import base
class ScalingTest(base.ITestCase):
def __change_node_info_while_ng_adding(self, ngt_id, count, cluster_info):
cluster_info['node_info']['node_count'] += count
data = self.savanna.node_group_templates.get(ngt_id).node_processes
if cluster_info['plugin'].PROCESS_NAMES['tt'] in data:
cluster_info['node_info']['tasktracker_count'] += count
if cluster_info['plugin'].PROCESS_NAMES['dn'] in data:
cluster_info['node_info']['datanode_count'] += count
def __change_node_info_while_ng_resizing(self, name, count, cluster_info):
data = self.savanna.clusters.get(
cluster_info['cluster_id']).node_groups
for node_group in data:
if node_group['name'] == name:
processes = node_group['node_processes']
old_count = node_group['count']
cluster_info['node_info']['node_count'] += -old_count + count
if cluster_info['plugin'].PROCESS_NAMES['tt'] in processes:
cluster_info['node_info']['tasktracker_count'] += (
-old_count + count
)
if cluster_info['plugin'].PROCESS_NAMES['dn'] in processes:
cluster_info['node_info']['datanode_count'] += -old_count + count
def __add_new_field_to_scale_body_while_ng_resizing(self, scale_body, name,
count):
scale_body['resize_node_groups'].append(
{
'name': name,
'count': count
}
)
def __add_new_field_to_scale_body_while_ng_adding(self, scale_body, ngt_id,
count, name):
scale_body['add_node_groups'].append(
{
'node_group_template_id': ngt_id,
'count': count,
'name': name
}
)
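# Build a scale request from change_list ('resize' and 'add' operations),
# apply it, wait for the cluster to become Active again, then re-check node
# processes and active workers and return the updated cluster info.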
@base.skip_test('SKIP_SCALING_TEST',
'Test for cluster scaling was skipped.')
def _cluster_scaling(self, cluster_info, change_list):
scale_body = {'add_node_groups': [], 'resize_node_groups': []}
for change in change_list:
if change['operation'] == 'resize':
node_group_name = change['info'][0]
node_group_size = change['info'][1]
self.__add_new_field_to_scale_body_while_ng_resizing(
scale_body, node_group_name, node_group_size
)
self.__change_node_info_while_ng_resizing(
node_group_name, node_group_size, cluster_info
)
if change['operation'] == 'add':
node_group_name = change['info'][0]
node_group_size = change['info'][1]
node_group_id = change['info'][2]
self.__add_new_field_to_scale_body_while_ng_adding(
scale_body, node_group_id, node_group_size,
node_group_name
)
self.__change_node_info_while_ng_adding(
node_group_id, node_group_size, cluster_info
)
self.savanna.clusters.scale(
cluster_info['cluster_id'], scale_body
)
self.poll_cluster_state(cluster_info['cluster_id'])
new_node_ip_list = self.get_cluster_node_ip_list_with_node_processes(
cluster_info['cluster_id']
)
try:
new_node_info = self.get_node_info(new_node_ip_list,
cluster_info['plugin'])
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Failure during check of node process deployment '
'on cluster node: ' + str(e)
)
actual_node_info = cluster_info['node_info']
self.assertEqual(
actual_node_info, new_node_info,
'Failure while node info comparison. \n'
'Actual node info: %s. \n'
'Node info after cluster scaling: %s.'
% (actual_node_info, new_node_info)
)
try:
self.await_active_workers_for_namenode(new_node_info,
cluster_info['plugin'])
except Exception as e:
with excutils.save_and_reraise_exception():
print(
'Failure while waiting for active workers of the namenode: '
+ str(e)
)
return {
'cluster_id': cluster_info['cluster_id'],
'node_ip_list': new_node_ip_list,
'node_info': new_node_info,
'plugin': cluster_info['plugin']
}


@@ -0,0 +1,78 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swiftclient import client as swift_client
from savanna.openstack.common import excutils
from savanna.tests.integration_new.tests import base
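# Check of Swift availability from the cluster: the helper script is run on
# the namenode against a temporary 'Swift-test' container, which is deleted
# when the check finishes.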
class SwiftTest(base.ITestCase):
@base.skip_test(
'SKIP_SWIFT_TEST',
message='Test for check of Swift availability was skipped.')
def _check_swift_availability(self, cluster_info):
plugin = cluster_info['plugin']
extra_script_parameters = {
'OS_TENANT_NAME': self.COMMON.OS_TENANT_NAME,
'OS_USERNAME': self.COMMON.OS_USERNAME,
'OS_PASSWORD': self.COMMON.OS_PASSWORD,
'HADOOP_USER': plugin.HADOOP_USER,
}
namenode_ip = cluster_info['node_info']['namenode_ip']
self.open_ssh_connection(namenode_ip, plugin.NODE_USERNAME)
try:
self.transfer_helper_script_to_node('swift_test_script.sh',
extra_script_parameters)
except Exception as e:
with excutils.save_and_reraise_exception():
print(str(e))
swift = swift_client.Connection(
authurl=self.COMMON.OS_AUTH_URL,
user=self.COMMON.OS_USERNAME,
key=self.COMMON.OS_PASSWORD,
tenant_name=self.COMMON.OS_TENANT_NAME,
auth_version='2') # TODO(ylobankov): delete hard code
swift.put_container('Swift-test')
try:
self.execute_command('./script.sh')
except Exception as e:
with excutils.save_and_reraise_exception():
print(str(e))
finally:
self.delete_swift_container(swift, 'Swift-test')
self.close_ssh_connection()


@@ -10,6 +10,10 @@ mock>=1.0
nose
openstack.nose_plugin>=0.7
pylint==0.25.2
python-swiftclient>=1.6.0
sphinx>=1.1.2
sphinxcontrib-httpdomain
unittest2
-f http://tarballs.openstack.org/python-savannaclient/python-savannaclient-master.tar.gz#egg=pythonsavannaclient-0.1
pythonsavannaclient>=0.1


@@ -21,7 +21,7 @@ deps =
commands = nosetests -w unit {posargs}
[testenv:integration]
commands = nosetests -w integration -x {posargs}
commands = nosetests -w integration_new -x {posargs}
[testenv:cover]
setenv = NOSE_WITH_COVERAGE=1