diff --git a/etc/neutron.conf b/etc/neutron.conf index 96d574c5fd8..591370f7992 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -306,19 +306,13 @@ # ========== end of items for VLAN trunking networks ========== # =========== WSGI parameters related to the API server ============== -# Number of separate worker processes to spawn. A value of 0 runs the -# worker thread in the current process. Greater than 0 launches that number of -# child processes as workers. The parent process manages them. If not -# specified, the default value is equal to the number of CPUs available to -# achieve best performance. +# Number of separate API worker processes to spawn. If not specified or < 1, +# the default value is equal to the number of CPUs available. # api_workers = -# Number of separate RPC worker processes to spawn. The default, 0, runs the -# worker thread in the current process. Greater than 0 launches that number of -# child processes as RPC workers. The parent process manages them. -# This feature is experimental until issues are addressed and testing has been -# enabled for various plugins for compatibility. -# rpc_workers = 0 +# Number of separate RPC worker processes to spawn. If not specified or < 1, +# a single RPC worker process is spawned by the parent process. +# rpc_workers = 1 # Timeout for client connections socket operations. If an # incoming connection is idle for this number of seconds it diff --git a/neutron/callbacks/resources.py b/neutron/callbacks/resources.py index 029df4305ab..a0fd4c09eae 100644 --- a/neutron/callbacks/resources.py +++ b/neutron/callbacks/resources.py @@ -12,6 +12,7 @@ # String literals representing core resources. 
PORT = 'port' +PROCESS = 'process' ROUTER = 'router' ROUTER_GATEWAY = 'router_gateway' ROUTER_INTERFACE = 'router_interface' diff --git a/neutron/manager.py b/neutron/manager.py index 0e3a16cb2ed..7a174507fdd 100644 --- a/neutron/manager.py +++ b/neutron/manager.py @@ -246,3 +246,8 @@ class NeutronManager(object): service_plugins = cls.get_instance().service_plugins return dict((x, weakref.proxy(y)) for x, y in six.iteritems(service_plugins)) + + @classmethod + def get_unique_service_plugins(cls): + service_plugins = cls.get_instance().service_plugins + return tuple(weakref.proxy(x) for x in set(service_plugins.values())) diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 374dd19e7ef..79a85c4c83e 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object): """ return (self.__class__.start_rpc_listeners != NeutronPluginBaseV2.start_rpc_listeners) + + def get_workers(self): + """Returns a collection of NeutronWorker instances + + If a plugin needs to define worker processes outside of API/RPC workers + then it will override this and return a collection of NeutronWorker + instances + """ + return () diff --git a/neutron/plugins/ml2/driver_api.py b/neutron/plugins/ml2/driver_api.py index c54ab1ba35a..db25c8d5f12 100644 --- a/neutron/plugins/ml2/driver_api.py +++ b/neutron/plugins/ml2/driver_api.py @@ -888,6 +888,14 @@ class MechanismDriver(object): """ pass + def get_workers(self): + """Get any NeutronWorker instances that should have their own process + + Any driver that needs to run processes separate from the API or RPC + workers can return a sequence of NeutronWorker instances. 
+ """ + return () + @six.add_metaclass(abc.ABCMeta) class ExtensionDriver(object): diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py index 1861935496b..8f8920d5447 100644 --- a/neutron/plugins/ml2/managers.py +++ b/neutron/plugins/ml2/managers.py @@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager): return False return True + def get_workers(self): + workers = [] + for driver in self.ordered_mech_drivers: + workers += driver.obj.get_workers() + return workers + class ExtensionManager(stevedore.named.NamedExtensionManager): """Manage extension drivers using drivers.""" diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 9ba5450182f..15deaf5b5f0 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -1556,3 +1556,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if port: return port.id return device + + def get_workers(self): + return self.mechanism_manager.get_workers() diff --git a/neutron/server/__init__.py b/neutron/server/__init__.py index c6c72e28422..688b7542142 100644 --- a/neutron/server/__init__.py +++ b/neutron/server/__init__.py @@ -50,6 +50,10 @@ def main(): else: rpc_thread = pool.spawn(neutron_rpc.wait) + plugin_workers = service.start_plugin_workers() + for worker in plugin_workers: + pool.spawn(worker.wait) + # api and rpc should die together. When one dies, kill the other. 
rpc_thread.link(lambda gt: api_thread.kill()) api_thread.link(lambda gt: rpc_thread.kill()) diff --git a/neutron/service.py b/neutron/service.py index 6b1eee248b1..856551ffb96 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -32,6 +32,7 @@ from neutron import context from neutron.db import api as session from neutron.i18n import _LE, _LI from neutron import manager +from neutron import worker from neutron import wsgi @@ -44,7 +45,7 @@ service_opts = [ 'If not specified, the default is equal to the number ' 'of CPUs available for best performance.')), cfg.IntOpt('rpc_workers', - default=0, + default=1, help=_('Number of RPC worker processes for service')), cfg.IntOpt('periodic_fuzzy_delay', default=5, @@ -108,13 +109,28 @@ def serve_wsgi(cls): return service -class RpcWorker(common_service.ServiceBase): +def start_plugin_workers(): + launchers = [] + # NOTE(twilson) get_service_plugins also returns the core plugin + for plugin in manager.NeutronManager.get_unique_service_plugins(): + # TODO(twilson) Instead of defaulting here, come up with a good way to + # share a common get_workers default between NeutronPluginBaseV2 and + # ServicePluginBase + for plugin_worker in getattr(plugin, 'get_workers', tuple)(): + launcher = common_service.ProcessLauncher(cfg.CONF) + launcher.launch_service(plugin_worker) + launchers.append(launcher) + return launchers + + +class RpcWorker(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, plugin): self._plugin = plugin self._servers = [] def start(self): + super(RpcWorker, self).start() self._servers = self._plugin.start_rpc_listeners() def wait(self): @@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase): def serve_rpc(): plugin = manager.NeutronManager.get_plugin() + if cfg.CONF.rpc_workers < 1: + cfg.CONF.set_override('rpc_workers', 1) + # If 0 < rpc_workers then start_rpc_listeners would be called in a # subprocess and we cannot simply catch the NotImplementedError. 
It is # simpler to check this up front by testing whether the plugin supports @@ -164,22 +183,14 @@ def serve_rpc(): try: rpc = RpcWorker(plugin) - if cfg.CONF.rpc_workers < 1: - LOG.debug('starting rpc directly, workers=%s', - cfg.CONF.rpc_workers) - rpc.start() - return rpc - else: - # dispose the whole pool before os.fork, otherwise there will - # be shared DB connections in child processes which may cause - # DB errors. - LOG.debug('using launcher for rpc, workers=%s', - cfg.CONF.rpc_workers) - session.dispose() - launcher = common_service.ProcessLauncher(cfg.CONF, - wait_interval=1.0) - launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) - return launcher + # dispose the whole pool before os.fork, otherwise there will + # be shared DB connections in child processes which may cause + # DB errors. + LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers) + session.dispose() + launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) + launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) + return launcher except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE('Unrecoverable error: please check log for ' @@ -188,7 +199,7 @@ def serve_rpc(): def _get_api_workers(): workers = cfg.CONF.api_workers - if workers is None: + if not workers: workers = processutils.get_worker_count() return workers diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 4c4e96ae5bf..85daa4ea1d3 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db from neutron.db import l3_hascheduler_db from neutron.plugins.common import constants from neutron.quota import resource_registry +from neutron.services import service_base -class L3RouterPlugin(common_db_mixin.CommonDbMixin, +class L3RouterPlugin(service_base.ServicePluginBase, + common_db_mixin.CommonDbMixin, 
extraroute_db.ExtraRoute_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin, l3_gwmode_db.L3_NAT_db_mixin, diff --git a/neutron/services/service_base.py b/neutron/services/service_base.py index acae7a0f50b..3ae9f6329f4 100644 --- a/neutron/services/service_base.py +++ b/neutron/services/service_base.py @@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface): """Return string description of the plugin.""" pass + def get_workers(self): + """Returns a collection of NeutronWorkers""" + return () + def load_drivers(service_type, plugin): """Loads drivers for specific service. diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 8f81f684956..48891bb7b99 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -27,6 +27,7 @@ import psutil from neutron.agent.linux import utils from neutron import service from neutron.tests import base +from neutron import worker from neutron import wsgi @@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer): def test_restart_rpc_on_sighup_multiple_workers(self): self._test_restart_service_on_sighup(service=self._serve_rpc, workers=2) + + +class TestPluginWorker(TestNeutronServer): + """Ensure that a plugin returning Workers spawns workers""" + + def setUp(self): + super(TestPluginWorker, self).setUp() + self.setup_coreplugin(TARGET_PLUGIN) + self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True) + self.plugin = self._plugin_patcher.start() + + def _start_plugin(self, workers=0): + with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp: + gp.return_value = self.plugin + launchers = service.start_plugin_workers() + for launcher in launchers: + launcher.wait() + + def test_start(self): + class FakeWorker(worker.NeutronWorker): + def start(self): + pass + + def wait(self): + pass + + def stop(self): + pass + + def reset(self): + pass + + # Make both ABC happy and ensure 'self' is correct + FakeWorker.reset = 
self._fake_reset + workers = [FakeWorker()] + self.plugin.return_value.get_workers.return_value = workers + self._test_restart_service_on_sighup(service=self._start_plugin, + workers=len(workers)) diff --git a/neutron/worker.py b/neutron/worker.py new file mode 100644 index 00000000000..c49d742f6a7 --- /dev/null +++ b/neutron/worker.py @@ -0,0 +1,40 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_service import service + +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources + + +class NeutronWorker(service.ServiceBase): + """Partial implementation of the ServiceBase ABC + + Subclasses will still need to add the other abstractmethods defined in + service.ServiceBase. See oslo_service for more details. + + If a plugin needs to handle synchronization with the Neutron database and + do this only once instead of in every API worker, for instance, it would + define a NeutronWorker class and the plugin would have get_workers return + an array of NeutronWorker instances. 
For example: + class MyPlugin(...): + def get_workers(self): + return [MyPluginWorker()] + + class MyPluginWorker(NeutronWorker): + def start(self): + super(MyPluginWorker, self).start() + do_sync() + """ + def start(self): + registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) diff --git a/neutron/wsgi.py b/neutron/wsgi.py index cb302d306dc..dacbadf8eaf 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -44,6 +44,7 @@ from neutron.common import exceptions as exception from neutron import context from neutron.db import api from neutron.i18n import _LE, _LI +from neutron import worker socket_opts = [ cfg.IntOpt('backlog', @@ -102,7 +103,7 @@ def encode_body(body): return body -class WorkerService(common_service.ServiceBase): +class WorkerService(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, service, application): self._service = service @@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase): self._server = None def start(self): + super(WorkerService, self).start() # When api worker is stopped it kills the eventlet wsgi server which # internally closes the wsgi server socket object. This server socket # object becomes not usable which leads to "Bad file descriptor"