diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 04ce59d8e..b81076575 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -190,7 +190,7 @@ class CNIDaemonWatcherService(cotyledon.Service): self.pipeline = h_cni.CNIPipeline() self.pipeline.register(h_cni.CallbackHandler(self.on_done, self.on_deleted)) - self.watcher = k_watcher.Watcher(self.pipeline) + self.watcher = k_watcher.Watcher(self.pipeline, exit_on_stop=True) self.watcher.add( "%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % { 'base': k_const.K8S_API_BASE, diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index ede57dfd2..d7f360bf4 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -83,7 +83,7 @@ class KuryrK8sService(six.with_metaclass(KuryrK8sServiceMeta, objects.register_locally_defined_vifs() pipeline = h_pipeline.ControllerPipeline(self.tg) - self.watcher = watcher.Watcher(pipeline, self.tg) + self.watcher = watcher.Watcher(pipeline, self.tg, exit_on_stop=True) self.health_manager = health.HealthServer() self.current_leader = None self.node_name = utils.get_node_name() diff --git a/kuryr_kubernetes/tests/unit/test_watcher.py b/kuryr_kubernetes/tests/unit/test_watcher.py index 58dc68746..97edbb4fd 100644 --- a/kuryr_kubernetes/tests/unit/test_watcher.py +++ b/kuryr_kubernetes/tests/unit/test_watcher.py @@ -211,14 +211,16 @@ class TestWatcher(test_base.TestCase): @staticmethod def _test_watch_create_watcher(path, handler, timeout=0): - watcher_obj = watcher.Watcher(handler, timeout=timeout) + watcher_obj = watcher.Watcher(handler, timeout=timeout, + exit_on_stop=True) watcher_obj._running = True watcher_obj._resources.add(path) watcher_obj._idle[path] = True watcher_obj._watching[path] = None return watcher_obj - def test_watch(self): + @mock.patch('sys.exit') + def test_watch(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -234,8 +236,12 @@ class TestWatcher(test_base.TestCase): self.assertEqual(0, watcher_obj._timeout) m_handler.assert_has_calls([mock.call(e) for e in events]) + # After all events have been "handled", since there is only + # one handler, we'll gracefully exit + m_sys_exit.assert_called_once_with(1) - def test_watch_stopped(self): + @mock.patch('sys.exit') + def test_watch_stopped(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -253,8 +259,10 @@ class TestWatcher(test_base.TestCase): m_handler.assert_called_once_with(events[0]) self.assertNotIn(path, watcher_obj._idle) self.assertNotIn(path, watcher_obj._watching) + m_sys_exit.assert_called_once_with(1) - def test_watch_removed(self): + @mock.patch('sys.exit') + def test_watch_removed(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -272,8 +280,10 @@ class TestWatcher(test_base.TestCase): m_handler.assert_called_once_with(events[0]) self.assertNotIn(path, watcher_obj._idle) self.assertNotIn(path, watcher_obj._watching) + m_sys_exit.assert_called_once_with(1) - def test_watch_interrupted(self): + @mock.patch('sys.exit') + def test_watch_interrupted(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] @@ -291,8 +301,10 @@ class TestWatcher(test_base.TestCase): m_handler.assert_called_once_with(events[0]) self.assertNotIn(path, watcher_obj._idle) self.assertNotIn(path, watcher_obj._watching) + m_sys_exit.assert_called_once_with(1) - def test_watch_client_request_failed(self): + @mock.patch('sys.exit') + def test_watch_client_request_failed(self, m_sys_exit): path = '/test' m_handler = mock.Mock() watcher_obj = self._test_watch_create_watcher(path, m_handler) @@ -302,8 +314,10 @@ class TestWatcher(test_base.TestCase): self.client.watch.assert_called_once() self.assertFalse(watcher_obj._healthy) + m_sys_exit.assert_called_once_with(1) - def test_watch_retry(self): + @mock.patch('sys.exit') + def test_watch_retry(self, m_sys_exit): path = '/test' events = [{'e': i} for i in range(3)] side_effects = [exceptions.ChunkedEncodingError("Connection Broken")] @@ -317,3 +331,4 @@ class TestWatcher(test_base.TestCase): watcher_obj._watch(path) m_handler.assert_has_calls([mock.call(e) for e in events]) + m_sys_exit.assert_called_once_with(1) diff --git a/kuryr_kubernetes/watcher.py b/kuryr_kubernetes/watcher.py index fe67d6104..c2975a592 100644 --- a/kuryr_kubernetes/watcher.py +++ b/kuryr_kubernetes/watcher.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import sys import time from kuryr_kubernetes import clients @@ -55,7 +56,8 @@ class Watcher(health.HealthHandler): graceful=False)` for asynchronous `Watcher`). """ - def __init__(self, handler, thread_group=None, timeout=None): + def __init__(self, handler, thread_group=None, timeout=None, + exit_on_stop=False): """Initializes a new Watcher instance. :param handler: a `callable` object to be invoked for each observed @@ -82,6 +84,7 @@ class Watcher(health.HealthHandler): if timeout is None: timeout = CONF.kubernetes.watch_retry_timeout self._timeout = timeout + self._exit_on_stop = exit_on_stop def add(self, path): """Adds ths K8s resource to the Watcher. @@ -152,6 +155,14 @@ class Watcher(health.HealthHandler): LOG.info("Stopped watching '%s'", path) except KeyError: LOG.error("Failed to exit watch gracefully") + finally: + if not self._watching and not self._idle: + self.stop() + if self._exit_on_stop: + LOG.info("No remaining active watchers, Exiting...") + # TODO(dulek): This complicates things, remove once we + # don't support running without kuryr-daemon. + sys.exit(1) def _watch(self, path): attempts = 0