diff --git a/neutron/service.py b/neutron/service.py index 856551ffb96..f70cdee6cd9 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -125,13 +125,17 @@ def start_plugin_workers(): class RpcWorker(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" - def __init__(self, plugin): - self._plugin = plugin + start_listeners_method = 'start_rpc_listeners' + + def __init__(self, plugins): + self._plugins = plugins self._servers = [] def start(self): - super(RpcWorker, self).start() - self._servers = self._plugin.start_rpc_listeners() + for plugin in self._plugins: + if hasattr(plugin, self.start_listeners_method): + servers = getattr(plugin, self.start_listeners_method)() + self._servers.extend(servers) def wait(self): try: @@ -164,6 +168,8 @@ class RpcWorker(worker.NeutronWorker): def serve_rpc(): plugin = manager.NeutronManager.get_plugin() + service_plugins = ( + manager.NeutronManager.get_service_plugins().values()) if cfg.CONF.rpc_workers < 1: cfg.CONF.set_override('rpc_workers', 1) @@ -181,7 +187,8 @@ def serve_rpc(): raise NotImplementedError() try: - rpc = RpcWorker(plugin) + # passing service plugins only, because core plugin is among them + rpc = RpcWorker(service_plugins) # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 85daa4ea1d3..cca550d2b9c 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -58,7 +58,6 @@ class L3RouterPlugin(service_base.ServicePluginBase, @resource_registry.tracked_resources(router=l3_db.Router, floatingip=l3_db.FloatingIP) def __init__(self): - self.setup_rpc() self.router_scheduler = importutils.import_object( cfg.CONF.router_scheduler_driver) self.start_periodic_l3_agent_status_check() @@ -66,9 +65,10 @@ class L3RouterPlugin(service_base.ServicePluginBase, if 'dvr' in self.supported_extension_aliases: l3_dvrscheduler_db.subscribe() l3_db.subscribe() + self.start_rpc_listeners() @log_helpers.log_method_call - def setup_rpc(self): + def start_rpc_listeners(self): # RPC support self.topic = topics.L3PLUGIN self.conn = n_rpc.create_connection(new=True) @@ -77,7 +77,7 @@ class L3RouterPlugin(service_base.ServicePluginBase, self.endpoints = [l3_rpc.L3RpcCallback()] self.conn.create_consumer(self.topic, self.endpoints, fanout=False) - self.conn.consume_in_threads() + return self.conn.consume_in_threads() def get_plugin_type(self): return constants.L3_ROUTER_NAT diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index 57789b78475..6cbbabd47af 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -27,14 +27,15 @@ class MeteringPlugin(metering_db.MeteringDbMixin): def __init__(self): super(MeteringPlugin, self).__init__() - self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] + self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() + self.start_rpc_listeners() + def start_rpc_listeners(self): + self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] self.conn = n_rpc.create_connection(new=True) self.conn.create_consumer( topics.METERING_PLUGIN, self.endpoints, fanout=False) - self.conn.consume_in_threads() - - self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() + return self.conn.consume_in_threads() def create_metering_label(self, context, metering_label): label = super(MeteringPlugin, self).create_metering_label(