Ensure Presence of Committed Doc prior to Workflow Execution

Shipyard should (1) validate that there is a current committed version
of the documents, (2) Pass that committed version as a parameter to the
workflow, that is then used for the entire workflow.

This applies to all the current workflow, i.e. deploy_site, update_site,
redeploy_server

Note that We will remove the step to retrieve deckhand design version from
the workflow as it will be handled as part of the Shipyard Create Action
with this change.

Change-Id: Ifdbdd8f1ce1b2c6afa26fdfaee86cbb2776ca715
This commit is contained in:
Anthony Lin 2018-04-20 09:10:45 +00:00
parent 769d0ded47
commit dee8887c88
15 changed files with 88 additions and 89 deletions

View File

@ -25,6 +25,8 @@ from shipyard_airflow import policy
from shipyard_airflow.control.action.action_helper import (determine_lifecycle,
format_action_steps)
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.control.configdocs.configdocs_helper import (
ConfigdocsHelper)
from shipyard_airflow.control.json_schemas import ACTION
from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB
from shipyard_airflow.errors import ApiError
@ -97,6 +99,10 @@ class ActionsResource(BaseResource):
dag = SUPPORTED_ACTION_MAPPINGS.get(action['name'])['dag']
action['dag_id'] = dag
# Retrieve last committed design revision
self.configdocs_helper = ConfigdocsHelper(context)
action['committed_rev_id'] = self.get_committed_design_version()
# populate action parameters if they are not set
if 'parameters' not in action:
action['parameters'] = {}
@ -315,3 +321,21 @@ class ActionsResource(BaseResource):
}],
retry=True,
)
def get_committed_design_version(self):
LOG.info("Checking for committed revision in Deckhand...")
committed_rev_id = self.configdocs_helper._get_committed_rev_id()
if committed_rev_id:
LOG.info("The committed revision in Deckhand is %d",
committed_rev_id)
return committed_rev_id
else:
raise ApiError(
title='Unable to locate any committed revision in Deckhand',
description='No committed version found in Deckhand',
status=falcon.HTTP_404,
retry=False)

View File

@ -19,7 +19,7 @@ from airflow.operators.subdag_operator import SubDagOperator
from armada_deploy_site import deploy_site_armada
import dag_names as dn
from deckhand_get_design import get_design_deckhand
from deckhand_get_rendered_doc import get_rendered_doc_deckhand
from destroy_node import destroy_server
from drydock_deploy_site import deploy_site_drydock
from failure_handlers import step_failure_handler
@ -88,13 +88,14 @@ class CommonStepFactory(object):
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_get_design_version(self, task_id=dn.GET_DESIGN_VERSION):
"""Generate the get design version step
def get_get_rendered_doc(self, task_id=dn.GET_RENDERED_DOC):
"""Generate the get deckhand rendered doc step
Retrieves the version of the design to use from deckhand
Check that we are able to render the docs before proceeding
further with the workflow
"""
return SubDagOperator(
subdag=get_design_deckhand(
subdag=get_rendered_doc_deckhand(
self.parent_dag_name,
task_id,
args=self.default_args),

View File

@ -16,7 +16,7 @@
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
ARMADA_BUILD_DAG_NAME = 'armada_build'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
GET_DESIGN_VERSION = 'get_design_version'
GET_RENDERED_DOC = 'get_rendered_doc'
GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'

View File

@ -13,35 +13,24 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeckhandGetDesignOperator
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from config_path import config_path
def get_design_deckhand(parent_dag_name, child_dag_name, args):
def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args):
'''
Get Deckhand Design Version
Get rendered documents from Deckhand for the committed revision ID.
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
deckhand_design = DeckhandGetDesignOperator(
task_id='deckhand_get_design_version',
deckhand_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator(
task_id='deckhand_retrieve_rendered_doc',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
shipyard_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator(
task_id='shipyard_retrieve_rendered_doc',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies
shipyard_retrieve_rendered_doc.set_upstream(deckhand_design)
return dag

View File

@ -46,7 +46,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
preflight = step_factory.get_preflight()
get_design_version = step_factory.get_get_design_version()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
drydock_build = step_factory.get_drydock_build()
@ -55,8 +55,8 @@ armada_build = step_factory.get_armada_build()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
deployment_configuration.set_upstream(get_design_version)
get_rendered_doc.set_upstream(preflight)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)

View File

@ -47,7 +47,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
preflight = step_factory.get_preflight()
get_design_version = step_factory.get_get_design_version()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
destroy_server = step_factory.get_destroy_server()
@ -56,8 +56,8 @@ drydock_build = step_factory.get_drydock_build()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
deployment_configuration.set_upstream(get_design_version)
get_rendered_doc.set_upstream(preflight)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
destroy_server.set_upstream(validate_site_design)
drydock_build.set_upstream(destroy_server)

View File

@ -50,7 +50,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
get_design_version = step_factory.get_get_design_version()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
drydock_build = step_factory.get_drydock_build()
@ -61,8 +61,8 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
get_design_version.set_upstream(concurrency_check)
deployment_configuration.set_upstream(get_design_version)
get_rendered_doc.set_upstream(concurrency_check)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)

View File

@ -93,13 +93,9 @@ class ArmadaBaseOperator(UcpBaseOperator):
deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
# Retrieve last committed revision id
committed_revision_id = self.xcom_puller.get_design_version()
# Get deckhand design reference url
self.deckhand_design_ref = self._init_deckhand_design_ref(
deckhand_svc_endpoint,
committed_revision_id)
deckhand_svc_endpoint)
@staticmethod
def _init_armada_client(armada_svc_endpoint, svc_token):
@ -137,9 +133,7 @@ class ArmadaBaseOperator(UcpBaseOperator):
else:
raise AirflowException("Failed to set up Armada client!")
@staticmethod
def _init_deckhand_design_ref(deckhand_svc_endpoint,
committed_revision_id):
def _init_deckhand_design_ref(self, deckhand_svc_endpoint):
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
@ -148,7 +142,7 @@ class ArmadaBaseOperator(UcpBaseOperator):
deckhand_path = "deckhand+" + deckhand_svc_endpoint
_deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
str(self.revision_id),
"rendered-documents")
if _deckhand_design_ref:

View File

@ -113,24 +113,6 @@ class DeckhandBaseOperator(UcpBaseOperator):
if not self.deckhandclient:
raise AirflowException('Failed to set up deckhand client!')
# Retrieve 'revision_id' from xcom for tasks other than
# 'deckhand_get_design_version'
#
# NOTE: In the case of 'deploy_site', the dag_id will
# be 'deploy_site.get_design_version' for the
# 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the
# last committed revision ID
if self.task_id != 'deckhand_get_design_version':
# Retrieve 'revision_id' from xcom
self.revision_id = self.xcom_puller.get_design_version()
if self.revision_id:
LOG.info("Revision ID is %d", self.revision_id)
else:
raise AirflowException('Failed to retrieve Revision ID!')
class DeckhandBaseOperatorPlugin(AirflowPlugin):

View File

@ -103,9 +103,13 @@ class DeploymentConfigurationOperator(BaseOperator):
if task_instance:
LOG.debug("task_instance found, extracting design version")
# Set the revision_id to the revision on the xcom
revision_id = task_instance.xcom_pull(
task_ids='deckhand_get_design_version',
dag_id=self.main_dag_name + '.get_design_version')
action_info = task_instance.xcom_pull(
task_ids='action_xcom',
dag_id=self.main_dag_name,
key='action')
revision_id = action_info['committed_rev_id']
if revision_id:
LOG.info("Revision is set to: %s for deployment configuration",
revision_id)

View File

@ -153,15 +153,12 @@ class DrydockBaseOperator(UcpBaseOperator):
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Retrieve last committed revision id
committed_revision_id = self.xcom_puller.get_design_version()
# Form DeckHand Design Reference Path
# This URL will be used to retrieve the Site Design YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
self.deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
str(self.revision_id),
"rendered-documents")
if self.deckhand_design_ref:
LOG.info("Design YAMLs will be retrieved from %s",

View File

@ -86,11 +86,6 @@ class UcpBaseOperator(BaseOperator):
# Exeute child function
self.do_execute()
# Push last committed version to xcom for the
# 'get_design_version' subdag
if self.sub_dag_name == 'get_design_version':
return self.committed_ver
def ucp_base(self, context):
LOG.info("Running UCP Base Operator...")
@ -109,6 +104,7 @@ class UcpBaseOperator(BaseOperator):
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
self.revision_id = self.action_info['committed_rev_id']
def get_k8s_logs(self):
"""Retrieve Kubernetes pod/container logs specified by an opererator

View File

@ -73,12 +73,3 @@ class XcomPuller(object):
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)
def get_design_version(self):
"""Retrieve the design version being used for this workflow"""
source_task = 'deckhand_get_design_version'
source_dag = 'get_design_version'
key = None
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)

View File

@ -11,18 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
import os
from datetime import datetime
import mock
from falcon import testing
from mock import patch
from oslo_config import cfg
import pytest
import falcon
from falcon import testing
import json
import logging
import mock
import os
import pytest
import responses
from shipyard_airflow.control.action import actions_api
@ -35,6 +33,7 @@ DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000)
DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000)
DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S')
DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S')
DESIGN_VERSION = 1
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -299,6 +298,7 @@ def test_create_action():
action_resource.invoke_airflow_dag = airflow_stub
action_resource.insert_action = insert_action_stub
action_resource.audit_control_command_db = audit_control_command_db
action_resource.get_committed_design_version = lambda: DESIGN_VERSION
# with invalid input. fail.
try:
@ -326,6 +326,7 @@ def test_create_action():
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'
@ -338,6 +339,7 @@ def test_create_action():
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'

View File

@ -18,6 +18,25 @@ import yaml
import airflow
from airflow.exceptions import AirflowException
ACTION_INFO = {
'committed_rev_id': 2,
'dag_id': 'deploy_site',
'id': '01CBGWY1GXQVXVCXRJKM9V71AT',
'name': 'deploy_site',
'parameters': {},
'timestamp': '2018-04-20 06:47:43.905047',
'user': 'shipyard'}
ACTION_INFO_NO_COMMIT = {
'committed_rev_id': None,
'dag_id': 'deploy_site',
'id': '01CBGWY1GXQVXVCXRJKM9V71AT',
'name': 'deploy_site',
'parameters': {},
'timestamp': '2018-04-20 06:47:43.905047',
'user': 'shipyard'}
try:
from deployment_configuration_operator import (
DeploymentConfigurationOperator
@ -62,7 +81,7 @@ def test_execute_no_client(p1):
@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull',
return_value=99)
return_value=ACTION_INFO)
def test_get_revision_id(ti):
"""Test that get revision id follows desired exits"""
dco = DeploymentConfigurationOperator(main_dag_name="main",
@ -71,11 +90,11 @@ def test_get_revision_id(ti):
ti = airflow.models.TaskInstance(task=mock.MagicMock(),
execution_date="no")
rid = dco.get_revision_id(ti)
assert rid == 99
assert rid == 2
@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull',
return_value=None)
return_value=ACTION_INFO_NO_COMMIT)
def test_get_revision_id_none(ti):
"""Test that get revision id follows desired exits"""
dco = DeploymentConfigurationOperator(main_dag_name="main",