Add Deckhand Create Site Action Tag Operator

Uses Deckhand tags to mark revision as 'site-action-failure' or
'site-action-success' at the end of the workflow

Change-Id: I3753fe202eb4fc610b19f2508b7082b7ab16cb5d
This commit is contained in:
Anthony Lin 2018-04-19 08:19:31 +00:00
parent dee8887c88
commit 6e712f4503
6 changed files with 226 additions and 9 deletions

View File

@ -18,14 +18,15 @@ from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from armada_deploy_site import deploy_site_armada
import dag_names as dn
from dag_deployment_configuration import get_deployment_configuration
from deckhand_create_tag import create_deckhand_tag
from deckhand_get_rendered_doc import get_rendered_doc_deckhand
from destroy_node import destroy_server
from drydock_deploy_site import deploy_site_drydock
from failure_handlers import step_failure_handler
from dag_deployment_configuration import get_deployment_configuration
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
import dag_names as dn
class CommonStepFactory(object):
@ -242,3 +243,23 @@ class CommonStepFactory(object):
bash_command=(
"echo 'Airflow Worker Upgrade Not Required'"),
dag=self.dag)
def get_create_action_tag(self, task_id=dn.CREATE_ACTION_TAG):
    """Build the step that tags the revision once the site action ends.

    The returned step wraps the sub-DAG produced by
    ``create_deckhand_tag``, which is responsible for tagging the
    revision with either 'site-action-success' or
    'site-action-failure' depending on the final state of the site
    action.

    :param task_id: id used for both the step and its sub-DAG
    :returns: the SubDagOperator attached to this factory's DAG
    """
    tagging_subdag = create_deckhand_tag(
        self.parent_dag_name,
        task_id,
        args=self.default_args)

    # trigger_rule is "all_done" so that the tagging step still runs
    # when upstream tasks have ended in a failed state.
    return SubDagOperator(
        task_id=task_id,
        subdag=tagging_subdag,
        trigger_rule="all_done",
        on_failure_callback=step_failure_handler,
        dag=self.dag)

View File

@ -15,15 +15,16 @@
# Subdags
# Names used as task ids / sub-DAG name suffixes for each workflow stage.
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
ARMADA_BUILD_DAG_NAME = 'armada_build'
CREATE_ACTION_TAG = 'create_action_tag'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
DESTROY_SERVER_DAG_NAME = 'destroy_server'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration'
GET_DESIGN_VERSION = 'get_design_version'
GET_RENDERED_DOC = 'get_rendered_doc'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'

# Steps
ACTION_XCOM = 'action_xcom'
DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
UPGRADE_AIRFLOW = 'upgrade_airflow'

View File

@ -0,0 +1,36 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeckhandCreateSiteActionTagOperator
from config_path import config_path
def create_deckhand_tag(parent_dag_name, child_dag_name, args):
    """Create the sub-DAG that tags the Deckhand revision.

    :param parent_dag_name: name of the parent DAG; used as the prefix of
        the sub-DAG id and handed to the operator as ``main_dag_name``
    :param child_dag_name: name of this sub-DAG; used as the suffix of the
        sub-DAG id and handed to the operator as ``sub_dag_name``
    :param args: default arguments applied to the sub-DAG
    :returns: the DAG holding the single 'deckhand_create_action_tag' task
    """
    dag = DAG(
        '{}.{}'.format(parent_dag_name, child_dag_name),
        default_args=args)

    # The operator is attached to the DAG through ``dag=dag``; nothing
    # else needs a reference to it, so it is not bound to a local name.
    DeckhandCreateSiteActionTagOperator(
        task_id='deckhand_create_action_tag',
        shipyard_conf=config_path,
        main_dag_name=parent_dag_name,
        sub_dag_name=child_dag_name,
        dag=dag)

    return dag

View File

@ -51,6 +51,7 @@ deployment_configuration = step_factory.get_deployment_configuration()
# Build the remaining steps of this workflow from the step factory.
validate_site_design = step_factory.get_validate_site_design()
drydock_build = step_factory.get_drydock_build()
armada_build = step_factory.get_armada_build()
# Step that creates the site action tag; called with the factory's
# default task id.
create_action_tag = step_factory.get_create_action_tag()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
@ -60,3 +61,4 @@ deployment_configuration.set_upstream(get_rendered_doc)
# Linear chain: deployment_configuration -> validate_site_design ->
# drydock_build -> armada_build -> create_action_tag.
validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
# The tagging step is wired directly after armada_build.
create_action_tag.set_upstream(armada_build)

View File

@ -58,6 +58,7 @@ armada_build = step_factory.get_armada_build()
# Steps for the Airflow upgrade portion of this workflow, built from the
# step factory.
decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade()
upgrade_airflow = step_factory.get_upgrade_airflow()
skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
# Step that creates the site action tag; called with the factory's
# default task id.
create_action_tag = step_factory.get_create_action_tag()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
@ -67,5 +68,11 @@ validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
decide_airflow_upgrade.set_upstream(armada_build)
# Register each downstream dependency exactly once: both upgrade paths
# hang off the decision step.
decide_airflow_upgrade.set_downstream([
    upgrade_airflow,
    skip_upgrade_airflow
])
# The tagging step follows both upgrade paths.
create_action_tag.set_upstream([
    upgrade_airflow,
    skip_upgrade_airflow
])

View File

@ -0,0 +1,150 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import logging
# Using nosec to prevent Bandit blacklist reporting. Subprocess is used
# in a controlled way as part of this operator.
import subprocess # nosec
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
LOG = logging.getLogger(__name__)
class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator):
    """Deckhand Create Site Action Tag Operator

    This operator will trigger Deckhand to create a tag for the revision at
    the end of the workflow. The tag will either be 'site-action-success'
    or 'site-action-failure' (dependent upon the result of the workflow).
    """

    # Task states that mark the workflow as failed. 'upstream_failed' is
    # included because a task whose upstream failed is never run and is
    # reported in that state rather than 'failed'.
    _FAILED_STATES = ('failed', 'upstream_failed')

    def do_execute(self):
        """Log the workflow duration and tag the revision with its outcome.

        :raises AirflowException: if the tag cannot be created in Deckhand
        """
        # Calculate total elapsed time for workflow. The current time is
        # taken with the execution date's tzinfo so the subtraction works
        # for both naive and timezone-aware execution dates.
        execution_date = self.task_instance.execution_date
        time_delta = datetime.now(tz=execution_date.tzinfo) - execution_date

        # total_seconds() includes whole days; .seconds alone would drop
        # them for workflows running longer than 24 hours.
        hours, remainder = divmod(int(time_delta.total_seconds()), 3600)
        minutes, seconds = divmod(remainder, 60)

        LOG.info('The workflow took %d hr %d mins %d seconds to'
                 ' execute', hours, minutes, seconds)

        LOG.info("Retrieving final state of %s...", self.main_dag_name)

        workflow_result = self.check_workflow_result()

        LOG.info("Creating Site Action Tag for Revision %d", self.revision_id)

        tag = ('site-action-success' if workflow_result
               else 'site-action-failure')

        # Create site action tag
        try:
            self.deckhandclient.tags.create(revision_id=self.revision_id,
                                            tag=tag)
        except Exception as ex:
            # Dump logs from Deckhand pods
            self.get_k8s_logs()

            raise AirflowException("Failed to create revision tag!") from ex
        else:
            LOG.info("Site Action Tag created for Revision %d",
                     self.revision_id)

    def check_task_result(self, task_id):
        """Return the state of a task of the main DAG for this execution.

        The state is read by running the ``airflow task_state`` CLI command
        for ``self.main_dag_name``, ``task_id`` and the execution date of
        the current task instance.

        :param task_id: id of the task whose state is wanted
        :returns: the last line printed by the CLI on stdout
        :raises AirflowException: if the CLI exits with a non-zero return
            code or prints nothing on stdout
        """
        # Convert Execution Date from datetime format to string
        fmt = '%Y-%m-%dT%H:%M:%S'
        execution_date = self.task_instance.execution_date.strftime(fmt)

        # Retrieve result of task execution
        #
        # TODO(eanylin): Use Airflow API instead of CLI once the API is
        #                ready for consumption, i.e. no longer experimental
        response = subprocess.run(
            ['airflow',
             'task_state',
             self.main_dag_name,
             task_id,
             execution_date],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if response.returncode != 0:
            LOG.error("Encountered error while executing Airflow CLI!")
            raise AirflowException(response.stderr.decode('utf-8'))

        output_lines = response.stdout.decode('utf-8').splitlines()

        if not output_lines:
            # Nothing to parse; fail explicitly instead of with IndexError
            raise AirflowException(
                "Airflow CLI returned no state for task %s" % task_id)

        # The result of the task state will be the last element of
        # the list. All relevant tasks would have completed execution
        # at this point in time.
        result = output_lines[-1]

        LOG.info("Task %s is in %s state", task_id, result)

        return result

    def check_workflow_result(self):
        """Decide whether the workflow succeeded.

        :returns: False if any checked task is in one of the states listed
            in ``_FAILED_STATES``, True otherwise
        """
        # NOTE: We will check the final state of the 'armada_build' task
        # as a 'success' means that all tasks preceding it would either
        # be in 'skipped' or 'success' state. A task that failed, or that
        # never ran because one of its upstream tasks failed, means that
        # the workflow has failed. Hence it is sufficient to determine the
        # success/failure of the 'deploy_site' workflow with the final
        # state of the 'armada_build' task.
        tasks = ['armada_build']

        if self.main_dag_name == 'update_site':
            # NOTE: The 'update_site' workflow contains additional steps for
            # upgrading of worker pods.
            tasks.extend(['skip_upgrade_airflow', 'upgrade_airflow'])

        # Retrieve task result
        task_result = {name: self.check_task_result(name) for name in tasks}

        # Check for failed task(s)
        failed_task = [name for name in tasks
                       if task_result[name] in self._FAILED_STATES]

        if failed_task:
            LOG.info("Task(s) in the workflow has/have failed: %s",
                     ", ".join(failed_task))

            return False

        LOG.info("All tasks completed successfully")

        return True
class DeckhandCreateSiteActionTagOperatorPlugin(AirflowPlugin):
    """Airflow plugin exposing DeckhandCreateSiteActionTagOperator."""

    # Plugin name
    name = 'deckhand_create_site_action_tag_operator'

    # Operator classes provided by this plugin
    operators = [DeckhandCreateSiteActionTagOperator]