process to gracefully exit when last watcher exits
If all the watchers (in the CNI case, only the pod watcher) have gracefully exited, keeping the process running only serves to give a false appearance of things working. At the same time, it prevents the containerized deployment orchestrator from realizing that the Kuryr pod is not functional, so the pod is never restarted. This fix allows environments without health probes, in which all watchers have gracefully exited, to be restarted by k8s/ocp and eventually work again once the issue that caused the graceful exits is solved. Change-Id: Id70978e06d980bc0ffa08bcee02d78bef9dcbeb8 Closes-Bug: #1776676 Signed-off-by: Antoni Segura Puimedon <antonisp@celebdor.com>
This commit is contained in:
parent
3e1f3e03ac
commit
372b835ebb
|
@ -190,7 +190,7 @@ class CNIDaemonWatcherService(cotyledon.Service):
|
|||
self.pipeline = h_cni.CNIPipeline()
|
||||
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
|
||||
self.on_deleted))
|
||||
self.watcher = k_watcher.Watcher(self.pipeline)
|
||||
self.watcher = k_watcher.Watcher(self.pipeline, exit_on_stop=True)
|
||||
self.watcher.add(
|
||||
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
|
||||
'base': k_const.K8S_API_BASE,
|
||||
|
|
|
@ -83,7 +83,7 @@ class KuryrK8sService(six.with_metaclass(KuryrK8sServiceMeta,
|
|||
|
||||
objects.register_locally_defined_vifs()
|
||||
pipeline = h_pipeline.ControllerPipeline(self.tg)
|
||||
self.watcher = watcher.Watcher(pipeline, self.tg)
|
||||
self.watcher = watcher.Watcher(pipeline, self.tg, exit_on_stop=True)
|
||||
self.health_manager = health.HealthServer()
|
||||
self.current_leader = None
|
||||
self.node_name = utils.get_node_name()
|
||||
|
|
|
@ -211,14 +211,16 @@ class TestWatcher(test_base.TestCase):
|
|||
|
||||
@staticmethod
|
||||
def _test_watch_create_watcher(path, handler, timeout=0):
|
||||
watcher_obj = watcher.Watcher(handler, timeout=timeout)
|
||||
watcher_obj = watcher.Watcher(handler, timeout=timeout,
|
||||
exit_on_stop=True)
|
||||
watcher_obj._running = True
|
||||
watcher_obj._resources.add(path)
|
||||
watcher_obj._idle[path] = True
|
||||
watcher_obj._watching[path] = None
|
||||
return watcher_obj
|
||||
|
||||
def test_watch(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
|
@ -234,8 +236,12 @@ class TestWatcher(test_base.TestCase):
|
|||
|
||||
self.assertEqual(0, watcher_obj._timeout)
|
||||
m_handler.assert_has_calls([mock.call(e) for e in events])
|
||||
# After all events have been "handled", since there is only
|
||||
# one handler, we'll gracefully exit
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_stopped(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_stopped(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
|
@ -253,8 +259,10 @@ class TestWatcher(test_base.TestCase):
|
|||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_removed(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_removed(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
|
@ -272,8 +280,10 @@ class TestWatcher(test_base.TestCase):
|
|||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_interrupted(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_interrupted(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
|
@ -291,8 +301,10 @@ class TestWatcher(test_base.TestCase):
|
|||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_client_request_failed(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_client_request_failed(self, m_sys_exit):
|
||||
path = '/test'
|
||||
m_handler = mock.Mock()
|
||||
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||
|
@ -302,8 +314,10 @@ class TestWatcher(test_base.TestCase):
|
|||
|
||||
self.client.watch.assert_called_once()
|
||||
self.assertFalse(watcher_obj._healthy)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_retry(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_retry(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
side_effects = [exceptions.ChunkedEncodingError("Connection Broken")]
|
||||
|
@ -317,3 +331,4 @@ class TestWatcher(test_base.TestCase):
|
|||
watcher_obj._watch(path)
|
||||
|
||||
m_handler.assert_has_calls([mock.call(e) for e in events])
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
|
@ -55,7 +56,8 @@ class Watcher(health.HealthHandler):
|
|||
graceful=False)` for asynchronous `Watcher`).
|
||||
"""
|
||||
|
||||
def __init__(self, handler, thread_group=None, timeout=None):
|
||||
def __init__(self, handler, thread_group=None, timeout=None,
|
||||
exit_on_stop=False):
|
||||
"""Initializes a new Watcher instance.
|
||||
|
||||
:param handler: a `callable` object to be invoked for each observed
|
||||
|
@ -82,6 +84,7 @@ class Watcher(health.HealthHandler):
|
|||
if timeout is None:
|
||||
timeout = CONF.kubernetes.watch_retry_timeout
|
||||
self._timeout = timeout
|
||||
self._exit_on_stop = exit_on_stop
|
||||
|
||||
def add(self, path):
|
||||
"""Adds the K8s resource to the Watcher.
|
||||
|
@ -152,6 +155,14 @@ class Watcher(health.HealthHandler):
|
|||
LOG.info("Stopped watching '%s'", path)
|
||||
except KeyError:
|
||||
LOG.error("Failed to exit watch gracefully")
|
||||
finally:
|
||||
if not self._watching and not self._idle:
|
||||
self.stop()
|
||||
if self._exit_on_stop:
|
||||
LOG.info("No remaining active watchers, Exiting...")
|
||||
# TODO(dulek): This complicates things, remove once we
|
||||
# don't support running without kuryr-daemon.
|
||||
sys.exit(1)
|
||||
|
||||
def _watch(self, path):
|
||||
attempts = 0
|
||||
|
|
Loading…
Reference in New Issue