From 3629245b0c54438595ecd26e8281b231690d3049 Mon Sep 17 00:00:00 2001 From: Krysta Date: Thu, 29 Mar 2018 08:45:45 -0500 Subject: [PATCH] Pass Drydock health failure Bypass a failure due to the health of Drydock on update_site or deploy_site if the continue-on-fail param is entered. Adds unit tests Changed operator to be more easily testable Removed the K8s Health Check for preflight checks. We will add the functionalities back once we have a clearer view on what should be checked and validated for the workflow. Change-Id: Idd6d6b18d762a0284f2041248faa4040c78def3f --- docs/source/CLI.rst | 5 + .../shipyard_airflow/dags/preflight_checks.py | 13 +-- .../shipyard_airflow/dags/update_site.py | 4 +- .../plugins/drydock_base_operator.py | 12 +++ .../plugins/service_endpoint.py | 5 +- .../plugins/ucp_base_operator.py | 8 +- .../plugins/ucp_preflight_check_operator.py | 72 +++++++++---- .../shipyard_airflow/plugins/xcom_puller.py | 9 ++ .../shipyard_client/cli/create/commands.py | 1 + .../test_ucp_preflight_check_operator.py | 101 ++++++++++++++++++ 10 files changed, 197 insertions(+), 33 deletions(-) create mode 100644 tests/unit/plugins/test_ucp_preflight_check_operator.py diff --git a/docs/source/CLI.rst b/docs/source/CLI.rst index ca07674c..9f0a370c 100644 --- a/docs/source/CLI.rst +++ b/docs/source/CLI.rst @@ -246,12 +246,17 @@ id of the action invoked so that it can be queried subsequently. Example: shipyard create action redeploy_server --param="server-name=mcp" + shipyard create action update_site --param="continue-on-fail=true" The action to invoke. \--param= A parameter to be provided to the action being invoked. (repeatable) + Note that we can pass in different information to the create action + workflow, i.e. name of server to be redeployed, whether to continue + the workflow if there are failures in Drydock, e.g. failed health + checks. \--allow-intermediate-commits Allows continuation of a site action, e.g. 
update_site even when the diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py index 121c0a18..ebd4add1 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py @@ -13,7 +13,6 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators import K8sHealthCheckOperator from airflow.operators import UcpHealthCheckOperator from config_path import config_path @@ -27,22 +26,14 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args): '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - ''' - The k8s_preflight_check checks that k8s is in a good state - for the purposes of the Undercloud Platform to proceed with - processing - ''' - k8s = K8sHealthCheckOperator( - task_id='k8s_preflight_check', - dag=dag) - ''' Check that all UCP components are in good state for the purposes - of the Undercloud Platform to proceed with processing + of the Undercloud Platform to proceed with processing. 
''' shipyard = UcpHealthCheckOperator( task_id='ucp_preflight_check', shipyard_conf=config_path, + main_dag_name=parent_dag_name, dag=dag) return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py index 2ca89284..380abafc 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py @@ -50,6 +50,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() +preflight = step_factory.get_preflight() get_rendered_doc = step_factory.get_get_rendered_doc() deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() @@ -62,7 +63,8 @@ create_action_tag = step_factory.get_create_action_tag() # DAG Wiring concurrency_check.set_upstream(action_xcom) -get_rendered_doc.set_upstream(concurrency_check) +preflight.set_upstream(concurrency_check) +get_rendered_doc.set_upstream(preflight) deployment_configuration.set_upstream(get_rendered_doc) validate_site_design.set_upstream(deployment_configuration) drydock_build.set_upstream(validate_site_design) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py index 3b3cf141..f3a086b9 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py @@ -96,6 +96,18 @@ class DrydockBaseOperator(UcpBaseOperator): # Logs uuid of action performed by the Operator LOG.info("DryDock Operator for action %s", self.action_info['id']) + # Skip workflow if health checks on Drydock failed and continue-on-fail + # option is turned on + if 
self.xcom_puller.get_check_drydock_continue_on_fail(): + LOG.info("Skipping %s as health checks on Drydock have " + "failed and continue-on-fail option has been " + "turned on", self.__class__.__name__) + + # Set continue processing to False + self.continue_processing = False + + return + # Retrieve information of the server that we want to redeploy if user # executes the 'redeploy_server' dag # Set node filter to be the server that we want to redeploy diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py index 24a9a29a..e16fa146 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py @@ -17,7 +17,10 @@ import time from airflow.exceptions import AirflowException -from service_session import ucp_keystone_session +try: + from service_session import ucp_keystone_session +except ImportError: + from shipyard_airflow.plugins.service_session import ucp_keystone_session def ucp_service_endpoint(self, svc_type): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py index babac0d0..96fae01b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py @@ -48,6 +48,8 @@ class UcpBaseOperator(BaseOperator): *args, **kwargs): """Initialization of UcpBaseOperator object. + :param continue_processing: A boolean value on whether to continue + with the workflow. Defaults to True. 
:param main_dag_name: Parent Dag :param pod_selector_pattern: A list containing the information on the patterns of the Pod name and name @@ -68,6 +70,7 @@ class UcpBaseOperator(BaseOperator): """ super(UcpBaseOperator, self).__init__(*args, **kwargs) + self.continue_processing = True self.main_dag_name = main_dag_name self.pod_selector_pattern = pod_selector_pattern or [] self.shipyard_conf = shipyard_conf @@ -83,8 +86,9 @@ class UcpBaseOperator(BaseOperator): # Execute base function self.run_base(context) - # Exeute child function - self.do_execute() + if self.continue_processing: + # Execute child function + self.do_execute() def ucp_base(self, context): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py index 72e7d4be..04755c71 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- import logging import os import requests @@ -22,6 +21,10 @@ from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults from service_endpoint import ucp_service_endpoint +from xcom_puller import XcomPuller +from xcom_pusher import XcomPusher + +LOG = logging.getLogger(__name__) class UcpHealthCheckOperator(BaseOperator): @@ -31,12 +34,16 @@ class UcpHealthCheckOperator(BaseOperator): @apply_defaults def __init__(self, - shipyard_conf, + shipyard_conf=None, + main_dag_name=None, + xcom_push=True, *args, **kwargs): super(UcpHealthCheckOperator, self).__init__(*args, **kwargs) self.shipyard_conf = shipyard_conf + self.main_dag_name = main_dag_name + self.xcom_push_flag = xcom_push def execute(self, context): @@ -48,35 +55,64 @@ class UcpHealthCheckOperator(BaseOperator): 'physicalprovisioner', 'shipyard'] + # Define task_instance + self.task_instance = context['task_instance'] + + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance) + self.action_info = self.xcom_puller.get_action_info() + + # Set up xcom_pusher to push values to xcom + self.xcom_pusher = XcomPusher(self.task_instance) + # Loop through various UCP Components - for i in ucp_components: + for component in ucp_components: # Retrieve Endpoint Information - service_endpoint = ucp_service_endpoint(self, svc_type=i) - logging.info("%s endpoint is %s", i, service_endpoint) + service_endpoint = ucp_service_endpoint(self, + svc_type=component) + LOG.info("%s endpoint is %s", component, service_endpoint) # Construct Health Check Endpoint healthcheck_endpoint = os.path.join(service_endpoint, 'health') - logging.info("%s healthcheck endpoint is %s", i, - healthcheck_endpoint) + LOG.info("%s healthcheck endpoint is %s", component, + healthcheck_endpoint) try: - logging.info("Performing Health Check on %s", i) - + LOG.info("Performing Health Check on %s", component) # Set health check timeout to 30 seconds req = 
requests.get(healthcheck_endpoint, timeout=30) - except requests.exceptions.RequestException as e: - raise AirflowException(e) - # UCP Component will return empty response/body to show that - # it is healthy - if req.status_code == 204: - logging.info("%s is alive and healthy", i) - else: - logging.error(req.text) - raise AirflowException("Invalid Response!") + # An empty response/body returned by a component means + # that it is healthy + if req.status_code == 204: + LOG.info("%s is alive and healthy", component) + + except requests.exceptions.RequestException as e: + self.log_health_exception(component, e) + + def log_health_exception(self, component, error_messages): + """Logs Exceptions for health check + """ + # If Drydock health check fails and continue-on-fail, continue + # and create xcom key 'drydock_continue_on_fail' + if (component == 'physicalprovisioner' and + self.action_info['parameters'].get( + 'continue-on-fail').lower() == 'true' and + self.action_info['dag_id'] in ['update_site', 'deploy_site']): + LOG.warning('Drydock did not pass health check. Continuing ' + 'as "continue-on-fail" option is enabled.') + self.xcom_pusher.xcom_push(key='drydock_continue_on_fail', + value=True) + + else: + LOG.error(error_messages) + raise AirflowException("Health check failed for %s component on " + "dag_id=%s. 
Details: %s" % + (component, self.action_info.get('dag_id'), + error_messages)) class UcpHealthCheckPlugin(AirflowPlugin): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py index b63207b9..7f4dc69e 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py @@ -73,3 +73,12 @@ class XcomPuller(object): return self._get_xcom(source_task=source_task, dag_id=source_dag, key=key) + + def get_check_drydock_continue_on_fail(self): + """Check if 'drydock_continue_on_fail' key exists""" + source_task = 'ucp_preflight_check' + source_dag = 'preflight' + key = 'drydock_continue_on_fail' + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) diff --git a/src/bin/shipyard_client/shipyard_client/cli/create/commands.py b/src/bin/shipyard_client/shipyard_client/cli/create/commands.py index 46c64aa8..f438a5b0 100644 --- a/src/bin/shipyard_client/shipyard_client/cli/create/commands.py +++ b/src/bin/shipyard_client/shipyard_client/cli/create/commands.py @@ -41,6 +41,7 @@ DESC_ACTION = """ FORMAT: shipyard create action --param= (repeatable) [--allow-intermediate-commits] \n EXAMPLE: shipyard create action redeploy_server --param="server-name=mcp" + shipyard create action update_site --param="continue-on-fail=true" """ SHORT_DESC_ACTION = ( diff --git a/tests/unit/plugins/test_ucp_preflight_check_operator.py b/tests/unit/plugins/test_ucp_preflight_check_operator.py new file mode 100644 index 00000000..8a2fdcb4 --- /dev/null +++ b/tests/unit/plugins/test_ucp_preflight_check_operator.py @@ -0,0 +1,101 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock +import pytest +from requests.models import Response + +from airflow.exceptions import AirflowException +from shipyard_airflow.plugins.ucp_preflight_check_operator import ( + UcpHealthCheckOperator) + +ucp_components = [ + 'armada', + 'deckhand', + 'kubernetesprovisioner', + 'physicalprovisioner', + 'shipyard'] + + +def test_drydock_health_skip_update_site(): + """ + Ensure that an error is not thrown due to Drydock health failing during + update_site or deploy site + """ + + expected_log = ('Drydock did not pass health check. Continuing ' + 'as "continue-on-fail" option is enabled.') + + req = Response() + req.status_code = None + + action_info = { + "dag_id": "update_site", + "parameters": {"continue-on-fail": "true"} + } + + op = UcpHealthCheckOperator(task_id='test') + op.action_info = action_info + + with mock.patch('logging.info', autospec=True) as mock_logger: + op.log_health('physicalprovisioner', req) + mock_logger.assert_called_with(expected_log) + + action_info = { + "dag_id": "deploy_site", + "parameters": {"continue-on-fail": "true"} + } + + with mock.patch('logging.info', autospec=True) as mock_logger: + op.log_health('physicalprovisioner', req) + mock_logger.assert_called_with(expected_log) + + +def test_failure_log_health(): + """ Ensure an error is thrown on failure for all components. 
+ """ + action_info = { + "dag_id": "update_site", + "parameters": {"something-else": "true"} + } + + req = Response() + req.status_code = None + + op = UcpHealthCheckOperator(task_id='test') + op.action_info = action_info + + for i in ucp_components: + with pytest.raises(AirflowException) as expected_exc: + op.log_health(i, req) + assert "Health check failed" in str(expected_exc) + + +def test_success_log_health(): + """ Ensure 204 gives correct response for all components + """ + action_info = { + "dag_id": "deploy_site", + "parameters": {"something-else": "true"} + } + + req = Response() + req.status_code = 204 + + op = UcpHealthCheckOperator(task_id='test') + op.action_info = action_info + + for i in ucp_components: + with mock.patch('logging.info', autospec=True) as mock_logger: + op.log_health(i, req) + mock_logger.assert_called_with('%s is alive and healthy', i)