diff --git a/glance/cmd/api.py b/glance/cmd/api.py index 4c35b21bf4..1acd3bba80 100644 --- a/glance/cmd/api.py +++ b/glance/cmd/api.py @@ -62,6 +62,7 @@ from glance import notifier CONF = cfg.CONF CONF.import_group("profiler", "glance.common.wsgi") logging.register_options(CONF) +wsgi.register_cli_opts() # NOTE(rosmaita): Any new exceptions added should preserve the current # error codes for backward compatibility. The value 99 is returned diff --git a/glance/cmd/registry.py b/glance/cmd/registry.py index c222e81e9c..85a13665ea 100644 --- a/glance/cmd/registry.py +++ b/glance/cmd/registry.py @@ -60,6 +60,7 @@ from glance import notifier CONF = cfg.CONF CONF.import_group("profiler", "glance.common.wsgi") logging.register_options(CONF) +wsgi.register_cli_opts() def main(): diff --git a/glance/common/wsgi.py b/glance/common/wsgi.py index dae1e2e5b7..103a09b765 100644 --- a/glance/common/wsgi.py +++ b/glance/common/wsgi.py @@ -21,10 +21,14 @@ Utility methods for working with WSGI servers """ from __future__ import print_function +import abc import errno import functools import os +import re import signal +import struct +import subprocess import sys import time @@ -33,6 +37,7 @@ from eventlet.green import ssl import eventlet.greenio import eventlet.wsgi import glance_store +from os_win import utilsfactory as os_win_utilsfactory from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log as logging @@ -318,6 +323,12 @@ store_opts = [ 'using comma.')), ] +cli_opts = [ + cfg.StrOpt('pipe-handle', + help='This argument is used internally on Windows. Glance ' + 'passes a pipe handle to child processes, which is then ' + 'used for inter-process communication.'), +] LOG = logging.getLogger(__name__) @@ -340,8 +351,17 @@ except ImportError: uwsgi = None +def register_cli_opts(): + CONF.register_cli_opts(cli_opts) + + def get_num_workers(): """Return the configured number of workers.""" + + # Windows only: we're already running on the worker side. + if os.name == 'nt' and getattr(CONF, 'pipe_handle', None): + return 0 + if CONF.workers is None: # None implies the number of CPUs limited to 8 # See Launchpad bug #1748916 and the config help text @@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000): return pool -class Server(object): +@six.add_metaclass(abc.ABCMeta) +class BaseServer(object): """Server class to manage multiple WSGI sockets and applications. This class requires initialize_glance_store set to True if @@ -491,24 +512,6 @@ class Server(object): # NOTE(abhishek): Allows us to only re-initialize glance_store when # the API's configuration reloads. self.initialize_glance_store = initialize_glance_store - self.pgid = os.getpid() - try: - # NOTE(flaper87): Make sure this process - # runs in its own process group. - # NOTE(lpetrut): This isn't available on Windows, so we're going - # to use job objects instead. - os.setpgid(self.pgid, self.pgid) - except (OSError, AttributeError): - # NOTE(flaper87): When running glance-control, - # (glance's functional tests, for example) - # setpgid fails with EPERM as glance-control - # creates a fresh session, of which the newly - # launched service becomes the leader (session - # leaders may not change process groups) - # - # Running glance-(api|registry) is safe and - # shouldn't raise any error here. - self.pgid = 0 @staticmethod def set_signal_handler(signal_name, handler): @@ -524,13 +527,20 @@ class Server(object): self.set_signal_handler("SIGHUP", signal.SIG_IGN) raise exception.SIGHUPInterrupt + @abc.abstractmethod def kill_children(self, *args): - """Kills the entire process group.""" - self.set_signal_handler("SIGTERM", signal.SIG_IGN) - self.set_signal_handler("SIGINT", signal.SIG_IGN) - self.set_signal_handler("SIGCHLD", signal.SIG_IGN) - self.running = False - os.killpg(self.pgid, signal.SIGTERM) + pass + + @abc.abstractmethod + def wait_on_children(self): + pass + + @abc.abstractmethod + def run_child(self): + pass + + def reload(self): + raise NotImplementedError() def start(self, application, default_port): """ @@ -562,50 +572,6 @@ class Server(object): def create_pool(self): return get_asynchronous_eventlet_pool(size=self.threads) - def _remove_children(self, pid): - if pid in self.children: - self.children.remove(pid) - LOG.info(_LI('Removed dead child %s'), pid) - elif pid in self.stale_children: - self.stale_children.remove(pid) - LOG.info(_LI('Removed stale child %s'), pid) - else: - LOG.warn(_LW('Unrecognised child %s') % pid) - - def _verify_and_respawn_children(self, pid, status): - if len(self.stale_children) == 0: - LOG.debug('No stale children') - if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0: - LOG.error(_LE('Not respawning child %d, cannot ' - 'recover from termination') % pid) - if not self.children and not self.stale_children: - LOG.info( - _LI('All workers have terminated. Exiting')) - self.running = False - else: - if len(self.children) < get_num_workers(): - self.run_child() - - def wait_on_children(self): - while self.running: - try: - pid, status = os.wait() - if os.WIFEXITED(status) or os.WIFSIGNALED(status): - self._remove_children(pid) - self._verify_and_respawn_children(pid, status) - except OSError as err: - if err.errno not in (errno.EINTR, errno.ECHILD): - raise - except KeyboardInterrupt: - LOG.info(_LI('Caught keyboard interrupt. Exiting.')) - break - except exception.SIGHUPInterrupt: - self.reload() - continue - eventlet.greenio.shutdown_safe(self.sock) - self.sock.close() - LOG.debug('Exited') - def configure(self, old_conf=None, has_changed=None): """ Apply configuration settings @@ -622,35 +588,6 @@ class Server(object): else: initialize_glance_store() - def reload(self): - """ - Reload and re-apply configuration settings - - Existing child processes are sent a SIGHUP signal - and will exit after completing existing requests. - New child processes, which will have the updated - configuration, are spawned. This allows preventing - interruption to the service. - """ - def _has_changed(old, new, param): - old = old.get(param) - new = getattr(new, param) - return (new != old) - - old_conf = utils.stash_conf_values() - has_changed = functools.partial(_has_changed, old_conf, CONF) - CONF.reload_config_files() - os.killpg(self.pgid, signal.SIGHUP) - self.stale_children = self.children - self.children = set() - - # Ensure any logging config changes are picked up - logging.setup(CONF, 'glance') - config.set_config_defaults() - - self.configure(old_conf, has_changed) - self.start_wsgi() - def wait(self): """Wait until all servers have completed running.""" try: @@ -661,34 +598,6 @@ class Server(object): except KeyboardInterrupt: pass - def run_child(self): - def child_hup(*args): - """Shuts down child processes, existing requests are handled.""" - self.set_signal_handler("SIGHUP", signal.SIG_IGN) - eventlet.wsgi.is_accepting = False - self.sock.close() - - pid = os.fork() - if pid == 0: - self.set_signal_handler("SIGHUP", child_hup) - self.set_signal_handler("SIGTERM", signal.SIG_DFL) - # ignore the interrupt signal to avoid a race whereby - # a child worker receives the signal before the parent - # and is respawned unnecessarily as a result - self.set_signal_handler("SIGINT", signal.SIG_IGN) - # The child has no need to stash the unwrapped - # socket, and the reference prevents a clean - # exit on sighup - self._sock = None - self.run_server() - LOG.info(_LI('Child %d exiting normally'), os.getpid()) - # self.pool.waitall() is now called in wsgi's server so - # it's safe to exit here - sys.exit(0) - else: - LOG.info(_LI('Started child %s'), pid) - self.children.add(pid) - def run_server(self): """Run a WSGI server.""" if cfg.CONF.pydev_worker_debug_host: @@ -796,6 +705,262 @@ class Server(object): self.sock.listen(CONF.backlog) +class PosixServer(BaseServer): + def __init__(self, *args, **kwargs): + super(PosixServer, self).__init__(*args, **kwargs) + + self.pgid = os.getpid() + try: + # NOTE(flaper87): Make sure this process + # runs in its own process group. + os.setpgid(self.pgid, self.pgid) + except OSError: + # NOTE(flaper87): When running glance-control, + # (glance's functional tests, for example) + # setpgid fails with EPERM as glance-control + # creates a fresh session, of which the newly + # launched service becomes the leader (session + # leaders may not change process groups) + # + # Running glance-(api|registry) is safe and + # shouldn't raise any error here. + self.pgid = 0 + + def kill_children(self, *args): + """Kills the entire process group.""" + self.set_signal_handler("SIGTERM", signal.SIG_IGN) + self.set_signal_handler("SIGINT", signal.SIG_IGN) + self.set_signal_handler("SIGCHLD", signal.SIG_IGN) + self.running = False + os.killpg(self.pgid, signal.SIGTERM) + + def _remove_children(self, pid): + if pid in self.children: + self.children.remove(pid) + LOG.info(_LI('Removed dead child %s'), pid) + elif pid in self.stale_children: + self.stale_children.remove(pid) + LOG.info(_LI('Removed stale child %s'), pid) + else: + LOG.warn(_LW('Unrecognised child %s') % pid) + + def _verify_and_respawn_children(self, pid, status): + if len(self.stale_children) == 0: + LOG.debug('No stale children') + if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0: + LOG.error(_LE('Not respawning child %d, cannot ' + 'recover from termination') % pid) + if not self.children and not self.stale_children: + LOG.info( + _LI('All workers have terminated. Exiting')) + self.running = False + else: + if len(self.children) < get_num_workers(): + self.run_child() + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + self._remove_children(pid) + self._verify_and_respawn_children(pid, status) + except OSError as err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + LOG.info(_LI('Caught keyboard interrupt. Exiting.')) + break + except exception.SIGHUPInterrupt: + self.reload() + continue + eventlet.greenio.shutdown_safe(self.sock) + self.sock.close() + LOG.debug('Exited') + + def run_child(self): + def child_hup(*args): + """Shuts down child processes, existing requests are handled.""" + self.set_signal_handler("SIGHUP", signal.SIG_IGN) + eventlet.wsgi.is_accepting = False + self.sock.close() + + pid = os.fork() + if pid == 0: + self.set_signal_handler("SIGHUP", child_hup) + self.set_signal_handler("SIGTERM", signal.SIG_DFL) + # ignore the interrupt signal to avoid a race whereby + # a child worker receives the signal before the parent + # and is respawned unnecessarily as a result + self.set_signal_handler("SIGINT", signal.SIG_IGN) + # The child has no need to stash the unwrapped + # socket, and the reference prevents a clean + # exit on sighup + self._sock = None + self.run_server() + LOG.info(_LI('Child %d exiting normally'), os.getpid()) + # self.pool.waitall() is now called in wsgi's server so + # it's safe to exit here + sys.exit(0) + else: + LOG.info(_LI('Started child %s'), pid) + self.children.add(pid) + + def reload(self): + """ + Reload and re-apply configuration settings + + Existing child processes are sent a SIGHUP signal + and will exit after completing existing requests. + New child processes, which will have the updated + configuration, are spawned. This allows preventing + interruption to the service. + """ + def _has_changed(old, new, param): + old = old.get(param) + new = getattr(new, param) + return (new != old) + + old_conf = utils.stash_conf_values() + has_changed = functools.partial(_has_changed, old_conf, CONF) + CONF.reload_config_files() + os.killpg(self.pgid, signal.SIGHUP) + self.stale_children = self.children + self.children = set() + + # Ensure any logging config changes are picked up + logging.setup(CONF, 'glance') + config.set_config_defaults() + + self.configure(old_conf, has_changed) + self.start_wsgi() + + +class Win32ProcessLauncher(object): + def __init__(self): + self._processutils = os_win_utilsfactory.get_processutils() + + self._workers = [] + self._worker_job_handles = [] + + def add_process(self, cmd): + LOG.info("Starting subprocess: %s", cmd) + + worker = subprocess.Popen(cmd, close_fds=False) + try: + job_handle = self._processutils.kill_process_on_job_close( + worker.pid) + except Exception: + LOG.exception("Could not associate child process " + "with a job, killing it.") + worker.kill() + raise + + self._worker_job_handles.append(job_handle) + self._workers.append(worker) + + return worker + + def wait(self): + pids = [worker.pid for worker in self._workers] + if pids: + self._processutils.wait_for_multiple_processes(pids, + wait_all=True) + # By sleeping here, we allow signal handlers to be executed. + time.sleep(0) + + +class Win32Server(BaseServer): + _py_script_re = re.compile(r'.*\.py\w?$') + _sock = None + + def __init__(self, *args, **kwargs): + super(Win32Server, self).__init__(*args, **kwargs) + self._launcher = Win32ProcessLauncher() + self._ioutils = os_win_utilsfactory.get_ioutils() + + def run_child(self): + # We're passing copies of the socket through pipes. + rfd, wfd = self._ioutils.create_pipe(inherit_handle=True) + + cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)] + # Recent setuptools versions will trim '-script.py' and '.exe' + # extensions from sys.argv[0]. + if self._py_script_re.match(sys.argv[0]): + cmd = [sys.executable] + cmd + + worker = self._launcher.add_process(cmd) + self._ioutils.close_handle(rfd) + + share_sock_buff = self._sock.share(worker.pid) + self._ioutils.write_file( + wfd, + struct.pack('