From 2dd44db1b9cf4b789d8a083df6f97ae1fb5e22d5 Mon Sep 17 00:00:00 2001 From: Steve Baker Date: Fri, 16 Sep 2016 03:29:59 +0000 Subject: [PATCH] Legacy delete attempt thread cancel before stop The error messages 'Command Out of Sync' are due to the threads being stopped in the middle of the database operations. This happens in the legacy action when delete is requested during a stack create. We have the thread cancel message but that was not being used in this case. Thread cancel should provide a more graceful way of ensuring the stack is in a FAILED state before the delete is attempted. This changes does the following in the delete_stack service method for legace engine: - if the stack is still locked, send thread cancel message - in a subthread wait for the lock to be released, or until a timeout based on the 4 minute cancel grace period - if the stack is still locked, do a thread stop as before Closes-Bug: #1499669 Closes-Bug: #1546431 Closes-Bug: #1536451 Change-Id: I4cd613681f07d295955c4d8a06505d72d83728a0 (cherry picked from commit 3000f904080d8dcd841d913dcd2ae658fb526c1a) --- heat/engine/service.py | 75 ++++++++++++++----- .../tests/engine/service/test_stack_delete.py | 61 ++++++++++----- 2 files changed, 98 insertions(+), 38 deletions(-) diff --git a/heat/engine/service.py b/heat/engine/service.py index cca3c3148..0a99e216a 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -1380,31 +1380,68 @@ class EngineService(service.Service): if acquire_result == self.engine_id: # give threads which are almost complete an opportunity to # finish naturally before force stopping them - eventlet.sleep(0.2) - self.thread_group_mgr.stop(stack.id) + self.thread_group_mgr.send(stack.id, rpc_api.THREAD_CANCEL) # Another active engine has the lock elif service_utils.engine_alive(cnxt, acquire_result): - stop_result = self._remote_call( - cnxt, acquire_result, self.listener.STOP_STACK, - stack_identity=stack_identity) - if stop_result is None: - LOG.debug("Successfully stopped remote task on engine %s" - % acquire_result) + cancel_result = self._remote_call( + cnxt, acquire_result, self.listener.SEND, + stack_identity=stack_identity, message=rpc_api.THREAD_CANCEL) + if cancel_result is None: + LOG.debug("Successfully sent %(msg)s message " + "to remote task on engine %(eng)s" % { + 'eng': acquire_result, + 'msg': rpc_api.THREAD_CANCEL}) else: - raise exception.StopActionFailed(stack_name=stack.name, - engine_id=acquire_result) + raise exception.EventSendFailed(stack_name=stack.name, + engine_id=acquire_result) - # There may be additional resources that we don't know about - # if an update was in-progress when the stack was stopped, so - # reload the stack from the database. - st = self._get_stack(cnxt, stack_identity) - stack = parser.Stack.load(cnxt, stack=st) - self.resource_enforcer.enforce_stack(stack) + def reload(): + st = self._get_stack(cnxt, stack_identity) + stack = parser.Stack.load(cnxt, stack=st) + self.resource_enforcer.enforce_stack(stack) + return stack - self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, - stack.delete) - return + def wait_then_delete(stack): + watch = timeutils.StopWatch(cfg.CONF.error_wait_time + 10) + watch.start() + + while not watch.expired(): + LOG.debug('Waiting for stack cancel to complete: %s' % + stack.name) + with lock.try_thread_lock() as acquire_result: + + if acquire_result is None: + stack = reload() + # do the actual delete with the aquired lock + self.thread_group_mgr.start_with_acquired_lock( + stack, lock, stack.delete) + return + eventlet.sleep(1.0) + + if acquire_result == self.engine_id: + # cancel didn't finish in time, attempt a stop instead + self.thread_group_mgr.stop(stack.id) + elif service_utils.engine_alive(cnxt, acquire_result): + # Another active engine has the lock + stop_result = self._remote_call( + cnxt, acquire_result, self.listener.STOP_STACK, + stack_identity=stack_identity) + if stop_result is None: + LOG.debug("Successfully stopped remote task " + "on engine %s" % acquire_result) + else: + raise exception.StopActionFailed( + stack_name=stack.name, engine_id=acquire_result) + + stack = reload() + # do the actual delete in a locked task + self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, + stack.delete) + + # Cancelling the stack could take some time, so do it in a task + self.thread_group_mgr.start(stack.id, wait_then_delete, + stack) @context.request_context def export_stack(self, cnxt, stack_identity): diff --git a/heat/tests/engine/service/test_stack_delete.py b/heat/tests/engine/service/test_stack_delete.py index 6ca22e71f..ab0dd282d 100644 --- a/heat/tests/engine/service/test_stack_delete.py +++ b/heat/tests/engine/service/test_stack_delete.py @@ -11,7 +11,9 @@ # under the License. import mock +from oslo_config import cfg from oslo_messaging.rpc import dispatcher +from oslo_utils import timeutils from heat.common import exception from heat.common import service_utils @@ -97,8 +99,11 @@ class StackDeleteTest(common.HeatTestCase): @mock.patch.object(parser.Stack, 'load') @mock.patch.object(stack_lock.StackLock, 'try_acquire') @mock.patch.object(stack_lock.StackLock, 'acquire') - def test_stack_delete_current_engine_active_lock(self, mock_acquire, - mock_try, mock_load): + @mock.patch.object(timeutils.StopWatch, 'expired') + def test_stack_delete_current_engine_active_lock(self, mock_expired, + mock_acquire, mock_try, + mock_load): + cfg.CONF.set_override('error_wait_time', 0) self.man.start() stack_name = 'service_delete_test_stack_current_active_lock' stack = tools.get_stack(stack_name, self.ctx) @@ -108,27 +113,32 @@ class StackDeleteTest(common.HeatTestCase): stack_lock_object.StackLock.create( self.ctx, stack.id, self.man.engine_id) - # Create a fake ThreadGroup too - self.man.thread_group_mgr.groups[stack.id] = tools.DummyThreadGroup() st = stack_object.Stack.get_by_id(self.ctx, sid) mock_load.return_value = stack mock_try.return_value = self.man.engine_id mock_stop = self.patchobject(self.man.thread_group_mgr, 'stop') + mock_send = self.patchobject(self.man.thread_group_mgr, 'send') + mock_expired.side_effect = [False, True] self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier())) + self.man.thread_group_mgr.groups[sid].wait() mock_load.assert_called_with(self.ctx, stack=st) - self.assertEqual(2, len(mock_load.mock_calls)) - mock_try.assert_called_once_with() - mock_acquire.assert_called_once_with(True) + mock_send.assert_called_once_with(stack.id, 'cancel') mock_stop.assert_called_once_with(stack.id) + self.assertEqual(2, len(mock_load.mock_calls)) + mock_try.assert_called_with() + mock_acquire.assert_called_once_with(True) @mock.patch.object(parser.Stack, 'load') @mock.patch.object(stack_lock.StackLock, 'try_acquire') @mock.patch.object(service_utils, 'engine_alive') - def test_stack_delete_other_engine_active_lock_failed(self, mock_alive, - mock_try, mock_load): + @mock.patch.object(timeutils.StopWatch, 'expired') + def test_stack_delete_other_engine_active_lock_failed(self, mock_expired, + mock_alive, mock_try, + mock_load): + cfg.CONF.set_override('error_wait_time', 0) OTHER_ENGINE = "other-engine-fake-uuid" self.man.start() stack_name = 'service_delete_test_stack_other_engine_lock_fail' @@ -142,6 +152,7 @@ class StackDeleteTest(common.HeatTestCase): mock_load.return_value = stack mock_try.return_value = OTHER_ENGINE mock_alive.return_value = True + mock_expired.side_effect = [False, True] mock_call = self.patchobject(self.man, '_remote_call', return_value=False) @@ -149,20 +160,23 @@ class StackDeleteTest(common.HeatTestCase): ex = self.assertRaises(dispatcher.ExpectedException, self.man.delete_stack, self.ctx, stack.identifier()) - self.assertEqual(exception.StopActionFailed, ex.exc_info[0]) + self.assertEqual(exception.EventSendFailed, ex.exc_info[0]) mock_load.assert_called_once_with(self.ctx, stack=st) mock_try.assert_called_once_with() mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE) - mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack", + mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "send", + message='cancel', stack_identity=mock.ANY) @mock.patch.object(parser.Stack, 'load') @mock.patch.object(stack_lock.StackLock, 'try_acquire') @mock.patch.object(service_utils, 'engine_alive') @mock.patch.object(stack_lock.StackLock, 'acquire') + @mock.patch.object(timeutils.StopWatch, 'expired') def test_stack_delete_other_engine_active_lock_succeeded( - self, mock_acquire, mock_alive, mock_try, mock_load): + self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load): + cfg.CONF.set_override('error_wait_time', 0) OTHER_ENGINE = "other-engine-fake-uuid" self.man.start() @@ -177,6 +191,7 @@ class StackDeleteTest(common.HeatTestCase): mock_load.return_value = stack mock_try.return_value = OTHER_ENGINE mock_alive.return_value = True + mock_expired.side_effect = [False, True] mock_call = self.patchobject(self.man, '_remote_call', return_value=None) @@ -185,18 +200,25 @@ class StackDeleteTest(common.HeatTestCase): self.assertEqual(2, len(mock_load.mock_calls)) mock_load.assert_called_with(self.ctx, stack=st) - mock_try.assert_called_once_with() - mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE) - mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack", - stack_identity=mock.ANY) + mock_try.assert_called_with() + mock_alive.assert_called_with(self.ctx, OTHER_ENGINE) + mock_call.assert_has_calls([ + mock.call(self.ctx, OTHER_ENGINE, "send", + message='cancel', + stack_identity=mock.ANY), + mock.call(self.ctx, OTHER_ENGINE, "stop_stack", + stack_identity=mock.ANY) + ]) mock_acquire.assert_called_once_with(True) @mock.patch.object(parser.Stack, 'load') @mock.patch.object(stack_lock.StackLock, 'try_acquire') @mock.patch.object(service_utils, 'engine_alive') @mock.patch.object(stack_lock.StackLock, 'acquire') + @mock.patch.object(timeutils.StopWatch, 'expired') def test_stack_delete_other_dead_engine_active_lock( - self, mock_acquire, mock_alive, mock_try, mock_load): + self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load): + cfg.CONF.set_override('error_wait_time', 0) OTHER_ENGINE = "other-engine-fake-uuid" stack_name = 'service_delete_test_stack_other_dead_engine' stack = tools.get_stack(stack_name, self.ctx) @@ -210,11 +232,12 @@ class StackDeleteTest(common.HeatTestCase): mock_load.return_value = stack mock_try.return_value = OTHER_ENGINE mock_alive.return_value = False + mock_expired.side_effect = [False, True] self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier())) self.man.thread_group_mgr.groups[sid].wait() mock_load.assert_called_with(self.ctx, stack=st) - mock_try.assert_called_once_with() + mock_try.assert_called_with() mock_acquire.assert_called_once_with(True) - mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE) + mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)