Fix quantum network support for savanna OSTF tests

Minor fixes:
 * fixed double cluster delete after error status
 * fixed image check for the case when an image was registered and then unregistered in Savanna

Change-Id: I60c75528df4646c8e93d116af0218b19ebde4b6d
Sergey Galkin 2013-10-19 19:57:39 +04:00
parent 8b06e949ca
commit 50c4e28d7a
3 changed files with 188 additions and 170 deletions

View File

@@ -0,0 +1,49 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import paramiko
from fuel_health.exceptions import SSHExecCommandFailed
import fuel_health.config
LOG = logging.getLogger(__name__)
def ssh_command(cmd):
config = fuel_health.config.FuelConfig()
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
LOG.debug('Remote ssh command is "%s"', cmd)
try:
ssh.connect(hostname=config.compute.controller_nodes[0],
username=config.compute.controller_node_ssh_user,
key_filename=config.compute.path_to_private_key,
timeout=300)
_, stdout, stderr = ssh.exec_command(cmd)
output = stdout.read()
output_err = stderr.read()
ssh.close()
LOG.debug('ssh output is "%s%s"', output, output_err)
return output, output_err
except SSHExecCommandFailed as exc:
# module-level helper has no TestCase to fail(), so log and re-raise
LOG.debug('Command failed: %s', exc)
raise
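A minimal usage sketch of the new helper (the host and port values here are illustrative, not part of this change):

output, output_err = ssh_command('nc -v -z -w 60 192.168.0.2 50030 '
'| grep succeeded')
if output or ' succeeded!' in str(output_err):
LOG.debug('Port is reachable')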

View File

@@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
LOG = logging.getLogger(__name__)
def create_cluster(self, name, plugin_name, hadoop_version,
@@ -25,7 +27,7 @@ def create_cluster(self, name, plugin_name, hadoop_version,
'plugin_name': plugin_name,
'hadoop_version': hadoop_version
}
LOG.debug("Network id is '%s'" % net_id)
if cluster_template_id is None:
self._assert_variables(default_image_id=default_image_id,
cluster_configs=cluster_configs,

View File

@@ -17,10 +17,8 @@
import logging
import time
import paramiko
from fuel_health.common.ssh import Client as SSHClient
from fuel_health.exceptions import SSHExecCommandFailed
from fuel_health.common.savanna_ssh import ssh_command
import fuel_health.nmanager as nmanager
@@ -42,7 +40,8 @@ class SavannaTest(nmanager.OfficialClientTest):
cls.keys = []
cls.plugin = 'vanilla'
cls.plugin_version = '1.1.2'
cls.floating_ip_pool = 'nova'
cls.quantum_floating_ip = 'net04_ext'
cls.quantum_net = 'net04'
cls.TT_CONFIG = {'Task Tracker Heap Size': 515}
cls.DN_CONFIG = {'Data Node Heap Size': 513}
cls.CLUSTER_HDFS_CONFIG = {'dfs.replication': 2}
@@ -73,11 +72,16 @@ class SavannaTest(nmanager.OfficialClientTest):
def _test_image(self, version, plugin):
tag_version = '_savanna_tag_%s' % version
tag_plugin = '_savanna_tag_%s' % plugin
LOG.debug('Testing image - plugin - %s version - %s', tag_plugin, tag_version)
for image in self.compute_client.images.list():
if image.name == 'savanna':
LOG.debug('Savanna image metadata is %s', image.metadata)
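# An image registered in Savanna carries the '_savanna_username'
# metadata (dropped again on unregister), so require it as well.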
if image.metadata[tag_version] == 'True'\
and image.metadata[tag_plugin] == 'True':
return True
and image.metadata[tag_plugin] == 'True'\
and image.metadata.get('_savanna_username') is not None:
LOG.debug('Correct image for savanna found')
return True
LOG.debug('Correct image for savanna not found')
return False
def _create_node_group_template_and_get_id(
@@ -90,14 +94,15 @@ class SavannaTest(nmanager.OfficialClientTest):
self.flavors.append(flavor.id)
if floating_ip_pool:
data = client.node_group_templates.create(
name, plugin_name, hadoop_version, self.flavors[0], description,
volumes_per_node, volume_size, node_processes, node_configs,
floating_ip_pool
name, plugin_name, hadoop_version, self.flavors[0],
description, volumes_per_node, volume_size,
node_processes, node_configs, floating_ip_pool
)
else:
data = client.node_group_templates.create(
name, plugin_name, hadoop_version, self.flavors[0], description,
volumes_per_node, volume_size, node_processes, node_configs
name, plugin_name, hadoop_version, self.flavors[0],
description, volumes_per_node, volume_size,
node_processes, node_configs
)
node_group_template_id = str(data.id)
return node_group_template_id
@@ -116,10 +121,8 @@ class SavannaTest(nmanager.OfficialClientTest):
def _check_cluster_state(self, client, cluster_state, cluster_id):
if cluster_state == 'Error':
client.clusters.delete(cluster_id)
self.fail('Cluster state == \'Error\'')
if cluster_state != 'Active':
client.clusters.delete(cluster_id)
self.fail(
'Cluster state != \'Active\', passed %d minutes'
% self.CLUSTER_CREATION_TIMEOUT)
@@ -128,7 +131,8 @@ class SavannaTest(nmanager.OfficialClientTest):
data = client.clusters.get(cluster_id)
i = 1
while str(data.status) != 'Active':
LOG.debug('CLUSTER STATUS:' + str(i * 10) + ' sec:' + str(data.status))
LOG.debug('CLUSTER STATUS:' + str(i * 10) +
' sec:' + str(data.status))
print('CLUSTER STATUS:' + str(i * 10) + ' sec:' + str(data.status))
if str(data.status) == 'Error':
LOG.debug('\n' + str(i * 10) + ' sec:' + str(data) + '\n')
@@ -153,27 +157,37 @@ class SavannaTest(nmanager.OfficialClientTest):
node_ip = instance['management_ip']
node_ip_list_with_node_processes[node_ip] = node_group[
'node_processes']
LOG.debug('node_ip_list_with_node_processes:\n%s' % node_ip_list_with_node_processes)
LOG.debug('node_ip_list_with_node_processes:\n%s',
node_ip_list_with_node_processes)
return node_ip_list_with_node_processes
def _create_cluster_and_get_info(
self, client, plugin_name, hadoop_version, cluster_template_id,
image_name, description, cluster_configs, node_groups,
anti_affinity):
anti_affinity, neutron_management_network=None):
self.keys.append(
self.compute_client.keypairs.create(self.USER_KEYPAIR_ID))
image_id = str(self.compute_client.images.find(name=image_name).id)
data = client.clusters.create(
self.CLUSTER_NAME, plugin_name, hadoop_version,
cluster_template_id, image_id, description, cluster_configs,
node_groups, self.USER_KEYPAIR_ID, anti_affinity
)
if neutron_management_network:
data = client.clusters.create(
self.CLUSTER_NAME, plugin_name, hadoop_version,
cluster_template_id, image_id, description, cluster_configs,
node_groups, self.USER_KEYPAIR_ID, anti_affinity,
neutron_management_network
)
else:
data = client.clusters.create(
self.CLUSTER_NAME, plugin_name, hadoop_version,
cluster_template_id, image_id, description, cluster_configs,
node_groups, self.USER_KEYPAIR_ID, anti_affinity
)
cluster_id = data.id
self.clusters.append(cluster_id)
cluster_state = self._get_cluster_state(client, cluster_id)
self._check_cluster_state(client, cluster_state, cluster_id)
node_ip_list_with_node_processes = \
self._get_cluster_node_ip_list_with_node_processes(client, cluster_id)
self._get_cluster_node_ip_list_with_node_processes(client,
cluster_id)
node_info = self._get_node_info(
node_ip_list_with_node_processes, plugin_name
)
@@ -187,58 +201,39 @@ class SavannaTest(nmanager.OfficialClientTest):
i = 0
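# Poll the port with netcat from the controller until it opens.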
while True:
cmd = ('nc -v -z -w 60 %s %s | grep succeeded' % (host, port))
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(
hostname=self.config.compute.controller_nodes[0],
username=self.config.compute.controller_node_ssh_user,
key_filename=self.config.compute.path_to_private_key,
timeout=300)
stdin, stdout, stderr = ssh.exec_command(cmd)
output = stdout.read()
output_err = stderr.read()
ssh.close()
print('NC output after %s seconds is "%s"' % (i*10, output))
LOG.debug('Remote command: %s' % cmd)
LOG.debug('NC output after %s seconds is "%s"' % (i*10, output))
if output or str(output_err).find(' succeeded!') > 0:
break
if not output and i > 600:
self.fail('On host %s port %s is not opened more then 10 minutes' % (host, port))
time.sleep(10)
i += 1
except SSHExecCommandFailed as exc:
output_msg = "Command failed."
LOG.debug(exc)
self.fail(output_msg)
output, output_err = ssh_command(cmd)
print('NC output after %s seconds is "%s"' % (i * 10, output))
LOG.debug('NC output after %s seconds is "%s"',
i * 10, output)
if output or str(output_err).find(' succeeded!') > 0:
break
if not output and i > 600:
self.fail('On host %s port %s is not opened '
'for more than 10 minutes' % (host, port))
time.sleep(10)
i += 1
return True
def _check_auto_assign_floating_ip(self):
cmd = ('grep auto_assign_floating_ip /etc/nova/nova.conf | grep True')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(
hostname=self.config.compute.controller_nodes[0],
username=self.config.compute.controller_node_ssh_user,
key_filename=self.config.compute.path_to_private_key,
timeout=300)
stdin, stdout, stderr = ssh.exec_command(cmd)
output = stdout.read()
output_err = stderr.read()
ssh.close()
LOG.debug('Remote command: %s' % cmd)
LOG.debug('Output is "%s%s"' % (output, output_err))
if output or str(output_err).find(' True') > 0:
LOG.debug('auto_assign_floating_ip is found')
return True
else:
LOG.debug('auto_assign_floating_ip is not found')
return False
except SSHExecCommandFailed as exc:
output_msg = "Command failed."
LOG.debug(exc)
self.fail(output_msg)
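# Detect the network setup on the controller and return a
# (net_type, floating_ip_pool) pair:
#   ('nova_auto', None)          - nova-network auto-assigns floating IPs
#   ('quantum', 'net04_ext')     - quantum; use the external net as pool
#   ('nova', <first pool name>)  - plain nova-network floating IP pool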
cmd_nova = ('grep auto_assign_floating_ip '
'/etc/nova/nova.conf | grep True')
cmd_quantum = ('grep network_api_class=nova.network.quantumv2.api.API '
'/etc/nova/nova.conf')
output_nova, output_nova_err = ssh_command(cmd_nova)
output_quantum, output_quantum_err = ssh_command(cmd_quantum)
if output_nova or str(output_nova_err).find(' True') > 0:
LOG.debug('auto_assign_floating_ip is found')
return ('nova_auto', None)
elif (output_quantum or str(output_quantum_err).find(
'network_api_class=nova.network.quantumv2.api.API') > 0):
LOG.debug('quantum is found')
return ('quantum', self.quantum_floating_ip)
else:
LOG.debug('auto_assign_floating_ip is not found')
LOG.debug('floating pool is %s',
self.compute_client.floating_ip_pools.list()[0].name)
return ('nova',
self.compute_client.floating_ip_pools.list()[0].name)
def _get_node_info(self, node_ip_list_with_node_processes, plugin_name):
tasktracker_count = 0
@@ -285,35 +280,42 @@ class SavannaTest(nmanager.OfficialClientTest):
'node_count': node_count
}
@classmethod
def _get_cluster_node_ip_list_with_node_processes(
cls, client, cluster_id):
data = client.clusters.get(cluster_id)
node_groups = data.node_groups
node_ip_list_with_node_processes = {}
for node_group in node_groups:
instances = node_group['instances']
for instance in instances:
node_ip = instance['management_ip']
node_ip_list_with_node_processes[node_ip] = node_group[
'node_processes']
LOG.debug(node_ip_list_with_node_processes)
return node_ip_list_with_node_processes
def _create_cluster(self, client, cluster_template_id):
self._create_cluster_and_get_info(
client,
self.PLUGIN_NAME,
self.HADOOP_VERSION,
cluster_template_id,
self.IMAGE_NAME,
description='test cluster',
cluster_configs={},
node_groups=None,
anti_affinity=[])
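# Quantum clusters need the management network id; nova clusters do not.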
net_type, _ = self._check_auto_assign_floating_ip()
LOG.debug('Network type is "%s"', net_type)
if net_type == 'quantum':
neutron_m_network = \
self.compute_client.networks.find(label=self.quantum_net)
LOG.debug('Neutron network - %s', neutron_m_network.id)
LOG.debug('Creating cluster for neutron network')
self._create_cluster_and_get_info(
client,
self.PLUGIN_NAME,
self.HADOOP_VERSION,
cluster_template_id,
self.IMAGE_NAME,
description='test cluster',
cluster_configs={},
node_groups=None,
anti_affinity=[],
neutron_management_network=neutron_m_network.id)
else:
LOG.debug('Creating cluster for nova network')
self._create_cluster_and_get_info(
client,
self.PLUGIN_NAME,
self.HADOOP_VERSION,
cluster_template_id,
self.IMAGE_NAME,
description='test cluster',
cluster_configs={},
node_groups=None,
anti_affinity=[])
def _create_node_group_template_tt_dn_id(self, client):
if self._check_auto_assign_floating_ip():
net_type, floating_ip_pool = self._check_auto_assign_floating_ip()
if net_type == 'nova_auto':
LOG.debug('Creating node group template without floating ip')
node_group_template_tt_dn_id = \
self._create_node_group_template_and_get_id(
client,
@@ -330,6 +332,7 @@ class SavannaTest(nmanager.OfficialClientTest):
}
)
else:
LOG.debug('Creating node group template with floating ip')
node_group_template_tt_dn_id = \
self._create_node_group_template_and_get_id(
client,
@@ -344,7 +347,7 @@ class SavannaTest(nmanager.OfficialClientTest):
'HDFS': self.DN_CONFIG,
'MapReduce': self.TT_CONFIG
},
floating_ip_pool=self.floating_ip_pool
floating_ip_pool=floating_ip_pool
)
self.node_groups.append(node_group_template_tt_dn_id)
return node_group_template_tt_dn_id
@@ -434,7 +437,9 @@ class SavannaTest(nmanager.OfficialClientTest):
return cluster_template_id
def _create_tiny_cluster_template(self, client):
if self._check_auto_assign_floating_ip():
net_type, floating_ip_pool = self._check_auto_assign_floating_ip()
if net_type == 'nova_auto':
LOG.debug('Creating cluster template without floating ip')
cluster_template_id = self._create_cluster_template_and_get_id(
client,
'ostf-test-savanna-cluster-template',
@@ -464,6 +469,7 @@ class SavannaTest(nmanager.OfficialClientTest):
anti_affinity=[]
)
else:
LOG.debug('Creating cluster template with floating ip')
cluster_template_id = self._create_cluster_template_and_get_id(
client,
'ostf-test-savanna-cluster-template',
@@ -484,7 +490,7 @@ class SavannaTest(nmanager.OfficialClientTest):
'HDFS': self.NN_CONFIG,
'MapReduce': self.JT_CONFIG
},
floating_ip_pool=self.floating_ip_pool,
floating_ip_pool=floating_ip_pool,
count=1),
dict(
name='ostf-test-worker-node-tt-dn',
@@ -496,80 +502,42 @@ class SavannaTest(nmanager.OfficialClientTest):
self.cluster_templates.append(cluster_template_id)
return cluster_template_id
def _await_active_workers_for_namenode(self, node_info, plugin_name):
def _await_active_workers_for_namenoded(self, node_info, plugin_name):
attempt_count = 100
if plugin_name == 'vanilla':
hadoop_user = self.V_HADOOP_USER
node_username = self.V_NODE_USERNAME
elif plugin_name == 'hdp':
hadoop_user = self.HDP_HADOOP_USER
LOG.debug('Starting ssh execution')
try:
SSHClient(host=self.config.compute.controller_nodes[0],
username=self.config.compute.controller_node_ssh_user,
pkey=self.config.compute.path_to_private_key,
timeout=300
).exec_command('echo "%s" > /tmp/ostf-savanna.pem' %
self.keys[0].private_key)
SSHClient(host=self.config.compute.controller_nodes[0],
username=self.config.compute.controller_node_ssh_user,
pkey=self.config.compute.path_to_private_key,
timeout=300
).exec_command('chmod 600 /tmp/ostf-savanna.pem')
except SSHExecCommandFailed as exc:
output_msg = "Command failed."
LOG.debug(exc)
self.fail(output_msg)
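# Put the generated keypair on the controller so the checks below can
# ssh from it into the cluster nodes.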
ssh_command('echo "%s" > /tmp/ostf-savanna.pem' %
self.keys[0].private_key)
ssh_command('chmod 600 /tmp/ostf-savanna.pem')
while True:
try:
cmd = ('ssh -i /tmp/ostf-savanna.pem -l %s '
'-oUserKnownHostsFile=/dev/null '
'-oStrictHostKeyChecking=no %s '
'sudo -u %s -i "hadoop job '
'-list-active-trackers"' %
(self.V_NODE_USERNAME, node_info['namenode_ip'], hadoop_user))
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
hostname=self.config.compute.controller_nodes[0],
username=self.config.compute.controller_node_ssh_user,
key_filename=self.config.compute.path_to_private_key,
timeout=300)
stdin, stdout, stderr = ssh.exec_command(cmd)
active_tasktracker_count = stdout.read()
ssh.close()
LOG.debug('active_tasktracker_count:%s' % active_tasktracker_count)
print('active_tasktracker_count:%s' % active_tasktracker_count)
except SSHExecCommandFailed as exc:
output_msg = "Command failed."
LOG.debug(exc)
self.fail(output_msg)
try:
cmd = ('ssh -i /tmp/ostf-savanna.pem -l %s '
'-oUserKnownHostsFile=/dev/null '
'-oStrictHostKeyChecking=no %s '
'sudo -u %s -i "hadoop dfsadmin -report" '
'| grep "Datanodes available:.*" | awk '
'\'{print $3}\'' %
(self.V_NODE_USERNAME, node_info['namenode_ip'], hadoop_user))
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
hostname=self.config.compute.controller_nodes[0],
username=self.config.compute.controller_node_ssh_user,
key_filename=self.config.compute.path_to_private_key,
timeout=300)
stdin, stdout, stderr = ssh.exec_command(cmd)
active_datanode_count = stdout.read()
ssh.close()
LOG.debug('active_datanode_count:%s' % active_datanode_count)
print('active_datanode_count:%s' % active_datanode_count)
except SSHExecCommandFailed as exc:
output_msg = "Command failed."
LOG.debug(exc)
self.fail(output_msg)
cmd = ('ssh -i /tmp/ostf-savanna.pem -l %s '
'-oUserKnownHostsFile=/dev/null '
'-oStrictHostKeyChecking=no %s '
'sudo -u %s -i "hadoop job '
'-list-active-trackers"' %
(self.V_NODE_USERNAME, node_info['namenode_ip'],
hadoop_user))
stdout, stderr = ssh_command(cmd)
active_tasktracker_count = stdout
LOG.debug('active_tasktracker_count:%s',
active_tasktracker_count)
print('active_tasktracker_count:%s' % active_tasktracker_count)
cmd = ('ssh -i /tmp/ostf-savanna.pem -l %s '
'-oUserKnownHostsFile=/dev/null '
'-oStrictHostKeyChecking=no %s '
'sudo -u %s -i "hadoop dfsadmin -report" '
'| grep "Datanodes available:.*" | awk '
'\'{print $3}\'' %
(self.V_NODE_USERNAME, node_info['namenode_ip'],
hadoop_user))
stdout, stderr = ssh_command(cmd)
active_datanode_count = stdout
LOG.debug('active_datanode_count:%s', active_datanode_count)
print('active_datanode_count:%s' % active_datanode_count)
if not active_tasktracker_count:
active_tasktracker_count = 0
else:
@@ -636,4 +604,3 @@ class SavannaTest(nmanager.OfficialClientTest):
cls._clean_node_groups_templates()
cls._clean_flavors()
cls._clean_keys()