diff --git a/heat/engine/service.py b/heat/engine/service.py index 4fd759dcf8..c0fcf32fbd 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -11,6 +11,7 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet import functools import json import os @@ -152,8 +153,22 @@ class ThreadGroupManager(object): def stop(self, stack_id, graceful=False): '''Stop any active threads on a stack.''' if stack_id in self.groups: - self.groups[stack_id].stop(graceful) - del self.groups[stack_id] + threadgroup = self.groups.pop(stack_id) + threads = threadgroup.threads[:] + + threadgroup.stop(graceful) + threadgroup.wait() + + # Wait for link()ed functions (i.e. lock release) + links_done = dict((th, False) for th in threads) + + def mark_done(gt, th): + links_done[th] = True + + for th in threads: + th.link(mark_done, th) + while not all(links_done.values()): + eventlet.sleep() class StackWatch(object): @@ -717,14 +732,6 @@ class EngineService(service.Service): raise exception.StopActionFailed(stack_name=stack.name, engine_id=acquire_result) - # If the lock isn't released here, then the call to - # start_with_lock below will raise an ActionInProgress - # exception. Ideally, we wouldn't be calling another - # release() here, since it should be called as soon as the - # ThreadGroup is stopped. But apparently there's a race - # between release() the next call to lock.acquire(). - db_api.stack_lock_release(stack.id, acquire_result) - # There may be additional resources that we don't know about # if an update was in-progress when the stack was stopped, so # reload the stack from the database. diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 146a763d15..e5697f380a 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -12,6 +12,7 @@ # under the License. +import eventlet import functools import json import sys @@ -2968,16 +2969,6 @@ class ThreadGroupManagerTest(HeatTestCase): **self.fkwargs) self.assertEqual(self.tg_mock.add_thread(), ret) - def test_tgm_stop(self): - stack_id = 'test' - - thm = service.ThreadGroupManager() - thm.start(stack_id, self.f, *self.fargs, **self.fkwargs) - thm.stop(stack_id, True) - - self.tg_mock.stop.assert_called_with(True) - self.assertNotIn(stack_id, thm.groups) - def test_tgm_add_timer(self): stack_id = 'test' @@ -2988,3 +2979,27 @@ class ThreadGroupManagerTest(HeatTestCase): self.tg_mock.add_timer.assert_called_with( self.cfg_mock.CONF.periodic_interval, self.f, *self.fargs, **self.fkwargs) + + +class ThreadGroupManagerStopTest(HeatTestCase): + def test_tgm_stop(self): + stack_id = 'test' + done = [] + + def function(): + while True: + eventlet.sleep() + + def linked(gt, thread): + for i in range(10): + eventlet.sleep() + done.append(thread) + + thm = service.ThreadGroupManager() + thread = thm.start(stack_id, function) + thread.link(linked, thread) + + thm.stop(stack_id) + + self.assertIn(thread, done) + self.assertNotIn(stack_id, thm.groups)