Make the process exit gracefully when the last watcher exits

If all the watchers (in the CNI case, only the pod watcher) have
gracefully exited, keeping the process running only gives a false
appearance that things are working. It also prevents the containerized
deployment orchestrator from realizing that the Kuryr pod is not
functional, so the pod never gets restarted.

This fix allows environments without health probes, where all watchers
have gracefully exited, to be restarted by k8s/ocp and eventually work
again once the issue that caused the graceful exits is resolved.

Change-Id: Id70978e06d980bc0ffa08bcee02d78bef9dcbeb8
Closes-Bug: #1776676
Signed-off-by: Antoni Segura Puimedon <antonisp@celebdor.com>
Antoni Segura Puimedon 2018-06-13 15:45:43 +02:00 committed by Michał Dulko
parent 3e1f3e03ac
commit 372b835ebb
4 changed files with 36 additions and 10 deletions
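The intended behaviour can be illustrated with a short standalone sketch. This is a simplified illustration only; MiniWatcher and its methods are made-up names, not the real kuryr_kubernetes.watcher.Watcher API. The idea it shows is the one this commit introduces: when the last watched path finishes and exit_on_stop is set, the process exits with a non-zero status so an orchestrator with a restart policy can recreate the pod.

import logging
import sys

LOG = logging.getLogger(__name__)


class MiniWatcher(object):
    """Toy stand-in for the real Watcher, for illustration only."""

    def __init__(self, handler, exit_on_stop=False):
        self._handler = handler
        self._exit_on_stop = exit_on_stop
        self._watching = set()

    def add(self, path):
        self._watching.add(path)

    def remove(self, path):
        # Called when the watch loop for `path` ends gracefully.
        self._watching.discard(path)
        if not self._watching:
            self.stop()
            if self._exit_on_stop:
                LOG.info("No remaining active watchers, exiting...")
                # A non-zero exit lets k8s/ocp notice the failure and
                # restart the container instead of leaving it idle.
                sys.exit(1)

    def stop(self):
        LOG.info("Watcher stopped")


if __name__ == '__main__':
    watcher = MiniWatcher(handler=print, exit_on_stop=True)
    watcher.add('/pods')
    watcher.remove('/pods')  # last watcher gone -> process exits with 1

Running the sketch terminates the interpreter with status 1, which is exactly the signal the orchestrator needs in order to restart the container.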


@@ -190,7 +190,7 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.pipeline = h_cni.CNIPipeline()
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
self.on_deleted))
self.watcher = k_watcher.Watcher(self.pipeline)
self.watcher = k_watcher.Watcher(self.pipeline, exit_on_stop=True)
self.watcher.add(
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
'base': k_const.K8S_API_BASE,


@@ -83,7 +83,7 @@ class KuryrK8sService(six.with_metaclass(KuryrK8sServiceMeta,
objects.register_locally_defined_vifs()
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg, exit_on_stop=True)
self.health_manager = health.HealthServer()
self.current_leader = None
self.node_name = utils.get_node_name()


@@ -211,14 +211,16 @@ class TestWatcher(test_base.TestCase):
@staticmethod
def _test_watch_create_watcher(path, handler, timeout=0):
watcher_obj = watcher.Watcher(handler, timeout=timeout)
watcher_obj = watcher.Watcher(handler, timeout=timeout,
exit_on_stop=True)
watcher_obj._running = True
watcher_obj._resources.add(path)
watcher_obj._idle[path] = True
watcher_obj._watching[path] = None
return watcher_obj
def test_watch(self):
@mock.patch('sys.exit')
def test_watch(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@@ -234,8 +236,12 @@ class TestWatcher(test_base.TestCase):
self.assertEqual(0, watcher_obj._timeout)
m_handler.assert_has_calls([mock.call(e) for e in events])
# After all events have been "handled", since there is only
# one handler, we'll gracefully exit
m_sys_exit.assert_called_once_with(1)
def test_watch_stopped(self):
@mock.patch('sys.exit')
def test_watch_stopped(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@@ -253,8 +259,10 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
m_sys_exit.assert_called_once_with(1)
def test_watch_removed(self):
@mock.patch('sys.exit')
def test_watch_removed(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@@ -272,8 +280,10 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
m_sys_exit.assert_called_once_with(1)
def test_watch_interrupted(self):
@mock.patch('sys.exit')
def test_watch_interrupted(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
@@ -291,8 +301,10 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
m_sys_exit.assert_called_once_with(1)
def test_watch_client_request_failed(self):
@mock.patch('sys.exit')
def test_watch_client_request_failed(self, m_sys_exit):
path = '/test'
m_handler = mock.Mock()
watcher_obj = self._test_watch_create_watcher(path, m_handler)
@@ -302,8 +314,10 @@ class TestWatcher(test_base.TestCase):
self.client.watch.assert_called_once()
self.assertFalse(watcher_obj._healthy)
m_sys_exit.assert_called_once_with(1)
def test_watch_retry(self):
@mock.patch('sys.exit')
def test_watch_retry(self, m_sys_exit):
path = '/test'
events = [{'e': i} for i in range(3)]
side_effects = [exceptions.ChunkedEncodingError("Connection Broken")]
@@ -317,3 +331,4 @@ class TestWatcher(test_base.TestCase):
watcher_obj._watch(path)
m_handler.assert_has_calls([mock.call(e) for e in events])
m_sys_exit.assert_called_once_with(1)


@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
from kuryr_kubernetes import clients
@@ -55,7 +56,8 @@ class Watcher(health.HealthHandler):
graceful=False)` for asynchronous `Watcher`).
"""
def __init__(self, handler, thread_group=None, timeout=None):
def __init__(self, handler, thread_group=None, timeout=None,
exit_on_stop=False):
"""Initializes a new Watcher instance.
:param handler: a `callable` object to be invoked for each observed
@@ -82,6 +84,7 @@ class Watcher(health.HealthHandler):
if timeout is None:
timeout = CONF.kubernetes.watch_retry_timeout
self._timeout = timeout
self._exit_on_stop = exit_on_stop
def add(self, path):
"""Adds ths K8s resource to the Watcher.
@@ -152,6 +155,14 @@ class Watcher(health.HealthHandler):
LOG.info("Stopped watching '%s'", path)
except KeyError:
LOG.error("Failed to exit watch gracefully")
finally:
if not self._watching and not self._idle:
self.stop()
if self._exit_on_stop:
LOG.info("No remaining active watchers, Exiting...")
# TODO(dulek): This complicates things, remove once we
# don't support running without kuryr-daemon.
sys.exit(1)
def _watch(self, path):
attempts = 0