Merge "stop double refreshing on start" into stable/pike

This commit is contained in:
Zuul 2017-11-13 17:12:51 +00:00 committed by Gerrit Code Review
commit 7bee6ae2ef
2 changed files with 29 additions and 28 deletions

View File

@ -212,7 +212,8 @@ class NotificationService(cotyledon.Service):
self.hashring = self.partition_coordinator.join_partitioned_group(
self.NOTIFICATION_NAMESPACE)
@periodics.periodic(spacing=self.conf.coordination.check_watchers)
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
run_immediately=True)
def run_watchers():
self.partition_coordinator.run_watchers()
if self.group_state != self.hashring.ring.nodes:
@ -224,7 +225,6 @@ class NotificationService(cotyledon.Service):
futures.ThreadPoolExecutor(max_workers=10))
self.periodic.add(run_watchers)
utils.spawn_thread(self.periodic.start)
self._refresh_agent()
def _configure_main_queue_listeners(self, pipe_manager,
event_pipe_manager):

View File

@ -82,6 +82,20 @@ TEST_NOTICE_PAYLOAD = {
}
class BaseNotificationTest(tests_base.BaseTestCase):
def run_service(self, srv):
srv.run()
self.addCleanup(srv.terminate)
if srv.conf.notification.workload_partitioning:
start = time.time()
while time.time() - start < 10:
if srv.group_state: # ensure pipeline is set if HA
break
time.sleep(0.1)
else:
self.fail('Did not start pipeline queues')
class _FakeNotificationPlugin(plugin_base.NotificationBase):
event_types = ['fake.event']
@ -94,7 +108,7 @@ class _FakeNotificationPlugin(plugin_base.NotificationBase):
return []
class TestNotification(tests_base.BaseTestCase):
class TestNotification(BaseNotificationTest):
def setUp(self):
super(TestNotification, self).setUp()
@ -121,8 +135,7 @@ class TestNotification(tests_base.BaseTestCase):
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
self.fake_event_endpoint = fake_event_endpoint_class.return_value
def test_start_multiple_listeners(self):
@ -161,14 +174,13 @@ class TestNotification(tests_base.BaseTestCase):
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = fake_get_notifications_manager_dup_targets
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
# 1 target, 1 listener
self.assertEqual(1, len(mock_listener.call_args_list[0][0][1]))
self.assertEqual(1, len(self.srv.listeners))
class BaseRealNotification(tests_base.BaseTestCase):
class BaseRealNotification(BaseNotificationTest):
def setup_pipeline(self, counter_names):
pipeline = yaml.dump({
'sources': [{
@ -230,9 +242,7 @@ class BaseRealNotification(tests_base.BaseTestCase):
self.publisher = test_publisher.TestPublisher(self.CONF, "")
def _check_notification_service(self):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
notifier.info({}, 'compute.instance.create.end',
@ -263,8 +273,7 @@ class TestRealNotification(BaseRealNotification):
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service_error_topic(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
notifier = messaging.get_notifier(self.transport,
'compute.vagrant-precise')
notifier.error({}, 'compute.instance.error',
@ -311,10 +320,7 @@ class TestRealNotificationHA(BaseRealNotification):
mock.MagicMock(), # pipeline listener
mock.MagicMock(), # refresh pipeline listener
]
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
listener = self.srv.pipeline_listener
self.srv._refresh_agent()
self.assertIsNot(listener, self.srv.pipeline_listener)
@ -331,8 +337,7 @@ class TestRealNotificationHA(BaseRealNotification):
hashring.belongs_to_self = _once_over_five
self.srv.partition_coordinator = pc = mock.MagicMock()
pc.join_partitioned_group.return_value = hashring
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
topics = [target.topic for target in mock_listener.call_args[0][1]]
self.assertEqual(4, len(topics))
self.assertEqual(
@ -344,8 +349,7 @@ class TestRealNotificationHA(BaseRealNotification):
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_notify_to_relevant_endpoint(self, mock_listener):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
targets = mock_listener.call_args[0][1]
self.assertIsNotEmpty(targets)
@ -371,8 +375,7 @@ class TestRealNotificationHA(BaseRealNotification):
@mock.patch('oslo_messaging.Notifier.sample')
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
@ -408,7 +411,7 @@ class TestRealNotificationHA(BaseRealNotification):
mock_notifier.call_args_list[2][1]['event_type'])
class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
class TestRealNotificationMultipleAgents(BaseNotificationTest):
def setup_pipeline(self, transformers):
pipeline = yaml.dump({
'sources': [{
@ -477,8 +480,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
hashring_srv1.belongs_to_self = _sometimes_srv
hashring_srv1.ring.nodes = {'id1': mock.Mock()}
pc.join_partitioned_group.return_value = hashring_srv1
self.srv.run()
self.addCleanup(self.srv.terminate)
self.run_service(self.srv)
def _sometimes_srv2(item):
maybe["srv2"] += 1
@ -491,8 +493,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
pc.join_partitioned_group.return_value = hashring
self.srv2.run()
self.addCleanup(self.srv2.terminate)
self.run_service(self.srv2)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")