Windows multiprocess wsgi

This change allows glance-api to use multiple workers on Windows
by not using fork, which is unavailable. Instead, we're duplicating
sockets and passing the handles through pipes. Also, instead of
using process groups, we have to go with Windows job objects.

Note that this doesn't change the workflow for other platforms.

A subsequent change will allow the tests to run on Windows.

blueprint windows-support

Change-Id: Ic786199844e1d804962172286905036d93a4ed14
This commit is contained in:
Lucian Petrut 2019-01-09 15:26:14 +00:00
parent 5759ec0b1c
commit f0dc2454da
3 changed files with 293 additions and 126 deletions

View File

@ -62,6 +62,7 @@ from glance import notifier
CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF)
wsgi.register_cli_opts()
# NOTE(rosmaita): Any new exceptions added should preserve the current
# error codes for backward compatibility. The value 99 is returned

View File

@ -60,6 +60,7 @@ from glance import notifier
CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF)
wsgi.register_cli_opts()
def main():

View File

@ -21,10 +21,14 @@ Utility methods for working with WSGI servers
"""
from __future__ import print_function
import abc
import errno
import functools
import os
import re
import signal
import struct
import subprocess
import sys
import time
@ -33,6 +37,7 @@ from eventlet.green import ssl
import eventlet.greenio
import eventlet.wsgi
import glance_store
from os_win import utilsfactory as os_win_utilsfactory
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
@ -318,6 +323,12 @@ store_opts = [
'using comma.')),
]
cli_opts = [
cfg.StrOpt('pipe-handle',
help='This argument is used internally on Windows. Glance '
'passes a pipe handle to child processes, which is then '
'used for inter-process communication.'),
]
LOG = logging.getLogger(__name__)
@ -340,8 +351,17 @@ except ImportError:
uwsgi = None
def register_cli_opts():
CONF.register_cli_opts(cli_opts)
def get_num_workers():
"""Return the configured number of workers."""
# Windows only: we're already running on the worker side.
if os.name == 'nt' and getattr(CONF, 'pipe_handle', None):
return 0
if CONF.workers is None:
# None implies the number of CPUs limited to 8
# See Launchpad bug #1748916 and the config help text
@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000):
return pool
class Server(object):
@six.add_metaclass(abc.ABCMeta)
class BaseServer(object):
"""Server class to manage multiple WSGI sockets and applications.
This class requires initialize_glance_store set to True if
@ -491,24 +512,6 @@ class Server(object):
# NOTE(abhishek): Allows us to only re-initialize glance_store when
# the API's configuration reloads.
self.initialize_glance_store = initialize_glance_store
self.pgid = os.getpid()
try:
# NOTE(flaper87): Make sure this process
# runs in its own process group.
# NOTE(lpetrut): This isn't available on Windows, so we're going
# to use job objects instead.
os.setpgid(self.pgid, self.pgid)
except (OSError, AttributeError):
# NOTE(flaper87): When running glance-control,
# (glance's functional tests, for example)
# setpgid fails with EPERM as glance-control
# creates a fresh session, of which the newly
# launched service becomes the leader (session
# leaders may not change process groups)
#
# Running glance-(api|registry) is safe and
# shouldn't raise any error here.
self.pgid = 0
@staticmethod
def set_signal_handler(signal_name, handler):
@ -524,13 +527,20 @@ class Server(object):
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
raise exception.SIGHUPInterrupt
@abc.abstractmethod
def kill_children(self, *args):
"""Kills the entire process group."""
self.set_signal_handler("SIGTERM", signal.SIG_IGN)
self.set_signal_handler("SIGINT", signal.SIG_IGN)
self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
self.running = False
os.killpg(self.pgid, signal.SIGTERM)
pass
@abc.abstractmethod
def wait_on_children(self):
pass
@abc.abstractmethod
def run_child(self):
pass
def reload(self):
raise NotImplementedError()
def start(self, application, default_port):
"""
@ -562,50 +572,6 @@ class Server(object):
def create_pool(self):
return get_asynchronous_eventlet_pool(size=self.threads)
def _remove_children(self, pid):
if pid in self.children:
self.children.remove(pid)
LOG.info(_LI('Removed dead child %s'), pid)
elif pid in self.stale_children:
self.stale_children.remove(pid)
LOG.info(_LI('Removed stale child %s'), pid)
else:
LOG.warn(_LW('Unrecognised child %s') % pid)
def _verify_and_respawn_children(self, pid, status):
if len(self.stale_children) == 0:
LOG.debug('No stale children')
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
LOG.error(_LE('Not respawning child %d, cannot '
'recover from termination') % pid)
if not self.children and not self.stale_children:
LOG.info(
_LI('All workers have terminated. Exiting'))
self.running = False
else:
if len(self.children) < get_num_workers():
self.run_child()
def wait_on_children(self):
while self.running:
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
self._remove_children(pid)
self._verify_and_respawn_children(pid, status)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
break
except exception.SIGHUPInterrupt:
self.reload()
continue
eventlet.greenio.shutdown_safe(self.sock)
self.sock.close()
LOG.debug('Exited')
def configure(self, old_conf=None, has_changed=None):
"""
Apply configuration settings
@ -622,35 +588,6 @@ class Server(object):
else:
initialize_glance_store()
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This allows preventing
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = utils.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, CONF)
CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children
self.children = set()
# Ensure any logging config changes are picked up
logging.setup(CONF, 'glance')
config.set_config_defaults()
self.configure(old_conf, has_changed)
self.start_wsgi()
def wait(self):
"""Wait until all servers have completed running."""
try:
@ -661,34 +598,6 @@ class Server(object):
except KeyboardInterrupt:
pass
def run_child(self):
def child_hup(*args):
"""Shuts down child processes, existing requests are handled."""
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
eventlet.wsgi.is_accepting = False
self.sock.close()
pid = os.fork()
if pid == 0:
self.set_signal_handler("SIGHUP", child_hup)
self.set_signal_handler("SIGTERM", signal.SIG_DFL)
# ignore the interrupt signal to avoid a race whereby
# a child worker receives the signal before the parent
# and is respawned unnecessarily as a result
self.set_signal_handler("SIGINT", signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server()
LOG.info(_LI('Child %d exiting normally'), os.getpid())
# self.pool.waitall() is now called in wsgi's server so
# it's safe to exit here
sys.exit(0)
else:
LOG.info(_LI('Started child %s'), pid)
self.children.add(pid)
def run_server(self):
"""Run a WSGI server."""
if cfg.CONF.pydev_worker_debug_host:
@ -796,6 +705,262 @@ class Server(object):
self.sock.listen(CONF.backlog)
class PosixServer(BaseServer):
def __init__(self, *args, **kwargs):
super(PosixServer, self).__init__(*args, **kwargs)
self.pgid = os.getpid()
try:
# NOTE(flaper87): Make sure this process
# runs in its own process group.
os.setpgid(self.pgid, self.pgid)
except OSError:
# NOTE(flaper87): When running glance-control,
# (glance's functional tests, for example)
# setpgid fails with EPERM as glance-control
# creates a fresh session, of which the newly
# launched service becomes the leader (session
# leaders may not change process groups)
#
# Running glance-(api|registry) is safe and
# shouldn't raise any error here.
self.pgid = 0
def kill_children(self, *args):
"""Kills the entire process group."""
self.set_signal_handler("SIGTERM", signal.SIG_IGN)
self.set_signal_handler("SIGINT", signal.SIG_IGN)
self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
self.running = False
os.killpg(self.pgid, signal.SIGTERM)
def _remove_children(self, pid):
if pid in self.children:
self.children.remove(pid)
LOG.info(_LI('Removed dead child %s'), pid)
elif pid in self.stale_children:
self.stale_children.remove(pid)
LOG.info(_LI('Removed stale child %s'), pid)
else:
LOG.warn(_LW('Unrecognised child %s') % pid)
def _verify_and_respawn_children(self, pid, status):
if len(self.stale_children) == 0:
LOG.debug('No stale children')
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
LOG.error(_LE('Not respawning child %d, cannot '
'recover from termination') % pid)
if not self.children and not self.stale_children:
LOG.info(
_LI('All workers have terminated. Exiting'))
self.running = False
else:
if len(self.children) < get_num_workers():
self.run_child()
def wait_on_children(self):
while self.running:
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
self._remove_children(pid)
self._verify_and_respawn_children(pid, status)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
break
except exception.SIGHUPInterrupt:
self.reload()
continue
eventlet.greenio.shutdown_safe(self.sock)
self.sock.close()
LOG.debug('Exited')
def run_child(self):
def child_hup(*args):
"""Shuts down child processes, existing requests are handled."""
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
eventlet.wsgi.is_accepting = False
self.sock.close()
pid = os.fork()
if pid == 0:
self.set_signal_handler("SIGHUP", child_hup)
self.set_signal_handler("SIGTERM", signal.SIG_DFL)
# ignore the interrupt signal to avoid a race whereby
# a child worker receives the signal before the parent
# and is respawned unnecessarily as a result
self.set_signal_handler("SIGINT", signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server()
LOG.info(_LI('Child %d exiting normally'), os.getpid())
# self.pool.waitall() is now called in wsgi's server so
# it's safe to exit here
sys.exit(0)
else:
LOG.info(_LI('Started child %s'), pid)
self.children.add(pid)
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This allows preventing
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = utils.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, CONF)
CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children
self.children = set()
# Ensure any logging config changes are picked up
logging.setup(CONF, 'glance')
config.set_config_defaults()
self.configure(old_conf, has_changed)
self.start_wsgi()
class Win32ProcessLauncher(object):
def __init__(self):
self._processutils = os_win_utilsfactory.get_processutils()
self._workers = []
self._worker_job_handles = []
def add_process(self, cmd):
LOG.info("Starting subprocess: %s", cmd)
worker = subprocess.Popen(cmd, close_fds=False)
try:
job_handle = self._processutils.kill_process_on_job_close(
worker.pid)
except Exception:
LOG.exception("Could not associate child process "
"with a job, killing it.")
worker.kill()
raise
self._worker_job_handles.append(job_handle)
self._workers.append(worker)
return worker
def wait(self):
pids = [worker.pid for worker in self._workers]
if pids:
self._processutils.wait_for_multiple_processes(pids,
wait_all=True)
# By sleeping here, we allow signal handlers to be executed.
time.sleep(0)
class Win32Server(BaseServer):
_py_script_re = re.compile(r'.*\.py\w?$')
_sock = None
def __init__(self, *args, **kwargs):
super(Win32Server, self).__init__(*args, **kwargs)
self._launcher = Win32ProcessLauncher()
self._ioutils = os_win_utilsfactory.get_ioutils()
def run_child(self):
# We're passing copies of the socket through pipes.
rfd, wfd = self._ioutils.create_pipe(inherit_handle=True)
cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)]
# Recent setuptools versions will trim '-script.py' and '.exe'
# extensions from sys.argv[0].
if self._py_script_re.match(sys.argv[0]):
cmd = [sys.executable] + cmd
worker = self._launcher.add_process(cmd)
self._ioutils.close_handle(rfd)
share_sock_buff = self._sock.share(worker.pid)
self._ioutils.write_file(
wfd,
struct.pack('<I', len(share_sock_buff)),
4)
self._ioutils.write_file(
wfd, share_sock_buff, len(share_sock_buff))
self.children.add(worker.pid)
def kill_children(self, *args):
# We're using job objects, the children will exit along with the
# main process.
exit(0)
def wait_on_children(self):
self._launcher.wait()
def _get_sock_from_parent(self):
# This is supposed to be called exactly once in the child process.
# We're passing a copy of the socket through a pipe.
pipe_handle = int(getattr(CONF, 'pipe_handle', 0))
if not pipe_handle:
err_msg = _("Did not receive a pipe handle, which is used when "
"communicating with the parent process.")
raise exception.GlanceException(err_msg)
# Get the length of the data to be received.
buff = self._ioutils.get_buffer(4)
self._ioutils.read_file(pipe_handle, buff, 4)
socket_buff_sz = struct.unpack('<I', buff)[0]
# Get the serialized socket object.
socket_buff = self._ioutils.get_buffer(socket_buff_sz)
self._ioutils.read_file(pipe_handle, socket_buff, socket_buff_sz)
self._ioutils.close_handle(pipe_handle)
# Recreate the socket object. This will only work with
# Python 3.6 or later.
return socket.fromshare(bytes(socket_buff[:]))
def configure_socket(self, old_conf=None, has_changed=None):
fresh_start = not (old_conf or has_changed)
use_ssl = CONF.cert_file or CONF.key_file
pipe_handle = getattr(CONF, 'pipe_handle', None)
if not (fresh_start and pipe_handle):
return super(Win32Server, self).configure_socket(
old_conf, has_changed)
self._sock = self._get_sock_from_parent()
if use_ssl:
self.sock = ssl_wrap_socket(self._sock)
else:
self.sock = self._sock
if hasattr(socket, 'TCP_KEEPIDLE'):
# This was introduced in WS 2016 RS3
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
if os.name == 'nt':
Server = Win32Server
else:
Server = PosixServer
class Middleware(object):
"""
Base WSGI middleware wrapper. These classes require an application to be