From 372b835ebb1ebc2592983e3d4cc6dfdc8fa90633 Mon Sep 17 00:00:00 2001 From: Antoni Segura Puimedon Date: Wed, 13 Jun 2018 15:45:43 +0200 Subject: [PATCH] process to gracefully exit when last watcher exits In case all the watchers (in the CNI case the pod watcher only) have gracefully exited, continuing the process only serves to give a false appearance of things working. At the same time, it prevents the containerized deployment orchestrator from realizing that the Kuryr pod is not functional, so it does not restart it. This fix allows environments without health probes, where all watchers have gracefully exited, to be restarted by k8s/ocp and eventually work again should the issue that made the graceful exits happen be solved. Change-Id: Id70978e06d980bc0ffa08bcee02d78bef9dcbeb8 Closes-Bug: #1776676 Signed-off-by: Antoni Segura Puimedon --- kuryr_kubernetes/cni/daemon/service.py | 2 +- kuryr_kubernetes/controller/service.py | 2 +- kuryr_kubernetes/tests/unit/test_watcher.py | 29 ++++++++++++++++----- kuryr_kubernetes/watcher.py | 13 ++++++++- 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 04ce59d8e..b81076575 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -190,7 +190,7 @@ class CNIDaemonWatcherService(cotyledon.Service): self.pipeline = h_cni.CNIPipeline() self.pipeline.register(h_cni.CallbackHandler(self.on_done, self.on_deleted)) - self.watcher = k_watcher.Watcher(self.pipeline) + self.watcher = k_watcher.Watcher(self.pipeline, exit_on_stop=True) self.watcher.add( "%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % { 'base': k_const.K8S_API_BASE, diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index ede57dfd2..d7f360bf4 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -83,7 +83,7 @@ class
KuryrK8sService(six.with_metaclass(KuryrK8sServiceMeta, objects.register_locally_defined_vifs() pipeline = h_pipeline.ControllerPipeline(self.tg) - self.watcher = watcher.Watcher(pipeline, self.tg) + self.watcher = watcher.Watcher(pipeline, self.tg, exit_on_stop=True) self.health_manager = health.HealthServer() self.current_leader = None self.node_name = utils.get_node_name() diff --git a/kuryr_kubernetes/tests/unit/test_watcher.py b/kuryr_kubernetes/tests/unit/test_watcher.py index 58dc68746..97edbb4fd 100644 --- a/kuryr_kubernetes/tests/unit/test_watcher.py +++ b/kuryr_kubernetes/tests/unit/test_watcher.py @@ -211,14 +211,16 @@ class TestWatcher(test_base.TestCase): @staticmethod def _test_watch_create_watcher(path, handler, timeout=0): - watcher_obj = watcher.Watcher(handler, timeout=timeout) + watcher_obj = watcher.Watcher(handler, timeout=timeout, + exit_on_stop=True) watcher_obj._running = True watcher_obj._resources.add(path) watcher_obj._idle[path] = True watcher_obj._watching[path] = None return watcher_obj - def test_watch(self): + @mock.patch('sys.exit') + def test_watch(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -234,8 +236,12 @@ class TestWatcher(test_base.TestCase): self.assertEqual(0, watcher_obj._timeout) m_handler.assert_has_calls([mock.call(e) for e in events]) + # After all events have been "handled", since there is only + # one handler, we'll gracefully exit + m_sys_exit.assert_called_once_with(1) - def test_watch_stopped(self): + @mock.patch('sys.exit') + def test_watch_stopped(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -253,8 +259,10 @@ class TestWatcher(test_base.TestCase): m_handler.assert_called_once_with(events[0]) self.assertNotIn(path, watcher_obj._idle) self.assertNotIn(path, watcher_obj._watching) + m_sys_exit.assert_called_once_with(1) - def test_watch_removed(self): + @mock.patch('sys.exit') + def test_watch_removed(self, m_sys_exit): path = '/test' events = [{'e': i} 
for i in range(3)] @@ -272,8 +280,10 @@ class TestWatcher(test_base.TestCase): m_handler.assert_called_once_with(events[0]) self.assertNotIn(path, watcher_obj._idle) self.assertNotIn(path, watcher_obj._watching) + m_sys_exit.assert_called_once_with(1) - def test_watch_interrupted(self): + @mock.patch('sys.exit') + def test_watch_interrupted(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -291,8 +301,10 @@ class TestWatcher(test_base.TestCase): m_handler.assert_called_once_with(events[0]) self.assertNotIn(path, watcher_obj._idle) self.assertNotIn(path, watcher_obj._watching) + m_sys_exit.assert_called_once_with(1) - def test_watch_client_request_failed(self): + @mock.patch('sys.exit') + def test_watch_client_request_failed(self, m_sys_exit): path = '/test' m_handler = mock.Mock() watcher_obj = self._test_watch_create_watcher(path, m_handler) @@ -302,8 +314,10 @@ class TestWatcher(test_base.TestCase): self.client.watch.assert_called_once() self.assertFalse(watcher_obj._healthy) + m_sys_exit.assert_called_once_with(1) - def test_watch_retry(self): + @mock.patch('sys.exit') + def test_watch_retry(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] side_effects = [exceptions.ChunkedEncodingError("Connection Broken")] @@ -317,3 +331,4 @@ class TestWatcher(test_base.TestCase): watcher_obj._watch(path) m_handler.assert_has_calls([mock.call(e) for e in events]) + m_sys_exit.assert_called_once_with(1) diff --git a/kuryr_kubernetes/watcher.py b/kuryr_kubernetes/watcher.py index fe67d6104..c2975a592 100644 --- a/kuryr_kubernetes/watcher.py +++ b/kuryr_kubernetes/watcher.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import sys import time from kuryr_kubernetes import clients @@ -55,7 +56,8 @@ class Watcher(health.HealthHandler): graceful=False)` for asynchronous `Watcher`). 
""" - def __init__(self, handler, thread_group=None, timeout=None): + def __init__(self, handler, thread_group=None, timeout=None, + exit_on_stop=False): """Initializes a new Watcher instance. :param handler: a `callable` object to be invoked for each observed @@ -82,6 +84,7 @@ class Watcher(health.HealthHandler): if timeout is None: timeout = CONF.kubernetes.watch_retry_timeout self._timeout = timeout + self._exit_on_stop = exit_on_stop def add(self, path): """Adds ths K8s resource to the Watcher. @@ -152,6 +155,14 @@ class Watcher(health.HealthHandler): LOG.info("Stopped watching '%s'", path) except KeyError: LOG.error("Failed to exit watch gracefully") + finally: + if not self._watching and not self._idle: + self.stop() + if self._exit_on_stop: + LOG.info("No remaining active watchers, Exiting...") + # TODO(dulek): This complicates things, remove once we + # don't support running without kuryr-daemon. + sys.exit(1) def _watch(self, path): attempts = 0