Force deletion of clusters

* Add force delete operation to engine using stack abandon
* Basic unit tests for force delete
* Remove the now-unnecessary "direct engine" fallback code
* Change schema of APIv2 cluster delete to allow force delete
* Unit tests for new cluster delete schema
* Basic docs about force delete
* Release note

bp sahara-force-delete
Partial-Bug: #1647411

Change-Id: Ida72677c0a4110cb78edf9d62d8330cd4608ff76
Jeremy Freudberg 2017-12-22 05:38:55 +00:00
parent 5535350aff
commit 6850bb8ba8
14 changed files with 178 additions and 230 deletions


@@ -144,6 +144,13 @@ Deleting an existing Cluster
 The only step, that releases all Cluster's resources and removes it from the
 database.
 
+2. Force Deleting
+~~~~~~~~~~~~~~~~~
+
+In extreme cases the regular "Deleting" step may hang. Sahara APIv2 introduces
+the ability to force delete a Cluster. This prevents deleting from hanging but
+comes with the risk of orphaned resources.
+
 Error State
 -----------
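As a hedged usage sketch of the new API surface (the endpoint, token, and
cluster id below are placeholders, not values from this change; the JSON body
mirrors CLUSTER_DELETE_SCHEMA_V2 introduced later in this commit):

    import requests

    # Placeholder values -- adjust for a real deployment.
    endpoint = "http://controller:8386/v2"   # assumed Sahara APIv2 root
    cluster_id = "16fd2706-8baf-433b-82eb-8c7fada847da"
    token = "REDACTED"

    # DELETE with {"force": true} requests a force delete; an empty body
    # (or {"force": false}) keeps the regular delete behavior.
    resp = requests.delete(
        "%s/clusters/%s" % (endpoint, cluster_id),
        json={"force": True},
        headers={"X-Auth-Token": token})
    print(resp.status_code)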


@@ -0,0 +1,4 @@
+---
+features:
+  - The ability to force delete clusters is exposed in Sahara APIv2. The Heat
+    service must support Stack Abandon for force delete to function properly.
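One deployment assumption worth making explicit: Heat typically rejects stack
abandon requests (a 400 Bad Request, which the engine change below catches as
heat_exc.BadRequest) unless abandon is enabled server-side, commonly via
heat.conf:

    [DEFAULT]
    enable_stack_abandon = True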


@@ -86,7 +86,9 @@ def clusters_update(cluster_id, data):
 @rest.delete('/clusters/<cluster_id>')
 @acl.enforce("data-processing:clusters:delete")
 @v.check_exists(api.get_cluster, 'cluster_id')
-@v.validate(None, v_c.check_cluster_delete)
+@v.validate(v_c_schema.CLUSTER_DELETE_SCHEMA_V2, v_c.check_cluster_delete)
 def clusters_delete(cluster_id):
-    api.terminate_cluster(cluster_id)
+    data = u.request_data()
+    force = data.get('force', False)
+    api.terminate_cluster(cluster_id, force=force)
     return u.render()


@@ -149,14 +149,14 @@ def _add_ports_for_auto_sg(ctx, cluster, plugin):
             conductor.node_group_update(ctx, ng, ports)
 
 
-def terminate_cluster(id):
+def terminate_cluster(id, force=False):
     context.set_current_cluster_id(id)
     cluster = c_u.change_cluster_status(id, c_u.CLUSTER_STATUS_DELETING)
 
     if cluster is None:
         return
 
-    api.OPS.terminate_cluster(id)
+    api.OPS.terminate_cluster(id, force)
     sender.status_notify(cluster.id, cluster.name, cluster.status,
                          "delete")


@@ -18,7 +18,6 @@ import abc
 import datetime
 import string
 
-from novaclient import exceptions as nova_exceptions
 from oslo_config import cfg
 from oslo_log import log as logging
 import six
@@ -27,14 +26,11 @@
 from sahara import conductor as c
 from sahara import context
 from sahara.i18n import _
 from sahara.service import networks
-from sahara.service import volumes
 from sahara.utils import cluster as cluster_utils
 from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import edp
-from sahara.utils import general as g
 from sahara.utils.openstack import base as b
 from sahara.utils.openstack import images as sahara_images
-from sahara.utils.openstack import nova
 from sahara.utils import poll_utils
 from sahara.utils import remote
@@ -55,7 +51,7 @@ class Engine(object):
         pass
 
     @abc.abstractmethod
-    def shutdown_cluster(self, cluster):
+    def shutdown_cluster(self, cluster, force):
         pass
 
     @abc.abstractmethod
@@ -229,115 +225,6 @@ sed '/^Defaults requiretty*/ s/^/#/' -i /etc/sudoers\n
                           "end_time": datetime.datetime.now()})
             conductor.job_execution_update(ctx, je, update)
 
-    def _delete_auto_security_group(self, node_group):
-        if not node_group.auto_security_group:
-            return
-
-        if not node_group.security_groups:
-            # node group has no security groups
-            # nothing to delete
-            return
-
-        name = node_group.security_groups[-1]
-
-        try:
-            client = nova.client().security_groups
-            security_group = b.execute_with_retries(client.get, name)
-            if (security_group.name !=
-                    g.generate_auto_security_group_name(node_group)):
-                LOG.warning("Auto security group for node group {name} is "
-                            "not found".format(name=node_group.name))
-                return
-            b.execute_with_retries(client.delete, name)
-        except Exception:
-            LOG.warning("Failed to delete security group {name}".format(
-                name=name))
-
-    def _delete_aa_server_groups(self, cluster):
-        if cluster.anti_affinity:
-            for i in range(1, cluster.anti_affinity_ratio):
-                server_group_name = g.generate_aa_group_name(cluster.name, i)
-
-                client = nova.client().server_groups
-
-                server_groups = b.execute_with_retries(
-                    client.findall, name=server_group_name)
-                if len(server_groups) == 1:
-                    b.execute_with_retries(client.delete,
-                                           server_groups[0].id)
-
-                '''In case the server group is created
-                using mitaka or older version'''
-                old_server_group_name = server_group_name.rsplit('-', 1)[0]
-                server_groups_old = b.execute_with_retries(
-                    client.findall, name=old_server_group_name)
-                if len(server_groups_old) == 1:
-                    b.execute_with_retries(client.delete,
                                            server_groups_old[0].id)
-
-    def _shutdown_instance(self, instance):
-        # Heat dissociates and deletes upon deletion of resources
-        # See OS::Neutron::FloatingIP and OS::Neutron::FloatingIPAssociation
-        if instance.node_group.floating_ip_pool:
-            pass
-
-        try:
-            volumes.detach_from_instance(instance)
-        except Exception:
-            LOG.warning("Detaching volumes from instance failed")
-
-        try:
-            b.execute_with_retries(nova.client().servers.delete,
-                                   instance.instance_id)
-        except nova_exceptions.NotFound:
-            LOG.warning("Attempted to delete non-existent instance")
-
-        conductor.instance_remove(context.ctx(), instance)
-
-    @cpo.event_wrapper(mark_successful_on_exit=False)
-    def _check_if_deleted(self, instance):
-        try:
-            nova.get_instance_info(instance)
-        except nova_exceptions.NotFound:
-            return True
-        return False
-
-    @poll_utils.poll_status(
-        'delete_instances_timeout',
-        _("Wait for instances to be deleted"), sleep=1)
-    def _check_deleted(self, deleted_ids, cluster, instances):
-        if not cluster_utils.check_cluster_exists(cluster):
-            return True
-
-        for instance in instances:
-            if instance.id not in deleted_ids:
-                with context.set_current_instance_id(instance.instance_id):
-                    if self._check_if_deleted(instance):
-                        LOG.debug("Instance is deleted")
-                        deleted_ids.add(instance.id)
-                        cpo.add_successful_event(instance)
-        return len(deleted_ids) == len(instances)
-
-    def _await_deleted(self, cluster, instances):
-        """Await all instances are deleted."""
-        if not instances:
-            return
-        cpo.add_provisioning_step(
-            cluster.id, _("Wait for instances to be deleted"), len(instances))
-
-        deleted_ids = set()
-        self._check_deleted(deleted_ids, cluster, instances)
-
-    def _shutdown_instances(self, cluster):
-        for node_group in cluster.node_groups:
-            for instance in node_group.instances:
-                with context.set_current_instance_id(instance.instance_id):
-                    self._shutdown_instance(instance)
-
-            self._await_deleted(cluster, node_group.instances)
-            self._delete_auto_security_group(node_group)
-
     def _remove_db_objects(self, cluster):
         ctx = context.ctx()
         cluster = conductor.cluster_get(ctx, cluster)


@@ -191,21 +191,22 @@ class HeatEngine(e.Engine):
         self._launch_instances(cluster, rollback_count, ROLLBACK_STAGES,
                                update_stack=True)
 
-    def shutdown_cluster(self, cluster):
+    def shutdown_cluster(self, cluster, force=False):
         """Shutdown specified cluster and all related resources."""
+        if force:
+            heat_shutdown = heat.abandon_stack
+        else:
+            heat_shutdown = heat.delete_stack
+
         try:
-            heat.delete_stack(cluster)
+            heat_shutdown(cluster)
         except heat_exc.HTTPNotFound:
-            LOG.warning('Did not find stack for cluster. Trying to delete '
-                        'cluster manually.')
-
-            # Stack not found. Trying to delete cluster like direct engine
-            # do it
-            self._shutdown_instances(cluster)
-            self._delete_aa_server_groups(cluster)
-
-        self._clean_job_executions(cluster)
-        self._remove_db_objects(cluster)
+            LOG.warning('Did not find stack for cluster.')
+        except heat_exc.BadRequest:
+            LOG.error("Can't force delete cluster.", exc_info=True)
+        finally:
+            self._clean_job_executions(cluster)
+            self._remove_db_objects(cluster)
 
     @cpo.event_wrapper(
         True, step=_('Create Heat stack'), param=('cluster', 1))


@@ -60,9 +60,9 @@ class LocalOps(object):
             _provision_scaled_cluster, cluster_id, node_group_id_map,
             node_group_instance_map)
 
-    def terminate_cluster(self, cluster_id):
+    def terminate_cluster(self, cluster_id, force=False):
         context.spawn("cluster-terminating-%s" % cluster_id,
-                      terminate_cluster, cluster_id)
+                      terminate_cluster, cluster_id, force)
 
     def run_edp_job(self, job_execution_id):
         context.spawn("Starting Job Execution %s" % job_execution_id,
@@ -102,8 +102,8 @@ class RemoteOps(rpc_utils.RPCClient):
             node_group_id_map=node_group_id_map,
             node_group_instance_map=node_group_instance_map)
 
-    def terminate_cluster(self, cluster_id):
-        self.cast('terminate_cluster', cluster_id=cluster_id)
+    def terminate_cluster(self, cluster_id, force=False):
+        self.cast('terminate_cluster', cluster_id=cluster_id, force=force)
 
     def run_edp_job(self, job_execution_id):
         self.cast('run_edp_job', job_execution_id=job_execution_id)
@@ -153,8 +153,8 @@ class OpsServer(rpc_utils.RPCServer):
                                   node_group_instance_map)
 
     @request_context
-    def terminate_cluster(self, cluster_id):
-        terminate_cluster(cluster_id)
+    def terminate_cluster(self, cluster_id, force=False):
+        terminate_cluster(cluster_id, force)
 
     @request_context
     def run_edp_job(self, job_execution_id):
@@ -394,7 +394,7 @@ def _get_random_instance_from_ng(instances, instances_to_delete):
 @ops_error_handler(
     _("Terminating cluster failed for the following reason(s): {reason}"))
-def terminate_cluster(cluster_id):
+def terminate_cluster(cluster_id, force=False):
     ctx = context.ctx()
 
     _setup_trust_for_cluster(cluster_id)
@@ -406,7 +406,7 @@ def terminate_cluster(cluster_id):
         plugin.on_terminate_cluster(cluster)
 
         context.set_step_type(_("Engine: shutdown cluster"))
-        INFRA.shutdown_cluster(cluster)
+        INFRA.shutdown_cluster(cluster, force)
 
     trusts.delete_trust_from_cluster(cluster)


@@ -146,3 +146,14 @@ CLUSTER_SCALING_SCHEMA_V2['properties']['resize_node_groups'][
         },
     }
 })
+
+CLUSTER_DELETE_SCHEMA_V2 = {
+    "type": "object",
+    "properties": {
+        "force": {
+            "type": "boolean"
+        }
+    },
+    "additionalProperties": False,
+    "required": []
+}
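A minimal validation sketch, assuming plain `jsonschema` (Sahara actually
wires this schema through its `@v.validate` decorator, as in the API hunk
above); it mirrors the unit tests added later in this commit:

    import jsonschema

    CLUSTER_DELETE_SCHEMA_V2 = {
        "type": "object",
        "properties": {"force": {"type": "boolean"}},
        "additionalProperties": False,
        "required": []
    }

    validator = jsonschema.Draft4Validator(CLUSTER_DELETE_SCHEMA_V2)
    assert validator.is_valid({})                     # empty body: regular delete
    assert validator.is_valid({"force": True})        # boolean flag accepted
    assert not validator.is_valid({"force": "True"})  # strings rejected
    assert not validator.is_valid({"force": True,
                                   "extra": 1})       # unknown keys rejected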


@@ -74,7 +74,7 @@ class FakeOps(object):
             conductor.node_group_update(context.ctx(), ng, {'count': count})
             conductor.cluster_update(context.ctx(), id, {'status': 'Scaled'})
 
-    def terminate_cluster(self, id):
+    def terminate_cluster(self, id, force):
         self.calls_order.append('ops.terminate_cluster')
 
     def _get_instance(self, cluster, instances_to_delete):


@@ -20,7 +20,6 @@ import mock
 from sahara.service import engine
 from sahara.service.heat import heat_engine
 from sahara.tests.unit import base
-from sahara.utils import general as g
 
 
 class EngineTest(engine.Engine):
@@ -97,84 +96,11 @@ class TestDeletion(base.SaharaTestCase):
         super(TestDeletion, self).setUp()
         self.engine = EngineTest()
 
-    @mock.patch('sahara.utils.openstack.nova.client')
-    def test_delete_auto_security_group(self, nova_client):
-        ng = mock.Mock(id="16fd2706-8baf-433b-82eb-8c7fada847da",
-                       auto_security_group=True)
-        ng.name = "ngname"
-        ng.cluster.name = "cluster"
-        auto_name = g.generate_auto_security_group_name(ng)
-        ng.security_groups = [auto_name]
-        client = mock.Mock()
-        nova_client.return_value = client
-        client.security_groups.get.side_effect = lambda x: SecurityGroup(x)
-
-        self.engine._delete_auto_security_group(ng)
-
-        client.security_groups.delete.assert_called_once_with(auto_name)
-
-    @mock.patch('sahara.utils.openstack.nova.client')
-    def test_delete_auto_security_group_other_groups(self, nova_client):
-        ng = mock.Mock(id="16fd2706-8baf-433b-82eb-8c7fada847da",
-                       auto_security_group=True)
-        ng.name = "ngname"
-        ng.cluster.name = "cluster"
-        auto_name = g.generate_auto_security_group_name(ng)
-        ng.security_groups = ['1', '2', auto_name]
-        client = mock.Mock()
-        nova_client.return_value = client
-        client.security_groups.get.side_effect = lambda x: SecurityGroup(x)
-
-        self.engine._delete_auto_security_group(ng)
-
-        client.security_groups.delete.assert_called_once_with(auto_name)
-
-    @mock.patch('sahara.utils.openstack.nova.client')
-    def test_delete_auto_security_group_no_groups(self, nova_client):
-        ng = mock.Mock(id="16fd2706-8baf-433b-82eb-8c7fada847da",
-                       auto_security_group=True)
-        ng.name = "ngname"
-        ng.cluster.name = "cluster"
-        ng.security_groups = []
-        client = mock.Mock()
-        nova_client.return_value = client
-        client.security_groups.get.side_effect = lambda x: SecurityGroup(x)
-
-        self.engine._delete_auto_security_group(ng)
-
-        self.assertEqual(0, client.security_groups.delete.call_count)
-
-    @mock.patch('sahara.utils.openstack.nova.client')
-    def test_delete_auto_security_group_wrong_group(self, nova_client):
-        ng = mock.Mock(id="16fd2706-8baf-433b-82eb-8c7fada847da",
-                       auto_security_group=True)
-        ng.name = "ngname"
-        ng.cluster.name = "cluster"
-        ng.security_groups = ['1', '2']
-        client = mock.Mock()
-        nova_client.return_value = client
-        client.security_groups.get.side_effect = lambda x: SecurityGroup(x)
-
-        self.engine._delete_auto_security_group(ng)
-
-        self.assertEqual(0, client.security_groups.delete.call_count)
-
-    @mock.patch('sahara.service.engine.Engine._delete_aa_server_groups')
-    @mock.patch('sahara.service.engine.Engine._shutdown_instances')
     @mock.patch('sahara.service.engine.Engine._remove_db_objects')
     @mock.patch('sahara.service.engine.Engine._clean_job_executions')
     @mock.patch('sahara.utils.openstack.heat.client')
     @mock.patch('sahara.service.heat.heat_engine.LOG.warning')
-    def test_calls_order(self, logger, heat_client, _job_ex, _db_ob,
-                         _shutdown, _delete_aa):
+    def test_calls_order(self, logger, heat_client, _job_ex, _db_ob):
         class FakeHeatEngine(heat_engine.HeatEngine):
             def __init__(self):
                 super(FakeHeatEngine, self).__init__()
@@ -188,26 +114,55 @@ class TestDeletion(base.SaharaTestCase):
                 self.order.append('remove_db_objects')
                 super(FakeHeatEngine, self)._remove_db_objects(cluster)
 
-            def _shutdown_instances(self, cluster):
-                self.order.append('shutdown_instances')
-                super(FakeHeatEngine, self)._shutdown_instances(cluster)
-
-            def _delete_aa_server_groups(self, cluster):
-                self.order.append('delete_aa_server_groups')
-                super(FakeHeatEngine, self)._delete_aa_server_groups(cluster)
-
         fake_cluster = mock.Mock()
         heat_client.side_effect = heat_exc.HTTPNotFound()
         engine = FakeHeatEngine()
         engine.shutdown_cluster(fake_cluster)
-        self.assertEqual(['shutdown_instances', 'delete_aa_server_groups',
-                          'clean_job_executions', 'remove_db_objects'],
+        self.assertEqual(['clean_job_executions', 'remove_db_objects'],
                          engine.order)
-        self.assertEqual(
-            [mock.call('Did not find stack for cluster. Trying to '
-                       'delete cluster manually.')], logger.call_args_list)
+        self.assertEqual(
+            [mock.call('Did not find stack for cluster.')],
+            logger.call_args_list
+        )
-
-
-class SecurityGroup(object):
-    def __init__(self, name):
-        self.name = name
+
+    @mock.patch('sahara.service.heat.heat_engine.LOG.error')
+    @mock.patch('sahara.utils.openstack.heat.delete_stack')
+    @mock.patch('sahara.utils.openstack.heat.abandon_stack')
+    @mock.patch('sahara.service.engine.Engine._remove_db_objects')
+    @mock.patch('sahara.service.engine.Engine._clean_job_executions')
+    def test_force_delete_calls(self, cj, rdb, abandon, delete, log_err):
+        engine = heat_engine.HeatEngine()
+
+        # Force delete when the Heat service supports stack abandon
+        engine.shutdown_cluster(mock.Mock(), force=True)
+        self.assertEqual(delete.call_count, 0)
+        self.assertEqual(abandon.call_count, 1)
+        self.assertEqual(cj.call_count, 1)
+        self.assertEqual(rdb.call_count, 1)
+        delete.reset_mock()
+        abandon.reset_mock()
+        rdb.reset_mock()
+        cj.reset_mock()
+
+        # Regular delete
+        engine.shutdown_cluster(mock.Mock(), force=False)
+        self.assertEqual(delete.call_count, 1)
+        self.assertEqual(abandon.call_count, 0)
+        self.assertEqual(cj.call_count, 1)
+        self.assertEqual(rdb.call_count, 1)
+        delete.reset_mock()
+        abandon.reset_mock()
+        rdb.reset_mock()
+        cj.reset_mock()
+
+        # Force delete when stack abandon is unavailable
+        abandon.side_effect = heat_exc.BadRequest()
+        engine.shutdown_cluster(mock.Mock(), force=True)
+        self.assertEqual(delete.call_count, 0)
+        self.assertEqual(abandon.call_count, 1)
+        self.assertEqual(cj.call_count, 1)
+        self.assertEqual(rdb.call_count, 1)
+        log_err.assert_called_once_with(
+            "Can't force delete cluster.", exc_info=True)


@@ -70,7 +70,7 @@ class FakeINFRA(object):
         TestOPS.SEQUENCE.append('INFRA.scale_cluster')
         return True
 
-    def shutdown_cluster(self, cluster):
+    def shutdown_cluster(self, cluster, force):
         TestOPS.SEQUENCE.append('shutdown_cluster')
 
     def rollback_cluster(self, cluster, reason):


@@ -17,7 +17,9 @@ import mock
 import testtools
 
 from sahara import exceptions as ex
+from sahara.service import validation as v
 from sahara.service.validations import clusters as c_val
+from sahara.service.validations import clusters_schema as c_schema
 from sahara.tests.unit.service.validation import utils as u
 from sahara.tests.unit import testutils as tu
@@ -52,3 +54,34 @@ class TestClusterDeleteValidation(u.ValidationTestCase):
         except ex.DeletionFailed as e:
             self.assert_created_in_another_tenant_exception(e)
             raise e
+
+
+class TestClusterDeleteValidationV2(testtools.TestCase):
+    @mock.patch("sahara.utils.api.request_data")
+    @mock.patch("sahara.utils.api.bad_request")
+    def _validate_body(self, request, br, rd):
+        m_func = mock.Mock()
+        m_func.__name__ = "m_func"
+        rd.return_value = request
+        validator = v.validate(c_schema.CLUSTER_DELETE_SCHEMA_V2, m_func)
+        validator(m_func)(data=request)
+        return not br.call_count
+
+    def test_delete_schema_empty_body(self):
+        request = {}
+        self.assertTrue(self._validate_body(request))
+
+    def test_delete_schema_wrong_type(self):
+        request = {"force": "True"}
+        self.assertFalse(self._validate_body(request))
+
+    def test_delete_schema_extra_fields(self):
+        request = {"force": True, "just_kidding": False}
+        self.assertFalse(self._validate_body(request))
+
+    def test_delete_schema_good(self):
+        request = {"force": True}
+        self.assertTrue(self._validate_body(request))


@@ -0,0 +1,39 @@
+# Copyright (c) 2017 Massachusetts Open Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import testtools
+
+from sahara.utils.openstack import heat as heat_u
+
+
+class HeatClientTest(testtools.TestCase):
+    @mock.patch('heatclient.client.Client')
+    @mock.patch('sahara.utils.openstack.base.url_for')
+    @mock.patch('sahara.service.sessions.cache')
+    @mock.patch('sahara.context.ctx')
+    def test_abandon(self, ctx, cache, url_for, heat):
+        class _FakeHeatStacks(object):
+            def delete(self, stack):
+                call_list.append("delete")
+
+            def abandon(self, stack):
+                call_list.append("abandon")
+
+        heat.return_value.stacks = _FakeHeatStacks()
+
+        call_list = []
+        heat_u.abandon_stack(mock.Mock())
+        self.assertEqual(call_list, ["delete", "abandon"])


@@ -84,6 +84,15 @@ def delete_stack(cluster):
     stack = get_stack(stack_name, raise_on_missing=False)
 
 
+def abandon_stack(cluster):
+    '''Put stack in deleting state if not already, then abandon it.'''
+    heat_client = client()
+    stack_name = cluster.stack_name
+    base.execute_with_retries(heat_client.stacks.delete, stack_name)
+    # TODO(jfreud): sleep between calls?
+    base.execute_with_retries(heat_client.stacks.abandon, stack_name)
+
+
 def get_stack_outputs(cluster):
     stack = get_stack(cluster.stack_name)
     stack.get()