Spawn dedicated rpc workers for state reports queue

By default spawn one additional rpc worker to process
state report queue.
State report queue will also be processed by regular
rpc workers, but in case these workers are busy with
processing heavy requests, state reports queue will
automatically be consumed by dedicated rpc workers.

This change applies to ML2 plugin only.
Other plugins should implement start_rpc_state_reports_listener
to enable additional rpc workers.

Change-Id: I5f8df6a478f7c82382049274b34b07109eeafbdb
Closes-Bug: #1505217
This commit is contained in:
Eugene Nikanorov 2015-10-12 16:21:02 +04:00
parent 948316190d
commit 678b431ba4
5 changed files with 51 additions and 3 deletions

View File

@ -311,10 +311,14 @@
# the default value is equal to the number of CPUs available.
# api_workers = <number of CPUs>
# Number of separate RPC worker processes to spawn. If not specified or < 1,
# a single RPC worker process is spawned by the parent process.
# Number of separate RPC worker processes to spawn.
# rpc_workers = 1
# Number of separate RPC worker processes for processing state report queue.
# Increasing this parameter makes sense when neutron-server handles
# hundreds of agents.
# rpc_state_report_workers = 1
# Timeout for client connections socket operations. If an
# incoming connection is idle for this number of seconds it
# will be closed. A value of '0' means wait forever. (integer

View File

@ -375,6 +375,16 @@ class NeutronPluginBaseV2(object):
"""
raise NotImplementedError()
def start_rpc_state_reports_listener(self):
"""Start the RPC listeners consuming state reports queue.
This optional method creates rpc consumer for REPORTS queue only.
.. note:: this method is optional, as it was not part of the originally
defined plugin API.
"""
raise NotImplementedError()
def rpc_workers_supported(self):
"""Return whether the plugin supports multiple RPC workers.
@ -390,6 +400,15 @@ class NeutronPluginBaseV2(object):
return (self.__class__.start_rpc_listeners !=
NeutronPluginBaseV2.start_rpc_listeners)
def rpc_state_report_workers_supported(self):
"""Return whether the plugin supports state report RPC workers.
.. note:: this method is optional, as it was not part of the originally
defined plugin API.
"""
return (self.__class__.start_rpc_state_reports_listener !=
NeutronPluginBaseV2.start_rpc_state_reports_listener)
def get_workers(self):
"""Returns a collection NeutronWorker instances

View File

@ -190,11 +190,19 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
# process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
return self.conn.consume_in_threads()
def start_rpc_state_reports_listener(self):
self.conn_reports = n_rpc.create_connection(new=True)
self.conn_reports.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
return self.conn_reports.consume_in_threads()
def _filter_nets_provider(self, context, networks, filters):
return [network
for network in networks

View File

@ -47,6 +47,10 @@ service_opts = [
cfg.IntOpt('rpc_workers',
default=1,
help=_('Number of RPC worker processes for service')),
cfg.IntOpt('rpc_state_report_workers',
default=1,
help=_('Number of RPC worker processes dedicated to state '
'reports queue')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
@ -167,6 +171,10 @@ class RpcWorker(worker.NeutronWorker):
config.reset_service()
class RpcReportsWorker(RpcWorker):
start_listeners_method = 'start_rpc_state_reports_listener'
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
@ -190,7 +198,6 @@ def serve_rpc():
try:
# passing service plugins only, because core plugin is among them
rpc = RpcWorker(service_plugins)
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
@ -198,6 +205,14 @@ def serve_rpc():
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():

View File

@ -239,6 +239,8 @@ class TestRPCServer(TestNeutronServer):
get_plugin.return_value = self.plugin
CONF.set_override("rpc_workers", workers)
# not interested in state report workers specifically
CONF.set_override("rpc_state_report_workers", 0)
launcher = service.serve_rpc()
launcher.wait()