Fix Castellan Secret Store inconsistent encoding

This patch fixes the Castellan secret store use of SecretDTO objects,
which require that the "secret" member be base64 encoded. [1]

Prior to this fix all secrets that were generated were stored in
plaintext, but secrets coming in through the API were base64 encoded
before being stored in the backend.

On secret retreival the Castellan plugin wrongly assumed everything in
the backend was encoded, so attempts to retrieve generated keys failed.

This patch fixes this inconsistency by always storing data un-encoded in
the backend.

A helper method was added to sort out the inconsistent data stored prior
to this fix.

A "version" property was added to the Castellan plugin metadata that is
stored in barbican to help differentiate secrets stored prior to this
fix vs secrets stored after this fix.

Story: 2008335
Task: 41236

[1]
https://opendev.org/openstack/barbican/src/tag/12.0.0/barbican/plugin/interface/secret_store.py#L356

Change-Id: I46fe77a471bf7927a24ca4d64dfccb385cd6402e
This commit is contained in:
Douglas Mendizábal 2021-06-11 16:25:54 -05:00
parent 200bff896c
commit b9daa100d0
4 changed files with 132 additions and 35 deletions

View File

@ -14,8 +14,10 @@
# limitations under the License.
import abc
import base64
import six
from castellan.common.objects import key
from castellan.common.objects import opaque_data
from castellan import key_manager
from oslo_context import context
@ -31,11 +33,56 @@ class CastellanSecretStore(ss.SecretStoreBase, metaclass=abc.ABCMeta):
KEY_ID = "key_id"
ALG = "alg"
BIT_LENGTH = "bit_length"
METADATA_VERSION = "version"
CURRENT_VERSION = 1
def _set_params(self, conf):
self.key_manager = key_manager.API(conf)
self.context = context.get_current()
def _meta_dict(self, key_id, bit_length=None, algorithm=None):
"""Return the current version of the metadata dict
Builds the metadata dict to be stored in the database.
"""
meta = {
self.KEY_ID: key_id,
self.METADATA_VERSION: self.CURRENT_VERSION,
}
if bit_length is not None:
meta[self.BIT_LENGTH] = bit_length
if algorithm is not None:
meta[self.ALG] = algorithm
return meta
def _ensure_legacy_base64(self, secret):
"""Ensure secret data is base64 encoded
This method ensures that secrets that were stored prior to the fix
for Story 2008335 are base64 encoded.
"""
payload = secret.get_encoded()
if isinstance(secret, key.Key):
# Keys generated by Castellan are not base64-encoded.
# Both symmetric and asymmetric keys returned by Castellan
# are subclasses of key.Key
LOG.debug("Encoding legacy Castellan-generated key")
return base64.b64encode(payload)
else:
# Objects stored by Barbican are stored as opaque_data.OpaqueData
# in Castellan. They should already be base64-encoded so we
# check here to make sure.
LOG.debug("Validating base64 encoding")
try:
_ = base64.b64decode(payload)
return payload
except UnicodeDecodeError:
# Data can't be decoded. Not sure how we ended up here,
# but we can encode now to prevent issues when we attempt
# to base64-decode the DTO later.
LOG.warning("Legacy secret data assumed to be plaintext")
return base64.b64encode(payload)
@abc.abstractmethod
def get_conf(self, conf):
"""Get plugin configuration
@ -66,9 +113,17 @@ class CastellanSecretStore(ss.SecretStoreBase, metaclass=abc.ABCMeta):
secret = self.key_manager.get(
self.context,
secret_ref)
return ss.SecretDTO(secret_type, secret.get_encoded(),
ss.KeySpec(), secret_metadata['content_type'])
meta_version = secret_metadata.get(self.METADATA_VERSION)
if meta_version is None:
# Secrets without a metadata version were stored prior to fix
# for Story 2008335. They may or may not be base64-encoded.
LOG.debug("Retrieving legacy secret")
data = self._ensure_legacy_base64(secret)
else:
# Version 1 - secret payload data is stored in plaintext in
# the backend. We need to base64 encode them for the DTO.
data = base64.b64encode(secret.get_encoded())
return ss.SecretDTO(secret_type, data, ss.KeySpec(), None)
except Exception as e:
LOG.exception("Error retrieving secret {}: {}".format(
secret_ref, six.text_type(e)))
@ -78,13 +133,13 @@ class CastellanSecretStore(ss.SecretStoreBase, metaclass=abc.ABCMeta):
if not self.store_secret_supports(secret_dto.key_spec):
raise ss.SecretAlgorithmNotSupportedException(
secret_dto.key_spec.alg)
plaintext = base64.b64decode(secret_dto.secret)
try:
secret_ref = self.key_manager.store(
secret_id = self.key_manager.store(
self.context,
opaque_data.OpaqueData(secret_dto.secret)
opaque_data.OpaqueData(plaintext)
)
return {CastellanSecretStore.KEY_ID: secret_ref}
return self._meta_dict(secret_id)
except Exception as e:
LOG.exception("Error storing secret: {}".format(
six.text_type(e)))
@ -109,12 +164,12 @@ class CastellanSecretStore(ss.SecretStoreBase, metaclass=abc.ABCMeta):
raise ss.SecretAlgorithmNotSupportedException(
key_spec.alg)
try:
secret_ref = self.key_manager.create_key(
secret_id = self.key_manager.create_key(
self.context,
key_spec.alg,
key_spec.bit_length
)
return {CastellanSecretStore.KEY_ID: secret_ref}
return self._meta_dict(secret_id)
except Exception as e:
LOG.exception("Error generating symmetric key: {}".format(
six.text_type(e)))
@ -129,23 +184,19 @@ class CastellanSecretStore(ss.SecretStoreBase, metaclass=abc.ABCMeta):
raise ss.GeneratePassphraseNotSupportedException()
try:
private_ref, public_ref = self.key_manager.create_key_pair(
private_id, public_id = self.key_manager.create_key_pair(
self.context,
key_spec.alg,
key_spec.bit_length
)
private_key_metadata = {
CastellanSecretStore.ALG: key_spec.alg,
CastellanSecretStore.BIT_LENGTH: key_spec.bit_length,
CastellanSecretStore.KEY_ID: private_ref
}
private_key_metadata = self._meta_dict(
private_id, key_spec.bit_length, key_spec.alg
)
public_key_metadata = {
CastellanSecretStore.ALG: key_spec.alg,
CastellanSecretStore.BIT_LENGTH: key_spec.bit_length,
CastellanSecretStore.KEY_ID: public_ref
}
public_key_metadata = self._meta_dict(
public_id, key_spec.bit_length, key_spec.alg
)
return ss.AsymmetricKeyMetadataDTO(
private_key_metadata,

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from unittest import mock
from castellan.common import exception
@ -25,7 +26,9 @@ from barbican.tests import utils
key_ref1 = 'aff825be-6ede-4b1d-aeb0-aaec8e62aec6'
key_ref2 = '9c94c9c7-16ea-43e8-8ebe-0de282c0e6d5'
secret_passphrase = 'secret passphrase'
mock_key = b'\xae9Eso\xd4\x98\x04>\xc3\x05n\x0f\x03\x96\xa3' + \
b'\xc3Z;\x9c\x11&oYY\x00\x13\xae\xf4>\x83\x82'
class WhenTestingVaultSecretStore(utils.BaseTestCase):
@ -39,7 +42,7 @@ class WhenTestingVaultSecretStore(utils.BaseTestCase):
self.key_manager_mock.create_key.return_value = key_ref1
self.key_manager_mock.store.return_value = key_ref1
secret_object = opaque_data.OpaqueData(secret_passphrase)
secret_object = opaque_data.OpaqueData(mock_key)
self.key_manager_mock.get.return_value = secret_object
self.cfg_mock = mock.MagicMock(name='config mock')
@ -52,6 +55,21 @@ class WhenTestingVaultSecretStore(utils.BaseTestCase):
self.plugin.key_manager = self.key_manager_mock
self.plugin_name = "VaultSecretStore"
def test_meta_dict(self):
key_id = 'SOME_KEY_UUID'
meta = self.plugin._meta_dict(key_id)
self.assertNotIn(css.CastellanSecretStore.BIT_LENGTH, meta)
self.assertNotIn(css.CastellanSecretStore.ALG, meta)
self.assertEqual(key_id, meta[css.CastellanSecretStore.KEY_ID])
meta = self.plugin._meta_dict(key_id, bit_length=128)
self.assertEqual(128, meta[css.CastellanSecretStore.BIT_LENGTH])
meta = self.plugin._meta_dict(key_id, algorithm='AES')
self.assertEqual('AES', meta[css.CastellanSecretStore.ALG])
self.assertEqual(1, meta[css.CastellanSecretStore.METADATA_VERSION])
def test_generate_symmetric_key(self):
key_spec = ss.KeySpec(ss.KeyAlgorithm.AES, 128)
response = self.plugin.generate_symmetric_key(key_spec)
@ -62,7 +80,10 @@ class WhenTestingVaultSecretStore(utils.BaseTestCase):
128
)
expected_response = {css.CastellanSecretStore.KEY_ID: key_ref1}
expected_response = {
css.CastellanSecretStore.KEY_ID: key_ref1,
css.CastellanSecretStore.METADATA_VERSION:
css.CastellanSecretStore.CURRENT_VERSION}
self.assertEqual(response, expected_response)
def test_generate_symmetric_key_raises_exception(self):
@ -118,32 +139,32 @@ class WhenTestingVaultSecretStore(utils.BaseTestCase):
)
def test_store_secret(self):
payload = 'encrypt me!!'
payload = b'encrypt me!!'
key_spec = mock.MagicMock()
content_type = mock.MagicMock()
transport_key = None
secret_dto = ss.SecretDTO(ss.SecretType.SYMMETRIC,
payload,
base64.b64encode(payload),
key_spec,
content_type,
transport_key)
response = self.plugin.store_secret(secret_dto)
data = opaque_data.OpaqueData(secret_dto.secret)
data = opaque_data.OpaqueData(payload)
self.plugin.key_manager.store.assert_called_once_with(
mock.ANY,
data
)
expected_response = {css.CastellanSecretStore.KEY_ID: key_ref1}
expected_response = self.plugin._meta_dict(key_ref1)
self.assertEqual(response, expected_response)
def test_store_secret_raises_exception(self):
payload = 'encrypt me!!'
payload = b'encrypt me!!'
key_spec = mock.MagicMock()
content_type = mock.MagicMock()
transport_key = None
secret_dto = ss.SecretDTO(ss.SecretType.SYMMETRIC,
payload,
base64.b64encode(payload),
key_spec,
content_type,
transport_key)
@ -156,10 +177,8 @@ class WhenTestingVaultSecretStore(utils.BaseTestCase):
)
def test_get_secret(self):
secret_metadata = {
css.CastellanSecretStore.KEY_ID: key_ref1,
"content_type": "application/octet-stream"
}
secret_metadata = self.plugin._meta_dict(key_ref1, 256, 'AES')
response = self.plugin.get_secret(
ss.SecretType.SYMMETRIC,
secret_metadata
@ -167,15 +186,17 @@ class WhenTestingVaultSecretStore(utils.BaseTestCase):
self.assertIsInstance(response, ss.SecretDTO)
plaintext = base64.b64decode(response.secret)
self.assertEqual(ss.SecretType.SYMMETRIC, response.type)
self.assertEqual(secret_passphrase, response.secret)
self.assertEqual(mock_key, plaintext)
self.plugin.key_manager.get.assert_called_once_with(
mock.ANY,
key_ref1
)
def test_get_secret_throws_exception(self):
secret_metadata = {css.CastellanSecretStore.KEY_ID: key_ref1}
secret_metadata = self.plugin._meta_dict(key_ref1, 256, 'AES')
self.plugin.key_manager.get.side_effect = exception.Forbidden()
self.assertRaises(
ss.SecretGeneralException,

View File

@ -186,6 +186,25 @@ class OrdersTestCase(base.TestCase):
secret_ref, payload_content_type="text/plain")
self.assertEqual(406, secret_resp.status_code)
@testcase.attr('positive')
def test_order_create_check_secret_payload_positive(self):
"""Create order and check the secret payload.
Check the secret payload with correct payload_content_type.
"""
test_model = order_models.OrderModel(**self.create_default_data)
resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, resp.status_code)
order_resp = self.behaviors.get_order(order_ref)
self.assertEqual(200, order_resp.status_code)
# PENDING orders may take a moment to be processed by the workers
# when running tests with queue enabled
self.wait_for_order(order_resp, order_ref)
secret_ref = order_resp.model.secret_ref
secret_resp = self.secret_behaviors.get_secret(
secret_ref, payload_content_type="application/octet-stream")
self.assertEqual(200, secret_resp.status_code)
@testcase.attr('positive')
def test_order_and_secret_metadata_same(self):
"""Checks that metadata from secret GET and order GET are the same.

View File

@ -0,0 +1,6 @@
---
fixes:
- |
Fixed Story 2008335: Fixed a data encoding issue in the Hashicorp Vault
backend that was causing errors when retrieving keys that were generated
by the Vault Key Manager in Castellan.