Add quota readiness check to controller

Currently, if the number of neutron resources requested reaches
the quota, kuryr-controller is marked as unhealthy and restarted.
To avoid constantly restarting the pod, this patch adds a new
readiness check that verifies whether the resources used by the
enabled handlers are over quota.

Closes-Bug: 1804310
Change-Id: If4d42f866d2d64cae63736f4c206bedca039258b
Author: Maysa Macedo
Date:   2018-11-18 21:02:33 +00:00
Parent: 3d845cecd0
Commit: b215aae146
16 changed files with 252 additions and 61 deletions

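For context, the heart of the change is that each enabled handler now exposes
an is_ready(quota) method that compares the Neutron quota for the resources it
manages against the number of resources that already exist, and the health
server's /ready endpoint reports failure once any handler would exceed quota,
rather than the liveness probe restarting the pod. A minimal sketch of that
logic, simplified from the handler and utils changes in the diff below (the
neutron argument stands in for a python-neutronclient Client instance):

    def has_limit(quota):
        # Neutron reports -1 for resources that have no quota limit.
        return quota != -1

    def is_available(resource, resource_quota, neutron_func):
        # Count how many of the resource already exist and compare with the limit.
        used = len(neutron_func().get(resource))
        return (resource_quota - used) > 0

    def ports_ready(neutron, quota):
        # Roughly what VIFHandler.is_ready() does for the 'port' resource;
        # quota is the dict returned by neutron.show_quota(project_id)['quota'].
        port_quota = quota['port']
        if has_limit(port_quota):
            return is_available('ports', port_quota, neutron.list_ports)
        return True

The same pattern covers subnets, networks and security groups in the namespace
and policy handlers, with the results memoized so Neutron is not queried on
every probe.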

@@ -95,7 +95,7 @@ def connect(vif, instance_info, ifname, netns=None, report_health=None,
is_default_gateway=True, container_id=None):
driver = _get_binding_driver(vif)
if report_health:
report_health(driver.is_healthy())
report_health(driver.is_alive())
os_vif.plug(vif, instance_info)
driver.connect(vif, ifname, netns, container_id)
_configure_l3(vif, ifname, netns, is_default_gateway)
@@ -105,6 +105,6 @@ def disconnect(vif, instance_info, ifname, netns=None, report_health=None,
container_id=None, **kwargs):
driver = _get_binding_driver(vif)
if report_health:
report_health(driver.is_healthy())
report_health(driver.is_alive())
driver.disconnect(vif, ifname, netns, container_id)
os_vif.unplug(vif, instance_info)


@@ -109,7 +109,7 @@ class VIFOpenVSwitchDriver(BaseBridgeDriver):
container_id)
net_utils.delete_ovs_vif_port(vif.bridge_name, vif.vif_name)
def is_healthy(self):
def is_alive(self):
bridge_name = CONF.neutron_defaults.ovs_bridge
try:
with b_base.get_ipdb() as h_ipdb:


@@ -168,7 +168,7 @@ class VIFSriovDriver(object):
LOG.exception("Unable to execute %s", cmd)
raise
def is_healthy(self):
def is_alive(self):
bridge_name = CONF.neutron_defaults.ovs_bridge
try:
with b_base.get_ipdb() as h_ipdb:


@@ -203,7 +203,7 @@ class CNIDaemonWatcherService(cotyledon.Service):
def _start_watcher_health_checker(self):
while self.is_running:
if not self.watcher.is_healthy():
if not self.watcher.is_alive():
LOG.debug("Reporting watcher not healthy.")
with self.healthy.get_lock():
self.healthy.value = False


@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_cache import core as cache
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
@@ -19,11 +21,27 @@ from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
from neutronclient.common import exceptions as n_exc
LOG = logging.getLogger(__name__)
namespace_handler_caching_opts = [
oslo_cfg.BoolOpt('caching', default=True),
oslo_cfg.IntOpt('cache_time', default=120),
]
oslo_cfg.CONF.register_opts(namespace_handler_caching_opts,
"namespace_handler_caching")
cache.configure(oslo_cfg.CONF)
namespace_handler_cache_region = cache.create_region()
MEMOIZE = cache.get_memoization_decorator(
oslo_cfg.CONF, namespace_handler_cache_region, "namespace_handler_caching")
cache.configure_cache_region(oslo_cfg.CONF, namespace_handler_cache_region)
class NamespaceHandler(k8s_base.ResourceEventHandler):
OBJECT_KIND = constants.K8S_OBJ_NAMESPACE
@@ -93,6 +111,22 @@ class NamespaceHandler(k8s_base.ResourceEventHandler):
self._del_kuryrnet_crd(net_crd_id)
@MEMOIZE
def is_ready(self, quota):
neutron = clients.get_neutron_client()
resources = {'subnet': neutron.list_subnets,
'network': neutron.list_networks,
'security_group': neutron.list_security_groups}
for resource, neutron_func in resources.items():
resource_quota = quota[resource]
resource_name = resource + 's'
if utils.has_limit(resource_quota):
if not utils.is_available(resource_name, resource_quota,
neutron_func):
return False
return True
def _get_net_crd_id(self, namespace):
try:
annotations = namespace['metadata']['annotations']


@@ -12,14 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_cache import core as cache
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
np_handler_caching_opts = [
oslo_cfg.BoolOpt('caching', default=True),
oslo_cfg.IntOpt('cache_time', default=120),
]
oslo_cfg.CONF.register_opts(np_handler_caching_opts,
"np_handler_caching")
cache.configure(oslo_cfg.CONF)
np_handler_cache_region = cache.create_region()
MEMOIZE = cache.get_memoization_decorator(
oslo_cfg.CONF, np_handler_cache_region, "np_handler_caching")
cache.configure_cache_region(oslo_cfg.CONF, np_handler_cache_region)
class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
"""NetworkPolicyHandler handles k8s Network Policies events"""
@@ -41,3 +60,11 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
LOG.debug("Deleted network policy: %s", policy)
project_id = self._drv_project.get_project(policy)
self._drv_policy.release_network_policy(policy, project_id)
@MEMOIZE
def is_ready(self, quota):
neutron = clients.get_neutron_client()
sg_quota = quota['security_group']
sg_func = neutron.list_security_groups
if utils.has_limit(sg_quota):
return utils.is_available('security_groups', sg_quota, sg_func)


@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_cache import core as cache
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
@@ -27,6 +29,22 @@ from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
vif_handler_caching_opts = [
oslo_cfg.BoolOpt('caching', default=True),
oslo_cfg.IntOpt('cache_time', default=120),
]
oslo_cfg.CONF.register_opts(vif_handler_caching_opts,
"vif_handler_caching")
cache.configure(oslo_cfg.CONF)
vif_handler_cache_region = cache.create_region()
MEMOIZE = cache.get_memoization_decorator(
oslo_cfg.CONF, vif_handler_cache_region, "vif_handler_caching")
cache.configure_cache_region(oslo_cfg.CONF, vif_handler_cache_region)
class VIFHandler(k8s_base.ResourceEventHandler):
"""Controller side of VIF binding process for Kubernetes pods.
@@ -127,6 +145,14 @@ class VIFHandler(k8s_base.ResourceEventHandler):
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
@MEMOIZE
def is_ready(self, quota):
neutron = clients.get_neutron_client()
port_quota = quota['port']
port_func = neutron.list_ports
if utils.has_limit(port_quota):
return utils.is_available('ports', port_quota, port_func)
@staticmethod
def _is_host_network(pod):
return pod['spec'].get('hostNetwork', False)


@@ -23,6 +23,7 @@ from kuryr.lib._i18n import _
from kuryr.lib import config as kuryr_config
from kuryr.lib import utils
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as exc
from kuryr_kubernetes.handlers import health as h_health
@@ -58,6 +59,17 @@ class HealthServer(object):
'/alive', methods=['GET'], view_func=self.liveness_status)
self.headers = {'Connection': 'close'}
def _components_ready(self):
neutron = clients.get_neutron_client()
project_id = config.CONF.neutron_defaults.project
quota = neutron.show_quota(project_id).get('quota')
for component in self._registry:
if not component.is_ready(quota):
LOG.debug('Controller components are not ready.')
return False
return True
def _has_kuryr_crd(self, crd_url):
k8s = clients.get_kubernetes_client()
try:
@@ -89,12 +101,6 @@ class HealthServer(object):
'getting a token: %s.' % ex)
LOG.exception(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
try:
self.verify_neutron_connection()
except Exception as ex:
error_message = 'Error when creating a Neutron client: %s.' % ex
LOG.exception(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
crds = [k_const.K8S_API_CRD_KURYRNETS,
k_const.K8S_API_CRD_KURYRNETPOLICIES]
@@ -104,13 +110,21 @@ class HealthServer(object):
LOG.error(error_msg)
return error_msg, httplib.INTERNAL_SERVER_ERROR, self.headers
try:
if not self._components_ready():
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
except Exception as ex:
error_message = ('Error when processing neutron request %s' % ex)
LOG.exception(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
LOG.info('Kuryr Controller readiness verified.')
return data, httplib.OK, self.headers
def liveness_status(self):
data = 'ok'
for component in self._registry:
if not component.is_healthy():
if not component.is_alive():
LOG.debug('Kuryr Controller not healthy.')
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
LOG.debug('Kuryr Controller Liveness verified.')
@@ -140,7 +154,3 @@ class HealthServer(object):
auth_plugin = utils.get_auth_plugin(conf_group)
sess = utils.get_keystone_session(conf_group, auth_plugin)
sess.get_token()
def verify_neutron_connection(self):
neutron = utils.get_neutron_client()
neutron.list_extensions()


@@ -33,12 +33,19 @@ class HealthHandler(object):
"""Base class for health handlers."""
def __init__(self):
super(HealthHandler, self).__init__()
self._healthy = True
self._alive = True
self._ready = True
self._manager = HealthRegister.get_instance()
self._manager.register(self)
def set_health_status(self, healthy):
self._healthy = healthy
def set_liveness(self, alive):
self._alive = alive
def is_healthy(self):
return self._healthy
def set_readiness(self, ready):
self._ready = ready
def is_alive(self):
return self._alive
def is_ready(self, *args):
return self._ready


@@ -16,6 +16,7 @@
import itertools
import time
from neutronclient.common import exceptions as n_exc
from oslo_log import log as logging
from oslo_utils import excutils
@@ -54,16 +55,20 @@ class Retry(base.EventHandler):
try:
self._handler(event)
break
except n_exc.OverQuotaClient:
with excutils.save_and_reraise_exception() as ex:
if self._sleep(deadline, attempt, ex.value):
ex.reraise = False
except self._exceptions:
with excutils.save_and_reraise_exception() as ex:
if self._sleep(deadline, attempt, ex.value):
ex.reraise = False
else:
LOG.debug('Report handler unhealthy %s', self._handler)
self._handler.set_health_status(healthy=False)
self._handler.set_liveness(alive=False)
except Exception:
LOG.exception('Report handler unhealthy %s', self._handler)
self._handler.set_health_status(healthy=False)
self._handler.set_liveness(alive=False)
raise
def _sleep(self, deadline, attempt, exception):


@@ -19,6 +19,9 @@ from kuryr_kubernetes import config
from kuryr_kubernetes.controller.drivers import namespace_security_groups
from kuryr_kubernetes.controller.drivers import namespace_subnet
from kuryr_kubernetes.controller.drivers import vif_pool
from kuryr_kubernetes.controller.handlers import namespace
from kuryr_kubernetes.controller.handlers import policy
from kuryr_kubernetes.controller.handlers import vif
from kuryr_kubernetes.controller.managers import health
from kuryr_kubernetes.controller.managers import pool
from kuryr_kubernetes import utils
@@ -41,6 +44,9 @@ _kuryr_k8s_opts = [
('namespace_sg', namespace_security_groups.namespace_sg_driver_opts),
('ingress', config.ingress),
('sriov', config.sriov_opts),
('namespace_handler_caching', namespace.namespace_handler_caching_opts),
('np_handler_caching', policy.np_handler_caching_opts),
('vif_handler_caching', vif.vif_handler_caching_opts),
]


@@ -22,8 +22,27 @@ import mock
from oslo_config import cfg as oslo_cfg
def get_quota_obj():
return {
'quota': {
'subnet': 100,
'network': 100,
'floatingip': 50,
'subnetpool': -1,
'security_group_rule': 100,
'security_group': 10,
'router': 10,
'rbac_policy': 10,
'port': 500
}
}
class _TestHandler(h_health.HealthHandler):
def is_healthy(self):
def is_alive(self):
pass
def is_ready(self):
pass
@@ -35,28 +54,28 @@ class TestHealthServer(base.TestCase):
self.srv.application.testing = True
self.test_client = self.srv.application.test_client()
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_components_ready')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_has_kuryr_crd')
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_neutron_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_keystone_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_k8s_connection')
def test_readiness(self, m_verify_k8s_conn, m_verify_keystone_conn,
m_verify_neutron_conn, m_exist,
m_has_kuryr_crd):
m_exist, m_has_kuryr_crd, m_components_ready):
m_has_kuryr_crd.side_effect = [True, True]
m_verify_k8s_conn.return_value = True, 200
m_exist.return_value = True
m_components_ready.return_value = True
resp = self.test_client.get('/ready')
m_verify_k8s_conn.assert_called_once()
m_verify_keystone_conn.assert_called_once()
m_verify_neutron_conn.assert_called_once_with()
self.assertEqual(m_has_kuryr_crd.call_count, 2)
m_components_ready.assert_called_once()
self.assertEqual(200, resp.status_code)
self.assertEqual('ok', resp.data.decode())
@@ -95,37 +114,16 @@ class TestHealthServer(base.TestCase):
m_verify_keystone_conn.assert_called_once()
self.assertEqual(500, resp.status_code)
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_neutron_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_keystone_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_k8s_connection')
@mock.patch('os.path.exists')
def test_readiness_neutron_error(self, m_exist, m_verify_k8s_conn,
m_verify_keystone_conn,
m_verify_neutron_conn):
m_exist.return_value = True
m_verify_k8s_conn.return_value = True, 200
m_verify_neutron_conn.side_effect = Exception
resp = self.test_client.get('/ready')
m_verify_neutron_conn.assert_called_once()
self.assertEqual(500, resp.status_code)
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_has_kuryr_crd')
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_neutron_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_keystone_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_k8s_connection')
def test_readiness_kuryrnet_crd_error(self, m_verify_k8s_conn,
m_verify_keystone_conn,
m_verify_neutron_conn, m_exist,
m_has_kuryr_crd):
m_exist, m_has_kuryr_crd):
kuryrnets_url = k_const.K8S_API_CRD_KURYRNETS
m_has_kuryr_crd.side_effect = [False]
@@ -138,16 +136,13 @@ class TestHealthServer(base.TestCase):
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_has_kuryr_crd')
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_neutron_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_keystone_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_k8s_connection')
def test_readiness_kuryrnetpolicy_crd_error(self, m_verify_k8s_conn,
m_verify_keystone_conn,
m_verify_neutron_conn, m_exist,
m_has_kuryr_crd):
m_exist, m_has_kuryr_crd):
kuryrnetpolicies_url = k_const.K8S_API_CRD_KURYRNETPOLICIES
m_has_kuryr_crd.side_effect = [True, False]
@@ -157,6 +152,46 @@ class TestHealthServer(base.TestCase):
m_has_kuryr_crd.assert_called_with(kuryrnetpolicies_url)
self.assertEqual(500, resp.status_code)
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_components_ready')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_has_kuryr_crd')
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_keystone_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_k8s_connection')
def test_readiness_neutron_error(self, m_verify_k8s_conn,
m_verify_keystone_conn,
m_exist, m_has_kuryr_crd,
m_components_ready):
m_components_ready.side_effect = Exception
resp = self.test_client.get('/ready')
m_components_ready.assert_called_once()
self.assertEqual(500, resp.status_code)
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_components_ready')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'_has_kuryr_crd')
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_keystone_connection')
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
'verify_k8s_connection')
def test_readiness_components_ready_error(self, m_verify_k8s_conn,
m_verify_keystone_conn,
m_exist, m_has_kuryr_crd,
m_components_ready):
m_components_ready.return_value = False
resp = self.test_client.get('/ready')
m_components_ready.assert_called_once()
self.assertEqual(500, resp.status_code)
def test__has_kuryrnet_crd(self):
kuryrnet_crd = {
"apiVersion": "openstack.org/v1",
@@ -212,7 +247,33 @@ class TestHealthServer(base.TestCase):
kubernetes.get.assert_called_once()
@mock.patch.object(_TestHandler, 'is_healthy')
@mock.patch.object(_TestHandler, 'is_ready')
def test__components_ready(self, m_status):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
neutron.show_quota.return_value = get_quota_obj()
self.srv._registry = [_TestHandler()]
m_status.return_value = True
resp = self.srv._components_ready()
m_status.assert_called_once()
self.assertEqual(resp, True)
neutron.show_quota.assert_called_once()
@mock.patch.object(_TestHandler, 'is_ready')
def test__components_ready_error(self, m_status):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
neutron.show_quota.return_value = get_quota_obj()
self.srv._registry = [_TestHandler()]
m_status.return_value = False
resp = self.srv._components_ready()
m_status.assert_called_once()
self.assertEqual(resp, False)
neutron.show_quota.assert_called_once()
@mock.patch.object(_TestHandler, 'is_alive')
def test_liveness(self, m_status):
m_status.return_value = True
self.srv._registry = [_TestHandler()]
@@ -222,7 +283,7 @@ class TestHealthServer(base.TestCase):
m_status.assert_called_once()
self.assertEqual(200, resp.status_code)
@mock.patch.object(_TestHandler, 'is_healthy')
@mock.patch.object(_TestHandler, 'is_alive')
def test_liveness_error(self, m_status):
m_status.return_value = False
self.srv._registry = [_TestHandler()]


@@ -18,7 +18,7 @@ import mock
class _TestHandler(h_health.HealthHandler):
def is_healthy(self):
def is_alive(self):
pass
@@ -42,6 +42,7 @@ class TestHealthHandler(test_base.TestCase):
health_handler = _TestHandler()
self.assertTrue(health_handler._healthy)
self.assertTrue(health_handler._alive)
self.assertTrue(health_handler._ready)
m_health_register_obj.register.assert_called_once_with(health_handler)
self.assertEqual(m_health_register_obj, health_handler._manager)


@@ -313,7 +313,7 @@ class TestWatcher(test_base.TestCase):
"Connection Broken")
self.client.watch.assert_called_once()
self.assertFalse(watcher_obj._healthy)
self.assertFalse(watcher_obj._alive)
m_sys_exit.assert_called_once_with(1)
@mock.patch('sys.exit')


@@ -173,3 +173,17 @@ def extract_pod_annotation(annotation):
obj = vif.PodState(default_vif=obj)
return obj
def has_limit(quota):
NO_LIMIT = -1
return quota != NO_LIMIT
def is_available(resource, resource_quota, neutron_func):
qnt_resources = len(neutron_func().get(resource))
availability = resource_quota - qnt_resources
if availability <= 0:
LOG.error("Quota exceeded for resource: %s", resource)
return False
return True


@@ -179,7 +179,7 @@ class Watcher(health.HealthHandler):
if (attempts > 0 and
utils.exponential_sleep(deadline, attempts) == 0):
LOG.error("Failed watching '%s': deadline exceeded", path)
self._healthy = False
self._alive = False
return
LOG.info("Started watching '%s'", path)