# swift/test/unit/common/middleware/test_decrypter.py
#
# 1140 lines
# 52 KiB
# Python

# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
import unittest
from xml.dom import minidom
import mock
from swift.common.middleware import decrypter
from swift.common.middleware.crypto_utils import CRYPTO_KEY_CALLBACK, \
dump_crypto_meta, Crypto
from swift.common.swob import Request, HTTPException, HTTPOk, \
HTTPPreconditionFailed, HTTPNotFound, HTTPPartialContent
from test.unit import FakeLogger
from test.unit.common.middleware.crypto_helpers import md5hex, \
fetch_crypto_keys, FAKE_IV, encrypt, fake_get_crypto_meta
from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts
def get_crypto_meta_header(crypto_meta=None):
    """Return ``crypto_meta`` as processed by ``dump_crypto_meta``.

    :param crypto_meta: the crypto meta to dump; when None, the value
                        returned by ``fake_get_crypto_meta()`` is used.
    :returns: whatever ``dump_crypto_meta`` returns for the chosen meta.
    """
    meta = fake_get_crypto_meta() if crypto_meta is None else crypto_meta
    return dump_crypto_meta(meta)
def encrypt_and_append_meta(value, key, crypto_meta=None):
    """Build a '<encrypted value>; swift_meta=<crypto meta>' header value.

    :param value: the plaintext to encrypt.
    :param key: the key passed to ``encrypt`` (always used with FAKE_IV).
    :param crypto_meta: optional crypto meta handed on to
                        ``get_crypto_meta_header``.
    :returns: the base64 encoded ciphertext followed by the crypto meta
              header value, joined with '; swift_meta='.
    """
    encoded_value = base64.b64encode(encrypt(value, key, FAKE_IV))
    meta_header = get_crypto_meta_header(crypto_meta)
    return '%s; swift_meta=%s' % (encoded_value, meta_header)
class TestDecrypterObjectRequests(unittest.TestCase):
def setUp(self):
self.app = FakeSwift()
self.decrypter = decrypter.Decrypter(self.app, {})
self.decrypter.logger = FakeLogger()
def test_GET_success(self):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
plaintext_etag = md5hex(body)
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
hdrs = {
'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta),
'x-object-meta-test':
base64.b64encode(encrypt('encrypt me', object_key, FAKE_IV)),
'x-object-transient-sysmeta-crypto-meta-test':
get_crypto_meta_header(),
'x-object-sysmeta-container-update-override-etag':
encrypt_and_append_meta('encrypt me, too', cont_key),
'x-object-sysmeta-test': 'do not encrypt me'}
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual(body, resp.body)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
self.assertEqual('do not encrypt me',
resp.headers['x-object-sysmeta-test'])
self.assertEqual(
'encrypt me, too',
resp.headers['X-Object-Sysmeta-Container-Update-Override-Etag'])
def _test_412_response(self, method):
# simulate a 412 response to a conditional GET which has an Etag header
data = 'the object content'
env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env, method=method)
resp_body = 'I am sorry, you have failed to meet a precondition'
key = fetch_crypto_keys()['object']
cont_key = fetch_crypto_keys()['container']
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(resp_body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(md5hex(data), cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header(),
'x-object-meta-test':
base64.b64encode(encrypt('encrypt me', key, FAKE_IV)),
'x-object-transient-sysmeta-crypto-meta-test':
get_crypto_meta_header(),
'x-object-sysmeta-test': 'do not encrypt me'}
self.app.register(method, '/v1/a/c/o', HTTPPreconditionFailed,
body=resp_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('412 Precondition Failed', resp.status)
# the response body should not be decrypted, it is already plaintext
self.assertEqual(resp_body if method == 'GET' else '', resp.body)
# whereas the Etag and other headers should be decrypted
self.assertEqual(md5hex(data), resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
self.assertEqual('do not encrypt me',
resp.headers['x-object-sysmeta-test'])
def test_GET_412_response(self):
self._test_412_response('GET')
def test_HEAD_412_response(self):
self._test_412_response('HEAD')
def _test_404_response(self, method):
# simulate a 404 response, sanity check response headers
env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env, method=method)
resp_body = 'You still have not found what you are looking for'
hdrs = {'content-type': 'text/plain',
'content-length': len(resp_body)}
self.app.register(method, '/v1/a/c/o', HTTPNotFound,
body=resp_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('404 Not Found', resp.status)
# the response body should not be decrypted, it is already plaintext
self.assertEqual(resp_body if method == 'GET' else '', resp.body)
# there should be no etag header inserted by decrypter
self.assertNotIn('Etag', resp.headers)
self.assertEqual('text/plain', resp.headers['Content-Type'])
def test_GET_404_response(self):
self._test_404_response('GET')
def test_HEAD_404_response(self):
self._test_404_response('HEAD')
def test_GET_missing_etag_crypto_meta(self):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(md5hex(body), key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header()}
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertIn('Error decrypting header', resp.body)
self.assertIn('Error decrypting header X-Object-Sysmeta-Crypto-Etag',
self.decrypter.logger.get_lines_for_level('error')[0])
def _test_bad_key(self, method):
# use bad key
def bad_fetch_crypto_keys():
keys = fetch_crypto_keys()
keys['object'] = 'bad key'
return keys
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: bad_fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag': md5hex(body),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header(),
'x-object-meta-test':
base64.b64encode(encrypt('encrypt me', key, FAKE_IV)),
'x-object-transient-sysmeta-crypto-meta-test':
get_crypto_meta_header()}
self.app.register(method, '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
return req.get_response(self.decrypter)
def test_HEAD_with_bad_key(self):
resp = self._test_bad_key('HEAD')
self.assertEqual('500 Internal Error', resp.status)
self.assertIn("Bad key for 'object'",
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_bad_key(self):
resp = self._test_bad_key('GET')
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Unable to retrieve encryption keys.',
resp.body)
self.assertIn("Bad key for 'object'",
self.decrypter.logger.get_lines_for_level('error')[0])
def _test_bad_iv_for_user_metadata(self, method, bad_crypto_meta):
# use bad iv for metadata headers
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag': md5hex(body),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header(),
'x-object-meta-test':
base64.b64encode(encrypt('encrypt me', key, FAKE_IV)),
'x-object-transient-sysmeta-crypto-meta-test':
get_crypto_meta_header(crypto_meta=bad_crypto_meta)}
self.app.register(method, '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertIn('Error decrypting header X-Object-Meta-Test',
self.decrypter.logger.get_lines_for_level('error')[0])
return resp
def test_HEAD_with_bad_iv_for_user_metadata(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['iv'] = 'bad_iv'
self._test_bad_iv_for_user_metadata('HEAD', bad_crypto_meta)
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_HEAD_with_missing_iv_for_user_metadata(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta.pop('iv')
self._test_bad_iv_for_user_metadata('HEAD', bad_crypto_meta)
self.assertIn(
'iv', self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_bad_iv_for_user_metadata(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['iv'] = 'bad_iv'
resp = self._test_bad_iv_for_user_metadata('GET', bad_crypto_meta)
self.assertEqual('Error decrypting header', resp.body)
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_missing_iv_for_user_metadata(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta.pop('iv')
resp = self._test_bad_iv_for_user_metadata('GET', bad_crypto_meta)
self.assertEqual('Error decrypting header', resp.body)
self.assertIn(
'iv', self.decrypter.logger.get_lines_for_level('error')[0])
def _test_GET_with_bad_crypto_meta_for_object_body(self, bad_crypto_meta):
# use bad iv for object body
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag': md5hex(body),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(crypto_meta=bad_crypto_meta)}
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting object', resp.body)
self.assertIn('Error decrypting object',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_bad_iv_for_object_body(self):
bad_crypto_meta = fake_get_crypto_meta(key=os.urandom(32))
bad_crypto_meta['iv'] = 'bad_iv'
self._test_GET_with_bad_crypto_meta_for_object_body(bad_crypto_meta)
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_missing_iv_for_object_body(self):
bad_crypto_meta = fake_get_crypto_meta(key=os.urandom(32))
bad_crypto_meta.pop('iv')
self._test_GET_with_bad_crypto_meta_for_object_body(bad_crypto_meta)
self.assertIn("Missing 'iv'",
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_bad_body_key_for_object_body(self):
body_key_meta = {'key': 'wrapped too short key', 'iv': FAKE_IV}
bad_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
self._test_GET_with_bad_crypto_meta_for_object_body(bad_crypto_meta)
self.assertIn('Key must be length 32',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_missing_body_key_for_object_body(self):
bad_crypto_meta = fake_get_crypto_meta() # no key by default
self._test_GET_with_bad_crypto_meta_for_object_body(bad_crypto_meta)
self.assertIn("Missing 'body_key'",
self.decrypter.logger.get_lines_for_level('error')[0])
def test_HEAD_success(self):
env = {'REQUEST_METHOD': 'HEAD',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
key = fetch_crypto_keys()['object']
cont_key = fetch_crypto_keys()['container']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(md5hex(body), cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header(),
'x-object-meta-test':
base64.b64encode(encrypt('encrypt me', key, FAKE_IV)),
'x-object-transient-sysmeta-crypto-meta-test':
get_crypto_meta_header(),
'x-object-sysmeta-test': 'do not encrypt me'}
self.app.register(
'HEAD', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('200 OK', resp.status)
self.assertEqual(md5hex(body), resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
self.assertEqual('do not encrypt me',
resp.headers['x-object-sysmeta-test'])
def _test_req_content_type_not_encrypted(self, method):
# check that content_type is not decrypted if it does not have crypto
# meta (testing for future cases where content_type may be updated
# as part of an unencrypted POST).
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
plaintext_etag = md5hex(body)
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
hdrs = {
'Etag': 'hashOfCiphertext',
'etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta)}
self.app.register(
method, '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
def test_HEAD_content_type_not_encrypted(self):
self._test_req_content_type_not_encrypted('HEAD')
def test_GET_content_type_not_encrypted(self):
self._test_req_content_type_not_encrypted('GET')
def _test_req_metadata_not_encrypted(self, method):
# check that metadata is not decrypted if it does not have crypto meta;
# testing for case of an unencrypted POST to an object.
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
plaintext_etag = md5hex(body)
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
hdrs = {
'Etag': 'hashOfCiphertext',
'etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta),
'x-object-meta-test': 'plaintext'}
self.app.register(
method, '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
self.assertEqual('plaintext', resp.headers['x-object-meta-test'])
def test_HEAD_metadata_not_encrypted(self):
self._test_req_metadata_not_encrypted('HEAD')
def test_GET_metadata_not_encrypted(self):
self._test_req_metadata_not_encrypted('GET')
def test_GET_unencrypted_data(self):
# testing case of an unencrypted object with encrypted metadata from
# a later POST
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
hdrs = {'Etag': md5hex(body),
'content-type': 'text/plain',
'content-length': len(body),
'x-object-meta-test':
base64.b64encode(encrypt('encrypt me',
fetch_crypto_keys()['object'],
FAKE_IV)),
'x-object-transient-sysmeta-crypto-meta-test':
get_crypto_meta_header(),
'x-object-sysmeta-test': 'do not encrypt me'}
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual(body, resp.body)
self.assertEqual('200 OK', resp.status)
self.assertEqual(md5hex(body), resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
# POSTed user meta was encrypted
self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
# PUT sysmeta was not encrypted
self.assertEqual('do not encrypt me',
resp.headers['x-object-sysmeta-test'])
def test_GET_multiseg(self):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
chunks = ['some', 'chunks', 'of data']
body = ''.join(chunks)
plaintext_etag = md5hex(body)
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
ctxt = Crypto().create_encryption_ctxt(body_key, FAKE_IV)
enc_body = [encrypt(chunk, ctxt=ctxt) for chunk in chunks]
hdrs = {
'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': sum(map(len, enc_body)),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta)}
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual(body, resp.body)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
def test_GET_multiseg_with_range(self):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
req.headers['Content-Range'] = 'bytes 3-10/17'
chunks = ['0123', '45678', '9abcdef']
body = ''.join(chunks)
plaintext_etag = md5hex(body)
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
ctxt = Crypto().create_encryption_ctxt(body_key, FAKE_IV)
enc_body = [encrypt(chunk, ctxt=ctxt) for chunk in chunks]
enc_body = [enc_body[0][3:], enc_body[1], enc_body[2][:2]]
hdrs = {
'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': sum(map(len, enc_body)),
'content-range': req.headers['Content-Range'],
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta)}
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('3456789a', resp.body)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
# Force the decrypter context updates to be less than one of our range
# sizes to check that the decrypt context offset is setup correctly with
# offset to first byte of range for first update and then re-used.
# Do mocking here to have the mocked value have effect in the generator
# function.
@mock.patch.object(decrypter, 'DECRYPT_CHUNK_SIZE', 4)
def test_GET_multipart_ciphertext(self):
# build fake multipart response body
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
parts = ((0, 3, 'text/plain'),
(4, 9, 'text/plain; charset=us-ascii'),
(24, 32, 'text/plain'))
length = len(ciphertext)
body = ''
for start, end, ctype in parts:
body += '--multipartboundary\r\n'
body += 'Content-Type: %s\r\n' % ctype
body += 'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)
body += '\r\n\r\n' + ciphertext[start:end] + '\r\n'
body += '--multipartboundary--'
# register request with fake swift
hdrs = {
'Etag': 'hashOfCiphertext',
'content-type': 'multipart/byteranges;boundary=multipartboundary',
'content-length': len(body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta)}
self.app.register('GET', '/v1/a/c/o', HTTPPartialContent, body=body,
headers=hdrs)
# issue request
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
resp = req.get_response(self.decrypter)
self.assertEqual('206 Partial Content', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual(len(body), int(resp.headers['Content-Length']))
self.assertEqual('multipart/byteranges;boundary=multipartboundary',
resp.headers['Content-Type'])
# the multipart headers could be re-ordered, so parse response body to
# verify expected content
resp_lines = resp.body.split('\r\n')
resp_lines.reverse()
for start, end, ctype in parts:
self.assertEqual('--multipartboundary', resp_lines.pop())
expected_header_lines = {
'Content-Type: %s' % ctype,
'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)}
resp_header_lines = {resp_lines.pop(), resp_lines.pop()}
self.assertEqual(expected_header_lines, resp_header_lines)
self.assertEqual('', resp_lines.pop())
self.assertEqual(plaintext[start:end], resp_lines.pop())
self.assertEqual('--multipartboundary--', resp_lines.pop())
# we should have consumed the whole response body
self.assertFalse(resp_lines)
def test_GET_multipart_content_type(self):
# *just* having multipart content type shouldn't trigger the mime doc
# code path
cont_key = fetch_crypto_keys()['container']
object_key = fetch_crypto_keys()['object']
body_key = os.urandom(32)
body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
'iv': FAKE_IV}
body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
# register request with fake swift
hdrs = {
'Etag': md5hex(ciphertext),
'content-type': 'multipart/byteranges;boundary=multipartboundary',
'content-length': len(ciphertext),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, cont_key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(body_crypto_meta)}
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=ciphertext,
headers=hdrs)
# issue request
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
resp = req.get_response(self.decrypter)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual(len(plaintext), int(resp.headers['Content-Length']))
self.assertEqual('multipart/byteranges;boundary=multipartboundary',
resp.headers['Content-Type'])
self.assertEqual(plaintext, resp.body)
def test_GET_multipart_no_body_crypto_meta(self):
# build fake multipart response body
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
parts = ((0, 3, 'text/plain'),
(4, 9, 'text/plain; charset=us-ascii'),
(24, 32, 'text/plain'))
length = len(plaintext)
body = ''
for start, end, ctype in parts:
body += '--multipartboundary\r\n'
body += 'Content-Type: %s\r\n' % ctype
body += 'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)
body += '\r\n\r\n' + plaintext[start:end] + '\r\n'
body += '--multipartboundary--'
# register request with fake swift
hdrs = {
'Etag': plaintext_etag,
'content-type': 'multipart/byteranges;boundary=multipartboundary',
'content-length': len(body)}
self.app.register('GET', '/v1/a/c/o', HTTPPartialContent, body=body,
headers=hdrs)
# issue request
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
resp = req.get_response(self.decrypter)
self.assertEqual('206 Partial Content', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual(len(body), int(resp.headers['Content-Length']))
self.assertEqual('multipart/byteranges;boundary=multipartboundary',
resp.headers['Content-Type'])
# the multipart response body should be unchanged
self.assertEqual(body, resp.body)
def _test_GET_multipart_bad_body_crypto_meta(self, bad_crypto_meta):
# build fake multipart response body
key = fetch_crypto_keys()['object']
ctxt = Crypto().create_encryption_ctxt(key, FAKE_IV)
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, ctxt=ctxt)
parts = ((0, 3, 'text/plain'),
(4, 9, 'text/plain; charset=us-ascii'),
(24, 32, 'text/plain'))
length = len(ciphertext)
body = ''
for start, end, ctype in parts:
body += '--multipartboundary\r\n'
body += 'Content-Type: %s\r\n' % ctype
body += 'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)
body += '\r\n\r\n' + ciphertext[start:end] + '\r\n'
body += '--multipartboundary--'
# register request with fake swift
hdrs = {
'Etag': 'hashOfCiphertext',
'content-type': 'multipart/byteranges;boundary=multipartboundary',
'content-length': len(body),
'X-Object-Sysmeta-Crypto-Etag':
base64.b64encode(encrypt(plaintext_etag, key, FAKE_IV)),
'X-Object-Sysmeta-Crypto-Meta-Etag': get_crypto_meta_header(),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(bad_crypto_meta)}
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=body, headers=hdrs)
# issue request
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting object', resp.body)
self.assertIn('Error decrypting object',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_bad_body_cipher(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'Mystery cipher', 'iv': '1234567887654321'})
self.assertIn('Cipher must be AES_CTR_256',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_missing_body_cipher(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'iv': '1234567887654321'})
self.assertIn('cipher',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_too_short_body_iv(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'AES_CTR_256', 'iv': 'too short'})
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_too_long_body_iv(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'AES_CTR_256', 'iv': 'a little too long'})
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_missing_body_iv(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'AES_CTR_256'})
self.assertIn('iv',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_missing_key_callback(self):
# Do not provide keys, and do not set override flag
env = {'REQUEST_METHOD': 'GET'}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag': md5hex('not the body'),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header()}
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Unable to retrieve encryption keys.',
resp.body)
self.assertIn('%s not in env' % CRYPTO_KEY_CALLBACK,
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_error_in_key_callback(self):
def raise_exc():
raise Exception('Testing')
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: raise_exc}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag': md5hex(body),
'X-Object-Sysmeta-Crypto-Meta': get_crypto_meta_header()}
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Unable to retrieve encryption keys.',
resp.body)
self.assertIn('%s: Testing' % CRYPTO_KEY_CALLBACK,
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_cipher_mismatch_for_body(self):
# Cipher does not match
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['cipher'] = 'unknown_cipher'
hdrs = {'Etag': 'hashOfCiphertext',
'content-type': 'text/plain',
'content-length': len(enc_body),
'X-Object-Sysmeta-Crypto-Etag': md5hex(body),
'X-Object-Sysmeta-Crypto-Meta':
get_crypto_meta_header(crypto_meta=bad_crypto_meta)}
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting header', resp.body)
self.assertIn('Error decrypting header X-Object-Sysmeta-Crypto-Etag',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_cipher_mismatch_for_metadata(self):
    # The crypto meta stored alongside an encrypted user metadata item
    # names a cipher other than the one fake_get_crypto_meta()
    # produces; the test expects a 500 and an error log line naming
    # X-Object-Meta-Test.
    path = '/v1/a/c/o'
    request = Request.blank(
        path,
        environ={'REQUEST_METHOD': 'GET',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys})

    plaintext = 'FAKE APP'
    object_key = fetch_crypto_keys()['object']
    ciphertext = encrypt(plaintext, object_key, FAKE_IV)

    mismatched_meta = fake_get_crypto_meta()
    mismatched_meta['cipher'] = 'unknown_cipher'

    encrypted_value = base64.b64encode(
        encrypt('encrypt me', object_key, FAKE_IV))
    stored_headers = {
        'Etag': 'hashOfCiphertext',
        'content-type': 'text/plain',
        'content-length': len(ciphertext),
        'x-object-meta-test': encrypted_value,
        'x-object-transient-sysmeta-crypto-meta-test':
            get_crypto_meta_header(crypto_meta=mismatched_meta),
    }
    self.app.register(
        'GET', path, HTTPOk, body=ciphertext, headers=stored_headers)

    response = request.get_response(self.decrypter)

    self.assertEqual('500 Internal Error', response.status)
    self.assertEqual('Error decrypting header', response.body)
    error_lines = self.decrypter.logger.get_lines_for_level('error')
    self.assertIn('Error decrypting header X-Object-Meta-Test',
                  error_lines[0])
def test_GET_decryption_override(self):
    # Covers an old, un-encrypted object: with 'swift.crypto.override'
    # set in the environ the test expects the body, etag, content type
    # and metadata values to come back exactly as stored.
    path = '/v1/a/c/o'
    environ = {'REQUEST_METHOD': 'GET',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'swift.crypto.override': True}
    request = Request.blank(path, environ=environ)

    plaintext = 'FAKE APP'
    stored_headers = {
        'Etag': md5hex(plaintext),
        'content-type': 'text/plain',
        'content-length': len(plaintext),
        'x-object-meta-test': 'do not encrypt me',
        'x-object-sysmeta-test': 'do not encrypt me',
    }
    self.app.register(
        'GET', path, HTTPOk, body=plaintext, headers=stored_headers)

    response = request.get_response(self.decrypter)

    self.assertEqual(plaintext, response.body)
    self.assertEqual('200 OK', response.status)
    self.assertEqual(md5hex(plaintext), response.headers['Etag'])
    self.assertEqual('text/plain', response.headers['Content-Type'])
    for header_name in ('x-object-meta-test', 'x-object-sysmeta-test'):
        self.assertEqual('do not encrypt me',
                         response.headers[header_name])
class TestDecrypterContainerRequests(unittest.TestCase):
    """Exercise the decrypter against container GET (listing) responses.

    Covers plain-text, JSON and XML listings, the 'swift.crypto.override'
    pass-through, and listings whose crypto meta names an unexpected
    cipher.
    """

    def setUp(self):
        # Each test gets a fresh fake backend wrapped by a decrypter whose
        # logger is replaced with a FakeLogger so logged errors can be
        # inspected.
        self.app = FakeSwift()
        self.decrypter = decrypter.Decrypter(self.app, {})
        self.decrypter.logger = FakeLogger()

    def _make_cont_get_req(self, resp_body, format, override=False):
        """Register a canned container listing and GET it via the decrypter.

        :param resp_body: body the fake backend returns for the listing
        :param format: listing format appended to the path as a query
                       parameter (the tests use 'json' and 'xml'); a false
                       value leaves the path bare and registers the
                       response as text/plain
        :param override: if True, set 'swift.crypto.override' in the
                         request environ
        :returns: the response obtained by sending the request through
                  the decrypter
        """
        path = '/v1/a/c'
        content_type = 'text/plain'
        if format:
            path = '%s/?format=%s' % (path, format)
            content_type = 'application/' + format
        env = {'REQUEST_METHOD': 'GET',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        if override:
            env['swift.crypto.override'] = True
        req = Request.blank(path, environ=env)
        hdrs = {'content-type': content_type}
        self.app.register('GET', path, HTTPOk, body=resp_body, headers=hdrs)
        return req.get_response(self.decrypter)

    def test_GET_container_success(self):
        # no format requested, listing has names only
        fake_body = 'testfile1\ntestfile2\n'
        resp = self._make_cont_get_req(fake_body, None)

        self.assertEqual('200 OK', resp.status)
        names = resp.body.split('\n')
        # two names plus the empty string after the trailing newline
        self.assertEqual(3, len(names))
        self.assertIn('testfile1', names)
        self.assertIn('testfile2', names)
        self.assertIn('', names)

    def test_GET_container_json(self):
        # Each entry's hash is the plaintext etag encrypted with the
        # container key plus appended crypto meta; the listing returned
        # through the decrypter is expected to carry the plaintext etags
        # with every other field unchanged.
        content_type_1 = u'\uF10F\uD20D\uB30B\u9409'
        content_type_2 = 'text/plain; param=foo'
        pt_etag1 = 'c6e8196d7f0fff6444b90861fe8d609d'
        pt_etag2 = 'ac0374ed4d43635f803c82469d0b5a10'
        key = fetch_crypto_keys()['container']

        obj_dict_1 = {"bytes": 16,
                      "last_modified": "2015-04-14T23:33:06.439040",
                      "hash": encrypt_and_append_meta(
                          pt_etag1.encode('utf-8'), key),
                      "name": "testfile",
                      "content_type": content_type_1}

        obj_dict_2 = {"bytes": 24,
                      "last_modified": "2015-04-14T23:33:06.519020",
                      "hash": encrypt_and_append_meta(
                          pt_etag2.encode('utf-8'), key),
                      "name": "testfile2",
                      "content_type": content_type_2}

        listing = [obj_dict_1, obj_dict_2]
        fake_body = json.dumps(listing)

        resp = self._make_cont_get_req(fake_body, 'json')

        self.assertEqual('200 OK', resp.status)
        body = resp.body
        self.assertEqual(len(body), int(resp.headers['Content-Length']))
        body_json = json.loads(body)
        self.assertEqual(2, len(body_json))
        # swap in the plaintext etags to build the expected entries
        obj_dict_1['hash'] = pt_etag1
        self.assertDictEqual(obj_dict_1, body_json[0])
        obj_dict_2['hash'] = pt_etag2
        self.assertDictEqual(obj_dict_2, body_json[1])

    def test_GET_container_json_with_crypto_override(self):
        # With the override flag set, a listing holding plaintext etags
        # is expected to come back unchanged.
        content_type_1 = 'image/jpeg'
        content_type_2 = 'text/plain; param=foo'
        pt_etag1 = 'c6e8196d7f0fff6444b90861fe8d609d'
        pt_etag2 = 'ac0374ed4d43635f803c82469d0b5a10'

        obj_dict_1 = {"bytes": 16,
                      "last_modified": "2015-04-14T23:33:06.439040",
                      "hash": pt_etag1,
                      "name": "testfile",
                      "content_type": content_type_1}

        obj_dict_2 = {"bytes": 24,
                      "last_modified": "2015-04-14T23:33:06.519020",
                      "hash": pt_etag2,
                      "name": "testfile2",
                      "content_type": content_type_2}

        listing = [obj_dict_1, obj_dict_2]
        fake_body = json.dumps(listing)

        resp = self._make_cont_get_req(fake_body, 'json', override=True)

        self.assertEqual('200 OK', resp.status)
        body = resp.body
        self.assertEqual(len(body), int(resp.headers['Content-Length']))
        body_json = json.loads(body)
        self.assertEqual(2, len(body_json))
        self.assertDictEqual(obj_dict_1, body_json[0])
        self.assertDictEqual(obj_dict_2, body_json[1])

    def test_cont_get_json_req_with_cipher_mismatch(self):
        # The crypto meta appended to the encrypted etag names an
        # unexpected cipher; the test expects a 500 and a logged error.
        bad_crypto_meta = fake_get_crypto_meta()
        bad_crypto_meta['cipher'] = 'unknown_cipher'
        key = fetch_crypto_keys()['container']
        pt_etag = 'c6e8196d7f0fff6444b90861fe8d609d'
        ct_etag = encrypt_and_append_meta(pt_etag, key,
                                          crypto_meta=bad_crypto_meta)

        obj_dict_1 = {"bytes": 16,
                      "last_modified": "2015-04-14T23:33:06.439040",
                      "hash": ct_etag,
                      "name": "testfile",
                      "content_type": "image/jpeg"}

        listing = [obj_dict_1]
        fake_body = json.dumps(listing)

        resp = self._make_cont_get_req(fake_body, 'json')

        self.assertEqual('500 Internal Error', resp.status)
        self.assertEqual('Error decrypting container listing', resp.body)
        self.assertIn("Cipher must be AES_CTR_256",
                      self.decrypter.logger.get_lines_for_level('error')[0])

    def _assert_element_contains_dict(self, expected, element):
        """Assert that an XML element has a child tag for each expected key.

        For every key in ``expected`` the first descendant element with
        that tag name must exist and the value of its first child node
        must equal the expected value.

        :param expected: dict mapping tag names to expected text values
        :param element: a minidom element to search
        """
        for k, v in expected.items():
            entry = element.getElementsByTagName(k)
            # getElementsByTagName returns a (possibly empty) NodeList,
            # never None, so check for emptiness; otherwise a missing tag
            # would show up as an IndexError on the next line rather than
            # as this assertion message.
            self.assertTrue(len(entry) > 0, 'Key %s not found' % k)
            actual = entry[0].childNodes[0].nodeValue
            self.assertEqual(v, actual,
                             "Expected %s but got %s for key %s"
                             % (v, actual, k))

    def test_GET_container_xml(self):
        # XML counterpart of the JSON listing test: encrypted hashes go
        # in, plaintext etags are expected out, other fields unchanged.
        content_type_1 = u'\uF10F\uD20D\uB30B\u9409'
        content_type_2 = 'text/plain; param=foo'
        pt_etag1 = 'c6e8196d7f0fff6444b90861fe8d609d'
        pt_etag2 = 'ac0374ed4d43635f803c82469d0b5a10'
        key = fetch_crypto_keys()['container']

        fake_body = '''<?xml version="1.0" encoding="UTF-8"?>
<container name="testc">\
<object><hash>\
''' + encrypt_and_append_meta(pt_etag1.encode('utf8'), key) + '''\
</hash><content_type>\
''' + content_type_1 + '''\
</content_type><name>testfile</name><bytes>16</bytes>\
<last_modified>2015-04-19T02:37:39.601660</last_modified></object>\
<object><hash>\
''' + encrypt_and_append_meta(pt_etag2.encode('utf8'), key) + '''\
</hash><content_type>\
''' + content_type_2 + '''\
</content_type><name>testfile2</name><bytes>24</bytes>\
<last_modified>2015-04-19T02:37:39.684740</last_modified></object>\
</container>'''

        resp = self._make_cont_get_req(fake_body, 'xml')
        self.assertEqual('200 OK', resp.status)
        body = resp.body
        self.assertEqual(len(body), int(resp.headers['Content-Length']))

        tree = minidom.parseString(body)
        containers = tree.getElementsByTagName('container')
        self.assertEqual(1, len(containers))
        self.assertEqual('testc',
                         containers[0].attributes.getNamedItem("name").value)

        objs = tree.getElementsByTagName('object')
        self.assertEqual(2, len(objs))

        obj_dict_1 = {"bytes": "16",
                      "last_modified": "2015-04-19T02:37:39.601660",
                      "hash": pt_etag1,
                      "name": "testfile",
                      "content_type": content_type_1}
        self._assert_element_contains_dict(obj_dict_1, objs[0])
        obj_dict_2 = {"bytes": "24",
                      "last_modified": "2015-04-19T02:37:39.684740",
                      "hash": pt_etag2,
                      "name": "testfile2",
                      "content_type": content_type_2}
        self._assert_element_contains_dict(obj_dict_2, objs[1])

    def test_GET_container_xml_with_crypto_override(self):
        # With the override flag set, an XML listing holding plaintext
        # etags is expected to come back with the same field values.
        content_type_1 = 'image/jpeg'
        content_type_2 = 'text/plain; param=foo'

        fake_body = '''<?xml version="1.0" encoding="UTF-8"?>
<container name="testc">\
<object><hash>c6e8196d7f0fff6444b90861fe8d609d</hash>\
<content_type>''' + content_type_1 + '''\
</content_type><name>testfile</name><bytes>16</bytes>\
<last_modified>2015-04-19T02:37:39.601660</last_modified></object>\
<object><hash>ac0374ed4d43635f803c82469d0b5a10</hash>\
<content_type>''' + content_type_2 + '''\
</content_type><name>testfile2</name><bytes>24</bytes>\
<last_modified>2015-04-19T02:37:39.684740</last_modified></object>\
</container>'''

        resp = self._make_cont_get_req(fake_body, 'xml', override=True)

        self.assertEqual('200 OK', resp.status)
        body = resp.body
        self.assertEqual(len(body), int(resp.headers['Content-Length']))

        tree = minidom.parseString(body)
        containers = tree.getElementsByTagName('container')
        self.assertEqual(1, len(containers))
        self.assertEqual('testc',
                         containers[0].attributes.getNamedItem("name").value)

        objs = tree.getElementsByTagName('object')
        self.assertEqual(2, len(objs))

        obj_dict_1 = {"bytes": "16",
                      "last_modified": "2015-04-19T02:37:39.601660",
                      "hash": "c6e8196d7f0fff6444b90861fe8d609d",
                      "name": "testfile",
                      "content_type": content_type_1}
        self._assert_element_contains_dict(obj_dict_1, objs[0])
        obj_dict_2 = {"bytes": "24",
                      "last_modified": "2015-04-19T02:37:39.684740",
                      "hash": "ac0374ed4d43635f803c82469d0b5a10",
                      "name": "testfile2",
                      "content_type": content_type_2}
        self._assert_element_contains_dict(obj_dict_2, objs[1])

    def test_cont_get_xml_req_with_cipher_mismatch(self):
        # XML counterpart of the JSON cipher mismatch test: a 500 and a
        # logged error are expected.
        bad_crypto_meta = fake_get_crypto_meta()
        bad_crypto_meta['cipher'] = 'unknown_cipher'

        fake_body = '''<?xml version="1.0" encoding="UTF-8"?>
<container name="testc"><object>\
<hash>''' + encrypt_and_append_meta('c6e8196d7f0fff6444b90861fe8d609d',
                                    fetch_crypto_keys()['container'],
                                    crypto_meta=bad_crypto_meta) + '''\
</hash>\
<content_type>image/jpeg</content_type>\
<name>testfile</name><bytes>16</bytes>\
<last_modified>2015-04-19T02:37:39.601660</last_modified></object>\
</container>'''

        resp = self._make_cont_get_req(fake_body, 'xml')

        self.assertEqual('500 Internal Error', resp.status)
        self.assertEqual('Error decrypting container listing', resp.body)
        self.assertIn("Cipher must be AES_CTR_256",
                      self.decrypter.logger.get_lines_for_level('error')[0])
class TestModuleMethods(unittest.TestCase):
    """Tests for the decrypter module's top-level functions."""

    def test_filter_factory(self):
        # filter_factory({}) must hand back a callable which, when given
        # an app (None here), produces a Decrypter instance.
        make_filter = decrypter.filter_factory({})
        self.assertTrue(callable(make_filter))
        wrapped = make_filter(None)
        self.assertIsInstance(wrapped, decrypter.Decrypter)
class TestDecrypter(unittest.TestCase):
    """Tests for Decrypter behaviour that is not specific to a path type."""

    def test_app_exception(self):
        # An HTTPException raised while getting a response from the
        # wrapped app is expected to propagate out of the decrypter with
        # the app's message as its body.
        failing_app = FakeAppThatExcepts()
        middleware = decrypter.Decrypter(failing_app, {})
        request = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})
        with self.assertRaises(HTTPException) as caught:
            request.get_response(middleware)
        self.assertEqual(FakeAppThatExcepts.MESSAGE, caught.exception.body)
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()