Add support for PluginWorker and Process creation notification

There are several cases where plugin initialization should be
handled after neutron-server forks API/RPC workers. For example,
starting a client connection to an SDN controller before forking
copies the fd of the socket to the child process, but then you have
multiple processes trying to read/write the same socket connection.

It is also useful for a plugin to be able to do something in only
one process, regardless of how many workers are forked. One example
would be handling syncing from an external system to the neutron
database.

This patch does 3 things:
1) Treats rpc_workers=0 as = 1. This simplifies the code for
   handling notification that forking has completed. In the
   existing code, calling the notification in the Worker object's
   start() method would happen twice in the case where both api
   and rpc workers were 0, despite there being only one process.
   An earlier patch already changed the default api_workers to be
   the number of processors.
2) Adds notification of forking via the callbacks mechanism.
   Plugins can subscribe to resources.PROCESS, event.AFTER_CREATE
   and do any post-fork initialization that needs to be done for
   every spawned process.
3) Adds core/service plugin calls to get_workers() which defaults
   to returning (). Plugins that need additional processes to spawn
   should just return an iterable of NeutronWorkers that will be
   spawned in their own process.

DocImpact

Closes-Bug: #1463129
Change-Id: Ib99954678c2b4f32f486b537979d446aafbea07b
This commit is contained in:
Terry Wilson 2015-06-15 22:52:28 -05:00 committed by Armando Migliaccio
parent 9ed4be7559
commit 9f6bd17703
14 changed files with 160 additions and 32 deletions

View File

@ -301,19 +301,13 @@
# ========== end of items for VLAN trunking networks ==========
# =========== WSGI parameters related to the API server ==============
# Number of separate worker processes to spawn. A value of 0 runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as workers. The parent process manages them. If not
# specified, the default value is equal to the number of CPUs available to
# achieve best performance.
# Number of separate API worker processes to spawn. If not specified or < 1,
# the default value is equal to the number of CPUs available.
# api_workers = <number of CPUs>
# Number of separate RPC worker processes to spawn. The default, 0, runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as RPC workers. The parent process manages them.
# This feature is experimental until issues are addressed and testing has been
# enabled for various plugins for compatibility.
# rpc_workers = 0
# Number of separate RPC worker processes to spawn. If not specified or < 1,
# a single RPC worker process is spawned by the parent process.
# rpc_workers = 1
# Timeout for client connections socket operations. If an
# incoming connection is idle for this number of seconds it

View File

@ -12,6 +12,7 @@
# String literals representing core resources.
PORT = 'port'
PROCESS = 'process'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
ROUTER_INTERFACE = 'router_interface'

View File

@ -246,3 +246,8 @@ class NeutronManager(object):
service_plugins = cls.get_instance().service_plugins
return dict((x, weakref.proxy(y))
for x, y in six.iteritems(service_plugins))
@classmethod
def get_unique_service_plugins(cls):
service_plugins = cls.get_instance().service_plugins
return tuple(weakref.proxy(x) for x in set(service_plugins.values()))

View File

@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object):
"""
return (self.__class__.start_rpc_listeners !=
NeutronPluginBaseV2.start_rpc_listeners)
def get_workers(self):
"""Returns a collection NeutronWorker instances
If a plugin needs to define worker processes outside of API/RPC workers
then it will override this and return a collection of NeutronWorker
instances
"""
return ()

View File

@ -888,6 +888,14 @@ class MechanismDriver(object):
"""
pass
def get_workers(self):
"""Get any NeutronWorker instances that should have their own process
Any driver that needs to run processes separate from the API or RPC
workers, can return a sequence of NeutronWorker instances.
"""
return ()
@six.add_metaclass(abc.ABCMeta)
class ExtensionDriver(object):

View File

@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
return False
return True
def get_workers(self):
workers = []
for driver in self.ordered_mech_drivers:
workers += driver.obj.get_workers()
return workers
class ExtensionManager(stevedore.named.NamedExtensionManager):
"""Manage extension drivers using drivers."""

View File

@ -1539,3 +1539,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if port:
return port.id
return device
def get_workers(self):
return self.mechanism_manager.get_workers()

View File

@ -50,6 +50,10 @@ def main():
else:
rpc_thread = pool.spawn(neutron_rpc.wait)
plugin_workers = service.start_plugin_workers()
for worker in plugin_workers:
pool.spawn(worker.wait)
# api and rpc should die together. When one dies, kill the other.
rpc_thread.link(lambda gt: api_thread.kill())
api_thread.link(lambda gt: rpc_thread.kill())

View File

@ -32,6 +32,7 @@ from neutron import context
from neutron.db import api as session
from neutron.i18n import _LE, _LI
from neutron import manager
from neutron import worker
from neutron import wsgi
@ -44,7 +45,7 @@ service_opts = [
'If not specified, the default is equal to the number '
'of CPUs available for best performance.')),
cfg.IntOpt('rpc_workers',
default=0,
default=1,
help=_('Number of RPC worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
@ -108,13 +109,28 @@ def serve_wsgi(cls):
return service
class RpcWorker(common_service.ServiceBase):
def start_plugin_workers():
launchers = []
# NOTE(twilson) get_service_plugins also returns the core plugin
for plugin in manager.NeutronManager.get_unique_service_plugins():
# TODO(twilson) Instead of defaulting here, come up with a good way to
# share a common get_workers default between NeutronPluginBaseV2 and
# ServicePluginBase
for plugin_worker in getattr(plugin, 'get_workers', tuple)():
launcher = common_service.ProcessLauncher(cfg.CONF)
launcher.launch_service(plugin_worker)
launchers.append(launcher)
return launchers
class RpcWorker(worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
self._servers = []
def start(self):
super(RpcWorker, self).start()
self._servers = self._plugin.start_rpc_listeners()
def wait(self):
@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase):
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1)
# If 0 < rpc_workers then start_rpc_listeners would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is
# simpler to check this up front by testing whether the plugin supports
@ -164,22 +183,14 @@ def serve_rpc():
try:
rpc = RpcWorker(plugin)
if cfg.CONF.rpc_workers < 1:
LOG.debug('starting rpc directly, workers=%s',
cfg.CONF.rpc_workers)
rpc.start()
return rpc
else:
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
LOG.debug('using launcher for rpc, workers=%s',
cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF,
wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
return launcher
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
@ -188,7 +199,7 @@ def serve_rpc():
def _get_api_workers():
workers = cfg.CONF.api_workers
if workers is None:
if not workers:
workers = processutils.get_worker_count()
return workers

View File

@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db
from neutron.db import l3_hascheduler_db
from neutron.plugins.common import constants
from neutron.quota import resource_registry
from neutron.services import service_base
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
class L3RouterPlugin(service_base.ServicePluginBase,
common_db_mixin.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
l3_hamode_db.L3_HA_NAT_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,

View File

@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface):
"""Return string description of the plugin."""
pass
def get_workers(self):
"""Returns a collection of NeutronWorkers"""
return ()
def load_drivers(service_type, plugin):
"""Loads drivers for specific service.

View File

@ -27,6 +27,7 @@ import psutil
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
from neutron import worker
from neutron import wsgi
@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer):
def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc,
workers=2)
class TestPluginWorker(TestNeutronServer):
"""Ensure that a plugin returning Workers spawns workers"""
def setUp(self):
super(TestPluginWorker, self).setUp()
self.setup_coreplugin(TARGET_PLUGIN)
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start()
def _start_plugin(self, workers=0):
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
gp.return_value = self.plugin
launchers = service.start_plugin_workers()
for launcher in launchers:
launcher.wait()
def test_start(self):
class FakeWorker(worker.NeutronWorker):
def start(self):
pass
def wait(self):
pass
def stop(self):
pass
def reset(self):
pass
# Make both ABC happy and ensure 'self' is correct
FakeWorker.reset = self._fake_reset
workers = [FakeWorker()]
self.plugin.return_value.get_workers.return_value = workers
self._test_restart_service_on_sighup(service=self._start_plugin,
workers=len(workers))

40
neutron/worker.py Normal file
View File

@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_service import service
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
class NeutronWorker(service.ServiceBase):
"""Partial implementation of the ServiceBase ABC
Subclasses will still need to add the other abstractmethods defined in
service.ServiceBase. See oslo_service for more details.
If a plugin needs to handle synchornization with the Neutron database and
do this only once instead of in every API worker, for instance, it would
define a NeutronWorker class and the plugin would have get_workers return
an array of NeutronWorker instnaces. For example:
class MyPlugin(...):
def get_workers(self):
return [MyPluginWorker()]
class MyPluginWorker(NeutronWorker):
def start(self):
super(MyPluginWorker, self).start()
do_sync()
"""
def start(self):
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)

View File

@ -44,6 +44,7 @@ from neutron.common import exceptions as exception
from neutron import context
from neutron.db import api
from neutron.i18n import _LE, _LI
from neutron import worker
socket_opts = [
cfg.IntOpt('backlog',
@ -102,7 +103,7 @@ def encode_body(body):
return body
class WorkerService(common_service.ServiceBase):
class WorkerService(worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, service, application):
self._service = service
@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase):
self._server = None
def start(self):
super(WorkerService, self).start()
# When api worker is stopped it kills the eventlet wsgi server which
# internally closes the wsgi server socket object. This server socket
# object becomes not usable which leads to "Bad file descriptor"