Use concurrency to retrieve unencrypted secret data

This patch set uses concurrent.futures.ThreadPoolExecutor
[0] to retrieve multiple Barbican secrets concurrently.
This is because currently it is only possible to retrieve
1 secret payload from Barbican at a time -- for revisions
with several dozen secrets it is therefore too costly
to serially perform these API requests.

A new configuration option is added to the [barbican]
group called `max_workers` which specifies the number
of threads to use. The default value is 10. Note that:
"If max_workers is None or not given, it will default
to the number of processors on the machine, multiplied by 5"
[0] so the default is 10 for 2 * 5 which is overly
conservative if anything.

If any error occurs during any of the requests a 500
is raised with appropriate details.

[0] https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

Change-Id: I76a5bb6c345054e160c14bdf9fb7087e3a746a5e
This commit is contained in:
Felipe Monteiro 2018-07-21 16:28:10 -04:00
parent 1583b78902
commit d27ab2d8ea
4 changed files with 55 additions and 18 deletions

View File

@ -24,6 +24,7 @@ barbican_group = cfg.OptGroup(
help="Barbican options for allowing Deckhand to communicate with "
"Barbican.")
barbican_opts = [
# TODO(fmontei): Drop these options and related group once Keystone
# endpoint lookup is used instead.
@ -31,6 +32,10 @@ barbican_opts = [
'api_endpoint',
sample_default='http://barbican.example.org:9311/',
help='URL override for the Barbican API endpoint.'),
cfg.IntOpt(
'max_workers', default=10,
help='Maximum number of threads used to call secret storage service '
'concurrently.')
]

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import concurrent.futures
import falcon
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import six
@ -31,6 +34,7 @@ from deckhand import errors
from deckhand import policy
from deckhand import types
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -113,7 +117,7 @@ class RenderedDocumentsResource(api_base.BaseResource):
data = self._retrieve_documents_for_rendering(revision_id, **filters)
documents = document_wrapper.DocumentDict.from_list(data)
encryption_sources = self._retrieve_encrypted_documents(documents)
encryption_sources = self._resolve_encrypted_data(documents)
try:
# NOTE(fmontei): `validate` is False because documents have already
# been pre-validated during ingestion. Documents are post-validated
@ -194,23 +198,50 @@ class RenderedDocumentsResource(api_base.BaseResource):
return documents
def _retrieve_encrypted_documents(self, documents):
def _resolve_encrypted_data(self, documents):
"""Resolve unencrypted data from the secret storage backend.
Submits concurrent requests to the secret storage backend for all
secret references for which unecrypted data is required for future
substitutions during the rendering process.
:param documents: List of all documents for the current revision.
:type documents: List[dict]
:returns: Dictionary keyed with secret references, whose values are
the corresponding unencrypted data.
:rtype: dict
"""
encryption_sources = {}
for document in documents:
if document.is_encrypted and document.has_barbican_ref:
secret_ref = lambda x: x.data
is_encrypted = lambda x: x.is_encrypted and x.has_barbican_ref
encrypted_documents = (d for d in documents if is_encrypted(d))
with concurrent.futures.ThreadPoolExecutor(
max_workers=CONF.barbican.max_workers) as executor:
future_to_document = {
executor.submit(secrets_manager.SecretsManager.get,
secret_ref=secret_ref(d),
src_doc=d): d for d in encrypted_documents
}
for future in concurrent.futures.as_completed(future_to_document):
document = future_to_document[future]
try:
unecrypted_data = secrets_manager.SecretsManager.get(
secret_ref=document.data, src_doc=document)
except Exception as e:
LOG.error(
'An unknown exception occurred while trying to resolve'
' a secret reference for substitution source document '
'[%s, %s] %s.', document.schema, document.layer,
document.name)
raise errors.UnknownSubstitutionError(
src_schema=document.schema, src_layer=document.layer,
src_name=document.name, details=str(e))
encryption_sources.setdefault(document.data, unecrypted_data)
unecrypted_data = future.result()
except Exception as exc:
msg = ('Failed to retrieve a required secret from the '
'configured secret storage service. Document: [%s,'
' %s] %s. Secret ref: %s' % (
document.schema,
document.layer,
document.name,
secret_ref(document)))
LOG.error(msg + '. Details: %s', exc)
raise falcon.HTTPInternalServerError(description=msg)
else:
encryption_sources.setdefault(secret_ref(document),
unecrypted_data)
return encryption_sources
def _post_validate(self, rendered_documents):

View File

@ -167,7 +167,7 @@ class SecretsSubstitution(object):
contained in the destination document's data section to the
actual unecrypted data. If encrypting data with Barbican, the
reference will be a Barbican secret reference.
:type encryption_sources: List[dict]
:type encryption_sources: dict
"""
# This maps a 2-tuple of (schema, name) to a document from which the

View File

@ -114,7 +114,8 @@ commands =
# [H210] Require autospec, spec, or spec_set in mock.patch/mock.patch.object calls
# [H904] Delay string interpolations at logging calls.
enable-extensions = H106,H203,H204,H205,H210,H904
ignore = H405
# [E731] Do not assign a lambda expression, use a def. This reduces readability in some cases.
ignore = E731,H405
exclude = .venv,.git,.tox,dist,*lib/python*,*egg,build,releasenotes,doc,alembic/versions
[testenv:docs]