Add support for tosca definition version 1.2

* Added support for tosca definition version 1.2. for validating both
'CSAR with the TOSCA-Metadata directory' and 'CSAR without the
TOSCA-Metadata directory'.
* Added support to validate external references in imports files.

Note: https://review.opendev.org/#/c/673386/ should also be merge
before this patch because in case of capabilities without properties
validation of csar file will break.

Specs: https://review.opendev.org/#/c/582930/

Change-Id: If8155399df12a96cb86631dfa22eaca7a5a8d398
This commit is contained in:
niraj singh 2019-05-15 09:01:27 +00:00
parent 2e51d21f7b
commit 8a18ca8c73
17 changed files with 449 additions and 122 deletions

View File

@ -30,7 +30,8 @@ class TypeValidation(object):
'relationship_types', 'capability_types',
'interface_types', 'policy_types', 'topology_template',
'metadata')
VALID_TEMPLATE_VERSIONS = ['tosca_simple_yaml_1_0']
VALID_TEMPLATE_VERSIONS = ['tosca_simple_yaml_1_0',
'tosca_simple_yaml_1_2']
exttools = ExtTools()
VALID_TEMPLATE_VERSIONS.extend(exttools.get_versions())

View File

@ -38,6 +38,7 @@ class ImportsLoader(object):
self.importslist = importslist
self.custom_defs = {}
self.nested_tosca_tpls = []
self.nested_imports = {}
if not path and not tpl:
msg = _('Input tosca template is not provided.')
log.warning(msg)
@ -60,6 +61,9 @@ class ImportsLoader(object):
def get_nested_tosca_tpls(self):
return self.nested_tosca_tpls
def get_nested_imports(self):
return self.nested_imports
def _validate_and_load_imports(self):
imports_names = set()
@ -98,6 +102,9 @@ class ImportsLoader(object):
custom_type, import_def)
self._update_custom_def(custom_type, None)
if custom_type and 'imports' in custom_type.keys():
self.nested_imports.update(
{full_file_name: custom_type['imports']})
self._update_nested_tosca_tpls(full_file_name, custom_type)
def _update_custom_def(self, custom_type, namespace_prefix):

View File

@ -24,12 +24,17 @@ from toscaparser.common.exception import ValidationError
from toscaparser.imports import ImportsLoader
from toscaparser.utils.gettextutils import _
from toscaparser.utils.urlutils import UrlUtils
from toscaparser.utils import yamlparser
try: # Python 2.x
from BytesIO import BytesIO
except ImportError: # Python 3.x
from io import BytesIO
TOSCA_META = 'TOSCA-Metadata/TOSCA.meta'
YAML_LOADER = yamlparser.load_yaml
class CSAR(object):
@ -40,6 +45,7 @@ class CSAR(object):
self.error_caught = False
self.csar = None
self.temp_dir = None
self.is_tosca_metadata = False
def validate(self):
"""Validate the provided CSAR file."""
@ -74,54 +80,19 @@ class CSAR(object):
# validate that it contains the metadata file in the correct location
self.zfile = zipfile.ZipFile(self.csar, 'r')
filelist = self.zfile.namelist()
if 'TOSCA-Metadata/TOSCA.meta' not in filelist:
err_msg = (_('"%s" is not a valid CSAR as it does not contain the '
'required file "TOSCA.meta" in the folder '
'"TOSCA-Metadata".') % self.path)
ExceptionCollector.appendException(
ValidationError(message=err_msg))
return False
if TOSCA_META in filelist:
self.is_tosca_metadata = True
# validate that 'Entry-Definitions' property exists in TOSCA.meta
is_validated = self._validate_tosca_meta(filelist)
else:
self.is_tosca_metadata = False
is_validated = self._validate_root_level_yaml(filelist)
# validate that 'Entry-Definitions' property exists in TOSCA.meta
data = self.zfile.read('TOSCA-Metadata/TOSCA.meta')
invalid_yaml_err_msg = (_('The file "TOSCA-Metadata/TOSCA.meta" in '
'the CSAR "%s" does not contain valid YAML '
'content.') % self.path)
try:
meta = yaml.load(data)
if type(meta) is dict:
self.metadata = meta
else:
ExceptionCollector.appendException(
ValidationError(message=invalid_yaml_err_msg))
return False
except yaml.YAMLError:
ExceptionCollector.appendException(
ValidationError(message=invalid_yaml_err_msg))
return False
if 'Entry-Definitions' not in self.metadata:
err_msg = (_('The CSAR "%s" is missing the required metadata '
'"Entry-Definitions" in '
'"TOSCA-Metadata/TOSCA.meta".')
% self.path)
ExceptionCollector.appendException(
ValidationError(message=err_msg))
return False
# validate that 'Entry-Definitions' metadata value points to an
# existing file in the CSAR
entry = self.metadata.get('Entry-Definitions')
if entry and entry not in filelist:
err_msg = (_('The "Entry-Definitions" file defined in the '
'CSAR "%s" does not exist.') % self.path)
ExceptionCollector.appendException(
ValidationError(message=err_msg))
return False
# validate that external references in the main template actually
# exist and are accessible
self._validate_external_references()
if is_validated:
# validate that external references and imports in the main
# template actually exist and are accessible
main_tpl = self._read_template_yaml(self.main_template_file_name)
self._validate_external_references(main_tpl)
return not self.error_caught
def get_metadata(self):
@ -140,15 +111,25 @@ class CSAR(object):
return self.metadata.get(key)
def get_author(self):
return self._get_metadata('Created-By')
if self.is_tosca_metadata:
return self._get_metadata('Created-By')
else:
# In case CSAR zip doesn't contain TOSCA.Metadata directory,
# Created-By is defined by the template_author metadata
return self._get_metadata('template_author')
def get_version(self):
return self._get_metadata('CSAR-Version')
if self.is_tosca_metadata:
return self._get_metadata('CSAR-Version')
else:
# In case CSAR zip doesn't contain TOSCA.Metadata directory,
# CSAR-Version is defined by the template_version metadata
return self._get_metadata('template_version')
def get_main_template(self):
entry_def = self._get_metadata('Entry-Definitions')
if entry_def in self.zfile.namelist():
return entry_def
if not self.is_validated:
self.validate()
return self.main_template_file_name
def get_main_template_yaml(self):
main_template = self.get_main_template()
@ -184,7 +165,29 @@ class CSAR(object):
with zipfile.ZipFile(self.csar, "r") as zf:
zf.extractall(self.temp_dir)
def _validate_external_references(self):
def _validate_external_artifact_imports(self, main_tpl, tpl_filename):
"""validate the imports and artifacts"""
self._validate_template(main_tpl, tpl_filename)
if main_tpl:
if 'imports' in main_tpl:
custom_service = ImportsLoader(
main_tpl['imports'],
os.path.join(self.temp_dir, tpl_filename))
# Get list of nested templates
nested_tosca_tpls = custom_service.get_nested_tosca_tpls()
# Validate external references of each nested template.
if nested_tosca_tpls:
for tosca_tpl in nested_tosca_tpls:
for filename, tpl in tosca_tpl.items():
self._validate_external_artifact_imports(
tpl,
filename)
def _validate_external_references(self, main_tpl):
"""Extracts files referenced in the main template
These references are currently supported:
@ -194,62 +197,59 @@ class CSAR(object):
"""
try:
self.decompress()
main_tpl_file = self.get_main_template()
if not main_tpl_file:
return
main_tpl = self.get_main_template_yaml()
if 'imports' in main_tpl:
ImportsLoader(main_tpl['imports'],
os.path.join(self.temp_dir, main_tpl_file))
if 'topology_template' in main_tpl:
topology_template = main_tpl['topology_template']
if 'node_templates' in topology_template:
node_templates = topology_template['node_templates']
for node_template_key in node_templates:
node_template = node_templates[node_template_key]
if 'artifacts' in node_template:
artifacts = node_template['artifacts']
for artifact_key in artifacts:
artifact = artifacts[artifact_key]
if isinstance(artifact, six.string_types):
self._validate_external_reference(
main_tpl_file,
artifact)
elif isinstance(artifact, dict):
if 'file' in artifact:
self._validate_external_reference(
main_tpl_file,
artifact['file'])
else:
ExceptionCollector.appendException(
ValueError(_('Unexpected artifact '
'definition for "%s".')
% artifact_key))
self.error_caught = True
if 'interfaces' in node_template:
interfaces = node_template['interfaces']
for interface_key in interfaces:
interface = interfaces[interface_key]
for opertation_key in interface:
operation = interface[opertation_key]
if isinstance(operation, six.string_types):
self._validate_external_reference(
main_tpl_file,
operation,
False)
elif isinstance(operation, dict):
if 'implementation' in operation:
self._validate_external_reference(
main_tpl_file,
operation['implementation'])
self._validate_external_artifact_imports(
main_tpl,
self.main_template_file_name)
finally:
if self.temp_dir:
shutil.rmtree(self.temp_dir)
def _validate_template(self, template_data, template):
if 'topology_template' in template_data:
topology_template = template_data['topology_template']
if 'node_templates' in topology_template:
node_templates = topology_template['node_templates']
for node_template_key in node_templates:
node_template = node_templates[node_template_key]
if 'artifacts' in node_template:
artifacts = node_template['artifacts']
for artifact_key in artifacts:
artifact = artifacts[artifact_key]
if isinstance(artifact, six.string_types):
self._validate_external_reference(
template,
artifact)
elif isinstance(artifact, dict):
if 'file' in artifact:
self._validate_external_reference(
template,
artifact['file'])
else:
ExceptionCollector.appendException(
ValueError(_('Unexpected artifact '
'definition for "%s".')
% artifact_key))
self.error_caught = True
if 'interfaces' in node_template:
interfaces = node_template['interfaces']
for interface_key in interfaces:
interface = interfaces[interface_key]
for opertation_key in interface:
operation = interface[opertation_key]
if isinstance(operation, six.string_types):
self._validate_external_reference(
template,
operation,
False)
elif isinstance(operation, dict):
if 'implementation' in operation:
self._validate_external_reference(
template,
operation['implementation'])
def _validate_external_reference(self, tpl_file, resource_file,
raise_exc=True):
"""Verify that the external resource exists
@ -284,3 +284,86 @@ class CSAR(object):
ValueError(_('The resource "%s" does not exist.')
% resource_file))
self.error_caught = True
def _read_template_yaml(self, template):
data = self.zfile.read(template)
invalid_tosca_yaml_err_msg = (
_('The file "%(template)s" in the CSAR "%(csar)s" does not '
'contain valid YAML content.') %
{'template': template, 'csar': self.path})
try:
tosca_yaml = yaml.load(data)
if type(tosca_yaml) is not dict:
ExceptionCollector.appendException(
ValidationError(message=invalid_tosca_yaml_err_msg))
return None
return tosca_yaml
except Exception:
ExceptionCollector.appendException(
ValidationError(message=invalid_tosca_yaml_err_msg))
return None
def _validate_tosca_meta(self, filelist):
tosca = self._read_template_yaml(TOSCA_META)
if tosca is None:
return False
self.metadata = tosca
if 'Entry-Definitions' not in self.metadata:
err_msg = (_('The CSAR "%s" is missing the required metadata '
'"Entry-Definitions" in '
'"TOSCA-Metadata/TOSCA.meta".')
% self.path)
ExceptionCollector.appendException(
ValidationError(message=err_msg))
self.error_caught = True
return False
# validate that 'Entry-Definitions' metadata value points to an
# existing file in the CSAR
entry = self.metadata.get('Entry-Definitions')
if entry and entry not in filelist:
err_msg = (_('The "Entry-Definitions" file defined in the '
'CSAR "%s" does not exist.') % self.path)
ExceptionCollector.appendException(
ValidationError(message=err_msg))
self.error_caught = True
return False
self.main_template_file_name = entry
return True
def _validate_root_level_yaml(self, filelist):
root_files = []
for file in filelist:
if '/' not in file:
__, file_extension = os.path.splitext(file)
if file_extension in ['.yaml', '.yml']:
root_files.append(file)
if not len(root_files) == 1:
err_msg = (_('CSAR file should contain only one root level yaml'
' file. Found "%d" yaml file(s).') % len(root_files))
ExceptionCollector.appendException(
ValidationError(message=err_msg))
self.error_caught = True
return False
template_data = self._read_template_yaml(root_files[0])
if template_data is None:
return False
tosca_version = template_data.get('tosca_definitions_version')
if tosca_version == 'tosca_simple_yaml_1_0':
err_msg = (_('"%s" is not a valid CSAR as it does not contain'
' the required file "TOSCA.meta" in the'
' folder "TOSCA-Metadata".') % self.path)
ExceptionCollector.appendException(
ValidationError(message=err_msg))
self.error_caught = True
return False
self.metadata = template_data.get('metadata')
self.main_template_file_name = root_files[0]
return True

Binary file not shown.

View File

@ -0,0 +1,30 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample
metadata:
template_name: my_template
template_author: Niraj
template_version: 1.0
imports:
- Definitions/level1.yaml
topology_template:
inputs:
selected_flavour:
type: string
description: VNF deployment flavour selected by the consumer. It is provided in the API
node_templates:
VNF:
type: company.sample.VNF
properties:
flavour_id: { get_input: selected_flavour }
descriptor_id: b1bb0ce7-ebca-4fa7-95ed-4840d70a1177
provider: Company
product_name: Sample VNF
software_version: '1.0'
descriptor_version: '1.0'
vnfm_info:
- Tacker

View File

@ -0,0 +1,29 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample
metadata:
template_name: my_template
template_author: Niraj
template_version: 1.0
topology_template:
node_templates:
dbBackend:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: dbBackend
description: dbBackend compute node
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: Software of dbBackend
version: '0.4.0'
checksum:
algorithm: sha-256
hash: b9c3036539fd7a5f87a1bf38eb05fdde8b556a1a7e664dbeda90ed3cd74b4f9d
container_format: bare
disk_format: qcow2
min_disk: 1 GB
size: 1 GB

View File

@ -0,0 +1,29 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample
metadata:
template_name: my_template
template_author: Niraj
template_version: 1.0
topology_template:
node_templates:
dbBackend:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: dbBackend
description: dbBackend compute node
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: Software of dbBackend
version: '0.4.0'
checksum:
algorithm: sha-256
hash: b9c3036539fd7a5f87a1bf38eb05fdde8b556a1a7e664dbeda90ed3cd74b4f9d
container_format: bare
disk_format: qcow2
min_disk: 1 GB
size: 1 GB

View File

@ -0,0 +1,63 @@
tosca_definitions_version: tosca_simple_yaml_1_2
policy_types:
tosca.policies.nfv.InstantiationLevels:
derived_from: tosca.policies.Root
description: The InstantiationLevels
properties:
levels:
type: map
description: Describes the various levels of resources.
required: true
entry_schema:
type: tosca.datatypes.nfv.InstantiationLevel
constraints:
- min_length: 1
default_level:
type: string
description: The default instantiation level for this flavour.
required: false
tosca.policies.nfv.VduInstantiationLevels:
derived_from: tosca.policies.Root
description: The VduInstantiationLevels
properties:
levels:
type: map
description: Describes the Vdu.Compute
required: true
entry_schema:
type: tosca.datatypes.nfv.VduLevel
constraints:
- min_length: 1
targets: [ tosca.nodes.nfv.Vdu.Compute ]
node_types:
tosca.nodes.nfv.Vdu.Compute:
derived_from: tosca.nodes.Root
description: ''
properties:
name:
type: string
required: true
description:
type: string
required: false
monitoring_parameters:
type: list
description: ''
required: false
entry_schema:
type: tosca.datatypes.nfv.VnfcMonitoringParameter
sw_image_data:
type: tosca.datatypes.nfv.SwImageData
required: false
tosca.datatypes.nfv.SwImageData:
derived_from: tosca.datatypes.Root
description: describes information related to a software image artifact
properties: # in SOL001 v0.8.0: "properties or metadata:"
name:
type: string
description: Name of this software image
required: true

View File

@ -228,3 +228,72 @@ class CSARPrereqTest(TestCase):
self.assertTrue(csar.validate())
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))
def test_csar_with_root_level_yaml(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_root_level_yaml.zip")
csar = CSAR(path)
yaml_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"data/CSAR/root_level_file.yaml")
expected_yaml = toscaparser.utils.yamlparser.load_yaml(yaml_file)
self.assertEqual(expected_yaml, csar.get_main_template_yaml())
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))
def test_csar_with_multiple_root_level_yaml_files_error(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_two_root_level_yaml.zip")
csar = CSAR(path)
error = self.assertRaises(ValidationError, csar.validate)
self.assertEqual(_('CSAR file should contain only one root level'
' yaml file. Found "2" yaml file(s).'), str(error))
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))
def test_csar_with_root_level_yaml_and_tosca_metadata(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_root_level_"
"yaml_and_tosca_metadata.zip")
csar = CSAR(path)
yaml_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"data/CSAR/tosca_meta_file.yaml")
expected_yaml = toscaparser.utils.yamlparser.load_yaml(yaml_file)
self.assertEqual(expected_yaml, csar.get_main_template_yaml())
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))
def test_csar_root_yaml_with_tosca_definition_1_0_error(self):
path = os.path.join(self.base_path, "data/CSAR/csar_root_yaml"
"_with_tosca_definition1_0.zip")
csar = CSAR(path)
error = self.assertRaises(ValidationError, csar.validate)
self.assertEqual(_('"%s" is not a valid CSAR as it does not contain'
' the required file "TOSCA.meta" in the folder '
'"TOSCA-Metadata".') % path, str(error))
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))
def test_csar_with_multilevel_imports_valid(self):
path = os.path.join(
self.base_path,
"data/CSAR/csar_valid_multilevel_imports_validation.zip")
csar = CSAR(path)
yaml_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"data/CSAR/multi_level_imports_response.yaml")
expected_yaml = toscaparser.utils.yamlparser.load_yaml(yaml_file)
self.assertEqual(expected_yaml, csar.get_main_template_yaml())
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))
def test_csar_with_multilevel_imports_invalid(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_invalid_multilevel"
"_imports_validation.zip")
csar = CSAR(path)
error = self.assertRaises(ValueError, csar.validate)
self.assertEqual(_(
'The resource "%s" does '
'not exist.') % 'Files/images/'
'cirros-0.4.0-x86_64-disk.img', str(error))
self.assertTrue(csar.temp_dir is None or
not os.path.exists(csar.temp_dir))

View File

@ -945,3 +945,9 @@ class ToscaTemplateTest(TestCase):
os.path.dirname(os.path.abspath(__file__)),
"data/test_custom_capabilty.yaml")
ToscaTemplate(tosca_tpl)
def test_csar_multilevel_imports_relative_path(self):
csar_archive = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'data/CSAR/csar_relative_path_import_check.zip')
self.assertTrue(ToscaTemplate(csar_archive))

View File

@ -55,11 +55,13 @@ YAML_LOADER = toscaparser.utils.yamlparser.load_yaml
class ToscaTemplate(object):
exttools = ExtTools()
VALID_TEMPLATE_VERSIONS = ['tosca_simple_yaml_1_0']
MAIN_TEMPLATE_VERSIONS = ['tosca_simple_yaml_1_0',
'tosca_simple_yaml_1_2']
VALID_TEMPLATE_VERSIONS.extend(exttools.get_versions())
VALID_TEMPLATE_VERSIONS = MAIN_TEMPLATE_VERSIONS + exttools.get_versions()
ADDITIONAL_SECTIONS = {'tosca_simple_yaml_1_0': SPECIAL_SECTIONS}
ADDITIONAL_SECTIONS = {'tosca_simple_yaml_1_0': SPECIAL_SECTIONS,
'tosca_simple_yaml_1_2': SPECIAL_SECTIONS}
ADDITIONAL_SECTIONS.update(exttools.get_sections())
@ -152,7 +154,8 @@ class ToscaTemplate(object):
return reposit
def _tpl_relationship_types(self):
return self._get_custom_types(RELATIONSHIP_TYPES)
custom_rel, _ = self._get_custom_types(RELATIONSHIP_TYPES)
return custom_rel
def _tpl_relationship_templates(self):
topology_template = self._tpl_topology_template()
@ -164,23 +167,27 @@ class ToscaTemplate(object):
def _policies(self):
return self.topology_template.policies
def _get_all_custom_defs(self, imports=None):
def _get_all_custom_defs(self, imports=None, path=None):
types = [IMPORTS, NODE_TYPES, CAPABILITY_TYPES, RELATIONSHIP_TYPES,
DATA_TYPES, INTERFACE_TYPES, POLICY_TYPES, GROUP_TYPES]
custom_defs_final = {}
custom_defs = self._get_custom_types(types, imports)
custom_defs, nested_imports = self._get_custom_types(
types, imports, path)
if custom_defs:
custom_defs_final.update(custom_defs)
if custom_defs.get(IMPORTS):
import_defs = self._get_all_custom_defs(
custom_defs.get(IMPORTS))
custom_defs_final.update(import_defs)
if nested_imports:
for a_file, nested_import in nested_imports.items():
import_defs = self._get_all_custom_defs(
nested_import, a_file)
custom_defs_final.update(import_defs)
# As imports are not custom_types, removing from the dict
custom_defs_final.pop(IMPORTS, None)
return custom_defs_final
def _get_custom_types(self, type_definitions, imports=None):
def _get_custom_types(self, type_definitions, imports=None,
path=None):
"""Handle custom types defined in imported template files
This method loads the custom type definitions referenced in "imports"
@ -188,6 +195,7 @@ class ToscaTemplate(object):
"""
custom_defs = {}
nested_imports = None
type_defs = []
if not isinstance(type_definitions, list):
type_defs.append(type_definitions)
@ -196,18 +204,20 @@ class ToscaTemplate(object):
if not imports:
imports = self._tpl_imports()
if not path:
path = self.path
if imports:
custom_service = toscaparser.imports.\
ImportsLoader(imports, self.path,
type_defs, self.tpl)
ImportsLoader(imports, path, type_defs, self.tpl)
nested_tosca_tpls = custom_service.get_nested_tosca_tpls()
self._update_nested_tosca_tpls_with_topology(nested_tosca_tpls)
nested_imports = custom_service.get_nested_imports()
custom_defs = custom_service.get_custom_defs()
if not custom_defs:
return
return None, None
# Handle custom types defined in current template file
for type_def in type_defs:
@ -215,7 +225,7 @@ class ToscaTemplate(object):
inner_custom_types = self.tpl.get(type_def) or {}
if inner_custom_types:
custom_defs.update(inner_custom_types)
return custom_defs
return custom_defs, nested_imports
def _update_nested_tosca_tpls_with_topology(self, nested_tosca_tpls):
for tpl in nested_tosca_tpls:
@ -269,7 +279,7 @@ class ToscaTemplate(object):
what=version,
valid_versions='", "'. join(self.VALID_TEMPLATE_VERSIONS)))
else:
if version != 'tosca_simple_yaml_1_0':
if version not in self.MAIN_TEMPLATE_VERSIONS:
update_definitions(version)
def _get_path(self, path):