Decommission of a specific node
Adding the option to decommission specific node from cluster. Partially implements bp: decommission-specific-instance Change-Id: I1a858fecc1b32f91c76aea6db14e0d5a419211d1
This commit is contained in:
parent
32d4c744f7
commit
b001ef2a55
|
@ -60,7 +60,7 @@ def clusters_create_multiple(data):
|
|||
@rest.put('/clusters/<cluster_id>')
|
||||
@acl.enforce("data-processing:clusters:scale")
|
||||
@v.check_exists(api.get_cluster, 'cluster_id')
|
||||
@v.validate(v_c_schema.CLUSTER_SCALING_SCHEMA, v_c_s.check_cluster_scaling)
|
||||
@v.validate(v_c_schema.CLUSTER_SCALING_SCHEMA_V2, v_c_s.check_cluster_scaling)
|
||||
def clusters_scale(cluster_id, data):
|
||||
return u.to_wrapped_dict(api.scale_cluster, cluster_id, data)
|
||||
|
||||
|
|
|
@ -54,9 +54,12 @@ def scale_cluster(id, data):
|
|||
# the next map is the main object we will work with
|
||||
# to_be_enlarged : {node_group_id: desired_amount_of_instances}
|
||||
to_be_enlarged = {}
|
||||
node_group_instance_map = {}
|
||||
for ng in existing_node_groups:
|
||||
ng_id = g.find(cluster.node_groups, name=ng['name'])['id']
|
||||
to_be_enlarged.update({ng_id: ng['count']})
|
||||
if 'instances' in ng:
|
||||
node_group_instance_map.update({ng_id: ng['instances']})
|
||||
|
||||
additional = construct_ngs_for_scaling(cluster, additional_node_groups)
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
|
@ -82,7 +85,8 @@ def scale_cluster(id, data):
|
|||
if node_group.id not in to_be_enlarged:
|
||||
to_be_enlarged[node_group.id] = node_group.count
|
||||
|
||||
api.OPS.provision_scaled_cluster(id, to_be_enlarged)
|
||||
api.OPS.provision_scaled_cluster(id, to_be_enlarged,
|
||||
node_group_instance_map)
|
||||
return cluster
|
||||
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ class HeatEngine(e.Engine):
|
|||
for node_group in cluster.node_groups:
|
||||
conductor.node_group_update(ctx, node_group, {"count": 0})
|
||||
|
||||
def scale_cluster(self, cluster, target_count):
|
||||
def scale_cluster(self, cluster, target_count, instances_to_delete=None):
|
||||
ctx = context.ctx()
|
||||
|
||||
rollback_count = self._get_ng_counts(cluster)
|
||||
|
@ -94,7 +94,8 @@ class HeatEngine(e.Engine):
|
|||
|
||||
inst_ids = self._launch_instances(
|
||||
cluster, target_count, SCALE_STAGES,
|
||||
update_stack=True, disable_rollback=False)
|
||||
update_stack=True, disable_rollback=False,
|
||||
instances_to_delete=instances_to_delete)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
c_u.clean_cluster_from_empty_ng(cluster)
|
||||
|
@ -209,10 +210,12 @@ class HeatEngine(e.Engine):
|
|||
@cpo.event_wrapper(
|
||||
True, step=_('Create Heat stack'), param=('cluster', 1))
|
||||
def _create_instances(self, cluster, target_count, update_stack=False,
|
||||
disable_rollback=True):
|
||||
disable_rollback=True, instances_to_delete=None):
|
||||
|
||||
stack = ht.ClusterStack(cluster)
|
||||
|
||||
self._update_instance_count(stack, cluster, target_count)
|
||||
self._update_instance_count(stack, cluster, target_count,
|
||||
instances_to_delete)
|
||||
stack.instantiate(update_existing=update_stack,
|
||||
disable_rollback=disable_rollback)
|
||||
heat.wait_stack_completion(
|
||||
|
@ -221,12 +224,14 @@ class HeatEngine(e.Engine):
|
|||
return self._populate_cluster(cluster, stack)
|
||||
|
||||
def _launch_instances(self, cluster, target_count, stages,
|
||||
update_stack=False, disable_rollback=True):
|
||||
update_stack=False, disable_rollback=True,
|
||||
instances_to_delete=None):
|
||||
# create all instances
|
||||
cluster = c_u.change_cluster_status(cluster, stages[0])
|
||||
|
||||
inst_ids = self._create_instances(
|
||||
cluster, target_count, update_stack, disable_rollback)
|
||||
cluster, target_count, update_stack, disable_rollback,
|
||||
instances_to_delete)
|
||||
|
||||
# wait for all instances are up and networks ready
|
||||
cluster = c_u.change_cluster_status(cluster, stages[1])
|
||||
|
@ -246,19 +251,27 @@ class HeatEngine(e.Engine):
|
|||
|
||||
return inst_ids
|
||||
|
||||
def _update_instance_count(self, stack, cluster, target_count):
|
||||
def _update_instance_count(self, stack, cluster, target_count,
|
||||
instances_to_delete=None):
|
||||
ctx = context.ctx()
|
||||
instances_name_to_delete = {}
|
||||
if instances_to_delete:
|
||||
for instance in instances_to_delete:
|
||||
node_group_id = instance['node_group']['id']
|
||||
if node_group_id not in instances_name_to_delete:
|
||||
instances_name_to_delete[node_group_id] = []
|
||||
instances_name_to_delete[node_group_id].append(
|
||||
instance['instance_name'])
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
count = target_count[node_group.id]
|
||||
stack.add_node_group_extra(node_group.id, count,
|
||||
self._generate_user_data_script)
|
||||
stack.add_node_group_extra(
|
||||
node_group.id, count, self._generate_user_data_script,
|
||||
instances_name_to_delete.get(node_group.id, None))
|
||||
|
||||
# if number of instances decreases, we need to drop
|
||||
# the excessive ones
|
||||
# instances list doesn't order by creating date, so we should
|
||||
# sort it to make sure deleted instances same as heat deleted.
|
||||
insts = sorted(
|
||||
node_group.instances,
|
||||
key=lambda x: int(x['instance_name'].split('-')[-1]))
|
||||
for i in range(count, node_group.count):
|
||||
conductor.instance_remove(ctx, insts[i])
|
||||
for inst in node_group.instances:
|
||||
if (instances_to_delete and
|
||||
node_group.id in instances_name_to_delete):
|
||||
if (inst.instance_name in
|
||||
instances_name_to_delete[node_group.id]):
|
||||
conductor.instance_remove(ctx, inst)
|
||||
|
|
|
@ -136,6 +136,10 @@ def _get_wc_waiter_name(inst_name):
|
|||
return '%s-wc-waiter' % inst_name
|
||||
|
||||
|
||||
def _get_index_from_inst_name(inst_name):
|
||||
return inst_name.split('-')[-1]
|
||||
|
||||
|
||||
class ClusterStack(object):
|
||||
def __init__(self, cluster):
|
||||
self.cluster = cluster
|
||||
|
@ -165,15 +169,16 @@ class ClusterStack(object):
|
|||
node_group=ng.name, info=self.base_info))
|
||||
|
||||
def add_node_group_extra(self, node_group_id, node_count,
|
||||
gen_userdata_func):
|
||||
gen_userdata_func, instances_to_delete=None):
|
||||
self.node_groups_extra[node_group_id] = {
|
||||
'node_count': node_count,
|
||||
'gen_userdata_func': gen_userdata_func
|
||||
'gen_userdata_func': gen_userdata_func,
|
||||
'instances_to_delete': instances_to_delete
|
||||
}
|
||||
|
||||
def _get_main_template(self):
|
||||
def _get_main_template(self, instances_to_delete=None):
|
||||
outputs = {}
|
||||
resources = self._serialize_resources(outputs)
|
||||
resources = self._serialize_resources(outputs, instances_to_delete)
|
||||
return yaml.safe_dump({
|
||||
"heat_template_version": heat_common.HEAT_TEMPLATE_VERSION,
|
||||
"description": self.base_info,
|
||||
|
@ -181,8 +186,9 @@ class ClusterStack(object):
|
|||
"outputs": outputs
|
||||
})
|
||||
|
||||
def instantiate(self, update_existing, disable_rollback=True):
|
||||
main_tmpl = self._get_main_template()
|
||||
def instantiate(self, update_existing, disable_rollback=True,
|
||||
instances_to_delete=None):
|
||||
main_tmpl = self._get_main_template(instances_to_delete)
|
||||
kwargs = {
|
||||
'stack_name': self.cluster.stack_name,
|
||||
'timeout_mins': 180,
|
||||
|
@ -241,7 +247,7 @@ class ClusterStack(object):
|
|||
}
|
||||
}
|
||||
|
||||
def _serialize_resources(self, outputs):
|
||||
def _serialize_resources(self, outputs, instances_to_delete=None):
|
||||
resources = {}
|
||||
|
||||
if self.cluster.anti_affinity:
|
||||
|
@ -250,14 +256,15 @@ class ClusterStack(object):
|
|||
resources.update(self._serialize_aa_server_group(i))
|
||||
|
||||
for ng in self.cluster.node_groups:
|
||||
resources.update(self._serialize_ng_group(ng, outputs))
|
||||
resources.update(self._serialize_ng_group(ng, outputs,
|
||||
instances_to_delete))
|
||||
|
||||
for ng in self.cluster.node_groups:
|
||||
resources.update(self._serialize_auto_security_group(ng))
|
||||
|
||||
return resources
|
||||
|
||||
def _serialize_ng_group(self, ng, outputs):
|
||||
def _serialize_ng_group(self, ng, outputs, instances_to_delete=None):
|
||||
ng_file_name = "file://" + ng.name + ".yaml"
|
||||
self.files[ng_file_name] = self._serialize_ng_file(ng)
|
||||
|
||||
|
@ -279,11 +286,19 @@ class ClusterStack(object):
|
|||
properties[AUTO_SECURITY_GROUP_PARAM_NAME] = {
|
||||
'get_resource': g.generate_auto_security_group_name(ng)}
|
||||
|
||||
removal_policies = []
|
||||
if self.node_groups_extra[ng.id]['instances_to_delete']:
|
||||
resource_list = []
|
||||
for name in self.node_groups_extra[ng.id]['instances_to_delete']:
|
||||
resource_list.append(_get_index_from_inst_name(name))
|
||||
removal_policies.append({'resource_list': resource_list})
|
||||
|
||||
return {
|
||||
ng.name: {
|
||||
"type": "OS::Heat::ResourceGroup",
|
||||
"properties": {
|
||||
"count": self.node_groups_extra[ng.id]['node_count'],
|
||||
"removal_policies": removal_policies,
|
||||
"resource_def": {
|
||||
"type": ng_file_name,
|
||||
"properties": properties
|
||||
|
|
|
@ -54,9 +54,11 @@ class LocalOps(object):
|
|||
context.spawn("cluster-creating-%s" % cluster_id,
|
||||
_provision_cluster, cluster_id)
|
||||
|
||||
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
|
||||
def provision_scaled_cluster(self, cluster_id, node_group_id_map,
|
||||
node_group_instance_map=None):
|
||||
context.spawn("cluster-scaling-%s" % cluster_id,
|
||||
_provision_scaled_cluster, cluster_id, node_group_id_map)
|
||||
_provision_scaled_cluster, cluster_id, node_group_id_map,
|
||||
node_group_instance_map)
|
||||
|
||||
def terminate_cluster(self, cluster_id):
|
||||
context.spawn("cluster-terminating-%s" % cluster_id,
|
||||
|
@ -94,9 +96,11 @@ class RemoteOps(rpc_utils.RPCClient):
|
|||
def provision_cluster(self, cluster_id):
|
||||
self.cast('provision_cluster', cluster_id=cluster_id)
|
||||
|
||||
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
|
||||
def provision_scaled_cluster(self, cluster_id, node_group_id_map,
|
||||
node_group_instance_map=None):
|
||||
self.cast('provision_scaled_cluster', cluster_id=cluster_id,
|
||||
node_group_id_map=node_group_id_map)
|
||||
node_group_id_map=node_group_id_map,
|
||||
node_group_instance_map=node_group_instance_map)
|
||||
|
||||
def terminate_cluster(self, cluster_id):
|
||||
self.cast('terminate_cluster', cluster_id=cluster_id)
|
||||
|
@ -143,8 +147,10 @@ class OpsServer(rpc_utils.RPCServer):
|
|||
_provision_cluster(cluster_id)
|
||||
|
||||
@request_context
|
||||
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
|
||||
_provision_scaled_cluster(cluster_id, node_group_id_map)
|
||||
def provision_scaled_cluster(self, cluster_id, node_group_id_map,
|
||||
node_group_instance_map=None):
|
||||
_provision_scaled_cluster(cluster_id, node_group_id_map,
|
||||
node_group_instance_map)
|
||||
|
||||
@request_context
|
||||
def terminate_cluster(self, cluster_id):
|
||||
|
@ -314,42 +320,76 @@ def _provision_cluster(cluster_id):
|
|||
|
||||
@ops_error_handler(
|
||||
_("Scaling cluster failed for the following reason(s): {reason}"))
|
||||
def _provision_scaled_cluster(cluster_id, node_group_id_map):
|
||||
def _provision_scaled_cluster(cluster_id, node_group_id_map,
|
||||
node_group_instance_map=None):
|
||||
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
|
||||
|
||||
# Decommissioning surplus nodes with the plugin
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
|
||||
|
||||
instances_to_delete = []
|
||||
try:
|
||||
instances_to_delete = []
|
||||
for node_group in cluster.node_groups:
|
||||
new_count = node_group_id_map[node_group.id]
|
||||
if new_count < node_group.count:
|
||||
if (node_group_instance_map and
|
||||
node_group.id in node_group_instance_map):
|
||||
for instance_ref in node_group_instance_map[
|
||||
node_group.id]:
|
||||
instance = _get_instance_obj(node_group.instances,
|
||||
instance_ref)
|
||||
instances_to_delete.append(instance)
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
new_count = node_group_id_map[node_group.id]
|
||||
if new_count < node_group.count:
|
||||
instances_to_delete += node_group.instances[new_count:
|
||||
node_group.count]
|
||||
while node_group.count - new_count > len(instances_to_delete):
|
||||
instances_to_delete.append(_get_random_instance_from_ng(
|
||||
node_group.instances, instances_to_delete))
|
||||
|
||||
if instances_to_delete:
|
||||
context.set_step_type(_("Plugin: decommission cluster"))
|
||||
plugin.decommission_nodes(cluster, instances_to_delete)
|
||||
if instances_to_delete:
|
||||
context.set_step_type(_("Plugin: decommission cluster"))
|
||||
plugin.decommission_nodes(cluster, instances_to_delete)
|
||||
|
||||
# Scaling infrastructure
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_SCALING)
|
||||
context.set_step_type(_("Engine: scale cluster"))
|
||||
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map)
|
||||
|
||||
# Setting up new nodes with the plugin
|
||||
if instance_ids:
|
||||
ntp_service.configure_ntp(cluster_id, instance_ids)
|
||||
# Scaling infrastructure
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
|
||||
instances = c_u.get_instances(cluster, instance_ids)
|
||||
context.set_step_type(_("Plugin: scale cluster"))
|
||||
plugin.scale_cluster(cluster, instances)
|
||||
cluster, c_u.CLUSTER_STATUS_SCALING)
|
||||
context.set_step_type(_("Engine: scale cluster"))
|
||||
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map,
|
||||
instances_to_delete)
|
||||
# Setting up new nodes with the plugin
|
||||
if instance_ids:
|
||||
ntp_service.configure_ntp(cluster_id, instance_ids)
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
|
||||
instances = c_u.get_instances(cluster, instance_ids)
|
||||
context.set_step_type(_("Plugin: scale cluster"))
|
||||
plugin.scale_cluster(cluster, instances)
|
||||
|
||||
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
|
||||
_refresh_health_for_cluster(cluster_id)
|
||||
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
|
||||
_refresh_health_for_cluster(cluster_id)
|
||||
|
||||
except Exception as e:
|
||||
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE,
|
||||
six.text_type(e))
|
||||
|
||||
|
||||
def _get_instance_obj(instances, instance_ref):
|
||||
for instance in instances:
|
||||
if (instance.instance_id == instance_ref or
|
||||
instance.instance_name == instance_ref):
|
||||
return instance
|
||||
|
||||
raise exceptions.NotFoundException(str(instance_ref),
|
||||
_("Instance %s not found"))
|
||||
|
||||
|
||||
def _get_random_instance_from_ng(instances, instances_to_delete):
|
||||
# instances list doesn't order by creating date, so we should
|
||||
# sort it to make sure deleted instances same as heat deleted.
|
||||
insts = sorted(instances,
|
||||
key=lambda x: int(x['instance_name'].split('-')[-1]))
|
||||
for instance in reversed(insts):
|
||||
if instance not in instances_to_delete:
|
||||
return instance
|
||||
|
||||
|
||||
@ops_error_handler(
|
||||
|
|
|
@ -17,6 +17,7 @@ import collections
|
|||
|
||||
import novaclient.exceptions as nova_ex
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
from sahara import conductor as cond
|
||||
|
@ -373,6 +374,40 @@ def check_resize(cluster, r_node_groups):
|
|||
cluster.hadoop_version,
|
||||
ng_tmp['node_group_template'])
|
||||
|
||||
for scaling_ng in r_node_groups:
|
||||
current_count = ng_map[scaling_ng['name']].count
|
||||
new_count = scaling_ng['count']
|
||||
count_diff = current_count - new_count
|
||||
if 'instances' in scaling_ng:
|
||||
if len(scaling_ng['instances']) > count_diff:
|
||||
raise ex.InvalidDataException(
|
||||
_("Number of specific instances (%(instance)s) to"
|
||||
" delete can not be greater than the count difference"
|
||||
" (%(count)s during scaling")
|
||||
% {'instance': str(len(scaling_ng['instances'])),
|
||||
'count': str(count_diff)})
|
||||
else:
|
||||
if len(scaling_ng['instances']) > 0:
|
||||
is_uuid = uuidutils.is_uuid_like(
|
||||
scaling_ng['instances'][0])
|
||||
if is_uuid:
|
||||
for instance in scaling_ng['instances']:
|
||||
if not uuidutils.is_uuid_like(instance):
|
||||
raise ex.InvalidReferenceException(
|
||||
_("You can only reference instances by"
|
||||
" Name or UUID, not both on the same"
|
||||
" request"))
|
||||
else:
|
||||
for instance in scaling_ng['instances']:
|
||||
if uuidutils.is_uuid_like(instance):
|
||||
raise ex.InvalidReferenceException(
|
||||
_("You can only reference instances by"
|
||||
" Name or UUID, not both on the same"
|
||||
" request"))
|
||||
_check_duplicates(scaling_ng['instances'],
|
||||
_("Duplicate entry for instances to"
|
||||
" delete"))
|
||||
|
||||
|
||||
def check_add_node_groups(cluster, add_node_groups):
|
||||
cluster_ng_names = [ng.name for ng in cluster.node_groups]
|
||||
|
|
|
@ -134,3 +134,15 @@ CLUSTER_SCALING_SCHEMA = {
|
|||
}
|
||||
]
|
||||
}
|
||||
|
||||
CLUSTER_SCALING_SCHEMA_V2 = copy.deepcopy(CLUSTER_SCALING_SCHEMA)
|
||||
CLUSTER_SCALING_SCHEMA_V2['properties']['resize_node_groups'][
|
||||
'items']['properties'].update(
|
||||
{
|
||||
"instances": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string",
|
||||
},
|
||||
}
|
||||
})
|
||||
|
|
|
@ -77,6 +77,36 @@ SCALE_DATA = {
|
|||
]
|
||||
}
|
||||
|
||||
SCALE_DATA_SPECIFIC_INSTANCE = {
|
||||
'resize_node_groups': [
|
||||
{
|
||||
'name': 'ng_1',
|
||||
'count': 3,
|
||||
},
|
||||
{
|
||||
'name': 'ng_2',
|
||||
'count': 2,
|
||||
'instances': ['ng_2_0']
|
||||
}
|
||||
],
|
||||
'add_node_groups': []
|
||||
}
|
||||
|
||||
SCALE_DATA_N_SPECIFIC_INSTANCE = {
|
||||
'resize_node_groups': [
|
||||
{
|
||||
'name': 'ng_1',
|
||||
'count': 3,
|
||||
},
|
||||
{
|
||||
'name': 'ng_2',
|
||||
'count': 1,
|
||||
'instances': ['ng_2_0', 'ng_2_2']
|
||||
}
|
||||
],
|
||||
'add_node_groups': []
|
||||
}
|
||||
|
||||
|
||||
class FakePlugin(pr_base.ProvisioningPluginBase):
|
||||
_info = {}
|
||||
|
|
|
@ -37,19 +37,54 @@ class FakeOps(object):
|
|||
|
||||
def provision_cluster(self, id):
|
||||
self.calls_order.append('ops.provision_cluster')
|
||||
cluster = conductor.cluster_get(context.ctx(), id)
|
||||
target_count = {}
|
||||
for node_group in cluster.node_groups:
|
||||
target_count[node_group.id] = node_group.count
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
conductor.node_group_update(context.ctx(),
|
||||
node_group, {"count": 0})
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
for i in range(target_count[node_group.id]):
|
||||
inst = {
|
||||
"instance_id": node_group.name + '_' + str(i),
|
||||
"instance_name": node_group.name + '_' + str(i)
|
||||
}
|
||||
conductor.instance_add(context.ctx(), node_group, inst)
|
||||
conductor.cluster_update(
|
||||
context.ctx(), id, {'status': c_u.CLUSTER_STATUS_ACTIVE})
|
||||
|
||||
def provision_scaled_cluster(self, id, to_be_enlarged):
|
||||
def provision_scaled_cluster(self, id, to_be_enlarged,
|
||||
node_group_instance_map=None):
|
||||
self.calls_order.append('ops.provision_scaled_cluster')
|
||||
cluster = conductor.cluster_get(context.ctx(), id)
|
||||
|
||||
# Set scaled to see difference between active and scaled
|
||||
for (ng, count) in six.iteritems(to_be_enlarged):
|
||||
instances_to_delete = []
|
||||
if node_group_instance_map:
|
||||
if ng in node_group_instance_map:
|
||||
instances_to_delete = self._get_instance(
|
||||
cluster, node_group_instance_map[ng])
|
||||
for instance in instances_to_delete:
|
||||
conductor.instance_remove(context.ctx(), instance)
|
||||
|
||||
conductor.node_group_update(context.ctx(), ng, {'count': count})
|
||||
conductor.cluster_update(context.ctx(), id, {'status': 'Scaled'})
|
||||
|
||||
def terminate_cluster(self, id):
|
||||
self.calls_order.append('ops.terminate_cluster')
|
||||
|
||||
def _get_instance(self, cluster, instances_to_delete):
|
||||
instances = []
|
||||
for node_group in cluster.node_groups:
|
||||
for instance in node_group.instances:
|
||||
if instance.instance_id in instances_to_delete:
|
||||
instances.append(instance)
|
||||
return instances
|
||||
|
||||
|
||||
class TestClusterApi(base.SaharaWithDbTestCase):
|
||||
def setUp(self):
|
||||
|
@ -134,6 +169,7 @@ class TestClusterApi(base.SaharaWithDbTestCase):
|
|||
@mock.patch('sahara.service.quotas.check_scaling', return_value=None)
|
||||
def test_scale_cluster_success(self, check_scaling, check_cluster):
|
||||
cluster = api.create_cluster(api_base.SAMPLE_CLUSTER)
|
||||
cluster = api.get_cluster(cluster.id)
|
||||
api.scale_cluster(cluster.id, api_base.SCALE_DATA)
|
||||
result_cluster = api.get_cluster(cluster.id)
|
||||
self.assertEqual('Scaled', result_cluster.status)
|
||||
|
@ -156,6 +192,46 @@ class TestClusterApi(base.SaharaWithDbTestCase):
|
|||
'ops.provision_scaled_cluster',
|
||||
'ops.terminate_cluster'], self.calls_order)
|
||||
|
||||
@mock.patch('sahara.service.quotas.check_cluster', return_value=None)
|
||||
@mock.patch('sahara.service.quotas.check_scaling', return_value=None)
|
||||
def test_scale_cluster_n_specific_instances_success(self, check_scaling,
|
||||
check_cluster):
|
||||
cluster = api.create_cluster(api_base.SAMPLE_CLUSTER)
|
||||
cluster = api.get_cluster(cluster.id)
|
||||
api.scale_cluster(cluster.id, api_base.SCALE_DATA_N_SPECIFIC_INSTANCE)
|
||||
result_cluster = api.get_cluster(cluster.id)
|
||||
self.assertEqual('Scaled', result_cluster.status)
|
||||
expected_count = {
|
||||
'ng_1': 3,
|
||||
'ng_2': 1,
|
||||
'ng_3': 1,
|
||||
}
|
||||
ng_count = 0
|
||||
for ng in result_cluster.node_groups:
|
||||
self.assertEqual(expected_count[ng.name], ng.count)
|
||||
ng_count += 1
|
||||
self.assertEqual(1, result_cluster.node_groups[1].count)
|
||||
self.assertNotIn('ng_2_0',
|
||||
self._get_instances_ids(
|
||||
result_cluster.node_groups[1]))
|
||||
self.assertNotIn('ng_2_2',
|
||||
self._get_instances_ids(
|
||||
result_cluster.node_groups[1]))
|
||||
self.assertEqual(3, ng_count)
|
||||
api.terminate_cluster(result_cluster.id)
|
||||
self.assertEqual(
|
||||
['get_open_ports', 'recommend_configs', 'validate',
|
||||
'ops.provision_cluster', 'get_open_ports',
|
||||
'recommend_configs', 'validate_scaling',
|
||||
'ops.provision_scaled_cluster',
|
||||
'ops.terminate_cluster'], self.calls_order)
|
||||
|
||||
def _get_instances_ids(self, node_group):
|
||||
instance_ids = []
|
||||
for instance in node_group.instances:
|
||||
instance_ids.append(instance.instance_id)
|
||||
return instance_ids
|
||||
|
||||
@mock.patch('sahara.service.quotas.check_cluster', return_value=None)
|
||||
@mock.patch('sahara.service.quotas.check_scaling', return_value=None)
|
||||
def test_scale_cluster_failed(self, check_scaling, check_cluster):
|
||||
|
|
|
@ -30,7 +30,8 @@ class FakeCluster(object):
|
|||
class FakeNodeGroup(object):
|
||||
id = 'id'
|
||||
count = 2
|
||||
instances = [1, 2]
|
||||
instances = [{'instance_name': 'id-10', 'id': 2},
|
||||
{'instance_name': 'id-2', 'id': 1}]
|
||||
|
||||
|
||||
class FakePlugin(mock.Mock):
|
||||
|
@ -52,7 +53,8 @@ class FakePlugin(mock.Mock):
|
|||
def decommission_nodes(self, cluster, instances_to_delete):
|
||||
TestOPS.SEQUENCE.append('decommission_nodes')
|
||||
|
||||
def scale_cluster(self, cluster, node_group_id_map):
|
||||
def scale_cluster(self, cluster, node_group_id_map,
|
||||
node_group_instance_map=None):
|
||||
TestOPS.SEQUENCE.append('plugin.scale_cluster')
|
||||
|
||||
def cluster_destroy(self, ctx, cluster):
|
||||
|
@ -63,7 +65,8 @@ class FakeINFRA(object):
|
|||
def create_cluster(self, cluster):
|
||||
TestOPS.SEQUENCE.append('create_cluster')
|
||||
|
||||
def scale_cluster(self, cluster, node_group_id_map):
|
||||
def scale_cluster(self, cluster, node_group_id_map,
|
||||
node_group_instance_map=None):
|
||||
TestOPS.SEQUENCE.append('INFRA.scale_cluster')
|
||||
return True
|
||||
|
||||
|
|
|
@ -211,6 +211,23 @@ class TestScalingValidation(u.ValidationTestCase):
|
|||
self.assertEqual(1, req_data.call_count)
|
||||
self._assert_calls(bad_req, bad_req_i)
|
||||
|
||||
@mock.patch("sahara.utils.api.request_data")
|
||||
@mock.patch("sahara.utils.api.bad_request")
|
||||
def _assert_cluster_scaling_validation_v2(self,
|
||||
bad_req=None,
|
||||
req_data=None,
|
||||
data=None,
|
||||
bad_req_i=None):
|
||||
m_func = mock.Mock()
|
||||
m_func.__name__ = "m_func"
|
||||
req_data.return_value = data
|
||||
v.validate(c_schema.CLUSTER_SCALING_SCHEMA_V2,
|
||||
self._create_object_fun)(m_func)(data=data,
|
||||
cluster_id='42')
|
||||
|
||||
self.assertEqual(1, req_data.call_count)
|
||||
self._assert_calls(bad_req, bad_req_i)
|
||||
|
||||
@mock.patch("sahara.service.api.OPS")
|
||||
def test_cluster_scaling_scheme_v_resize_ng(self, ops):
|
||||
ops.get_engine_type_and_version.return_value = "direct.1.1"
|
||||
|
@ -259,6 +276,55 @@ class TestScalingValidation(u.ValidationTestCase):
|
|||
u"allowed ('flavor_id' was unexpected)")
|
||||
)
|
||||
|
||||
@mock.patch("sahara.service.api.OPS")
|
||||
def test_cluster_scaling_scheme_v_resize_ng_v2(self, ops):
|
||||
ops.get_engine_type_and_version.return_value = "direct.1.1"
|
||||
self._create_object_fun = mock.Mock()
|
||||
data = {
|
||||
}
|
||||
self._assert_cluster_scaling_validation_v2(
|
||||
data=data,
|
||||
bad_req_i=(1, 'VALIDATION_ERROR',
|
||||
u'{} is not valid under any of the given schemas')
|
||||
)
|
||||
data = {
|
||||
'resize_node_groups': [{}]
|
||||
}
|
||||
self._assert_cluster_scaling_validation_v2(
|
||||
data=data,
|
||||
bad_req_i=(1, 'VALIDATION_ERROR',
|
||||
u"resize_node_groups[0]: 'name' is a required property")
|
||||
)
|
||||
data = {
|
||||
'resize_node_groups': [
|
||||
{
|
||||
'name': 'a'
|
||||
}
|
||||
]
|
||||
}
|
||||
self._assert_cluster_scaling_validation_v2(
|
||||
data=data,
|
||||
bad_req_i=(1, 'VALIDATION_ERROR',
|
||||
u"resize_node_groups[0]: 'count' is a required "
|
||||
u"property")
|
||||
)
|
||||
data = {
|
||||
'resize_node_groups': [
|
||||
{
|
||||
'name': 'a',
|
||||
'flavor_id': '42',
|
||||
'instances': ['id1'],
|
||||
'count': 2
|
||||
}
|
||||
]
|
||||
}
|
||||
self._assert_cluster_scaling_validation_v2(
|
||||
data=data,
|
||||
bad_req_i=(1, 'VALIDATION_ERROR',
|
||||
u"resize_node_groups[0]: Additional properties are not "
|
||||
u"allowed ('flavor_id' was unexpected)")
|
||||
)
|
||||
|
||||
@mock.patch("sahara.service.api.OPS")
|
||||
def test_cluster_scaling_validation_add_ng(self, ops):
|
||||
ops.get_engine_type_and_version.return_value = "direct.1.1"
|
||||
|
|
Loading…
Reference in New Issue