Added rack awareness in HDP plugin

Rack awareness support is added to the HDP plugin to maintain Hadoop's
data locality feature.
Also, decomission_helper.py is renamed to requests_helper.py.

Note: Ambari allows rack info to be set up through the cluster creation
blueprint, but that does not work for scaling, which is why we manually
add rack info for new instances (when scaling) through the Ambari API
and manually restart the HDFS and MAPREDUCE2 services after scaling.

closes-bug: 1618831
Change-Id: I2a696ea534d0a93f7e88c5e22ef99b008248c875
Michael Ionkin 2016-08-26 14:14:04 +03:00
parent 8ddf82f380
commit 9f8c2197c0
8 changed files with 137 additions and 24 deletions
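
The note above describes the manual path taken when scaling: each new host's
rack information is written through the Ambari REST API, and HDFS and
MAPREDUCE2 are then restarted so the NameNode and ResourceManager pick up the
new topology. A minimal sketch of that API interaction, where the endpoint,
credentials, host FQDN and rack path are placeholders (none of these values
come from this change) and the payload mirrors set_rack_info_for_instance()
below:

    # Hedged sketch only: endpoint, credentials, host FQDN and rack path
    # are placeholders; the payload matches the new client method below.
    import json
    import requests

    AMBARI = "http://ambari.example.com:8080/api/v1"
    AUTH = ("admin", "admin")                  # placeholder credentials
    HEADERS = {"X-Requested-By": "sahara"}     # Ambari requires this header

    def set_rack_info(cluster, host_fqdn, rack):
        url = "%s/clusters/%s/hosts/%s" % (AMBARI, cluster, host_fqdn)
        data = {"Hosts": {"rack_info": rack}}
        resp = requests.put(url, data=json.dumps(data),
                            auth=AUTH, headers=HEADERS)
        resp.raise_for_status()

    set_rack_info("cluster-1", "worker-0001.example.com", "/rack-01")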

View File

@@ -0,0 +1,3 @@
---
features:
- Added rack awareness feature for HDP plugin

View File

@@ -20,7 +20,7 @@ from requests import auth
from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import decomission_helper as d_helper
from sahara.plugins.ambari import requests_helper as r_helper
from sahara.plugins import exceptions as p_exc
@@ -204,14 +204,14 @@ class AmbariClient(object):
def decommission_nodemanagers(self, cluster_name, instances):
url = self._base_url + "/clusters/%s/requests" % cluster_name
data = d_helper.build_nodemanager_decommission_request(cluster_name,
data = r_helper.build_nodemanager_decommission_request(cluster_name,
instances)
resp = self.post(url, data=jsonutils.dumps(data))
self.wait_ambari_request(self.req_id(resp), cluster_name)
def decommission_datanodes(self, cluster_name, instances):
url = self._base_url + "/clusters/%s/requests" % cluster_name
data = d_helper.build_datanode_decommission_request(cluster_name,
data = r_helper.build_datanode_decommission_request(cluster_name,
instances)
resp = self.post(url, data=jsonutils.dumps(data))
self.wait_ambari_request(self.req_id(resp), cluster_name)
@@ -237,17 +237,29 @@ class AmbariClient(object):
def restart_namenode(self, cluster_name, instance):
url = self._base_url + "/clusters/%s/requests" % cluster_name
data = d_helper.build_namenode_restart_request(cluster_name, instance)
data = r_helper.build_namenode_restart_request(cluster_name, instance)
resp = self.post(url, data=jsonutils.dumps(data))
self.wait_ambari_request(self.req_id(resp), cluster_name)
def restart_resourcemanager(self, cluster_name, instance):
url = self._base_url + "/clusters/%s/requests" % cluster_name
data = d_helper.build_resourcemanager_restart_request(cluster_name,
data = r_helper.build_resourcemanager_restart_request(cluster_name,
instance)
resp = self.post(url, data=jsonutils.dumps(data))
self.wait_ambari_request(self.req_id(resp), cluster_name)
def restart_service(self, cluster_name, service_name):
url = self._base_url + "/clusters/{}/services/{}".format(
cluster_name, service_name)
data = r_helper.build_stop_service_request(service_name)
resp = self.put(url, data=jsonutils.dumps(data))
self.wait_ambari_request(self.req_id(resp), cluster_name)
data = r_helper.build_start_service_request(service_name)
resp = self.put(url, data=jsonutils.dumps(data))
self.wait_ambari_request(self.req_id(resp), cluster_name)
def delete_host(self, cluster_name, instance):
url = self._base_url + "/clusters/%s/hosts/%s" % (cluster_name,
instance.fqdn())
@@ -283,6 +295,17 @@ class AmbariClient(object):
resp = self.put(url, data=jsonutils.dumps(data))
self.check_response(resp)
def set_rack_info_for_instance(self, cluster_name, instance, rack_name):
url = self._base_url + "/clusters/%s/hosts/%s" % (
cluster_name, instance.fqdn())
data = {
"Hosts": {
"rack_info": rack_name
}
}
resp = self.put(url, data=jsonutils.dumps(data))
self.check_response(resp)
def get_request_info(self, cluster_name, request_id):
url = self._base_url + ("/clusters/%s/requests/%s" %
(cluster_name, request_id))
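
These two new client methods are what deploy.py (the next file) drives during
scaling. A hypothetical usage sketch, where the cluster name, instance object
and rack path are placeholders and the client is constructed the same way
deploy.py constructs it:

    # Hypothetical usage; "cluster-1", instance and "/rack-01" are placeholders.
    with ambari_client.AmbariClient(ambari_instance, password=password) as client:
        client.set_rack_info_for_instance("cluster-1", instance, "/rack-01")
        # restart_service() sends two PUTs: ServiceInfo state INSTALLED (stop),
        # then STARTED (start), waiting for each Ambari request to finish.
        client.restart_service("cluster-1", "HDFS")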

View File

@@ -25,6 +25,7 @@ HDFS_SERVICE = "HDFS"
HIVE_SERVICE = "Hive"
KAFKA_SERVICE = "Kafka"
KNOX_SERVICE = "Knox"
MAPREDUCE2_SERVICE = "MAPREDUCE2"
OOZIE_SERVICE = "Oozie"
RANGER_SERVICE = "Ranger"
SLIDER_SERVICE = "Slider"

View File

@@ -29,6 +29,7 @@ from sahara.plugins.ambari import configs
from sahara.plugins.ambari import ha_helper
from sahara.plugins import kerberos
from sahara.plugins import utils as plugin_utils
from sahara.topology import topology_helper as t_helper
from sahara.utils import poll_utils
@@ -342,12 +343,15 @@ def _build_ambari_cluster_template(cluster):
if kerberos.is_kerberos_security_enabled(cluster):
cl_tmpl["credentials"] = _get_credentials(cluster)
cl_tmpl["security"] = {"type": "KERBEROS"}
topology = _configure_topology_data(cluster)
for ng in cluster.node_groups:
for instance in ng.instances:
host = {"fqdn": instance.fqdn()}
if t_helper.is_data_locality_enabled():
host["rack_info"] = topology[instance.instance_name]
cl_tmpl["host_groups"].append({
"name": instance.instance_name,
"hosts": [{"fqdn": instance.fqdn()}]
"hosts": [host]
})
return cl_tmpl
@@ -467,18 +471,12 @@ def decommission_datanodes(cluster, instances):
def restart_namenode(cluster, instance):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
password = cluster.extra["ambari_password"]
with ambari_client.AmbariClient(ambari, password=password) as client:
with _get_ambari_client(cluster) as client:
client.restart_namenode(cluster.name, instance)
def restart_resourcemanager(cluster, instance):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
password = cluster.extra["ambari_password"]
with ambari_client.AmbariClient(ambari, password=password) as client:
with _get_ambari_client(cluster) as client:
client.restart_resourcemanager(cluster.name, instance)
@@ -492,6 +490,11 @@ def restart_nns_and_rms(cluster):
restart_resourcemanager(cluster, rm)
def restart_service(cluster, service_name):
with _get_ambari_client(cluster) as client:
client.restart_service(cluster.name, service_name)
def remove_services_from_hosts(cluster, instances):
for inst in instances:
LOG.debug("Stopping and removing processes from host %s" % inst.fqdn())
@@ -535,6 +538,29 @@ def _get_ambari_client(cluster):
return ambari_client.AmbariClient(ambari, password=password)
def _configure_topology_data(cluster):
if not t_helper.is_data_locality_enabled():
return {}
LOG.warning(_LW("Node group awareness is not implemented in YARN yet "
"so enable_hypervisor_awareness set to False "
"explicitly"))
return t_helper.generate_topology_map(cluster, is_node_awareness=False)
def configure_rack_awareness(cluster, instances):
if not t_helper.is_data_locality_enabled():
return
topology = _configure_topology_data(cluster)
with _get_ambari_client(cluster) as client:
for inst in instances:
client.set_rack_info_for_instance(
cluster.name, inst, topology[inst.instance_name])
client.restart_service(cluster.name, p_common.HDFS_SERVICE)
client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE)
def add_hadoop_swift_jar(instances):
new_jar = "/opt/hadoop-openstack.jar"
for inst in instances:

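configure_rack_awareness() above resolves each instance through the map
returned by t_helper.generate_topology_map(). The real contents come from the
operator-provided topology configuration, but the map is assumed to look
roughly like this (the keys and rack paths below are made up):

    # Illustrative shape only; actual keys and rack paths come from the
    # configured topology files, not from this example.
    topology = {
        "cluster-1-worker-001": "/rack-01",
        "cluster-1-worker-002": "/rack-02",
    }
    rack_name = topology["cluster-1-worker-001"]   # -> "/rack-01"
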
View File

@@ -191,6 +191,7 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
deploy.add_new_hosts(cluster, instances)
deploy.manage_config_groups(cluster, instances)
deploy.manage_host_components(cluster, instances)
deploy.configure_rack_awareness(cluster, instances)
swift_helper.install_ssl_certs(instances)
deploy.add_hadoop_swift_jar(instances)

View File

@@ -55,6 +55,17 @@ _COMMON_RESTART_TEMPLATE = {
]
}
_COMMON_RESTART_SERVICE_TEMPLATE = {
"RequestInfo": {
"context": "",
},
"Body": {
"ServiceInfo": {
"state": ""
}
}
}
def build_datanode_decommission_request(cluster_name, instances):
tmpl = copy.deepcopy(_COMMON_DECOMMISSION_TEMPLATE)
@@ -116,3 +127,19 @@ def build_resourcemanager_restart_request(cluster_name, rm_instance):
tmpl["Requests/resource_filters"][0]["hosts"] = rm_instance.fqdn()
return tmpl
def build_stop_service_request(service_name):
tmpl = copy.deepcopy(_COMMON_RESTART_SERVICE_TEMPLATE)
tmpl["RequestInfo"]["context"] = (
"Restart %s service (stopping)" % service_name)
tmpl["Body"]["ServiceInfo"]["state"] = "INSTALLED"
return tmpl
def build_start_service_request(service_name):
tmpl = copy.deepcopy(_COMMON_RESTART_SERVICE_TEMPLATE)
tmpl["RequestInfo"]["context"] = (
"Restart %s service (starting)" % service_name)
tmpl["Body"]["ServiceInfo"]["state"] = "STARTED"
return tmpl
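
Ambari has no single "restart" state for a service: setting ServiceInfo.state
to INSTALLED asks Ambari to stop the service, and STARTED asks it to start,
which is why restart_service() in the client sends the stop request first and
the start request second. A quick sanity check of the two builders (the
service name here is arbitrary):

    stop = build_stop_service_request("HDFS")
    start = build_start_service_request("HDFS")
    assert stop["Body"]["ServiceInfo"]["state"] == "INSTALLED"
    assert start["Body"]["ServiceInfo"]["state"] == "STARTED"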

View File

@@ -15,14 +15,14 @@
import mock
from sahara.plugins.ambari import decomission_helper
from sahara.plugins.ambari import requests_helper
from sahara.tests.unit import base
class DecommissionHelperTestCase(base.SaharaTestCase):
class RequestsHelperTestCase(base.SaharaTestCase):
def setUp(self):
super(DecommissionHelperTestCase, self).setUp()
super(RequestsHelperTestCase, self).setUp()
self.i1 = mock.MagicMock()
self.i1.fqdn.return_value = "i1"
@@ -33,8 +33,8 @@ class DecommissionHelperTestCase(base.SaharaTestCase):
c_name = "c1"
instances = [self.i1, self.i2]
res = decomission_helper.build_datanode_decommission_request(c_name,
instances)
res = requests_helper.build_datanode_decommission_request(c_name,
instances)
self.assertEqual("i1,i2",
res["RequestInfo"]["parameters"]["excluded_hosts"])
self.assertEqual("c1",
@@ -44,7 +44,7 @@ class DecommissionHelperTestCase(base.SaharaTestCase):
c_name = "c1"
instances = [self.i1, self.i2]
res = decomission_helper.build_nodemanager_decommission_request(
res = requests_helper.build_nodemanager_decommission_request(
c_name, instances)
self.assertEqual("i1,i2",
@@ -53,16 +53,44 @@ class DecommissionHelperTestCase(base.SaharaTestCase):
res["RequestInfo"]["operation_level"]["cluster_name"])
def test_build_namenode_restart_request(self):
res = decomission_helper.build_namenode_restart_request("c1", self.i1)
res = requests_helper.build_namenode_restart_request("c1", self.i1)
self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"])
self.assertEqual("c1",
res["RequestInfo"]["operation_level"]["cluster_name"])
def test_build_resourcemanager_restart_request(self):
res = decomission_helper.build_resourcemanager_restart_request("c1",
self.i1)
res = requests_helper.build_resourcemanager_restart_request("c1",
self.i1)
self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"])
self.assertEqual("c1",
res["RequestInfo"]["operation_level"]["cluster_name"])
def test_build_stop_service_request(self):
res = requests_helper.build_stop_service_request("HDFS")
expected = {
"RequestInfo": {
"context": "Restart HDFS service (stopping)",
},
"Body": {
"ServiceInfo": {
"state": "INSTALLED"
}
}
}
self.assertEqual(res, expected)
def test_build_start_service_request(self):
res = requests_helper.build_start_service_request("HDFS")
expected = {
"RequestInfo": {
"context": "Restart HDFS service (starting)",
},
"Body": {
"ServiceInfo": {
"state": "STARTED"
}
}
}
self.assertEqual(res, expected)

View File

@@ -162,3 +162,7 @@ def vm_awareness_mapred_config():
def vm_awareness_all_config():
return vm_awareness_core_config() + vm_awareness_mapred_config()
def is_data_locality_enabled():
return CONF.enable_data_locality
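
is_data_locality_enabled() simply exposes the existing
CONF.enable_data_locality flag. A sketch of how such an option is typically
registered with oslo.config (the option name matches the usage above; the
default value and help text here are assumptions):

    # Sketch only: the option name follows CONF.enable_data_locality above;
    # the default value and help string are assumptions.
    from oslo_config import cfg

    CONF = cfg.CONF
    CONF.register_opts([
        cfg.BoolOpt('enable_data_locality',
                    default=False,
                    help='Enable Hadoop data locality (rack awareness) support.'),
    ])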