diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py index c301697c..47440160 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py @@ -33,7 +33,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_get_status', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Armada Apply @@ -41,7 +40,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_post_apply', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, retries=3, dag=dag) @@ -50,7 +48,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_get_releases', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Define dependencies diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index 69687978..61f4d2e6 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. from airflow.operators import ConcurrencyCheckOperator +from airflow.operators import DeckhandRetrieveRenderedDocOperator +from airflow.operators import DeploymentConfigurationOperator +from airflow.operators import DeckhandCreateSiteActionTagOperator + from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from armada_deploy_site import deploy_site_armada -from dag_deployment_configuration import get_deployment_configuration -from deckhand_create_tag import create_deckhand_tag -from deckhand_get_rendered_doc import get_rendered_doc_deckhand +from config_path import config_path from destroy_node import destroy_server from drydock_deploy_site import deploy_site_drydock from failure_handlers import step_failure_handler @@ -64,7 +66,7 @@ class CommonStepFactory(object): dag=self.dag, python_callable=xcom_push) - def get_concurrency_check(self, task_id=dn.DAG_CONCURRENCY_CHECK_DAG_NAME): + def get_concurrency_check(self, task_id=dn.CONCURRENCY_CHECK): """Generate the concurrency check step Concurrency check prevents simultaneous execution of dags that should @@ -95,11 +97,9 @@ class CommonStepFactory(object): Check that we are able to render the docs before proceeding further with the workflow """ - return SubDagOperator( - subdag=get_rendered_doc_deckhand( - self.parent_dag_name, - task_id, - args=self.default_args), + return DeckhandRetrieveRenderedDocOperator( + shipyard_conf=config_path, + main_dag_name=self.parent_dag_name, task_id=task_id, on_failure_callback=step_failure_handler, dag=self.dag) @@ -121,17 +121,15 @@ class CommonStepFactory(object): dag=self.dag) def get_deployment_configuration(self, - task_id=dn.GET_DEPLOY_CONF_DAG_NAME): + task_id=dn.DEPLOYMENT_CONFIGURATION): """Generate the step to retrieve the deployment configuration This step provides the timings and strategies that will be used in subsequent steps """ - return SubDagOperator( - subdag=get_deployment_configuration( - self.parent_dag_name, - task_id, - args=self.default_args), + return DeploymentConfigurationOperator( + main_dag_name=self.parent_dag_name, + shipyard_conf=config_path, task_id=task_id, on_failure_callback=step_failure_handler, dag=self.dag) @@ -254,12 +252,11 @@ class CommonStepFactory(object): Note that trigger_rule is set to "all_done" so that this step will run even when upstream tasks are in failed state. """ - return SubDagOperator( - subdag=create_deckhand_tag( - self.parent_dag_name, - task_id, - args=self.default_args), + + return DeckhandCreateSiteActionTagOperator( task_id=task_id, + shipyard_conf=config_path, on_failure_callback=step_failure_handler, trigger_rule="all_done", + main_dag_name=self.parent_dag_name, dag=self.dag) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_concurrency_check.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_concurrency_check.py deleted file mode 100644 index 33c8c658..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_concurrency_check.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from airflow.models import DAG -from airflow.operators import ConcurrencyCheckOperator - - -def dag_concurrency_check(parent_dag_name, child_dag_name, args): - ''' - dag_concurrency_check is a sub-DAG that will will allow for a DAG to - determine if it is already running, and result in an error if so. - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - dag_concurrency_check_operator = ConcurrencyCheckOperator( - task_id='dag_concurrency_check', dag=dag) - - return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_deployment_configuration.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_deployment_configuration.py deleted file mode 100644 index e595186a..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_deployment_configuration.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from airflow.models import DAG -from airflow.operators import DeploymentConfigurationOperator - -from config_path import config_path - - -GET_DEPLOYMENT_CONFIGURATION_NAME = 'get_deployment_configuration' - - -def get_deployment_configuration(parent_dag_name, child_dag_name, args): - """DAG to retrieve deployment configuration""" - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) - - deployment_configuration = DeploymentConfigurationOperator( - task_id=GET_DEPLOYMENT_CONFIGURATION_NAME, - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) - - return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index b28edce8..2c30a49f 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -15,16 +15,16 @@ # Subdags ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' ARMADA_BUILD_DAG_NAME = 'armada_build' -CREATE_ACTION_TAG = 'create_action_tag' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' DESTROY_SERVER_DAG_NAME = 'destroy_server' DRYDOCK_BUILD_DAG_NAME = 'drydock_build' -GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration' -GET_RENDERED_DOC = 'get_rendered_doc' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' # Steps ACTION_XCOM = 'action_xcom' +CONCURRENCY_CHECK = 'dag_concurrency_check' +CREATE_ACTION_TAG = 'create_action_tag' DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade' +DEPLOYMENT_CONFIGURATION = 'deployment_configuration' +GET_RENDERED_DOC = 'get_rendered_doc' SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow' UPGRADE_AIRFLOW = 'upgrade_airflow' diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py deleted file mode 100644 index 37728282..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_create_tag.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from airflow.models import DAG -from airflow.operators import DeckhandCreateSiteActionTagOperator - -from config_path import config_path - - -def create_deckhand_tag(parent_dag_name, child_dag_name, args): - ''' - Create Deckhand Revision Tag - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) - - create_action_tag = DeckhandCreateSiteActionTagOperator( - task_id='deckhand_create_action_tag', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - dag=dag) - - return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_rendered_doc.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_rendered_doc.py deleted file mode 100644 index 3b6f5470..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_rendered_doc.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from airflow.models import DAG -from airflow.operators import DeckhandRetrieveRenderedDocOperator - -from config_path import config_path - - -def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args): - ''' - Get rendered documents from Deckhand for the committed revision ID. - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) - - deckhand_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator( - task_id='deckhand_retrieve_rendered_doc', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - dag=dag) - - return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py index ddfc82f0..5881e6f6 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py @@ -54,11 +54,15 @@ armada_build = step_factory.get_armada_build() create_action_tag = step_factory.get_create_action_tag() # DAG Wiring -concurrency_check.set_upstream(action_xcom) -preflight.set_upstream(concurrency_check) -get_rendered_doc.set_upstream(preflight) -deployment_configuration.set_upstream(get_rendered_doc) -validate_site_design.set_upstream(deployment_configuration) +preflight.set_upstream(action_xcom) +get_rendered_doc.set_upstream(action_xcom) +deployment_configuration.set_upstream(action_xcom) +validate_site_design.set_upstream([ + preflight, + get_rendered_doc, + concurrency_check, + deployment_configuration +]) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) create_action_tag.set_upstream(armada_build) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py index 33365753..c8f3ab2f 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py @@ -39,7 +39,6 @@ def destroy_server(parent_dag_name, child_dag_name, args): task_id='promenade_drain_node', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Clear Labels @@ -47,7 +46,6 @@ def destroy_server(parent_dag_name, child_dag_name, args): task_id='promenade_clear_labels', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Shutdown Kubelet @@ -55,7 +53,6 @@ def destroy_server(parent_dag_name, child_dag_name, args): task_id='promenade_shutdown_kubelet', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # ETCD Sanity Check @@ -63,7 +60,6 @@ def destroy_server(parent_dag_name, child_dag_name, args): task_id='promenade_check_etcd', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Power down and destroy node using DryDock @@ -71,7 +67,6 @@ def destroy_server(parent_dag_name, child_dag_name, args): task_id='destroy_node', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Decommission node from Kubernetes cluster using Promenade @@ -79,7 +74,6 @@ def destroy_server(parent_dag_name, child_dag_name, args): task_id='promenade_decommission_node', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Define dependencies diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py index 98218a23..f635a382 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py @@ -32,21 +32,18 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): task_id='verify_site', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) drydock_prepare_site = DrydockPrepareSiteOperator( task_id='prepare_site', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) drydock_nodes = DrydockNodesOperator( task_id='prepare_and_deploy_nodes', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, dag=dag) # Define dependencies diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py index 380abafc..af8bc22a 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py @@ -62,11 +62,15 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() create_action_tag = step_factory.get_create_action_tag() # DAG Wiring -concurrency_check.set_upstream(action_xcom) -preflight.set_upstream(concurrency_check) -get_rendered_doc.set_upstream(preflight) -deployment_configuration.set_upstream(get_rendered_doc) -validate_site_design.set_upstream(deployment_configuration) +preflight.set_upstream(action_xcom) +get_rendered_doc.set_upstream(action_xcom) +deployment_configuration.set_upstream(action_xcom) +validate_site_design.set_upstream([ + preflight, + get_rendered_doc, + concurrency_check, + deployment_configuration +]) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) decide_airflow_upgrade.set_upstream(armada_build) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py index 5925f43a..ccbedb5a 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py @@ -22,9 +22,10 @@ from config_path import config_path def validate_site_design(parent_dag_name, child_dag_name, args): - ''' - Subdag to delegate design verification to the UCP components - ''' + """Subdag to delegate design verification to the UCP components + + There is no wiring of steps - they all execute in parallel + """ dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) @@ -33,32 +34,28 @@ def validate_site_design(parent_dag_name, child_dag_name, args): task_id='deckhand_validate_site_design', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - retries=3, + retries=1, dag=dag) drydock_validate_docs = DrydockValidateDesignOperator( task_id='drydock_validate_site_design', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - retries=3, + retries=1, dag=dag) armada_validate_docs = ArmadaValidateDesignOperator( task_id='armada_validate_site_design', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - retries=3, + retries=1, dag=dag) promenade_validate_docs = PromenadeValidateSiteDesignOperator( task_id='promenade_validate_site_design', shipyard_conf=config_path, main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - retries=3, + retries=1, dag=dag) return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py index c4685845..d26243e3 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py @@ -71,7 +71,6 @@ class UcpBaseOperator(BaseOperator): pod_selector_pattern=None, shipyard_conf=None, start_time=None, - sub_dag_name=None, xcom_push=True, *args, **kwargs): """Initialization of UcpBaseOperator object. @@ -92,7 +91,6 @@ class UcpBaseOperator(BaseOperator): log-rotate container. :param shipyard_conf: Location of shipyard.conf :param start_time: Time when Operator gets executed - :param sub_dag_name: Child Dag :param xcom_push: xcom usage """ @@ -103,7 +101,6 @@ class UcpBaseOperator(BaseOperator): self.pod_selector_pattern = pod_selector_pattern or [] self.shipyard_conf = shipyard_conf self.start_time = datetime.now() - self.sub_dag_name = sub_dag_name self.xcom_push_flag = xcom_push self.doc_utils = _get_document_util(self.shipyard_conf) self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py index 7f4dc69e..8ff04246 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py @@ -33,7 +33,20 @@ class XcomPuller(object): self.ti = task_instance def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True): - """Find a particular xcom value""" + """Find and return an xcom value + + :param source_task: The name of the task that wrote the xcom + :param dag_id: The name of the subdag (of the main DAG) that contained + the source task. Let this default to None if the task is a direct + child of the main dag + :param key: The name of the xcom item that was written by the task. If + the source task allowed for the step to simply push xcom at the + end of the step, leave this None. + :param log_result: boolean to indicate if the value of the xcom should + be logged upon retreival. This can be nice for investigative + purposes, but would likely not be good for large or complex + values. + """ if dag_id is None: source_dag = self.mdn else: @@ -53,8 +66,8 @@ class XcomPuller(object): def get_deployment_configuration(self): """Retrieve the deployment configuration dictionary""" - source_task = 'get_deployment_configuration' - source_dag = 'dag_deployment_configuration' + source_task = 'deployment_configuration' + source_dag = None key = None return self._get_xcom(source_task=source_task, dag_id=source_dag, @@ -77,7 +90,7 @@ class XcomPuller(object): def get_check_drydock_continue_on_fail(self): """Check if 'drydock_continue_on_fail' key exists""" source_task = 'ucp_preflight_check' - source_dag = 'preflight' + source_dag = None key = 'drydock_continue_on_fail' return self._get_xcom(source_task=source_task, dag_id=source_dag,