Legacy delete attempt thread cancel before stop
The error messages 'Command Out of Sync' are due to the threads being stopped in the middle of the database operations. This happens in the legacy action when delete is requested during a stack create. We have the thread cancel message but that was not being used in this case. Thread cancel should provide a more graceful way of ensuring the stack is in a FAILED state before the delete is attempted. This changes does the following in the delete_stack service method for legace engine: - if the stack is still locked, send thread cancel message - in a subthread wait for the lock to be released, or until a timeout based on the 4 minute cancel grace period - if the stack is still locked, do a thread stop as before Closes-Bug: #1499669 Closes-Bug: #1546431 Closes-Bug: #1536451 Change-Id: I4cd613681f07d295955c4d8a06505d72d83728a0
This commit is contained in:
parent
ff5f34b0cf
commit
3000f90408
|
@ -1380,31 +1380,68 @@ class EngineService(service.Service):
|
|||
if acquire_result == self.engine_id:
|
||||
# give threads which are almost complete an opportunity to
|
||||
# finish naturally before force stopping them
|
||||
eventlet.sleep(0.2)
|
||||
self.thread_group_mgr.stop(stack.id)
|
||||
self.thread_group_mgr.send(stack.id, rpc_api.THREAD_CANCEL)
|
||||
|
||||
# Another active engine has the lock
|
||||
elif service_utils.engine_alive(cnxt, acquire_result):
|
||||
stop_result = self._remote_call(
|
||||
cnxt, acquire_result, self.listener.STOP_STACK,
|
||||
stack_identity=stack_identity)
|
||||
if stop_result is None:
|
||||
LOG.debug("Successfully stopped remote task on engine %s"
|
||||
% acquire_result)
|
||||
cancel_result = self._remote_call(
|
||||
cnxt, acquire_result, self.listener.SEND,
|
||||
stack_identity=stack_identity, message=rpc_api.THREAD_CANCEL)
|
||||
if cancel_result is None:
|
||||
LOG.debug("Successfully sent %(msg)s message "
|
||||
"to remote task on engine %(eng)s" % {
|
||||
'eng': acquire_result,
|
||||
'msg': rpc_api.THREAD_CANCEL})
|
||||
else:
|
||||
raise exception.StopActionFailed(stack_name=stack.name,
|
||||
engine_id=acquire_result)
|
||||
raise exception.EventSendFailed(stack_name=stack.name,
|
||||
engine_id=acquire_result)
|
||||
|
||||
# There may be additional resources that we don't know about
|
||||
# if an update was in-progress when the stack was stopped, so
|
||||
# reload the stack from the database.
|
||||
st = self._get_stack(cnxt, stack_identity)
|
||||
stack = parser.Stack.load(cnxt, stack=st)
|
||||
self.resource_enforcer.enforce_stack(stack)
|
||||
def reload():
|
||||
st = self._get_stack(cnxt, stack_identity)
|
||||
stack = parser.Stack.load(cnxt, stack=st)
|
||||
self.resource_enforcer.enforce_stack(stack)
|
||||
return stack
|
||||
|
||||
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
|
||||
stack.delete)
|
||||
return
|
||||
def wait_then_delete(stack):
|
||||
watch = timeutils.StopWatch(cfg.CONF.error_wait_time + 10)
|
||||
watch.start()
|
||||
|
||||
while not watch.expired():
|
||||
LOG.debug('Waiting for stack cancel to complete: %s' %
|
||||
stack.name)
|
||||
with lock.try_thread_lock() as acquire_result:
|
||||
|
||||
if acquire_result is None:
|
||||
stack = reload()
|
||||
# do the actual delete with the aquired lock
|
||||
self.thread_group_mgr.start_with_acquired_lock(
|
||||
stack, lock, stack.delete)
|
||||
return
|
||||
eventlet.sleep(1.0)
|
||||
|
||||
if acquire_result == self.engine_id:
|
||||
# cancel didn't finish in time, attempt a stop instead
|
||||
self.thread_group_mgr.stop(stack.id)
|
||||
elif service_utils.engine_alive(cnxt, acquire_result):
|
||||
# Another active engine has the lock
|
||||
stop_result = self._remote_call(
|
||||
cnxt, acquire_result, self.listener.STOP_STACK,
|
||||
stack_identity=stack_identity)
|
||||
if stop_result is None:
|
||||
LOG.debug("Successfully stopped remote task "
|
||||
"on engine %s" % acquire_result)
|
||||
else:
|
||||
raise exception.StopActionFailed(
|
||||
stack_name=stack.name, engine_id=acquire_result)
|
||||
|
||||
stack = reload()
|
||||
# do the actual delete in a locked task
|
||||
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
|
||||
stack.delete)
|
||||
|
||||
# Cancelling the stack could take some time, so do it in a task
|
||||
self.thread_group_mgr.start(stack.id, wait_then_delete,
|
||||
stack)
|
||||
|
||||
@context.request_context
|
||||
def export_stack(self, cnxt, stack_identity):
|
||||
|
|
|
@ -11,7 +11,9 @@
|
|||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import service_utils
|
||||
|
@ -97,8 +99,11 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(stack_lock.StackLock, 'acquire')
|
||||
def test_stack_delete_current_engine_active_lock(self, mock_acquire,
|
||||
mock_try, mock_load):
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_current_engine_active_lock(self, mock_expired,
|
||||
mock_acquire, mock_try,
|
||||
mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
self.man.start()
|
||||
stack_name = 'service_delete_test_stack_current_active_lock'
|
||||
stack = tools.get_stack(stack_name, self.ctx)
|
||||
|
@ -108,27 +113,32 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
stack_lock_object.StackLock.create(
|
||||
self.ctx, stack.id, self.man.engine_id)
|
||||
|
||||
# Create a fake ThreadGroup too
|
||||
self.man.thread_group_mgr.groups[stack.id] = tools.DummyThreadGroup()
|
||||
st = stack_object.Stack.get_by_id(self.ctx, sid)
|
||||
|
||||
mock_load.return_value = stack
|
||||
mock_try.return_value = self.man.engine_id
|
||||
mock_stop = self.patchobject(self.man.thread_group_mgr, 'stop')
|
||||
mock_send = self.patchobject(self.man.thread_group_mgr, 'send')
|
||||
mock_expired.side_effect = [False, True]
|
||||
|
||||
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
|
||||
self.man.thread_group_mgr.groups[sid].wait()
|
||||
|
||||
mock_load.assert_called_with(self.ctx, stack=st)
|
||||
self.assertEqual(2, len(mock_load.mock_calls))
|
||||
mock_try.assert_called_once_with()
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
mock_send.assert_called_once_with(stack.id, 'cancel')
|
||||
mock_stop.assert_called_once_with(stack.id)
|
||||
self.assertEqual(2, len(mock_load.mock_calls))
|
||||
mock_try.assert_called_with()
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(service_utils, 'engine_alive')
|
||||
def test_stack_delete_other_engine_active_lock_failed(self, mock_alive,
|
||||
mock_try, mock_load):
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_other_engine_active_lock_failed(self, mock_expired,
|
||||
mock_alive, mock_try,
|
||||
mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
OTHER_ENGINE = "other-engine-fake-uuid"
|
||||
self.man.start()
|
||||
stack_name = 'service_delete_test_stack_other_engine_lock_fail'
|
||||
|
@ -142,6 +152,7 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
mock_load.return_value = stack
|
||||
mock_try.return_value = OTHER_ENGINE
|
||||
mock_alive.return_value = True
|
||||
mock_expired.side_effect = [False, True]
|
||||
|
||||
mock_call = self.patchobject(self.man, '_remote_call',
|
||||
return_value=False)
|
||||
|
@ -149,20 +160,23 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
ex = self.assertRaises(dispatcher.ExpectedException,
|
||||
self.man.delete_stack,
|
||||
self.ctx, stack.identifier())
|
||||
self.assertEqual(exception.StopActionFailed, ex.exc_info[0])
|
||||
self.assertEqual(exception.EventSendFailed, ex.exc_info[0])
|
||||
|
||||
mock_load.assert_called_once_with(self.ctx, stack=st)
|
||||
mock_try.assert_called_once_with()
|
||||
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
|
||||
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack",
|
||||
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "send",
|
||||
message='cancel',
|
||||
stack_identity=mock.ANY)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(service_utils, 'engine_alive')
|
||||
@mock.patch.object(stack_lock.StackLock, 'acquire')
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_other_engine_active_lock_succeeded(
|
||||
self, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
|
||||
OTHER_ENGINE = "other-engine-fake-uuid"
|
||||
self.man.start()
|
||||
|
@ -177,6 +191,7 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
mock_load.return_value = stack
|
||||
mock_try.return_value = OTHER_ENGINE
|
||||
mock_alive.return_value = True
|
||||
mock_expired.side_effect = [False, True]
|
||||
mock_call = self.patchobject(self.man, '_remote_call',
|
||||
return_value=None)
|
||||
|
||||
|
@ -185,18 +200,25 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
|
||||
self.assertEqual(2, len(mock_load.mock_calls))
|
||||
mock_load.assert_called_with(self.ctx, stack=st)
|
||||
mock_try.assert_called_once_with()
|
||||
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
|
||||
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack",
|
||||
stack_identity=mock.ANY)
|
||||
mock_try.assert_called_with()
|
||||
mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)
|
||||
mock_call.assert_has_calls([
|
||||
mock.call(self.ctx, OTHER_ENGINE, "send",
|
||||
message='cancel',
|
||||
stack_identity=mock.ANY),
|
||||
mock.call(self.ctx, OTHER_ENGINE, "stop_stack",
|
||||
stack_identity=mock.ANY)
|
||||
])
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(service_utils, 'engine_alive')
|
||||
@mock.patch.object(stack_lock.StackLock, 'acquire')
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_other_dead_engine_active_lock(
|
||||
self, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
OTHER_ENGINE = "other-engine-fake-uuid"
|
||||
stack_name = 'service_delete_test_stack_other_dead_engine'
|
||||
stack = tools.get_stack(stack_name, self.ctx)
|
||||
|
@ -210,11 +232,12 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
mock_load.return_value = stack
|
||||
mock_try.return_value = OTHER_ENGINE
|
||||
mock_alive.return_value = False
|
||||
mock_expired.side_effect = [False, True]
|
||||
|
||||
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
|
||||
self.man.thread_group_mgr.groups[sid].wait()
|
||||
|
||||
mock_load.assert_called_with(self.ctx, stack=st)
|
||||
mock_try.assert_called_once_with()
|
||||
mock_try.assert_called_with()
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
|
||||
mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)
|
||||
|
|
Loading…
Reference in New Issue