Refactor imports to support loading dags for tests

Updates the imports for the dags and operators to support both "as
deployed" and "as tested" package configurations. This allows for a
simple test to be added that at least imorts and checks the dags to
ensure they contain steps.

A future refactor may eliminate the need for some/much of this by moving the
operators away from the plugin appraoch such that they can be statically
built into the airflow pod and used like a third party library instead
of being appended to the airflow plugins. For now though, this maintains
the status quo for the way these are used in a deployed way.

Change-Id: I437ff9c583358188e27de0e2f6987c38ca85ab2f
This commit is contained in:
Bryan Strassner 2018-07-25 09:19:18 -05:00
parent bd2a686dbf
commit 038f958501
35 changed files with 291 additions and 127 deletions

View File

@ -13,11 +13,20 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ArmadaGetReleasesOperator
from airflow.operators import ArmadaGetStatusOperator
from airflow.operators import ArmadaPostApplyOperator
from config_path import config_path
try:
from airflow.operators import ArmadaGetReleasesOperator
from airflow.operators import ArmadaGetStatusOperator
from airflow.operators import ArmadaPostApplyOperator
from config_path import config_path
except ImportError:
from shipyard_airflow.plugins.armada_get_releases import \
ArmadaGetReleasesOperator
from shipyard_airflow.plugins.armada_get_status import \
ArmadaGetStatusOperator
from shipyard_airflow.plugins.armada_post_apply import \
ArmadaPostApplyOperator
from shipyard_airflow.dags.config_path import config_path
def deploy_site_armada(parent_dag_name, child_dag_name, args):

View File

@ -11,24 +11,49 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from airflow.operators import DeploymentConfigurationOperator
from airflow.operators import DeckhandCreateSiteActionTagOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from armada_deploy_site import deploy_site_armada
from config_path import config_path
from destroy_node import destroy_server
from drydock_deploy_site import deploy_site_drydock
from failure_handlers import step_failure_handler
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
import dag_names as dn
try:
# Operators are loaded from being registered to airflow.operators
# in a deployed fashion
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from airflow.operators import DeploymentConfigurationOperator
from airflow.operators import DeckhandCreateSiteActionTagOperator
except ImportError:
# for local testing, they are loaded from their source directory
from shipyard_airflow.plugins.concurrency_check_operator import \
ConcurrencyCheckOperator
from shipyard_airflow.plugins.deckhand_retrieve_rendered_doc import \
DeckhandRetrieveRenderedDocOperator
from shipyard_airflow.plugins.deployment_configuration_operator import \
DeploymentConfigurationOperator
from shipyard_airflow.plugins.deckhand_create_site_action_tag import \
DeckhandCreateSiteActionTagOperator
try:
# modules reside in a flat directory when deployed with dags
from armada_deploy_site import deploy_site_armada
from config_path import config_path
from destroy_node import destroy_server
from drydock_deploy_site import deploy_site_drydock
from failure_handlers import step_failure_handler
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
import dag_names as dn
except ImportError:
# for testing, specify the qualified source directory
from shipyard_airflow.dags.armada_deploy_site import deploy_site_armada
from shipyard_airflow.dags.config_path import config_path
from shipyard_airflow.dags.destroy_node import destroy_server
from shipyard_airflow.dags.drydock_deploy_site import deploy_site_drydock
from shipyard_airflow.dags.failure_handlers import step_failure_handler
from shipyard_airflow.dags.preflight_checks import all_preflight_checks
from shipyard_airflow.dags.validate_site_design import validate_site_design
import shipyard_airflow.dags.dag_names as dn
class CommonStepFactory(object):

View File

@ -16,7 +16,11 @@ from datetime import timedelta
import airflow
from airflow import DAG
from common_step_factory import CommonStepFactory
try:
from common_step_factory import CommonStepFactory
except ImportError:
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
"""deploy_site
the top-level orchestration DAG for deploying a site using the Undercloud

View File

@ -13,14 +13,29 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DrydockDestroyNodeOperator
from airflow.operators import PromenadeCheckEtcdOperator
from airflow.operators import PromenadeClearLabelsOperator
from airflow.operators import PromenadeDecommissionNodeOperator
from airflow.operators import PromenadeDrainNodeOperator
from airflow.operators import PromenadeShutdownKubeletOperator
from config_path import config_path
try:
from airflow.operators import DrydockDestroyNodeOperator
from airflow.operators import PromenadeCheckEtcdOperator
from airflow.operators import PromenadeClearLabelsOperator
from airflow.operators import PromenadeDecommissionNodeOperator
from airflow.operators import PromenadeDrainNodeOperator
from airflow.operators import PromenadeShutdownKubeletOperator
from config_path import config_path
except ImportError:
from shipyard_airflow.plugins.drydock_destroy_nodes import \
DrydockDestroyNodeOperator
from shipyard_airflow.plugins.promenade_check_etcd import \
PromenadeCheckEtcdOperator
from shipyard_airflow.plugins.promenade_clear_labels import \
PromenadeClearLabelsOperator
from shipyard_airflow.plugins.promenade_decommission_node import \
PromenadeDecommissionNodeOperator
from shipyard_airflow.plugins.promenade_drain_node import \
PromenadeDrainNodeOperator
from shipyard_airflow.plugins.promenade_shutdown_kubelet import \
PromenadeShutdownKubeletOperator
from shipyard_airflow.dags.config_path import config_path
def destroy_server(parent_dag_name, child_dag_name, args):

View File

@ -13,11 +13,20 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DrydockNodesOperator
from airflow.operators import DrydockPrepareSiteOperator
from airflow.operators import DrydockVerifySiteOperator
from config_path import config_path
try:
from airflow.operators import DrydockNodesOperator
from airflow.operators import DrydockPrepareSiteOperator
from airflow.operators import DrydockVerifySiteOperator
from config_path import config_path
except ImportError:
from shipyard_airflow.plugins.drydock_nodes import \
DrydockNodesOperator
from shipyard_airflow.plugins.drydock_prepare_site import \
DrydockPrepareSiteOperator
from shipyard_airflow.plugins.drydock_verify_site import \
DrydockVerifySiteOperator
from shipyard_airflow.dags.config_path import config_path
def deploy_site_drydock(parent_dag_name, child_dag_name, args):

View File

@ -13,9 +13,14 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import UcpHealthCheckOperator
from config_path import config_path
try:
from airflow.operators import UcpHealthCheckOperator
from config_path import config_path
except ImportError:
from shipyard_airflow.plugins.ucp_preflight_check_operator import \
UcpHealthCheckOperator
from shipyard_airflow.dags.config_path import config_path
def all_preflight_checks(parent_dag_name, child_dag_name, args):

View File

@ -16,7 +16,11 @@ from datetime import timedelta
import airflow
from airflow import DAG
from common_step_factory import CommonStepFactory
try:
from common_step_factory import CommonStepFactory
except ImportError:
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
"""redeploy_server
The top-level orchestration DAG for redeploying a server using the Undercloud

View File

@ -16,7 +16,10 @@ from datetime import timedelta
import airflow
from airflow import DAG
from common_step_factory import CommonStepFactory
try:
from common_step_factory import CommonStepFactory
except ImportError:
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
"""update_site

View File

@ -16,8 +16,12 @@ from datetime import timedelta
import airflow
from airflow import DAG
from common_step_factory import CommonStepFactory
from validate_site_design import SOFTWARE
try:
from common_step_factory import CommonStepFactory
from validate_site_design import SOFTWARE
except ImportError:
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
from shipyard_airflow.dags.validate_site_design import SOFTWARE
"""update_software

View File

@ -13,13 +13,23 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ArmadaValidateDesignOperator
from airflow.operators import DeckhandValidateSiteDesignOperator
from airflow.operators import DrydockValidateDesignOperator
from airflow.operators import PromenadeValidateSiteDesignOperator
from config_path import config_path
try:
from airflow.operators import ArmadaValidateDesignOperator
from airflow.operators import DeckhandValidateSiteDesignOperator
from airflow.operators import DrydockValidateDesignOperator
from airflow.operators import PromenadeValidateSiteDesignOperator
from config_path import config_path
except ImportError:
from shipyard_airflow.plugins.armada_validate_design import \
ArmadaValidateDesignOperator
from shipyard_airflow.plugins.deckhand_validate_site import \
DeckhandValidateSiteDesignOperator
from shipyard_airflow.plugins.drydock_validate_design import \
DrydockValidateDesignOperator
from shipyard_airflow.plugins.promenade_validate_site_design import \
PromenadeValidateSiteDesignOperator
from shipyard_airflow.dags.config_path import config_path
BAREMETAL = 'baremetal'
SOFTWARE = 'software'

View File

@ -20,11 +20,20 @@ from airflow.utils.decorators import apply_defaults
import armada.common.client as client
import armada.common.session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
import service_endpoint
from xcom_pusher import XcomPusher
try:
from get_k8s_pod_port_ip import get_pod_port_ip
import service_endpoint
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
from xcom_pusher import XcomPusher
except ImportError:
from shipyard_airflow.plugins.get_k8s_pod_port_ip import get_pod_port_ip
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.service_token import shipyard_service_token
from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator
from shipyard_airflow.plugins.xcom_pusher import XcomPusher
LOG = logging.getLogger(__name__)

View File

@ -16,7 +16,11 @@ import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
try:
from armada_base_operator import ArmadaBaseOperator
except ImportError:
from shipyard_airflow.plugins.armada_base_operator import \
ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)

View File

@ -16,7 +16,11 @@ import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
try:
from armada_base_operator import ArmadaBaseOperator
except ImportError:
from shipyard_airflow.plugins.armada_base_operator import \
ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)

View File

@ -16,7 +16,11 @@ import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
try:
from armada_base_operator import ArmadaBaseOperator
except ImportError:
from shipyard_airflow.plugins.armada_base_operator import \
ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)

View File

@ -16,7 +16,12 @@ import logging
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from armada_base_operator import ArmadaBaseOperator
try:
from armada_base_operator import ArmadaBaseOperator
except ImportError:
from shipyard_airflow.plugins.armada_base_operator import \
ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)

View File

@ -19,9 +19,15 @@ from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand.client import client as deckhand_client
import service_endpoint
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
try:
import service_endpoint
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
except ImportError:
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.service_token import shipyard_service_token
from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -20,7 +20,11 @@ import subprocess # nosec
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
try:
from deckhand_base_operator import DeckhandBaseOperator
except ImportError:
from shipyard_airflow.plugins.deckhand_base_operator import \
DeckhandBaseOperator
FAILED_STATUSES = ('failed', 'upstream_failed')
LOG = logging.getLogger(__name__)

View File

@ -16,7 +16,12 @@ import logging
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
try:
from deckhand_base_operator import DeckhandBaseOperator
except ImportError:
from shipyard_airflow.plugins.deckhand_base_operator import \
DeckhandBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -19,7 +19,11 @@ import yaml
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
try:
from deckhand_base_operator import DeckhandBaseOperator
except ImportError:
from shipyard_airflow.plugins.deckhand_base_operator import \
DeckhandBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -25,20 +25,6 @@ import drydock_provisioner.drydock_client.client as client
import drydock_provisioner.drydock_client.session as session
from drydock_provisioner import error as errors
try:
import service_endpoint
except ImportError:
from shipyard_airflow.plugins import service_endpoint
try:
from service_token import shipyard_service_token
except ImportError:
from shipyard_airflow.plugins.service_token import shipyard_service_token
try:
from ucp_base_operator import UcpBaseOperator
except ImportError:
from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator
try:
from drydock_errors import (
DrydockClientUseFailureException,
@ -46,6 +32,10 @@ try:
DrydockTaskNotCreatedException,
DrydockTaskTimeoutException
)
import service_endpoint
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_errors import (
DrydockClientUseFailureException,
@ -53,6 +43,9 @@ except ImportError:
DrydockTaskNotCreatedException,
DrydockTaskTimeoutException
)
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.service_token import shipyard_service_token
from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -16,7 +16,11 @@ import time
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
try:
from drydock_base_operator import DrydockBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -35,21 +35,15 @@ from shipyard_airflow.common.deployment_group.node_lookup import NodeLookup
try:
import check_k8s_node_status
except ImportError:
from shipyard_airflow.plugins import check_k8s_node_status
try:
from drydock_base_operator import DrydockBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
try:
from drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
)
except ImportError:
from shipyard_airflow.plugins import check_k8s_node_status
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
from shipyard_airflow.plugins.drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException

View File

@ -13,7 +13,11 @@
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
try:
from drydock_base_operator import DrydockBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
class DrydockPrepareSiteOperator(DrydockBaseOperator):

View File

@ -19,7 +19,11 @@ import requests
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from drydock_base_operator import DrydockBaseOperator
try:
from drydock_base_operator import DrydockBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -13,7 +13,11 @@
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
try:
from drydock_base_operator import DrydockBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
class DrydockVerifySiteOperator(DrydockBaseOperator):

View File

@ -19,17 +19,11 @@ from airflow.exceptions import AirflowException
try:
import service_endpoint
except ImportError:
from shipyard_airflow.plugins import service_endpoint
try:
from service_token import shipyard_service_token
except ImportError:
from shipyard_airflow.plugins.service_token import shipyard_service_token
try:
from ucp_base_operator import UcpBaseOperator
except ImportError:
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.service_token import shipyard_service_token
from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,11 @@ import time
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
try:
from promenade_base_operator import PromenadeBaseOperator
except ImportError:
from shipyard_airflow.plugins.promenade_base_operator import \
PromenadeBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,11 @@ import time
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
try:
from promenade_base_operator import PromenadeBaseOperator
except ImportError:
from shipyard_airflow.plugins.promenade_base_operator import \
PromenadeBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,11 @@ import time
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
try:
from promenade_base_operator import PromenadeBaseOperator
except ImportError:
from shipyard_airflow.plugins.promenade_base_operator import \
PromenadeBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,11 @@ import time
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
try:
from promenade_base_operator import PromenadeBaseOperator
except ImportError:
from shipyard_airflow.plugins.promenade_base_operator import \
PromenadeBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,11 @@ import time
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
try:
from promenade_base_operator import PromenadeBaseOperator
except ImportError:
from shipyard_airflow.plugins.promenade_base_operator import \
PromenadeBaseOperator
LOG = logging.getLogger(__name__)

View File

@ -22,9 +22,8 @@ from airflow.plugins_manager import AirflowPlugin
try:
from promenade_base_operator import PromenadeBaseOperator
except ImportError:
from shipyard_airflow.plugins.promenade_base_operator import (
from shipyard_airflow.plugins.promenade_base_operator import \
PromenadeBaseOperator
)
LOG = logging.getLogger(__name__)

View File

@ -23,35 +23,22 @@ from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
try:
from deckhand_client_factory import DeckhandClientFactory
import service_endpoint
except ImportError:
from shipyard_airflow.plugins import service_endpoint
try:
from get_k8s_logs import get_pod_logs
except ImportError:
from shipyard_airflow.plugins.get_k8s_logs import get_pod_logs
try:
from get_k8s_logs import K8sLoggingException
except ImportError:
from shipyard_airflow.plugins.get_k8s_logs import K8sLoggingException
try:
from xcom_puller import XcomPuller
except ImportError:
from shipyard_airflow.plugins.deckhand_client_factory import \
DeckhandClientFactory
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.get_k8s_logs import get_pod_logs
from shipyard_airflow.plugins.get_k8s_logs import K8sLoggingException
from shipyard_airflow.plugins.xcom_puller import XcomPuller
from shipyard_airflow.common.document_validators.document_validation_utils \
import DocumentValidationUtils
try:
from deckhand_client_factory import DeckhandClientFactory
except ImportError:
from shipyard_airflow.plugins.deckhand_client_factory import (
DeckhandClientFactory
)
LOG = logging.getLogger(__name__)
@ -102,10 +89,11 @@ class UcpBaseOperator(BaseOperator):
self.shipyard_conf = shipyard_conf
self.start_time = datetime.now()
self.xcom_push_flag = xcom_push
self.doc_utils = _get_document_util(self.shipyard_conf)
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
def execute(self, context):
# Setup values that depend on the shipyard configuration
self.doc_utils = _get_document_util(self.shipyard_conf)
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
# Execute UCP base function
self.ucp_base(context)

View File

@ -22,17 +22,11 @@ from airflow.utils.decorators import apply_defaults
try:
import service_endpoint
except ImportError:
from shipyard_airflow.plugins import service_endpoint
try:
from xcom_puller import XcomPuller
except ImportError:
from shipyard_airflow.plugins.xcom_puller import XcomPuller
try:
from xcom_pusher import XcomPusher
except ImportError:
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.xcom_puller import XcomPuller
from shipyard_airflow.plugins.xcom_pusher import XcomPusher
LOG = logging.getLogger(__name__)

View File

@ -0,0 +1,33 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from shipyard_airflow.dags import deploy_site
from shipyard_airflow.dags import update_site
from shipyard_airflow.dags import update_software
from shipyard_airflow.dags import redeploy_server
def test_dags_load():
"""assert that each dag is a DAG object after importing the modules
As these are the top level dags, the subdags and many operators
will be loaded as a result. This ensures that basic construction
is working for most of the workflow logic, however it tests nearly
none of the decision making.
"""
for d in [deploy_site.dag, update_site.dag,
update_software.dag, redeploy_server.dag]:
assert isinstance(d, DAG)
assert d.task_count > 0