graceful-shutdown: add graceful shutdown into compute

The following commit brings in oslo-incubator service.py which
includes calls to support graceful shutdown.  The common service
class has changed to handle signals and manage Services.  When
a signal is received the Services are stopped which in turn
waits on all child threads (Proxy callbacks) to complete.

The oslo-incubator commit hash that was used to update
nova was b5fba9e40d609d63b4dab9ca351f7e22e9d8b35b.

This allows long running commands to complete prior to killing
the process.  This change only works for SIGTERM and SIGINT
signals.

Unit test was added to verify parent stop method is called in
a shutdown condition.

Change-Id: I21a45862f491530523037632fd70c60cc0153c0a
Implements: blueprint graceful-shutdown
This commit is contained in:
Christopher Lefelhocz 2013-09-17 17:00:25 +00:00 committed by Gerrit Code Review
parent 805085faf1
commit a16ce230aa
3 changed files with 228 additions and 64 deletions

View File

@ -27,11 +27,12 @@ import sys
import time
import eventlet
from eventlet import event
import logging as std_logging
from oslo.config import cfg
from nova.openstack.common import eventlet_backdoor
from nova.openstack.common.gettextutils import _
from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova.openstack.common import threadgroup
@ -42,6 +43,29 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
def _is_sighup(signo):
return _sighup_supported() and signo == signal.SIGHUP
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
@ -51,20 +75,9 @@ class Launcher(object):
:returns: None
"""
self._services = threadgroup.ThreadGroup()
self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
"""Start and wait for a service to finish.
:param service: service to run and wait for.
:returns: None
"""
service.start()
service.wait()
def launch_service(self, service):
"""Load and start the given service.
@ -73,7 +86,7 @@ class Launcher(object):
"""
service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
@ -81,7 +94,7 @@ class Launcher(object):
:returns: None
"""
self._services.stop()
self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@ -89,7 +102,16 @@ class Launcher(object):
:returns: None
"""
self._services.wait()
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
@ -101,33 +123,46 @@ class SignalExit(SystemExit):
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
def wait(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _wait_for_exit_or_signal(self):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
status = None
try:
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
if rpc:
rpc.cleanup()
self.stop()
return status
if rpc:
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status, signo
def wait(self):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if not _is_sighup(signo):
return status
self.restart()
class ServiceWrapper(object):
@ -145,17 +180,17 @@ class ProcessLauncher(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@ -166,16 +201,49 @@ class ProcessLauncher(object):
sys.exit(1)
def _child_process(self, service):
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = None
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
@ -189,7 +257,8 @@ class ProcessLauncher(object):
random.seed()
launcher = Launcher()
launcher.run_service(service)
launcher.launch_service(service)
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@ -207,24 +276,13 @@ class ProcessLauncher(object):
pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.service)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.service.stop()
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if not _is_sighup(signo):
break
launcher.restart()
os._exit(status)
@ -270,12 +328,7 @@ class ProcessLauncher(object):
wrap.children.remove(pid)
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary"""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
@ -284,14 +337,28 @@ class ProcessLauncher(object):
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if not _is_sighup(self.sigcaught):
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
try:
@ -313,15 +380,74 @@ class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
def stop(self):
self.tg.stop()
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=None):
if workers:

View File

@ -35,6 +35,8 @@ from nova import test
from nova.tests import utils
from nova import wsgi
from nova.openstack.common import service as _service
test_service_opts = [
cfg.StrOpt("fake_manager",
default="nova.tests.test_service.FakeManager",
@ -173,6 +175,41 @@ class ServiceTestCase(test.TestCase):
'nova.tests.test_service.FakeManager')
serv.start()
def test_parent_graceful_shutdown(self):
self.manager_mock = self.mox.CreateMock(FakeManager)
self.mox.StubOutWithMock(sys.modules[__name__],
'FakeManager', use_mock_anything=True)
self.mox.StubOutWithMock(self.manager_mock, 'init_host')
self.mox.StubOutWithMock(self.manager_mock, 'pre_start_hook')
self.mox.StubOutWithMock(self.manager_mock, 'create_rpc_dispatcher')
self.mox.StubOutWithMock(self.manager_mock, 'post_start_hook')
self.mox.StubOutWithMock(_service.Service, 'stop')
FakeManager(host=self.host).AndReturn(self.manager_mock)
# init_host is called before any service record is created
self.manager_mock.init_host()
self._service_start_mocks()
# pre_start_hook is called after service record is created,
# but before RPC consumer is created
self.manager_mock.pre_start_hook()
self.manager_mock.create_rpc_dispatcher(None)
# post_start_hook is called after RPC consumer is created.
self.manager_mock.post_start_hook()
_service.Service.stop()
self.mox.ReplayAll()
serv = service.Service(self.host,
self.binary,
self.topic,
'nova.tests.test_service.FakeManager')
serv.start()
serv.stop()
class TestWSGIService(test.TestCase):

View File

@ -223,7 +223,8 @@ class Server(object):
"""
try:
self._server.wait()
if self._server is not None:
self._server.wait()
except greenlet.GreenletExit:
LOG.info(_("WSGI server has stopped."))