Merge "DAG Maintenance - parallelization"

This commit is contained in:
Zuul 2018-07-06 13:51:15 +00:00 committed by Gerrit Code Review
commit 29d8810465
14 changed files with 64 additions and 203 deletions

View File

@ -33,7 +33,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_get_status',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Armada Apply
@ -41,7 +40,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_post_apply',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
dag=dag)
@ -50,7 +48,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_get_releases',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies

View File

@ -12,15 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from airflow.operators import DeploymentConfigurationOperator
from airflow.operators import DeckhandCreateSiteActionTagOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from armada_deploy_site import deploy_site_armada
from dag_deployment_configuration import get_deployment_configuration
from deckhand_create_tag import create_deckhand_tag
from deckhand_get_rendered_doc import get_rendered_doc_deckhand
from config_path import config_path
from destroy_node import destroy_server
from drydock_deploy_site import deploy_site_drydock
from failure_handlers import step_failure_handler
@ -64,7 +66,7 @@ class CommonStepFactory(object):
dag=self.dag,
python_callable=xcom_push)
def get_concurrency_check(self, task_id=dn.DAG_CONCURRENCY_CHECK_DAG_NAME):
def get_concurrency_check(self, task_id=dn.CONCURRENCY_CHECK):
"""Generate the concurrency check step
Concurrency check prevents simultaneous execution of dags that should
@ -95,11 +97,9 @@ class CommonStepFactory(object):
Check that we are able to render the docs before proceeding
further with the workflow
"""
return SubDagOperator(
subdag=get_rendered_doc_deckhand(
self.parent_dag_name,
task_id,
args=self.default_args),
return DeckhandRetrieveRenderedDocOperator(
shipyard_conf=config_path,
main_dag_name=self.parent_dag_name,
task_id=task_id,
on_failure_callback=step_failure_handler,
dag=self.dag)
@ -121,17 +121,15 @@ class CommonStepFactory(object):
dag=self.dag)
def get_deployment_configuration(self,
task_id=dn.GET_DEPLOY_CONF_DAG_NAME):
task_id=dn.DEPLOYMENT_CONFIGURATION):
"""Generate the step to retrieve the deployment configuration
This step provides the timings and strategies that will be used in
subsequent steps
"""
return SubDagOperator(
subdag=get_deployment_configuration(
self.parent_dag_name,
task_id,
args=self.default_args),
return DeploymentConfigurationOperator(
main_dag_name=self.parent_dag_name,
shipyard_conf=config_path,
task_id=task_id,
on_failure_callback=step_failure_handler,
dag=self.dag)
@ -254,12 +252,11 @@ class CommonStepFactory(object):
Note that trigger_rule is set to "all_done" so that this
step will run even when upstream tasks are in failed state.
"""
return SubDagOperator(
subdag=create_deckhand_tag(
self.parent_dag_name,
task_id,
args=self.default_args),
return DeckhandCreateSiteActionTagOperator(
task_id=task_id,
shipyard_conf=config_path,
on_failure_callback=step_failure_handler,
trigger_rule="all_done",
main_dag_name=self.parent_dag_name,
dag=self.dag)

View File

@ -1,31 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ConcurrencyCheckOperator
def dag_concurrency_check(parent_dag_name, child_dag_name, args):
    """Build the sub-DAG that performs the DAG concurrency check.

    The sub-DAG allows a DAG to determine whether it is already running,
    and results in an error if so. It contains a single
    ConcurrencyCheckOperator task named 'dag_concurrency_check'.

    :param parent_dag_name: name of the DAG that owns this sub-DAG
    :param child_dag_name: name of this sub-DAG; joined to the parent name
        with a '.' to form the sub-DAG id
    :param args: default arguments handed to the DAG constructor
    :returns: the constructed DAG object
    """
    subdag_id = '{}.{}'.format(parent_dag_name, child_dag_name)
    subdag = DAG(subdag_id, default_args=args)

    # The operator is created with dag=subdag; the returned reference is not
    # needed afterwards, so it is not bound to a local name.
    ConcurrencyCheckOperator(task_id='dag_concurrency_check', dag=subdag)

    return subdag

View File

@ -1,36 +0,0 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeploymentConfigurationOperator
from config_path import config_path
GET_DEPLOYMENT_CONFIGURATION_NAME = 'get_deployment_configuration'
def get_deployment_configuration(parent_dag_name, child_dag_name, args):
    """Build the sub-DAG that retrieves the deployment configuration.

    The sub-DAG contains a single DeploymentConfigurationOperator task whose
    task id is GET_DEPLOYMENT_CONFIGURATION_NAME.

    :param parent_dag_name: name of the DAG that owns this sub-DAG; also
        passed to the operator as main_dag_name
    :param child_dag_name: name of this sub-DAG; joined to the parent name
        with a '.' to form the sub-DAG id
    :param args: default arguments handed to the DAG constructor
    :returns: the constructed DAG object
    """
    subdag_id = '{}.{}'.format(parent_dag_name, child_dag_name)
    subdag = DAG(subdag_id, default_args=args)

    # Created with dag=subdag; the operator reference itself is not used
    # again in this function.
    DeploymentConfigurationOperator(
        task_id=GET_DEPLOYMENT_CONFIGURATION_NAME,
        shipyard_conf=config_path,
        main_dag_name=parent_dag_name,
        dag=subdag)

    return subdag

View File

@ -15,16 +15,16 @@
# Subdags
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
ARMADA_BUILD_DAG_NAME = 'armada_build'
CREATE_ACTION_TAG = 'create_action_tag'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
DESTROY_SERVER_DAG_NAME = 'destroy_server'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration'
GET_RENDERED_DOC = 'get_rendered_doc'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
# Steps
ACTION_XCOM = 'action_xcom'
CONCURRENCY_CHECK = 'dag_concurrency_check'
CREATE_ACTION_TAG = 'create_action_tag'
DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'
DEPLOYMENT_CONFIGURATION = 'deployment_configuration'
GET_RENDERED_DOC = 'get_rendered_doc'
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
UPGRADE_AIRFLOW = 'upgrade_airflow'

View File

@ -1,36 +0,0 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeckhandCreateSiteActionTagOperator
from config_path import config_path
def create_deckhand_tag(parent_dag_name, child_dag_name, args):
    """Build the sub-DAG that creates the Deckhand revision tag.

    The sub-DAG contains a single DeckhandCreateSiteActionTagOperator task
    named 'deckhand_create_action_tag'.

    :param parent_dag_name: name of the DAG that owns this sub-DAG; also
        passed to the operator as main_dag_name
    :param child_dag_name: name of this sub-DAG; joined to the parent name
        with a '.' to form the sub-DAG id and passed to the operator as
        sub_dag_name
    :param args: default arguments handed to the DAG constructor
    :returns: the constructed DAG object
    """
    subdag_id = '{}.{}'.format(parent_dag_name, child_dag_name)
    subdag = DAG(subdag_id, default_args=args)

    # Created with dag=subdag; the operator reference itself is not used
    # again in this function.
    DeckhandCreateSiteActionTagOperator(
        task_id='deckhand_create_action_tag',
        shipyard_conf=config_path,
        main_dag_name=parent_dag_name,
        sub_dag_name=child_dag_name,
        dag=subdag)

    return subdag

View File

@ -1,36 +0,0 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from config_path import config_path
def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args):
    """Build the sub-DAG that fetches rendered documents from Deckhand.

    Retrieves the rendered documents for the committed revision ID via a
    single DeckhandRetrieveRenderedDocOperator task named
    'deckhand_retrieve_rendered_doc'.

    :param parent_dag_name: name of the DAG that owns this sub-DAG; also
        passed to the operator as main_dag_name
    :param child_dag_name: name of this sub-DAG; joined to the parent name
        with a '.' to form the sub-DAG id and passed to the operator as
        sub_dag_name
    :param args: default arguments handed to the DAG constructor
    :returns: the constructed DAG object
    """
    subdag_id = '{}.{}'.format(parent_dag_name, child_dag_name)
    subdag = DAG(subdag_id, default_args=args)

    # Created with dag=subdag; the operator reference itself is not used
    # again in this function.
    DeckhandRetrieveRenderedDocOperator(
        task_id='deckhand_retrieve_rendered_doc',
        shipyard_conf=config_path,
        main_dag_name=parent_dag_name,
        sub_dag_name=child_dag_name,
        dag=subdag)

    return subdag

View File

@ -54,11 +54,15 @@ armada_build = step_factory.get_armada_build()
create_action_tag = step_factory.get_create_action_tag()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_rendered_doc.set_upstream(preflight)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
preflight.set_upstream(action_xcom)
get_rendered_doc.set_upstream(action_xcom)
deployment_configuration.set_upstream(action_xcom)
validate_site_design.set_upstream([
preflight,
get_rendered_doc,
concurrency_check,
deployment_configuration
])
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
create_action_tag.set_upstream(armada_build)

View File

@ -39,7 +39,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
task_id='promenade_drain_node',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Clear Labels
@ -47,7 +46,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
task_id='promenade_clear_labels',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Shutdown Kubelet
@ -55,7 +53,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
task_id='promenade_shutdown_kubelet',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# ETCD Sanity Check
@ -63,7 +60,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
task_id='promenade_check_etcd',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Power down and destroy node using DryDock
@ -71,7 +67,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
task_id='destroy_node',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Decommission node from Kubernetes cluster using Promenade
@ -79,7 +74,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
task_id='promenade_decommission_node',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies

View File

@ -32,21 +32,18 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
task_id='verify_site',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_prepare_site = DrydockPrepareSiteOperator(
task_id='prepare_site',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_nodes = DrydockNodesOperator(
task_id='prepare_and_deploy_nodes',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies

View File

@ -62,11 +62,15 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
create_action_tag = step_factory.get_create_action_tag()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_rendered_doc.set_upstream(preflight)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
preflight.set_upstream(action_xcom)
get_rendered_doc.set_upstream(action_xcom)
deployment_configuration.set_upstream(action_xcom)
validate_site_design.set_upstream([
preflight,
get_rendered_doc,
concurrency_check,
deployment_configuration
])
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
decide_airflow_upgrade.set_upstream(armada_build)

View File

@ -22,9 +22,10 @@ from config_path import config_path
def validate_site_design(parent_dag_name, child_dag_name, args):
'''
Subdag to delegate design verification to the UCP components
'''
"""Subdag to delegate design verification to the UCP components
There is no wiring of steps - they all execute in parallel
"""
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
@ -33,32 +34,28 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
task_id='deckhand_validate_site_design',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
retries=1,
dag=dag)
drydock_validate_docs = DrydockValidateDesignOperator(
task_id='drydock_validate_site_design',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
retries=1,
dag=dag)
armada_validate_docs = ArmadaValidateDesignOperator(
task_id='armada_validate_site_design',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
retries=1,
dag=dag)
promenade_validate_docs = PromenadeValidateSiteDesignOperator(
task_id='promenade_validate_site_design',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
retries=1,
dag=dag)
return dag

View File

@ -71,7 +71,6 @@ class UcpBaseOperator(BaseOperator):
pod_selector_pattern=None,
shipyard_conf=None,
start_time=None,
sub_dag_name=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of UcpBaseOperator object.
@ -92,7 +91,6 @@ class UcpBaseOperator(BaseOperator):
log-rotate container.
:param shipyard_conf: Location of shipyard.conf
:param start_time: Time when Operator gets executed
:param sub_dag_name: Child Dag
:param xcom_push: xcom usage
"""
@ -103,7 +101,6 @@ class UcpBaseOperator(BaseOperator):
self.pod_selector_pattern = pod_selector_pattern or []
self.shipyard_conf = shipyard_conf
self.start_time = datetime.now()
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
self.doc_utils = _get_document_util(self.shipyard_conf)
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)

View File

@ -33,7 +33,20 @@ class XcomPuller(object):
self.ti = task_instance
def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True):
"""Find a particular xcom value"""
"""Find and return an xcom value
:param source_task: The name of the task that wrote the xcom
:param dag_id: The name of the subdag (of the main DAG) that contained
the source task. Let this default to None if the task is a direct
child of the main dag
:param key: The name of the xcom item that was written by the task. If
the source task allowed for the step to simply push xcom at the
end of the step, leave this None.
:param log_result: boolean to indicate if the value of the xcom should
be logged upon retrieval. This can be nice for investigative
purposes, but would likely not be good for large or complex
values.
"""
if dag_id is None:
source_dag = self.mdn
else:
@ -53,8 +66,8 @@ class XcomPuller(object):
def get_deployment_configuration(self):
"""Retrieve the deployment configuration dictionary"""
source_task = 'get_deployment_configuration'
source_dag = 'dag_deployment_configuration'
source_task = 'deployment_configuration'
source_dag = None
key = None
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
@ -77,7 +90,7 @@ class XcomPuller(object):
def get_check_drydock_continue_on_fail(self):
"""Check if 'drydock_continue_on_fail' key exists"""
source_task = 'ucp_preflight_check'
source_dag = 'preflight'
source_dag = None
key = 'drydock_continue_on_fail'
return self._get_xcom(source_task=source_task,
dag_id=source_dag,