Shut down heat-engine after all threads have finished

Currently heat-engine shuts down immediately when SIGTERM is received.
At that point, any stack in the middle of processing is terminated forcefully.
That is not graceful.

This patch implements graceful shutdown with the following steps.

  * Close the rpc connection first, to prevent new requests from arriving
    after SIGTERM is received.
  * Stop stack processing with the graceful option.
    The graceful stop functionality is provided by oslo-incubator.
  * Then terminate the process.

Change-Id: I8689b830774f7916febb59aca00979d92c0448b5
Closes-bug: #1304244
This commit is contained in:
Mitsuru Kanabuchi 2014-04-25 21:07:31 +09:00
parent f50aa49c6e
commit 3e3f9a9a4d
2 changed files with 28 additions and 5 deletions

View File

@ -150,10 +150,10 @@ class ThreadGroupManager(object):
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
def stop(self, stack_id):
def stop(self, stack_id, graceful=False):
'''Stop any active threads on a stack.'''
if stack_id in self.groups:
self.groups[stack_id].stop()
self.groups[stack_id].stop(graceful)
del self.groups[stack_id]
@ -297,6 +297,29 @@ class EngineService(service.Service):
for s in stacks:
self.stack_watch.start_watch_task(s.id, admin_context)
def stop(self):
    '''Gracefully stop the engine service.

    The rpc connection is closed first so that no new requests arrive,
    then every thread group tracked by ``self.thread_group_mgr`` is
    stopped with the graceful option, and finally the parent class's
    ``stop()`` is invoked.
    '''
    # Stop rpc connection at first for preventing new requests
    logger.info(_("Attempting to stop engine service..."))
    try:
        self.conn.close()
    except Exception as ex:
        # Closing the connection is best effort: report the failure but
        # keep going so the running threads still get stopped gracefully.
        logger.warning(_("Failed to close rpc connection: %s") % ex)

    # Wait for all active threads to be finished.
    # Iterate over a snapshot of the keys: thread_group_mgr.stop() deletes
    # the entry from ``groups``, and mutating a dict while iterating over
    # a live view of its keys raises RuntimeError.
    for stack_id in list(self.thread_group_mgr.groups.keys()):
        # Ignore dummy service task
        if stack_id == cfg.CONF.periodic_interval:
            continue
        logger.info(_("Waiting stack %s processing to be finished")
                    % stack_id)
        # Stop threads gracefully
        self.thread_group_mgr.stop(stack_id, True)
        logger.info(_("Stack %s processing was finished") % stack_id)

    # Terminate the engine process
    logger.info(_("All threads were gone, terminating engine"))
    super(EngineService, self).stop()
@staticmethod
def load_user_creds(creds_id):
user_creds = db_api.user_creds_get(creds_id)

View File

@ -297,7 +297,7 @@ class DummyThreadGroup(object):
self.threads.append(callback)
return self.pool.spawn(callback, *args, **kwargs)
def stop(self):
def stop(self, graceful=False):
pass
def wait(self):
@ -2974,9 +2974,9 @@ class ThreadGroupManagerTest(HeatTestCase):
thm = service.ThreadGroupManager()
thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
thm.stop(stack_id)
thm.stop(stack_id, True)
self.tg_mock.stop.assert_called_once()
self.tg_mock.stop.assert_called_with(True)
self.assertNotIn(stack_id, thm.groups)
def test_tgm_add_timer(self):