Upgrade Storm plugin to version 1.0.1

This is the last part of upgrading Storm to version 1.0.1.

Change-Id: I74660f5734100c04066e1a96f46cddf3f9de0f10
Partially-implements: blueprint upgrade-storm-1-0-1
This commit is contained in:
Telles Nobrega 2016-06-29 09:30:43 -03:00
parent 6681d0aa11
commit 3f43a76227
3 changed files with 165 additions and 111 deletions

View File

@@ -42,10 +42,17 @@ def get_plugin_configs():
return {}
def generate_storm_config(master_hostname, zk_hostnames):
def generate_storm_config(master_hostname, zk_hostnames, version):
if version == '1.0.1':
host_cfg = 'nimbus.seeds'
master_value = [master_hostname.encode('ascii', 'ignore')]
else:
host_cfg = 'nimbus.host'
master_value = master_hostname.encode('ascii', 'ignore')
cfg = {
"nimbus.host": master_hostname.encode('ascii', 'ignore'),
host_cfg: master_value,
"worker.childopts": "-Xmx768m -Djava.net.preferIPv4Stack=true",
"nimbus.childopts": "-Xmx1024m -Djava.net.preferIPv4Stack=true",
"supervisor.childopts": "-Djava.net.preferIPv4Stack=true",

View File

@@ -53,7 +53,7 @@ class StormProvider(p.ProvisioningPluginBase):
"cluster without any management consoles."))
def get_versions(self):
return ['0.9.2']
return ['0.9.2', '1.0.1']
def get_configs(self, storm_version):
return c_helper.get_plugin_configs()
@@ -148,7 +148,8 @@ class StormProvider(p.ProvisioningPluginBase):
config_instances = c_helper.generate_storm_config(
st_master.hostname(),
zknames)
zknames,
cluster.hadoop_version)
config = self._convert_dict_to_yaml(config_instances)
supervisor_conf = c_helper.generate_slave_supervisor_conf()

View File

@@ -20,7 +20,9 @@ from sahara import context
from sahara.plugins import base as pb
from sahara.plugins import exceptions as ex
from sahara.plugins.storm import plugin as pl
from sahara.service.edp.storm import engine
from sahara.tests.unit import base
from sahara.utils import edp
conductor = cond.API
@@ -43,120 +45,149 @@ class StormPluginTest(base.SaharaWithDbTestCase):
master.id = self.master_inst
return master
def _get_cluster(self, name, version):
cluster_dict = {
'name': name,
'plugin_name': 'storm',
'hadoop_version': version,
'node_groups': []}
return cluster_dict
def test_validate_existing_ng_scaling(self):
data = {'name': "cluster",
'plugin_name': "storm",
'hadoop_version': "0.9.2",
'node_groups': [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']}
]
}
cluster = conductor.cluster_create(context.ctx(), data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
supervisor_id = [node.id for node in cluster.node_groups
if node.name == 'supervisor']
self.assertIsNone(plugin._validate_existing_ng_scaling(cluster,
supervisor_id))
data = [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']}
]
cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2')
cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1')
cluster_data_092['node_groups'] = data
cluster_data_101['node_groups'] = data
clusters = [cluster_data_092, cluster_data_101]
for cluster_data in clusters:
cluster = conductor.cluster_create(context.ctx(), cluster_data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
supervisor_id = [node.id for node in cluster.node_groups
if node.name == 'supervisor']
self.assertIsNone(
plugin._validate_existing_ng_scaling(cluster,
supervisor_id))
def test_validate_additional_ng_scaling(self):
data = {'name': "cluster",
'plugin_name': "storm",
'hadoop_version': "0.9.2",
'node_groups': [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']},
{'name': 'slave2',
'flavor_id': '42',
'count': 0,
'node_processes': ['supervisor']}
]
}
cluster = conductor.cluster_create(context.ctx(), data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
supervisor_id = [node.id for node in cluster.node_groups
if node.name == 'supervisor']
self.assertIsNone(plugin._validate_additional_ng_scaling(cluster,
supervisor_id)
)
data = [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']},
{'name': 'slave2',
'flavor_id': '42',
'count': 0,
'node_processes': ['supervisor']}
]
cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2')
cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1')
cluster_data_092['node_groups'] = data
cluster_data_101['node_groups'] = data
clusters = [cluster_data_092, cluster_data_101]
for cluster_data in clusters:
cluster = conductor.cluster_create(context.ctx(), cluster_data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
supervisor_id = [node.id for node in cluster.node_groups
if node.name == 'supervisor']
self.assertIsNone(
plugin._validate_additional_ng_scaling(cluster,
supervisor_id))
def test_validate_existing_ng_scaling_raises(self):
data = {'name': "cluster",
'plugin_name': "storm",
'hadoop_version': "0.9.2",
'node_groups': [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']}
]
}
cluster = conductor.cluster_create(context.ctx(), data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
master_id = [node.id for node in cluster.node_groups
if node.name == 'master']
self.assertRaises(ex.NodeGroupCannotBeScaled,
plugin._validate_existing_ng_scaling,
cluster, master_id)
data = [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']}
]
cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2')
cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1')
cluster_data_092['node_groups'] = data
cluster_data_101['node_groups'] = data
clusters = [cluster_data_092, cluster_data_101]
for cluster_data in clusters:
cluster = conductor.cluster_create(context.ctx(), cluster_data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
master_id = [node.id for node in cluster.node_groups
if node.name == 'master']
self.assertRaises(ex.NodeGroupCannotBeScaled,
plugin._validate_existing_ng_scaling,
cluster, master_id)
def test_validate_additional_ng_scaling_raises(self):
data = {'name': "cluster",
'plugin_name': "storm",
'hadoop_version': "0.9.2",
'node_groups': [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']},
{'name': 'master2',
'flavor_id': '42',
'count': 0,
'node_processes': ['nimbus']}
]
}
cluster = conductor.cluster_create(context.ctx(), data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
master_id = [node.id for node in cluster.node_groups
if node.name == 'master2']
self.assertRaises(ex.NodeGroupCannotBeScaled,
plugin._validate_existing_ng_scaling,
cluster, master_id)
data = [
{'name': 'master',
'flavor_id': '42',
'count': 1,
'node_processes': ['nimbus']},
{'name': 'slave',
'flavor_id': '42',
'count': 1,
'node_processes': ['supervisor']},
{'name': 'zookeeper',
'flavor_id': '42',
'count': 1,
'node_processes': ['zookeeper']},
{'name': 'master2',
'flavor_id': '42',
'count': 0,
'node_processes': ['nimbus']}
]
cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2')
cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1')
cluster_data_092['node_groups'] = data
cluster_data_101['node_groups'] = data
clusters = [cluster_data_092, cluster_data_101]
for cluster_data in clusters:
cluster = conductor.cluster_create(context.ctx(), cluster_data)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
master_id = [node.id for node in cluster.node_groups
if node.name == 'master2']
self.assertRaises(ex.NodeGroupCannotBeScaled,
plugin._validate_existing_ng_scaling,
cluster, master_id)
def test_get_open_port(self):
plugin_storm = pl.StormProvider()
@@ -167,3 +198,18 @@ class StormPluginTest(base.SaharaWithDbTestCase):
ng.cluster = cluster
ports = plugin_storm.get_open_ports(ng)
self.assertEqual([8080], ports)
def _test_engine(self, version, job_type, eng):
cluster_dict = self._get_cluster('demo', version)
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsInstance(plugin.get_edp_engine(cluster, job_type), eng)
def test_plugin092_edp_engine(self):
self._test_engine('0.9.2', edp.JOB_TYPE_STORM,
engine.StormJobEngine)
def test_plugin101_edp_engine(self):
self._test_engine('1.0.1', edp.JOB_TYPE_STORM,
engine.StormJobEngine)