diff --git a/etc/neutron.conf b/etc/neutron.conf index d483bb305..62e24888e 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -240,6 +240,10 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier # =========== end of items for agent scheduler extension ===== # =========== WSGI parameters related to the API server ============== +# Number of separate worker processes to spawn. The default, 0, runs the +# worker thread in the current process. Greater than 0 launches that number of +# child processes as workers. The parent process manages them. +# api_workers = 0 # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when # starting API server. Not supported on OS X. # tcp_keepidle = 600 diff --git a/neutron/service.py b/neutron/service.py index c8a7f907f..32362343c 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -36,6 +36,9 @@ service_opts = [ cfg.IntOpt('periodic_interval', default=40, help=_('Seconds between running periodic tasks')), + cfg.IntOpt('api_workers', + default=0, + help=_('Number of separate worker processes for service')), cfg.IntOpt('periodic_fuzzy_delay', default=5, help=_('Range of seconds to randomly delay when starting the ' @@ -111,7 +114,8 @@ def _run_wsgi(app_name): LOG.error(_('No known API applications configured.')) return server = wsgi.Server("Neutron") - server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host) + server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host, + workers=cfg.CONF.api_workers) # Dump all option values here after all options are parsed cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"), diff --git a/neutron/tests/unit/test_wsgi.py b/neutron/tests/unit/test_wsgi.py index a8be55628..52e946a9c 100644 --- a/neutron/tests/unit/test_wsgi.py +++ b/neutron/tests/unit/test_wsgi.py @@ -47,6 +47,22 @@ class TestWSGIServer(base.BaseTestCase): server.stop() server.wait() + @mock.patch('neutron.wsgi.ProcessLauncher') + def test_start_multiple_workers(self, ProcessLauncher): + launcher = ProcessLauncher.return_value + + server = wsgi.Server("test_multiple_processes") + server.start(None, 0, host="127.0.0.1", workers=2) + launcher.running = True + launcher.launch_service.assert_called_once_with(server._server, + workers=2) + + server.stop() + self.assertFalse(launcher.running) + + server.wait() + launcher.wait.assert_called_once_with() + def test_start_random_port_with_ipv6(self): server = wsgi.Server("test_random_port") server.start(None, 0, host="::1") diff --git a/neutron/wsgi.py b/neutron/wsgi.py index 972b3998c..efce42901 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -39,9 +39,11 @@ import webob.exc from neutron.common import constants from neutron.common import exceptions as exception from neutron import context +from neutron.openstack.common.db.sqlalchemy import session from neutron.openstack.common import gettextutils from neutron.openstack.common import jsonutils from neutron.openstack.common import log as logging +from neutron.openstack.common.service import ProcessLauncher socket_opts = [ cfg.IntOpt('backlog', @@ -84,12 +86,39 @@ def run_server(application, port): eventlet.wsgi.server(sock, application) +class WorkerService(object): + """Wraps a worker to be handled by ProcessLauncher""" + def __init__(self, service, application): + self._service = service + self._application = application + self._server = None + + def start(self): + # We may have just forked from parent process. A quick disposal of the + # existing sql connections avoids producting 500 errors later when they + # are discovered to be broken. + session.get_engine(sqlite_fk=True).pool.dispose() + self._server = self._service.pool.spawn(self._service._run, + self._application, + self._service._socket) + + def wait(self): + self._service.pool.waitall() + + def stop(self): + if isinstance(self._server, eventlet.greenthread.GreenThread): + self._server.kill() + self._server = None + + class Server(object): """Server class to manage multiple WSGI sockets and applications.""" def __init__(self, name, threads=1000): self.pool = eventlet.GreenPool(threads) self.name = name + self._launcher = None + self._server = None def _get_socket(self, host, port, backlog): bind_addr = (host, port) @@ -168,7 +197,7 @@ class Server(object): return sock - def start(self, application, port, host='0.0.0.0'): + def start(self, application, port, host='0.0.0.0', workers=0): """Run a WSGI server with the given application.""" self._host = host self._port = port @@ -177,7 +206,14 @@ class Server(object): self._socket = self._get_socket(self._host, self._port, backlog=backlog) - self._server = self.pool.spawn(self._run, application, self._socket) + if workers < 1: + # For the case where only one process is required. + self._server = self.pool.spawn(self._run, application, + self._socket) + else: + self._launcher = ProcessLauncher() + self._server = WorkerService(self, application) + self._launcher.launch_service(self._server, workers=workers) @property def host(self): @@ -188,12 +224,19 @@ class Server(object): return self._socket.getsockname()[1] if self._socket else self._port def stop(self): - self._server.kill() + if self._launcher: + # The process launcher does not support stop or kill. + self._launcher.running = False + else: + self._server.kill() def wait(self): """Wait until all servers have completed running.""" try: - self.pool.waitall() + if self._launcher: + self._launcher.wait() + else: + self.pool.waitall() except KeyboardInterrupt: pass