Ensure pod logs are fetched in case of exception in any operator

This patch tries to cover some edge cases that could happen during
Shipyard Airflow operator execution. At the moment, all operators
interact with other services, i.e. k8s pods. If an exception occurs
during execution of an operator, logs will be fetched from the
appropriate pod, and if the operator has a "fetch_failure_details"
method (see DrydockBaseOperator) it will be called as well.

What exceptions could happen during an operator execution?
Besides those explicitly defined in code, like
DrydockClientUseFailureException, other exceptions, e.g. KeyError or
similar, may be raised. In that case it's not clear which is the
culprit: the client side (Shipyard) or the server side (Drydock,
Armada, Deckhand, Promenade). So this patch takes a defensive approach:
it gets logs from pods and fetches additional details for any
exceptional situation.

To do that, the do_execute method is wrapped with try..except
in UcpBaseOperator.execute. While fetching logs from a pod
and fetching failure details, it does the appropriate logging by itself
and finally re-raises the original exception.

Change-Id: If1501e9a24b05edb6eb32c7b1b2d27f24f3ee063
This commit is contained in:
Andrey Volkov 2018-09-17 10:45:23 -07:00
parent d937a165e2
commit 4164518502
13 changed files with 87 additions and 61 deletions

View File

@ -58,9 +58,6 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator):
LOG.info("Successfully retrieved Helm charts releases")
LOG.info(armada_get_releases)
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Failed to retrieve Helm charts releases!")

View File

@ -61,9 +61,6 @@ class ArmadaGetStatusOperator(ArmadaBaseOperator):
armada_get_status['tiller']['version'])
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Please check Tiller!")

View File

@ -69,9 +69,6 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
timeout=timeout)
except errors.ClientError as client_error:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException(client_error)
# if this is a retry, assume that the airflow worker needs to be

View File

@ -50,9 +50,6 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
manifest=self.design_ref, timeout=timeout)
except errors.ClientError as client_error:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException(client_error)
# Print results
@ -65,9 +62,6 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
if status.lower() == 'success':
LOG.info("Site Design has been successfully validated")
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Site Design Validation Failed "
"with status: {}!".format(status))

View File

@ -70,9 +70,6 @@ class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator):
self.revision_id)
except:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException("Failed to create revision tag!")
def check_task_result(self, task_id):

View File

@ -46,9 +46,6 @@ class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator):
LOG.info("Successfully Retrieved Rendered Document")
except:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException("Failed to Retrieve Rendered Document!")

View File

@ -57,16 +57,10 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator):
timeout=self.validation_read_timeout).text)
except requests.exceptions.RequestException as e:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException(e)
if (any([str(v.get('status', 'unspecified')).lower() == 'failure'
for v in retrieved_list.get('results', [])])):
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException("DeckHand Site Design Validation Failed!")
else:
LOG.info("Revision %d has been successfully validated",

View File

@ -179,9 +179,6 @@ class DrydockBaseOperator(UcpBaseOperator):
node_filter=self.node_filter)
except errors.ClientError as client_error:
# Dump logs from Drydock pods
self.get_k8s_logs()
raise DrydockClientUseFailureException(client_error)
# Retrieve Task ID
@ -221,7 +218,6 @@ class DrydockBaseOperator(UcpBaseOperator):
LOG.info("Current status of task id %s is %s",
self.drydock_task_id, task_status)
except DrydockClientUseFailureException:
self.get_k8s_logs()
raise
except:
# There can be situations where there are intermittent network
@ -234,10 +230,7 @@ class DrydockBaseOperator(UcpBaseOperator):
# TODO(bryan-strassner) If Shipyard has timed out waiting for
# this task to complete, and Drydock has provided a means
# to cancel a task, that cancellation should be done here.
# task_failure only exits with an exception, so this is the
# end of processing in the case of a timeout.
self.task_failure(False)
raise DrydockTaskTimeoutException("Task Execution Timed Out!")
# Exit 'for' loop if the task is in 'complete' or 'terminated'
# state
@ -252,7 +245,8 @@ class DrydockBaseOperator(UcpBaseOperator):
LOG.info('Task id %s has been successfully completed',
self.drydock_task_id)
else:
self.task_failure(True)
raise DrydockTaskFailedException(
"Failed to Execute/Complete Task!")
def get_task_dict(self, task_id):
"""Retrieve task output in its raw dictionary format
@ -268,10 +262,7 @@ class DrydockBaseOperator(UcpBaseOperator):
except errors.ClientError as client_error:
raise DrydockClientUseFailureException(client_error)
def task_failure(self, _task_failure):
# Dump logs from Drydock pods
self.get_k8s_logs()
def fetch_failure_details(self):
LOG.info('Retrieving all tasks records from Drydock...')
try:
@ -297,21 +288,14 @@ class DrydockBaseOperator(UcpBaseOperator):
LOG.error(pprint.pprint(failed_parent_task[0]))
# Get the list of subtasks belonging to the failed parent task
parent_subtask_id_list = failed_parent_task[0]['subtask_id_list']
# Get the list of subtasks belonging to the failed parent task
parent_subtask_id_list = failed_parent_task[0]['subtask_id_list']
# Check for failed subtasks
self.check_subtask_failure(parent_subtask_id_list)
# Raise Exception to terminate workflow
if _task_failure:
raise DrydockTaskFailedException(
"Failed to Execute/Complete Task!"
)
# Check for failed subtasks
self.check_subtask_failure(parent_subtask_id_list)
else:
raise DrydockTaskTimeoutException(
"Task Execution Timed Out!"
)
LOG.info("No failed parent task found for task_id %s",
self.drydock_task_id)
def check_subtask_failure(self, subtask_id_list):

View File

@ -81,9 +81,6 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
if status.lower() == 'success':
LOG.info("DryDock Site Design has been successfully validated")
else:
# Dump logs from Drydock pods
self.get_k8s_logs()
raise AirflowException("DryDock Site Design Validation Failed "
"with status: {}!".format(status))

View File

@ -89,9 +89,6 @@ class PromenadeValidateSiteDesignOperator(PromenadeBaseOperator):
LOG.info("Promenade Site Design has been successfully validated")
else:
# Dump logs from Promenade pods
self.get_k8s_logs()
raise AirflowException("Promenade Site Design Validation Failed "
"with status: {}!".format(status))

View File

@ -102,8 +102,18 @@ class UcpBaseOperator(BaseOperator):
self.run_base(context)
if self.continue_processing:
# Exeute child function
self.do_execute()
# Execute child function
try:
self.do_execute()
except Exception:
LOG.exception(
'Exception happened during %s execution, '
'will try to log additional details',
self.__class__.__name__)
self.get_k8s_logs()
if hasattr(self, 'fetch_failure_details'):
self.fetch_failure_details()
raise
def ucp_base(self, context):

View File

@ -13,3 +13,6 @@ project_domain_name = default
project_name = service
user_domain_name = default
username = shipyard
[k8s_logs]
ucp_namespace = fake_ucp

View File

@ -0,0 +1,62 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for drydock base operator functions"""
import os
from unittest import mock
import pytest
from shipyard_airflow.plugins.drydock_verify_site import (
DrydockVerifySiteOperator
)
from shipyard_airflow.plugins.drydock_errors import (
DrydockTaskFailedException,
)
# Location of the 'test.conf' file that sits in the same directory as this
# test module; it is handed to the operator as its shipyard_conf.
_TESTS_DIR = os.path.dirname(__file__)
CONF_FILE = os.path.join(_TESTS_DIR, 'test.conf')
@mock.patch('shipyard_airflow.plugins.ucp_base_operator.get_pod_logs')
def test_logs_fetched_if_exception_in_create_task(get_pod_logs):
    """Pod logs and task records are fetched when create_task raises.

    The Drydock client is a mock whose ``create_task`` raises a
    ``ValueError``. Executing the operator must propagate that same
    error, and afterwards both the patched ``get_pod_logs`` and the
    client's ``get_tasks`` must have been called.
    """
    failure_text = 'Fake create task method failed'
    drydock_client = mock.MagicMock()
    drydock_client.create_task.side_effect = ValueError(failure_text)

    operator = DrydockVerifySiteOperator(
        task_id="t1",
        shipyard_conf=CONF_FILE,
        drydock_client=drydock_client)

    # Swap operator internals for mocks so execute() can proceed in a unit
    # test. NOTE(review): presumably these would otherwise reach out to
    # real services or gate processing -- confirm against the operators.
    operator._deckhand_design_ref = mock.MagicMock()
    operator._continue_processing_flag = mock.MagicMock(return_value=True)
    operator._setup_drydock_client = mock.MagicMock()

    airflow_context = mock.MagicMock()
    with pytest.raises(ValueError, match=failure_text):
        operator.execute(airflow_context)

    # The original error must not prevent diagnostics from being gathered.
    assert get_pod_logs.called
    # NOTE(review): get_tasks is presumably invoked by
    # fetch_failure_details -- confirm in DrydockBaseOperator.
    assert drydock_client.get_tasks.called
@mock.patch('time.sleep', mock.MagicMock())
@mock.patch('shipyard_airflow.plugins.ucp_base_operator.get_pod_logs')
def test_logs_fetched_if_exception_in_query_task(get_pod_logs):
    """Pod logs and task records are fetched when the task fails.

    The Drydock client is an unconfigured mock and ``time.sleep`` is
    replaced with a mock so the test never really sleeps. Executing the
    operator is expected to raise ``DrydockTaskFailedException``, after
    which both the patched ``get_pod_logs`` and the client's
    ``get_tasks`` must have been called.
    """
    # NOTE(review): no task status is configured on the mock; presumably
    # the mocked status never reads as successful, which is what leads to
    # DrydockTaskFailedException -- confirm in DrydockBaseOperator.
    drydock_client = mock.MagicMock()

    operator = DrydockVerifySiteOperator(
        task_id="t1",
        shipyard_conf=CONF_FILE,
        drydock_client=drydock_client)

    # Swap operator internals for mocks so execute() can proceed in a unit
    # test. NOTE(review): presumably these would otherwise reach out to
    # real services or gate processing -- confirm against the operators.
    operator._deckhand_design_ref = mock.MagicMock()
    operator._continue_processing_flag = mock.MagicMock(return_value=True)
    operator._setup_drydock_client = mock.MagicMock()

    airflow_context = mock.MagicMock()
    with pytest.raises(DrydockTaskFailedException):
        operator.execute(airflow_context)

    # Diagnostics must be gathered even though execution failed.
    assert get_pod_logs.called
    assert drydock_client.get_tasks.called