retain existing listeners on refresh

there is an overhead to killing and recreating listeners. this patch
keeps the pipeline listeners we already have after rebalancing
rather than blindly killing and recreating all of them.

Change-Id: Ic7c23fd7649ca0b828cc4266582163bd326c2b80
Closes-Bug: #1496459
This commit is contained in:
gordon chung 2015-09-18 14:39:25 -04:00
parent 342488506f
commit 338e7a655d
2 changed files with 59 additions and 25 deletions

View File

@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from oslo_config import cfg
from oslo_context import context
@ -233,14 +234,9 @@ class NotificationService(service_base.BaseService):
self.listeners.append(listener)
def _refresh_agent(self, event):
self._refresh_listeners()
self._configure_pipeline_listeners(True)
def _refresh_listeners(self):
utils.kill_listeners(self.pipeline_listeners)
self._configure_pipeline_listeners()
def _configure_pipeline_listeners(self):
self.pipeline_listeners = []
def _configure_pipeline_listeners(self, reuse_listeners=False):
ev_pipes = []
if cfg.CONF.notification.store_events:
ev_pipes = self.event_pipeline_manager.pipelines
@ -249,21 +245,39 @@ class NotificationService(service_base.BaseService):
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
for pipe_set in partitioned:
for pipe in pipelines:
LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
pipe_set)
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline)
else pipeline.SamplePipelineEndpoint)
listener = messaging.get_notification_listener(
transport,
[oslo_messaging.Target(
topic='%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set))],
[pipe_endpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
queue_set = {}
for pipe_set, pipe in itertools.product(partitioned, pipelines):
queue_set['%s-%s-%s' %
(self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
if reuse_listeners:
topics = queue_set.keys()
kill_list = []
for listener in self.pipeline_listeners:
if listener.dispatcher.targets[0].topic in topics:
queue_set.pop(listener.dispatcher.targets[0].topic)
else:
kill_list.append(listener)
for listener in kill_list:
utils.kill_listeners([listener])
self.pipeline_listeners.remove(listener)
else:
utils.kill_listeners(self.pipeline_listeners)
self.pipeline_listeners = []
for topic, pipe in queue_set.items():
LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
pipe_set)
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline)
else pipeline.SamplePipelineEndpoint)
listener = messaging.get_notification_listener(
transport,
[oslo_messaging.Target(topic=topic)],
[pipe_endpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
def stop(self):
if self.partition_coordinator:
@ -290,4 +304,4 @@ class NotificationService(service_base.BaseService):
# re-start the pipeline listeners if workload partitioning
# is enabled.
if cfg.CONF.notification.workload_partitioning:
self._refresh_listeners()
self._configure_pipeline_listeners()

View File

@ -455,9 +455,29 @@ class TestRealNotificationHA(BaseRealNotification):
def test_reset_listeners_on_refresh(self):
self.srv.start()
listeners = self.srv.pipeline_listeners
self.assertEqual(20, len(listeners))
self.srv._configure_pipeline_listeners()
self.assertEqual(20, len(self.srv.pipeline_listeners))
self.srv._refresh_listeners()
self.assertEqual(20, len(self.srv.pipeline_listeners))
for listener in listeners:
self.assertNotIn(listeners, set(self.srv.pipeline_listeners))
self.srv.stop()
def test_retain_common_listeners_on_refresh(self):
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 2]):
self.srv.start()
self.assertEqual(4, len(self.srv.pipeline_listeners))
listeners = [listener for listener in self.srv.pipeline_listeners]
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 3]):
self.srv._refresh_agent(None)
self.assertEqual(4, len(self.srv.pipeline_listeners))
for listener in listeners:
if listener.dispatcher.targets[0].topic.endswith('1'):
self.assertIn(listener, set(self.srv.pipeline_listeners))
else:
self.assertNotIn(listener, set(self.srv.pipeline_listeners))
self.srv.stop()
@mock.patch('oslo_messaging.Notifier.sample')