collector: fix graceful shutdown when udp enabled

recvfrom is blocking so if no udp data come in
the shutdown process never finish.

This fixes that.

Change-Id: I88302724d51d98ca1c5af8a0577c9dc5902b6355
This commit is contained in:
Mehdi Abaakouk 2016-09-27 17:03:39 +02:00 committed by gordon chung
parent ed0075f824
commit 67bbd3f833
2 changed files with 39 additions and 28 deletions

View File

@ -14,6 +14,7 @@
# under the License.
from itertools import chain
import select
import socket
import cotyledon
@ -111,6 +112,10 @@ class CollectorService(cotyledon.Service):
self.udp_run = True
while self.udp_run:
# NOTE(sileht): return every 10 seconds to allow
# clear shutdown
if not select.select([udp], [], [], 10.0)[0]:
continue
# NOTE(jd) Arbitrary limit of 64K because that ought to be
# enough for anybody.
data, source = udp.recvfrom(64 * units.Ki)

View File

@ -119,13 +119,15 @@ class TestCollector(tests_base.BaseTestCase):
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket') as mock_socket:
mock_socket.return_value = udp_socket
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)
with mock.patch('select.select', return_value=([udp_socket], [], [])):
with mock.patch('socket.socket') as mock_socket:
mock_socket.return_value = udp_socket
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
mock_socket.assert_called_with(socket.AF_INET,
socket.SOCK_DGRAM)
self._verify_udp_socket(udp_socket)
mock_record = self.mock_dispatcher.record_metering_data
@ -136,13 +138,15 @@ class TestCollector(tests_base.BaseTestCase):
self.CONF.set_override('udp_address', '::1', group='collector')
sock = self._make_fake_socket(self.sample)
with mock.patch.object(socket, 'socket') as mock_socket:
mock_socket.return_value = sock
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
mock_socket.assert_called_with(socket.AF_INET6, socket.SOCK_DGRAM)
with mock.patch('select.select', return_value=([sock], [], [])):
with mock.patch.object(socket, 'socket') as mock_socket:
mock_socket.return_value = sock
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
mock_socket.assert_called_with(socket.AF_INET6,
socket.SOCK_DGRAM)
def test_udp_receive_storage_error(self):
self._setup_messaging(False)
@ -150,11 +154,12 @@ class TestCollector(tests_base.BaseTestCase):
mock_record.side_effect = self._raise_error
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
with mock.patch('select.select', return_value=([udp_socket], [], [])):
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
self._verify_udp_socket(udp_socket)
@ -195,15 +200,16 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_valid_encoding(self):
self._setup_messaging(False)
self.data_sent = []
with mock.patch('socket.socket',
return_value=self._make_fake_socket(self.utf8_msg)):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
self.assertTrue(utils.verify_signature(
self.mock_dispatcher.method_calls[0][1][0],
"not-so-secret"))
sock = self._make_fake_socket(self.utf8_msg)
with mock.patch('select.select', return_value=([sock], [], [])):
with mock.patch('socket.socket', return_value=sock):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
self.assertTrue(utils.verify_signature(
self.mock_dispatcher.method_calls[0][1][0],
"not-so-secret"))
def _test_collector_requeue(self, listener, batch_listener=False):