diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py index 03c1b9c8..40569667 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py @@ -245,47 +245,57 @@ class DrydockNodesOperator(DrydockBaseOperator): task. Drydock is assumed to roll up overall success to the top level. """ success_nodes = [] - task_dict = self.get_task_dict(task_id) - task_status = task_dict.get('status', "Not Specified") - task_result = task_dict.get('result') - if task_result is None: - LOG.warn("Task result is missing for task %s, with status %s." - " Neither successes nor further details can be extracted" - " from this result", - task_id, task_status) - else: - if extend_success: - try: - # successes and failures on the task result drive the - # interpretation of success or failure for this workflow. - # - Any node that is _only_ success for a task is a - # success to us. - # - Any node that is listed as a failure is a failure. - # This implies that a node listed as a success and a - # failure is a failure. E.g. some subtasks succeeded and - # some failed - t_successes = task_result.get('successes', []) - t_failures = task_result.get('failures', []) - actual_successes = set(t_successes) - set(t_failures) - # acquire the successes from success nodes - success_nodes.extend(actual_successes) - LOG.info("Nodes <%s> added as successes for task %s", - ", ".join(success_nodes), task_id) - except KeyError: - # missing key on the path to getting nodes - don't add any - LOG.warn("Missing successes field on result of task %s, " - "but a success field was expected. No successes" - " can be extracted from this result", - task_id) - pass - _report_task_info(task_id, task_result, task_status) + try: + task_dict = self.get_task_dict(task_id) + task_status = task_dict.get('status', "Not Specified") + task_result = task_dict.get('result') + if task_result is None: + LOG.warn("Task result is missing for task %s, with status %s." + " Neither successes nor further details can be" + " extracted from this result", + task_id, task_status) + else: + if extend_success: + try: + # successes and failures on the task result drive the + # interpretation of success or failure for this + # workflow. + # - Any node that is _only_ success for a task is a + # success to us. + # - Any node that is listed as a failure is a failure. + # This implies that a node listed as a success and a + # failure is a failure. E.g. some subtasks succeeded + # and some failed + t_successes = task_result.get('successes', []) + t_failures = task_result.get('failures', []) + actual_successes = set(t_successes) - set(t_failures) + # acquire the successes from success nodes + success_nodes.extend(actual_successes) + LOG.info("Nodes <%s> added as successes for task %s", + ", ".join(success_nodes), task_id) + except KeyError: + # missing key on the path to getting nodes - don't add + LOG.warn( + "Missing successes field on result of task %s, " + "but a success field was expected. No successes" + " can be extracted from this result", task_id + ) + pass + _report_task_info(task_id, task_result, task_status) + + # for each child, report only the step info, do not add to overall + # success list. + for ch_task_id in task_dict.get('subtask_id_list', []): + success_nodes.extend( + self._get_successes_for_task(ch_task_id, + extend_success=False) + ) + except Exception: + # since we are reporting task results, if we can't get the + # results, do not block the processing. + LOG.warn("Failed to retrieve a result for task %s. Exception " + "follows:", task_id, exc_info=True) - # for each child, report only the step info, do not add to overall - # success list. - for ch_task_id in task_dict.get('subtask_id_list', []): - success_nodes.extend( - self._get_successes_for_task(ch_task_id, extend_success=False) - ) # deduplicate and return return set(success_nodes)