From f0dc2454dadeba04a17263d64574d786cdfcbc17 Mon Sep 17 00:00:00 2001
From: Lucian Petrut
Date: Wed, 9 Jan 2019 15:26:14 +0000
Subject: [PATCH] Windows multiprocess wsgi

This change allows glance-api to use multiple workers on Windows by
not using fork, which is unavailable. Instead, we're duplicating
sockets and passing the handles through pipes.

Also, instead of using process groups, we have to go with Windows
job objects.

Note that this doesn't change the workflow for other platforms.

A subsequent change will allow the tests to run on Windows.

blueprint windows-support

Change-Id: Ic786199844e1d804962172286905036d93a4ed14
---
 glance/cmd/api.py      |   1 +
 glance/cmd/registry.py |   1 +
 glance/common/wsgi.py  | 417 ++++++++++++++++++++++++++++-------------
 3 files changed, 293 insertions(+), 126 deletions(-)

diff --git a/glance/cmd/api.py b/glance/cmd/api.py
index 4c35b21bf4..1acd3bba80 100644
--- a/glance/cmd/api.py
+++ b/glance/cmd/api.py
@@ -62,6 +62,7 @@ from glance import notifier
 CONF = cfg.CONF
 CONF.import_group("profiler", "glance.common.wsgi")
 logging.register_options(CONF)
+wsgi.register_cli_opts()
 
 # NOTE(rosmaita): Any new exceptions added should preserve the current
 # error codes for backward compatibility. The value 99 is returned
diff --git a/glance/cmd/registry.py b/glance/cmd/registry.py
index c222e81e9c..85a13665ea 100644
--- a/glance/cmd/registry.py
+++ b/glance/cmd/registry.py
@@ -60,6 +60,7 @@ from glance import notifier
 CONF = cfg.CONF
 CONF.import_group("profiler", "glance.common.wsgi")
 logging.register_options(CONF)
+wsgi.register_cli_opts()
 
 
 def main():
diff --git a/glance/common/wsgi.py b/glance/common/wsgi.py
index dae1e2e5b7..103a09b765 100644
--- a/glance/common/wsgi.py
+++ b/glance/common/wsgi.py
@@ -21,10 +21,14 @@ Utility methods for working with WSGI servers
 """
 from __future__ import print_function
 
+import abc
 import errno
 import functools
 import os
+import re
 import signal
+import struct
+import subprocess
 import sys
 import time
 
@@ -33,6 +37,7 @@ from eventlet.green import ssl
 import eventlet.greenio
 import eventlet.wsgi
 import glance_store
+from os_win import utilsfactory as os_win_utilsfactory
 from oslo_concurrency import processutils
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -318,6 +323,12 @@ store_opts = [
                     'using comma.')),
 ]
 
+cli_opts = [
+    cfg.StrOpt('pipe-handle',
+               help='This argument is used internally on Windows. Glance '
+                    'passes a pipe handle to child processes, which is then '
+                    'used for inter-process communication.'),
+]
 
 LOG = logging.getLogger(__name__)
 
@@ -340,8 +351,17 @@ except ImportError:
     uwsgi = None
 
 
+def register_cli_opts():
+    CONF.register_cli_opts(cli_opts)
+
+
 def get_num_workers():
     """Return the configured number of workers."""
+
+    # Windows only: we're already running on the worker side.
+    if os.name == 'nt' and getattr(CONF, 'pipe_handle', None):
+        return 0
+
     if CONF.workers is None:
         # None implies the number of CPUs limited to 8
         # See Launchpad bug #1748916 and the config help text
@@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000):
     return pool
 
 
-class Server(object):
+@six.add_metaclass(abc.ABCMeta)
+class BaseServer(object):
     """Server class to manage multiple WSGI sockets and applications.
 
     This class requires initialize_glance_store set to True if
@@ -491,24 +512,6 @@ class Server(object):
         # NOTE(abhishek): Allows us to only re-initialize glance_store when
         # the API's configuration reloads.
         self.initialize_glance_store = initialize_glance_store
-        self.pgid = os.getpid()
-        try:
-            # NOTE(flaper87): Make sure this process
-            # runs in its own process group.
-            # NOTE(lpetrut): This isn't available on Windows, so we're going
-            # to use job objects instead.
-            os.setpgid(self.pgid, self.pgid)
-        except (OSError, AttributeError):
-            # NOTE(flaper87): When running glance-control,
-            # (glance's functional tests, for example)
-            # setpgid fails with EPERM as glance-control
-            # creates a fresh session, of which the newly
-            # launched service becomes the leader (session
-            # leaders may not change process groups)
-            #
-            # Running glance-(api|registry) is safe and
-            # shouldn't raise any error here.
-            self.pgid = 0
 
     @staticmethod
     def set_signal_handler(signal_name, handler):
@@ -524,13 +527,20 @@ class Server(object):
         self.set_signal_handler("SIGHUP", signal.SIG_IGN)
         raise exception.SIGHUPInterrupt
 
+    @abc.abstractmethod
     def kill_children(self, *args):
-        """Kills the entire process group."""
-        self.set_signal_handler("SIGTERM", signal.SIG_IGN)
-        self.set_signal_handler("SIGINT", signal.SIG_IGN)
-        self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
-        self.running = False
-        os.killpg(self.pgid, signal.SIGTERM)
+        pass
+
+    @abc.abstractmethod
+    def wait_on_children(self):
+        pass
+
+    @abc.abstractmethod
+    def run_child(self):
+        pass
+
+    def reload(self):
+        raise NotImplementedError()
 
     def start(self, application, default_port):
         """
@@ -562,50 +572,6 @@ class Server(object):
     def create_pool(self):
         return get_asynchronous_eventlet_pool(size=self.threads)
 
-    def _remove_children(self, pid):
-        if pid in self.children:
-            self.children.remove(pid)
-            LOG.info(_LI('Removed dead child %s'), pid)
-        elif pid in self.stale_children:
-            self.stale_children.remove(pid)
-            LOG.info(_LI('Removed stale child %s'), pid)
-        else:
-            LOG.warn(_LW('Unrecognised child %s') % pid)
-
-    def _verify_and_respawn_children(self, pid, status):
-        if len(self.stale_children) == 0:
-            LOG.debug('No stale children')
-        if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
-            LOG.error(_LE('Not respawning child %d, cannot '
-                          'recover from termination') % pid)
-            if not self.children and not self.stale_children:
-                LOG.info(
-                    _LI('All workers have terminated. Exiting'))
-                self.running = False
-        else:
-            if len(self.children) < get_num_workers():
-                self.run_child()
-
-    def wait_on_children(self):
-        while self.running:
-            try:
-                pid, status = os.wait()
-                if os.WIFEXITED(status) or os.WIFSIGNALED(status):
-                    self._remove_children(pid)
-                    self._verify_and_respawn_children(pid, status)
-            except OSError as err:
-                if err.errno not in (errno.EINTR, errno.ECHILD):
-                    raise
-            except KeyboardInterrupt:
-                LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
-                break
-            except exception.SIGHUPInterrupt:
-                self.reload()
-                continue
-        eventlet.greenio.shutdown_safe(self.sock)
-        self.sock.close()
-        LOG.debug('Exited')
-
     def configure(self, old_conf=None, has_changed=None):
         """
         Apply configuration settings
@@ -622,35 +588,6 @@ class Server(object):
         else:
             initialize_glance_store()
 
-    def reload(self):
-        """
-        Reload and re-apply configuration settings
-
-        Existing child processes are sent a SIGHUP signal
-        and will exit after completing existing requests.
-        New child processes, which will have the updated
-        configuration, are spawned. This allows preventing
-        interruption to the service.
-        """
-        def _has_changed(old, new, param):
-            old = old.get(param)
-            new = getattr(new, param)
-            return (new != old)
-
-        old_conf = utils.stash_conf_values()
-        has_changed = functools.partial(_has_changed, old_conf, CONF)
-        CONF.reload_config_files()
-        os.killpg(self.pgid, signal.SIGHUP)
-        self.stale_children = self.children
-        self.children = set()
-
-        # Ensure any logging config changes are picked up
-        logging.setup(CONF, 'glance')
-        config.set_config_defaults()
-
-        self.configure(old_conf, has_changed)
-        self.start_wsgi()
-
     def wait(self):
         """Wait until all servers have completed running."""
         try:
@@ -661,34 +598,6 @@ class Server(object):
         except KeyboardInterrupt:
             pass
 
-    def run_child(self):
-        def child_hup(*args):
-            """Shuts down child processes, existing requests are handled."""
-            self.set_signal_handler("SIGHUP", signal.SIG_IGN)
-            eventlet.wsgi.is_accepting = False
-            self.sock.close()
-
-        pid = os.fork()
-        if pid == 0:
-            self.set_signal_handler("SIGHUP", child_hup)
-            self.set_signal_handler("SIGTERM", signal.SIG_DFL)
-            # ignore the interrupt signal to avoid a race whereby
-            # a child worker receives the signal before the parent
-            # and is respawned unnecessarily as a result
-            self.set_signal_handler("SIGINT", signal.SIG_IGN)
-            # The child has no need to stash the unwrapped
-            # socket, and the reference prevents a clean
-            # exit on sighup
-            self._sock = None
-            self.run_server()
-            LOG.info(_LI('Child %d exiting normally'), os.getpid())
-            # self.pool.waitall() is now called in wsgi's server so
-            # it's safe to exit here
-            sys.exit(0)
-        else:
-            LOG.info(_LI('Started child %s'), pid)
-            self.children.add(pid)
-
     def run_server(self):
         """Run a WSGI server."""
         if cfg.CONF.pydev_worker_debug_host:
@@ -796,6 +705,262 @@ class Server(object):
         self.sock.listen(CONF.backlog)
 
 
+class PosixServer(BaseServer):
+    def __init__(self, *args, **kwargs):
+        super(PosixServer, self).__init__(*args, **kwargs)
+
+        self.pgid = os.getpid()
+        try:
+            # NOTE(flaper87): Make sure this process
+            # runs in its own process group.
+            os.setpgid(self.pgid, self.pgid)
+        except OSError:
+            # NOTE(flaper87): When running glance-control,
+            # (glance's functional tests, for example)
+            # setpgid fails with EPERM as glance-control
+            # creates a fresh session, of which the newly
+            # launched service becomes the leader (session
+            # leaders may not change process groups)
+            #
+            # Running glance-(api|registry) is safe and
+            # shouldn't raise any error here.
+            self.pgid = 0
+
+    def kill_children(self, *args):
+        """Kills the entire process group."""
+        self.set_signal_handler("SIGTERM", signal.SIG_IGN)
+        self.set_signal_handler("SIGINT", signal.SIG_IGN)
+        self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
+        self.running = False
+        os.killpg(self.pgid, signal.SIGTERM)
+
+    def _remove_children(self, pid):
+        if pid in self.children:
+            self.children.remove(pid)
+            LOG.info(_LI('Removed dead child %s'), pid)
+        elif pid in self.stale_children:
+            self.stale_children.remove(pid)
+            LOG.info(_LI('Removed stale child %s'), pid)
+        else:
+            LOG.warn(_LW('Unrecognised child %s') % pid)
+
+    def _verify_and_respawn_children(self, pid, status):
+        if len(self.stale_children) == 0:
+            LOG.debug('No stale children')
+        if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
+            LOG.error(_LE('Not respawning child %d, cannot '
+                          'recover from termination') % pid)
+            if not self.children and not self.stale_children:
+                LOG.info(
+                    _LI('All workers have terminated. Exiting'))
+                self.running = False
+        else:
+            if len(self.children) < get_num_workers():
+                self.run_child()
+
+    def wait_on_children(self):
+        while self.running:
+            try:
+                pid, status = os.wait()
+                if os.WIFEXITED(status) or os.WIFSIGNALED(status):
+                    self._remove_children(pid)
+                    self._verify_and_respawn_children(pid, status)
+            except OSError as err:
+                if err.errno not in (errno.EINTR, errno.ECHILD):
+                    raise
+            except KeyboardInterrupt:
+                LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
+                break
+            except exception.SIGHUPInterrupt:
+                self.reload()
+                continue
+        eventlet.greenio.shutdown_safe(self.sock)
+        self.sock.close()
+        LOG.debug('Exited')
+
+    def run_child(self):
+        def child_hup(*args):
+            """Shuts down child processes, existing requests are handled."""
+            self.set_signal_handler("SIGHUP", signal.SIG_IGN)
+            eventlet.wsgi.is_accepting = False
+            self.sock.close()
+
+        pid = os.fork()
+        if pid == 0:
+            self.set_signal_handler("SIGHUP", child_hup)
+            self.set_signal_handler("SIGTERM", signal.SIG_DFL)
+            # ignore the interrupt signal to avoid a race whereby
+            # a child worker receives the signal before the parent
+            # and is respawned unnecessarily as a result
+            self.set_signal_handler("SIGINT", signal.SIG_IGN)
+            # The child has no need to stash the unwrapped
+            # socket, and the reference prevents a clean
+            # exit on sighup
+            self._sock = None
+            self.run_server()
+            LOG.info(_LI('Child %d exiting normally'), os.getpid())
+            # self.pool.waitall() is now called in wsgi's server so
+            # it's safe to exit here
+            sys.exit(0)
+        else:
+            LOG.info(_LI('Started child %s'), pid)
+            self.children.add(pid)
+
+    def reload(self):
+        """
+        Reload and re-apply configuration settings
+
+        Existing child processes are sent a SIGHUP signal
+        and will exit after completing existing requests.
+        New child processes, which will have the updated
+        configuration, are spawned. This allows preventing
+        interruption to the service.
+        """
+        def _has_changed(old, new, param):
+            old = old.get(param)
+            new = getattr(new, param)
+            return (new != old)
+
+        old_conf = utils.stash_conf_values()
+        has_changed = functools.partial(_has_changed, old_conf, CONF)
+        CONF.reload_config_files()
+        os.killpg(self.pgid, signal.SIGHUP)
+        self.stale_children = self.children
+        self.children = set()
+
+        # Ensure any logging config changes are picked up
+        logging.setup(CONF, 'glance')
+        config.set_config_defaults()
+
+        self.configure(old_conf, has_changed)
+        self.start_wsgi()
+
+
+class Win32ProcessLauncher(object):
+    def __init__(self):
+        self._processutils = os_win_utilsfactory.get_processutils()
+
+        self._workers = []
+        self._worker_job_handles = []
+
+    def add_process(self, cmd):
+        LOG.info("Starting subprocess: %s", cmd)
+
+        worker = subprocess.Popen(cmd, close_fds=False)
+        try:
+            job_handle = self._processutils.kill_process_on_job_close(
+                worker.pid)
+        except Exception:
+            LOG.exception("Could not associate child process "
+                          "with a job, killing it.")
+            worker.kill()
+            raise
+
+        self._worker_job_handles.append(job_handle)
+        self._workers.append(worker)
+
+        return worker
+
+    def wait(self):
+        pids = [worker.pid for worker in self._workers]
+        if pids:
+            self._processutils.wait_for_multiple_processes(pids,
+                                                           wait_all=True)
+        # By sleeping here, we allow signal handlers to be executed.
+        time.sleep(0)
+
+
+class Win32Server(BaseServer):
+    _py_script_re = re.compile(r'.*\.py\w?$')
+    _sock = None
+
+    def __init__(self, *args, **kwargs):
+        super(Win32Server, self).__init__(*args, **kwargs)
+        self._launcher = Win32ProcessLauncher()
+        self._ioutils = os_win_utilsfactory.get_ioutils()
+
+    def run_child(self):
+        # We're passing copies of the socket through pipes.
+        rfd, wfd = self._ioutils.create_pipe(inherit_handle=True)
+
+        cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)]
+        # Recent setuptools versions will trim '-script.py' and '.exe'
+        # extensions from sys.argv[0].
+        if self._py_script_re.match(sys.argv[0]):
+            cmd = [sys.executable] + cmd
+
+        worker = self._launcher.add_process(cmd)
+        self._ioutils.close_handle(rfd)
+
+        share_sock_buff = self._sock.share(worker.pid)
+        self._ioutils.write_file(
+            wfd,
+            struct.pack('
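
For readers unfamiliar with the mechanism the commit message describes, the following is a minimal, standalone sketch (not part of the patch) of how a parent process on Windows can hand a listening socket to a worker it spawned, using only the Python standard library (socket.share() / socket.fromshare(), both Windows-only). The worker script name, the port, and the length-prefixed framing over stdin are illustrative assumptions; the patch itself transfers the bytes over an os_win pipe identified by the --pipe-handle option added above.

# sketch_socket_share.py - illustrative only, assumes Windows (os.name == 'nt')
import socket
import struct
import subprocess
import sys

WORKER_SCRIPT = 'worker.py'  # hypothetical worker entry point


def parent():
    # The parent owns the listening socket; each worker gets a duplicate.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('0.0.0.0', 9292))
    listener.listen(128)

    # Spawn the worker instead of forking (fork is unavailable on Windows).
    child = subprocess.Popen([sys.executable, WORKER_SCRIPT],
                             stdin=subprocess.PIPE)

    # socket.share() wraps WSADuplicateSocket; the returned bytes are only
    # valid for the target pid, so the child must already be running.
    shared = listener.share(child.pid)

    # Length-prefix the payload so the child knows how much to read.
    child.stdin.write(struct.pack('<I', len(shared)))
    child.stdin.write(shared)
    child.stdin.flush()
    child.wait()


def child():
    # Worker side: rebuild a usable listening socket from the shared bytes.
    raw = sys.stdin.buffer
    (length,) = struct.unpack('<I', raw.read(4))
    listener = socket.fromshare(raw.read(length))
    conn, addr = listener.accept()  # accept requests on the duplicated socket
    conn.close()
    listener.close()

The patch follows the same overall pattern, and additionally ties each worker to a Windows job object (kill_process_on_job_close) so workers are cleaned up when the parent exits, playing the role that the POSIX process group plays for PosixServer.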