Merge "utils: move kill_listeners to ceilometer.notification"
This commit is contained in:
commit
c44759fc4e
|
@ -235,8 +235,7 @@ class NotificationService(cotyledon.Service):
|
|||
targets.append(oslo_messaging.Target(topic=topic))
|
||||
|
||||
if self.pipeline_listener:
|
||||
self.pipeline_listener.stop()
|
||||
self.pipeline_listener.wait()
|
||||
self.kill_listeners([self.pipeline_listener])
|
||||
|
||||
self.pipeline_listener = messaging.get_batch_notification_listener(
|
||||
self.transport, targets, endpoints,
|
||||
|
@ -248,6 +247,15 @@ class NotificationService(cotyledon.Service):
|
|||
else self.conf.max_parallel_requests)
|
||||
self.pipeline_listener.start(override_pool_size=batch)
|
||||
|
||||
@staticmethod
|
||||
def kill_listeners(listeners):
|
||||
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
|
||||
# which stops new messages, and wait(), which processes remaining
|
||||
# messages and closes connection
|
||||
for listener in listeners:
|
||||
listener.stop()
|
||||
listener.wait()
|
||||
|
||||
def terminate(self):
|
||||
self.shutdown = True
|
||||
if self.periodic:
|
||||
|
@ -257,7 +265,7 @@ class NotificationService(cotyledon.Service):
|
|||
self.partition_coordinator.stop()
|
||||
with self.coord_lock:
|
||||
if self.pipeline_listener:
|
||||
utils.kill_listeners([self.pipeline_listener])
|
||||
utils.kill_listeners(self.listeners)
|
||||
self.kill_listeners([self.pipeline_listener])
|
||||
self.kill_listeners(self.listeners)
|
||||
|
||||
super(NotificationService, self).terminate()
|
||||
|
|
|
@ -258,7 +258,6 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
fake_publisher_cls.return_value = self.publisher
|
||||
self._check_notification_service()
|
||||
|
||||
@mock.patch("ceilometer.utils.kill_listeners", mock.MagicMock())
|
||||
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop')
|
||||
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait')
|
||||
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
|
||||
|
|
|
@ -54,15 +54,6 @@ def hash_of_set(s):
|
|||
return str(hash(frozenset(s)))
|
||||
|
||||
|
||||
def kill_listeners(listeners):
|
||||
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
|
||||
# which stops new messages, and wait(), which processes remaining
|
||||
# messages and closes connection
|
||||
for listener in listeners:
|
||||
listener.stop()
|
||||
listener.wait()
|
||||
|
||||
|
||||
def spawn_thread(target, *args, **kwargs):
|
||||
t = threading.Thread(target=target, args=args, kwargs=kwargs)
|
||||
t.daemon = True
|
||||
|
|
Loading…
Reference in New Issue