Merge "crypto: Verify on-disk data" into feature/crypto

This commit is contained in:
Jenkins 2016-05-31 12:46:55 +00:00 committed by Gerrit Code Review
commit 4728e3e8d3
2 changed files with 71 additions and 104 deletions

View File

@ -92,8 +92,9 @@ class EncInputWrapper(object):
plaintext_etag = None
if self.body_crypto_ctxt:
plaintext_etag = self.plaintext_md5.hexdigest()
# Encrypt the plaintext etag using the object key and persist
# as sysmeta along with the crypto parameters that were used.
# Encrypt the plaintext etag using the container key and
# persist as sysmeta along with the crypto parameters
# that were used.
val, etag_crypto_meta = encrypt_header_val(
self.crypto, plaintext_etag,
self.keys['container'], iv_base=self.path)

View File

@ -14,20 +14,17 @@
# limitations under the License.
import base64
import json
import mock
import os
import unittest
import urllib
import uuid
from swift.common.middleware import encrypter, decrypter, keymaster
from swift.common.swob import Request, HTTPCreated, HTTPAccepted
from swift.common.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.middleware import encrypter, decrypter, keymaster, crypto
from swift.common.swob import Request
from swift.common.crypto_utils import load_crypto_meta
from test.unit.common.middleware.crypto_helpers import fetch_crypto_keys, \
md5hex, encrypt
from test.unit.common.middleware.helpers import FakeSwift
from test.unit.common.middleware.crypto_helpers import md5hex, encrypt
from test.unit.helpers import setup_servers, teardown_servers
from swift.obj import diskfile
from test.unit import FakeLogger
class TestCryptoPipelineChanges(unittest.TestCase):
@ -48,7 +45,9 @@ class TestCryptoPipelineChanges(unittest.TestCase):
cls._test_context = None
def setUp(self):
self.container_path = 'http://localhost:8080/v1/a/' + uuid.uuid4().hex
self.container_name = uuid.uuid4().hex
self.container_path = 'http://localhost:8080/v1/a/' + \
self.container_name
self.object_path = self.container_path + '/o'
self.plaintext = 'unencrypted body content'
self.plaintext_etag = md5hex(self.plaintext)
@ -58,8 +57,9 @@ class TestCryptoPipelineChanges(unittest.TestCase):
# via the crypto middleware. Make a fresh instance for each test to
# avoid any state coupling.
enc = encrypter.Encrypter(self.proxy_app, {})
km = keymaster.KeyMaster(enc, {'encryption_root_secret': 's3cr3t'})
self.crypto_app = decrypter.Decrypter(km, {})
self.km = keymaster.KeyMaster(enc,
{'encryption_root_secret': 's3cr3t'})
self.crypto_app = decrypter.Decrypter(self.km, {})
def _create_container(self, app, policy_name='one'):
req = Request.blank(
@ -279,6 +279,62 @@ class TestCryptoPipelineChanges(unittest.TestCase):
self._check_match_requests('GET', crypto_app)
self._check_match_requests('HEAD', crypto_app)
def test_ondisk_data_after_write_with_crypto(self):
self._create_container(self.proxy_app, policy_name='one')
self._put_object(self.crypto_app, self.plaintext)
self._post_object(self.crypto_app)
ring_object = self.proxy_app.get_object_ring(1)
partition, nodes = ring_object.get_nodes('a', self.container_name, 'o')
policy = self._test_context["test_POLICIES"][1]
conf = {'devices': self._test_context["testdir"],
'mount_check': 'false'}
df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[policy]
for node_index, node in enumerate(nodes):
df = df_mgr.get_diskfile(node['device'], partition,
'a', self.container_name, 'o',
policy=policy)
with df.open():
meta = df.get_metadata()
contents = ''.join(df.reader())
metadata = dict((k.lower(), v) for k, v in meta.items())
# verify on disk data - body
body_iv = load_crypto_meta(
metadata['x-object-sysmeta-crypto-meta'])['iv']
wrapped_body_key = load_crypto_meta(
metadata['x-object-sysmeta-crypto-meta'])['key']
obj_key = self.km.create_key('/a/%s/o' % self.container_name)
body_key = crypto.Crypto({}).unwrap_key(
obj_key, wrapped_body_key, body_iv)
exp_enc_body = encrypt(self.plaintext, body_key, body_iv)
self.assertEqual(exp_enc_body, contents)
# verify on disk user metadata
metadata_iv = load_crypto_meta(
metadata['x-object-transient-sysmeta-crypto-meta-fruit']
)['iv']
exp_enc_meta = base64.b64encode(encrypt('Kiwi', obj_key,
metadata_iv))
self.assertEqual(exp_enc_meta, metadata['x-object-meta-fruit'])
# verify etag
etag_iv = load_crypto_meta(
metadata['x-object-sysmeta-crypto-meta-etag'])['iv']
etag_key = self.km.create_key('/a/%s' % self.container_name)
exp_enc_etag = base64.b64encode(encrypt(self.plaintext_etag,
etag_key, etag_iv))
self.assertEqual(exp_enc_etag,
metadata['x-object-sysmeta-crypto-etag'])
# verify etag override for container updates
override = 'x-object-sysmeta-container-update-override-etag'
parts = metadata[override].rsplit(';', 1)
crypto_meta_param = parts[1].strip()
crypto_meta = crypto_meta_param[len('swift_meta='):]
listing_etag_iv = load_crypto_meta(crypto_meta)['iv']
exp_enc_listing_etag = base64.b64encode(
encrypt(self.plaintext_etag, etag_key,
listing_etag_iv))
self.assertEqual(exp_enc_listing_etag, parts[0])
self._check_GET_and_HEAD(self.crypto_app)
class TestCryptoPipelineChangesFastPost(TestCryptoPipelineChanges):
@classmethod
@ -289,95 +345,5 @@ class TestCryptoPipelineChangesFastPost(TestCryptoPipelineChanges):
cls.proxy_app = cls._test_context["test_servers"][0]
class TestEncrypterDecrypter(unittest.TestCase):
"""
Unit tests to verify round-trip encryption followed by decryption.
These tests serve to complement the separate unit tests for encrypter and
decrypter, which test each in isolation.
However, the real Crypto implementation is used.
"""
def test_basic_put_get_req(self):
# pass the PUT request through the encrypter.
body_key = os.urandom(32)
body = 'FAKE APP'
env = {'REQUEST_METHOD': 'PUT',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
hdrs = {'content-type': 'text/plain',
'content-length': str(len(body)),
'x-object-meta-test': 'encrypt me',
'x-object-sysmeta-test': 'do not encrypt me'}
req = Request.blank(
'/v1/a/c/o', environ=env, body=body, headers=hdrs)
app = FakeSwift()
app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
with mock.patch(
'swift.common.middleware.crypto.Crypto.create_random_key',
return_value=body_key):
req.get_response(encrypter.Encrypter(app, {}))
# Verify that at least the request body was indeed encrypted.
# Otherwise, checking that input matches output after decryption is
# not sufficient if encryption and decryption were both just the
# identity function! (i.e. if they did nothing)
encrypt_get_req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
encrypt_get_resp = encrypt_get_req.get_response(app)
crypto_meta = json.loads(urllib.unquote_plus(
encrypt_get_resp.headers['X-Object-Sysmeta-Crypto-Meta']))
crypto_meta['iv'] = base64.b64decode(crypto_meta['iv'])
exp_enc_body = encrypt(
body, body_key, crypto_meta['iv'])
self.assertEqual(exp_enc_body, encrypt_get_resp.body)
self.assertNotEqual(body, encrypt_get_resp.body) # sanity check
decrypt_env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
decrypt_req = Request.blank('/v1/a/c/o', environ=decrypt_env)
decrypt_resp = decrypt_req.get_response(
decrypter.Decrypter(app, {}))
self.assertEqual(body, decrypt_resp.body)
self.assertEqual('200 OK', decrypt_resp.status)
self.assertEqual('text/plain', decrypt_resp.headers['Content-Type'])
self.assertEqual(md5hex(body), decrypt_resp.headers['Etag'])
self.assertEqual('encrypt me',
decrypt_resp.headers['x-object-meta-test'])
self.assertEqual('do not encrypt me',
decrypt_resp.headers['x-object-sysmeta-test'])
# do a POST update to verify updated metadata is encrypted
env = {'REQUEST_METHOD': 'POST',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
hdrs = {'x-object-meta-test': 'encrypt me is updated'}
req = Request.blank('/v1/a/c/o', environ=env, headers=hdrs)
app.register('POST', '/v1/a/c/o', HTTPAccepted, {})
req.get_response(encrypter.Encrypter(app, {}))
# verify the metadata header was indeed encrypted by doing a GET
# direct to the app
encrypt_get_resp = encrypt_get_req.get_response(app)
crypto_meta = json.loads(urllib.unquote_plus(
encrypt_get_resp.headers[
'X-Object-Transient-Sysmeta-Crypto-Meta-Test']))
crypto_meta['iv'] = base64.b64decode(crypto_meta['iv'])
exp_header_value = base64.b64encode(encrypt(
'encrypt me is updated', fetch_crypto_keys()['object'],
crypto_meta['iv']))
self.assertEqual(exp_header_value,
encrypt_get_resp.headers['x-object-meta-test'])
# do a GET to verify the updated metadata is decrypted
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
resp_dec = req.get_response(
decrypter.Decrypter(app, {}))
self.assertEqual('encrypt me is updated',
resp_dec.headers['x-object-meta-test'])
if __name__ == '__main__':
unittest.main()