Graceful shutdown WSGI/RPC server
Currently, termination of a WSGI application or RPC server immediately stops the service and so interrupts in-progress requests. A graceful handler for the SIGTERM signal was added. The SIGINT signal handler was removed to allow instantaneous termination of the service. DocImpact: graceful termination of the process can be done by sending a SIGTERM signal to the parent WSGI process. Graceful termination is not instantaneous. To force instantaneous termination, a SIGINT signal must be sent. (cherry picked from commit 13ce823686062d70b268b4d3888849adef07e4ff) Additional fix: oslo.service provides this facility through its wait() method, so we need to call it in the Cinder Service wait() method, which gets called when graceful shutdown is requested by sending the process SIGTERM. Change-Id: Icaedec63c0df0255c6842b688c6d83a496f142b8 Closes-Bug: 1464822
This commit is contained in:
parent
6c055943e7
commit
70c6a35684
|
@ -89,7 +89,6 @@ def _signo_to_signame(signo):
|
|||
|
||||
def _set_signals_handler(handler):
|
||||
signal.signal(signal.SIGTERM, handler)
|
||||
signal.signal(signal.SIGINT, handler)
|
||||
if _sighup_supported():
|
||||
signal.signal(signal.SIGHUP, handler)
|
||||
|
||||
|
@ -209,6 +208,7 @@ class ProcessLauncher(object):
|
|||
self.sigcaught = None
|
||||
self.running = True
|
||||
self.wait_interval = wait_interval
|
||||
self.launcher = None
|
||||
rfd, self.writepipe = os.pipe()
|
||||
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
|
||||
self.handle_signal()
|
||||
|
@ -230,20 +230,26 @@ class ProcessLauncher(object):
|
|||
|
||||
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
|
||||
|
||||
if self.launcher:
|
||||
self.launcher.stop()
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
def _child_process_handle_signal(self):
|
||||
# Setup child signal handlers differently
|
||||
|
||||
def _sigterm(*args):
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
self.launcher.stop()
|
||||
|
||||
def _sighup(*args):
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
raise SignalExit(signal.SIGHUP)
|
||||
|
||||
# Parent signals with SIGTERM when it wants us to go away.
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, _sigterm)
|
||||
if _sighup_supported():
|
||||
signal.signal(signal.SIGHUP, _sighup)
|
||||
# Block SIGINT and let the parent send us a SIGTERM
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
def _child_wait_for_exit_or_signal(self, launcher):
|
||||
status = 0
|
||||
|
@ -264,8 +270,6 @@ class ProcessLauncher(object):
|
|||
except BaseException:
|
||||
LOG.exception(_LE('Unhandled exception'))
|
||||
status = 2
|
||||
finally:
|
||||
launcher.stop()
|
||||
|
||||
return status, signo
|
||||
|
||||
|
@ -304,13 +308,15 @@ class ProcessLauncher(object):
|
|||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
launcher = self._child_process(wrap.service)
|
||||
self.launcher = self._child_process(wrap.service)
|
||||
while True:
|
||||
self._child_process_handle_signal()
|
||||
status, signo = self._child_wait_for_exit_or_signal(launcher)
|
||||
status, signo = self._child_wait_for_exit_or_signal(
|
||||
self.launcher)
|
||||
if not _is_sighup_and_daemon(signo):
|
||||
self.launcher.wait()
|
||||
break
|
||||
launcher.restart()
|
||||
self.launcher.restart()
|
||||
|
||||
os._exit(status)
|
||||
|
||||
|
@ -400,6 +406,13 @@ class ProcessLauncher(object):
|
|||
def stop(self):
|
||||
"""Terminate child processes and wait on each."""
|
||||
self.running = False
|
||||
|
||||
LOG.debug("Stop services.")
|
||||
for service in set(
|
||||
[wrap.service for wrap in self.children.values()]):
|
||||
service.stop()
|
||||
|
||||
LOG.debug("Killing children.")
|
||||
for pid in self.children:
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
|
@ -456,7 +469,6 @@ class Services(object):
|
|||
# wait for graceful shutdown of services:
|
||||
for service in self.services:
|
||||
service.stop()
|
||||
service.wait()
|
||||
|
||||
# Each service has performed cleanup, now signal that the run_service
|
||||
# wrapper threads can now die:
|
||||
|
@ -467,6 +479,8 @@ class Services(object):
|
|||
self.tg.stop()
|
||||
|
||||
def wait(self):
|
||||
for service in self.services:
|
||||
service.wait()
|
||||
self.tg.wait()
|
||||
|
||||
def restart(self):
|
||||
|
|
|
@ -135,6 +135,7 @@ class Service(service.Service):
|
|||
self.timers = []
|
||||
|
||||
setup_profiler(binary, host)
|
||||
self.rpcserver = None
|
||||
|
||||
def start(self):
|
||||
version_string = version.version_string()
|
||||
|
@ -279,6 +280,8 @@ class Service(service.Service):
|
|||
x.wait()
|
||||
except Exception:
|
||||
pass
|
||||
if self.rpcserver:
|
||||
self.rpcserver.wait()
|
||||
|
||||
def periodic_tasks(self, raise_on_error=False):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
|
|
|
@ -30,6 +30,7 @@ from cinder import context
|
|||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import manager
|
||||
from cinder import rpc
|
||||
from cinder import service
|
||||
from cinder import test
|
||||
from cinder import wsgi
|
||||
|
@ -116,37 +117,36 @@ class ServiceTestCase(test.TestCase):
|
|||
def setUp(self):
|
||||
super(ServiceTestCase, self).setUp()
|
||||
self.mox.StubOutWithMock(service, 'db')
|
||||
self.host = 'foo'
|
||||
self.binary = 'cinder-fake'
|
||||
self.topic = 'fake'
|
||||
|
||||
def test_create(self):
|
||||
host = 'foo'
|
||||
binary = 'cinder-fake'
|
||||
topic = 'fake'
|
||||
|
||||
# NOTE(vish): Create was moved out of mox replay to make sure that
|
||||
# the looping calls are created in StartService.
|
||||
app = service.Service.create(host=host, binary=binary, topic=topic)
|
||||
app = service.Service.create(host=self.host,
|
||||
binary=self.binary,
|
||||
topic=self.topic)
|
||||
|
||||
self.assertTrue(app)
|
||||
|
||||
def test_report_state_newly_disconnected(self):
|
||||
host = 'foo'
|
||||
binary = 'bar'
|
||||
topic = 'test'
|
||||
service_create = {'host': host,
|
||||
'binary': binary,
|
||||
'topic': topic,
|
||||
service_create = {'host': self.host,
|
||||
'binary': self.binary,
|
||||
'topic': self.topic,
|
||||
'report_count': 0,
|
||||
'availability_zone': 'nova'}
|
||||
service_ref = {'host': host,
|
||||
'binary': binary,
|
||||
'topic': topic,
|
||||
service_ref = {'host': self.host,
|
||||
'binary': self.binary,
|
||||
'topic': self.topic,
|
||||
'report_count': 0,
|
||||
'availability_zone': 'nova',
|
||||
'id': 1}
|
||||
|
||||
service.db.service_get_by_args(mox.IgnoreArg(),
|
||||
host,
|
||||
binary).AndRaise(exception.NotFound())
|
||||
self.host,
|
||||
self.binary).AndRaise(
|
||||
exception.NotFound())
|
||||
service.db.service_create(mox.IgnoreArg(),
|
||||
service_create).AndReturn(service_ref)
|
||||
service.db.service_get(
|
||||
|
@ -154,9 +154,9 @@ class ServiceTestCase(test.TestCase):
|
|||
mox.IgnoreArg()).AndRaise(db_exc.DBConnectionError())
|
||||
|
||||
self.mox.ReplayAll()
|
||||
serv = service.Service(host,
|
||||
binary,
|
||||
topic,
|
||||
serv = service.Service(self.host,
|
||||
self.binary,
|
||||
self.topic,
|
||||
'cinder.tests.test_service.FakeManager')
|
||||
serv.start()
|
||||
serv.report_state()
|
||||
|
@ -187,24 +187,22 @@ class ServiceTestCase(test.TestCase):
|
|||
self.assertFalse(mock_db.service_update.called)
|
||||
|
||||
def test_report_state_newly_connected(self):
|
||||
host = 'foo'
|
||||
binary = 'bar'
|
||||
topic = 'test'
|
||||
service_create = {'host': host,
|
||||
'binary': binary,
|
||||
'topic': topic,
|
||||
service_create = {'host': self.host,
|
||||
'binary': self.binary,
|
||||
'topic': self.topic,
|
||||
'report_count': 0,
|
||||
'availability_zone': 'nova'}
|
||||
service_ref = {'host': host,
|
||||
'binary': binary,
|
||||
'topic': topic,
|
||||
service_ref = {'host': self.host,
|
||||
'binary': self.binary,
|
||||
'topic': self.topic,
|
||||
'report_count': 0,
|
||||
'availability_zone': 'nova',
|
||||
'id': 1}
|
||||
|
||||
service.db.service_get_by_args(mox.IgnoreArg(),
|
||||
host,
|
||||
binary).AndRaise(exception.NotFound())
|
||||
self.host,
|
||||
self.binary).AndRaise(
|
||||
exception.NotFound())
|
||||
service.db.service_create(mox.IgnoreArg(),
|
||||
service_create).AndReturn(service_ref)
|
||||
service.db.service_get(mox.IgnoreArg(),
|
||||
|
@ -213,9 +211,9 @@ class ServiceTestCase(test.TestCase):
|
|||
mox.ContainsKeyValue('report_count', 1))
|
||||
|
||||
self.mox.ReplayAll()
|
||||
serv = service.Service(host,
|
||||
binary,
|
||||
topic,
|
||||
serv = service.Service(self.host,
|
||||
self.binary,
|
||||
self.topic,
|
||||
'cinder.tests.test_service.FakeManager')
|
||||
serv.start()
|
||||
serv.model_disconnected = True
|
||||
|
@ -230,6 +228,22 @@ class ServiceTestCase(test.TestCase):
|
|||
manager="cinder.tests.test_service.FakeManager")
|
||||
self.assertEqual(25, CONF.service_down_time)
|
||||
|
||||
@mock.patch.object(rpc, 'get_server')
|
||||
@mock.patch.object(service, 'db')
|
||||
def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc):
|
||||
serv = service.Service(
|
||||
self.host,
|
||||
self.binary,
|
||||
self.topic,
|
||||
'cinder.tests.test_service.FakeManager'
|
||||
)
|
||||
serv.start()
|
||||
serv.stop()
|
||||
serv.wait()
|
||||
serv.rpcserver.start.assert_called_once_with()
|
||||
serv.rpcserver.stop.assert_called_once_with()
|
||||
serv.rpcserver.wait.assert_called_once_with()
|
||||
|
||||
|
||||
class TestWSGIService(test.TestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue