Improved scaling for the CDH plugin
This fixes issues with decommissioning nodes when gateway nodes are included. Additionally, restarting stale services is required for redeployment of configs. Change-Id: I9d439936c0e2f851054735f0defba8efc592c84d Closes-bug: 1597701
This commit is contained in:
parent
c63b579795
commit
ea44774c50
|
@ -138,6 +138,32 @@ class ApiCluster(types.BaseApiResource):
|
|||
"""
|
||||
return self._cmd('start')
|
||||
|
||||
def restart(self, restart_only_stale_services=None,
            redeploy_client_configuration=None,
            restart_service_names=None):
    """Restart all services in the cluster.

    Services are restarted in the appropriate order given their
    dependencies.

    :param restart_only_stale_services: Only restart services that have
        stale configuration and their dependent services. Default is
        False.
    :param redeploy_client_configuration: Re-deploy client configuration
        for all services in the cluster. Default is False.
    :param restart_service_names: Only restart services that are
        specified and their dependent services.
    :return: Reference to the submitted command.
    """
    # API versions before v6 only support a bare restart command with
    # no arguments.
    if self._get_resource_root().version < 6:
        return self._cmd('restart')

    args = {
        'restartOnlyStaleServices': restart_only_stale_services,
        'redeployClientConfiguration': redeploy_client_configuration,
    }
    # Restarting a named subset of services is only supported from v11.
    if self._get_resource_root().version >= 11:
        args['restartServiceNames'] = restart_service_names
    return self._cmd('restart', data=args, api_version=6)
|
||||
|
||||
def deploy_client_config(self):
|
||||
"""Deploys Service client configuration to the hosts on the cluster
|
||||
|
||||
|
|
|
@ -106,10 +106,14 @@ class ClouderaUtils(object):
|
|||
|
||||
@cpo.event_wrapper(
    True, step=_("Decommission nodes"), param=('cluster', 1))
def decommission_nodes(self, cluster, process,
                       decommission_roles, roles_to_delete=None):
    """Decommission, then delete, the given cluster roles.

    :param cluster: cluster object.
    :param process: process name used to look up the owning service.
    :param decommission_roles: role names to decommission and delete.
    :param roles_to_delete: optional extra role names (e.g. gateway
        roles) that must be deleted but should not be decommissioned.
    """
    service = self.get_service_by_role(process, cluster)
    service.decommission(*decommission_roles).wait()
    # Not all roles should be decommissioned (e.g. gateways), but they
    # still need to be deleted.  Work on a copy so the caller's list is
    # not mutated as a side effect.
    roles = list(decommission_roles)
    if roles_to_delete:
        roles.extend(roles_to_delete)
    for role_name in roles:
        service.delete_role(role_name)
|
||||
|
||||
@cpo.event_wrapper(
|
||||
|
@ -131,6 +135,15 @@ class ClouderaUtils(object):
|
|||
for st in service.refresh(nd):
|
||||
yield st
|
||||
|
||||
@cpo.event_wrapper(
    True, step=_("Restart stale services"), param=('cluster', 1))
@cloudera_cmd
def restart_stale_services(self, cluster):
    """Restart services with stale configuration.

    Client configuration is redeployed at the same time so changed
    configs take effect on the hosts.
    """
    cm_cluster = self.get_cloudera_cluster(cluster)
    yield cm_cluster.restart(
        redeploy_client_configuration=True,
        restart_only_stale_services=True)
|
||||
|
||||
@cpo.event_wrapper(True, step=_("Deploy configs"), param=('cluster', 1))
|
||||
@cloudera_cmd
|
||||
def deploy_configs(self, cluster):
|
||||
|
|
|
@ -104,29 +104,41 @@ def scale_cluster(cluster, instances):
|
|||
CU.configure_instances(instances, cluster)
|
||||
CU.update_configs(instances)
|
||||
CU.pu.configure_swift(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
_start_roles(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
CU.refresh_yarn_nodes(cluster)
|
||||
CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
def decommission_cluster(cluster, instances):
    """Decommission and remove the given instances from the cluster.

    Gateway roles hosted on the same instances are deleted (not
    decommissioned) alongside the datanode/nodemanager roles.
    """
    dns, dns_to_delete = [], []
    nms, nms_to_delete = [], []
    for inst in instances:
        processes = inst.node_group.node_processes
        if 'HDFS_DATANODE' in processes:
            dns.append(CU.pu.get_role_name(inst, 'DATANODE'))
            dns_to_delete.append(
                CU.pu.get_role_name(inst, 'HDFS_GATEWAY'))
        if 'YARN_NODEMANAGER' in processes:
            nms.append(CU.pu.get_role_name(inst, 'NODEMANAGER'))
            nms_to_delete.append(
                CU.pu.get_role_name(inst, 'YARN_GATEWAY'))

    if dns:
        CU.decommission_nodes(
            cluster, 'DATANODE', dns, dns_to_delete)
    if nms:
        CU.decommission_nodes(
            cluster, 'NODEMANAGER', nms, nms_to_delete)

    CU.delete_instances(cluster, instances)

    CU.refresh_datanodes(cluster)
    CU.refresh_yarn_nodes(cluster)
    # Redeploy configs and restart anything left stale by the removal.
    CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
@cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))
|
||||
|
|
|
@ -106,29 +106,41 @@ def scale_cluster(cluster, instances):
|
|||
CU.configure_instances(instances, cluster)
|
||||
CU.update_configs(instances)
|
||||
CU.pu.configure_swift(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
_start_roles(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
CU.refresh_yarn_nodes(cluster)
|
||||
CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
def decommission_cluster(cluster, instances):
    """Decommission and remove the given instances from the cluster.

    Gateway roles hosted on the same instances are deleted (not
    decommissioned) alongside the datanode/nodemanager roles.
    """
    dns, dns_to_delete = [], []
    nms, nms_to_delete = [], []
    for inst in instances:
        processes = inst.node_group.node_processes
        if 'HDFS_DATANODE' in processes:
            dns.append(CU.pu.get_role_name(inst, 'DATANODE'))
            dns_to_delete.append(
                CU.pu.get_role_name(inst, 'HDFS_GATEWAY'))
        if 'YARN_NODEMANAGER' in processes:
            nms.append(CU.pu.get_role_name(inst, 'NODEMANAGER'))
            nms_to_delete.append(
                CU.pu.get_role_name(inst, 'YARN_GATEWAY'))

    if dns:
        CU.decommission_nodes(
            cluster, 'DATANODE', dns, dns_to_delete)
    if nms:
        CU.decommission_nodes(
            cluster, 'NODEMANAGER', nms, nms_to_delete)

    CU.delete_instances(cluster, instances)

    CU.refresh_datanodes(cluster)
    CU.refresh_yarn_nodes(cluster)
    # Redeploy configs and restart anything left stale by the removal.
    CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
@cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))
|
||||
|
|
|
@ -106,29 +106,41 @@ def scale_cluster(cluster, instances):
|
|||
CU.configure_instances(instances, cluster)
|
||||
CU.update_configs(instances)
|
||||
CU.pu.configure_swift(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
_start_roles(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
CU.refresh_yarn_nodes(cluster)
|
||||
CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
def decommission_cluster(cluster, instances):
    """Decommission and remove the given instances from the cluster.

    Gateway roles hosted on the same instances are deleted (not
    decommissioned) alongside the datanode/nodemanager roles.
    """
    dns, dns_to_delete = [], []
    nms, nms_to_delete = [], []
    for inst in instances:
        processes = inst.node_group.node_processes
        if 'HDFS_DATANODE' in processes:
            dns.append(CU.pu.get_role_name(inst, 'DATANODE'))
            dns_to_delete.append(
                CU.pu.get_role_name(inst, 'HDFS_GATEWAY'))
        if 'YARN_NODEMANAGER' in processes:
            nms.append(CU.pu.get_role_name(inst, 'NODEMANAGER'))
            nms_to_delete.append(
                CU.pu.get_role_name(inst, 'YARN_GATEWAY'))

    if dns:
        CU.decommission_nodes(
            cluster, 'DATANODE', dns, dns_to_delete)
    if nms:
        CU.decommission_nodes(
            cluster, 'NODEMANAGER', nms, nms_to_delete)

    CU.delete_instances(cluster, instances)

    CU.refresh_datanodes(cluster)
    CU.refresh_yarn_nodes(cluster)
    # Redeploy configs and restart anything left stale by the removal.
    CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
@cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))
|
||||
|
|
|
@ -106,29 +106,41 @@ def scale_cluster(cluster, instances):
|
|||
CU.configure_instances(instances, cluster)
|
||||
CU.update_configs(instances)
|
||||
CU.pu.configure_swift(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
_start_roles(cluster, instances)
|
||||
CU.refresh_datanodes(cluster)
|
||||
CU.refresh_yarn_nodes(cluster)
|
||||
CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
def decommission_cluster(cluster, instances):
    """Decommission and remove the given instances from the cluster.

    Gateway roles hosted on the same instances are deleted (not
    decommissioned) alongside the datanode/nodemanager roles.
    """
    dns, dns_to_delete = [], []
    nms, nms_to_delete = [], []
    for inst in instances:
        processes = inst.node_group.node_processes
        if 'HDFS_DATANODE' in processes:
            dns.append(CU.pu.get_role_name(inst, 'DATANODE'))
            dns_to_delete.append(
                CU.pu.get_role_name(inst, 'HDFS_GATEWAY'))
        if 'YARN_NODEMANAGER' in processes:
            nms.append(CU.pu.get_role_name(inst, 'NODEMANAGER'))
            nms_to_delete.append(
                CU.pu.get_role_name(inst, 'YARN_GATEWAY'))

    if dns:
        CU.decommission_nodes(
            cluster, 'DATANODE', dns, dns_to_delete)
    if nms:
        CU.decommission_nodes(
            cluster, 'NODEMANAGER', nms, nms_to_delete)

    CU.delete_instances(cluster, instances)

    CU.refresh_datanodes(cluster)
    CU.refresh_yarn_nodes(cluster)
    # Redeploy configs and restart anything left stale by the removal.
    CU.restart_stale_services(cluster)
|
||||
|
||||
|
||||
@cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))
|
||||
|
|
|
@ -119,16 +119,20 @@ class DeployCDHV550(base.SaharaTestCase):
|
|||
def test_decommission_cluster(self, mock_cu):
|
||||
deploy.decommission_cluster(self.cluster, self.instances)
|
||||
dns = []
|
||||
dns_2 = []
|
||||
nms = []
|
||||
nms_2 = []
|
||||
for i in self.instances:
|
||||
if 'HDFS_DATANODE' in i.node_group.node_processes:
|
||||
dns.append(mock_cu.pu.get_role_name(i, 'DATANODE'))
|
||||
dns_2.append(mock_cu.pu.get_role_name(i, 'HDFS_GATEWAY'))
|
||||
if 'YARN_NODEMANAGER' in i.node_group.node_processes:
|
||||
nms.append(mock_cu.pu.get_role_name(i, 'NODEMANAGER'))
|
||||
nms_2.append(mock_cu.pu.get_role_name(i, 'YARN_GATEWAY'))
|
||||
mock_cu.decommission_nodes.assert_any_call(
|
||||
self.cluster, 'DATANODE', dns)
|
||||
self.cluster, 'DATANODE', dns, dns_2)
|
||||
mock_cu.decommission_nodes.assert_any_call(
|
||||
self.cluster, 'NODEMANAGER', nms)
|
||||
self.cluster, 'NODEMANAGER', nms, nms_2)
|
||||
mock_cu.delete_instances.assert_called_once_with(self.cluster,
|
||||
self.instances)
|
||||
mock_cu.refresh_datanodes.assert_called_once_with(self.cluster)
|
||||
|
|
Loading…
Reference in New Issue