Merge "utils: move kill_listeners to ceilometer.notification"

This commit is contained in:
Zuul 2018-02-06 21:41:42 +00:00 committed by Gerrit Code Review
commit c44759fc4e
3 changed files with 12 additions and 14 deletions

View File

@ -235,8 +235,7 @@ class NotificationService(cotyledon.Service):
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
self.kill_listeners([self.pipeline_listener])
self.pipeline_listener = messaging.get_batch_notification_listener(
self.transport, targets, endpoints,
@ -248,6 +247,15 @@ class NotificationService(cotyledon.Service):
else self.conf.max_parallel_requests)
self.pipeline_listener.start(override_pool_size=batch)
@staticmethod
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining
# messages and closes connection
for listener in listeners:
listener.stop()
listener.wait()
def terminate(self):
self.shutdown = True
if self.periodic:
@ -257,7 +265,7 @@ class NotificationService(cotyledon.Service):
self.partition_coordinator.stop()
with self.coord_lock:
if self.pipeline_listener:
utils.kill_listeners([self.pipeline_listener])
utils.kill_listeners(self.listeners)
self.kill_listeners([self.pipeline_listener])
self.kill_listeners(self.listeners)
super(NotificationService, self).terminate()

View File

@ -258,7 +258,6 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
@mock.patch("ceilometer.utils.kill_listeners", mock.MagicMock())
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')

View File

@ -54,15 +54,6 @@ def hash_of_set(s):
return str(hash(frozenset(s)))
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining
# messages and closes connection
for listener in listeners:
listener.stop()
listener.wait()
def spawn_thread(target, *args, **kwargs):
t = threading.Thread(target=target, args=args, kwargs=kwargs)
t.daemon = True