From dee8887c88bb820e0722c993066f2b07191601c5 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Fri, 20 Apr 2018 09:10:45 +0000 Subject: [PATCH] Ensure Presence of Committed Doc prior to Workflow Execution Shipyard should (1) validate that there is a current committed version of the documents, (2) Pass that committed version as a parameter to the workflow, that is then used for the entire workflow. This applies to all the current workflow, i.e. deploy_site, update_site, redeploy_server Note that We will remove the step to retrieve deckhand design version from the workflow as it will be handled as part of the Shipyard Create Action with this change. Change-Id: Ifdbdd8f1ce1b2c6afa26fdfaee86cbb2776ca715 --- .../control/action/actions_api.py | 24 ++++++++++++++++++ .../dags/common_step_factory.py | 11 ++++---- .../shipyard_airflow/dags/dag_names.py | 2 +- ...design.py => deckhand_get_rendered_doc.py} | 19 +++----------- .../shipyard_airflow/dags/deploy_site.py | 6 ++--- .../shipyard_airflow/dags/redeploy_server.py | 6 ++--- .../shipyard_airflow/dags/update_site.py | 6 ++--- .../plugins/armada_base_operator.py | 12 +++------ .../plugins/deckhand_base_operator.py | 18 ------------- .../deployment_configuration_operator.py | 10 +++++--- .../plugins/drydock_base_operator.py | 5 +--- .../plugins/ucp_base_operator.py | 6 +---- .../shipyard_airflow/plugins/xcom_puller.py | 9 ------- .../tests/unit/control/test_actions_api.py | 18 +++++++------ .../test_deployment_configuration_operator.py | 25 ++++++++++++++++--- 15 files changed, 88 insertions(+), 89 deletions(-) rename src/bin/shipyard_airflow/shipyard_airflow/dags/{deckhand_get_design.py => deckhand_get_rendered_doc.py} (63%) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py index 9f44971a..84dd632d 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py @@ -25,6 +25,8 @@ from shipyard_airflow import policy from shipyard_airflow.control.action.action_helper import (determine_lifecycle, format_action_steps) from shipyard_airflow.control.base import BaseResource +from shipyard_airflow.control.configdocs.configdocs_helper import ( + ConfigdocsHelper) from shipyard_airflow.control.json_schemas import ACTION from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB from shipyard_airflow.errors import ApiError @@ -97,6 +99,10 @@ class ActionsResource(BaseResource): dag = SUPPORTED_ACTION_MAPPINGS.get(action['name'])['dag'] action['dag_id'] = dag + # Retrieve last committed design revision + self.configdocs_helper = ConfigdocsHelper(context) + action['committed_rev_id'] = self.get_committed_design_version() + # populate action parameters if they are not set if 'parameters' not in action: action['parameters'] = {} @@ -315,3 +321,21 @@ class ActionsResource(BaseResource): }], retry=True, ) + + def get_committed_design_version(self): + + LOG.info("Checking for committed revision in Deckhand...") + committed_rev_id = self.configdocs_helper._get_committed_rev_id() + + if committed_rev_id: + LOG.info("The committed revision in Deckhand is %d", + committed_rev_id) + + return committed_rev_id + + else: + raise ApiError( + title='Unable to locate any committed revision in Deckhand', + description='No committed version found in Deckhand', + status=falcon.HTTP_404, + retry=False) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index 8453cbd0..05ae71bc 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -19,7 +19,7 @@ from airflow.operators.subdag_operator import SubDagOperator from armada_deploy_site import deploy_site_armada import dag_names as dn -from deckhand_get_design import get_design_deckhand +from deckhand_get_rendered_doc import get_rendered_doc_deckhand from destroy_node import destroy_server from drydock_deploy_site import deploy_site_drydock from failure_handlers import step_failure_handler @@ -88,13 +88,14 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_get_design_version(self, task_id=dn.GET_DESIGN_VERSION): - """Generate the get design version step + def get_get_rendered_doc(self, task_id=dn.GET_RENDERED_DOC): + """Generate the get deckhand rendered doc step - Retrieves the version of the design to use from deckhand + Check that we are able to render the docs before proceeding + further with the workflow """ return SubDagOperator( - subdag=get_design_deckhand( + subdag=get_rendered_doc_deckhand( self.parent_dag_name, task_id, args=self.default_args), diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index a7af54e6..ea608e85 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -16,7 +16,7 @@ ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' ARMADA_BUILD_DAG_NAME = 'armada_build' DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -GET_DESIGN_VERSION = 'get_design_version' +GET_RENDERED_DOC = 'get_rendered_doc' GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration' DRYDOCK_BUILD_DAG_NAME = 'drydock_build' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_design.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_rendered_doc.py similarity index 63% rename from src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_design.py rename to src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_rendered_doc.py index 138e23e5..3b6f5470 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/deckhand_get_rendered_doc.py @@ -13,35 +13,24 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators import DeckhandGetDesignOperator from airflow.operators import DeckhandRetrieveRenderedDocOperator from config_path import config_path -def get_design_deckhand(parent_dag_name, child_dag_name, args): +def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args): ''' - Get Deckhand Design Version + Get rendered documents from Deckhand for the committed revision ID. ''' dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - deckhand_design = DeckhandGetDesignOperator( - task_id='deckhand_get_design_version', + deckhand_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator( + task_id='deckhand_retrieve_rendered_doc', shipyard_conf=config_path, main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) - shipyard_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator( - task_id='shipyard_retrieve_rendered_doc', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - dag=dag) - - # Define dependencies - shipyard_retrieve_rendered_doc.set_upstream(deckhand_design) - return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py index 0edfd877..6a36f526 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py @@ -46,7 +46,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() preflight = step_factory.get_preflight() -get_design_version = step_factory.get_get_design_version() +get_rendered_doc = step_factory.get_get_rendered_doc() deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() drydock_build = step_factory.get_drydock_build() @@ -55,8 +55,8 @@ armada_build = step_factory.get_armada_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) -get_design_version.set_upstream(preflight) -deployment_configuration.set_upstream(get_design_version) +get_rendered_doc.set_upstream(preflight) +deployment_configuration.set_upstream(get_rendered_doc) validate_site_design.set_upstream(deployment_configuration) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/redeploy_server.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/redeploy_server.py index d2fd6b65..37efa287 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/redeploy_server.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/redeploy_server.py @@ -47,7 +47,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() preflight = step_factory.get_preflight() -get_design_version = step_factory.get_get_design_version() +get_rendered_doc = step_factory.get_get_rendered_doc() deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() destroy_server = step_factory.get_destroy_server() @@ -56,8 +56,8 @@ drydock_build = step_factory.get_drydock_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) -get_design_version.set_upstream(preflight) -deployment_configuration.set_upstream(get_design_version) +get_rendered_doc.set_upstream(preflight) +deployment_configuration.set_upstream(get_rendered_doc) validate_site_design.set_upstream(deployment_configuration) destroy_server.set_upstream(validate_site_design) drydock_build.set_upstream(destroy_server) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py index 40ce4270..2c8ccec0 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py @@ -50,7 +50,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() -get_design_version = step_factory.get_get_design_version() +get_rendered_doc = step_factory.get_get_rendered_doc() deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() drydock_build = step_factory.get_drydock_build() @@ -61,8 +61,8 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() # DAG Wiring concurrency_check.set_upstream(action_xcom) -get_design_version.set_upstream(concurrency_check) -deployment_configuration.set_upstream(get_design_version) +get_rendered_doc.set_upstream(concurrency_check) +deployment_configuration.set_upstream(get_rendered_doc) validate_site_design.set_upstream(deployment_configuration) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py index a66415d9..8f409762 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py @@ -93,13 +93,9 @@ class ArmadaBaseOperator(UcpBaseOperator): deckhand_svc_endpoint = ucp_service_endpoint( self, svc_type=self.deckhand_svc_type) - # Retrieve last committed revision id - committed_revision_id = self.xcom_puller.get_design_version() - # Get deckhand design reference url self.deckhand_design_ref = self._init_deckhand_design_ref( - deckhand_svc_endpoint, - committed_revision_id) + deckhand_svc_endpoint) @staticmethod def _init_armada_client(armada_svc_endpoint, svc_token): @@ -137,9 +133,7 @@ class ArmadaBaseOperator(UcpBaseOperator): else: raise AirflowException("Failed to set up Armada client!") - @staticmethod - def _init_deckhand_design_ref(deckhand_svc_endpoint, - committed_revision_id): + def _init_deckhand_design_ref(self, deckhand_svc_endpoint): LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) @@ -148,7 +142,7 @@ class ArmadaBaseOperator(UcpBaseOperator): deckhand_path = "deckhand+" + deckhand_svc_endpoint _deckhand_design_ref = os.path.join(deckhand_path, "revisions", - str(committed_revision_id), + str(self.revision_id), "rendered-documents") if _deckhand_design_ref: diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py index 491c9394..72002f8b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py @@ -113,24 +113,6 @@ class DeckhandBaseOperator(UcpBaseOperator): if not self.deckhandclient: raise AirflowException('Failed to set up deckhand client!') - # Retrieve 'revision_id' from xcom for tasks other than - # 'deckhand_get_design_version' - # - # NOTE: In the case of 'deploy_site', the dag_id will - # be 'deploy_site.get_design_version' for the - # 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the - # last committed revision ID - if self.task_id != 'deckhand_get_design_version': - - # Retrieve 'revision_id' from xcom - self.revision_id = self.xcom_puller.get_design_version() - - if self.revision_id: - LOG.info("Revision ID is %d", self.revision_id) - else: - raise AirflowException('Failed to retrieve Revision ID!') - class DeckhandBaseOperatorPlugin(AirflowPlugin): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py index 70c6b9f3..b307745e 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py @@ -103,9 +103,13 @@ class DeploymentConfigurationOperator(BaseOperator): if task_instance: LOG.debug("task_instance found, extracting design version") # Set the revision_id to the revision on the xcom - revision_id = task_instance.xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.get_design_version') + action_info = task_instance.xcom_pull( + task_ids='action_xcom', + dag_id=self.main_dag_name, + key='action') + + revision_id = action_info['committed_rev_id'] + if revision_id: LOG.info("Revision is set to: %s for deployment configuration", revision_id) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py index 9d61bd6c..3b3cf141 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py @@ -153,15 +153,12 @@ class DrydockBaseOperator(UcpBaseOperator): LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - # Retrieve last committed revision id - committed_revision_id = self.xcom_puller.get_design_version() - # Form DeckHand Design Reference Path # This URL will be used to retrieve the Site Design YAMLs deckhand_path = "deckhand+" + deckhand_svc_endpoint self.deckhand_design_ref = os.path.join(deckhand_path, "revisions", - str(committed_revision_id), + str(self.revision_id), "rendered-documents") if self.deckhand_design_ref: LOG.info("Design YAMLs will be retrieved from %s", diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py index 368f60db..babac0d0 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py @@ -86,11 +86,6 @@ class UcpBaseOperator(BaseOperator): # Exeute child function self.do_execute() - # Push last committed version to xcom for the - # 'get_design_version' subdag - if self.sub_dag_name == 'get_design_version': - return self.committed_ver - def ucp_base(self, context): LOG.info("Running UCP Base Operator...") @@ -109,6 +104,7 @@ class UcpBaseOperator(BaseOperator): self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance) self.action_info = self.xcom_puller.get_action_info() self.dc = self.xcom_puller.get_deployment_configuration() + self.revision_id = self.action_info['committed_rev_id'] def get_k8s_logs(self): """Retrieve Kubernetes pod/container logs specified by an opererator diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py index e400fc66..b63207b9 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/xcom_puller.py @@ -73,12 +73,3 @@ class XcomPuller(object): return self._get_xcom(source_task=source_task, dag_id=source_dag, key=key) - - def get_design_version(self): - """Retrieve the design version being used for this workflow""" - source_task = 'deckhand_get_design_version' - source_dag = 'get_design_version' - key = None - return self._get_xcom(source_task=source_task, - dag_id=source_dag, - key=key) diff --git a/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py b/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py index a8f6abda..7c581ecc 100644 --- a/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py +++ b/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py @@ -11,18 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging - -import json -import os from datetime import datetime -import mock +from falcon import testing from mock import patch from oslo_config import cfg -import pytest - import falcon -from falcon import testing +import json +import logging +import mock +import os +import pytest import responses from shipyard_airflow.control.action import actions_api @@ -35,6 +33,7 @@ DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000) DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000) DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S') DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S') +DESIGN_VERSION = 1 CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -299,6 +298,7 @@ def test_create_action(): action_resource.invoke_airflow_dag = airflow_stub action_resource.insert_action = insert_action_stub action_resource.audit_control_command_db = audit_control_command_db + action_resource.get_committed_design_version = lambda: DESIGN_VERSION # with invalid input. fail. try: @@ -326,6 +326,7 @@ def test_create_action(): assert len(action['id']) == 26 assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402' assert action['dag_status'] == 'SCHEDULED' + assert action['committed_rev_id'] == 1 except ApiError: assert False, 'Should not raise an ApiError' @@ -338,6 +339,7 @@ def test_create_action(): assert len(action['id']) == 26 assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402' assert action['dag_status'] == 'SCHEDULED' + assert action['committed_rev_id'] == 1 except ApiError: assert False, 'Should not raise an ApiError' diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test_deployment_configuration_operator.py b/src/bin/shipyard_airflow/tests/unit/plugins/test_deployment_configuration_operator.py index 24c55aa6..57f8797e 100644 --- a/src/bin/shipyard_airflow/tests/unit/plugins/test_deployment_configuration_operator.py +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test_deployment_configuration_operator.py @@ -18,6 +18,25 @@ import yaml import airflow from airflow.exceptions import AirflowException +ACTION_INFO = { + 'committed_rev_id': 2, + 'dag_id': 'deploy_site', + 'id': '01CBGWY1GXQVXVCXRJKM9V71AT', + 'name': 'deploy_site', + 'parameters': {}, + 'timestamp': '2018-04-20 06:47:43.905047', + 'user': 'shipyard'} + +ACTION_INFO_NO_COMMIT = { + 'committed_rev_id': None, + 'dag_id': 'deploy_site', + 'id': '01CBGWY1GXQVXVCXRJKM9V71AT', + 'name': 'deploy_site', + 'parameters': {}, + 'timestamp': '2018-04-20 06:47:43.905047', + 'user': 'shipyard'} + + try: from deployment_configuration_operator import ( DeploymentConfigurationOperator @@ -62,7 +81,7 @@ def test_execute_no_client(p1): @mock.patch.object(airflow.models.TaskInstance, 'xcom_pull', - return_value=99) + return_value=ACTION_INFO) def test_get_revision_id(ti): """Test that get revision id follows desired exits""" dco = DeploymentConfigurationOperator(main_dag_name="main", @@ -71,11 +90,11 @@ def test_get_revision_id(ti): ti = airflow.models.TaskInstance(task=mock.MagicMock(), execution_date="no") rid = dco.get_revision_id(ti) - assert rid == 99 + assert rid == 2 @mock.patch.object(airflow.models.TaskInstance, 'xcom_pull', - return_value=None) + return_value=ACTION_INFO_NO_COMMIT) def test_get_revision_id_none(ti): """Test that get revision id follows desired exits""" dco = DeploymentConfigurationOperator(main_dag_name="main",