"""Lint checks run over the files and documents of the configured repos.

The public entry point is :func:`full`; every other name is a private helper.
"""

import logging
import os

import click
import pkg_resources
import yaml

from pegleg.engine import util
from pegleg import config

__all__ = ['full']

LOG = logging.getLogger(__name__)

# Schema key -> path (relative to the ``pegleg`` package) of the YAML schema
# file that ``_load_schemas`` reads for it.
DECKHAND_SCHEMAS = {
    'root': 'schemas/deckhand-root.yaml',
    'metadata/Control/v1': 'schemas/deckhand-metadata-control.yaml',
    'metadata/Document/v1': 'schemas/deckhand-metadata-document.yaml',
}


def full(fail_on_missing_sub_src=False):
    """Run every lint check and fail if any of them reports an error.

    :param fail_on_missing_sub_src: forwarded unchanged to the deckhand
        render check.
    :returns: the list of warning messages (directory-layout findings).
    :raises click.ClickException: if the file-content or deckhand-render
        checks report at least one error. The message lists the errors
        followed by the warnings.
    """
    warns = list(_verify_no_unexpected_files())
    errors = list(_verify_file_contents())
    errors.extend(_verify_deckhand_render(fail_on_missing_sub_src))

    if errors:
        raise click.ClickException('\n'.join(
            ['Linting failed:'] + errors + ['Linting warnings:'] + warns))
    return warns


def _verify_no_unexpected_files():
    """Compare the directories on disk with those the site manifests expect.

    :returns: a list of messages, one per directory that exists but is not
        expected and one per expected directory that is missing. Missing
        directories whose path ends with ``common`` are not reported.
    """
    expected_directories = set()
    for site_name in util.files.list_sites():
        params = util.definition.load_as_params(site_name)
        expected_directories.update(util.files.directories_for(**params))
    LOG.debug('expected_directories: %s', expected_directories)

    found_directories = util.files.existing_directories()
    LOG.debug('found_directories: %s', found_directories)

    msgs = [
        '%s exists, but is unused' % unused_dir
        for unused_dir in sorted(found_directories - expected_directories)
    ]
    msgs.extend(
        '%s was not found, but expected by manifest' % missing_dir
        for missing_dir in sorted(expected_directories - found_directories)
        if not missing_dir.endswith('common'))
    return msgs


def _verify_file_contents():
    """Validate every file returned by ``util.files.all()``.

    :returns: the concatenated error messages of all files.
    """
    schemas = _load_schemas()
    errors = []
    for filename in util.files.all():
        errors.extend(_verify_single_file(filename, schemas))
    return errors


def _verify_single_file(filename, schemas):
    """Validate one YAML file and every document it contains.

    :param filename: path of the file to check.
    :param schemas: loaded schemas, passed through to ``_verify_document``.
    :returns: a list of error messages (empty when the file is clean).
    """
    errors = []
    LOG.debug("Validating file %s.", filename)

    documents = []
    load_error = None
    with open(filename) as f:
        if not f.read(4) == '---\n':
            errors.append('%s does not begin with YAML beginning of document '
                          'marker "---".' % filename)
        f.seek(0)
        # Only the YAML loading is guarded: a failure while *checking* a
        # document must not be misreported as a YAML syntax problem.
        # safe_load_all is lazy, so documents parsed before a syntax error
        # are kept and still validated below.
        try:
            for document in yaml.safe_load_all(f):
                documents.append(document)
        except Exception as e:  # boundary of the third-party parser
            load_error = '%s is not valid yaml: %s' % (filename, e)

    for document in documents:
        errors.extend(_verify_document(document, schemas, filename))
    if load_error is not None:
        errors.append(load_error)
    return errors


# Document schemas that hold secret material; such documents must use
# "storagePolicy: encrypted" and live under a secrets path.
MANDATORY_ENCRYPTED_TYPES = {
    'deckhand/CertificateAuthorityKey/v1',
    'deckhand/CertificateKey/v1',
    'deckhand/Passphrase/v1',
    'deckhand/PrivateKey/v1',
}


def _as_mapping(value):
    """Return ``value`` if it offers ``.get``, otherwise an empty dict."""
    return value if hasattr(value, 'get') else {}


def _verify_document(document, schemas, filename):
    """Check the layer and secret-handling rules for a single document.

    :param document: one object produced by the YAML loader.
    :param schemas: loaded schemas; currently unused by this check but kept
        in the signature for callers.
    :param filename: path of the file the document came from.
    :returns: a list of error messages (empty when the document is clean).
    """
    if not hasattr(document, 'get'):
        # Empty documents load as None; scalars and lists are not valid
        # documents either. Report them instead of raising.
        return [
            '%s contains a document that is not a mapping (found %s)' %
            (filename, type(document).__name__)
        ]

    schema = document.get('schema', '')
    metadata = _as_mapping(document.get('metadata', {}))
    # %-formatting tolerates non-string values where ':'.join would raise.
    name = '%s:%s' % (schema, metadata.get('name', ''))

    errors = []

    layer = _layer(document)
    if layer is not None:
        expected_layer = _expected_layer(filename)
        if layer != expected_layer:
            errors.append(
                '%s (document %s) had unexpected layer "%s", expected "%s"' %
                (filename, name, layer, expected_layer))

    # secrets must live in the appropriate directory, and must be
    # "storagePolicy: encrypted".
    if isinstance(schema, str) and schema in MANDATORY_ENCRYPTED_TYPES:
        storage_policy = metadata.get('storagePolicy')
        if storage_policy != 'encrypted':
            errors.append('%s (document %s) is a secret, but has unexpected '
                          'storagePolicy: "%s"' % (filename, name,
                                                   storage_policy))

        if not _filename_in_section(filename, 'secrets/'):
            errors.append(
                '%s (document %s) is a secret, is not stored in a secrets path'
                % (filename, name))
    return errors


def _verify_deckhand_render(fail_on_missing_sub_src=False):
    """Load all documents and ask deckhand to render and validate them.

    :param fail_on_missing_sub_src: forwarded to
        ``util.deckhand.deckhand_render``.
    :returns: the errors reported by the render call.
    """
    documents = []
    for filename in util.files.all():
        with open(filename) as f:
            # Consumed inside the ``with`` so the file is still open.
            documents.extend(yaml.safe_load_all(f))

    # Only the error list is of interest here; the rendered output is unused.
    _, errors = util.deckhand.deckhand_render(
        documents=documents,
        fail_on_missing_sub_src=fail_on_missing_sub_src,
        validate=True,
    )
    return errors


def _layer(data):
    """Return ``metadata.layeringDefinition.layer`` of ``data``, or None.

    None is returned when ``data`` is not mapping-like or when any level of
    that path is missing or not a mapping.
    """
    if not hasattr(data, 'get'):
        return None
    metadata = _as_mapping(data.get('metadata', {}))
    layering = _as_mapping(metadata.get('layeringDefinition', {}))
    return layering.get('layer')


def _expected_layer(filename):
    """Return the first path component of ``filename`` below its repo.

    :returns: that component, or None when ``filename`` is not located under
        any repo returned by ``config.all_repos()``.
    """
    for repo in config.all_repos():
        prefix = repo + "/"
        if filename.startswith(prefix):
            partial_name = filename[len(prefix):]
            return os.path.normpath(partial_name).split(os.sep)[0]
    return None


def _load_schemas():
    """Read every file listed in ``DECKHAND_SCHEMAS`` from the package.

    :returns: dict mapping each schema key to the result of
        ``util.files.slurp`` for the corresponding packaged file.
    """
    return {
        key: util.files.slurp(
            pkg_resources.resource_filename('pegleg', filename))
        for key, filename in DECKHAND_SCHEMAS.items()
    }


def _filename_in_section(filename, section):
    """Tell whether ``filename`` lies in ``section`` below its directory.

    The directory is whatever ``util.files.directory_for`` returns for the
    file; the remainder of the path after it must start with ``section``.
    """
    directory = util.files.directory_for(path=filename)
    # +1 skips the separator that follows the directory prefix.
    rest = filename[len(directory) + 1:]
    return rest.startswith(section)