Merge "Re-trigger on update-replace" into stable/newton
This commit is contained in:
commit
298250fa45
|
@ -94,8 +94,8 @@ class CheckResource(object):
|
|||
latest_stack = parser.Stack.load(cnxt, stack_id=stack.id,
|
||||
force_reload=True)
|
||||
if traversal != latest_stack.current_traversal:
|
||||
self._retrigger_check_resource(cnxt, is_update, rsrc_id,
|
||||
latest_stack)
|
||||
self.retrigger_check_resource(cnxt, is_update, rsrc_id,
|
||||
latest_stack)
|
||||
|
||||
def _handle_stack_timeout(self, cnxt, stack):
|
||||
failure_reason = u'Timed out'
|
||||
|
@ -158,7 +158,7 @@ class CheckResource(object):
|
|||
|
||||
return False
|
||||
|
||||
def _retrigger_check_resource(self, cnxt, is_update, resource_id, stack):
|
||||
def retrigger_check_resource(self, cnxt, is_update, resource_id, stack):
|
||||
current_traversal = stack.current_traversal
|
||||
graph = stack.convergence_dependencies.graph()
|
||||
key = (resource_id, is_update)
|
||||
|
@ -239,8 +239,8 @@ class CheckResource(object):
|
|||
current_traversal)
|
||||
return
|
||||
|
||||
self._retrigger_check_resource(cnxt, is_update,
|
||||
resource_id, stack)
|
||||
self.retrigger_check_resource(cnxt, is_update,
|
||||
resource_id, stack)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
|
|
@ -137,6 +137,23 @@ class WorkerService(service.Service):
|
|||
|
||||
return True
|
||||
|
||||
def _retrigger_replaced(self, is_update, rsrc, stack, msg_queue):
|
||||
graph = stack.convergence_dependencies.graph()
|
||||
key = (rsrc.id, is_update)
|
||||
if key not in graph and rsrc.replaces is not None:
|
||||
# This resource replaces old one and is not needed in
|
||||
# current traversal. You need to mark the resource as
|
||||
# DELETED so that it gets cleaned up in purge_db.
|
||||
values = {'action': rsrc.DELETE}
|
||||
db_api.resource_update_and_save(stack.context, rsrc.id, values)
|
||||
# The old resource might be in the graph (a rollback case);
|
||||
# just re-trigger it.
|
||||
key = (rsrc.replaces, is_update)
|
||||
cr = check_resource.CheckResource(self.engine_id, self._rpc_client,
|
||||
self.thread_group_mgr, msg_queue)
|
||||
cr.retrigger_check_resource(stack.context, is_update, key[0],
|
||||
stack)
|
||||
|
||||
@context.request_context
|
||||
def check_resource(self, cnxt, resource_id, current_traversal, data,
|
||||
is_update, adopt_stack_data):
|
||||
|
@ -152,18 +169,20 @@ class WorkerService(service.Service):
|
|||
if rsrc is None:
|
||||
return
|
||||
|
||||
if current_traversal != stack.current_traversal:
|
||||
LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal)
|
||||
return
|
||||
|
||||
msg_queue = eventlet.queue.LightQueue()
|
||||
try:
|
||||
self.thread_group_mgr.add_msg_queue(stack.id, msg_queue)
|
||||
cr = check_resource.CheckResource(self.engine_id, self._rpc_client,
|
||||
self.thread_group_mgr, msg_queue)
|
||||
|
||||
cr.check(cnxt, resource_id, current_traversal, resource_data,
|
||||
is_update, adopt_stack_data, rsrc, stack)
|
||||
if current_traversal != stack.current_traversal:
|
||||
LOG.debug('[%s] Traversal cancelled; re-trigerring.',
|
||||
current_traversal)
|
||||
self._retrigger_replaced(is_update, rsrc, stack, msg_queue)
|
||||
else:
|
||||
cr = check_resource.CheckResource(self.engine_id,
|
||||
self._rpc_client,
|
||||
self.thread_group_mgr,
|
||||
msg_queue)
|
||||
cr.check(cnxt, resource_id, current_traversal, resource_data,
|
||||
is_update, adopt_stack_data, rsrc, stack)
|
||||
finally:
|
||||
self.thread_group_mgr.remove_msg_queue(None,
|
||||
stack.id, msg_queue)
|
||||
|
|
|
@ -77,12 +77,12 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
|
||||
self.assertFalse(mocked.called)
|
||||
|
||||
@mock.patch.object(worker.WorkerService, '_retrigger_replaced')
|
||||
def test_stale_traversal(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self, mock_rnt, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self.worker.check_resource(self.ctx, self.resource.id,
|
||||
'stale-traversal', {}, True, None)
|
||||
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
|
||||
self.assertFalse(mocked.called)
|
||||
self.assertTrue(mock_rnt.called)
|
||||
|
||||
def test_is_update_traversal(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
|
@ -320,7 +320,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
self.assertTrue(self.stack.purge_db.called)
|
||||
|
||||
@mock.patch.object(check_resource.CheckResource,
|
||||
'_retrigger_check_resource')
|
||||
'retrigger_check_resource')
|
||||
@mock.patch.object(stack.Stack, 'load')
|
||||
def test_initiate_propagate_rsrc_retriggers_check_rsrc_on_new_stack_update(
|
||||
self, mock_stack_load, mock_rcr, mock_cru, mock_crc, mock_pcr,
|
||||
|
@ -368,8 +368,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
# A, B are predecessors to C when is_update is True
|
||||
expected_predecessors = {(self.stack['A'].id, True),
|
||||
(self.stack['B'].id, True)}
|
||||
self.cr._retrigger_check_resource(self.ctx, self.is_update,
|
||||
resC.id, self.stack)
|
||||
self.cr.retrigger_check_resource(self.ctx, self.is_update,
|
||||
resC.id, self.stack)
|
||||
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, resC.id,
|
||||
self.stack.current_traversal,
|
||||
mock.ANY, (resC.id, True), None,
|
||||
|
@ -386,7 +386,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
[(1, False), (1, True)], [(2, False), None]])
|
||||
# simulate rsrc 2 completing its update for old traversal
|
||||
# and calling rcr
|
||||
self.cr._retrigger_check_resource(self.ctx, True, 2, self.stack)
|
||||
self.cr.retrigger_check_resource(self.ctx, True, 2, self.stack)
|
||||
# Ensure that pcr was called with proper delete traversal
|
||||
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2,
|
||||
self.stack.current_traversal,
|
||||
|
@ -401,7 +401,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
[(1, False), (1, True)], [(2, False), (2, True)]])
|
||||
# simulate rsrc 2 completing its delete for old traversal
|
||||
# and calling rcr
|
||||
self.cr._retrigger_check_resource(self.ctx, False, 2, self.stack)
|
||||
self.cr.retrigger_check_resource(self.ctx, False, 2, self.stack)
|
||||
# Ensure that pcr was called with proper delete traversal
|
||||
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2,
|
||||
self.stack.current_traversal,
|
||||
|
@ -426,7 +426,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
@mock.patch.object(stack.Stack, 'purge_db')
|
||||
@mock.patch.object(stack.Stack, 'state_set')
|
||||
@mock.patch.object(check_resource.CheckResource,
|
||||
'_retrigger_check_resource')
|
||||
'retrigger_check_resource')
|
||||
@mock.patch.object(check_resource.CheckResource, '_trigger_rollback')
|
||||
def test_handle_rsrc_failure_when_update_fails(
|
||||
self, mock_tr, mock_rcr, mock_ss, mock_pdb, mock_cru, mock_crc,
|
||||
|
@ -444,7 +444,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
@mock.patch.object(stack.Stack, 'purge_db')
|
||||
@mock.patch.object(stack.Stack, 'state_set')
|
||||
@mock.patch.object(check_resource.CheckResource,
|
||||
'_retrigger_check_resource')
|
||||
'retrigger_check_resource')
|
||||
@mock.patch.object(check_resource.CheckResource, '_trigger_rollback')
|
||||
def test_handle_rsrc_failure_when_update_fails_different_traversal(
|
||||
self, mock_tr, mock_rcr, mock_ss, mock_pdb, mock_cru, mock_crc,
|
||||
|
|
Loading…
Reference in New Issue