Merge "Windows multiprocess wsgi"

This commit is contained in:
Zuul 2019-03-14 15:25:08 +00:00 committed by Gerrit Code Review
commit ac15dc608a
3 changed files with 293 additions and 126 deletions

View File

@ -62,6 +62,7 @@ from glance import notifier
CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF)
wsgi.register_cli_opts()
# NOTE(rosmaita): Any new exceptions added should preserve the current
# error codes for backward compatibility. The value 99 is returned

View File

@ -60,6 +60,7 @@ from glance import notifier
CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF)
wsgi.register_cli_opts()
def main():

View File

@ -21,10 +21,14 @@ Utility methods for working with WSGI servers
"""
from __future__ import print_function
import abc
import errno
import functools
import os
import re
import signal
import struct
import subprocess
import sys
import time
@ -33,6 +37,7 @@ from eventlet.green import ssl
import eventlet.greenio
import eventlet.wsgi
import glance_store
from os_win import utilsfactory as os_win_utilsfactory
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
@ -318,6 +323,12 @@ store_opts = [
'using comma.')),
]
# Options that are only accepted on the command line; they are registered
# on CONF by register_cli_opts(). 'pipe-handle' is appended to the worker
# command line by the Windows server when it spawns a child process.
cli_opts = [
    cfg.StrOpt('pipe-handle',
               help='This argument is used internally on Windows. Glance '
                    'passes a pipe handle to child processes, which is then '
                    'used for inter-process communication.'),
]
LOG = logging.getLogger(__name__)
@ -340,8 +351,17 @@ except ImportError:
uwsgi = None
def register_cli_opts():
    """Register the ``cli_opts`` list as command-line options on CONF."""
    CONF.register_cli_opts(cli_opts)
def get_num_workers():
"""Return the configured number of workers."""
# Windows only: we're already running on the worker side.
if os.name == 'nt' and getattr(CONF, 'pipe_handle', None):
return 0
if CONF.workers is None:
# None implies the number of CPUs limited to 8
# See Launchpad bug #1748916 and the config help text
@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000):
return pool
class Server(object):
@six.add_metaclass(abc.ABCMeta)
class BaseServer(object):
"""Server class to manage multiple WSGI sockets and applications.
This class requires initialize_glance_store set to True if
@ -491,24 +512,6 @@ class Server(object):
# NOTE(abhishek): Allows us to only re-initialize glance_store when
# the API's configuration reloads.
self.initialize_glance_store = initialize_glance_store
self.pgid = os.getpid()
try:
# NOTE(flaper87): Make sure this process
# runs in its own process group.
# NOTE(lpetrut): This isn't available on Windows, so we're going
# to use job objects instead.
os.setpgid(self.pgid, self.pgid)
except (OSError, AttributeError):
# NOTE(flaper87): When running glance-control,
# (glance's functional tests, for example)
# setpgid fails with EPERM as glance-control
# creates a fresh session, of which the newly
# launched service becomes the leader (session
# leaders may not change process groups)
#
# Running glance-(api|registry) is safe and
# shouldn't raise any error here.
self.pgid = 0
@staticmethod
def set_signal_handler(signal_name, handler):
@ -524,13 +527,20 @@ class Server(object):
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
raise exception.SIGHUPInterrupt
@abc.abstractmethod
def kill_children(self, *args):
"""Kills the entire process group."""
self.set_signal_handler("SIGTERM", signal.SIG_IGN)
self.set_signal_handler("SIGINT", signal.SIG_IGN)
self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
self.running = False
os.killpg(self.pgid, signal.SIGTERM)
pass
@abc.abstractmethod
def wait_on_children(self):
pass
@abc.abstractmethod
def run_child(self):
pass
def reload(self):
raise NotImplementedError()
def start(self, application, default_port):
"""
@ -562,50 +572,6 @@ class Server(object):
def create_pool(self):
return get_asynchronous_eventlet_pool(size=self.threads)
def _remove_children(self, pid):
if pid in self.children:
self.children.remove(pid)
LOG.info(_LI('Removed dead child %s'), pid)
elif pid in self.stale_children:
self.stale_children.remove(pid)
LOG.info(_LI('Removed stale child %s'), pid)
else:
LOG.warn(_LW('Unrecognised child %s') % pid)
def _verify_and_respawn_children(self, pid, status):
if len(self.stale_children) == 0:
LOG.debug('No stale children')
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
LOG.error(_LE('Not respawning child %d, cannot '
'recover from termination') % pid)
if not self.children and not self.stale_children:
LOG.info(
_LI('All workers have terminated. Exiting'))
self.running = False
else:
if len(self.children) < get_num_workers():
self.run_child()
def wait_on_children(self):
while self.running:
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
self._remove_children(pid)
self._verify_and_respawn_children(pid, status)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
break
except exception.SIGHUPInterrupt:
self.reload()
continue
eventlet.greenio.shutdown_safe(self.sock)
self.sock.close()
LOG.debug('Exited')
def configure(self, old_conf=None, has_changed=None):
"""
Apply configuration settings
@ -622,35 +588,6 @@ class Server(object):
else:
initialize_glance_store()
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This allows preventing
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = utils.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, CONF)
CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children
self.children = set()
# Ensure any logging config changes are picked up
logging.setup(CONF, 'glance')
config.set_config_defaults()
self.configure(old_conf, has_changed)
self.start_wsgi()
def wait(self):
"""Wait until all servers have completed running."""
try:
@ -661,34 +598,6 @@ class Server(object):
except KeyboardInterrupt:
pass
def run_child(self):
def child_hup(*args):
"""Shuts down child processes, existing requests are handled."""
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
eventlet.wsgi.is_accepting = False
self.sock.close()
pid = os.fork()
if pid == 0:
self.set_signal_handler("SIGHUP", child_hup)
self.set_signal_handler("SIGTERM", signal.SIG_DFL)
# ignore the interrupt signal to avoid a race whereby
# a child worker receives the signal before the parent
# and is respawned unnecessarily as a result
self.set_signal_handler("SIGINT", signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server()
LOG.info(_LI('Child %d exiting normally'), os.getpid())
# self.pool.waitall() is now called in wsgi's server so
# it's safe to exit here
sys.exit(0)
else:
LOG.info(_LI('Started child %s'), pid)
self.children.add(pid)
def run_server(self):
"""Run a WSGI server."""
if cfg.CONF.pydev_worker_debug_host:
@ -796,6 +705,262 @@ class Server(object):
self.sock.listen(CONF.backlog)
class PosixServer(BaseServer):
    """WSGI server flavour for POSIX hosts.

    Workers are created with ``os.fork()`` and are signalled collectively
    through the process group stored in ``self.pgid``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base server and try to become a group leader.

        All arguments are forwarded unchanged to ``BaseServer.__init__``.
        """
        super(PosixServer, self).__init__(*args, **kwargs)

        self.pgid = os.getpid()
        try:
            # NOTE(flaper87): Make sure this process
            # runs in its own process group.
            os.setpgid(self.pgid, self.pgid)
        except OSError:
            # NOTE(flaper87): When running glance-control,
            # (glance's functional tests, for example)
            # setpgid fails with EPERM as glance-control
            # creates a fresh session, of which the newly
            # launched service becomes the leader (session
            # leaders may not change process groups)
            #
            # Running glance-(api|registry) is safe and
            # shouldn't raise any error here.
            self.pgid = 0

    def kill_children(self, *args):
        """Kills the entire process group.

        :param args: ignored; accepted so the method can be installed as a
                     signal handler.
        """
        # SIGTERM/SIGINT/SIGCHLD are ignored in this process before the
        # group is signalled -- presumably so that the SIGTERM sent below
        # (which also reaches this process) does not re-enter the handler.
        self.set_signal_handler("SIGTERM", signal.SIG_IGN)
        self.set_signal_handler("SIGINT", signal.SIG_IGN)
        self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
        self.running = False
        os.killpg(self.pgid, signal.SIGTERM)

    def _remove_children(self, pid):
        """Forget ``pid`` in whichever child set currently tracks it."""
        if pid in self.children:
            self.children.remove(pid)
            LOG.info(_LI('Removed dead child %s'), pid)
        elif pid in self.stale_children:
            self.stale_children.remove(pid)
            LOG.info(_LI('Removed stale child %s'), pid)
        else:
            # Lazy logging arguments, consistent with the branches above.
            LOG.warning(_LW('Unrecognised child %s'), pid)

    def _verify_and_respawn_children(self, pid, status):
        """Decide whether a reaped worker should be replaced.

        :param pid: process id of the worker that terminated
        :param status: wait status as returned by ``os.wait()``
        """
        if not self.stale_children:
            LOG.debug('No stale children')
        if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
            # A worker that exited with a non-zero code is not replaced.
            LOG.error(_LE('Not respawning child %d, cannot '
                          'recover from termination'), pid)
            if not self.children and not self.stale_children:
                LOG.info(
                    _LI('All workers have terminated. Exiting'))
                self.running = False
        else:
            if len(self.children) < get_num_workers():
                self.run_child()

    def wait_on_children(self):
        """Reap workers until ``self.running`` is cleared, then close up.

        A ``SIGHUPInterrupt`` triggers ``reload()``; a keyboard interrupt
        ends the loop. On exit the listening socket is shut down and closed.
        """
        while self.running:
            try:
                pid, status = os.wait()
                if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                    self._remove_children(pid)
                    self._verify_and_respawn_children(pid, status)
            except OSError as err:
                # EINTR / ECHILD are tolerated and the loop simply retries.
                if err.errno not in (errno.EINTR, errno.ECHILD):
                    raise
            except KeyboardInterrupt:
                LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
                break
            except exception.SIGHUPInterrupt:
                self.reload()
                continue
        eventlet.greenio.shutdown_safe(self.sock)
        self.sock.close()
        LOG.debug('Exited')

    def run_child(self):
        """Fork one worker; the child serves requests and then exits."""
        def child_hup(*args):
            """Shuts down child processes, existing requests are handled."""
            self.set_signal_handler("SIGHUP", signal.SIG_IGN)
            eventlet.wsgi.is_accepting = False
            self.sock.close()

        pid = os.fork()
        if pid == 0:
            self.set_signal_handler("SIGHUP", child_hup)
            self.set_signal_handler("SIGTERM", signal.SIG_DFL)
            # ignore the interrupt signal to avoid a race whereby
            # a child worker receives the signal before the parent
            # and is respawned unnecessarily as a result
            self.set_signal_handler("SIGINT", signal.SIG_IGN)
            # The child has no need to stash the unwrapped
            # socket, and the reference prevents a clean
            # exit on sighup
            self._sock = None
            self.run_server()
            LOG.info(_LI('Child %d exiting normally'), os.getpid())
            # self.pool.waitall() is now called in wsgi's server so
            # it's safe to exit here
            sys.exit(0)
        else:
            LOG.info(_LI('Started child %s'), pid)
            self.children.add(pid)

    def reload(self):
        """
        Reload and re-apply configuration settings

        Existing child processes are sent a SIGHUP signal
        and will exit after completing existing requests.
        New child processes, which will have the updated
        configuration, are spawned. This avoids interrupting
        the service.
        """
        def _has_changed(old, new, param):
            old = old.get(param)
            new = getattr(new, param)
            return (new != old)

        old_conf = utils.stash_conf_values()
        has_changed = functools.partial(_has_changed, old_conf, CONF)
        CONF.reload_config_files()
        os.killpg(self.pgid, signal.SIGHUP)
        # NOTE(review): stale_children is replaced, not merged. Workers left
        # over from an earlier reload that have not exited yet would then be
        # logged as 'Unrecognised child' by _remove_children -- confirm this
        # is acceptable.
        self.stale_children = self.children
        self.children = set()

        # Ensure any logging config changes are picked up
        logging.setup(CONF, 'glance')
        config.set_config_defaults()

        self.configure(old_conf, has_changed)
        self.start_wsgi()
class Win32ProcessLauncher(object):
    """Start worker subprocesses on Windows and wait for them to finish.

    Every spawned process is handed to
    ``processutils.kill_process_on_job_close``; the returned job handles
    are retained on the instance.
    """

    def __init__(self):
        self._processutils = os_win_utilsfactory.get_processutils()
        self._workers = []
        self._worker_job_handles = []

    def add_process(self, cmd):
        """Spawn ``cmd`` and associate the new process with a job.

        :param cmd: argument list passed to ``subprocess.Popen``
        :returns: the ``subprocess.Popen`` object of the new worker
        :raises: whatever the job association raised; the child is killed
                 before the exception is propagated.
        """
        LOG.info("Starting subprocess: %s", cmd)

        # close_fds=False: handles are left open for the child
        # (presumably so it can inherit the pipe handle -- confirm).
        child = subprocess.Popen(cmd, close_fds=False)
        try:
            handle = self._processutils.kill_process_on_job_close(child.pid)
        except Exception:
            LOG.exception("Could not associate child process "
                          "with a job, killing it.")
            child.kill()
            raise

        self._worker_job_handles.append(handle)
        self._workers.append(child)
        return child

    def wait(self):
        """Block until every tracked worker has exited (if there are any)."""
        worker_pids = []
        for child in self._workers:
            worker_pids.append(child.pid)

        if worker_pids:
            self._processutils.wait_for_multiple_processes(
                worker_pids, wait_all=True)

        # By sleeping here, we allow signal handlers to be executed.
        time.sleep(0)
class Win32Server(BaseServer):
    """WSGI server flavour for Windows hosts.

    Workers are separate processes started from the current command line.
    The listening socket is shared with each of them by writing its
    serialized form into a pipe whose read handle is passed to the child
    through ``--pipe-handle``.
    """

    # Matches script paths such as 'foo.py' / 'foo.pyc'.
    _py_script_re = re.compile(r'.*\.py\w?$')

    _sock = None

    def __init__(self, *args, **kwargs):
        """Initialise the base server plus the Windows helper objects.

        All arguments are forwarded unchanged to ``BaseServer.__init__``.
        """
        super(Win32Server, self).__init__(*args, **kwargs)

        self._launcher = Win32ProcessLauncher()
        self._ioutils = os_win_utilsfactory.get_ioutils()

    def run_child(self):
        """Spawn one worker process and send it a copy of the socket.

        Both pipe handles are closed in this process before returning,
        including on failure.
        """
        # We're passing copies of the socket through pipes.
        rfd, wfd = self._ioutils.create_pipe(inherit_handle=True)
        try:
            cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)]
            # Recent setuptools versions will trim '-script.py' and '.exe'
            # extensions from sys.argv[0].
            if self._py_script_re.match(sys.argv[0]):
                cmd = [sys.executable] + cmd

            try:
                worker = self._launcher.add_process(cmd)
            finally:
                # The parent never reads from the pipe, so its copy of the
                # read end is released whether or not the spawn succeeded.
                self._ioutils.close_handle(rfd)

            # Payload: 4-byte little-endian length, then the shared socket.
            share_sock_buff = self._sock.share(worker.pid)
            self._ioutils.write_file(
                wfd,
                struct.pack('<I', len(share_sock_buff)),
                4)
            self._ioutils.write_file(
                wfd, share_sock_buff, len(share_sock_buff))
        finally:
            # Nothing else is written to this pipe; without this the write
            # handle would leak once per spawned worker.
            self._ioutils.close_handle(wfd)

        self.children.add(worker.pid)

    def kill_children(self, *args):
        """Exit the main process.

        :param args: ignored; accepted so the method can be installed as a
                     signal handler.
        """
        # We're using job objects, the children will exit along with the
        # main process.
        # sys.exit() is used instead of the interactive helper exit(),
        # which only exists when the 'site' module has been loaded.
        sys.exit(0)

    def wait_on_children(self):
        """Block until the launcher reports that the workers have exited."""
        self._launcher.wait()

    def _get_sock_from_parent(self):
        """Rebuild the listening socket from data sent by the parent.

        :returns: the socket object created by ``socket.fromshare``
        :raises GlanceException: if no pipe handle was passed to this
                                 process.
        """
        # This is supposed to be called exactly once in the child process.
        # We're passing a copy of the socket through a pipe.
        #
        # The option defaults to None when it is registered but not set, so
        # it is normalised before int() to reach the error below instead of
        # failing with a TypeError.
        pipe_handle = int(getattr(CONF, 'pipe_handle', None) or 0)
        if not pipe_handle:
            err_msg = _("Did not receive a pipe handle, which is used when "
                        "communicating with the parent process.")
            raise exception.GlanceException(err_msg)

        try:
            # Get the length of the data to be received.
            buff = self._ioutils.get_buffer(4)
            self._ioutils.read_file(pipe_handle, buff, 4)
            socket_buff_sz = struct.unpack('<I', buff)[0]

            # Get the serialized socket object.
            socket_buff = self._ioutils.get_buffer(socket_buff_sz)
            self._ioutils.read_file(pipe_handle, socket_buff, socket_buff_sz)
        finally:
            # Release the handle even if one of the reads failed.
            self._ioutils.close_handle(pipe_handle)

        # Recreate the socket object. This will only work with
        # Python 3.6 or later.
        return socket.fromshare(bytes(socket_buff[:]))

    def configure_socket(self, old_conf=None, has_changed=None):
        """Set up ``self.sock``, reusing the parent's socket in a worker.

        On a fresh start with a pipe handle present the socket is obtained
        from the parent process; in every other case the base class
        implementation is used and its result returned.
        """
        fresh_start = not (old_conf or has_changed)
        use_ssl = CONF.cert_file or CONF.key_file
        pipe_handle = getattr(CONF, 'pipe_handle', None)

        if not (fresh_start and pipe_handle):
            return super(Win32Server, self).configure_socket(
                old_conf, has_changed)

        self._sock = self._get_sock_from_parent()

        if use_ssl:
            self.sock = ssl_wrap_socket(self._sock)
        else:
            self.sock = self._sock

        if hasattr(socket, 'TCP_KEEPIDLE'):
            # This was introduced in WS 2016 RS3
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
                                 CONF.tcp_keepidle)
# Public ``Server`` name: the Windows implementation on 'nt', otherwise the
# POSIX (fork-based) one.
Server = Win32Server if os.name == 'nt' else PosixServer
class Middleware(object):
"""
Base WSGI middleware wrapper. These classes require an application to be