Graceful shutdown WSGI/RPC server

Currently, terminating a WSGI application or RPC server immediately stops
the service and so interrupts any in-progress requests.

A graceful handler for the SIGTERM signal was added.
The SIGINT signal handler was removed to allow instantaneous termination of
the service.

DocImpact: a process can be terminated gracefully by sending the SIGTERM
signal to the parent WSGI process. Graceful termination is not instantaneous.
To force instantaneous termination, the SIGINT signal must be sent.

(cherry picked from commit 13ce823686062d70b268b4d3888849adef07e4ff)
  Additional fix: oslo.service provides this facility through its wait()
                  method, so we need to call it in the Cinder Service wait()
                  method, which gets called when a graceful shutdown is
                  requested by sending the process SIGTERM.

Change-Id: Icaedec63c0df0255c6842b688c6d83a496f142b8
Closes-Bug: 1464822
This commit is contained in:
Mitsuhiro Tanino 2015-06-15 00:12:17 -04:00
parent 6c055943e7
commit 70c6a35684
3 changed files with 74 additions and 43 deletions

View File

@ -89,7 +89,6 @@ def _signo_to_signame(signo):
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
@ -209,6 +208,7 @@ class ProcessLauncher(object):
self.sigcaught = None
self.running = True
self.wait_interval = wait_interval
self.launcher = None
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
@ -230,20 +230,26 @@ class ProcessLauncher(object):
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
if self.launcher:
self.launcher.stop()
sys.exit(1)
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.launcher.stop()
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
# Parent signals with SIGTERM when it wants us to go away.
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = 0
@ -264,8 +270,6 @@ class ProcessLauncher(object):
except BaseException:
LOG.exception(_LE('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
@ -304,13 +308,15 @@ class ProcessLauncher(object):
pid = os.fork()
if pid == 0:
launcher = self._child_process(wrap.service)
self.launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
launcher.restart()
self.launcher.restart()
os._exit(status)
@ -400,6 +406,13 @@ class ProcessLauncher(object):
def stop(self):
"""Terminate child processes and wait on each."""
self.running = False
LOG.debug("Stop services.")
for service in set(
[wrap.service for wrap in self.children.values()]):
service.stop()
LOG.debug("Killing children.")
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
@ -456,7 +469,6 @@ class Services(object):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
@ -467,6 +479,8 @@ class Services(object):
self.tg.stop()
def wait(self):
for service in self.services:
service.wait()
self.tg.wait()
def restart(self):

View File

@ -135,6 +135,7 @@ class Service(service.Service):
self.timers = []
setup_profiler(binary, host)
self.rpcserver = None
def start(self):
version_string = version.version_string()
@ -279,6 +280,8 @@ class Service(service.Service):
x.wait()
except Exception:
pass
if self.rpcserver:
self.rpcserver.wait()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""

View File

@ -30,6 +30,7 @@ from cinder import context
from cinder import db
from cinder import exception
from cinder import manager
from cinder import rpc
from cinder import service
from cinder import test
from cinder import wsgi
@ -116,37 +117,36 @@ class ServiceTestCase(test.TestCase):
def setUp(self):
super(ServiceTestCase, self).setUp()
self.mox.StubOutWithMock(service, 'db')
self.host = 'foo'
self.binary = 'cinder-fake'
self.topic = 'fake'
def test_create(self):
host = 'foo'
binary = 'cinder-fake'
topic = 'fake'
# NOTE(vish): Create was moved out of mox replay to make sure that
# the looping calls are created in StartService.
app = service.Service.create(host=host, binary=binary, topic=topic)
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
self.assertTrue(app)
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
topic = 'test'
service_create = {'host': host,
'binary': binary,
'topic': topic,
service_create = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host,
'binary': binary,
'topic': topic,
service_ref = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(),
host,
binary).AndRaise(exception.NotFound())
self.host,
self.binary).AndRaise(
exception.NotFound())
service.db.service_create(mox.IgnoreArg(),
service_create).AndReturn(service_ref)
service.db.service_get(
@ -154,9 +154,9 @@ class ServiceTestCase(test.TestCase):
mox.IgnoreArg()).AndRaise(db_exc.DBConnectionError())
self.mox.ReplayAll()
serv = service.Service(host,
binary,
topic,
serv = service.Service(self.host,
self.binary,
self.topic,
'cinder.tests.test_service.FakeManager')
serv.start()
serv.report_state()
@ -187,24 +187,22 @@ class ServiceTestCase(test.TestCase):
self.assertFalse(mock_db.service_update.called)
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
topic = 'test'
service_create = {'host': host,
'binary': binary,
'topic': topic,
service_create = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host,
'binary': binary,
'topic': topic,
service_ref = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(),
host,
binary).AndRaise(exception.NotFound())
self.host,
self.binary).AndRaise(
exception.NotFound())
service.db.service_create(mox.IgnoreArg(),
service_create).AndReturn(service_ref)
service.db.service_get(mox.IgnoreArg(),
@ -213,9 +211,9 @@ class ServiceTestCase(test.TestCase):
mox.ContainsKeyValue('report_count', 1))
self.mox.ReplayAll()
serv = service.Service(host,
binary,
topic,
serv = service.Service(self.host,
self.binary,
self.topic,
'cinder.tests.test_service.FakeManager')
serv.start()
serv.model_disconnected = True
@ -230,6 +228,22 @@ class ServiceTestCase(test.TestCase):
manager="cinder.tests.test_service.FakeManager")
self.assertEqual(25, CONF.service_down_time)
@mock.patch.object(rpc, 'get_server')
@mock.patch.object(service, 'db')
def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc):
serv = service.Service(
self.host,
self.binary,
self.topic,
'cinder.tests.test_service.FakeManager'
)
serv.start()
serv.stop()
serv.wait()
serv.rpcserver.start.assert_called_once_with()
serv.rpcserver.stop.assert_called_once_with()
serv.rpcserver.wait.assert_called_once_with()
class TestWSGIService(test.TestCase):