Search all scopes for catalogs

This patch revises pegleg to check all scopes for catalog files,
rather than just the site scope.

Change-Id: Icf03ce739968ef3ed5fd218ca0fe87e24451ba0d
This commit is contained in:
Lev Morgan 2019-02-04 15:50:23 -06:00
parent 81720cd0c8
commit 0252b71750
2 changed files with 82 additions and 16 deletions

View File

@ -14,7 +14,6 @@
from abc import ABC
import logging
import os
import re
from pegleg import config
@ -43,7 +42,7 @@ class BaseCatalog(ABC):
"""
self._documents = documents or definition.documents_for_site(sitename)
self._site_name = sitename
self._catalog_path = None
self._catalog_path = []
self._kind = kind
self._catalog_docs = list()
for document in self._documents:
@ -61,7 +60,7 @@ class BaseCatalog(ABC):
@property
def catalog_path(self):
if self._catalog_path is None:
if not self._catalog_path:
self._set_catalog_path()
return self._catalog_path
@ -69,15 +68,14 @@ class BaseCatalog(ABC):
repo_name = git.repo_url(config.get_site_repo())
catalog_name = self._get_document_name('{}.yaml'.format(self._kind))
for file_path in definition.site_files(self.site_name):
if file_path.endswith(catalog_name) and repo_name in file_path:
self._catalog_path = os.path.join(
repo_name, file_path.split(repo_name)[1].lstrip('/'))
return
# Cound not find the Catalog for this generated passphrase
# raise an exception.
LOG.error('Catalog path: {} was not found in repo: {}'.format(
catalog_name, repo_name))
raise PassphraseCatalogNotFoundException()
if file_path.endswith(catalog_name):
self._catalog_path.append(file_path)
if not self._catalog_path:
# Cound not find the Catalog for this generated passphrase
# raise an exception.
LOG.error('Catalog path: {} was not found in repo: {}'.format(
catalog_name, repo_name))
raise PassphraseCatalogNotFoundException()
def _get_document_name(self, name):
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name)

View File

@ -69,6 +69,24 @@ data:
...
""")
TEST_GLOBAL_PASSPHRASES_CATALOG = yaml.load("""
---
schema: pegleg/PassphraseCatalog/v1
metadata:
schema: metadata/Document/v1
name: cluster-passphrases
layeringDefinition:
abstract: false
layer: global
storagePolicy: cleartext
data:
passphrases:
- description: 'description of passphrase from global'
document_name: passphrase_from_global
encrypted: true
...
""")
TEST_REPOSITORIES = {
'repositories': {
'global': {
@ -101,6 +119,7 @@ TEST_SITE_DEFINITION = {
}
TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG]
TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG]
def test_cryptostring_default_len():
@ -149,13 +168,13 @@ def test_cryptostring_long_len():
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt'})
def test_generate_passphrases(*_):
dir = tempfile.mkdtemp()
os.makedirs(os.path.join(dir, 'cicd_site_repo'), exist_ok=True)
PassphraseGenerator('cicd', dir, 'test_author').generate()
_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
PassphraseGenerator('cicd', _dir, 'test_author').generate()
for passphrase in TEST_PASSPHRASES_CATALOG['data']['passphrases']:
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
passphrase_file_path = os.path.join(dir, 'site', 'cicd', 'secrets',
passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets',
'passphrases',
passphrase_file_name)
assert os.path.isfile(passphrase_file_path)
@ -200,3 +219,52 @@ def test_generate_passphrases_exception(capture):
('Signature verification to decrypt secrets failed. '
'Please check your provided passphrase and salt and '
'try again.')))
@mock.patch.object(
util.definition,
'documents_for_site',
autospec=True,
return_value=TEST_GLOBAL_SITE_DOCUMENTS)
@mock.patch.object(
pegleg.config,
'get_site_repo',
autospec=True,
return_value='cicd_site_repo')
@mock.patch.object(
util.definition,
'site_files',
autospec=True,
return_value=[
'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
@mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt'})
def test_global_passphrase_catalog(*_):
_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
PassphraseGenerator('cicd', _dir, 'test_author').generate()
for passphrase in TEST_GLOBAL_PASSPHRASES_CATALOG['data']['passphrases']:
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets',
'passphrases',
passphrase_file_name)
assert os.path.isfile(passphrase_file_path)
with open(passphrase_file_path) as stream:
doc = yaml.safe_load(stream)
assert doc['schema'] == 'pegleg/PeglegManagedDocument/v1'
assert doc['metadata']['storagePolicy'] == 'cleartext'
assert 'encrypted' in doc['data']
assert doc['data']['encrypted']['by'] == 'test_author'
assert 'generated' in doc['data']
assert doc['data']['generated']['by'] == 'test_author'
assert 'managedDocument' in doc['data']
assert doc['data']['managedDocument']['metadata'][
'storagePolicy'] == 'encrypted'
decrypted_passphrase = encryption.decrypt(
doc['data']['managedDocument']['data'],
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode())
if passphrase_file_name == "passphrase_from_global.yaml":
assert len(decrypted_passphrase) == 24