Merge "stop double refreshing on start" into stable/pike
This commit is contained in:
commit
7bee6ae2ef
|
@ -212,7 +212,8 @@ class NotificationService(cotyledon.Service):
|
|||
self.hashring = self.partition_coordinator.join_partitioned_group(
|
||||
self.NOTIFICATION_NAMESPACE)
|
||||
|
||||
@periodics.periodic(spacing=self.conf.coordination.check_watchers)
|
||||
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
|
||||
run_immediately=True)
|
||||
def run_watchers():
|
||||
self.partition_coordinator.run_watchers()
|
||||
if self.group_state != self.hashring.ring.nodes:
|
||||
|
@ -224,7 +225,6 @@ class NotificationService(cotyledon.Service):
|
|||
futures.ThreadPoolExecutor(max_workers=10))
|
||||
self.periodic.add(run_watchers)
|
||||
utils.spawn_thread(self.periodic.start)
|
||||
self._refresh_agent()
|
||||
|
||||
def _configure_main_queue_listeners(self, pipe_manager,
|
||||
event_pipe_manager):
|
||||
|
|
|
@ -82,6 +82,20 @@ TEST_NOTICE_PAYLOAD = {
|
|||
}
|
||||
|
||||
|
||||
class BaseNotificationTest(tests_base.BaseTestCase):
|
||||
def run_service(self, srv):
|
||||
srv.run()
|
||||
self.addCleanup(srv.terminate)
|
||||
if srv.conf.notification.workload_partitioning:
|
||||
start = time.time()
|
||||
while time.time() - start < 10:
|
||||
if srv.group_state: # ensure pipeline is set if HA
|
||||
break
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
self.fail('Did not start pipeline queues')
|
||||
|
||||
|
||||
class _FakeNotificationPlugin(plugin_base.NotificationBase):
|
||||
event_types = ['fake.event']
|
||||
|
||||
|
@ -94,7 +108,7 @@ class _FakeNotificationPlugin(plugin_base.NotificationBase):
|
|||
return []
|
||||
|
||||
|
||||
class TestNotification(tests_base.BaseTestCase):
|
||||
class TestNotification(BaseNotificationTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestNotification, self).setUp()
|
||||
|
@ -121,8 +135,7 @@ class TestNotification(tests_base.BaseTestCase):
|
|||
with mock.patch.object(self.srv,
|
||||
'_get_notifications_manager') as get_nm:
|
||||
get_nm.side_effect = self.fake_get_notifications_manager
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
self.fake_event_endpoint = fake_event_endpoint_class.return_value
|
||||
|
||||
def test_start_multiple_listeners(self):
|
||||
|
@ -161,14 +174,13 @@ class TestNotification(tests_base.BaseTestCase):
|
|||
with mock.patch.object(self.srv,
|
||||
'_get_notifications_manager') as get_nm:
|
||||
get_nm.side_effect = fake_get_notifications_manager_dup_targets
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
# 1 target, 1 listener
|
||||
self.assertEqual(1, len(mock_listener.call_args_list[0][0][1]))
|
||||
self.assertEqual(1, len(self.srv.listeners))
|
||||
|
||||
|
||||
class BaseRealNotification(tests_base.BaseTestCase):
|
||||
class BaseRealNotification(BaseNotificationTest):
|
||||
def setup_pipeline(self, counter_names):
|
||||
pipeline = yaml.dump({
|
||||
'sources': [{
|
||||
|
@ -230,9 +242,7 @@ class BaseRealNotification(tests_base.BaseTestCase):
|
|||
self.publisher = test_publisher.TestPublisher(self.CONF, "")
|
||||
|
||||
def _check_notification_service(self):
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
|
||||
self.run_service(self.srv)
|
||||
notifier = messaging.get_notifier(self.transport,
|
||||
"compute.vagrant-precise")
|
||||
notifier.info({}, 'compute.instance.create.end',
|
||||
|
@ -263,8 +273,7 @@ class TestRealNotification(BaseRealNotification):
|
|||
@mock.patch('ceilometer.publisher.test.TestPublisher')
|
||||
def test_notification_service_error_topic(self, fake_publisher_cls):
|
||||
fake_publisher_cls.return_value = self.publisher
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
notifier = messaging.get_notifier(self.transport,
|
||||
'compute.vagrant-precise')
|
||||
notifier.error({}, 'compute.instance.error',
|
||||
|
@ -311,10 +320,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
mock.MagicMock(), # pipeline listener
|
||||
mock.MagicMock(), # refresh pipeline listener
|
||||
]
|
||||
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
|
||||
self.run_service(self.srv)
|
||||
listener = self.srv.pipeline_listener
|
||||
self.srv._refresh_agent()
|
||||
self.assertIsNot(listener, self.srv.pipeline_listener)
|
||||
|
@ -331,8 +337,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
hashring.belongs_to_self = _once_over_five
|
||||
self.srv.partition_coordinator = pc = mock.MagicMock()
|
||||
pc.join_partitioned_group.return_value = hashring
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
topics = [target.topic for target in mock_listener.call_args[0][1]]
|
||||
self.assertEqual(4, len(topics))
|
||||
self.assertEqual(
|
||||
|
@ -344,8 +349,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
|
||||
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
||||
def test_notify_to_relevant_endpoint(self, mock_listener):
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
|
||||
targets = mock_listener.call_args[0][1]
|
||||
self.assertIsNotEmpty(targets)
|
||||
|
@ -371,8 +375,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
|
||||
@mock.patch('oslo_messaging.Notifier.sample')
|
||||
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
|
||||
if (hasattr(endpoint, 'filter_rule') and
|
||||
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
|
||||
|
@ -408,7 +411,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
mock_notifier.call_args_list[2][1]['event_type'])
|
||||
|
||||
|
||||
class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
||||
class TestRealNotificationMultipleAgents(BaseNotificationTest):
|
||||
def setup_pipeline(self, transformers):
|
||||
pipeline = yaml.dump({
|
||||
'sources': [{
|
||||
|
@ -477,8 +480,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
|||
hashring_srv1.belongs_to_self = _sometimes_srv
|
||||
hashring_srv1.ring.nodes = {'id1': mock.Mock()}
|
||||
pc.join_partitioned_group.return_value = hashring_srv1
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.run_service(self.srv)
|
||||
|
||||
def _sometimes_srv2(item):
|
||||
maybe["srv2"] += 1
|
||||
|
@ -491,8 +493,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
|||
hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
|
||||
self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
|
||||
pc.join_partitioned_group.return_value = hashring
|
||||
self.srv2.run()
|
||||
self.addCleanup(self.srv2.terminate)
|
||||
self.run_service(self.srv2)
|
||||
|
||||
notifier = messaging.get_notifier(self.transport,
|
||||
"compute.vagrant-precise")
|
||||
|
|
Loading…
Reference in New Issue