From 1aa814491b21b21ff82f19dc25f7680ba391e0da Mon Sep 17 00:00:00 2001 From: Bryan Strassner Date: Fri, 29 Jun 2018 17:44:34 -0500 Subject: [PATCH] Change k8s node status to be a positive check When checking for deployed nodes, the kubernetes join check was only performing a negative check - and would wait for up to the timeout, even for nodes that were not part of the current processing, before proceeding. This had the drawback of being overall likely to add wait time in any complex deployment scenario, as well as (and more importantly) miss the case where a node never started to try to join, and assume that was a success. This patchset flips the logic to positively look for an expected set of nodes instead, and will not wait upon nodes that are not currently being checked. The end result should remedy both of the drawbacks listed above. Change-Id: Ib07e4e2677ec4f773d695d57893fdfa5e4b7ff76 --- .../plugins/check_k8s_node_status.py | 130 +++++---- .../shipyard_airflow/plugins/drydock_nodes.py | 3 +- .../plugins/test_check_k8s_node_status.py | 247 ++++++++++++++++++ 3 files changed, 327 insertions(+), 53 deletions(-) create mode 100644 src/bin/shipyard_airflow/tests/unit/plugins/test_check_k8s_node_status.py diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py index 9220f769..f5fa333a 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py @@ -11,14 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- import logging import time -from kubernetes import client, config +from kubernetes import client +from kubernetes import config -def check_node_status(time_out, interval): +def check_node_status(time_out, interval, expected_nodes): """This function retrieves the current state of the nodes in the Kubernetes cluster. We can use it to check the state of the cluster join process (drydock/promenade) and determine if all @@ -27,6 +27,8 @@ def check_node_status(time_out, interval): :param time_out: Node should be in Ready state before Time Out :param interval: Time interval in which we query node state + :param expected_nodes: The list of nodes that are expected to be + present in the check for status Example:: @@ -41,72 +43,96 @@ def check_node_status(time_out, interval): # Calls function to check that all nodes are in Ready State # Time out in this case is set to 15 mins, the time interval # has been set to 60 seconds - check_node_status(900, 60) + + # The expected nodes are the nodes to be compared against, + # as there could be nodes that never show up as ready, and those + # need to be represented in the response + check_node_status(900, 60, expected_nodes=['a','b','c']) """ - # Initialize Variable - not_ready_node_list = [] + # Initialize Variables - the nodes we are watching for + if not expected_nodes: + # if you're not looking for any, don't expect me to look either + return [] - # Note that we are using 'in_cluster_config' - config.load_incluster_config() - v1 = client.CoreV1Api() - - # Logs initial state of all nodes in the cluster - ret_init = v1.list_node(watch=False) - - logging.info("Current state of nodes in the cluster is") - - for i in ret_init.items: - logging.info("%s\t%s\t%s", i.metadata.name, - i.status.conditions[-1].status, - i.status.conditions[-1].type) - - # Populates the list of nodes in the Cluster - not_ready_node_list.append(i.metadata.name) + not_ready_node_list = list(expected_nodes) # Calculate number of times to execute the 'for' loop 
# Ensure that 'time_out' and 'interval' is passed in as integer # The result from the division will be a floating number which # We will round off to nearest whole number + + # no div/0 or negative intervals + if interval < 1: + interval = 1 + if time_out < 1: + time_out = 1 + end_range = round(int(time_out) / int(interval)) - + # end_range + 1 since the first check doesn't have a sleep ahead of it for i in range(0, end_range + 1): - # Reset node_ready to True for each iteration - cluster_ready = True - + logging.info("Remaining expected nodes to join cluster: [%s]", + ", ".join(not_ready_node_list)) # Get updated snapshot view of Cluster for each iteration - ret = v1.list_node(watch=False) + ret = _get_all_k8s_node_status() - # Check the current state of nodes that are not in Ready state - # from the previous iteration - for j in ret.items: - if j.metadata.name in not_ready_node_list: - if j.status.conditions[-1].status != 'True': - # Set cluster_ready to False - cluster_ready = False + # cautiously prevent crashing out of this code to ensure continued + # processing. + if ret is not None and hasattr(ret, 'items'): + # Check the state of nodes against the remaining expected nodes + for j in ret.items: + # resolve response item fields without letting them break + # the processing loop. + try: + node_name = j.metadata.name + summary_status = j.status.conditions[-1].status + summary_message = j.status.conditions[-1].message + except (AttributeError, IndexError): + # any issue with the response object, move on to next item + logging.warning("Malformed node status response object. 
" + "Processing continues with the next item", + exc_info=True) + continue - # Print current state of node - logging.info("Node %s is not ready", j.metadata.name) - logging.debug("Current status of %s is %s", - j.metadata.name, - j.status.conditions[-1].message) - else: - # Remove 'Ready' node from list - not_ready_node_list.remove(j.metadata.name) + # only check nodes that we're currently waiting for + if node_name in not_ready_node_list: + if summary_status != 'True': + # Node not ready, print current state of node + logging.info("Node %s is not ready. Status is: %s", + node_name, summary_message) + else: + # Remove this node from list, it is ready + not_ready_node_list.remove(node_name) + logging.info("Node %s is in ready state", node_name) - logging.info("Node %s is in Ready state", j.metadata.name) - - # If any nodes are not ready and the timeout is reached, stop waiting - if not cluster_ready and i == end_range: - logging.info("Timed Out! One or more Nodes failed to reach ready " - "state") + # determine what to do based on the not_ready_node_list + if not_ready_node_list and i == end_range: + # There are remaining items, and the timeout is elapsed + logging.info("Timed Out! Nodes [%s] did not reach ready state", + ", ".join(not_ready_node_list)) break - elif cluster_ready: - # Exit loop if Cluster is in Ready state - logging.info("All nodes are in ready state") + elif not not_ready_node_list: + # Exit loop where there are no more nodes to wait for (all ready) + logging.info("All expected nodes are in ready state") break else: + # There are nodes remaining, and time remaining # Back off and check again in next iteration - logging.info("Wait for %d seconds...", int(interval)) + logging.info("Waiting %d seconds for next check of cluster status", + int(interval)) time.sleep(int(interval)) + # Return the nodes that are not ready. 
return not_ready_node_list + + +def _get_all_k8s_node_status(): + """Invoke Kubernetes and return the status response object""" + # Note that we are using 'in_cluster_config' + try: + config.load_incluster_config() + v1 = client.CoreV1Api() + return v1.list_node(watch=False) + except Exception: + # Log some diagnostics and return None. + logging.warning("There was an error retrieving the cluster status", + exc_info=True) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py index 0e7922c7..cba97c0e 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py @@ -167,7 +167,8 @@ class DrydockNodesOperator(DrydockBaseOperator): # Anything not ready in the timeout needs to be considered a failure not_ready_list = check_k8s_node_status.check_node_status( self.node_st_timeout, - self.node_st_interval + self.node_st_interval, + expected_nodes=task_result.successes ) for node in not_ready_list: # Remove nodes that are not ready from the list of successes, since diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test_check_k8s_node_status.py b/src/bin/shipyard_airflow/tests/unit/plugins/test_check_k8s_node_status.py new file mode 100644 index 00000000..0f59794d --- /dev/null +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test_check_k8s_node_status.py @@ -0,0 +1,247 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for check_k8s_node_status functions""" +import mock + +from shipyard_airflow.plugins.check_k8s_node_status import ( + check_node_status +) + + +class MockNodeStatus: + """A fake response object used to simulate a k8s node status item""" + def __init__(self, name, status, message): + self.metadata = mock.MagicMock() + self.metadata.name = name + self.item = mock.MagicMock() + self.item.status = status + self.item.message = message + self.status = mock.MagicMock() + self.status.conditions = [self.item] + + +class MalformedNodeStatus: + """A malformed response object used to simulate a k8s node status item + + Accepts a name, if the name field should be formed correctly. + """ + def __init__(self, name=None): + if name: + self.metadata = mock.MagicMock() + self.metadata.name = name + + self.status = mock.MagicMock() + self.status.conditions = "broken" + + +def gen_check_node_status(response_dict): + """Generate a function that will return the requested response dict + + :param response_dict: the set of responses to return + """ + class _StatefulResponder: + def __init__(self, res_dict=response_dict): + self.res_dict = res_dict + self.invocation = 0 + + def responder(self): + ret = mock.MagicMock() + if str(self.invocation) in self.res_dict: + ret.items = self.res_dict.get(str(self.invocation)) + else: + ret.items = self.res_dict.get('final') + self.invocation += 1 + return ret + sr = _StatefulResponder() + return sr.responder + + +# Node names used in these tests will be represented by a letter and number +# E.g. a1, a2, a3, b1, b2, etc. + +# The following dictionaries are sequences of response objects that are +# returned from the _get_all_k8s_node_status substitute method. + +# Successful single invocation response. 
3 nodes, a1, b1, c1, all ready on the +# first pass +INV_SEQ_A = { + 'final': [ + MockNodeStatus('a1', 'True', 'Ready'), + MockNodeStatus('b1', 'True', 'Ready'), + MockNodeStatus('c1', 'True', 'Ready'), + ] +} + + +# Successful invocation response. 3 nodes, a1, b1, c1, ready after three +# passes +INV_SEQ_B = { + '0': [ + ], + '1': [ + MockNodeStatus('c1', 'True', 'Ready'), + ], + '2': [ + MockNodeStatus('c1', 'True', 'Ready'), + ], + 'final': [ + MockNodeStatus('a1', 'True', 'Ready'), + MockNodeStatus('b1', 'True', 'Ready'), + MockNodeStatus('c1', 'True', 'Ready'), + ] +} + +# Successful invocation response. 3 nodes, a1, b1, c1, ready after three +# passes with non-ready nodes appearing along the way +INV_SEQ_C = { + '0': [ + MockNodeStatus('a1', 'False', 'Not Ready'), + ], + '1': [ + MockNodeStatus('a1', 'False', 'Not Ready'), + MockNodeStatus('b1', 'False', 'Not Ready'), + MockNodeStatus('c1', 'True', 'Ready'), + ], + '2': [ + MockNodeStatus('a1', 'False', 'Not Ready'), + MockNodeStatus('b1', 'True', 'Ready'), + MockNodeStatus('c1', 'True', 'Ready'), + ], + 'final': [ + MockNodeStatus('a1', 'True', 'Ready'), + MockNodeStatus('b1', 'True', 'Ready'), + MockNodeStatus('c1', 'True', 'Ready'), + ] +} + +# Malformed invocation response on first try. +# Successful node c1 +INV_SEQ_D = { + 'final': [ + MalformedNodeStatus('a1'), + MalformedNodeStatus(), + MockNodeStatus('c1', 'True', 'Ready'), + ], +} + + +class TestCheckK8sNodeStatus: + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_A)) + def test_check_node_status_all_success(self): + """Assert that check_node_status completes successfully + + Simple case - all nodes ready when response has all values + (set input) + """ + not_found_nodes = check_node_status(10, 1, set(['a1', 'b1', 'c1'])) + assert not not_found_nodes + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." 
+ "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_A)) + def test_check_node_status_simple_failure(self): + """Assert that check_node_status completes successfully with failures + + Some nodes successful, but looking for some that never show up. + (list input) + """ + not_found_nodes = check_node_status(1, 1, ['a1', 'b1', 'c1', 'z1']) + assert not_found_nodes == ['z1'] + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_B)) + def test_check_node_status_all_success_4th(self): + """Assert that check_node_status completes successfully + + All nodes ready on 4th iteration + """ + not_found_nodes = check_node_status(3, 1, set(['a1', 'b1', 'c1'])) + assert not not_found_nodes + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_B)) + def test_check_node_status_timeout_before_4th(self): + """Assert that check_node_status completes successfully with failures + + Some nodes not ready before timeout (before 4th iteration) + """ + not_found_nodes = check_node_status(2, 1, set(['a1', 'b1', 'c1'])) + assert 'a1' in not_found_nodes + assert 'b1' in not_found_nodes + assert 'c1' not in not_found_nodes + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_C)) + def test_check_node_status_success_changing_status(self): + """Assert that check_node_status completes successfully + + Nodes go from not ready to ready + """ + not_found_nodes = check_node_status(30, 1, set(['a1', 'b1', 'c1'])) + assert not not_found_nodes + + def test_check_node_status_no_interest(self): + """Assert that check_node_status completes successfully + + Returns empty array because nothing was requested to look for. 
+ """ + not_found_nodes = check_node_status(3, 1, expected_nodes=None) + assert not not_found_nodes + not_found_nodes = check_node_status(3, 1, []) + assert not not_found_nodes + not_found_nodes = check_node_status(3, 1, set([])) + assert not not_found_nodes + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_D)) + def test_check_node_status_malformed(self): + """Assert that check_node_status completes successfully + + Malformed response items are skipped; only c1 is found ready + """ + not_found_nodes = check_node_status(1, 1, set(['a1', 'b1', 'c1'])) + assert 'a1' in not_found_nodes + assert 'b1' in not_found_nodes + assert 'c1' not in not_found_nodes + + def test_check_node_status_error_not_connected_to_k8s(self): + """Assert that check_node_status completes successfully with failures + + No nodes found because of errors doing lookup. + """ + not_found_nodes = check_node_status(1, 1, set(['a1', 'b1', 'c1'])) + assert 'a1' in not_found_nodes + assert 'b1' in not_found_nodes + assert 'c1' in not_found_nodes + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "_get_all_k8s_node_status", + new=gen_check_node_status(INV_SEQ_A)) + def test_check_node_status_bad_intervals(self): + """Assert that check_node_status completes successfully + + With bogus timeout and interval values + """ + not_found_nodes = check_node_status(-1, -1, set(['a1', 'b1', 'c1'])) + assert not not_found_nodes + + not_found_nodes = check_node_status(1, 5, set(['a1', 'b1', 'z1'])) + assert not_found_nodes == ['z1']