From 6e712f45035a501a96b3c4bb5332210215c6d7da Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Thu, 19 Apr 2018 08:19:31 +0000 Subject: [PATCH] Add Deckhand Create Site Action Tag Operator Uses Deckhand tags to mark revision as 'site-action-failure' or 'site-action-success' at the end of the workflow Change-Id: I3753fe202eb4fc610b19f2508b7082b7ab16cb5d --- .../dags/common_step_factory.py | 25 ++- .../shipyard_airflow/dags/dag_names.py | 11 +- .../dags/deckhand_create_tag.py | 36 +++++ .../shipyard_airflow/dags/deploy_site.py | 2 + .../shipyard_airflow/dags/update_site.py | 11 +- .../deckhand_create_site_action_tag.py | 150 ++++++++++++++++++ 6 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py create mode 100644 src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index 05ae71bc..69687978 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -18,14 +18,15 @@ from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from armada_deploy_site import deploy_site_armada -import dag_names as dn +from dag_deployment_configuration import get_deployment_configuration +from deckhand_create_tag import create_deckhand_tag from deckhand_get_rendered_doc import get_rendered_doc_deckhand from destroy_node import destroy_server from drydock_deploy_site import deploy_site_drydock from failure_handlers import step_failure_handler -from dag_deployment_configuration import get_deployment_configuration from preflight_checks import all_preflight_checks from validate_site_design import validate_site_design +import dag_names as dn class CommonStepFactory(object): @@ -242,3 +243,23 @@ class CommonStepFactory(object): bash_command=( "echo 'Airflow Worker Upgrade Not Required'"), dag=self.dag) + + def get_create_action_tag(self, task_id=dn.CREATE_ACTION_TAG): + """Generate the create action tag step + + Step is responsible for tagging the revision with either + 'site-action-success' or 'site-action-failure' depending + on the final state of the site action. + + Note that trigger_rule is set to "all_done" so that this + step will run even when upstream tasks are in failed state. + """ + return SubDagOperator( + subdag=create_deckhand_tag( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + trigger_rule="all_done", + dag=self.dag) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index ea608e85..39575889 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -15,15 +15,16 @@ # Subdags ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' ARMADA_BUILD_DAG_NAME = 'armada_build' +CREATE_ACTION_TAG = 'create_action_tag' DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -GET_RENDERED_DOC = 'get_rendered_doc' -GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' DESTROY_SERVER_DAG_NAME = 'destroy_server' +DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +GET_DESIGN_VERSION = 'get_design_version' +GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration' +VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' # Steps ACTION_XCOM = 'action_xcom' DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade' -UPGRADE_AIRFLOW = 'upgrade_airflow' SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow' +UPGRADE_AIRFLOW = 'upgrade_airflow' diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py new file mode 100644 index 00000000..37728282 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py @@ -0,0 +1,36 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.models import DAG +from airflow.operators import DeckhandCreateSiteActionTagOperator + +from config_path import config_path + + +def create_deckhand_tag(parent_dag_name, child_dag_name, args): + ''' + Create Deckhand Revision Tag + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + create_action_tag = DeckhandCreateSiteActionTagOperator( + task_id='deckhand_create_action_tag', + shipyard_conf=config_path, + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py index 6a36f526..ddfc82f0 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py @@ -51,6 +51,7 @@ deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() drydock_build = step_factory.get_drydock_build() armada_build = step_factory.get_armada_build() +create_action_tag = step_factory.get_create_action_tag() # DAG Wiring concurrency_check.set_upstream(action_xcom) @@ -60,3 +61,4 @@ deployment_configuration.set_upstream(get_rendered_doc) validate_site_design.set_upstream(deployment_configuration) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) +create_action_tag.set_upstream(armada_build) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py index 2c8ccec0..2ca89284 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py @@ -58,6 +58,7 @@ armada_build = step_factory.get_armada_build() decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade() upgrade_airflow = step_factory.get_upgrade_airflow() skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() +create_action_tag = step_factory.get_create_action_tag() # DAG Wiring concurrency_check.set_upstream(action_xcom) @@ -67,5 +68,11 @@ validate_site_design.set_upstream(deployment_configuration) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) decide_airflow_upgrade.set_upstream(armada_build) -decide_airflow_upgrade.set_downstream(upgrade_airflow) -decide_airflow_upgrade.set_downstream(skip_upgrade_airflow) +decide_airflow_upgrade.set_downstream([ + upgrade_airflow, + skip_upgrade_airflow +]) +create_action_tag.set_upstream([ + upgrade_airflow, + skip_upgrade_airflow +]) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py new file mode 100644 index 00000000..9ce0dfe7 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py @@ -0,0 +1,150 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import datetime +import logging +# Using nosec to prevent Bandit blacklist reporting. Subprocess is used +# in a controlled way as part of this operator. +import subprocess # nosec + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from deckhand_base_operator import DeckhandBaseOperator + +LOG = logging.getLogger(__name__) + + +class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator): + + """Deckhand Create Site Action Tag Operator + + This operator will trigger Deckhand to create a tag for the revision at + the end of the workflow. The tag will either be 'site-action-success' + or 'site-action-failure' (dependent upon the result of the workflow). + + """ + + def do_execute(self): + + # Calculate total elapsed time for workflow + time_delta = datetime.now() - self.task_instance.execution_date + + hours, remainder = divmod(time_delta.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + LOG.info('The workflow took %d hr %d mins %d seconds to' + ' execute', hours, minutes, seconds) + + LOG.info("Retrieving final state of %s...", self.main_dag_name) + + workflow_result = self.check_workflow_result() + + LOG.info("Creating Site Action Tag for Revision %d", self.revision_id) + + # Create site action tag + try: + if workflow_result: + self.deckhandclient.tags.create(revision_id=self.revision_id, + tag='site-action-success') + else: + self.deckhandclient.tags.create(revision_id=self.revision_id, + tag='site-action-failure') + + LOG.info("Site Action Tag created for Revision %d", + self.revision_id) + + except: + # Dump logs from Deckhand pods + self.get_k8s_logs() + + raise AirflowException("Failed to create revision tag!") + + def check_task_result(self, task_id): + + # Convert Execution Date from datetime format to string + fmt = '%Y-%m-%dT%H:%M:%S' + execution_date = self.task_instance.execution_date.strftime(fmt) + + # Retrieve result of task execution + # + # TODO(eanylin): Use Airflow API instead of CLI once the API is + # ready for consumption, i.e. no longer experimental + response = subprocess.run( + ['airflow', + 'task_state', + self.main_dag_name, + task_id, + execution_date], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + if response.returncode != 0: + LOG.error("Encountered error while executing Airflow CLI!") + + raise AirflowException(response.stderr.decode('utf-8')) + + else: + # The result of the task state will be the last element of + # the list. The task should# either be in 'success' or + # 'failed' state as all relevant tasks would have completed + # execution at this point in time. + result = response.stdout.decode('utf-8').splitlines()[-1] + + LOG.info("Task %s is in %s state", task_id, result) + + return result + + def check_workflow_result(self): + + # Initialize Variables + task = ['armada_build'] + task_result = {} + + if self.main_dag_name == 'update_site': + # NOTE: We will check the final state of the 'armada_build' task + # as a 'success' means that all tasks preceding it would either + # be in 'skipped' or 'success' state. A failure of 'armada_build' + # would mean that the workflow has failed. Hence it is sufficient + # to determine the success/failure of the 'deploy_site' workflow + # with the final state of the 'armada_build' task. + # + # NOTE: The 'update_site' workflow contains additional steps for + # upgrading of worker pods. + for k in ['skip_upgrade_airflow', 'upgrade_airflow']: + task.append(k) + + # Retrieve task result + for i in task: + task_result[i] = self.check_task_result(i) + + # Check for failed task(s) + failed_task = [x for x in task if task_result[x] == 'failed'] + + if failed_task: + LOG.info("Task(s) in the workflow has/have failed: %s", + ", ".join(failed_task)) + + return False + + else: + LOG.info("All tasks completed successfully") + + return True + + +class DeckhandCreateSiteActionTagOperatorPlugin(AirflowPlugin): + + """Creates DeckhandCreateSiteActionTagOperator in Airflow.""" + + name = 'deckhand_create_site_action_tag_operator' + operators = [DeckhandCreateSiteActionTagOperator]