diff --git a/pegleg/engine/catalogs/base_catalog.py b/pegleg/engine/catalogs/base_catalog.py index 01aaa325..58b07a75 100644 --- a/pegleg/engine/catalogs/base_catalog.py +++ b/pegleg/engine/catalogs/base_catalog.py @@ -14,7 +14,6 @@ from abc import ABC import logging -import os import re from pegleg import config @@ -43,7 +42,7 @@ class BaseCatalog(ABC): """ self._documents = documents or definition.documents_for_site(sitename) self._site_name = sitename - self._catalog_path = None + self._catalog_path = [] self._kind = kind self._catalog_docs = list() for document in self._documents: @@ -61,7 +60,7 @@ class BaseCatalog(ABC): @property def catalog_path(self): - if self._catalog_path is None: + if not self._catalog_path: self._set_catalog_path() return self._catalog_path @@ -69,15 +68,14 @@ class BaseCatalog(ABC): repo_name = git.repo_url(config.get_site_repo()) catalog_name = self._get_document_name('{}.yaml'.format(self._kind)) for file_path in definition.site_files(self.site_name): - if file_path.endswith(catalog_name) and repo_name in file_path: - self._catalog_path = os.path.join( - repo_name, file_path.split(repo_name)[1].lstrip('/')) - return - # Cound not find the Catalog for this generated passphrase - # raise an exception. - LOG.error('Catalog path: {} was not found in repo: {}'.format( - catalog_name, repo_name)) - raise PassphraseCatalogNotFoundException() + if file_path.endswith(catalog_name): + self._catalog_path.append(file_path) + if not self._catalog_path: + # Cound not find the Catalog for this generated passphrase + # raise an exception. + LOG.error('Catalog path: {} was not found in repo: {}'.format( + catalog_name, repo_name)) + raise PassphraseCatalogNotFoundException() def _get_document_name(self, name): s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name) diff --git a/tests/unit/engine/test_generate_cryptostring.py b/tests/unit/engine/test_generate_cryptostring.py index c05bb8bb..4e9576f3 100644 --- a/tests/unit/engine/test_generate_cryptostring.py +++ b/tests/unit/engine/test_generate_cryptostring.py @@ -69,6 +69,24 @@ data: ... """) +TEST_GLOBAL_PASSPHRASES_CATALOG = yaml.load(""" +--- +schema: pegleg/PassphraseCatalog/v1 +metadata: + schema: metadata/Document/v1 + name: cluster-passphrases + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + passphrases: + - description: 'description of passphrase from global' + document_name: passphrase_from_global + encrypted: true +... +""") + TEST_REPOSITORIES = { 'repositories': { 'global': { @@ -101,6 +119,7 @@ TEST_SITE_DEFINITION = { } TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG] +TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG] def test_cryptostring_default_len(): @@ -149,13 +168,13 @@ def test_cryptostring_long_len(): ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', ENV_SALT: 'MySecretSalt'}) def test_generate_passphrases(*_): - dir = tempfile.mkdtemp() - os.makedirs(os.path.join(dir, 'cicd_site_repo'), exist_ok=True) - PassphraseGenerator('cicd', dir, 'test_author').generate() + _dir = tempfile.mkdtemp() + os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) + PassphraseGenerator('cicd', _dir, 'test_author').generate() for passphrase in TEST_PASSPHRASES_CATALOG['data']['passphrases']: passphrase_file_name = '{}.yaml'.format(passphrase['document_name']) - passphrase_file_path = os.path.join(dir, 'site', 'cicd', 'secrets', + passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets', 'passphrases', passphrase_file_name) assert os.path.isfile(passphrase_file_path) @@ -200,3 +219,52 @@ def test_generate_passphrases_exception(capture): ('Signature verification to decrypt secrets failed. ' 'Please check your provided passphrase and salt and ' 'try again.'))) + + +@mock.patch.object( + util.definition, + 'documents_for_site', + autospec=True, + return_value=TEST_GLOBAL_SITE_DOCUMENTS) +@mock.patch.object( + pegleg.config, + 'get_site_repo', + autospec=True, + return_value='cicd_site_repo') +@mock.patch.object( + util.definition, + 'site_files', + autospec=True, + return_value=[ + 'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ]) +@mock.patch.dict(os.environ, { + ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', + ENV_SALT: 'MySecretSalt'}) +def test_global_passphrase_catalog(*_): + _dir = tempfile.mkdtemp() + os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) + PassphraseGenerator('cicd', _dir, 'test_author').generate() + + for passphrase in TEST_GLOBAL_PASSPHRASES_CATALOG['data']['passphrases']: + passphrase_file_name = '{}.yaml'.format(passphrase['document_name']) + passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets', + 'passphrases', + passphrase_file_name) + assert os.path.isfile(passphrase_file_path) + with open(passphrase_file_path) as stream: + doc = yaml.safe_load(stream) + assert doc['schema'] == 'pegleg/PeglegManagedDocument/v1' + assert doc['metadata']['storagePolicy'] == 'cleartext' + assert 'encrypted' in doc['data'] + assert doc['data']['encrypted']['by'] == 'test_author' + assert 'generated' in doc['data'] + assert doc['data']['generated']['by'] == 'test_author' + assert 'managedDocument' in doc['data'] + assert doc['data']['managedDocument']['metadata'][ + 'storagePolicy'] == 'encrypted' + decrypted_passphrase = encryption.decrypt( + doc['data']['managedDocument']['data'], + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode()) + if passphrase_file_name == "passphrase_from_global.yaml": + assert len(decrypted_passphrase) == 24