Move retrieval of encrypted documents to Deckhand controller

This patchset moves retrieval of encrypted documents to the
Deckhand controller so that components like Pegleg and
Promenade can consume the Deckhand engine offline without
running into Barbican errors.

Components can pass in `encryption_sources` to Deckhand's
rendering module which Deckhand will now use instead to resolve
secret references.

`encryption_sources` is a dictionary that maps the reference
contained in the destination document's data section to the
actual unecrypted data. If encrypting data with Barbican, the
reference will be a Barbican secret reference.

Change-Id: I1a457d3bd37101d73a28882845c2ce74ac09fdf4
This commit is contained in:
Felipe Monteiro 2018-06-13 22:06:35 +01:00
parent e25ea04985
commit 039f9830da
13 changed files with 225 additions and 127 deletions

View File

@ -63,7 +63,8 @@ class BarbicanClientWrapper(object):
except barbican_exc.HTTPAuthError as e:
LOG.exception(str(e))
raise errors.BarbicanException(details=str(e))
raise errors.BarbicanClientException(code=e.status_code,
details=str(e))
return cli

View File

@ -13,12 +13,11 @@
# limitations under the License.
import ast
import re
import barbicanclient
from oslo_log import log as logging
from oslo_serialization import base64
from oslo_utils import uuidutils
from oslo_utils import excutils
import six
from deckhand.barbican import client_wrapper
@ -30,30 +29,9 @@ LOG = logging.getLogger(__name__)
class BarbicanDriver(object):
_url_re = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|'
'(?:%[0-9a-fA-F][0-9a-fA-F]))+')
def __init__(self):
self.barbicanclient = client_wrapper.BarbicanClientWrapper()
@classmethod
def is_barbican_ref(cls, secret_ref):
# TODO(felipemonteiro): Query Keystone service catalog for Barbican
# endpoint and cache it if Keystone is enabled. For now, it should be
# enough to check that ``secret_ref`` is a valid URL, contains
# 'secrets' substring, ends in a UUID and that the source document from
# which the reference is extracted is encrypted.
try:
secret_uuid = secret_ref.split('/')[-1]
except Exception:
secret_uuid = None
return (
isinstance(secret_ref, six.string_types) and
cls._url_re.match(secret_ref) and
'secrets' in secret_ref and
uuidutils.is_uuid_like(secret_uuid)
)
@staticmethod
def _get_secret_type(schema):
"""Get the Barbican secret type based on the following mapping:
@ -170,54 +148,54 @@ class BarbicanDriver(object):
secret = self.barbicanclient.call("secrets.create", **kwargs)
secret_ref = secret.store()
except (barbicanclient.exceptions.HTTPAuthError,
barbicanclient.exceptions.HTTPClientError,
barbicanclient.exceptions.HTTPServerError) as e:
barbicanclient.exceptions.HTTPClientError) as e:
LOG.exception(str(e))
raise errors.BarbicanClientException(code=e.status_code,
details=str(e))
except barbicanclient.exceptions.HTTPServerError as e:
LOG.error('Caught %s error from Barbican, likely due to a '
'configuration or deployment issue.',
e.__class__.__name__)
raise errors.BarbicanException(details=str(e))
raise errors.BarbicanServerException(details=str(e))
except barbicanclient.exceptions.PayloadException as e:
LOG.error('Caught %s error from Barbican, because the secret '
'payload type is unsupported.', e.__class__.__name__)
raise errors.BarbicanException(details=str(e))
raise errors.BarbicanServerException(details=str(e))
return secret_ref
def _base64_decode_payload(self, src_doc, dest_doc, payload):
def _base64_decode_payload(self, payload):
# If the secret_type is 'opaque' then this implies the
# payload was encoded to base64 previously. Reverse the
# operation.
try:
# If the secret_type is 'opaque' then this implies the
# payload was encoded to base64 previously. Reverse the
# operation.
payload = ast.literal_eval(base64.decode_as_text(payload))
return ast.literal_eval(base64.decode_as_text(payload))
except Exception:
message = ('Failed to unencode the original payload that '
'presumably was encoded to base64 with '
'secret_type=opaque for document [%s, %s] %s.' %
src_doc.meta)
LOG.error(message)
raise errors.UnknownSubstitutionError(
src_schema=src_doc.schema, src_layer=src_doc.layer,
src_name=src_doc.name, schema=dest_doc.schema,
layer=dest_doc.layer, name=dest_doc.name,
details=message)
return payload
with excutils.save_and_reraise_exception():
message = ('Failed to unencode the original payload that '
'presumably was encoded to base64 with '
'secret_type: opaque.')
LOG.error(message)
def get_secret(self, src_doc, dest_doc, secret_ref):
def get_secret(self, secret_ref, src_doc):
"""Get a secret."""
try:
secret = self.barbicanclient.call("secrets.get", secret_ref)
except (barbicanclient.exceptions.HTTPAuthError,
barbicanclient.exceptions.HTTPClientError,
barbicanclient.exceptions.HTTPServerError,
barbicanclient.exceptions.HTTPClientError) as e:
LOG.exception(str(e))
raise errors.BarbicanClientException(code=e.status_code,
details=str(e))
except (barbicanclient.exceptions.HTTPServerError,
ValueError) as e:
LOG.exception(str(e))
raise errors.BarbicanException(details=str(e))
raise errors.BarbicanServerException(details=str(e))
payload = secret.payload
if secret.secret_type == 'opaque':
LOG.debug('Forcibly base64-decoding original non-string payload '
'for document [%s, %s] %s.', *src_doc.meta)
secret = self._base64_decode_payload(src_doc, dest_doc, payload)
secret = self._base64_decode_payload(payload)
else:
secret = payload

View File

@ -15,12 +15,19 @@
import collections
import functools
import inspect
import re
from oslo_serialization import jsonutils as json
from oslo_utils import uuidutils
import six
from deckhand.common import utils
_URL_RE = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|'
'(?:%[0-9a-fA-F][0-9a-fA-F]))+')
class DocumentDict(dict):
"""Wrapper for a document.
@ -136,6 +143,20 @@ class DocumentDict(dict):
def is_encrypted(self):
return self.storage_policy == 'encrypted'
@property
def has_barbican_ref(self):
try:
secret_ref = self.data
secret_uuid = secret_ref.split('/')[-1]
except Exception:
secret_uuid = None
return (
isinstance(secret_ref, six.string_types) and
_URL_RE.match(secret_ref) and
'secrets' in secret_ref and
uuidutils.is_uuid_like(secret_uuid)
)
@property
def is_replacement(self):
return utils.jsonpath_parse(self, 'metadata.replacement') is True

View File

@ -57,12 +57,7 @@ class BucketsResource(api_base.BaseResource):
'deckhand:create_encrypted_documents', req.context)
break
try:
documents = self._prepare_secret_documents(documents)
except deckhand_errors.BarbicanException:
with excutils.save_and_reraise_exception():
LOG.error('An unknown exception occurred while trying to store'
' a secret in Barbican.')
documents = self._encrypt_secret_documents(documents)
created_documents = self._create_revision_documents(
bucket_name, documents, validations)
@ -70,7 +65,7 @@ class BucketsResource(api_base.BaseResource):
resp.body = self.view_builder.list(created_documents)
resp.status = falcon.HTTP_200
def _prepare_secret_documents(self, documents):
def _encrypt_secret_documents(self, documents):
# Encrypt data for secret documents, if any.
for document in documents:
if secrets_manager.SecretsManager.requires_encryption(document):

View File

@ -25,6 +25,7 @@ from deckhand.control.views import document as document_view
from deckhand.db.sqlalchemy import api as db_api
from deckhand.engine import document_validation
from deckhand.engine import layering
from deckhand.engine import secrets_manager
from deckhand import errors
from deckhand import policy
from deckhand import types
@ -111,15 +112,19 @@ class RenderedDocumentsResource(api_base.BaseResource):
documents = self._retrieve_documents_for_rendering(revision_id,
**filters)
encryption_sources = self._retrieve_encrypted_documents(documents)
try:
# NOTE(fmontei): `validate` is False because documents have already
# been pre-validated during ingestion. Documents are post-validated
# below, regardless.
document_layering = layering.DocumentLayering(
documents, validate=False)
documents, encryption_sources=encryption_sources,
validate=False)
rendered_documents = document_layering.render()
except (errors.InvalidDocumentLayer,
except (errors.BarbicanClientException,
errors.BarbicanServerException,
errors.InvalidDocumentLayer,
errors.InvalidDocumentParent,
errors.InvalidDocumentReplacement,
errors.IndeterminateDocumentParent,
@ -131,6 +136,12 @@ class RenderedDocumentsResource(api_base.BaseResource):
errors.UnsupportedActionMethod) as e:
with excutils.save_and_reraise_exception():
LOG.exception(e.format_message())
except errors.EncryptionSourceNotFound as e:
# This branch should be unreachable, but if an encryption source
# wasn't found, then this indicates the controller fed bad data
# to the engine, in which case this is a 500.
e.code = 500
raise e
# Filters to be applied post-rendering, because many documents are
# involved in rendering. User filters can only be applied once all
@ -183,6 +194,25 @@ class RenderedDocumentsResource(api_base.BaseResource):
return documents
def _retrieve_encrypted_documents(self, documents):
encryption_sources = {}
for document in documents:
if document.is_encrypted and document.has_barbican_ref:
try:
unecrypted_data = secrets_manager.SecretsManager.get(
secret_ref=document.data, src_doc=document)
except Exception as e:
LOG.error(
'An unknown exception occurred while trying to resolve'
' a secret reference for substitution source document '
'[%s, %s] %s.', document.schema, document.layer,
document.name)
raise errors.UnknownSubstitutionError(
src_schema=document.schema, src_layer=document.layer,
src_name=document.name, details=str(e))
encryption_sources.setdefault(document.data, unecrypted_data)
return encryption_sources
def _post_validate(self, rendered_documents):
# Perform schema validation post-rendering to ensure that rendering
# and substitution didn't break anything.
@ -193,12 +223,12 @@ class RenderedDocumentsResource(api_base.BaseResource):
try:
validations = doc_validator.validate_all()
except errors.InvalidDocumentFormat as e:
with excutils.save_and_reraise_exception():
# Post-rendering validation errors likely indicate an internal
# rendering bug, so override the default code to 500.
e.code = 500
LOG.error('Failed to post-validate rendered documents.')
LOG.exception(e.format_message())
# Post-rendering validation errors likely indicate an internal
# rendering bug, so override the default code to 500.
e.code = 500
LOG.error('Failed to post-validate rendered documents.')
LOG.exception(e.format_message())
raise e
else:
error_list = []

View File

@ -374,7 +374,7 @@ def document_get(session=None, raw_dict=False, revision_id=None, **filters):
for doc in documents:
d = doc.to_dict(raw_dict=raw_dict)
if utils.deepfilter(d, **nested_filters):
return d
return document_wrapper.DocumentDict(d)
filters.update(nested_filters)
raise errors.DocumentNotFound(filters=filters)

View File

@ -382,7 +382,7 @@ class DocumentLayering(object):
raise errors.InvalidDocumentFormat(error_list=error_list)
def __init__(self, documents, substitution_sources=None, validate=True,
fail_on_missing_sub_src=True):
fail_on_missing_sub_src=True, encryption_sources=None):
"""Contructor for ``DocumentLayering``.
:param layering_policy: The document with schema
@ -401,6 +401,11 @@ class DocumentLayering(object):
:param fail_on_missing_sub_src: Whether to fail on a missing
substitution source. Default is True.
:type fail_on_missing_sub_src: bool
:param encryption_sources: A dictionary that maps the reference
contained in the destination document's data section to the
actual unecrypted data. If encrypting data with Barbican, the
reference will be a Barbican secret reference.
:type encryption_sources: List[dict]
:raises LayeringPolicyNotFound: If no LayeringPolicy was found among
list of ``documents``.
@ -489,6 +494,7 @@ class DocumentLayering(object):
self.secrets_substitution = secrets_manager.SecretsSubstitution(
substitution_sources,
encryption_sources=encryption_sources,
fail_on_missing_sub_src=fail_on_missing_sub_src)
self._sorted_documents = self._topologically_sort_documents(
@ -692,8 +698,8 @@ class DocumentLayering(object):
# data has been encrypted so that future references use the actual
# secret payload, rather than the Barbican secret reference.
elif doc.is_encrypted:
encrypted_data = self.secrets_substitution.get_encrypted_data(
doc.data, doc, doc)
encrypted_data = self.secrets_substitution\
.get_unencrypted_data(doc.data, doc, doc)
if not doc.is_abstract:
doc.data = encrypted_data
self.secrets_substitution.update_substitution_sources(

View File

@ -76,7 +76,7 @@ class SecretsManager(object):
return payload
@classmethod
def get(cls, secret_ref, src_doc, dest_doc):
def get(cls, secret_ref, src_doc):
"""Retrieve a secret payload from Barbican.
Extracts {secret_uuid} from a secret reference and queries Barbican's
@ -88,8 +88,8 @@ class SecretsManager(object):
"""
LOG.debug('Resolving Barbican secret using source document '
'reference...')
secret = cls.barbican_driver.get_secret(src_doc, dest_doc,
secret_ref=secret_ref)
secret = cls.barbican_driver.get_secret(secret_ref=secret_ref,
src_doc=src_doc)
LOG.debug('Successfully retrieved Barbican secret using reference.')
return secret
@ -97,7 +97,8 @@ class SecretsManager(object):
class SecretsSubstitution(object):
"""Class for document substitution logic for YAML files."""
__slots__ = ('_fail_on_missing_sub_src', '_substitution_sources')
__slots__ = ('_fail_on_missing_sub_src', '_substitution_sources',
'_encryption_sources')
_insecure_reg_exps = (
re.compile(r'^.* is not of type .+$'),
@ -136,27 +137,21 @@ class SecretsSubstitution(object):
return to_sanitize
@staticmethod
def get_encrypted_data(src_secret, src_doc, dest_doc):
try:
src_secret = SecretsManager.get(src_secret, src_doc, dest_doc)
except errors.BarbicanException as e:
def get_unencrypted_data(self, secret_ref, src_doc, dest_doc):
if secret_ref not in self._encryption_sources:
LOG.error(
'Failed to resolve a Barbican reference for substitution '
'source document [%s, %s] %s referenced in document [%s, %s] '
'%s. Details: %s', src_doc.schema, src_doc.layer, src_doc.name,
dest_doc.schema, dest_doc.layer, dest_doc.name,
e.format_message())
raise errors.UnknownSubstitutionError(
src_schema=src_doc.schema, src_layer=src_doc.layer,
src_name=src_doc.name, schema=dest_doc.schema,
layer=dest_doc.layer, name=dest_doc.name,
details=e.format_message())
else:
return src_secret
'Secret reference %s not found among `encryption_sources`, '
'referenced by source document [%s, %s] %s, needed by '
'destination document [%s, %s] %s.', secret_ref,
src_doc.schema, src_doc.layer, src_doc.name,
dest_doc.schema, dest_doc.layer, dest_doc.name)
raise errors.EncryptionSourceNotFound(
secret_ref=secret_ref, schema=src_doc.schema,
layer=src_doc.layer, name=src_doc.name)
return self._encryption_sources[secret_ref]
def __init__(self, substitution_sources=None,
fail_on_missing_sub_src=True):
fail_on_missing_sub_src=True, encryption_sources=None):
"""SecretSubstitution constructor.
This class will automatically detect documents that require
@ -170,6 +165,11 @@ class SecretsSubstitution(object):
:type substitution_sources: List[dict] or dict
:param bool fail_on_missing_sub_src: Whether to fail on a missing
substitution source. Default is True.
:param encryption_sources: A dictionary that maps the reference
contained in the destination document's data section to the
actual unecrypted data. If encrypting data with Barbican, the
reference will be a Barbican secret reference.
:type encryption_sources: List[dict]
"""
# This maps a 2-tuple of (schema, name) to a document from which the
@ -177,6 +177,7 @@ class SecretsSubstitution(object):
# name). This is necessary since the substitution format in the
# document itself only provides a 2-tuple of (schema, name).
self._substitution_sources = {}
self._encryption_sources = encryption_sources or {}
self._fail_on_missing_sub_src = fail_on_missing_sub_src
if isinstance(substitution_sources, dict):
@ -290,10 +291,9 @@ class SecretsSubstitution(object):
# If the document has storagePolicy == encrypted then resolve
# the Barbican reference into the actual secret.
if src_doc.is_encrypted and BarbicanDriver.is_barbican_ref(
src_secret):
src_secret = self.get_encrypted_data(src_secret, src_doc,
document)
if src_doc.is_encrypted and src_doc.has_barbican_ref:
src_secret = self.get_unencrypted_data(src_secret, src_doc,
document)
if not isinstance(sub['dest'], list):
dest_array = [sub['dest']]

View File

@ -190,10 +190,9 @@ class DeckhandException(Exception):
with the keyword arguments provided to the constructor.
"""
msg_fmt = "An unknown exception occurred."
code = 500
def __init__(self, message=None, **kwargs):
kwargs.setdefault('code', DeckhandException.code)
def __init__(self, message=None, code=500, **kwargs):
kwargs.setdefault('code', code)
if not message:
try:
@ -372,6 +371,20 @@ class SubstitutionSourceDataNotFound(DeckhandException):
code = 400
class EncryptionSourceNotFound(DeckhandException):
"""Required encryption source reference was not found.
**Troubleshoot:**
* Ensure that the secret reference exists among the encryption sources.
"""
msg_fmt = (
"Required encryption source reference could not be resolved into a "
"secret because it was not found among encryption sources. Ref: "
"%(secret_ref)s. Referenced by: [%(schema)s, %(layer)s] %(name)s.")
code = 400 # Indicates bad data was passed in, causing a lookup to fail.
class DocumentNotFound(DeckhandException):
"""The requested document could not be found.
@ -469,8 +482,8 @@ class PolicyNotAuthorized(DeckhandException):
code = 403
class BarbicanException(DeckhandException):
"""An error occurred with Barbican.
class BarbicanClientException(DeckhandException):
"""A client-side 4xx error occurred with Barbican.
**Troubleshoot:**
@ -479,8 +492,13 @@ class BarbicanException(DeckhandException):
* Ensure that Deckhand and Barbican are contained in the Keystone service
catalog.
"""
msg_fmt = ('An exception occurred while trying to communicate with '
'Barbican. Details: %(details)s')
msg_fmt = 'Barbican raised a client error. Details: %(details)s'
code = 400 # Needs to be overridden.
class BarbicanServerException(DeckhandException):
"""A server-side 5xx error occurred with Barbican."""
msg_fmt = ('Barbican raised a server error. Details: %(details)s')
code = 500
@ -489,8 +507,16 @@ class UnknownSubstitutionError(DeckhandException):
**Troubleshoot:**
"""
msg_fmt = ('An unknown exception occurred while trying to perform '
'substitution using source document [%(src_schema)s, '
'%(src_layer)s] %(src_name)s contained in document ['
'%(schema)s, %(layer)s] %(name)s. Details: %(details)s')
code = 500
def __init__(self, *args, **kwargs):
super(UnknownSubstitutionError, self).__init__(*args, **kwargs)
dest_args = ('schema', 'layer', 'name')
msg_format = ('An unknown exception occurred while trying to perform '
'substitution using source document [%(src_schema)s, '
'%(src_layer)s] %(src_name)s')
if all(x in args for x in dest_args):
msg_format += (' contained in document [%(schema)s, %(layer)s]'
' %(name)s')
msg_format += '. Details: %(detail)s'
self.msg_fmt = msg_format

View File

@ -84,3 +84,8 @@ def rand_password(length=15):
pre = upper + digit + punc
password = pre + ''.join(random.choice(seed) for x in range(length - 3))
return password
def rand_barbican_ref():
secret_ref = "http://127.0.0.1/key-manager/v1/secrets/%s" % rand_uuid_hex()
return secret_ref

View File

@ -18,6 +18,7 @@ import yaml
import falcon
import mock
from deckhand.common import document as document_wrapper
from deckhand import policy
from deckhand.tests.unit.control import base as test_base
@ -187,7 +188,8 @@ class TestValidationMessageFormatting(test_base.BaseControllerTest):
with mock.patch('deckhand.control.revision_documents.db_api'
'.revision_documents_get', autospec=True) \
as mock_get_rev_documents:
invalid_document = yaml.safe_load(payload)
invalid_document = document_wrapper.DocumentDict(
yaml.safe_load(payload))
invalid_document.pop('metadata')
mock_get_rev_documents.return_value = [invalid_document]

View File

@ -116,7 +116,7 @@ class TestSecretsManager(test_base.TestDbBase):
self.mock_barbicanclient.get_secret.return_value = (
mock.Mock(payload=expected_secret))
secret_payload = secrets_manager.SecretsManager.get(secret_ref, {}, {})
secret_payload = secrets_manager.SecretsManager.get(secret_ref, {})
self.assertEqual(expected_secret, secret_payload)
self.mock_barbicanclient.call.assert_called_with(
@ -164,7 +164,7 @@ class TestSecretsManager(test_base.TestDbBase):
dummy_document = document_wrapper.DocumentDict({})
retrieved_payload = secrets_manager.SecretsManager.get(
secret_ref, dummy_document, dummy_document)
secret_ref, dummy_document)
self.assertEqual(payload, retrieved_payload)
@ -176,7 +176,7 @@ class TestSecretsSubstitution(test_base.TestDbBase):
self.secrets_factory = factories.DocumentSecretFactory()
def _test_doc_substitution(self, document_mapping, substitution_sources,
expected_data):
expected_data, encryption_sources=None):
payload = self.document_factory.gen_test(document_mapping,
global_abstract=False)
bucket_name = test_utils.rand_name('bucket')
@ -187,7 +187,8 @@ class TestSecretsSubstitution(test_base.TestDbBase):
expected_document['data'] = expected_data
secret_substitution = secrets_manager.SecretsSubstitution(
substitution_sources)
encryption_sources=encryption_sources,
substitution_sources=substitution_sources)
substituted_docs = list(secret_substitution.substitute_all(documents))
self.assertIn(expected_document, substituted_docs)
@ -221,9 +222,7 @@ class TestSecretsSubstitution(test_base.TestDbBase):
self._test_doc_substitution(
document_mapping, [certificate], expected_data)
@mock.patch.object(secrets_manager, 'SecretsManager', autospec=True)
def test_doc_substitution_single_encrypted(self, mock_secrets_manager):
mock_secrets_manager.get.return_value = 'test-certificate'
def test_doc_substitution_with_encryption_source(self):
secret_ref = test_utils.rand_uuid_hex()
secret_ref = ("http://127.0.0.1/key-manager/v1/secrets/%s"
@ -255,9 +254,9 @@ class TestSecretsSubstitution(test_base.TestDbBase):
}
}
self._test_doc_substitution(
document_mapping, [certificate], expected_data)
mock_secrets_manager.get.assert_called_once_with(
secret_ref, certificate, mock.ANY)
document_mapping, substitution_sources=[certificate],
encryption_sources={secret_ref: 'test-certificate'},
expected_data=expected_data)
def test_create_destination_path_with_array(self):
# Validate that the destination data will be populated with an array
@ -883,8 +882,7 @@ class TestSecretsSubstitutionNegative(test_base.TestDbBase):
self.secrets_factory = factories.DocumentSecretFactory()
def _test_secrets_substitution(self, secret_type, expected_exception):
secret_ref = ("http://127.0.0.1/key-manager/v1/secrets/%s"
% test_utils.rand_uuid_hex())
secret_ref = test_utils.rand_barbican_ref()
certificate = self.secrets_factory.gen_test(
'Certificate', secret_type, data=secret_ref)
certificate['metadata']['name'] = 'example-cert'
@ -912,13 +910,6 @@ class TestSecretsSubstitutionNegative(test_base.TestDbBase):
with testtools.ExpectedException(expected_exception):
next(secrets_substitution.substitute_all(documents))
@mock.patch.object(secrets_manager, 'SecretsManager', autospec=True)
def test_barbican_exception_raises_unknown_error(
self, mock_secrets_manager):
mock_secrets_manager.get.side_effect = errors.BarbicanException
self._test_secrets_substitution(
'encrypted', errors.UnknownSubstitutionError)
@mock.patch('deckhand.engine.secrets_manager.utils', autospec=True)
def test_generic_exception_raises_unknown_error(
self, mock_utils):
@ -957,3 +948,36 @@ class TestSecretsSubstitutionNegative(test_base.TestDbBase):
with testtools.ExpectedException(
errors.SubstitutionSourceDataNotFound):
next(secrets_substitution.substitute_all(documents))
def test_secret_substitution_missing_encryption_sources_raises_exc(self):
"""Validate that when ``encryption_sources`` doesn't contain a
reference that a ``EncryptionSourceNotFound`` is raised.
"""
secret_ref = test_utils.rand_barbican_ref()
certificate = self.secrets_factory.gen_test(
'Certificate', 'encrypted', data=secret_ref)
certificate['metadata']['name'] = 'example-cert'
document_mapping = {
"_GLOBAL_SUBSTITUTIONS_1_": [{
"dest": {
"path": ".chart.values.tls.certificate"
},
"src": {
"schema": "deckhand/Certificate/v1",
"name": "example-cert",
"path": ".path-to-nowhere"
}
}]
}
payload = self.document_factory.gen_test(document_mapping,
global_abstract=False)
bucket_name = test_utils.rand_name('bucket')
documents = self.create_documents(
bucket_name, [certificate] + [payload[-1]])
secrets_substitution = secrets_manager.SecretsSubstitution(
documents, encryption_sources={'foo': 'bar'})
with testtools.ExpectedException(errors.EncryptionSourceNotFound):
next(secrets_substitution.substitute_all(documents))

View File

@ -24,8 +24,13 @@ Deckhand Exceptions
* - Exception Name
- Description
* - BarbicanException
- .. autoexception:: deckhand.errors.BarbicanException
* - BarbicanClientException
- .. autoexception:: deckhand.errors.BarbicanClientException
:members:
:show-inheritance:
:undoc-members:
* - BarbicanServerException
- .. autoexception:: deckhand.errors.BarbicanServerException
:members:
:show-inheritance:
:undoc-members:
@ -39,6 +44,11 @@ Deckhand Exceptions
:members:
:show-inheritance:
:undoc-members:
* - EncryptionSourceNotFound
- .. autoexception:: deckhand.errors.EncryptionSourceNotFound
:members:
:show-inheritance:
:undoc-members:
* - InvalidDocumentFormat
- .. autoexception:: deckhand.errors.InvalidDocumentFormat
:members: