Merge "Creates multiple worker processes for API server"

This commit is contained in:
Jenkins 2013-10-31 01:39:19 +00:00 committed by Gerrit Code Review
commit c1577c5954
4 changed files with 72 additions and 5 deletions

View File

@ -239,6 +239,10 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
# =========== end of items for agent scheduler extension =====
# =========== WSGI parameters related to the API server ==============
# Number of separate worker processes to spawn. The default, 0, runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as workers. The parent process manages them.
# api_workers = 0
# Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
# starting API server. Not supported on OS X.
# tcp_keepidle = 600

View File

@ -36,6 +36,9 @@ service_opts = [
cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),
cfg.IntOpt('api_workers',
default=0,
help=_('Number of separate worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
@ -111,7 +114,8 @@ def _run_wsgi(app_name):
LOG.error(_('No known API applications configured.'))
return
server = wsgi.Server("Neutron")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host)
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
workers=cfg.CONF.api_workers)
# Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"),

View File

@ -47,6 +47,22 @@ class TestWSGIServer(base.BaseTestCase):
server.stop()
server.wait()
@mock.patch('neutron.wsgi.ProcessLauncher')
def test_start_multiple_workers(self, ProcessLauncher):
launcher = ProcessLauncher.return_value
server = wsgi.Server("test_multiple_processes")
server.start(None, 0, host="127.0.0.1", workers=2)
launcher.running = True
launcher.launch_service.assert_called_once_with(server._server,
workers=2)
server.stop()
self.assertFalse(launcher.running)
server.wait()
launcher.wait.assert_called_once_with()
def test_start_random_port_with_ipv6(self):
server = wsgi.Server("test_random_port")
server.start(None, 0, host="::1")

View File

@ -39,9 +39,11 @@ import webob.exc
from neutron.common import constants
from neutron.common import exceptions as exception
from neutron import context
from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import gettextutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.openstack.common.service import ProcessLauncher
socket_opts = [
cfg.IntOpt('backlog',
@ -84,12 +86,39 @@ def run_server(application, port):
eventlet.wsgi.server(sock, application)
class WorkerService(object):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, service, application):
self._service = service
self._application = application
self._server = None
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producting 500 errors later when they
# are discovered to be broken.
session.get_engine(sqlite_fk=True).pool.dispose()
self._server = self._service.pool.spawn(self._service._run,
self._application,
self._service._socket)
def wait(self):
self._service.pool.waitall()
def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.kill()
self._server = None
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, name, threads=1000):
self.pool = eventlet.GreenPool(threads)
self.name = name
self._launcher = None
self._server = None
def _get_socket(self, host, port, backlog):
bind_addr = (host, port)
@ -168,7 +197,7 @@ class Server(object):
return sock
def start(self, application, port, host='0.0.0.0'):
def start(self, application, port, host='0.0.0.0', workers=0):
"""Run a WSGI server with the given application."""
self._host = host
self._port = port
@ -177,7 +206,14 @@ class Server(object):
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
self._server = self.pool.spawn(self._run, application, self._socket)
if workers < 1:
# For the case where only one process is required.
self._server = self.pool.spawn(self._run, application,
self._socket)
else:
self._launcher = ProcessLauncher()
self._server = WorkerService(self, application)
self._launcher.launch_service(self._server, workers=workers)
@property
def host(self):
@ -188,12 +224,19 @@ class Server(object):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
self._server.kill()
if self._launcher:
# The process launcher does not support stop or kill.
self._launcher.running = False
else:
self._server.kill()
def wait(self):
"""Wait until all servers have completed running."""
try:
self.pool.waitall()
if self._launcher:
self._launcher.wait()
else:
self.pool.waitall()
except KeyboardInterrupt:
pass