Merge "process to gracefully exit when last watcher exits"

This commit is contained in:
Zuul 2018-07-17 10:48:46 +00:00 committed by Gerrit Code Review
commit 5d4632ee69
4 changed files with 36 additions and 10 deletions

View File

@ -190,7 +190,7 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.pipeline = h_cni.CNIPipeline()
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
self.on_deleted))
self.watcher = k_watcher.Watcher(self.pipeline)
self.watcher = k_watcher.Watcher(self.pipeline, exit_on_stop=True)
self.watcher.add(
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
'base': k_const.K8S_API_BASE,

View File

@ -83,7 +83,7 @@ class KuryrK8sService(six.with_metaclass(KuryrK8sServiceMeta,
objects.register_locally_defined_vifs()
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg, exit_on_stop=True)
self.health_manager = health.HealthServer()
self.current_leader = None
self.node_name = utils.get_node_name()

View File

@ -211,14 +211,16 @@ class TestWatcher(test_base.TestCase):
@staticmethod
def _test_watch_create_watcher(path, handler, timeout=0):
watcher_obj = watcher.Watcher(handler, timeout=timeout)
watcher_obj = watcher.Watcher(handler, timeout=timeout,
exit_on_stop=True)
watcher_obj._running = True
watcher_obj._resources.add(path)
watcher_obj._idle[path] = True
watcher_obj._watching[path] = None
return watcher_obj
def test_watch(self):
@mock.patch('sys.exit')
def test_watch(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@ -234,8 +236,12 @@ class TestWatcher(test_base.TestCase):
self.assertEqual(0, watcher_obj._timeout)
m_handler.assert_has_calls([mock.call(e) for e in events])
# After all events have been "handled", since there is only
# one handler, we'll gracefully exit
m_sys_exit.assert_called_once_with(1)
def test_watch_stopped(self):
@mock.patch('sys.exit')
def test_watch_stopped(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@ -253,8 +259,10 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
m_sys_exit.assert_called_once_with(1)
def test_watch_removed(self):
@mock.patch('sys.exit')
def test_watch_removed(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@ -272,8 +280,10 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
m_sys_exit.assert_called_once_with(1)
def test_watch_interrupted(self):
@mock.patch('sys.exit')
def test_watch_interrupted(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@ -291,8 +301,10 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
m_sys_exit.assert_called_once_with(1)
def test_watch_client_request_failed(self):
@mock.patch('sys.exit')
def test_watch_client_request_failed(self, m_sys_exit):
path = '/test'
m_handler = mock.Mock()
watcher_obj = self._test_watch_create_watcher(path, m_handler)
@ -302,8 +314,10 @@ class TestWatcher(test_base.TestCase):
self.client.watch.assert_called_once()
self.assertFalse(watcher_obj._healthy)
m_sys_exit.assert_called_once_with(1)
def test_watch_retry(self):
@mock.patch('sys.exit')
def test_watch_retry(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
side_effects = [exceptions.ChunkedEncodingError("Connection Broken")]
@ -317,3 +331,4 @@ class TestWatcher(test_base.TestCase):
watcher_obj._watch(path)
m_handler.assert_has_calls([mock.call(e) for e in events])
m_sys_exit.assert_called_once_with(1)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
from kuryr_kubernetes import clients
@ -55,7 +56,8 @@ class Watcher(health.HealthHandler):
graceful=False)` for asynchronous `Watcher`).
"""
def __init__(self, handler, thread_group=None, timeout=None):
def __init__(self, handler, thread_group=None, timeout=None,
exit_on_stop=False):
"""Initializes a new Watcher instance.
:param handler: a `callable` object to be invoked for each observed
@ -82,6 +84,7 @@ class Watcher(health.HealthHandler):
if timeout is None:
timeout = CONF.kubernetes.watch_retry_timeout
self._timeout = timeout
self._exit_on_stop = exit_on_stop
def add(self, path):
"""Adds ths K8s resource to the Watcher.
@ -152,6 +155,14 @@ class Watcher(health.HealthHandler):
LOG.info("Stopped watching '%s'", path)
except KeyError:
LOG.error("Failed to exit watch gracefully")
finally:
if not self._watching and not self._idle:
self.stop()
if self._exit_on_stop:
LOG.info("No remaining active watchers, Exiting...")
# TODO(dulek): This complicates things, remove once we
# don't support running without kuryr-daemon.
sys.exit(1)
def _watch(self, path):
attempts = 0