Merge "Add liveness checks to Kuryr Controller"
This commit is contained in:
commit
cc3e128ff6
|
@ -439,10 +439,15 @@ spec:
|
|||
subPath: kuryr.conf
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /healthz
|
||||
path: /ready
|
||||
port: ${health_server_port}
|
||||
scheme: HTTP
|
||||
timeoutSeconds: 5
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /alive
|
||||
port: ${health_server_port}
|
||||
initialDelaySeconds: 15
|
||||
EOF
|
||||
|
||||
cat >> "${output_dir}/controller_deployment.yml" << EOF
|
||||
|
|
|
@ -23,8 +23,8 @@ The purpose of this document is to present the design decision behind
|
|||
Kuryr Kubernetes Health Manager.
|
||||
|
||||
The main purpose of the Health Manager is to perform Health verifications
|
||||
that assures Kuryr Controller readiness and so improve the management that
|
||||
Kubernetes does on Kuryr Controller pod.
|
||||
that assures Kuryr Controller readiness and liveness, and so improve the
|
||||
management that Kubernetes does on Kuryr Controller pod.
|
||||
|
||||
Overview
|
||||
--------
|
||||
|
@ -34,19 +34,19 @@ unable to connect with services it depends on and they being not healthy.
|
|||
|
||||
It is important to check health of these services so that Kubernetes and
|
||||
its users know when Kuryr Controller it is ready to perform its networking
|
||||
tasks. To provide this functionality, Health Manager will verify and serve
|
||||
the health state of these services to the probe.
|
||||
tasks. Also, it is necessary to check the health state of Kuryr components in
|
||||
order to assure Kuryr Controller service is alive. To provide these
|
||||
functionalities, Health Manager will verify and serve the health state of
|
||||
these services and components to the probe.
|
||||
|
||||
Proposed Solution
|
||||
-----------------
|
||||
The Health Manager will provide an endpoint that will check whether it is
|
||||
One of the endpoints provided by The Health Manager will check whether it is
|
||||
able to watch the Kubernetes API, authenticate with Keystone and talk to
|
||||
Neutron, since these are services needed by Kuryr Controller. These checks
|
||||
will assure the Controller readiness.
|
||||
will assure the Controller readiness. The other endpoint, will verify
|
||||
the health state of Kuryr components and guarantee Controller liveness.
|
||||
|
||||
The idea behind the Manager is to combine all the necessary checks in a
|
||||
server running inside Kuryr Controller pod and provide the checks result
|
||||
to the probe.
|
||||
|
||||
This design focuses on providing health checks for readiness probe, but
|
||||
another endpoint can be created for liveness probes.
|
||||
|
|
|
@ -40,6 +40,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
|
|||
OBJECT_KIND = k_const.K8S_OBJ_SERVICE
|
||||
|
||||
def __init__(self):
|
||||
super(LBaaSSpecHandler, self).__init__()
|
||||
self._drv_project = drv_base.ServiceProjectDriver.get_instance()
|
||||
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
|
||||
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
|
||||
|
@ -219,6 +220,7 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
OBJECT_KIND = k_const.K8S_OBJ_ENDPOINTS
|
||||
|
||||
def __init__(self):
|
||||
super(LoadBalancerHandler, self).__init__()
|
||||
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
|
||||
self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
|
||||
self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()
|
||||
|
|
|
@ -40,6 +40,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
|||
OBJECT_KIND = constants.K8S_OBJ_POD
|
||||
|
||||
def __init__(self):
|
||||
super(VIFHandler, self).__init__()
|
||||
self._drv_project = drivers.PodProjectDriver.get_instance()
|
||||
self._drv_subnets = drivers.PodSubnetsDriver.get_instance()
|
||||
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
|
||||
|
|
|
@ -12,13 +12,13 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import eventlet
|
||||
from flask import Flask
|
||||
from keystoneauth1 import exceptions as k_exc
|
||||
from keystoneclient import client as keystone_client
|
||||
from kuryr.lib._i18n import _
|
||||
from kuryr.lib import config as kuryr_config
|
||||
from kuryr.lib import utils
|
||||
from kuryr_kubernetes.handlers import health as h_health
|
||||
import os
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -38,16 +38,25 @@ CONF.register_opts(health_server_opts, "health_server")
|
|||
|
||||
|
||||
class HealthServer(object):
|
||||
"""Proxy server used by readiness and liveness probes to manage health checks.
|
||||
|
||||
Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
|
||||
If pool ports functionality is enabled it is verified whether
|
||||
the precreated ports are loaded into the pools. Also, checks handlers
|
||||
states.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.ctx = None
|
||||
|
||||
self._registry = h_health.HealthRegister.get_instance().registry
|
||||
self.application = Flask('health-daemon')
|
||||
self.application.add_url_rule(
|
||||
'/healthz', methods=['GET'], view_func=self.read)
|
||||
'/ready', methods=['GET'], view_func=self.readiness_status)
|
||||
self.application.add_url_rule(
|
||||
'/alive', methods=['GET'], view_func=self.liveness_status)
|
||||
self.headers = {'Connection': 'close'}
|
||||
|
||||
def read(self):
|
||||
def readiness_status(self):
|
||||
data = 'ok'
|
||||
|
||||
if CONF.kubernetes.vif_pool_driver != 'noop':
|
||||
|
@ -81,6 +90,15 @@ class HealthServer(object):
|
|||
LOG.info('Kuryr Controller readiness verified.')
|
||||
return data, httplib.OK, self.headers
|
||||
|
||||
def liveness_status(self):
|
||||
data = 'ok'
|
||||
for component in self._registry:
|
||||
if not component.is_healthy():
|
||||
LOG.debug('Kuryr Controller not healthy.')
|
||||
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
|
||||
LOG.debug('Kuryr Controller Liveness verified.')
|
||||
return data, httplib.OK, self.headers
|
||||
|
||||
def run(self):
|
||||
address = ''
|
||||
try:
|
||||
|
@ -109,19 +127,3 @@ class HealthServer(object):
|
|||
def verify_neutron_connection(self):
|
||||
neutron = utils.get_neutron_client()
|
||||
neutron.list_extensions()
|
||||
|
||||
|
||||
class ReadinessChecker(object):
|
||||
"""Proxy server used by readiness probe to manage health checks.
|
||||
|
||||
Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
|
||||
Also, if pool ports functionality is enabled it is verified whether
|
||||
the precreated ports are loaded into the pools.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
eventlet.spawn(self._start_readiness_checker_daemon)
|
||||
|
||||
def _start_readiness_checker_daemon(self):
|
||||
server = HealthServer()
|
||||
server.run()
|
||||
|
|
|
@ -41,6 +41,7 @@ class KuryrK8sService(service.Service):
|
|||
objects.register_locally_defined_vifs()
|
||||
pipeline = h_pipeline.ControllerPipeline(self.tg)
|
||||
self.watcher = watcher.Watcher(pipeline, self.tg)
|
||||
self.health_manager = health.HealthServer()
|
||||
# TODO(ivc): pluggable resource/handler registration
|
||||
for resource in ["pods", "services", "endpoints"]:
|
||||
self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource))
|
||||
|
@ -50,9 +51,9 @@ class KuryrK8sService(service.Service):
|
|||
|
||||
def start(self):
|
||||
LOG.info("Service '%s' starting", self.__class__.__name__)
|
||||
health.ReadinessChecker()
|
||||
super(KuryrK8sService, self).start()
|
||||
self.watcher.start()
|
||||
self.health_manager.run()
|
||||
LOG.info("Service '%s' started", self.__class__.__name__)
|
||||
|
||||
def wait(self):
|
||||
|
|
|
@ -72,6 +72,9 @@ class EventConsumer(h_base.EventHandler):
|
|||
registered by the `EventPipeline`.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(EventConsumer, self).__init__()
|
||||
|
||||
@abc.abstractproperty
|
||||
def consumes(self):
|
||||
"""Predicates determining events supported by this handler.
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
# Copyright 2018 Maysa de Macedo Souza.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class HealthRegister(object):
|
||||
instance = None
|
||||
|
||||
def __init__(self):
|
||||
self.registry = []
|
||||
|
||||
def register(self, elem):
|
||||
self.registry.append(elem)
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls):
|
||||
if not HealthRegister.instance:
|
||||
HealthRegister.instance = cls()
|
||||
return HealthRegister.instance
|
||||
|
||||
|
||||
class HealthHandler(object):
|
||||
"""Base class for health handlers."""
|
||||
def __init__(self):
|
||||
super(HealthHandler, self).__init__()
|
||||
self._healthy = True
|
||||
self._manager = HealthRegister.get_instance()
|
||||
self._manager.register(self)
|
||||
|
||||
def set_health_status(self, healthy):
|
||||
self._healthy = healthy
|
||||
|
||||
def is_healthy(self):
|
||||
return self._healthy
|
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
from kuryr_kubernetes.handlers import dispatch
|
||||
from kuryr_kubernetes.handlers import health
|
||||
|
||||
|
||||
def object_kind(event):
|
||||
|
@ -30,7 +31,7 @@ def object_link(event):
|
|||
return None
|
||||
|
||||
|
||||
class ResourceEventHandler(dispatch.EventConsumer):
|
||||
class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
|
||||
"""Base class for K8s event handlers.
|
||||
|
||||
Implementing classes should override the `OBJECT_KIND` attribute with a
|
||||
|
@ -48,6 +49,9 @@ class ResourceEventHandler(dispatch.EventConsumer):
|
|||
|
||||
OBJECT_KIND = None
|
||||
|
||||
def __init__(self):
|
||||
super(ResourceEventHandler, self).__init__()
|
||||
|
||||
@property
|
||||
def consumes(self):
|
||||
return {object_kind: self.OBJECT_KIND}
|
||||
|
|
|
@ -64,6 +64,12 @@ class Retry(base.EventHandler):
|
|||
with excutils.save_and_reraise_exception() as ex:
|
||||
if self._sleep(deadline, attempt, ex.value):
|
||||
ex.reraise = False
|
||||
else:
|
||||
LOG.debug('Report handler unhealthy %s', self._handler)
|
||||
self._handler.set_health_status(healthy=False)
|
||||
except Exception:
|
||||
LOG.debug('Report handler unhealthy %s', self._handler)
|
||||
self._handler.set_health_status(healthy=False)
|
||||
|
||||
def _sleep(self, deadline, attempt, exception):
|
||||
now = time.time()
|
||||
|
|
|
@ -14,11 +14,17 @@
|
|||
|
||||
from keystoneauth1 import exceptions
|
||||
from kuryr_kubernetes.controller.managers import health
|
||||
from kuryr_kubernetes.handlers import health as h_health
|
||||
from kuryr_kubernetes.tests import base
|
||||
import mock
|
||||
from oslo_config import cfg as oslo_cfg
|
||||
|
||||
|
||||
class _TestHandler(h_health.HealthHandler):
|
||||
def is_healthy(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestHealthServer(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -38,7 +44,7 @@ class TestHealthServer(base.TestCase):
|
|||
m_verify_neutron_conn, m_exist):
|
||||
m_verify_k8s_conn.return_value = True, 200
|
||||
m_exist.return_value = True
|
||||
resp = self.test_client.get('/healthz')
|
||||
resp = self.test_client.get('/ready')
|
||||
m_verify_k8s_conn.assert_called_once()
|
||||
m_verify_keystone_conn.assert_called_once()
|
||||
m_verify_neutron_conn.assert_called_once_with()
|
||||
|
@ -51,7 +57,7 @@ class TestHealthServer(base.TestCase):
|
|||
m_exist.return_value = False
|
||||
oslo_cfg.CONF.set_override('vif_pool_driver', 'neutron',
|
||||
group='kubernetes')
|
||||
resp = self.test_client.get('/healthz')
|
||||
resp = self.test_client.get('/ready')
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
|
||||
|
@ -60,7 +66,7 @@ class TestHealthServer(base.TestCase):
|
|||
def test_read_k8s_error(self, m_exist, m_verify_k8s_conn):
|
||||
m_exist.return_value = True
|
||||
m_verify_k8s_conn.return_value = False, 503
|
||||
resp = self.test_client.get('/healthz')
|
||||
resp = self.test_client.get('/ready')
|
||||
|
||||
m_verify_k8s_conn.assert_called_once()
|
||||
self.assertEqual(503, resp.status_code)
|
||||
|
@ -75,7 +81,7 @@ class TestHealthServer(base.TestCase):
|
|||
m_exist.return_value = True
|
||||
m_verify_k8s_conn.return_value = True, 200
|
||||
m_verify_keystone_conn.side_effect = exceptions.http.Unauthorized
|
||||
resp = self.test_client.get('/healthz')
|
||||
resp = self.test_client.get('/ready')
|
||||
|
||||
m_verify_keystone_conn.assert_called_once()
|
||||
self.assertEqual(401, resp.status_code)
|
||||
|
@ -92,7 +98,26 @@ class TestHealthServer(base.TestCase):
|
|||
m_exist.return_value = True
|
||||
m_verify_k8s_conn.return_value = True, 200
|
||||
m_verify_neutron_conn.side_effect = Exception
|
||||
resp = self.test_client.get('/healthz')
|
||||
resp = self.test_client.get('/ready')
|
||||
|
||||
m_verify_neutron_conn.assert_called_once()
|
||||
self.assertEqual(500, resp.status_code)
|
||||
|
||||
@mock.patch.object(_TestHandler, 'is_healthy')
|
||||
def test_liveness(self, m_status):
|
||||
m_status.return_value = True
|
||||
self.srv._registry = [_TestHandler()]
|
||||
|
||||
resp = self.test_client.get('/alive')
|
||||
|
||||
m_status.assert_called_once()
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
@mock.patch.object(_TestHandler, 'is_healthy')
|
||||
def test_liveness_error(self, m_status):
|
||||
m_status.return_value = False
|
||||
self.srv._registry = [_TestHandler()]
|
||||
resp = self.test_client.get('/alive')
|
||||
|
||||
m_status.assert_called_once()
|
||||
self.assertEqual(500, resp.status_code)
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright 2018 Maysa de Macedo Souza.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from kuryr_kubernetes.handlers import health as h_health
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
import mock
|
||||
|
||||
|
||||
class _TestHandler(h_health.HealthHandler):
|
||||
def is_healthy(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestHealthRegister(test_base.TestCase):
|
||||
|
||||
def test_register(self):
|
||||
m_component = mock.Mock()
|
||||
health_register = h_health.HealthRegister()
|
||||
health_register.register(m_component)
|
||||
|
||||
self.assertEqual(health_register.registry, [m_component])
|
||||
|
||||
|
||||
class TestHealthHandler(test_base.TestCase):
|
||||
|
||||
@mock.patch.object(h_health.HealthRegister, 'get_instance')
|
||||
def test_init(self, m_health_register):
|
||||
cls = h_health.HealthRegister
|
||||
m_health_register_obj = mock.Mock(spec=cls)
|
||||
m_health_register.return_value = m_health_register_obj
|
||||
|
||||
health_handler = _TestHandler()
|
||||
|
||||
self.assertTrue(health_handler._healthy)
|
||||
m_health_register_obj.register.assert_called_once_with(health_handler)
|
||||
self.assertEqual(m_health_register_obj, health_handler._manager)
|
|
@ -137,14 +137,15 @@ class TestRetryHandler(test_base.TestCase):
|
|||
|
||||
@mock.patch('itertools.count')
|
||||
@mock.patch.object(h_retry.Retry, '_sleep')
|
||||
def test_call_raises_no_retry(self, m_sleep, m_count):
|
||||
def test_call_should_not_raise(self, m_sleep, m_count):
|
||||
event = mock.sentinel.event
|
||||
m_handler = mock.Mock()
|
||||
m_handler.side_effect = _EX1()
|
||||
m_count.return_value = list(range(1, 5))
|
||||
retry = h_retry.Retry(m_handler, exceptions=(_EX11, _EX2))
|
||||
|
||||
self.assertRaises(_EX1, retry, event)
|
||||
retry(event)
|
||||
|
||||
m_handler.assert_called_once_with(event)
|
||||
m_handler.assert_called_with(event)
|
||||
m_handler.set_health_status.assert_called_with(healthy=False)
|
||||
m_sleep.assert_not_called()
|
||||
|
|
|
@ -19,6 +19,7 @@ import mock
|
|||
from kuryr_kubernetes.tests import base as test_base
|
||||
from kuryr_kubernetes.tests.unit import kuryr_fixtures as kuryr_fixtures
|
||||
from kuryr_kubernetes import watcher
|
||||
from requests import exceptions
|
||||
|
||||
|
||||
class TestWatcher(test_base.TestCase):
|
||||
|
@ -289,3 +290,14 @@ class TestWatcher(test_base.TestCase):
|
|||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
|
||||
def test_watch_client_request_failed(self):
|
||||
path = '/test'
|
||||
m_handler = mock.Mock()
|
||||
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||
watcher_obj._watch(path)
|
||||
self.client.watch.side_effect = exceptions.ChunkedEncodingError(
|
||||
"Connection Broken")
|
||||
|
||||
self.client.watch.assert_called_once()
|
||||
self.assertFalse(watcher_obj._healthy)
|
||||
|
|
|
@ -13,14 +13,14 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes.handlers import health
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Watcher(object):
|
||||
class Watcher(health.HealthHandler):
|
||||
"""Observes K8s resources' events using K8s '?watch=true' API.
|
||||
|
||||
The `Watcher` maintains a list of K8s resources and manages the event
|
||||
|
@ -65,11 +65,11 @@ class Watcher(object):
|
|||
specified, the `Watcher` will operate in a
|
||||
synchronous mode.
|
||||
"""
|
||||
super(Watcher, self).__init__()
|
||||
self._client = clients.get_kubernetes_client()
|
||||
self._handler = handler
|
||||
self._thread_group = thread_group
|
||||
self._running = False
|
||||
|
||||
self._resources = set()
|
||||
self._watching = {}
|
||||
self._idle = {}
|
||||
|
@ -141,6 +141,8 @@ class Watcher(object):
|
|||
self._idle[path] = True
|
||||
if not (self._running and path in self._resources):
|
||||
return
|
||||
except Exception:
|
||||
self._healthy = False
|
||||
finally:
|
||||
self._watching.pop(path)
|
||||
self._idle.pop(path)
|
||||
|
|
Loading…
Reference in New Issue