Merge "Add support for PluginWorker and Process creation notification"
This commit is contained in:
commit
c2eea9eee3
|
@ -306,19 +306,13 @@
|
|||
# ========== end of items for VLAN trunking networks ==========
|
||||
|
||||
# =========== WSGI parameters related to the API server ==============
|
||||
# Number of separate worker processes to spawn. A value of 0 runs the
|
||||
# worker thread in the current process. Greater than 0 launches that number of
|
||||
# child processes as workers. The parent process manages them. If not
|
||||
# specified, the default value is equal to the number of CPUs available to
|
||||
# achieve best performance.
|
||||
# Number of separate API worker processes to spawn. If not specified or < 1,
|
||||
# the default value is equal to the number of CPUs available.
|
||||
# api_workers = <number of CPUs>
|
||||
|
||||
# Number of separate RPC worker processes to spawn. The default, 0, runs the
|
||||
# worker thread in the current process. Greater than 0 launches that number of
|
||||
# child processes as RPC workers. The parent process manages them.
|
||||
# This feature is experimental until issues are addressed and testing has been
|
||||
# enabled for various plugins for compatibility.
|
||||
# rpc_workers = 0
|
||||
# Number of separate RPC worker processes to spawn. If not specified or < 1,
|
||||
# a single RPC worker process is spawned by the parent process.
|
||||
# rpc_workers = 1
|
||||
|
||||
# Timeout for client connections socket operations. If an
|
||||
# incoming connection is idle for this number of seconds it
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
# String literals representing core resources.
|
||||
PORT = 'port'
|
||||
PROCESS = 'process'
|
||||
ROUTER = 'router'
|
||||
ROUTER_GATEWAY = 'router_gateway'
|
||||
ROUTER_INTERFACE = 'router_interface'
|
||||
|
|
|
@ -246,3 +246,8 @@ class NeutronManager(object):
|
|||
service_plugins = cls.get_instance().service_plugins
|
||||
return dict((x, weakref.proxy(y))
|
||||
for x, y in six.iteritems(service_plugins))
|
||||
|
||||
@classmethod
|
||||
def get_unique_service_plugins(cls):
|
||||
service_plugins = cls.get_instance().service_plugins
|
||||
return tuple(weakref.proxy(x) for x in set(service_plugins.values()))
|
||||
|
|
|
@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object):
|
|||
"""
|
||||
return (self.__class__.start_rpc_listeners !=
|
||||
NeutronPluginBaseV2.start_rpc_listeners)
|
||||
|
||||
def get_workers(self):
|
||||
"""Returns a collection NeutronWorker instances
|
||||
|
||||
If a plugin needs to define worker processes outside of API/RPC workers
|
||||
then it will override this and return a collection of NeutronWorker
|
||||
instances
|
||||
"""
|
||||
return ()
|
||||
|
|
|
@ -888,6 +888,14 @@ class MechanismDriver(object):
|
|||
"""
|
||||
pass
|
||||
|
||||
def get_workers(self):
|
||||
"""Get any NeutronWorker instances that should have their own process
|
||||
|
||||
Any driver that needs to run processes separate from the API or RPC
|
||||
workers, can return a sequence of NeutronWorker instances.
|
||||
"""
|
||||
return ()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExtensionDriver(object):
|
||||
|
|
|
@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
|
|||
return False
|
||||
return True
|
||||
|
||||
def get_workers(self):
|
||||
workers = []
|
||||
for driver in self.ordered_mech_drivers:
|
||||
workers += driver.obj.get_workers()
|
||||
return workers
|
||||
|
||||
|
||||
class ExtensionManager(stevedore.named.NamedExtensionManager):
|
||||
"""Manage extension drivers using drivers."""
|
||||
|
|
|
@ -1556,3 +1556,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
if port:
|
||||
return port.id
|
||||
return device
|
||||
|
||||
def get_workers(self):
|
||||
return self.mechanism_manager.get_workers()
|
||||
|
|
|
@ -50,6 +50,10 @@ def main():
|
|||
else:
|
||||
rpc_thread = pool.spawn(neutron_rpc.wait)
|
||||
|
||||
plugin_workers = service.start_plugin_workers()
|
||||
for worker in plugin_workers:
|
||||
pool.spawn(worker.wait)
|
||||
|
||||
# api and rpc should die together. When one dies, kill the other.
|
||||
rpc_thread.link(lambda gt: api_thread.kill())
|
||||
api_thread.link(lambda gt: rpc_thread.kill())
|
||||
|
|
|
@ -32,6 +32,7 @@ from neutron import context
|
|||
from neutron.db import api as session
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron import manager
|
||||
from neutron import worker
|
||||
from neutron import wsgi
|
||||
|
||||
|
||||
|
@ -44,7 +45,7 @@ service_opts = [
|
|||
'If not specified, the default is equal to the number '
|
||||
'of CPUs available for best performance.')),
|
||||
cfg.IntOpt('rpc_workers',
|
||||
default=0,
|
||||
default=1,
|
||||
help=_('Number of RPC worker processes for service')),
|
||||
cfg.IntOpt('periodic_fuzzy_delay',
|
||||
default=5,
|
||||
|
@ -108,13 +109,28 @@ def serve_wsgi(cls):
|
|||
return service
|
||||
|
||||
|
||||
class RpcWorker(common_service.ServiceBase):
|
||||
def start_plugin_workers():
|
||||
launchers = []
|
||||
# NOTE(twilson) get_service_plugins also returns the core plugin
|
||||
for plugin in manager.NeutronManager.get_unique_service_plugins():
|
||||
# TODO(twilson) Instead of defaulting here, come up with a good way to
|
||||
# share a common get_workers default between NeutronPluginBaseV2 and
|
||||
# ServicePluginBase
|
||||
for plugin_worker in getattr(plugin, 'get_workers', tuple)():
|
||||
launcher = common_service.ProcessLauncher(cfg.CONF)
|
||||
launcher.launch_service(plugin_worker)
|
||||
launchers.append(launcher)
|
||||
return launchers
|
||||
|
||||
|
||||
class RpcWorker(worker.NeutronWorker):
|
||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||
def __init__(self, plugin):
|
||||
self._plugin = plugin
|
||||
self._servers = []
|
||||
|
||||
def start(self):
|
||||
super(RpcWorker, self).start()
|
||||
self._servers = self._plugin.start_rpc_listeners()
|
||||
|
||||
def wait(self):
|
||||
|
@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase):
|
|||
def serve_rpc():
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
||||
if cfg.CONF.rpc_workers < 1:
|
||||
cfg.CONF.set_override('rpc_workers', 1)
|
||||
|
||||
# If 0 < rpc_workers then start_rpc_listeners would be called in a
|
||||
# subprocess and we cannot simply catch the NotImplementedError. It is
|
||||
# simpler to check this up front by testing whether the plugin supports
|
||||
|
@ -164,22 +183,14 @@ def serve_rpc():
|
|||
try:
|
||||
rpc = RpcWorker(plugin)
|
||||
|
||||
if cfg.CONF.rpc_workers < 1:
|
||||
LOG.debug('starting rpc directly, workers=%s',
|
||||
cfg.CONF.rpc_workers)
|
||||
rpc.start()
|
||||
return rpc
|
||||
else:
|
||||
# dispose the whole pool before os.fork, otherwise there will
|
||||
# be shared DB connections in child processes which may cause
|
||||
# DB errors.
|
||||
LOG.debug('using launcher for rpc, workers=%s',
|
||||
cfg.CONF.rpc_workers)
|
||||
session.dispose()
|
||||
launcher = common_service.ProcessLauncher(cfg.CONF,
|
||||
wait_interval=1.0)
|
||||
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
||||
return launcher
|
||||
# dispose the whole pool before os.fork, otherwise there will
|
||||
# be shared DB connections in child processes which may cause
|
||||
# DB errors.
|
||||
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
|
||||
session.dispose()
|
||||
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
|
||||
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
||||
return launcher
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE('Unrecoverable error: please check log for '
|
||||
|
@ -188,7 +199,7 @@ def serve_rpc():
|
|||
|
||||
def _get_api_workers():
|
||||
workers = cfg.CONF.api_workers
|
||||
if workers is None:
|
||||
if not workers:
|
||||
workers = processutils.get_worker_count()
|
||||
return workers
|
||||
|
||||
|
|
|
@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db
|
|||
from neutron.db import l3_hascheduler_db
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.quota import resource_registry
|
||||
from neutron.services import service_base
|
||||
|
||||
|
||||
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
|
||||
class L3RouterPlugin(service_base.ServicePluginBase,
|
||||
common_db_mixin.CommonDbMixin,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
l3_hamode_db.L3_HA_NAT_db_mixin,
|
||||
l3_gwmode_db.L3_NAT_db_mixin,
|
||||
|
|
|
@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface):
|
|||
"""Return string description of the plugin."""
|
||||
pass
|
||||
|
||||
def get_workers(self):
|
||||
"""Returns a collection of NeutronWorkers"""
|
||||
return ()
|
||||
|
||||
|
||||
def load_drivers(service_type, plugin):
|
||||
"""Loads drivers for specific service.
|
||||
|
|
|
@ -27,6 +27,7 @@ import psutil
|
|||
from neutron.agent.linux import utils
|
||||
from neutron import service
|
||||
from neutron.tests import base
|
||||
from neutron import worker
|
||||
from neutron import wsgi
|
||||
|
||||
|
||||
|
@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer):
|
|||
def test_restart_rpc_on_sighup_multiple_workers(self):
|
||||
self._test_restart_service_on_sighup(service=self._serve_rpc,
|
||||
workers=2)
|
||||
|
||||
|
||||
class TestPluginWorker(TestNeutronServer):
|
||||
"""Ensure that a plugin returning Workers spawns workers"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestPluginWorker, self).setUp()
|
||||
self.setup_coreplugin(TARGET_PLUGIN)
|
||||
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
|
||||
self.plugin = self._plugin_patcher.start()
|
||||
|
||||
def _start_plugin(self, workers=0):
|
||||
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
|
||||
gp.return_value = self.plugin
|
||||
launchers = service.start_plugin_workers()
|
||||
for launcher in launchers:
|
||||
launcher.wait()
|
||||
|
||||
def test_start(self):
|
||||
class FakeWorker(worker.NeutronWorker):
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def wait(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
# Make both ABC happy and ensure 'self' is correct
|
||||
FakeWorker.reset = self._fake_reset
|
||||
workers = [FakeWorker()]
|
||||
self.plugin.return_value.get_workers.return_value = workers
|
||||
self._test_restart_service_on_sighup(service=self._start_plugin,
|
||||
workers=len(workers))
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_service import service
|
||||
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
|
||||
|
||||
class NeutronWorker(service.ServiceBase):
|
||||
"""Partial implementation of the ServiceBase ABC
|
||||
|
||||
Subclasses will still need to add the other abstractmethods defined in
|
||||
service.ServiceBase. See oslo_service for more details.
|
||||
|
||||
If a plugin needs to handle synchornization with the Neutron database and
|
||||
do this only once instead of in every API worker, for instance, it would
|
||||
define a NeutronWorker class and the plugin would have get_workers return
|
||||
an array of NeutronWorker instnaces. For example:
|
||||
class MyPlugin(...):
|
||||
def get_workers(self):
|
||||
return [MyPluginWorker()]
|
||||
|
||||
class MyPluginWorker(NeutronWorker):
|
||||
def start(self):
|
||||
super(MyPluginWorker, self).start()
|
||||
do_sync()
|
||||
"""
|
||||
def start(self):
|
||||
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
|
|
@ -44,6 +44,7 @@ from neutron.common import exceptions as exception
|
|||
from neutron import context
|
||||
from neutron.db import api
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron import worker
|
||||
|
||||
socket_opts = [
|
||||
cfg.IntOpt('backlog',
|
||||
|
@ -102,7 +103,7 @@ def encode_body(body):
|
|||
return body
|
||||
|
||||
|
||||
class WorkerService(common_service.ServiceBase):
|
||||
class WorkerService(worker.NeutronWorker):
|
||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||
def __init__(self, service, application):
|
||||
self._service = service
|
||||
|
@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase):
|
|||
self._server = None
|
||||
|
||||
def start(self):
|
||||
super(WorkerService, self).start()
|
||||
# When api worker is stopped it kills the eventlet wsgi server which
|
||||
# internally closes the wsgi server socket object. This server socket
|
||||
# object becomes not usable which leads to "Bad file descriptor"
|
||||
|
|
Loading…
Reference in New Issue