Fix notification listeners usage

Closes-Bug: #1570468

Create a single notification pipeline listener and aggregate the
targets and endpoints in it.

In this implementation the incoming notification message is dispatched
to the relevant endpoint by that endpoint's filter rule, which matches
on the publisher id of the notifier that sent the message.

Each pipeline notifier has a corresponding publisher id based on
the name of the pipeline into which it produces messages.

Change-Id: If8c1c6838fd291ef8dbdd4d27c78fe5a36522178
(cherry picked from commit 726b2d4d67)
This commit is contained in:
Kirill Bespalov 2016-04-20 17:23:02 +03:00 committed by gordon chung
parent c2b7246257
commit 6292e0e42b
3 changed files with 82 additions and 57 deletions

View File

@ -110,7 +110,7 @@ class NotificationService(service_base.BaseService):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='ceilometer.notification',
publisher_id=pipe.name,
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)))
return notifiers
@ -146,7 +146,12 @@ class NotificationService(service_base.BaseService):
super(NotificationService, self).start()
self.partition_coordinator = None
self.coord_lock = threading.Lock()
self.listeners, self.pipeline_listeners = [], []
self.listeners = []
# NOTE(kbespalov): for the pipeline queues used a single amqp host
# hence only one listener is required
self.pipeline_listener = None
self.pipeline_manager = pipeline.setup_pipeline()
@ -174,7 +179,6 @@ class NotificationService(service_base.BaseService):
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
self.listeners, self.pipeline_listeners = [], []
self._configure_main_queue_listeners(self.pipe_manager,
self.event_pipe_manager)
@ -188,7 +192,7 @@ class NotificationService(service_base.BaseService):
self.tg.add_timer(cfg.CONF.coordination.check_watchers,
self.partition_coordinator.run_watchers)
# configure pipelines after all coordination is configured.
self._configure_pipeline_listeners()
self._configure_pipeline_listener()
if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@ -243,9 +247,9 @@ class NotificationService(service_base.BaseService):
self.listeners.append(listener)
def _refresh_agent(self, event):
self._configure_pipeline_listeners(True)
self._configure_pipeline_listener()
def _configure_pipeline_listeners(self, reuse_listeners=False):
def _configure_pipeline_listener(self):
with self.coord_lock:
ev_pipes = []
if cfg.CONF.notification.store_events:
@ -256,40 +260,35 @@ class NotificationService(service_base.BaseService):
self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
queue_set = {}
endpoints = []
targets = []
for pipe in pipelines:
if isinstance(pipe, pipeline.EventPipeline):
endpoints.append(pipeline.EventPipelineEndpoint(self.ctxt,
pipe))
else:
endpoints.append(pipeline.SamplePipelineEndpoint(self.ctxt,
pipe))
for pipe_set, pipe in itertools.product(partitioned, pipelines):
queue_set['%s-%s-%s' %
(self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set)
targets.append(oslo_messaging.Target(topic=topic))
if reuse_listeners:
topics = queue_set.keys()
kill_list = []
for listener in self.pipeline_listeners:
if listener.dispatcher.targets[0].topic in topics:
queue_set.pop(listener.dispatcher.targets[0].topic)
else:
kill_list.append(listener)
for listener in kill_list:
utils.kill_listeners([listener])
self.pipeline_listeners.remove(listener)
else:
utils.kill_listeners(self.pipeline_listeners)
self.pipeline_listeners = []
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
for topic, pipe in queue_set.items():
LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
pipe_set)
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline)
else pipeline.SamplePipelineEndpoint)
listener = messaging.get_batch_notification_listener(
transport,
[oslo_messaging.Target(topic=topic)],
[pipe_endpoint(self.ctxt, pipe)],
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
listener.start()
self.pipeline_listeners.append(listener)
self.pipeline_listener = messaging.get_batch_notification_listener(
transport,
targets,
endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
self.pipeline_listener.start()
def stop(self):
if getattr(self, 'partition_coordinator', None):
@ -297,8 +296,8 @@ class NotificationService(service_base.BaseService):
listeners = []
if getattr(self, 'listeners', None):
listeners.extend(self.listeners)
if getattr(self, 'pipeline_listeners', None):
listeners.extend(self.pipeline_listeners)
if getattr(self, 'pipeline_listener', None):
listeners.append(self.pipeline_listener)
utils.kill_listeners(listeners)
super(NotificationService, self).stop()
@ -321,4 +320,4 @@ class NotificationService(service_base.BaseService):
# re-start the pipeline listeners if workload partitioning
# is enabled.
if cfg.CONF.notification.workload_partitioning:
self._configure_pipeline_listeners()
self._configure_pipeline_listener()

View File

@ -81,6 +81,8 @@ class PipelineException(Exception):
class PipelineEndpoint(object):
def __init__(self, context, pipeline):
self.filter_rule = oslo_messaging.NotificationFilter(
publisher_id=pipeline.name)
self.publish_context = PublishContext(context, [pipeline])
@abc.abstractmethod

View File

@ -257,7 +257,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners)
self.srv.stop()
resources = list(set(s.resource_id for s in self.publisher.samples))
@ -387,31 +386,56 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
def test_reset_listeners_on_refresh(self):
def test_reset_listener_on_refresh(self):
self.srv.start()
listeners = self.srv.pipeline_listeners
self.assertEqual(20, len(listeners))
self.srv._configure_pipeline_listeners()
self.assertEqual(20, len(self.srv.pipeline_listeners))
for listener in listeners:
self.assertNotIn(listeners, set(self.srv.pipeline_listeners))
listener = self.srv.pipeline_listener
self.assertEqual(20,
len(self.srv.pipeline_listener.dispatcher.targets))
self.srv._configure_pipeline_listener()
self.assertEqual(20,
len(self.srv.pipeline_listener.dispatcher.targets))
self.assertIsNot(listener, self.srv.pipeline_listener)
self.srv.stop()
def test_retain_common_listeners_on_refresh(self):
def test_retain_common_targets_on_refresh(self):
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 2]):
self.srv.start()
self.assertEqual(4, len(self.srv.pipeline_listeners))
listeners = [listener for listener in self.srv.pipeline_listeners]
listened_before = [target.topic for target in
self.srv.pipeline_listener.dispatcher.targets]
self.assertEqual(4, len(listened_before))
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 3]):
self.srv._refresh_agent(None)
self.assertEqual(4, len(self.srv.pipeline_listeners))
for listener in listeners:
if listener.dispatcher.targets[0].topic.endswith('1'):
self.assertIn(listener, set(self.srv.pipeline_listeners))
else:
self.assertNotIn(listener, set(self.srv.pipeline_listeners))
listened_after = [target.topic for target in
self.srv.pipeline_listener.dispatcher.targets]
self.assertEqual(4, len(listened_after))
common = set(listened_before) & set(listened_after)
for topic in common:
self.assertTrue(topic.endswith('1'))
self.srv.stop()
def test_notify_to_relevant_endpoint(self):
self.srv.start()
dispatcher = self.srv.pipeline_listener.dispatcher
self.assertIsNotEmpty(dispatcher.targets)
endpoints = {}
for endpoint in dispatcher.endpoints:
self.assertEqual(1, len(endpoint.publish_context.pipelines))
pipe = list(endpoint.publish_context.pipelines)[0]
endpoints[pipe.name] = endpoint
notifiers = []
notifiers.extend(self.srv.pipe_manager.transporters[0][2])
notifiers.extend(self.srv.event_pipe_manager.transporters[0][2])
for notifier in notifiers:
filter_rule = endpoints[notifier.publisher_id].filter_rule
self.assertEqual(True, filter_rule.match(None,
notifier.publisher_id,
None, None, None))
self.srv.stop()
@mock.patch('oslo_messaging.Notifier.sample')