Merge "Add support for PluginWorker and Process creation notification"

Jenkins 2015-09-04 05:02:52 +00:00 committed by Gerrit Code Review
commit c2eea9eee3
14 changed files with 160 additions and 32 deletions


@ -306,19 +306,13 @@
# ========== end of items for VLAN trunking networks ==========
# =========== WSGI parameters related to the API server ==============
# Number of separate worker processes to spawn. A value of 0 runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as workers. The parent process manages them. If not
# specified, the default value is equal to the number of CPUs available to
# achieve best performance.
# Number of separate API worker processes to spawn. If not specified or < 1,
# the default value is equal to the number of CPUs available.
# api_workers = <number of CPUs>
# Number of separate RPC worker processes to spawn. The default, 0, runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as RPC workers. The parent process manages them.
# This feature is experimental until issues are addressed and testing has been
# enabled for various plugins for compatibility.
# rpc_workers = 0
# Number of separate RPC worker processes to spawn. If not specified or < 1,
# a single RPC worker process is spawned by the parent process.
# rpc_workers = 1
# Timeout for client connections socket operations. If an
# incoming connection is idle for this number of seconds it
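
To illustrate the new semantics, a deployment that wants to pin the worker counts explicitly could set something like the following in neutron.conf (the numbers are hypothetical examples, not recommendations):

[DEFAULT]
# four API worker processes; leaving this unset (or < 1) still falls back
# to one worker per available CPU
api_workers = 4
# two RPC worker processes; unset or < 1 now means a single RPC worker
rpc_workers = 2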


@ -12,6 +12,7 @@
# String literals representing core resources.
PORT = 'port'
PROCESS = 'process'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
ROUTER_INTERFACE = 'router_interface'


@ -246,3 +246,8 @@ class NeutronManager(object):
        service_plugins = cls.get_instance().service_plugins
        return dict((x, weakref.proxy(y))
                    for x, y in six.iteritems(service_plugins))

    @classmethod
    def get_unique_service_plugins(cls):
        service_plugins = cls.get_instance().service_plugins
        return tuple(weakref.proxy(x) for x in set(service_plugins.values()))


@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object):
"""
return (self.__class__.start_rpc_listeners !=
NeutronPluginBaseV2.start_rpc_listeners)
def get_workers(self):
"""Returns a collection NeutronWorker instances
If a plugin needs to define worker processes outside of API/RPC workers
then it will override this and return a collection of NeutronWorker
instances
"""
return ()
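
As a rough sketch of how a plugin could use this hook (SyncWorker and MyPlugin below are hypothetical names, not part of this change):

from neutron import neutron_plugin_base_v2
from neutron import worker


class SyncWorker(worker.NeutronWorker):
    """Hypothetical worker that runs a one-off sync in its own process."""

    def start(self):
        super(SyncWorker, self).start()
        # plugin-specific work would go here, e.g. resyncing backend state

    def wait(self):
        pass

    def stop(self):
        pass

    def reset(self):
        pass


class MyPlugin(neutron_plugin_base_v2.NeutronPluginBaseV2):
    # the rest of the core plugin API is elided for brevity

    def get_workers(self):
        # each NeutronWorker instance gets launched in its own process
        return (SyncWorker(),)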


@ -888,6 +888,14 @@ class MechanismDriver(object):
"""
pass
def get_workers(self):
"""Get any NeutronWorker instances that should have their own process
Any driver that needs to run processes separate from the API or RPC
workers, can return a sequence of NeutronWorker instances.
"""
return ()
@six.add_metaclass(abc.ABCMeta)
class ExtensionDriver(object):
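
Similarly, an ML2 mechanism driver could return its own workers through the hook added above; a minimal sketch (the driver and worker names are hypothetical):

from neutron.plugins.ml2 import driver_api as api
from neutron import worker


class BackendPollWorker(worker.NeutronWorker):
    """Hypothetical worker that polls an external backend."""

    def start(self):
        super(BackendPollWorker, self).start()
        # start the polling loop here

    # no-op implementations to satisfy the oslo_service ServiceBase ABC
    def wait(self):
        pass

    def stop(self):
        pass

    def reset(self):
        pass


class MyMechDriver(api.MechanismDriver):
    def initialize(self):
        pass

    def get_workers(self):
        # MechanismManager.get_workers() concatenates these across drivers,
        # and Ml2Plugin.get_workers() returns the combined list
        return [BackendPollWorker()]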


@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
                return False
        return True

    def get_workers(self):
        workers = []
        for driver in self.ordered_mech_drivers:
            workers += driver.obj.get_workers()
        return workers


class ExtensionManager(stevedore.named.NamedExtensionManager):
    """Manage extension drivers using drivers."""


@ -1556,3 +1556,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                if port:
                    return port.id
        return device

    def get_workers(self):
        return self.mechanism_manager.get_workers()


@ -50,6 +50,10 @@ def main():
        else:
            rpc_thread = pool.spawn(neutron_rpc.wait)

            plugin_workers = service.start_plugin_workers()
            for worker in plugin_workers:
                pool.spawn(worker.wait)

            # api and rpc should die together. When one dies, kill the other.
            rpc_thread.link(lambda gt: api_thread.kill())
            api_thread.link(lambda gt: rpc_thread.kill())


@ -32,6 +32,7 @@ from neutron import context
from neutron.db import api as session
from neutron.i18n import _LE, _LI
from neutron import manager
from neutron import worker
from neutron import wsgi
@ -44,7 +45,7 @@ service_opts = [
                      'If not specified, the default is equal to the number '
                      'of CPUs available for best performance.')),
    cfg.IntOpt('rpc_workers',
               default=0,
               default=1,
               help=_('Number of RPC worker processes for service')),
    cfg.IntOpt('periodic_fuzzy_delay',
               default=5,
@ -108,13 +109,28 @@ def serve_wsgi(cls):
    return service


class RpcWorker(common_service.ServiceBase):
def start_plugin_workers():
    launchers = []
    # NOTE(twilson) get_service_plugins also returns the core plugin
    for plugin in manager.NeutronManager.get_unique_service_plugins():
        # TODO(twilson) Instead of defaulting here, come up with a good way to
        # share a common get_workers default between NeutronPluginBaseV2 and
        # ServicePluginBase
        for plugin_worker in getattr(plugin, 'get_workers', tuple)():
            launcher = common_service.ProcessLauncher(cfg.CONF)
            launcher.launch_service(plugin_worker)
            launchers.append(launcher)
    return launchers


class RpcWorker(worker.NeutronWorker):
    """Wraps a worker to be handled by ProcessLauncher"""

    def __init__(self, plugin):
        self._plugin = plugin
        self._servers = []

    def start(self):
        super(RpcWorker, self).start()
        self._servers = self._plugin.start_rpc_listeners()

    def wait(self):
@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase):
def serve_rpc():
    plugin = manager.NeutronManager.get_plugin()

    if cfg.CONF.rpc_workers < 1:
        cfg.CONF.set_override('rpc_workers', 1)

    # If 0 < rpc_workers then start_rpc_listeners would be called in a
    # subprocess and we cannot simply catch the NotImplementedError. It is
    # simpler to check this up front by testing whether the plugin supports
@ -164,22 +183,14 @@ def serve_rpc():
    try:
        rpc = RpcWorker(plugin)

        if cfg.CONF.rpc_workers < 1:
            LOG.debug('starting rpc directly, workers=%s',
                      cfg.CONF.rpc_workers)
            rpc.start()
            return rpc
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            LOG.debug('using launcher for rpc, workers=%s',
                      cfg.CONF.rpc_workers)
            session.dispose()
            launcher = common_service.ProcessLauncher(cfg.CONF,
                                                      wait_interval=1.0)
            launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
            return launcher
        # dispose the whole pool before os.fork, otherwise there will
        # be shared DB connections in child processes which may cause
        # DB errors.
        LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
        session.dispose()
        launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
        launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
        return launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log for '
@ -188,7 +199,7 @@ def serve_rpc():
def _get_api_workers():
    workers = cfg.CONF.api_workers
    if workers is None:
    if not workers:
        workers = processutils.get_worker_count()
    return workers


@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db
from neutron.db import l3_hascheduler_db
from neutron.plugins.common import constants
from neutron.quota import resource_registry
from neutron.services import service_base


class L3RouterPlugin(common_db_mixin.CommonDbMixin,
class L3RouterPlugin(service_base.ServicePluginBase,
                     common_db_mixin.CommonDbMixin,
                     extraroute_db.ExtraRoute_db_mixin,
                     l3_hamode_db.L3_HA_NAT_db_mixin,
                     l3_gwmode_db.L3_NAT_db_mixin,


@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface):
"""Return string description of the plugin."""
pass
def get_workers(self):
"""Returns a collection of NeutronWorkers"""
return ()
def load_drivers(service_type, plugin):
"""Loads drivers for specific service.


@ -27,6 +27,7 @@ import psutil
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
from neutron import worker
from neutron import wsgi
@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer):
    def test_restart_rpc_on_sighup_multiple_workers(self):
        self._test_restart_service_on_sighup(service=self._serve_rpc,
                                             workers=2)


class TestPluginWorker(TestNeutronServer):
    """Ensure that a plugin returning Workers spawns workers"""

    def setUp(self):
        super(TestPluginWorker, self).setUp()
        self.setup_coreplugin(TARGET_PLUGIN)
        self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
        self.plugin = self._plugin_patcher.start()

    def _start_plugin(self, workers=0):
        with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
            gp.return_value = self.plugin
            launchers = service.start_plugin_workers()
            for launcher in launchers:
                launcher.wait()

    def test_start(self):
        class FakeWorker(worker.NeutronWorker):
            def start(self):
                pass

            def wait(self):
                pass

            def stop(self):
                pass

            def reset(self):
                pass

        # Make both ABC happy and ensure 'self' is correct
        FakeWorker.reset = self._fake_reset

        workers = [FakeWorker()]
        self.plugin.return_value.get_workers.return_value = workers
        self._test_restart_service_on_sighup(service=self._start_plugin,
                                             workers=len(workers))

neutron/worker.py (new file, 40 lines)

@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_service import service
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
class NeutronWorker(service.ServiceBase):
    """Partial implementation of the ServiceBase ABC

    Subclasses will still need to add the other abstractmethods defined in
    service.ServiceBase. See oslo_service for more details.

    If a plugin needs to handle synchronization with the Neutron database and
    do this only once instead of in every API worker, for instance, it would
    define a NeutronWorker class and the plugin would have get_workers return
    an array of NeutronWorker instances. For example:

        class MyPlugin(...):
            def get_workers(self):
                return [MyPluginWorker()]

        class MyPluginWorker(NeutronWorker):
            def start(self):
                super(MyPluginWorker, self).start()
                do_sync()
    """
    def start(self):
        registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
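
Since NeutronWorker.start() sends this PROCESS/AFTER_CREATE notification as each worker starts, code that must run once per worker process can subscribe to it; a rough sketch (the handler name below is hypothetical):

from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources


def reinitialize_after_fork(resource, event, trigger, **kwargs):
    # hypothetical handler: recreate anything that must not be shared
    # across os.fork(), e.g. DB engines or message-queue connections
    pass


registry.subscribe(reinitialize_after_fork,
                   resources.PROCESS, events.AFTER_CREATE)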


@ -44,6 +44,7 @@ from neutron.common import exceptions as exception
from neutron import context
from neutron.db import api
from neutron.i18n import _LE, _LI
from neutron import worker
socket_opts = [
    cfg.IntOpt('backlog',
@ -102,7 +103,7 @@ def encode_body(body):
    return body


class WorkerService(common_service.ServiceBase):
class WorkerService(worker.NeutronWorker):
    """Wraps a worker to be handled by ProcessLauncher"""

    def __init__(self, service, application):
        self._service = service
@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase):
        self._server = None

    def start(self):
        super(WorkerService, self).start()
        # When api worker is stopped it kills the eventlet wsgi server which
        # internally closes the wsgi server socket object. This server socket
        # object becomes not usable which leads to "Bad file descriptor"