Allow multiple keymasters

This allows the migration from one key provider to another.

Note that secret_id values must remain unique across all keymasters
in a given pipeline. If they are not unique, the right-most keymaster
will take precedence.

When looking for the active root secret, only the right-most keymaster
is used.

Change-Id: I6b5c812a54624f56c55164556385e3e475fb2470
This commit is contained in:
Tim Burke 2018-07-29 01:38:43 +00:00
parent 00f7732193
commit b39d2efdab
3 changed files with 150 additions and 34 deletions

View File

@ -298,12 +298,21 @@ class DecrypterObjContext(BaseDecrypterContext):
def handle(self, req, start_response):
app_resp = self._app_call(req.environ)
put_crypto_meta = self._read_crypto_meta(
'X-Object-Sysmeta-Crypto-Body-Meta', True)
put_keys = self.get_decryption_keys(req, put_crypto_meta)
post_crypto_meta = self._read_crypto_meta(
'X-Object-Transient-Sysmeta-Crypto-Meta', False)
post_keys = self.get_decryption_keys(req, post_crypto_meta)
try:
put_crypto_meta = self._read_crypto_meta(
'X-Object-Sysmeta-Crypto-Body-Meta', True)
put_keys = self.get_decryption_keys(req, put_crypto_meta)
post_crypto_meta = self._read_crypto_meta(
'X-Object-Transient-Sysmeta-Crypto-Meta', False)
post_keys = self.get_decryption_keys(req, post_crypto_meta)
except EncryptionException as err:
self.logger.error(
"Error decrypting object: %s",
err)
raise HTTPInternalServerError(
body='Error decrypting object',
content_type='text/plain')
if put_keys is None and post_keys is None:
# skip decryption
start_response(self._response_status, self._response_headers,

View File

@ -46,6 +46,7 @@ class KeyMasterContext(WSGIContext):
self.container = container
self.obj = obj
self._keys = {}
self.alternate_fetch_keys = None
def _make_key_id(self, path, secret_id):
key_id = {'v': '1', 'path': path}
@ -82,39 +83,51 @@ class KeyMasterContext(WSGIContext):
keys = {}
account_path = os.path.join(os.sep, self.account)
if self.container:
path = os.path.join(account_path, self.container)
keys['container'] = self.keymaster.create_key(
path, secret_id=secret_id)
if self.obj:
path = os.path.join(path, self.obj)
keys['object'] = self.keymaster.create_key(
try:
if self.container:
path = os.path.join(account_path, self.container)
keys['container'] = self.keymaster.create_key(
path, secret_id=secret_id)
# For future-proofing include a keymaster version number and the
# path used to derive keys in the 'id' entry of the results. The
# encrypter will persist this as part of the crypto-meta for
# encrypted data and metadata. If we ever change the way keys are
# generated then the decrypter could pass the persisted 'id' value
# when it calls fetch_crypto_keys to inform the keymaster as to how
# that particular data or metadata had its keys generated.
# Currently we have no need to do that, so we are simply persisting
# this information for future use.
keys['id'] = self._make_key_id(path, secret_id)
# pass back a list of key id dicts for all other secret ids in case
# the caller is interested, in which case the caller can call this
# method again for different secret ids; this avoided changing the
# return type of the callback or adding another callback. Note that
# the caller should assume no knowledge of the content of these key
# id dicts.
keys['all_ids'] = [self._make_key_id(path, id_)
for id_ in self.keymaster.root_secret_ids]
self._keys[secret_id] = keys
if self.obj:
path = os.path.join(path, self.obj)
keys['object'] = self.keymaster.create_key(
path, secret_id=secret_id)
return keys
# For future-proofing include a keymaster version number and
# the path used to derive keys in the 'id' entry of the
# results. The encrypter will persist this as part of the
# crypto-meta for encrypted data and metadata. If we ever
# change the way keys are generated then the decrypter could
# pass the persisted 'id' value when it calls fetch_crypto_keys
# to inform the keymaster as to how that particular data or
# metadata had its keys generated. Currently we have no need to
# do that, so we are simply persisting this information for
# future use.
keys['id'] = self._make_key_id(path, secret_id)
# pass back a list of key id dicts for all other secret ids in
# case the caller is interested, in which case the caller can
# call this method again for different secret ids; this avoided
# changing the return type of the callback or adding another
# callback. Note that the caller should assume no knowledge of
# the content of these key id dicts.
keys['all_ids'] = [self._make_key_id(path, id_)
for id_ in self.keymaster.root_secret_ids]
if self.alternate_fetch_keys:
alternate_keys = self.alternate_fetch_keys(
key_id=None, *args, **kwargs)
keys['all_ids'].extend(alternate_keys.get('all_ids', []))
self._keys[secret_id] = keys
return keys
except UnknownSecretIdError:
if self.alternate_fetch_keys:
return self.alternate_fetch_keys(key_id, *args, **kwargs)
raise
def handle_request(self, req, start_response):
self.alternate_fetch_keys = req.environ.get(CRYPTO_KEY_CALLBACK)
req.environ[CRYPTO_KEY_CALLBACK] = self.fetch_crypto_keys
resp = self._app_call(req.environ)
start_response(self._response_status, self._response_headers,

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import copy
import hashlib
import hmac
@ -176,6 +177,99 @@ class TestKeymaster(unittest.TestCase):
self.assertEqual(secrets, app._root_secrets)
self.assertEqual([None, '22', 'my_secret_id'], app.root_secret_ids)
def test_chained_keymasters(self):
conf_inner = {'active_root_secret_id': '22'}
conf_inner.update(
('encryption_root_secret_%s' % secret_id, base64.b64encode(secret))
for secret_id, secret in [('22', os.urandom(33)),
('my_secret_id', os.urandom(50))])
conf_outer = {'encryption_root_secret': base64.b64encode(
os.urandom(32))}
app = keymaster.KeyMaster(
keymaster.KeyMaster(self.swift, conf_inner),
conf_outer)
self.swift.register('GET', '/v1/a/c', swob.HTTPOk, {}, b'')
req = Request.blank('/v1/a/c')
start_response, calls = capture_start_response()
app(req.environ, start_response)
self.assertEqual(1, len(calls))
self.assertNotIn('swift.crypto.override', req.environ)
self.assertIn(CRYPTO_KEY_CALLBACK, req.environ,
'%s not set in env' % CRYPTO_KEY_CALLBACK)
keys = copy.deepcopy(req.environ[CRYPTO_KEY_CALLBACK](key_id=None))
self.assertIn('id', keys)
self.assertEqual(keys.pop('id'), {
'v': '1',
'path': '/a/c',
'secret_id': '22',
})
# Inner-most active root secret wins
root_key = base64.b64decode(conf_inner['encryption_root_secret_22'])
self.assertIn('container', keys)
self.assertEqual(keys.pop('container'),
hmac.new(root_key, '/a/c',
digestmod=hashlib.sha256).digest())
self.assertIn('all_ids', keys)
all_keys = set()
at_least_one_old_style_id = False
for key_id in keys.pop('all_ids'):
# Can get key material for each key_id
all_keys.add(req.environ[CRYPTO_KEY_CALLBACK](
key_id=key_id)['container'])
if 'secret_id' in key_id:
self.assertIn(key_id.pop('secret_id'), {'22', 'my_secret_id'})
else:
at_least_one_old_style_id = True
self.assertEqual(key_id, {
'path': '/a/c',
'v': '1',
})
self.assertTrue(at_least_one_old_style_id)
self.assertEqual(len(all_keys), 3)
self.assertFalse(keys)
# Also all works for objects
self.swift.register('GET', '/v1/a/c/o', swob.HTTPOk, {}, b'')
req = Request.blank('/v1/a/c/o')
start_response, calls = capture_start_response()
app(req.environ, start_response)
self.assertEqual(1, len(calls))
self.assertNotIn('swift.crypto.override', req.environ)
self.assertIn(CRYPTO_KEY_CALLBACK, req.environ,
'%s not set in env' % CRYPTO_KEY_CALLBACK)
keys = req.environ.get(CRYPTO_KEY_CALLBACK)(key_id=None)
self.assertIn('id', keys)
self.assertEqual(keys.pop('id'), {
'v': '1',
'path': '/a/c/o',
'secret_id': '22',
})
root_key = base64.b64decode(conf_inner['encryption_root_secret_22'])
self.assertIn('container', keys)
self.assertEqual(keys.pop('container'),
hmac.new(root_key, '/a/c',
digestmod=hashlib.sha256).digest())
self.assertIn('object', keys)
self.assertEqual(keys.pop('object'),
hmac.new(root_key, '/a/c/o',
digestmod=hashlib.sha256).digest())
self.assertIn('all_ids', keys)
at_least_one_old_style_id = False
for key_id in keys.pop('all_ids'):
if 'secret_id' not in key_id:
at_least_one_old_style_id = True
else:
self.assertIn(key_id.pop('secret_id'), {'22', 'my_secret_id'})
self.assertEqual(key_id, {
'path': '/a/c/o',
'v': '1',
})
self.assertTrue(at_least_one_old_style_id)
self.assertEqual(len(all_keys), 3)
self.assertFalse(keys)
def test_multiple_root_secrets_with_invalid_secret(self):
conf = {'encryption_root_secret': base64.b64encode(os.urandom(32)),
# too short...