From 6292e0e42b8d4602e08a876a53141c3ddbdcbb58 Mon Sep 17 00:00:00 2001 From: Kirill Bespalov Date: Wed, 20 Apr 2016 17:23:02 +0300 Subject: [PATCH] Fix notification listeners usage Closes-Bug: #1570468 Create the single notification pipeline listener and to aggregate targets and endpoints in it. In this implementation the incoming notification msg will be dispatched in a relevant endpoint by its own filter rule based on publisher id of a notifier, which sent this message. Each pipeline notifier has a conforming publisher id based on a pipeline name in which it to produce messages. Change-Id: If8c1c6838fd291ef8dbdd4d27c78fe5a36522178 (cherry picked from commit 726b2d4d67ada3df07f36ecfd81b0cf72881e159) --- ceilometer/notification.py | 79 +++++++++---------- ceilometer/pipeline.py | 2 + .../tests/functional/test_notification.py | 58 ++++++++++---- 3 files changed, 82 insertions(+), 57 deletions(-) diff --git a/ceilometer/notification.py b/ceilometer/notification.py index d9e4cafc20..b14317db16 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -110,7 +110,7 @@ class NotificationService(service_base.BaseService): notifiers.append(oslo_messaging.Notifier( transport, driver=cfg.CONF.publisher_notifier.telemetry_driver, - publisher_id='ceilometer.notification', + publisher_id=pipe.name, topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x))) return notifiers @@ -146,7 +146,12 @@ class NotificationService(service_base.BaseService): super(NotificationService, self).start() self.partition_coordinator = None self.coord_lock = threading.Lock() - self.listeners, self.pipeline_listeners = [], [] + + self.listeners = [] + + # NOTE(kbespalov): for the pipeline queues used a single amqp host + # hence only one listener is required + self.pipeline_listener = None self.pipeline_manager = pipeline.setup_pipeline() @@ -174,7 +179,6 @@ class NotificationService(service_base.BaseService): self.event_pipe_manager = self._get_event_pipeline_manager( self.transport) - self.listeners, self.pipeline_listeners = [], [] self._configure_main_queue_listeners(self.pipe_manager, self.event_pipe_manager) @@ -188,7 +192,7 @@ class NotificationService(service_base.BaseService): self.tg.add_timer(cfg.CONF.coordination.check_watchers, self.partition_coordinator.run_watchers) # configure pipelines after all coordination is configured. - self._configure_pipeline_listeners() + self._configure_pipeline_listener() if not cfg.CONF.notification.disable_non_metric_meters: LOG.warning(_LW('Non-metric meters may be collected. It is highly ' @@ -243,9 +247,9 @@ class NotificationService(service_base.BaseService): self.listeners.append(listener) def _refresh_agent(self, event): - self._configure_pipeline_listeners(True) + self._configure_pipeline_listener() - def _configure_pipeline_listeners(self, reuse_listeners=False): + def _configure_pipeline_listener(self): with self.coord_lock: ev_pipes = [] if cfg.CONF.notification.store_events: @@ -256,40 +260,35 @@ class NotificationService(service_base.BaseService): self.group_id, range(cfg.CONF.notification.pipeline_processing_queues)) - queue_set = {} + endpoints = [] + targets = [] + + for pipe in pipelines: + if isinstance(pipe, pipeline.EventPipeline): + endpoints.append(pipeline.EventPipelineEndpoint(self.ctxt, + pipe)) + else: + endpoints.append(pipeline.SamplePipelineEndpoint(self.ctxt, + pipe)) + for pipe_set, pipe in itertools.product(partitioned, pipelines): - queue_set['%s-%s-%s' % - (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe + LOG.debug('Pipeline endpoint: %s from set: %s', + pipe.name, pipe_set) + topic = '%s-%s-%s' % (self.NOTIFICATION_IPC, + pipe.name, pipe_set) + targets.append(oslo_messaging.Target(topic=topic)) - if reuse_listeners: - topics = queue_set.keys() - kill_list = [] - for listener in self.pipeline_listeners: - if listener.dispatcher.targets[0].topic in topics: - queue_set.pop(listener.dispatcher.targets[0].topic) - else: - kill_list.append(listener) - for listener in kill_list: - utils.kill_listeners([listener]) - self.pipeline_listeners.remove(listener) - else: - utils.kill_listeners(self.pipeline_listeners) - self.pipeline_listeners = [] + if self.pipeline_listener: + self.pipeline_listener.stop() + self.pipeline_listener.wait() - for topic, pipe in queue_set.items(): - LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, - pipe_set) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) - else pipeline.SamplePipelineEndpoint) - listener = messaging.get_batch_notification_listener( - transport, - [oslo_messaging.Target(topic=topic)], - [pipe_endpoint(self.ctxt, pipe)], - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) - listener.start() - self.pipeline_listeners.append(listener) + self.pipeline_listener = messaging.get_batch_notification_listener( + transport, + targets, + endpoints, + batch_size=cfg.CONF.notification.batch_size, + batch_timeout=cfg.CONF.notification.batch_timeout) + self.pipeline_listener.start() def stop(self): if getattr(self, 'partition_coordinator', None): @@ -297,8 +296,8 @@ class NotificationService(service_base.BaseService): listeners = [] if getattr(self, 'listeners', None): listeners.extend(self.listeners) - if getattr(self, 'pipeline_listeners', None): - listeners.extend(self.pipeline_listeners) + if getattr(self, 'pipeline_listener', None): + listeners.append(self.pipeline_listener) utils.kill_listeners(listeners) super(NotificationService, self).stop() @@ -321,4 +320,4 @@ class NotificationService(service_base.BaseService): # re-start the pipeline listeners if workload partitioning # is enabled. if cfg.CONF.notification.workload_partitioning: - self._configure_pipeline_listeners() + self._configure_pipeline_listener() diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index af20f3ab20..ebb091b74b 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -81,6 +81,8 @@ class PipelineException(Exception): class PipelineEndpoint(object): def __init__(self, context, pipeline): + self.filter_rule = oslo_messaging.NotificationFilter( + publisher_id=pipeline.name) self.publish_context = PublishContext(context, [pipeline]) @abc.abstractmethod diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 32259f56b7..fc688a1dcd 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -257,7 +257,6 @@ class BaseRealNotification(tests_base.BaseTestCase): if (len(self.publisher.samples) >= self.expected_samples and len(self.publisher.events) >= self.expected_events): break - self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners) self.srv.stop() resources = list(set(s.resource_id for s in self.publisher.samples)) @@ -387,31 +386,56 @@ class TestRealNotificationHA(BaseRealNotification): fake_publisher_cls.return_value = self.publisher self._check_notification_service() - def test_reset_listeners_on_refresh(self): + def test_reset_listener_on_refresh(self): self.srv.start() - listeners = self.srv.pipeline_listeners - self.assertEqual(20, len(listeners)) - self.srv._configure_pipeline_listeners() - self.assertEqual(20, len(self.srv.pipeline_listeners)) - for listener in listeners: - self.assertNotIn(listeners, set(self.srv.pipeline_listeners)) + listener = self.srv.pipeline_listener + self.assertEqual(20, + len(self.srv.pipeline_listener.dispatcher.targets)) + self.srv._configure_pipeline_listener() + self.assertEqual(20, + len(self.srv.pipeline_listener.dispatcher.targets)) + self.assertIsNot(listener, self.srv.pipeline_listener) self.srv.stop() - def test_retain_common_listeners_on_refresh(self): + def test_retain_common_targets_on_refresh(self): with mock.patch('ceilometer.coordination.PartitionCoordinator' '.extract_my_subset', return_value=[1, 2]): self.srv.start() - self.assertEqual(4, len(self.srv.pipeline_listeners)) - listeners = [listener for listener in self.srv.pipeline_listeners] + listened_before = [target.topic for target in + self.srv.pipeline_listener.dispatcher.targets] + self.assertEqual(4, len(listened_before)) with mock.patch('ceilometer.coordination.PartitionCoordinator' '.extract_my_subset', return_value=[1, 3]): self.srv._refresh_agent(None) - self.assertEqual(4, len(self.srv.pipeline_listeners)) - for listener in listeners: - if listener.dispatcher.targets[0].topic.endswith('1'): - self.assertIn(listener, set(self.srv.pipeline_listeners)) - else: - self.assertNotIn(listener, set(self.srv.pipeline_listeners)) + listened_after = [target.topic for target in + self.srv.pipeline_listener.dispatcher.targets] + self.assertEqual(4, len(listened_after)) + common = set(listened_before) & set(listened_after) + for topic in common: + self.assertTrue(topic.endswith('1')) + self.srv.stop() + + def test_notify_to_relevant_endpoint(self): + self.srv.start() + dispatcher = self.srv.pipeline_listener.dispatcher + self.assertIsNotEmpty(dispatcher.targets) + + endpoints = {} + + for endpoint in dispatcher.endpoints: + self.assertEqual(1, len(endpoint.publish_context.pipelines)) + pipe = list(endpoint.publish_context.pipelines)[0] + endpoints[pipe.name] = endpoint + + notifiers = [] + notifiers.extend(self.srv.pipe_manager.transporters[0][2]) + notifiers.extend(self.srv.event_pipe_manager.transporters[0][2]) + for notifier in notifiers: + filter_rule = endpoints[notifier.publisher_id].filter_rule + self.assertEqual(True, filter_rule.match(None, + notifier.publisher_id, + None, None, None)) + self.srv.stop() @mock.patch('oslo_messaging.Notifier.sample')