diff --git a/ceilometer/collector.py b/ceilometer/collector.py index 10d9f460..b4797f7c 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -14,6 +14,7 @@ # under the License. from itertools import chain +import select import socket import cotyledon @@ -111,6 +112,10 @@ class CollectorService(cotyledon.Service): self.udp_run = True while self.udp_run: + # NOTE(sileht): return every 10 seconds to allow + # clear shutdown + if not select.select([udp], [], [], 10.0)[0]: + continue # NOTE(jd) Arbitrary limit of 64K because that ought to be # enough for anybody. data, source = udp.recvfrom(64 * units.Ki) diff --git a/ceilometer/tests/functional/test_collector.py b/ceilometer/tests/functional/test_collector.py index 7d8c9a6e..e9e5ca32 100644 --- a/ceilometer/tests/functional/test_collector.py +++ b/ceilometer/tests/functional/test_collector.py @@ -119,13 +119,15 @@ class TestCollector(tests_base.BaseTestCase): udp_socket = self._make_fake_socket(self.sample) - with mock.patch('socket.socket') as mock_socket: - mock_socket.return_value = udp_socket - self.srv.run() - self.addCleanup(self.srv.terminate) - self.srv.udp_thread.join(5) - self.assertFalse(self.srv.udp_thread.is_alive()) - mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM) + with mock.patch('select.select', return_value=([udp_socket], [], [])): + with mock.patch('socket.socket') as mock_socket: + mock_socket.return_value = udp_socket + self.srv.run() + self.addCleanup(self.srv.terminate) + self.srv.udp_thread.join(5) + self.assertFalse(self.srv.udp_thread.is_alive()) + mock_socket.assert_called_with(socket.AF_INET, + socket.SOCK_DGRAM) self._verify_udp_socket(udp_socket) mock_record = self.mock_dispatcher.record_metering_data @@ -136,13 +138,15 @@ class TestCollector(tests_base.BaseTestCase): self.CONF.set_override('udp_address', '::1', group='collector') sock = self._make_fake_socket(self.sample) - with mock.patch.object(socket, 'socket') as mock_socket: - mock_socket.return_value = sock - self.srv.run() - self.addCleanup(self.srv.terminate) - self.srv.udp_thread.join(5) - self.assertFalse(self.srv.udp_thread.is_alive()) - mock_socket.assert_called_with(socket.AF_INET6, socket.SOCK_DGRAM) + with mock.patch('select.select', return_value=([sock], [], [])): + with mock.patch.object(socket, 'socket') as mock_socket: + mock_socket.return_value = sock + self.srv.run() + self.addCleanup(self.srv.terminate) + self.srv.udp_thread.join(5) + self.assertFalse(self.srv.udp_thread.is_alive()) + mock_socket.assert_called_with(socket.AF_INET6, + socket.SOCK_DGRAM) def test_udp_receive_storage_error(self): self._setup_messaging(False) @@ -150,11 +154,12 @@ class TestCollector(tests_base.BaseTestCase): mock_record.side_effect = self._raise_error udp_socket = self._make_fake_socket(self.sample) - with mock.patch('socket.socket', return_value=udp_socket): - self.srv.run() - self.addCleanup(self.srv.terminate) - self.srv.udp_thread.join(5) - self.assertFalse(self.srv.udp_thread.is_alive()) + with mock.patch('select.select', return_value=([udp_socket], [], [])): + with mock.patch('socket.socket', return_value=udp_socket): + self.srv.run() + self.addCleanup(self.srv.terminate) + self.srv.udp_thread.join(5) + self.assertFalse(self.srv.udp_thread.is_alive()) self._verify_udp_socket(udp_socket) @@ -195,15 +200,16 @@ class TestCollector(tests_base.BaseTestCase): def test_udp_receive_valid_encoding(self): self._setup_messaging(False) self.data_sent = [] - with mock.patch('socket.socket', - return_value=self._make_fake_socket(self.utf8_msg)): - self.srv.run() - self.addCleanup(self.srv.terminate) - self.srv.udp_thread.join(5) - self.assertFalse(self.srv.udp_thread.is_alive()) - self.assertTrue(utils.verify_signature( - self.mock_dispatcher.method_calls[0][1][0], - "not-so-secret")) + sock = self._make_fake_socket(self.utf8_msg) + with mock.patch('select.select', return_value=([sock], [], [])): + with mock.patch('socket.socket', return_value=sock): + self.srv.run() + self.addCleanup(self.srv.terminate) + self.srv.udp_thread.join(5) + self.assertFalse(self.srv.udp_thread.is_alive()) + self.assertTrue(utils.verify_signature( + self.mock_dispatcher.method_calls[0][1][0], + "not-so-secret")) def _test_collector_requeue(self, listener, batch_listener=False):