Improve scaling for CDH plugin

This fixes issues with decommissioning nodes when
gateway nodes are included. Additionally, restarting
stale services is required so that the updated
configs are redeployed.

Change-Id: I9d439936c0e2f851054735f0defba8efc592c84d
Closes-bug: 1597701
Vitaly Gridnev 2016-07-14 19:14:52 +03:00
parent c63b579795
commit ea44774c50
7 changed files with 108 additions and 17 deletions

View File

@@ -138,6 +138,32 @@ class ApiCluster(types.BaseApiResource):
         """
         return self._cmd('start')

+    def restart(self, restart_only_stale_services=None,
+                redeploy_client_configuration=None,
+                restart_service_names=None):
+        """Restart all services in the cluster. Services are restarted in the
+        appropriate order given their dependencies.
+
+        :param restart_only_stale_services: Only restart services that
+            have stale configuration and their dependent
+            services. Default is False.
+        :param redeploy_client_configuration: Re-deploy client configuration
+            for all services in the cluster. Default is False.
+        :param restart_service_names: Only restart services that are specified
+            and their dependent services.
+        :return: Reference to the submitted command.
+        """
+        if self._get_resource_root().version < 6:
+            return self._cmd('restart')
+
+        args = dict()
+        args['restartOnlyStaleServices'] = restart_only_stale_services
+        args['redeployClientConfiguration'] = redeploy_client_configuration
+
+        if self._get_resource_root().version >= 11:
+            args['restartServiceNames'] = restart_service_names
+        return self._cmd('restart', data=args, api_version=6)
+
     def deploy_client_config(self):
         """Deploys Service client configuration to the hosts on the cluster

View File

@@ -106,10 +106,14 @@ class ClouderaUtils(object):

     @cpo.event_wrapper(
         True, step=_("Decommission nodes"), param=('cluster', 1))
-    def decommission_nodes(self, cluster, process, role_names):
+    def decommission_nodes(self, cluster, process,
+                           decommission_roles, roles_to_delete=None):
         service = self.get_service_by_role(process, cluster)
-        service.decommission(*role_names).wait()
-        for role_name in role_names:
+        service.decommission(*decommission_roles).wait()
+        # not all roles should be decommissioned
+        if roles_to_delete:
+            decommission_roles.extend(roles_to_delete)
+        for role_name in decommission_roles:
             service.delete_role(role_name)

     @cpo.event_wrapper(
@@ -131,6 +135,15 @@ class ClouderaUtils(object):
         for st in service.refresh(nd):
             yield st

+    @cpo.event_wrapper(
+        True, step=_("Restart stale services"), param=('cluster', 1))
+    @cloudera_cmd
+    def restart_stale_services(self, cluster):
+        cm_cluster = self.get_cloudera_cluster(cluster)
+        yield cm_cluster.restart(
+            restart_only_stale_services=True,
+            redeploy_client_configuration=True)
+
     @cpo.event_wrapper(True, step=_("Deploy configs"), param=('cluster', 1))
     @cloudera_cmd
     def deploy_configs(self, cluster):
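
The @cloudera_cmd decorator is what makes the generator above block until Cloudera Manager is done: it waits on every command the wrapped function yields and fails the provisioning step if any command is unsuccessful. A simplified sketch of that pattern (not the exact Sahara implementation, which raises a provisioning error instead of RuntimeError) is:

import functools


def cloudera_cmd(f):
    """Wait on each Cloudera Manager command yielded by the generator."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        for cmd in f(*args, **kwargs):
            result = cmd.wait()
            if not result.success:
                raise RuntimeError(result.resultMessage)
    return wrapper

With that in place, restart_stale_services(cluster) returns only after the cluster-wide restart command has completed, which is why the scale and decommission paths below can rely on the redeployed configs afterwards.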

View File

@@ -104,29 +104,41 @@ def scale_cluster(cluster, instances):
     CU.configure_instances(instances, cluster)
     CU.update_configs(instances)
     CU.pu.configure_swift(cluster, instances)
-    CU.refresh_datanodes(cluster)
     _start_roles(cluster, instances)
+    CU.refresh_datanodes(cluster)
+    CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 def decommission_cluster(cluster, instances):
     dns = []
+    dns_to_delete = []
     nms = []
+    nms_to_delete = []
     for i in instances:
         if 'HDFS_DATANODE' in i.node_group.node_processes:
             dns.append(CU.pu.get_role_name(i, 'DATANODE'))
+            dns_to_delete.append(
+                CU.pu.get_role_name(i, 'HDFS_GATEWAY'))
         if 'YARN_NODEMANAGER' in i.node_group.node_processes:
             nms.append(CU.pu.get_role_name(i, 'NODEMANAGER'))
+            nms_to_delete.append(
+                CU.pu.get_role_name(i, 'YARN_GATEWAY'))

     if dns:
-        CU.decommission_nodes(cluster, 'DATANODE', dns)
+        CU.decommission_nodes(
+            cluster, 'DATANODE', dns, dns_to_delete)

     if nms:
-        CU.decommission_nodes(cluster, 'NODEMANAGER', nms)
+        CU.decommission_nodes(
+            cluster, 'NODEMANAGER', nms, nms_to_delete)

     CU.delete_instances(cluster, instances)

     CU.refresh_datanodes(cluster)
     CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 @cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))

View File

@@ -106,29 +106,41 @@ def scale_cluster(cluster, instances):
     CU.configure_instances(instances, cluster)
     CU.update_configs(instances)
     CU.pu.configure_swift(cluster, instances)
-    CU.refresh_datanodes(cluster)
     _start_roles(cluster, instances)
+    CU.refresh_datanodes(cluster)
+    CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 def decommission_cluster(cluster, instances):
     dns = []
+    dns_to_delete = []
     nms = []
+    nms_to_delete = []
     for i in instances:
         if 'HDFS_DATANODE' in i.node_group.node_processes:
             dns.append(CU.pu.get_role_name(i, 'DATANODE'))
+            dns_to_delete.append(
+                CU.pu.get_role_name(i, 'HDFS_GATEWAY'))
         if 'YARN_NODEMANAGER' in i.node_group.node_processes:
             nms.append(CU.pu.get_role_name(i, 'NODEMANAGER'))
+            nms_to_delete.append(
+                CU.pu.get_role_name(i, 'YARN_GATEWAY'))

     if dns:
-        CU.decommission_nodes(cluster, 'DATANODE', dns)
+        CU.decommission_nodes(
+            cluster, 'DATANODE', dns, dns_to_delete)

     if nms:
-        CU.decommission_nodes(cluster, 'NODEMANAGER', nms)
+        CU.decommission_nodes(
+            cluster, 'NODEMANAGER', nms, nms_to_delete)

     CU.delete_instances(cluster, instances)

     CU.refresh_datanodes(cluster)
     CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 @cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))

View File

@@ -106,29 +106,41 @@ def scale_cluster(cluster, instances):
     CU.configure_instances(instances, cluster)
     CU.update_configs(instances)
     CU.pu.configure_swift(cluster, instances)
-    CU.refresh_datanodes(cluster)
     _start_roles(cluster, instances)
+    CU.refresh_datanodes(cluster)
+    CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 def decommission_cluster(cluster, instances):
     dns = []
+    dns_to_delete = []
     nms = []
+    nms_to_delete = []
     for i in instances:
         if 'HDFS_DATANODE' in i.node_group.node_processes:
             dns.append(CU.pu.get_role_name(i, 'DATANODE'))
+            dns_to_delete.append(
+                CU.pu.get_role_name(i, 'HDFS_GATEWAY'))
         if 'YARN_NODEMANAGER' in i.node_group.node_processes:
             nms.append(CU.pu.get_role_name(i, 'NODEMANAGER'))
+            nms_to_delete.append(
+                CU.pu.get_role_name(i, 'YARN_GATEWAY'))

     if dns:
-        CU.decommission_nodes(cluster, 'DATANODE', dns)
+        CU.decommission_nodes(
+            cluster, 'DATANODE', dns, dns_to_delete)

     if nms:
-        CU.decommission_nodes(cluster, 'NODEMANAGER', nms)
+        CU.decommission_nodes(
+            cluster, 'NODEMANAGER', nms, nms_to_delete)

     CU.delete_instances(cluster, instances)

     CU.refresh_datanodes(cluster)
     CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 @cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))

View File

@@ -106,29 +106,41 @@ def scale_cluster(cluster, instances):
     CU.configure_instances(instances, cluster)
     CU.update_configs(instances)
     CU.pu.configure_swift(cluster, instances)
-    CU.refresh_datanodes(cluster)
     _start_roles(cluster, instances)
+    CU.refresh_datanodes(cluster)
+    CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 def decommission_cluster(cluster, instances):
     dns = []
+    dns_to_delete = []
     nms = []
+    nms_to_delete = []
     for i in instances:
         if 'HDFS_DATANODE' in i.node_group.node_processes:
             dns.append(CU.pu.get_role_name(i, 'DATANODE'))
+            dns_to_delete.append(
+                CU.pu.get_role_name(i, 'HDFS_GATEWAY'))
         if 'YARN_NODEMANAGER' in i.node_group.node_processes:
             nms.append(CU.pu.get_role_name(i, 'NODEMANAGER'))
+            nms_to_delete.append(
+                CU.pu.get_role_name(i, 'YARN_GATEWAY'))

     if dns:
-        CU.decommission_nodes(cluster, 'DATANODE', dns)
+        CU.decommission_nodes(
+            cluster, 'DATANODE', dns, dns_to_delete)

     if nms:
-        CU.decommission_nodes(cluster, 'NODEMANAGER', nms)
+        CU.decommission_nodes(
+            cluster, 'NODEMANAGER', nms, nms_to_delete)

     CU.delete_instances(cluster, instances)

     CU.refresh_datanodes(cluster)
     CU.refresh_yarn_nodes(cluster)
+    CU.restart_stale_services(cluster)


 @cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))

View File

@@ -119,16 +119,20 @@ class DeployCDHV550(base.SaharaTestCase):
     def test_decommission_cluster(self, mock_cu):
         deploy.decommission_cluster(self.cluster, self.instances)
         dns = []
+        dns_2 = []
         nms = []
+        nms_2 = []
         for i in self.instances:
             if 'HDFS_DATANODE' in i.node_group.node_processes:
                 dns.append(mock_cu.pu.get_role_name(i, 'DATANODE'))
+                dns_2.append(mock_cu.pu.get_role_name(i, 'HDFS_GATEWAY'))
             if 'YARN_NODEMANAGER' in i.node_group.node_processes:
                 nms.append(mock_cu.pu.get_role_name(i, 'NODEMANAGER'))
+                nms_2.append(mock_cu.pu.get_role_name(i, 'YARN_GATEWAY'))
         mock_cu.decommission_nodes.assert_any_call(
-            self.cluster, 'DATANODE', dns)
+            self.cluster, 'DATANODE', dns, dns_2)
         mock_cu.decommission_nodes.assert_any_call(
-            self.cluster, 'NODEMANAGER', nms)
+            self.cluster, 'NODEMANAGER', nms, nms_2)
         mock_cu.delete_instances.assert_called_once_with(self.cluster,
                                                          self.instances)
         mock_cu.refresh_datanodes.assert_called_once_with(self.cluster)
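
A natural extra assertion, shown here only as a hypothetical illustration (it is not part of this diff), would verify that the decommission path also triggers the new stale-service restart exactly once:

mock_cu.restart_stale_services.assert_called_once_with(self.cluster)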