Legacy delete: attempt thread cancel before stop

The error messages 'Command Out of Sync' are due to the threads being
stopped in the middle of the database operations. This happens in the
legacy action when delete is requested during a stack create.

We have the thread cancel message but that was not being used in this
case. Thread cancel should provide a more graceful way of ensuring the
stack is in a FAILED state before the delete is attempted.

This change does the following in the delete_stack service method for the
legacy engine:
- if the stack is still locked, send thread cancel message
- in a subthread wait for the lock to be released, or until a
  timeout based on the 4 minute cancel grace period
- if the stack is still locked, do a thread stop as before

Closes-Bug: #1499669
Closes-Bug: #1546431
Closes-Bug: #1536451
Change-Id: I4cd613681f07d295955c4d8a06505d72d83728a0
This commit is contained in:
Steve Baker 2016-09-16 03:29:59 +00:00
parent ff5f34b0cf
commit 3000f90408
2 changed files with 98 additions and 38 deletions

View File

@ -1380,31 +1380,68 @@ class EngineService(service.Service):
if acquire_result == self.engine_id:
# give threads which are almost complete an opportunity to
# finish naturally before force stopping them
eventlet.sleep(0.2)
self.thread_group_mgr.stop(stack.id)
self.thread_group_mgr.send(stack.id, rpc_api.THREAD_CANCEL)
# Another active engine has the lock
elif service_utils.engine_alive(cnxt, acquire_result):
stop_result = self._remote_call(
cnxt, acquire_result, self.listener.STOP_STACK,
stack_identity=stack_identity)
if stop_result is None:
LOG.debug("Successfully stopped remote task on engine %s"
% acquire_result)
cancel_result = self._remote_call(
cnxt, acquire_result, self.listener.SEND,
stack_identity=stack_identity, message=rpc_api.THREAD_CANCEL)
if cancel_result is None:
LOG.debug("Successfully sent %(msg)s message "
"to remote task on engine %(eng)s" % {
'eng': acquire_result,
'msg': rpc_api.THREAD_CANCEL})
else:
raise exception.StopActionFailed(stack_name=stack.name,
engine_id=acquire_result)
raise exception.EventSendFailed(stack_name=stack.name,
engine_id=acquire_result)
# There may be additional resources that we don't know about
# if an update was in-progress when the stack was stopped, so
# reload the stack from the database.
st = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=st)
self.resource_enforcer.enforce_stack(stack)
def reload():
st = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=st)
self.resource_enforcer.enforce_stack(stack)
return stack
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
stack.delete)
return
def wait_then_delete(stack):
watch = timeutils.StopWatch(cfg.CONF.error_wait_time + 10)
watch.start()
while not watch.expired():
LOG.debug('Waiting for stack cancel to complete: %s' %
stack.name)
with lock.try_thread_lock() as acquire_result:
if acquire_result is None:
stack = reload()
# do the actual delete with the acquired lock
self.thread_group_mgr.start_with_acquired_lock(
stack, lock, stack.delete)
return
eventlet.sleep(1.0)
if acquire_result == self.engine_id:
# cancel didn't finish in time, attempt a stop instead
self.thread_group_mgr.stop(stack.id)
elif service_utils.engine_alive(cnxt, acquire_result):
# Another active engine has the lock
stop_result = self._remote_call(
cnxt, acquire_result, self.listener.STOP_STACK,
stack_identity=stack_identity)
if stop_result is None:
LOG.debug("Successfully stopped remote task "
"on engine %s" % acquire_result)
else:
raise exception.StopActionFailed(
stack_name=stack.name, engine_id=acquire_result)
stack = reload()
# do the actual delete in a locked task
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
stack.delete)
# Cancelling the stack could take some time, so do it in a task
self.thread_group_mgr.start(stack.id, wait_then_delete,
stack)
@context.request_context
def export_stack(self, cnxt, stack_identity):

View File

@ -11,7 +11,9 @@
# under the License.
import mock
from oslo_config import cfg
from oslo_messaging.rpc import dispatcher
from oslo_utils import timeutils
from heat.common import exception
from heat.common import service_utils
@ -97,8 +99,11 @@ class StackDeleteTest(common.HeatTestCase):
@mock.patch.object(parser.Stack, 'load')
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
@mock.patch.object(stack_lock.StackLock, 'acquire')
def test_stack_delete_current_engine_active_lock(self, mock_acquire,
mock_try, mock_load):
@mock.patch.object(timeutils.StopWatch, 'expired')
def test_stack_delete_current_engine_active_lock(self, mock_expired,
mock_acquire, mock_try,
mock_load):
cfg.CONF.set_override('error_wait_time', 0)
self.man.start()
stack_name = 'service_delete_test_stack_current_active_lock'
stack = tools.get_stack(stack_name, self.ctx)
@ -108,27 +113,32 @@ class StackDeleteTest(common.HeatTestCase):
stack_lock_object.StackLock.create(
self.ctx, stack.id, self.man.engine_id)
# Create a fake ThreadGroup too
self.man.thread_group_mgr.groups[stack.id] = tools.DummyThreadGroup()
st = stack_object.Stack.get_by_id(self.ctx, sid)
mock_load.return_value = stack
mock_try.return_value = self.man.engine_id
mock_stop = self.patchobject(self.man.thread_group_mgr, 'stop')
mock_send = self.patchobject(self.man.thread_group_mgr, 'send')
mock_expired.side_effect = [False, True]
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
self.man.thread_group_mgr.groups[sid].wait()
mock_load.assert_called_with(self.ctx, stack=st)
self.assertEqual(2, len(mock_load.mock_calls))
mock_try.assert_called_once_with()
mock_acquire.assert_called_once_with(True)
mock_send.assert_called_once_with(stack.id, 'cancel')
mock_stop.assert_called_once_with(stack.id)
self.assertEqual(2, len(mock_load.mock_calls))
mock_try.assert_called_with()
mock_acquire.assert_called_once_with(True)
@mock.patch.object(parser.Stack, 'load')
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
@mock.patch.object(service_utils, 'engine_alive')
def test_stack_delete_other_engine_active_lock_failed(self, mock_alive,
mock_try, mock_load):
@mock.patch.object(timeutils.StopWatch, 'expired')
def test_stack_delete_other_engine_active_lock_failed(self, mock_expired,
mock_alive, mock_try,
mock_load):
cfg.CONF.set_override('error_wait_time', 0)
OTHER_ENGINE = "other-engine-fake-uuid"
self.man.start()
stack_name = 'service_delete_test_stack_other_engine_lock_fail'
@ -142,6 +152,7 @@ class StackDeleteTest(common.HeatTestCase):
mock_load.return_value = stack
mock_try.return_value = OTHER_ENGINE
mock_alive.return_value = True
mock_expired.side_effect = [False, True]
mock_call = self.patchobject(self.man, '_remote_call',
return_value=False)
@ -149,20 +160,23 @@ class StackDeleteTest(common.HeatTestCase):
ex = self.assertRaises(dispatcher.ExpectedException,
self.man.delete_stack,
self.ctx, stack.identifier())
self.assertEqual(exception.StopActionFailed, ex.exc_info[0])
self.assertEqual(exception.EventSendFailed, ex.exc_info[0])
mock_load.assert_called_once_with(self.ctx, stack=st)
mock_try.assert_called_once_with()
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack",
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "send",
message='cancel',
stack_identity=mock.ANY)
@mock.patch.object(parser.Stack, 'load')
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
@mock.patch.object(service_utils, 'engine_alive')
@mock.patch.object(stack_lock.StackLock, 'acquire')
@mock.patch.object(timeutils.StopWatch, 'expired')
def test_stack_delete_other_engine_active_lock_succeeded(
self, mock_acquire, mock_alive, mock_try, mock_load):
self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load):
cfg.CONF.set_override('error_wait_time', 0)
OTHER_ENGINE = "other-engine-fake-uuid"
self.man.start()
@ -177,6 +191,7 @@ class StackDeleteTest(common.HeatTestCase):
mock_load.return_value = stack
mock_try.return_value = OTHER_ENGINE
mock_alive.return_value = True
mock_expired.side_effect = [False, True]
mock_call = self.patchobject(self.man, '_remote_call',
return_value=None)
@ -185,18 +200,25 @@ class StackDeleteTest(common.HeatTestCase):
self.assertEqual(2, len(mock_load.mock_calls))
mock_load.assert_called_with(self.ctx, stack=st)
mock_try.assert_called_once_with()
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack",
stack_identity=mock.ANY)
mock_try.assert_called_with()
mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)
mock_call.assert_has_calls([
mock.call(self.ctx, OTHER_ENGINE, "send",
message='cancel',
stack_identity=mock.ANY),
mock.call(self.ctx, OTHER_ENGINE, "stop_stack",
stack_identity=mock.ANY)
])
mock_acquire.assert_called_once_with(True)
@mock.patch.object(parser.Stack, 'load')
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
@mock.patch.object(service_utils, 'engine_alive')
@mock.patch.object(stack_lock.StackLock, 'acquire')
@mock.patch.object(timeutils.StopWatch, 'expired')
def test_stack_delete_other_dead_engine_active_lock(
self, mock_acquire, mock_alive, mock_try, mock_load):
self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load):
cfg.CONF.set_override('error_wait_time', 0)
OTHER_ENGINE = "other-engine-fake-uuid"
stack_name = 'service_delete_test_stack_other_dead_engine'
stack = tools.get_stack(stack_name, self.ctx)
@ -210,11 +232,12 @@ class StackDeleteTest(common.HeatTestCase):
mock_load.return_value = stack
mock_try.return_value = OTHER_ENGINE
mock_alive.return_value = False
mock_expired.side_effect = [False, True]
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
self.man.thread_group_mgr.groups[sid].wait()
mock_load.assert_called_with(self.ctx, stack=st)
mock_try.assert_called_once_with()
mock_try.assert_called_with()
mock_acquire.assert_called_once_with(True)
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)