diff --git a/etc/neutron.conf b/etc/neutron.conf index bac35bd2dc1..f5c15fadf20 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -311,10 +311,14 @@ # the default value is equal to the number of CPUs available. # api_workers = -# Number of separate RPC worker processes to spawn. If not specified or < 1, -# a single RPC worker process is spawned by the parent process. +# Number of separate RPC worker processes to spawn. # rpc_workers = 1 +# Number of separate RPC worker processes for processing state report queue. +# Increasing this parameter makes sense when neutron-server handles +# hundreds of agents. +# rpc_state_report_workers = 1 + # Timeout for client connections socket operations. If an # incoming connection is idle for this number of seconds it # will be closed. A value of '0' means wait forever. (integer diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 79a85c4c83e..1a557519c08 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -375,6 +375,16 @@ class NeutronPluginBaseV2(object): """ raise NotImplementedError() + def start_rpc_state_reports_listener(self): + """Start the RPC listeners consuming state reports queue. + + This optional method creates rpc consumer for REPORTS queue only. + + .. note:: this method is optional, as it was not part of the originally + defined plugin API. + """ + raise NotImplementedError() + def rpc_workers_supported(self): """Return whether the plugin supports multiple RPC workers. @@ -390,6 +400,15 @@ class NeutronPluginBaseV2(object): return (self.__class__.start_rpc_listeners != NeutronPluginBaseV2.start_rpc_listeners) + def rpc_state_report_workers_supported(self): + """Return whether the plugin supports state report RPC workers. + + .. note:: this method is optional, as it was not part of the originally + defined plugin API. + """ + return (self.__class__.start_rpc_state_reports_listener != + NeutronPluginBaseV2.start_rpc_state_reports_listener) + def get_workers(self): """Returns a collection NeutronWorker instances diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index ae8c3f181f8..e7e0b9dc201 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -190,11 +190,19 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.topic = topics.PLUGIN self.conn = n_rpc.create_connection(new=True) self.conn.create_consumer(self.topic, self.endpoints, fanout=False) + # process state reports despite dedicated rpc workers self.conn.create_consumer(topics.REPORTS, [agents_db.AgentExtRpcCallback()], fanout=False) return self.conn.consume_in_threads() + def start_rpc_state_reports_listener(self): + self.conn_reports = n_rpc.create_connection(new=True) + self.conn_reports.create_consumer(topics.REPORTS, + [agents_db.AgentExtRpcCallback()], + fanout=False) + return self.conn_reports.consume_in_threads() + def _filter_nets_provider(self, context, networks, filters): return [network for network in networks diff --git a/neutron/service.py b/neutron/service.py index b211306a331..8f03e1570c2 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -47,6 +47,10 @@ service_opts = [ cfg.IntOpt('rpc_workers', default=1, help=_('Number of RPC worker processes for service')), + cfg.IntOpt('rpc_state_report_workers', + default=1, + help=_('Number of RPC worker processes dedicated to state ' + 'reports queue')), cfg.IntOpt('periodic_fuzzy_delay', default=5, help=_('Range of seconds to randomly delay when starting the ' @@ -167,6 +171,10 @@ class RpcWorker(worker.NeutronWorker): config.reset_service() +class RpcReportsWorker(RpcWorker): + start_listeners_method = 'start_rpc_state_reports_listener' + + def serve_rpc(): plugin = manager.NeutronManager.get_plugin() service_plugins = ( @@ -190,7 +198,6 @@ def serve_rpc(): try: # passing service plugins only, because core plugin is among them rpc = RpcWorker(service_plugins) - # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause # DB errors. @@ -198,6 +205,14 @@ def serve_rpc(): session.dispose() launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) + if (cfg.CONF.rpc_state_report_workers > 0 and + plugin.rpc_state_report_workers_supported()): + rpc_state_rep = RpcReportsWorker([plugin]) + LOG.debug('using launcher for state reports rpc, workers=%s', + cfg.CONF.rpc_state_report_workers) + launcher.launch_service( + rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers) + return launcher except Exception: with excutils.save_and_reraise_exception(): diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index e04b31b1fad..c13c4a266cf 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -239,6 +239,8 @@ class TestRPCServer(TestNeutronServer): get_plugin.return_value = self.plugin CONF.set_override("rpc_workers", workers) + # not interested in state report workers specifically + CONF.set_override("rpc_state_report_workers", 0) launcher = service.serve_rpc() launcher.wait()