Merge "Rename workers to api_workers and simplify code"

This commit is contained in:
Jenkins 2014-09-29 16:58:40 +00:00 committed by Gerrit Code Review
commit dd238d0aac
5 changed files with 23 additions and 56 deletions

View File

@ -38,7 +38,6 @@ from neutron.openstack.common.cache import cache
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import service
from neutron import wsgi
LOG = logging.getLogger(__name__)
@ -280,16 +279,8 @@ class UnixDomainWSGIServer(wsgi.Server):
self._socket = eventlet.listen(file_socket,
family=socket.AF_UNIX,
backlog=backlog)
if workers < 1:
# For the case where only one process is required.
self._server = self.pool.spawn_n(self._run, application,
self._socket)
else:
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
self._launcher = service.ProcessLauncher(wait_interval=1.0)
self._server = WorkerService(self, application)
self._launcher.launch_service(self._server, workers=workers)
self._launch(application, workers=workers)
def _run(self, application, socket):
"""Start a WSGI service in a new green thread."""

View File

@ -40,7 +40,7 @@ service_opts = [
help=_('Seconds between running periodic tasks')),
cfg.IntOpt('api_workers',
default=0,
help=_('Number of separate worker processes for service')),
help=_('Number of separate API worker processes for service')),
cfg.IntOpt('rpc_workers',
default=0,
help=_('Number of RPC worker processes for service')),

View File

@ -497,8 +497,8 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
def test_start(self):
mock_app = mock.Mock()
with mock.patch.object(self.server, 'pool') as pool:
self.server.start(mock_app, '/the/path', workers=0, backlog=128)
with mock.patch.object(self.server, '_launch') as launcher:
self.server.start(mock_app, '/the/path', workers=5, backlog=128)
self.eventlet.assert_has_calls([
mock.call.listen(
'/the/path',
@ -506,27 +506,7 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
backlog=128
)]
)
pool.spawn_n.assert_called_once_with(
self.server._run,
mock_app,
self.eventlet.listen.return_value
)
@mock.patch('neutron.openstack.common.service.ProcessLauncher')
def test_start_multiple_workers(self, process_launcher):
launcher = process_launcher.return_value
mock_app = mock.Mock()
self.server.start(mock_app, '/the/path', workers=2, backlog=128)
launcher.running = True
launcher.launch_service.assert_called_once_with(self.server._server,
workers=2)
self.server.stop()
self.assertFalse(launcher.running)
self.server.wait()
launcher.wait.assert_called_once_with()
launcher.assert_called_once_with(mock_app, workers=5)
def test_run(self):
with mock.patch.object(agent, 'logging') as logging:

View File

@ -56,12 +56,10 @@ class TestWSGIServer(base.BaseTestCase):
server = wsgi.Server("test_multiple_processes")
server.start(None, 0, host="127.0.0.1", workers=2)
launcher.running = True
launcher.launch_service.assert_called_once_with(server._server,
workers=2)
launcher.launch_service.assert_called_once_with(mock.ANY, workers=2)
server.stop()
self.assertFalse(launcher.running)
launcher.stop.assert_called_once_with()
server.wait()
launcher.wait.assert_called_once_with()

View File

@ -97,7 +97,8 @@ class WorkerService(object):
self._service._socket)
def wait(self):
self._service.pool.waitall()
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.wait()
def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
@ -113,7 +114,6 @@ class Server(object):
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
self.pool = eventlet.GreenPool(threads)
self.name = name
self._launcher = None
self._server = None
def _get_socket(self, host, port, backlog):
@ -205,17 +205,22 @@ class Server(object):
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
self._launch(application, workers)
def _launch(self, application, workers=0):
service = WorkerService(self, application)
if workers < 1:
# For the case where only one process is required.
self._server = self.pool.spawn(self._run, application,
self._socket)
# The API service should run in the current process.
self._server = service
service.start()
systemd.notify_once()
else:
# The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
self._launcher = common_service.ProcessLauncher(wait_interval=1.0)
self._server = WorkerService(self, application)
self._launcher.launch_service(self._server, workers=workers)
self._server = common_service.ProcessLauncher(wait_interval=1.0)
self._server.launch_service(service, workers=workers)
@property
def host(self):
@ -226,19 +231,12 @@ class Server(object):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
if self._launcher:
# The process launcher does not support stop or kill.
self._launcher.running = False
else:
self._server.kill()
self._server.stop()
def wait(self):
"""Wait until all servers have completed running."""
try:
if self._launcher:
self._launcher.wait()
else:
self.pool.waitall()
self._server.wait()
except KeyboardInterrupt:
pass