Shut the heat-engine after all threads finished
Currently heat-engine do shutdown immediately when SIGTERM received. At that time, a stacks in middle of processing would terminate forcefully. It's not graceful. This patch aims to implement graceful shutdown with following methods. * Close rpc connection at first for preventing new requests arrived after SIGTERM received. * Stop stack processing with graceful option. The graceful stop functionality is provided by oslo-incubator. * Then terminating process. Change-Id: I8689b830774f7916febb59aca00979d92c0448b5 Closes-bug: #1304244
This commit is contained in:
parent
f50aa49c6e
commit
3e3f9a9a4d
|
@ -150,10 +150,10 @@ class ThreadGroupManager(object):
|
|||
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
|
||||
func, *args, **kwargs)
|
||||
|
||||
def stop(self, stack_id):
|
||||
def stop(self, stack_id, graceful=False):
|
||||
'''Stop any active threads on a stack.'''
|
||||
if stack_id in self.groups:
|
||||
self.groups[stack_id].stop()
|
||||
self.groups[stack_id].stop(graceful)
|
||||
del self.groups[stack_id]
|
||||
|
||||
|
||||
|
@ -297,6 +297,29 @@ class EngineService(service.Service):
|
|||
for s in stacks:
|
||||
self.stack_watch.start_watch_task(s.id, admin_context)
|
||||
|
||||
def stop(self):
|
||||
# Stop rpc connection at first for preventing new requests
|
||||
logger.info(_("Attempting to stop engine service..."))
|
||||
try:
|
||||
self.conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Wait for all active threads to be finished
|
||||
for stack_id in self.thread_group_mgr.groups.keys():
|
||||
# Ingore dummy service task
|
||||
if stack_id == cfg.CONF.periodic_interval:
|
||||
continue
|
||||
logger.info(_("Waiting stack %s processing to be finished")
|
||||
% stack_id)
|
||||
# Stop threads gracefully
|
||||
self.thread_group_mgr.stop(stack_id, True)
|
||||
logger.info(_("Stack %s processing was finished") % stack_id)
|
||||
|
||||
# Terminate the engine process
|
||||
logger.info(_("All threads were gone, terminating engine"))
|
||||
super(EngineService, self).stop()
|
||||
|
||||
@staticmethod
|
||||
def load_user_creds(creds_id):
|
||||
user_creds = db_api.user_creds_get(creds_id)
|
||||
|
|
|
@ -297,7 +297,7 @@ class DummyThreadGroup(object):
|
|||
self.threads.append(callback)
|
||||
return self.pool.spawn(callback, *args, **kwargs)
|
||||
|
||||
def stop(self):
|
||||
def stop(self, graceful=False):
|
||||
pass
|
||||
|
||||
def wait(self):
|
||||
|
@ -2974,9 +2974,9 @@ class ThreadGroupManagerTest(HeatTestCase):
|
|||
|
||||
thm = service.ThreadGroupManager()
|
||||
thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
|
||||
thm.stop(stack_id)
|
||||
thm.stop(stack_id, True)
|
||||
|
||||
self.tg_mock.stop.assert_called_once()
|
||||
self.tg_mock.stop.assert_called_with(True)
|
||||
self.assertNotIn(stack_id, thm.groups)
|
||||
|
||||
def test_tgm_add_timer(self):
|
||||
|
|
Loading…
Reference in New Issue