Configuration token replacement is incorrect for some topologies
HDP configuration tokens are now properly replaced for non co-located master services
Fixes bug: 1223434
Change-Id: I4380db4653ae4ba2c225eda79662b8eac84a55de
(cherry picked from commit c6d5f000de
)
This commit is contained in:
parent
3a8ddfbd81
commit
e29fb68b76
|
@ -45,7 +45,7 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
cluster_template, cluster=cluster)
|
||||
|
||||
hosts = self._get_servers(cluster)
|
||||
ambari_info = self.get_ambari_info(cluster_spec, hosts)
|
||||
ambari_info = self.get_ambari_info(cluster_spec)
|
||||
self.cluster_ambari_mapping[cluster.name] = ambari_info
|
||||
ambari_uri = self._get_ambari_uri(cluster_spec)
|
||||
|
||||
|
@ -63,8 +63,7 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
if installed:
|
||||
LOG.info("Install of Hadoop stack successful.")
|
||||
# add service urls
|
||||
self._set_cluster_info(
|
||||
cluster, cluster_spec, hosts, ambari_info)
|
||||
self._set_cluster_info(cluster, cluster_spec, ambari_info)
|
||||
else:
|
||||
raise ex.HadoopProvisionError(
|
||||
'Installation of Hadoop stack failed.')
|
||||
|
@ -292,23 +291,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
#TODO(jspeidel): max wait time
|
||||
LOG.info('Waiting to connect to ambari server ...')
|
||||
|
||||
def _determine_host_for_server_component(
|
||||
self, component, cluster_spec, servers):
|
||||
|
||||
found_node_group = None
|
||||
node_groups = cluster_spec.node_groups
|
||||
for node_group in node_groups.values():
|
||||
if component in node_group.components:
|
||||
found_node_group = node_group.name
|
||||
|
||||
for host in servers:
|
||||
if host.role == found_node_group:
|
||||
return host
|
||||
|
||||
raise Exception(
|
||||
'Server component [{0}] not specified in configuration'.format(
|
||||
component))
|
||||
|
||||
def _install_services(self, cluster_name, ambari_info):
|
||||
LOG.info('Installing required Hadoop services ...')
|
||||
|
||||
|
@ -484,12 +466,12 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
'default-cluster.template'), 'r') as f:
|
||||
return clusterspec.ClusterSpec(f.read())
|
||||
|
||||
def _set_cluster_info(self, cluster, cluster_spec, hosts, ambari_info):
|
||||
def _set_cluster_info(self, cluster, cluster_spec, ambari_info):
|
||||
info = cluster.info
|
||||
|
||||
try:
|
||||
jobtracker_ip = self._determine_host_for_server_component(
|
||||
'JOBTRACKER', cluster_spec, hosts).management_ip
|
||||
jobtracker_ip = cluster_spec.determine_host_for_server_component(
|
||||
'JOBTRACKER').management_ip
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
|
@ -498,8 +480,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
}
|
||||
|
||||
try:
|
||||
namenode_ip = self._determine_host_for_server_component(
|
||||
'NAMENODE', cluster_spec, hosts).management_ip
|
||||
namenode_ip = cluster_spec.determine_host_for_server_component(
|
||||
'NAMENODE').management_ip
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
|
@ -683,7 +665,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
cluster_spec = clusterspec.ClusterSpec(
|
||||
json.dumps(processor.blueprint), cluster=cluster)
|
||||
ambari_uri = self._get_ambari_uri(cluster_spec)
|
||||
hosts = self._get_servers(cluster)
|
||||
|
||||
servers = []
|
||||
for instance in instances:
|
||||
|
@ -693,7 +674,7 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
[host_role],
|
||||
ambari_uri=ambari_uri))
|
||||
|
||||
ambari_info = self.get_ambari_info(cluster_spec, hosts)
|
||||
ambari_info = self.get_ambari_info(cluster_spec)
|
||||
self._update_ambari_info_credentials(cluster_spec, ambari_info)
|
||||
|
||||
for server in servers:
|
||||
|
@ -732,9 +713,9 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
ambari_config = cluster_spec.configurations['ambari']
|
||||
return ambari_config.get('repo.uri', None)
|
||||
|
||||
def get_ambari_info(self, cluster_spec, hosts):
|
||||
ambari_host = self._determine_host_for_server_component(
|
||||
'AMBARI_SERVER', cluster_spec, hosts)
|
||||
def get_ambari_info(self, cluster_spec):
|
||||
ambari_host = cluster_spec.determine_host_for_server_component(
|
||||
'AMBARI_SERVER')
|
||||
|
||||
port = cluster_spec.configurations['ambari'].get(
|
||||
'server.port', '8080')
|
||||
|
|
|
@ -15,10 +15,47 @@
|
|||
|
||||
import os
|
||||
from savanna.openstack.common import jsonutils as json
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.plugins.hdp import configprovider as cfg
|
||||
from savanna.plugins.hdp import savannautils as utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ClusterSpec():
|
||||
def __init__(self, cluster_template, cluster=None):
|
||||
self.services = []
|
||||
self.configurations = {}
|
||||
self.node_groups = {}
|
||||
self.servers = None
|
||||
self.str = cluster_template
|
||||
|
||||
if cluster:
|
||||
self.servers = self._get_servers_from_savanna_cluster(cluster)
|
||||
host_manifest = self._generate_host_manifest()
|
||||
cluster_template = self._replace_config_tokens(cluster_template)
|
||||
|
||||
self.str = self._add_manifest_to_config(
|
||||
cluster_template, host_manifest)
|
||||
|
||||
template_json = json.loads(self.str)
|
||||
self._parse_services(template_json)
|
||||
self._parse_configurations(template_json)
|
||||
self._parse_host_component_mappings(template_json)
|
||||
|
||||
def determine_host_for_server_component(self, component):
|
||||
host = None
|
||||
for server in self.servers:
|
||||
node_processes = utils.get_node_processes(server)
|
||||
if node_processes is not None and component in node_processes:
|
||||
host = server
|
||||
break
|
||||
|
||||
return host
|
||||
|
||||
def normalize(self):
|
||||
return NormalizedClusterConfig(self)
|
||||
|
||||
def _get_servers_from_savanna_cluster(self, cluster):
|
||||
servers = []
|
||||
for node_group in cluster.node_groups:
|
||||
|
@ -29,49 +66,6 @@ class ClusterSpec():
|
|||
|
||||
return servers
|
||||
|
||||
def __init__(self, cluster_template, cluster=None):
|
||||
self.services = []
|
||||
self.configurations = {}
|
||||
self.node_groups = {}
|
||||
self.str = cluster_template
|
||||
|
||||
servers = []
|
||||
if cluster is not None:
|
||||
if hasattr(cluster, 'node_groups'):
|
||||
servers = self._get_servers_from_savanna_cluster(cluster)
|
||||
else:
|
||||
servers = cluster.instances
|
||||
|
||||
host_manifest = self._generate_host_manifest(servers)
|
||||
#TODO(jspeidel): don't hard code ambari server
|
||||
ambari_server = self._get_ambari_host(servers)
|
||||
if ambari_server is not None:
|
||||
cluster_template = cluster_template.replace('%AMBARI_HOST%',
|
||||
ambari_server.fqdn)
|
||||
else:
|
||||
raise RuntimeError('No Ambari server host found')
|
||||
|
||||
self.str = self._add_manifest_to_config(cluster_template,
|
||||
host_manifest)
|
||||
|
||||
template_json = json.loads(self.str)
|
||||
self._parse_services(template_json)
|
||||
self._parse_configurations(template_json)
|
||||
self._parse_host_component_mappings(template_json)
|
||||
|
||||
def _get_ambari_host(self, servers):
|
||||
# iterate thru servers and find the master server
|
||||
host = next((server for server in servers
|
||||
if server.node_processes is not None and
|
||||
'AMBARI_SERVER' in server.node_processes), None)
|
||||
if host is None:
|
||||
host = next((server for server in servers
|
||||
if server.role == 'MASTER'), None)
|
||||
return host
|
||||
|
||||
def normalize(self):
|
||||
return NormalizedClusterConfig(self)
|
||||
|
||||
def _parse_services(self, template_json):
|
||||
for s in template_json['services']:
|
||||
service = Service(s['name'])
|
||||
|
@ -121,12 +115,42 @@ class ClusterSpec():
|
|||
node_group.default_count = host['default_count']
|
||||
self.node_groups[node_group.name] = node_group
|
||||
|
||||
def _generate_host_manifest(self, servers):
|
||||
def _replace_config_tokens(self, cluster_template):
|
||||
ambari_server = self.determine_host_for_server_component(
|
||||
'AMBARI_SERVER')
|
||||
nn_server = self.determine_host_for_server_component(
|
||||
'NAMENODE')
|
||||
snn_server = self.determine_host_for_server_component(
|
||||
'SECONDARY_NAMENODE')
|
||||
jt_server = self.determine_host_for_server_component(
|
||||
'JOBTRACKER')
|
||||
|
||||
LOG.info('Replacing the following configuration tokens:')
|
||||
|
||||
LOG.info('%AMBARI_HOST% : {0}'.format(ambari_server.fqdn))
|
||||
cluster_template = cluster_template.replace(
|
||||
'%AMBARI_HOST%', ambari_server.fqdn)
|
||||
if nn_server:
|
||||
LOG.info('%NN_HOST% : {0}'.format(nn_server.fqdn))
|
||||
cluster_template = cluster_template.replace(
|
||||
'%NN_HOST%', nn_server.fqdn)
|
||||
if snn_server:
|
||||
LOG.info('%SNN_HOST% : {0}'.format(snn_server.fqdn))
|
||||
cluster_template = cluster_template.replace(
|
||||
'%SNN_HOST%', snn_server.fqdn)
|
||||
if jt_server:
|
||||
LOG.info('%JT_HOST% : {0}'.format(jt_server.fqdn))
|
||||
cluster_template = cluster_template.replace(
|
||||
'%JT_HOST%', jt_server.fqdn)
|
||||
|
||||
return cluster_template
|
||||
|
||||
def _generate_host_manifest(self):
|
||||
host_manifest = {}
|
||||
hosts = []
|
||||
host_id = 1
|
||||
|
||||
for server in servers:
|
||||
for server in self.servers:
|
||||
hosts.append({'host_id': host_id,
|
||||
'hostname': server.hostname,
|
||||
'role': server.role,
|
||||
|
|
|
@ -238,7 +238,7 @@
|
|||
{ "name" : "ipc.client.connection.maxidletime", "value" : "30000" },
|
||||
{ "name" : "ipc.client.connect.max.retries", "value" : "50" },
|
||||
{ "name" : "webinterface.private.actions", "value" : "false" },
|
||||
{ "name" : "fs.default.name", "value" : "hdfs://%AMBARI_HOST%:8020" },
|
||||
{ "name" : "fs.default.name", "value" : "hdfs://%NN_HOST%:8020" },
|
||||
{ "name" : "fs.checkpoint.dir", "value" : "/hadoop/hdfs/namesecondary" },
|
||||
{ "name" : "fs.checkpoint.period", "value" : "21600" },
|
||||
{ "name" : "fs.checkpoint.size", "value" : "0.5" },
|
||||
|
@ -304,8 +304,8 @@
|
|||
{ "name" : "io.sort.mb", "value" : "200" },
|
||||
{ "name" : "io.sort.spill.percent", "value" : "0.9" },
|
||||
{ "name" : "mapred.system.dir", "value" : "/mapred/system" },
|
||||
{ "name" : "mapred.job.tracker", "value" : "%AMBARI_HOST%:50300" },
|
||||
{ "name" : "mapred.job.tracker.http.address", "value" : "%AMBARI_HOST%:50030" },
|
||||
{ "name" : "mapred.job.tracker", "value" : "%JT_HOST%:50300" },
|
||||
{ "name" : "mapred.job.tracker.http.address", "value" : "%JT_HOST%:50030" },
|
||||
{ "name" : "mapred.userlog.retain.hours", "value" : "24" },
|
||||
{ "name" : "mapred.jobtracker.maxtasks.per.job", "value" : "-1" },
|
||||
{ "name" : "mapred.task.tracker.task-controller", "value" : "org.apache.hadoop.mapred.DefaultTaskController" },
|
||||
|
@ -314,7 +314,7 @@
|
|||
{ "name" : "mapreduce.jobtracker.keytab.file", "value" : "/etc/security/keytabs/jt.service.keytab" },
|
||||
{ "name" : "mapreduce.tasktracker.keytab.file", "value" : "/etc/security/keytabs/tt.service.keytab" },
|
||||
{ "name" : "mapreduce.history.server.embedded", "value" : "false" },
|
||||
{ "name" : "mapreduce.history.server.http.address", "value" : "%AMBARI_HOST%:51111" },
|
||||
{ "name" : "mapreduce.history.server.http.address", "value" : "%JT_HOST%:51111" },
|
||||
{ "name" : "mapreduce.jobhistory.kerberos.principal", "value" : "jt/_HOST@EXAMPLE.COM" },
|
||||
{ "name" : "mapreduce.jobhistory.keytab.file", "value" : "/etc/security/keytabs/jt.service.keytab" }
|
||||
]
|
||||
|
@ -354,19 +354,19 @@
|
|||
{ "name" : "dfs.replication", "value" : "3" },
|
||||
{ "name" : "dfs.datanode.address", "value" : "0.0.0.0:50010" },
|
||||
{ "name" : "dfs.datanode.http.address", "value" : "0.0.0.0:50075" },
|
||||
{ "name" : "dfs.http.address", "value" : "%AMBARI_HOST%:50070" },
|
||||
{ "name" : "dfs.http.address", "value" : "%NN_HOST%:50070" },
|
||||
{ "name" : "dfs.datanode.du.reserved", "value" : "1" },
|
||||
{ "name" : "dfs.namenode.kerberos.principal", "value" : "nn/_HOST@EXAMPLE.COM" },
|
||||
{ "name" : "dfs.secondary.namenode.kerberos.principal", "value" : "nn/_HOST@EXAMPLE.COM" },
|
||||
{ "name" : "dfs.namenode.kerberos.https.principal", "value" : "host/_HOST@EXAMPLE.COM" },
|
||||
{ "name" : "dfs.secondary.namenode.kerberos.https.principal", "value" : "host/_HOST@EXAMPLE.COM" },
|
||||
{ "name" : "dfs.secondary.http.address", "value" : "%AMBARI_HOST%:50090" },
|
||||
{ "name" : "dfs.secondary.http.address", "value" : "%SNN_HOST%:50090" },
|
||||
{ "name" : "dfs.web.authentication.kerberos.keytab", "value" : "/etc/security/keytabs/spnego.service.keytab" },
|
||||
{ "name" : "dfs.datanode.kerberos.principal", "value" : "dn/_HOST@EXAMPLE.COM" },
|
||||
{ "name" : "dfs.namenode.keytab.file", "value" : "/etc/security/keytabs/nn.service.keytab" },
|
||||
{ "name" : "dfs.secondary.namenode.keytab.file", "value" : "/etc/security/keytabs/nn.service.keytab" },
|
||||
{ "name" : "dfs.datanode.keytab.file", "value" : "/etc/security/keytabs/dn.service.keytab" },
|
||||
{ "name" : "dfs.https.address", "value" : "%AMBARI_HOST%:50470" },
|
||||
{ "name" : "dfs.https.address", "value" : "%NN_HOST%:50470" },
|
||||
{ "name" : "dfs.datanode.data.dir.perm", "value" : "750" }
|
||||
]
|
||||
},
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
|
||||
import pkg_resources as pkg
|
||||
from savanna.plugins.hdp import ambariplugin as ap
|
||||
from savanna.plugins.hdp import clusterspec as cs
|
||||
|
@ -54,38 +52,30 @@ class AmbariPluginTest(unittest2.TestCase):
|
|||
|
||||
def test_convert(self):
|
||||
plugin = ap.AmbariPlugin()
|
||||
cluster = TestCluster()
|
||||
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
|
||||
'resources',
|
||||
'default-cluster.template'), 'r') as f:
|
||||
plugin.convert(cluster, f.read())
|
||||
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
|
||||
'resources',
|
||||
'default-cluster.template'), 'r') as f:
|
||||
normalized_config = cs.ClusterSpec(f.read()).normalize()
|
||||
cluster = TestCluster([])
|
||||
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
plugin.convert(cluster, cluster_config_file)
|
||||
normalized_config = cs.ClusterSpec(cluster_config_file).normalize()
|
||||
|
||||
self.assertEquals(normalized_config.hadoop_version,
|
||||
cluster.hadoop_version)
|
||||
self.assertEquals(len(normalized_config.node_groups),
|
||||
len(cluster.node_groups))
|
||||
|
||||
def test_update_infra(self):
|
||||
plugin = ap.AmbariPlugin()
|
||||
cluster = TestCluster()
|
||||
plugin.update_infra(cluster)
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
self.assertEquals(cluster.default_image_id, node_group.image)
|
||||
|
||||
def test__set_ambari_credentials__admin_only(self):
|
||||
self.requests = []
|
||||
plugin = ap.AmbariPlugin()
|
||||
plugin._get_rest_request = self._get_test_request
|
||||
|
||||
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
|
||||
'resources',
|
||||
'default-cluster.template'), 'r') as f:
|
||||
cluster_spec = cs.ClusterSpec(f.read())
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
cluster_spec = cs.ClusterSpec(cluster_config_file)
|
||||
|
||||
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'),
|
||||
'8080', 'admin', 'old-pwd')
|
||||
|
@ -107,10 +97,11 @@ class AmbariPluginTest(unittest2.TestCase):
|
|||
plugin = ap.AmbariPlugin()
|
||||
plugin._get_rest_request = self._get_test_request
|
||||
|
||||
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
|
||||
'resources',
|
||||
'default-cluster.template'), 'r') as f:
|
||||
cluster_spec = cs.ClusterSpec(f.read())
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
cluster_spec = cs.ClusterSpec(cluster_config_file)
|
||||
|
||||
for service in cluster_spec.services:
|
||||
if service.name == 'AMBARI':
|
||||
|
@ -145,10 +136,11 @@ class AmbariPluginTest(unittest2.TestCase):
|
|||
plugin = ap.AmbariPlugin()
|
||||
plugin._get_rest_request = self._get_test_request
|
||||
|
||||
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
|
||||
'resources',
|
||||
'default-cluster.template'), 'r') as f:
|
||||
cluster_spec = cs.ClusterSpec(f.read())
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
cluster_spec = cs.ClusterSpec(cluster_config_file)
|
||||
|
||||
for service in cluster_spec.services:
|
||||
if service.name == 'AMBARI':
|
||||
|
@ -184,10 +176,11 @@ class AmbariPluginTest(unittest2.TestCase):
|
|||
plugin = ap.AmbariPlugin()
|
||||
plugin._get_rest_request = self._get_test_request
|
||||
|
||||
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
|
||||
'resources',
|
||||
'default-cluster.template'), 'r') as f:
|
||||
cluster_spec = cs.ClusterSpec(f.read())
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
cluster_spec = cs.ClusterSpec(cluster_config_file)
|
||||
|
||||
for service in cluster_spec.services:
|
||||
if service.name == 'AMBARI':
|
||||
|
@ -207,20 +200,24 @@ class AmbariPluginTest(unittest2.TestCase):
|
|||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
cluster_config = cs.ClusterSpec(cluster_config_file)
|
||||
test_host = TestServer(
|
||||
'host1', 'test-master', '111.11.1111',
|
||||
'222.11.1111', 'img1', '3', node_processes=['AMBARI_SERVER'])
|
||||
|
||||
node_group = TestNodeGroup([test_host], 'test-master')
|
||||
cluster = TestCluster([node_group])
|
||||
cluster_config = cs.ClusterSpec(cluster_config_file, cluster=cluster)
|
||||
plugin = ap.AmbariPlugin()
|
||||
|
||||
#change port
|
||||
cluster_config.configurations['ambari']['server.port'] = '9000'
|
||||
ambari_info = plugin.get_ambari_info(
|
||||
cluster_config, [TestHost('111.11.1111', 'master')])
|
||||
ambari_info = plugin.get_ambari_info(cluster_config)
|
||||
|
||||
self.assertEqual('9000', ambari_info.port)
|
||||
|
||||
#remove port
|
||||
del cluster_config.configurations['ambari']['server.port']
|
||||
ambari_info = plugin.get_ambari_info(
|
||||
cluster_config, [TestHost('111.11.1111', 'master')])
|
||||
ambari_info = plugin.get_ambari_info(cluster_config)
|
||||
|
||||
self.assertEqual('8080', ambari_info.port)
|
||||
|
||||
|
@ -246,13 +243,23 @@ class AmbariPluginTest(unittest2.TestCase):
|
|||
|
||||
|
||||
class TestCluster:
|
||||
def __init__(self):
|
||||
def __init__(self, node_groups):
|
||||
self.hadoop_version = None
|
||||
self.cluster_configs = {}
|
||||
self.node_groups = []
|
||||
self.node_groups = node_groups
|
||||
self.default_image_id = '11111'
|
||||
|
||||
|
||||
class TestNodeGroup:
|
||||
def __init__(self, instances, name):
|
||||
self.instances = instances
|
||||
self.name = name
|
||||
self.node_processes = []
|
||||
|
||||
for np in instances[0].node_processes:
|
||||
self.node_processes.append(np)
|
||||
|
||||
|
||||
class TestRequest:
|
||||
def put(self, url, data=None, auth=None):
|
||||
self.url = url
|
||||
|
@ -285,7 +292,27 @@ class TestResult:
|
|||
self.text = ''
|
||||
|
||||
|
||||
class TestServer:
|
||||
def __init__(self, hostname, role, public_ip, private_ip, image, flavor,
|
||||
node_processes=None):
|
||||
self.hostname = hostname
|
||||
self.fqdn = hostname
|
||||
self.role = role
|
||||
self.management_ip = public_ip
|
||||
self.public_ip = public_ip
|
||||
self.internal_ip = private_ip
|
||||
self.node_processes = node_processes
|
||||
self.nova_info = TestNovaInfo(image, flavor)
|
||||
|
||||
|
||||
class TestHost:
|
||||
def __init__(self, management_ip, role=None):
|
||||
self.management_ip = management_ip
|
||||
self.role = role
|
||||
self.node_processes = []
|
||||
|
||||
|
||||
class TestNovaInfo:
|
||||
def __init__(self, image, flavor):
|
||||
self.image = image
|
||||
self.flavor = flavor
|
||||
|
|
|
@ -28,22 +28,21 @@ class ClusterSpecTest(unittest2.TestCase):
|
|||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
servers = []
|
||||
server1 = TestServer('host1', 'master', '11111', 3, '111.11.1111',
|
||||
server1 = TestServer('host1', 'test-master', '11111', 3, '111.11.1111',
|
||||
'222.11.1111',
|
||||
node_processes=["namenode", "jobtracker",
|
||||
"secondary_namenode",
|
||||
"ganglia_server",
|
||||
"ganglia_monitor",
|
||||
"nagios_server", "AMBARI_SERVER",
|
||||
"ambari_agent"])
|
||||
server2 = TestServer('host2', 'slave', '11111', 3, '222.22.2222',
|
||||
'333.22.2222')
|
||||
servers.append(server1)
|
||||
servers.append(server2)
|
||||
node_processes=["NAMENODE", "JOBTRACKER",
|
||||
"SECONDARY_NAMENODE",
|
||||
"GANGLIA_SERVER",
|
||||
"GANGLIA_MONITOR",
|
||||
"NAGIOS_SERVER", "AMBARI_SERVER",
|
||||
"AMBARI_AGENT"])
|
||||
server2 = TestServer('host2', 'test-slave', '11111', 3, '222.22.2222',
|
||||
'333.22.2222',
|
||||
node_processes=['DATANODE', 'AMBARI_AGENT'])
|
||||
|
||||
cluster = TestCluster()
|
||||
cluster.instances = servers
|
||||
node_group1 = TestNodeGroup([server1])
|
||||
node_group2 = TestNodeGroup([server2])
|
||||
cluster = TestCluster([node_group1, node_group2])
|
||||
|
||||
cluster_config = cs.ClusterSpec(cluster_config_file, cluster)
|
||||
|
||||
|
@ -58,31 +57,99 @@ class ClusterSpecTest(unittest2.TestCase):
|
|||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
servers = []
|
||||
server1 = TestServer('ambari_machine', 'master', '11111', 3,
|
||||
'111.11.1111', '222.11.1111',
|
||||
node_processes=["namenode", "jobtracker",
|
||||
"secondary_namenode",
|
||||
"ganglia_server",
|
||||
"ganglia_monitor",
|
||||
"nagios_server", "AMBARI_SERVER",
|
||||
"ambari_agent"])
|
||||
node_processes=["NAMENODE", "JOBTRACKER",
|
||||
"SECONDARY_NAMENODE",
|
||||
"GANGLIA_SERVER",
|
||||
"GANGLIA_MONITOR",
|
||||
"NAGIOS_SERVER", "AMBARI_SERVER",
|
||||
"AMBARI_AGENT"])
|
||||
server2 = TestServer('host2', 'slave', '11111', 3, '222.22.2222',
|
||||
'333.22.2222',
|
||||
node_processes=["datanode", "tasktracker",
|
||||
"ganglia_monitor", "hdfs_client",
|
||||
"mapreduce_client",
|
||||
"ambari_agent"])
|
||||
servers.append(server1)
|
||||
servers.append(server2)
|
||||
node_processes=["DATANODE", "TASKTRACKER",
|
||||
"GANGLIA_MONITOR", "HDFS_CLIENT",
|
||||
"MAPREDUCE_CLIENT",
|
||||
"AMBARI_AGENT"])
|
||||
|
||||
cluster = TestCluster
|
||||
cluster.instances = servers
|
||||
node_group1 = TestNodeGroup([server1])
|
||||
node_group2 = TestNodeGroup([server2])
|
||||
cluster = TestCluster([node_group1, node_group2])
|
||||
|
||||
cluster_config = cs.ClusterSpec(cluster_config_file, cluster)
|
||||
self.assertIn('ambari_machine', cluster_config.str,
|
||||
'Ambari host not found')
|
||||
|
||||
def test_config_token_replacement(self):
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
'plugins/hdp/resources/default-cluster.template')
|
||||
|
||||
master_host = TestServer(
|
||||
'master.novalocal', 'master', '11111', 3,
|
||||
'111.11.1111', '222.11.1111',
|
||||
node_processes=["GANGLIA_SERVER",
|
||||
"GANGLIA_MONITOR",
|
||||
"NAGIOIS_SERVER",
|
||||
"AMBARI_SERVER",
|
||||
"AMBARI_AGENT"])
|
||||
|
||||
jt_host = TestServer(
|
||||
'jt_host.novalocal', 'jt', '11111', 3,
|
||||
'111.11.2222', '222.11.2222',
|
||||
node_processes=["JOBTRACKER",
|
||||
"GANGLIA_MONITOR",
|
||||
"AMBARI_AGENT"])
|
||||
|
||||
nn_host = TestServer(
|
||||
'nn_host.novalocal', 'nn', '11111', 3,
|
||||
'111.11.3333', '222.11.3333',
|
||||
node_processes=["NAMENODE",
|
||||
"GANGLIA_MONITOR",
|
||||
"AMBARI_AGENT"])
|
||||
|
||||
snn_host = TestServer(
|
||||
'snn_host.novalocal', 'jt', '11111', 3,
|
||||
'111.11.4444', '222.11.4444',
|
||||
node_processes=["SECONDARY_NAMENODE",
|
||||
"GANGLIA_MONITOR",
|
||||
"AMBARI_AGENT"])
|
||||
|
||||
slave_host = TestServer(
|
||||
'slave1.novalocal', 'slave', '11111', 3,
|
||||
'222.22.5555', '333.22.5555',
|
||||
node_processes=["DATANODE", "TASKTRACKER",
|
||||
"GANGLIA_MONITOR", "HDFS_CLIENT",
|
||||
"MAPREDUCE_CLIENT",
|
||||
"AMBARI_AGENT"])
|
||||
|
||||
master_ng = TestNodeGroup([master_host])
|
||||
jt_ng = TestNodeGroup([jt_host])
|
||||
nn_ng = TestNodeGroup([nn_host])
|
||||
snn_ng = TestNodeGroup([snn_host])
|
||||
slave_ng = TestNodeGroup([slave_host])
|
||||
|
||||
cluster = TestCluster([master_ng, jt_ng, nn_ng, snn_ng, slave_ng])
|
||||
cluster_config = cs.ClusterSpec(cluster_config_file, cluster)
|
||||
|
||||
config = cluster_config.str
|
||||
self.assertIn('"fs.default.name", "value" : '
|
||||
'"hdfs://nn_host.novalocal:8020"', config)
|
||||
self.assertIn('"mapred.job.tracker", "value" : '
|
||||
'"jt_host.novalocal:50300"', config)
|
||||
self.assertIn('"mapred.job.tracker", "value" : '
|
||||
'"jt_host.novalocal:50300"', config)
|
||||
self.assertIn('"mapred.job.tracker.http.address", "value" : '
|
||||
'"jt_host.novalocal:50030"', config)
|
||||
self.assertIn('"mapreduce.history.server.http.address", "value" : '
|
||||
'"jt_host.novalocal:51111"', config)
|
||||
self.assertIn('"dfs.http.address", "value" : '
|
||||
'"nn_host.novalocal:50070"', config)
|
||||
self.assertIn('"dfs.secondary.http.address", "value" : '
|
||||
'"snn_host.novalocal:50090"', config)
|
||||
self.assertIn('"dfs.https.address", "value" : '
|
||||
'"nn_host.novalocal:50470"', config)
|
||||
|
||||
def test_ambari_rpm_path(self):
|
||||
cluster_config_file = pkg.resource_string(
|
||||
version.version_info.package,
|
||||
|
@ -356,19 +423,41 @@ class TestServer():
|
|||
self.hostname = hostname
|
||||
self.fqdn = hostname
|
||||
self.role = role
|
||||
self.nova_info = TestNova
|
||||
self.nova_info.image = img
|
||||
self.nova_info.flavor = flavor
|
||||
|
||||
self.instance_info = InstanceInfo(
|
||||
hostname, img, flavor, public_ip, private_ip)
|
||||
self.management_ip = public_ip
|
||||
self.public_ip = public_ip
|
||||
self.internal_ip = private_ip
|
||||
self.node_processes = node_processes
|
||||
self.nova_info = TestNovaInfo(img, flavor)
|
||||
|
||||
|
||||
class TestNova():
|
||||
image = None
|
||||
flavor = None
|
||||
class InstanceInfo():
|
||||
def __init__(self, hostname, image, flavor, management_ip, internal_ip):
|
||||
self.hostname = hostname
|
||||
self.image = image
|
||||
self.flavor = flavor
|
||||
self.management_ip = management_ip
|
||||
self.internal_ip = internal_ip
|
||||
|
||||
|
||||
class TestCluster():
|
||||
instances = []
|
||||
def __init__(self, node_groups):
|
||||
self.node_groups = node_groups
|
||||
|
||||
|
||||
class TestNodeGroup():
|
||||
def __init__(self, instances):
|
||||
self.instances = instances
|
||||
self.name = instances[0].role
|
||||
self.node_processes = []
|
||||
|
||||
for np in instances[0].node_processes:
|
||||
self.node_processes.append(np)
|
||||
|
||||
|
||||
class TestNovaInfo:
|
||||
def __init__(self, image, flavor):
|
||||
self.image = image
|
||||
self.flavor = flavor
|
||||
|
|
Loading…
Reference in New Issue