Change k8s node status to be a positive check

When checking for deployed nodes, the Kubernetes join check was
only performing a negative check: it would wait for up to the timeout
on any node that was not ready, even nodes that were not part of the
current processing, before proceeding. This had the drawback of being
likely to add wait time in any complex deployment scenario and, more
importantly, of missing the case where a node never started to try
to join, and assuming that was a success.

This patchset flips the logic to positively look for an expected set of
nodes instead, and will not wait upon nodes that are not currently being
checked. The end result should remedy both of the drawbacks listed
above.

Change-Id: Ib07e4e2677ec4f773d695d57893fdfa5e4b7ff76
This commit is contained in:
Bryan Strassner 2018-06-29 17:44:34 -05:00
parent 4c6cce57ad
commit 1aa814491b
3 changed files with 327 additions and 53 deletions

View File

@ -11,14 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from kubernetes import client, config
from kubernetes import client
from kubernetes import config
def check_node_status(time_out, interval):
def check_node_status(time_out, interval, expected_nodes):
"""This function retrieves the current state of the nodes in the
Kubernetes cluster. We can use it to check the state of the
cluster join process (drydock/promenade) and determine if all
@ -27,6 +27,8 @@ def check_node_status(time_out, interval):
:param time_out: Node should be in Ready state before Time Out
:param interval: Time interval in which we query node state
:param expected_nodes: The list of nodes that are expected to be
present in the check for status
Example::
@ -41,72 +43,96 @@ def check_node_status(time_out, interval):
# Calls function to check that all nodes are in Ready State
# Time out in this case is set to 15 mins, the time interval
# has been set to 60 seconds
check_node_status(900, 60)
# The expected nodes are the nodes to be compared against,
# as there could be nodes that never show up as ready, and those
# need to be represented in the response
check_node_status(900, 60, expected_nodes=['a','b','c'])
"""
# Initialize Variable
not_ready_node_list = []
# Initialize Variables - the nodes we are watching for
if not expected_nodes:
# if you're not looking for any, don't expect me to look either
return []
# Note that we are using 'in_cluster_config'
config.load_incluster_config()
v1 = client.CoreV1Api()
# Logs initial state of all nodes in the cluster
ret_init = v1.list_node(watch=False)
logging.info("Current state of nodes in the cluster is")
for i in ret_init.items:
logging.info("%s\t%s\t%s", i.metadata.name,
i.status.conditions[-1].status,
i.status.conditions[-1].type)
# Populates the list of nodes in the Cluster
not_ready_node_list.append(i.metadata.name)
not_ready_node_list = list(expected_nodes)
# Calculate number of times to execute the 'for' loop
# Ensure that 'time_out' and 'interval' is passed in as integer
# The result from the division will be a floating number which
# We will round off to nearest whole number
# no div/0 or negative intervals
if interval < 1:
interval = 1
if time_out < 1:
time_out = 1
end_range = round(int(time_out) / int(interval))
# end_range + 1 since the first check doesn't have a sleep ahead of it
for i in range(0, end_range + 1):
# Reset node_ready to True for each iteration
cluster_ready = True
logging.info("Remaining expected nodes to join cluster: [%s]",
", ".join(not_ready_node_list))
# Get updated snapshot view of Cluster for each iteration
ret = v1.list_node(watch=False)
ret = _get_all_k8s_node_status()
# Check the current state of nodes that are not in Ready state
# from the previous iteration
for j in ret.items:
if j.metadata.name in not_ready_node_list:
if j.status.conditions[-1].status != 'True':
# Set cluster_ready to False
cluster_ready = False
# cautiously prevent crashing out of this code to ensure continued
# processing.
if ret is not None and hasattr(ret, 'items'):
# Check the state of nodes against the remaining expected nodes
for j in ret.items:
# resolve response item fields without letting them break
# the processing loop.
try:
node_name = j.metadata.name
summary_status = j.status.conditions[-1].status
summary_message = j.status.conditions[-1].message
except (AttributeError, IndexError):
# any issue with the response object, move on to next item
logging.warning("Malformed node status response object. "
"Processing continues with the next item",
exc_info=True)
continue
# Print current state of node
logging.info("Node %s is not ready", j.metadata.name)
logging.debug("Current status of %s is %s",
j.metadata.name,
j.status.conditions[-1].message)
else:
# Remove 'Ready' node from list
not_ready_node_list.remove(j.metadata.name)
# only check nodes that we're currently waiting for
if node_name in not_ready_node_list:
if summary_status != 'True':
# Node not ready, print current state of node
logging.info("Node %s is not ready. Status is: %s",
node_name, summary_message)
else:
# Remove this node from list, it is ready
not_ready_node_list.remove(node_name)
logging.info("Node %s is in ready state", node_name)
logging.info("Node %s is in Ready state", j.metadata.name)
# If any nodes are not ready and the timeout is reached, stop waiting
if not cluster_ready and i == end_range:
logging.info("Timed Out! One or more Nodes failed to reach ready "
"state")
# determine what to do based on the not_ready_node_list
if not_ready_node_list and i == end_range:
# There are remaining items, and the timeout has elapsed
logging.info("Timed Out! Nodes [%s] did not reach ready state",
", ".join(not_ready_node_list))
break
elif cluster_ready:
# Exit loop if Cluster is in Ready state
logging.info("All nodes are in ready state")
elif not not_ready_node_list:
# Exit loop where there are no more nodes to wait for (all ready)
logging.info("All expected nodes are in ready state")
break
else:
# There are nodes remaining, and time remaining
# Back off and check again in next iteration
logging.info("Wait for %d seconds...", int(interval))
logging.info("Waiting %d seconds for next check of cluster status",
int(interval))
time.sleep(int(interval))
# Return the nodes that are not ready.
return not_ready_node_list
def _get_all_k8s_node_status():
    """Query Kubernetes for the status of every node in the cluster.

    Any error raised while loading the in-cluster configuration or while
    listing the nodes is logged as a warning (including the traceback)
    and suppressed.

    :returns: the response object from ``list_node``, or None when the
        lookup failed
    """
    try:
        # Note that we are using 'in_cluster_config'
        config.load_incluster_config()
        node_list = client.CoreV1Api().list_node(watch=False)
    except Exception:
        # Log some diagnostics and report the failure as None.
        logging.warning("There was an error retrieving the cluster status",
                        exc_info=True)
        return None
    return node_list

View File

@ -167,7 +167,8 @@ class DrydockNodesOperator(DrydockBaseOperator):
# Anything not ready in the timeout needs to be considered a failure
not_ready_list = check_k8s_node_status.check_node_status(
self.node_st_timeout,
self.node_st_interval
self.node_st_interval,
expected_nodes=task_result.successes
)
for node in not_ready_list:
# Remove nodes that are not ready from the list of successes, since

View File

@ -0,0 +1,247 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for check_k8s_node_status functions"""
import mock
from shipyard_airflow.plugins.check_k8s_node_status import (
check_node_status
)
class MockNodeStatus:
    """Stand-in for a single well-formed k8s node status item.

    Exposes ``metadata.name`` and a one-element ``status.conditions``
    list whose only entry carries the given ``status`` and ``message``.
    The same condition object is also reachable as ``item``.
    """

    def __init__(self, name, status, message):
        # The single condition entry for this node
        condition = mock.MagicMock(status=status, message=message)
        self.item = condition

        # 'name' must be assigned after construction: passing name= to
        # the MagicMock constructor would name the mock itself instead
        # of creating a 'name' attribute.
        node_metadata = mock.MagicMock()
        node_metadata.name = name
        self.metadata = node_metadata

        self.status = mock.MagicMock(conditions=[condition])
class MalformedNodeStatus:
    """A malformed response object used to simulate a k8s node status item

    ``status.conditions`` is a plain string rather than a list of
    condition objects. The ``metadata`` attribute (with its ``name``) is
    only present when a truthy name is supplied.
    """

    def __init__(self, name=None):
        if name:
            node_metadata = mock.MagicMock()
            node_metadata.name = name
            self.metadata = node_metadata

        # Deliberately not a list of conditions
        broken_status = mock.MagicMock()
        broken_status.conditions = "broken"
        self.status = broken_status
def gen_check_node_status(response_dict):
    """Build a zero-argument callable that replays canned responses

    Each call to the returned callable yields a mock whose ``items``
    attribute is looked up in ``response_dict`` by the string form of the
    call count, starting at '0'. When there is no entry for the current
    call, the 'final' entry is used.

    :param response_dict: the set of responses to return
    """
    invocation = 0

    def responder():
        nonlocal invocation
        response = mock.MagicMock()
        response.items = response_dict.get(str(invocation),
                                           response_dict.get('final'))
        invocation += 1
        return response

    return responder
# Node names used in these tests will be represented by a letter and number
# E.g. a1, a2, a3, b1, b2, etc.
# The following dictionaries are sequences of response objects that are
# returned from the _get_all_k8s_node_status substitute method.
# Keys are the invocation number as a string ('0' is the first call); the
# 'final' entry is returned for any invocation without its own key.

# Successful single invocation response. 3 nodes, a1, b1, c1, all ready on the
# first pass
INV_SEQ_A = {
    'final': [
        MockNodeStatus('a1', 'True', 'Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ]
}

# Successful invocation response. 3 nodes, a1, b1, c1, ready after three
# passes (i.e. all three are only reported ready from the 4th invocation on)
INV_SEQ_B = {
    # first invocation: no nodes reported at all
    '0': [
    ],
    '1': [
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    '2': [
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    'final': [
        MockNodeStatus('a1', 'True', 'Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ]
}

# Successful invocation response. 3 nodes, a1, b1, c1, ready after three
# passes with non-ready nodes appearing along the way
INV_SEQ_C = {
    '0': [
        MockNodeStatus('a1', 'False', 'Not Ready'),
    ],
    '1': [
        MockNodeStatus('a1', 'False', 'Not Ready'),
        MockNodeStatus('b1', 'False', 'Not Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    '2': [
        MockNodeStatus('a1', 'False', 'Not Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    'final': [
        MockNodeStatus('a1', 'True', 'Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ]
}

# Malformed items in every invocation response: one malformed item named a1
# and one malformed item with no name at all.
# Successful node c1
INV_SEQ_D = {
    'final': [
        MalformedNodeStatus('a1'),
        MalformedNodeStatus(),
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
}
class TestCheckK8sNodeStatus:
    """Exercise check_node_status against canned node status sequences"""

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_A))
    def test_check_node_status_all_success(self):
        """Assert that check_node_status completes successfully

        Simple case - every expected node is ready in the first response
        (set input)
        """
        remaining = check_node_status(10, 1, {'a1', 'b1', 'c1'})
        assert not remaining

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_A))
    def test_check_node_status_simple_failure(self):
        """Assert that check_node_status completes successfully with failures

        Some nodes are ready, but one expected node never shows up.
        (list input)
        """
        remaining = check_node_status(1, 1, ['a1', 'b1', 'c1', 'z1'])
        assert remaining == ['z1']

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_B))
    def test_check_node_status_all_success_4th(self):
        """Assert that check_node_status completes successfully

        All nodes ready on 4th iteration
        """
        remaining = check_node_status(3, 1, {'a1', 'b1', 'c1'})
        assert not remaining

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_B))
    def test_check_node_status_timeout_before_4th(self):
        """Assert that check_node_status completes successfully with failures

        Some nodes not ready before timeout (before 4th iteration)
        """
        remaining = check_node_status(2, 1, {'a1', 'b1', 'c1'})
        assert {'a1', 'b1'} <= set(remaining)
        assert 'c1' not in remaining

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_C))
    def test_check_node_status_success_changing_status(self):
        """Assert that check_node_status completes successfully

        Nodes go from not ready to ready
        """
        remaining = check_node_status(30, 1, {'a1', 'b1', 'c1'})
        assert not remaining

    def test_check_node_status_no_interest(self):
        """Assert that check_node_status completes successfully

        Nothing comes back because nothing was requested to look for.
        """
        remaining = check_node_status(3, 1, expected_nodes=None)
        assert not remaining

        for nothing_expected in ([], set()):
            remaining = check_node_status(3, 1, nothing_expected)
            assert not remaining

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_D))
    def test_check_node_status_malformed(self):
        """Assert that check_node_status completes successfully with failures

        Malformed items in the response do not stop processing: a1 (only
        present as a malformed item) and b1 (never present) stay not
        ready, while the well-formed c1 is found ready.
        """
        remaining = check_node_status(1, 1, {'a1', 'b1', 'c1'})
        assert {'a1', 'b1'} <= set(remaining)
        assert 'c1' not in remaining

    def test_check_node_status_error_not_connected_to_k8s(self):
        """Assert that check_node_status completes successfully with failures

        No nodes found because of errors doing lookup.
        """
        # NOTE(review): the lookup is not patched here; presumably this
        # relies on the real Kubernetes lookup failing in the test
        # environment - confirm.
        remaining = check_node_status(1, 1, {'a1', 'b1', 'c1'})
        assert {'a1', 'b1', 'c1'} <= set(remaining)

    @mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
                "_get_all_k8s_node_status",
                new=gen_check_node_status(INV_SEQ_A))
    def test_check_node_status_bad_intervals(self):
        """Assert that check_node_status completes successfully

        With bogus timeout and interval values
        """
        remaining = check_node_status(-1, -1, {'a1', 'b1', 'c1'})
        assert not remaining

        remaining = check_node_status(1, 5, {'a1', 'b1', 'z1'})
        assert remaining == ['z1']