Pass Drydock health failure

Bypass a failure due to the health of Drydock on update_site
or deploy_site if the continue-on-fail param is enetered.

Adds unit tests

Changed operator to be more easily testable

Removed the K8s Health Check for preflight checks. We will add
the functionalities back once we have a clearer view on what
should be checked and validated for the workflow.

Change-Id: Idd6d6b18d762a0284f2041248faa4040c78def3f
This commit is contained in:
Krysta 2018-03-29 08:45:45 -05:00 committed by Anthony Lin
parent c0f8fc4359
commit 3629245b0c
10 changed files with 197 additions and 33 deletions

View File

@ -246,12 +246,17 @@ id of the action invoked so that it can be queried subsequently.
Example:
shipyard create action redeploy_server --param="server-name=mcp"
shipyard create action update_site --param="continue-on-fail=true"
<action_command>
The action to invoke.
\--param=<parameter>
A parameter to be provided to the action being invoked. (repeatable)
Note that we can pass in different information to the create action
workflow, i.e. name of server to be redeployed, whether to continue
the workflow if there are failures in Drydock, e.g. failed health
checks.
\--allow-intermediate-commits
Allows continuation of a site action, e.g. update_site even when the

View File

@ -13,7 +13,6 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import K8sHealthCheckOperator
from airflow.operators import UcpHealthCheckOperator
from config_path import config_path
@ -27,22 +26,14 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
'''
The k8s_preflight_check checks that k8s is in a good state
for the purposes of the Undercloud Platform to proceed with
processing
'''
k8s = K8sHealthCheckOperator(
task_id='k8s_preflight_check',
dag=dag)
'''
Check that all UCP components are in good state for the purposes
of the Undercloud Platform to proceed with processing
of the Undercloud Platform to proceed with processing.
'''
shipyard = UcpHealthCheckOperator(
task_id='ucp_preflight_check',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
dag=dag)
return dag

View File

@ -50,6 +50,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
preflight = step_factory.get_preflight()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
@ -62,7 +63,8 @@ create_action_tag = step_factory.get_create_action_tag()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
get_rendered_doc.set_upstream(concurrency_check)
preflight.set_upstream(concurrency_check)
get_rendered_doc.set_upstream(preflight)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)

View File

@ -96,6 +96,18 @@ class DrydockBaseOperator(UcpBaseOperator):
# Logs uuid of action performed by the Operator
LOG.info("DryDock Operator for action %s", self.action_info['id'])
# Skip workflow if health checks on Drydock failed and continue-on-fail
# option is turned on
if self.xcom_puller.get_check_drydock_continue_on_fail():
LOG.info("Skipping %s as health checks on Drydock have "
"failed and continue-on-fail option has been "
"turned on", self.__class__.__name__)
# Set continue processing to False
self.continue_processing = False
return
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
# Set node filter to be the server that we want to redeploy

View File

@ -17,7 +17,10 @@ import time
from airflow.exceptions import AirflowException
from service_session import ucp_keystone_session
try:
from service_session import ucp_keystone_session
except ImportError:
from shipyard_airflow.plugins.service_session import ucp_keystone_session
def ucp_service_endpoint(self, svc_type):

View File

@ -48,6 +48,8 @@ class UcpBaseOperator(BaseOperator):
*args, **kwargs):
"""Initialization of UcpBaseOperator object.
:param continue_processing: A boolean value on whether to continue
with the workflow. Defaults to True.
:param main_dag_name: Parent Dag
:param pod_selector_pattern: A list containing the information on
the patterns of the Pod name and name
@ -68,6 +70,7 @@ class UcpBaseOperator(BaseOperator):
"""
super(UcpBaseOperator, self).__init__(*args, **kwargs)
self.continue_processing = True
self.main_dag_name = main_dag_name
self.pod_selector_pattern = pod_selector_pattern or []
self.shipyard_conf = shipyard_conf
@ -83,8 +86,9 @@ class UcpBaseOperator(BaseOperator):
# Execute base function
self.run_base(context)
# Exeute child function
self.do_execute()
if self.continue_processing:
# Exeute child function
self.do_execute()
def ucp_base(self, context):

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import requests
@ -22,6 +21,10 @@ from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from service_endpoint import ucp_service_endpoint
from xcom_puller import XcomPuller
from xcom_pusher import XcomPusher
LOG = logging.getLogger(__name__)
class UcpHealthCheckOperator(BaseOperator):
@ -31,12 +34,16 @@ class UcpHealthCheckOperator(BaseOperator):
@apply_defaults
def __init__(self,
shipyard_conf,
shipyard_conf=None,
main_dag_name=None,
xcom_push=True,
*args,
**kwargs):
super(UcpHealthCheckOperator, self).__init__(*args, **kwargs)
self.shipyard_conf = shipyard_conf
self.main_dag_name = main_dag_name
self.xcom_push_flag = xcom_push
def execute(self, context):
@ -48,35 +55,64 @@ class UcpHealthCheckOperator(BaseOperator):
'physicalprovisioner',
'shipyard']
# Define task_instance
self.task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
self.action_info = self.xcom_puller.get_action_info()
# Set up xcom_pusher to push values to xcom
self.xcom_pusher = XcomPusher(self.task_instance)
# Loop through various UCP Components
for i in ucp_components:
for component in ucp_components:
# Retrieve Endpoint Information
service_endpoint = ucp_service_endpoint(self, svc_type=i)
logging.info("%s endpoint is %s", i, service_endpoint)
service_endpoint = ucp_service_endpoint(self,
svc_type=component)
LOG.info("%s endpoint is %s", component, service_endpoint)
# Construct Health Check Endpoint
healthcheck_endpoint = os.path.join(service_endpoint,
'health')
logging.info("%s healthcheck endpoint is %s", i,
healthcheck_endpoint)
LOG.info("%s healthcheck endpoint is %s", component,
healthcheck_endpoint)
try:
logging.info("Performing Health Check on %s", i)
LOG.info("Performing Health Check on %s", component)
# Set health check timeout to 30 seconds
req = requests.get(healthcheck_endpoint, timeout=30)
except requests.exceptions.RequestException as e:
raise AirflowException(e)
# UCP Component will return empty response/body to show that
# it is healthy
if req.status_code == 204:
logging.info("%s is alive and healthy", i)
else:
logging.error(req.text)
raise AirflowException("Invalid Response!")
# An empty response/body returned by a component means
# that it is healthy
if req.status_code == 204:
LOG.info("%s is alive and healthy", component)
except requests.exceptions.RequestException as e:
self.log_health_exception(component, e)
def log_health_exception(self, component, error_messages):
"""Logs Exceptions for health check
"""
# If Drydock health check fails and continue-on-fail, continue
# and create xcom key 'drydock_continue_on_fail'
if (component == 'physicalprovisioner' and
self.action_info['parameters'].get(
'continue-on-fail').lower() == 'true' and
self.action_info['dag_id'] in ['update_site', 'deploy_site']):
LOG.warning('Drydock did not pass health check. Continuing '
'as "continue-on-fail" option is enabled.')
self.xcom_pusher.xcom_push(key='drydock_continue_on_fail',
value=True)
else:
LOG.error(error_messages)
raise AirflowException("Health check failed for %s component on "
"dag_id=%s. Details: %s" %
(component, self.action_info.get('dag_id'),
error_messages))
class UcpHealthCheckPlugin(AirflowPlugin):

View File

@ -73,3 +73,12 @@ class XcomPuller(object):
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)
def get_check_drydock_continue_on_fail(self):
"""Check if 'drydock_continue_on_fail' key exists"""
source_task = 'ucp_preflight_check'
source_dag = 'preflight'
key = 'drydock_continue_on_fail'
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)

View File

@ -41,6 +41,7 @@ DESC_ACTION = """
FORMAT: shipyard create action <action command> --param=<parameter>
(repeatable) [--allow-intermediate-commits] \n
EXAMPLE: shipyard create action redeploy_server --param="server-name=mcp"
shipyard create action update_site --param="continue-on-fail=true"
"""
SHORT_DESC_ACTION = (

View File

@ -0,0 +1,101 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import pytest
from requests.models import Response
from airflow.exceptions import AirflowException
from shipyard_airflow.plugins.ucp_preflight_check_operator import (
UcpHealthCheckOperator)
ucp_components = [
'armada',
'deckhand',
'kubernetesprovisioner',
'physicalprovisioner',
'shipyard']
def test_drydock_health_skip_update_site():
"""
Ensure that an error is not thrown due to Drydock health failing during
update_site or deploy site
"""
expected_log = ('Drydock did not pass health check. Continuing '
'as "continue-on-fail" option is enabled.')
req = Response()
req.status_code = None
action_info = {
"dag_id": "update_site",
"parameters": {"continue-on-fail": "true"}
}
op = UcpHealthCheckOperator(task_id='test')
op.action_info = action_info
with mock.patch('logging.info', autospec=True) as mock_logger:
op.log_health('physicalprovisioner', req)
mock_logger.assert_called_with(expected_log)
action_info = {
"dag_id": "deploy_site",
"parameters": {"continue-on-fail": "true"}
}
with mock.patch('logging.info', autospec=True) as mock_logger:
op.log_health('physicalprovisioner', req)
mock_logger.assert_called_with(expected_log)
def test_failure_log_health():
""" Ensure an error is thrown on failure for all components.
"""
action_info = {
"dag_id": "update_site",
"parameters": {"something-else": "true"}
}
req = Response()
req.status_code = None
op = UcpHealthCheckOperator(task_id='test')
op.action_info = action_info
for i in ucp_components:
with pytest.raises(AirflowException) as expected_exc:
op.log_health(i, req)
assert "Health check failed" in str(expected_exc)
def test_success_log_health():
""" Ensure 204 gives correct response for all components
"""
action_info = {
"dag_id": "deploy_site",
"parameters": {"something-else": "true"}
}
req = Response()
req.status_code = 204
op = UcpHealthCheckOperator(task_id='test')
op.action_info = action_info
for i in ucp_components:
with mock.patch('logging.info', autospec=True) as mock_logger:
op.log_health(i, req)
mock_logger.assert_called_with('%s is alive and healthy', i)