Support for http footers - Replication and EC

Before this patch, the proxy ObjectController supported sending
metadata from the proxy server to object servers in "footers" that
trail the body of HTTP PUT requests, but this support was for EC
policies only.  The encryption feature requires that footers are sent
with both EC and replicated policy requests in order to persist
encryption specific sysmeta, and to override container update headers
with an encrypted Etag value.

This patch:

- Moves most of the functionality of ECPutter into a generic Putter
  class that is used for replicated object PUTs without footers.

- Creates a MIMEPutter subclass to support multipart and multiphase
  behaviour required for any replicated object PUT with footers and
  all EC PUTs.

- Modifies ReplicatedObjectController to use Putter objects in place
  of raw connection objects.

- Refactors the _get_put_connections method and _put_connect_node methods
  so that more code is in the BaseObjectController class and therefore
  shared by [EC|Replicated]ObjectController classes.

- Adds support to call a callback that middleware may have placed
  in the environ, so the callback can set footers. The
  x-object-sysmeta-ec- namespace is reserved and any footer values
  set by middleware in that namespace will not be forwarded to
  object servers.

In addition this patch enables more than one value to be added to the
X-Backend-Etag-Is-At header. This header is used to point to an
(optional) alternative sysmeta header whose value should be used when
evaluating conditional requests with If-[None-]Match headers.  This is
already used with EC policies when the ECObjectController has
calculated the actual body Etag and sent it using a footer
(X-Object-Sysmeta-EC-Etag). X-Backend-Etag-Is-At is in that case set
to X-Object-Sysmeta-Ec-Etag so as to point to the actual body Etag
value rather than the EC fragment Etag.

Encryption will also need to add a pointer to an encrypted Etag value.
However, the referenced sysmeta may not exist, for example if the
object was created before encryption was enabled. The
X-Backend-Etag-Is-At value is therefore changed to support a list of
possible locations for alternate Etag values. Encryption will place
its expected alternative Etag location on this list, as will the
ECObjectController, and the object server will look for the first
object metadata to match an entry on the list when matching
conditional requests. That way, if the object was not encrypted then
the object server will fall through to using the EC Etag value, or in
the case of a replicated policy will fall through to using the normal
Etag metadata.

If your proxy has a third-party middleware that uses X-Backend-Etag-Is-At
and it upgrades before an object server it's talking to then conditional
requests may be broken.

UpgradeImpact

Co-Authored-By: Alistair Coles <alistair.coles@hpe.com>
Co-Authored-By: Thiago da Silva <thiago@redhat.com>
Co-Authored-By: Samuel Merritt <sam@swiftstack.com>
Co-Authored-By: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp>

Closes-Bug: #1594739
Change-Id: I12a6e41150f90de746ce03623032b83ed1987ee1
This commit is contained in:
Janie Richling 2016-06-06 17:19:48 +01:00 committed by Alistair Coles
parent 928c4790eb
commit 03b762e80a
10 changed files with 1165 additions and 494 deletions

View File

@ -27,6 +27,7 @@ import time
import six
from six.moves.urllib.parse import unquote
from swift.common.header_key_dict import HeaderKeyDict
from swift import gettext_ as _
from swift.common.storage_policy import POLICIES
@ -38,7 +39,7 @@ from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable, \
from swift.common.utils import split_path, validate_device_partition, \
close_if_possible, maybe_multipart_byteranges_to_document_iters, \
multipart_byteranges_to_document_iters, parse_content_type, \
parse_content_range
parse_content_range, csv_append, list_from_csv
from swift.common.wsgi import make_subrequest
@ -544,3 +545,66 @@ def http_response_to_document_iters(response, read_chunk_size=4096):
params = dict(params_list)
return multipart_byteranges_to_document_iters(
response, params['boundary'], read_chunk_size)
def update_etag_is_at_header(req, name):
"""
Helper function to update an X-Backend-Etag-Is-At header whose value is a
list of alternative header names at which the actual object etag may be
found. This informs the object server where to look for the actual object
etag when processing conditional requests.
Since the proxy server and/or middleware may set alternative etag header
names, the value of X-Backend-Etag-Is-At is a comma separated list which
the object server inspects in order until it finds an etag value.
:param req: a swob Request
:param name: name of a sysmeta where alternative etag may be found
"""
if ',' in name:
# HTTP header names should not have commas but we'll check anyway
raise ValueError('Header name must not contain commas')
existing = req.headers.get("X-Backend-Etag-Is-At")
req.headers["X-Backend-Etag-Is-At"] = csv_append(
existing, name)
def resolve_etag_is_at_header(req, metadata):
"""
Helper function to resolve an alternative etag value that may be stored in
metadata under an alternate name.
The value of the request's X-Backend-Etag-Is-At header (if it exists) is a
comma separated list of alternate names in the metadata at which an
alternate etag value may be found. This list is processed in order until an
alternate etag is found.
The left most value in X-Backend-Etag-Is-At will have been set by the left
most middleware, or if no middleware, by ECObjectController, if an EC
policy is in use. The left most middleware is assumed to be the authority
on what the etag value of the object content is.
The resolver will work from left to right in the list until it finds a
value that is a name in the given metadata. So the left most wins, IF it
exists in the metadata.
By way of example, assume the encrypter middleware is installed. If an
object is *not* encrypted then the resolver will not find the encrypter
middleware's alternate etag sysmeta (X-Object-Sysmeta-Crypto-Etag) but will
then find the EC alternate etag (if EC policy). But if the object *is*
encrypted then X-Object-Sysmeta-Crypto-Etag is found and used, which is
correct because it should be preferred over X-Object-Sysmeta-Crypto-Etag.
:param req: a swob Request
:param metadata: a dict containing object metadata
:return: an alternate etag value if any is found, otherwise None
"""
alternate_etag = None
metadata = HeaderKeyDict(metadata)
if "X-Backend-Etag-Is-At" in req.headers:
names = list_from_csv(req.headers["X-Backend-Etag-Is-At"])
for name in names:
if name in metadata:
alternate_etag = metadata[name]
break
return alternate_etag

View File

@ -1140,8 +1140,8 @@ class Response(object):
conditional requests.
It's most effectively used with X-Backend-Etag-Is-At which would
define the additional Metadata key where the original ETag of the
clear-form client request data.
define the additional Metadata key(s) where the original ETag of the
clear-form client request data may be found.
"""
if self._conditional_etag is not None:
return self._conditional_etag

View File

@ -46,7 +46,7 @@ from swift.common.http import is_success
from swift.common.base_storage_server import BaseStorageServer
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.request_helpers import get_name_and_placement, \
is_user_meta, is_sys_or_user_meta
is_user_meta, is_sys_or_user_meta, resolve_etag_is_at_header
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
@ -832,10 +832,7 @@ class ObjectController(BaseStorageServer):
keep_cache = (self.keep_cache_private or
('X-Auth-Token' not in request.headers and
'X-Storage-Token' not in request.headers))
conditional_etag = None
if 'X-Backend-Etag-Is-At' in request.headers:
conditional_etag = metadata.get(
request.headers['X-Backend-Etag-Is-At'])
conditional_etag = resolve_etag_is_at_header(request, metadata)
response = Response(
app_iter=disk_file.reader(keep_cache=keep_cache),
request=request, conditional_response=True,
@ -889,10 +886,7 @@ class ObjectController(BaseStorageServer):
headers['X-Backend-Timestamp'] = e.timestamp.internal
return HTTPNotFound(request=request, headers=headers,
conditional_response=True)
conditional_etag = None
if 'X-Backend-Etag-Is-At' in request.headers:
conditional_etag = metadata.get(
request.headers['X-Backend-Etag-Is-At'])
conditional_etag = resolve_etag_is_at_header(request, metadata)
response = Response(request=request, conditional_response=True,
conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(

File diff suppressed because it is too large Load Diff

View File

@ -32,6 +32,8 @@ import eventlet
from eventlet.green import socket
from tempfile import mkdtemp
from shutil import rmtree
from swift.common.utils import Timestamp, NOTICE
from test import get_config
from swift.common import utils
@ -848,7 +850,7 @@ def fake_http_connect(*code_iter, **kwargs):
def __init__(self, status, etag=None, body='', timestamp='1',
headers=None, expect_headers=None, connection_id=None,
give_send=None):
give_send=None, give_expect=None):
if not isinstance(status, FakeStatus):
status = FakeStatus(status)
self._status = status
@ -864,6 +866,8 @@ def fake_http_connect(*code_iter, **kwargs):
self.timestamp = timestamp
self.connection_id = connection_id
self.give_send = give_send
self.give_expect = give_expect
self.closed = False
if 'slow' in kwargs and isinstance(kwargs['slow'], list):
try:
self._next_sleep = kwargs['slow'].pop(0)
@ -884,6 +888,8 @@ def fake_http_connect(*code_iter, **kwargs):
return self
def getexpect(self):
if self.give_expect:
self.give_expect(self)
expect_status = self._status.get_expect_status()
headers = dict(self.expect_headers)
if expect_status == 409:
@ -953,7 +959,7 @@ def fake_http_connect(*code_iter, **kwargs):
def send(self, amt=None):
if self.give_send:
self.give_send(self.connection_id, amt)
self.give_send(self, amt)
am_slow, value = self.get_slow()
if am_slow:
if self.received < 4:
@ -964,7 +970,7 @@ def fake_http_connect(*code_iter, **kwargs):
return HeaderKeyDict(self.getheaders()).get(name, default)
def close(self):
pass
self.closed = True
timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter))
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
@ -1017,7 +1023,8 @@ def fake_http_connect(*code_iter, **kwargs):
body = next(body_iter)
return FakeConn(status, etag, body=body, timestamp=timestamp,
headers=headers, expect_headers=expect_headers,
connection_id=i, give_send=kwargs.get('give_send'))
connection_id=i, give_send=kwargs.get('give_send'),
give_expect=kwargs.get('give_expect'))
connect.code_iter = code_iter

View File

@ -16,7 +16,6 @@
# This stuff can't live in test/unit/__init__.py due to its swob dependency.
from collections import defaultdict
from copy import deepcopy
from hashlib import md5
from swift.common import swob
from swift.common.header_key_dict import HeaderKeyDict
@ -113,24 +112,34 @@ class FakeSwift(object):
raise KeyError("Didn't find %r in allowed responses" % (
(method, path),))
self._calls.append((method, path, req_headers))
# simulate object PUT
if method == 'PUT' and obj:
input = env['wsgi.input'].read()
input = ''.join(iter(env['wsgi.input'].read, ''))
if 'swift.callback.update_footers' in env:
footers = HeaderKeyDict()
env['swift.callback.update_footers'](footers)
req_headers.update(footers)
etag = md5(input).hexdigest()
headers.setdefault('Etag', etag)
headers.setdefault('Content-Length', len(input))
# keep it for subsequent GET requests later
self.uploaded[path] = (deepcopy(headers), input)
self.uploaded[path] = (dict(req_headers), input)
if "CONTENT_TYPE" in env:
self.uploaded[path][0]['Content-Type'] = env["CONTENT_TYPE"]
# range requests ought to work, which require conditional_response=True
self._calls.append((method, path, HeaderKeyDict(req_headers)))
# range requests ought to work, hence conditional_response=True
req = swob.Request(env)
resp = resp_class(req=req, headers=headers, body=body,
conditional_response=req.method in ('GET', 'HEAD'))
if isinstance(body, list):
resp = resp_class(
req=req, headers=headers, app_iter=body,
conditional_response=req.method in ('GET', 'HEAD'))
else:
resp = resp_class(
req=req, headers=headers, body=body,
conditional_response=req.method in ('GET', 'HEAD'))
wsgi_iter = resp(env, start_response)
self.mark_opened(path)
return LeakTrackingIter(wsgi_iter, self, path)

View File

@ -21,7 +21,8 @@ from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from swift.common.request_helpers import is_sys_meta, is_user_meta, \
is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \
remove_items, copy_header_subset, get_name_and_placement, \
http_response_to_document_iters
http_response_to_document_iters, update_etag_is_at_header, \
resolve_etag_is_at_header
from test.unit import patch_policies
from test.unit.common.test_utils import FakeResponse
@ -273,3 +274,74 @@ class TestHTTPResponseToDocumentIters(unittest.TestCase):
self.assertEqual(body.read(), 'ches')
self.assertRaises(StopIteration, next, doc_iters)
def test_update_etag_is_at_header(self):
# start with no existing X-Backend-Etag-Is-At
req = Request.blank('/v/a/c/o')
update_etag_is_at_header(req, 'X-Object-Sysmeta-My-Etag')
self.assertEqual('X-Object-Sysmeta-My-Etag',
req.headers['X-Backend-Etag-Is-At'])
# add another alternate
update_etag_is_at_header(req, 'X-Object-Sysmeta-Ec-Etag')
self.assertEqual('X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag',
req.headers['X-Backend-Etag-Is-At'])
with self.assertRaises(ValueError) as cm:
update_etag_is_at_header(req, 'X-Object-Sysmeta-,-Bad')
self.assertEqual('Header name must not contain commas',
cm.exception.message)
def test_resolve_etag_is_at_header(self):
def do_test():
req = Request.blank('/v/a/c/o')
# ok to have no X-Backend-Etag-Is-At
self.assertIsNone(resolve_etag_is_at_header(req, metadata))
# ok to have no matching metadata
req.headers['X-Backend-Etag-Is-At'] = 'X-Not-There'
self.assertIsNone(resolve_etag_is_at_header(req, metadata))
# selects from metadata
req.headers['X-Backend-Etag-Is-At'] = 'X-Object-Sysmeta-Ec-Etag'
self.assertEqual('an etag value',
resolve_etag_is_at_header(req, metadata))
req.headers['X-Backend-Etag-Is-At'] = 'X-Object-Sysmeta-My-Etag'
self.assertEqual('another etag value',
resolve_etag_is_at_header(req, metadata))
# first in list takes precedence
req.headers['X-Backend-Etag-Is-At'] = \
'X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag'
self.assertEqual('another etag value',
resolve_etag_is_at_header(req, metadata))
# non-existent alternates are passed over
req.headers['X-Backend-Etag-Is-At'] = \
'X-Bogus,X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag'
self.assertEqual('another etag value',
resolve_etag_is_at_header(req, metadata))
# spaces in list are ok
alts = 'X-Foo, X-Object-Sysmeta-My-Etag , X-Object-Sysmeta-Ec-Etag'
req.headers['X-Backend-Etag-Is-At'] = alts
self.assertEqual('another etag value',
resolve_etag_is_at_header(req, metadata))
# lower case in list is ok
alts = alts.lower()
req.headers['X-Backend-Etag-Is-At'] = alts
self.assertEqual('another etag value',
resolve_etag_is_at_header(req, metadata))
# upper case in list is ok
alts = alts.upper()
req.headers['X-Backend-Etag-Is-At'] = alts
self.assertEqual('another etag value',
resolve_etag_is_at_header(req, metadata))
metadata = {'X-Object-Sysmeta-Ec-Etag': 'an etag value',
'X-Object-Sysmeta-My-Etag': 'another etag value'}
do_test()
metadata = dict((k.lower(), v) for k, v in metadata.items())
do_test()
metadata = dict((k.upper(), v) for k, v in metadata.items())
do_test()

View File

@ -2385,6 +2385,7 @@ class TestObjectController(unittest.TestCase):
'X-Timestamp': utils.Timestamp(time()).internal,
'Content-Type': 'application/octet-stream',
'X-Object-Meta-Xtag': 'madeup',
'X-Object-Sysmeta-Xtag': 'alternate madeup',
}
req = Request.blank('/sda1/p/a/c/o', method='PUT',
headers=headers)
@ -2400,6 +2401,39 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
# match x-backend-etag-is-at, using first in list of alternates
req = Request.blank('/sda1/p/a/c/o', headers={
'If-Match': 'madeup',
'X-Backend-Etag-Is-At':
'X-Object-Meta-Xtag,X-Object-Sysmeta-Z'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
# match x-backend-etag-is-at, using second in list of alternates
alts = 'X-Object-Sysmeta-Y,X-Object-Meta-Xtag,X-Object-Sysmeta-Z'
req = Request.blank('/sda1/p/a/c/o', headers={
'If-Match': 'madeup',
'X-Backend-Etag-Is-At': alts})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
# match x-backend-etag-is-at, choosing first of multiple alternates
alts = 'X-Object-Sysmeta-Y,X-Object-Meta-Xtag,X-Object-Sysmeta-Xtag'
req = Request.blank('/sda1/p/a/c/o', headers={
'If-Match': 'madeup',
'X-Backend-Etag-Is-At': alts})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
# match x-backend-etag-is-at, choosing first of multiple alternates
# (switches order of second two alternates from previous assertion)
alts = 'X-Object-Sysmeta-Y,X-Object-Sysmeta-Xtag,X-Object-Meta-Xtag'
req = Request.blank('/sda1/p/a/c/o', headers={
'If-Match': 'alternate madeup',
'X-Backend-Etag-Is-At': alts})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
# no match x-backend-etag-is-at
req = Request.blank('/sda1/p/a/c/o', headers={
'If-Match': real_etag,

View File

@ -122,6 +122,27 @@ class PatchedObjControllerApp(proxy_server.Application):
PatchedObjControllerApp, self).__call__(*args, **kwargs)
def make_footers_callback(body=None):
# helper method to create a footers callback that will generate some fake
# footer metadata
cont_etag = 'container update etag may differ'
crypto_etag = '20242af0cd21dd7195a10483eb7472c9'
etag_crypto_meta = \
'{"cipher": "AES_CTR_256", "iv": "sD+PSw/DfqYwpsVGSo0GEw=="}'
etag = md5(body).hexdigest() if body is not None else None
footers_to_add = {
'X-Object-Sysmeta-Container-Update-Override-Etag': cont_etag,
'X-Object-Sysmeta-Crypto-Etag': crypto_etag,
'X-Object-Sysmeta-Crypto-Meta-Etag': etag_crypto_meta,
'X-I-Feel-Lucky': 'Not blocked',
'Etag': etag}
def footers_callback(footers):
footers.update(footers_to_add)
return footers_callback
class BaseObjectControllerMixin(object):
container_info = {
'status': 200,
@ -253,10 +274,11 @@ class BaseObjectControllerMixin(object):
def test_connect_put_node_timeout(self):
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.conn_timeout = 0.05
with set_http_connect(slow_connect=True):
nodes = [dict(ip='', port='', device='')]
res = controller._connect_put_node(nodes, '', '', {}, ('', ''))
res = controller._connect_put_node(nodes, '', req, {}, ('', ''))
self.assertTrue(res is None)
def test_DELETE_simple(self):
@ -564,6 +586,163 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_PUT_error_with_footers(self):
footers_callback = make_footers_callback('')
env = {'swift.callback.update_footers': footers_callback}
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
environ=env)
req.headers['content-length'] = '0'
codes = [503] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes'
}
with set_http_connect(*codes, expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
def _test_PUT_with_no_footers(self, test_body='', chunked=False):
# verify that when no footers are required then the PUT uses a regular
# single part body
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
body=test_body)
if chunked:
req.headers['Transfer-Encoding'] = 'chunked'
etag = md5(test_body).hexdigest()
req.headers['Etag'] = etag
put_requests = defaultdict(
lambda: {'headers': None, 'chunks': [], 'connection': None})
def capture_body(conn, chunk):
put_requests[conn.connection_id]['chunks'].append(chunk)
put_requests[conn.connection_id]['connection'] = conn
def capture_headers(ip, port, device, part, method, path, headers,
**kwargs):
conn_id = kwargs['connection_id']
put_requests[conn_id]['headers'] = headers
codes = [201] * self.replicas()
expect_headers = {'X-Obj-Metadata-Footer': 'yes'}
with set_http_connect(*codes, expect_headers=expect_headers,
give_send=capture_body,
give_connect=capture_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
for connection_id, info in put_requests.items():
body = ''.join(info['chunks'])
headers = info['headers']
if chunked:
body = unchunk_body(body)
self.assertEqual('100-continue', headers['Expect'])
self.assertEqual('chunked', headers['Transfer-Encoding'])
else:
self.assertNotIn('Transfer-Encoding', headers)
if body:
self.assertEqual('100-continue', headers['Expect'])
else:
self.assertNotIn('Expect', headers)
self.assertNotIn('X-Backend-Obj-Multipart-Mime-Boundary', headers)
self.assertNotIn('X-Backend-Obj-Metadata-Footer', headers)
self.assertNotIn('X-Backend-Obj-Multiphase-Commit', headers)
self.assertEqual(etag, headers['Etag'])
self.assertEqual(test_body, body)
self.assertTrue(info['connection'].closed)
def test_PUT_with_chunked_body_and_no_footers(self):
self._test_PUT_with_no_footers(test_body='asdf', chunked=True)
def test_PUT_with_body_and_no_footers(self):
self._test_PUT_with_no_footers(test_body='asdf', chunked=False)
def test_PUT_with_no_body_and_no_footers(self):
self._test_PUT_with_no_footers(test_body='', chunked=False)
def _test_PUT_with_footers(self, test_body=''):
# verify that when footers are required the PUT body is multipart
# and the footers are appended
footers_callback = make_footers_callback(test_body)
env = {'swift.callback.update_footers': footers_callback}
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
environ=env)
req.body = test_body
# send bogus Etag header to differentiate from footer value
req.headers['Etag'] = 'header_etag'
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes'
}
put_requests = defaultdict(
lambda: {'headers': None, 'chunks': [], 'connection': None})
def capture_body(conn, chunk):
put_requests[conn.connection_id]['chunks'].append(chunk)
put_requests[conn.connection_id]['connection'] = conn
def capture_headers(ip, port, device, part, method, path, headers,
**kwargs):
conn_id = kwargs['connection_id']
put_requests[conn_id]['headers'] = headers
with set_http_connect(*codes, expect_headers=expect_headers,
give_send=capture_body,
give_connect=capture_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
for connection_id, info in put_requests.items():
body = unchunk_body(''.join(info['chunks']))
headers = info['headers']
boundary = headers['X-Backend-Obj-Multipart-Mime-Boundary']
self.assertTrue(boundary is not None,
"didn't get boundary for conn %r" % (
connection_id,))
self.assertEqual('chunked', headers['Transfer-Encoding'])
self.assertEqual('100-continue', headers['Expect'])
self.assertEqual('yes', headers['X-Backend-Obj-Metadata-Footer'])
self.assertNotIn('X-Backend-Obj-Multiphase-Commit', headers)
self.assertEqual('header_etag', headers['Etag'])
# email.parser.FeedParser doesn't know how to take a multipart
# message and boundary together and parse it; it only knows how
# to take a string, parse the headers, and figure out the
# boundary on its own.
parser = email.parser.FeedParser()
parser.feed(
"Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" %
boundary)
parser.feed(body)
message = parser.close()
self.assertTrue(message.is_multipart()) # sanity check
mime_parts = message.get_payload()
# notice, no commit confirmation
self.assertEqual(len(mime_parts), 2)
obj_part, footer_part = mime_parts
self.assertEqual(obj_part['X-Document'], 'object body')
self.assertEqual(test_body, obj_part.get_payload())
# validate footer metadata
self.assertEqual(footer_part['X-Document'], 'object metadata')
footer_metadata = json.loads(footer_part.get_payload())
self.assertTrue(footer_metadata)
expected = {}
footers_callback(expected)
self.assertDictEqual(expected, footer_metadata)
self.assertTrue(info['connection'].closed)
def test_PUT_with_body_and_footers(self):
self._test_PUT_with_footers(test_body='asdf')
def test_PUT_with_no_body_and_footers(self):
self._test_PUT_with_footers()
def test_txn_id_logging_on_PUT(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
self.app.logger.txn_id = req.environ['swift.trans_id'] = 'test-txn-id'
@ -585,11 +764,15 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
req.headers['Content-Length'] = '0'
req.headers['Etag'] = '"catbus"'
# The 2-tuple here makes getexpect() return 422, not 100. For
# objects that are >0 bytes, you get a 100 Continue and then a 422
# Unprocessable Entity after sending the body. For zero-byte
# objects, though, you get the 422 right away.
codes = [FakeStatus((422, 422))
# The 2-tuple here makes getexpect() return 422, not 100. For objects
# that are >0 bytes, you get a 100 Continue and then a 422
# Unprocessable Entity after sending the body. For zero-byte objects,
# though, you get the 422 right away because no Expect header is sent
# with zero-byte PUT. The second status in the tuple should not be
# consumed, it's just there to make the FakeStatus treat the first as
# an expect status, but we'll make it something other than a 422 so
# that if it is consumed then the test should fail.
codes = [FakeStatus((422, 200))
for _junk in range(self.replicas())]
with set_http_connect(*codes):
@ -707,16 +890,24 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
class FakeReader(object):
def read(self, size):
raise Timeout()
conns = []
def capture_expect(conn):
# stash connections so that we can verify they all get closed
conns.append(conn)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
with set_http_connect(201, 201, 201):
with set_http_connect(201, 201, 201, give_expect=capture_expect):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 499)
self.assertEqual(self.replicas(), len(conns))
for conn in conns:
self.assertTrue(conn.closed)
def test_PUT_exception_during_transfer_data(self):
class FakeReader(object):
@ -1131,6 +1322,108 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertIn('Accept-Ranges', resp.headers)
def _test_if_match(self, method):
num_responses = self.policy.ec_ndata if method == 'GET' else 1
def _do_test(match_value, backend_status,
etag_is_at='X-Object-Sysmeta-Does-Not-Exist'):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method=method,
headers={'If-Match': match_value,
'X-Backend-Etag-Is-At': etag_is_at})
get_resp = [backend_status] * num_responses
resp_headers = {'Etag': 'frag_etag',
'X-Object-Sysmeta-Ec-Etag': 'data_etag',
'X-Object-Sysmeta-Alternate-Etag': 'alt_etag'}
with set_http_connect(*get_resp, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual('data_etag', resp.headers['Etag'])
return resp
# wildcard
resp = _do_test('*', 200)
self.assertEqual(resp.status_int, 200)
# match
resp = _do_test('"data_etag"', 200)
self.assertEqual(resp.status_int, 200)
# no match
resp = _do_test('"frag_etag"', 412)
self.assertEqual(resp.status_int, 412)
# match wildcard against an alternate etag
resp = _do_test('*', 200,
etag_is_at='X-Object-Sysmeta-Alternate-Etag')
self.assertEqual(resp.status_int, 200)
# match against an alternate etag
resp = _do_test('"alt_etag"', 200,
etag_is_at='X-Object-Sysmeta-Alternate-Etag')
self.assertEqual(resp.status_int, 200)
# no match against an alternate etag
resp = _do_test('"data_etag"', 412,
etag_is_at='X-Object-Sysmeta-Alternate-Etag')
self.assertEqual(resp.status_int, 412)
def test_GET_if_match(self):
self._test_if_match('GET')
def test_HEAD_if_match(self):
self._test_if_match('HEAD')
def _test_if_none_match(self, method):
num_responses = self.policy.ec_ndata if method == 'GET' else 1
def _do_test(match_value, backend_status,
etag_is_at='X-Object-Sysmeta-Does-Not-Exist'):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method=method,
headers={'If-None-Match': match_value,
'X-Backend-Etag-Is-At': etag_is_at})
get_resp = [backend_status] * num_responses
resp_headers = {'Etag': 'frag_etag',
'X-Object-Sysmeta-Ec-Etag': 'data_etag',
'X-Object-Sysmeta-Alternate-Etag': 'alt_etag'}
with set_http_connect(*get_resp, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual('data_etag', resp.headers['Etag'])
return resp
# wildcard
resp = _do_test('*', 304)
self.assertEqual(resp.status_int, 304)
# match
resp = _do_test('"data_etag"', 304)
self.assertEqual(resp.status_int, 304)
# no match
resp = _do_test('"frag_etag"', 200)
self.assertEqual(resp.status_int, 200)
# match wildcard against an alternate etag
resp = _do_test('*', 304,
etag_is_at='X-Object-Sysmeta-Alternate-Etag')
self.assertEqual(resp.status_int, 304)
# match against an alternate etag
resp = _do_test('"alt_etag"', 304,
etag_is_at='X-Object-Sysmeta-Alternate-Etag')
self.assertEqual(resp.status_int, 304)
# no match against an alternate etag
resp = _do_test('"data_etag"', 200,
etag_is_at='X-Object-Sysmeta-Alternate-Etag')
self.assertEqual(resp.status_int, 200)
def test_GET_if_none_match(self):
self._test_if_none_match('GET')
def test_HEAD_if_none_match(self):
self._test_if_none_match('HEAD')
def test_GET_simple_x_newest(self):
req = swift.common.swob.Request.blank('/v1/a/c/o',
headers={'X-Newest': 'true'})
@ -1194,6 +1487,42 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_PUT_with_body_and_bad_etag(self):
segment_size = self.policy.ec_segment_size
test_body = ('asdf' * segment_size)[:-10]
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
conns = []
def capture_expect(conn):
# stash the backend connection so we can verify that it is closed
# (no data will be sent)
conns.append(conn)
# send a bad etag in the request headers
headers = {'Etag': 'bad etag'}
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT', headers=headers, body=test_body)
with set_http_connect(*codes, expect_headers=expect_headers,
give_expect=capture_expect):
resp = req.get_response(self.app)
self.assertEqual(422, resp.status_int)
self.assertEqual(self.replicas(), len(conns))
for conn in conns:
self.assertTrue(conn.closed)
# make the footers callback send a bad Etag footer
footers_callback = make_footers_callback('not the test body')
env = {'swift.callback.update_footers': footers_callback}
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT', environ=env, body=test_body)
with set_http_connect(*codes, expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(422, resp.status_int)
def test_txn_id_logging_ECPUT(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
body='')
@ -1399,9 +1728,15 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.status_int, 500)
def test_PUT_with_body(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
segment_size = self.policy.ec_segment_size
test_body = ('asdf' * segment_size)[:-10]
# make the footers callback not include Etag footer so that we can
# verify that the correct EC-calculated Etag is included in footers
# sent to backend
footers_callback = make_footers_callback()
env = {'swift.callback.update_footers': footers_callback}
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT', environ=env)
etag = md5(test_body).hexdigest()
size = len(test_body)
req.body = test_body
@ -1413,8 +1748,8 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
put_requests = defaultdict(lambda: {'boundary': None, 'chunks': []})
def capture_body(conn_id, chunk):
put_requests[conn_id]['chunks'].append(chunk)
def capture_body(conn, chunk):
put_requests[conn.connection_id]['chunks'].append(chunk)
def capture_headers(ip, port, device, part, method, path, headers,
**kwargs):
@ -1471,13 +1806,16 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
self.assertEqual(footer_part['X-Document'], 'object metadata')
footer_metadata = json.loads(footer_part.get_payload())
self.assertTrue(footer_metadata)
expected = {
'X-Object-Sysmeta-EC-Content-Length': str(size),
expected = {}
# update expected with footers from the callback...
footers_callback(expected)
expected.update({
'X-Object-Sysmeta-Ec-Content-Length': str(size),
'X-Backend-Container-Update-Override-Size': str(size),
'X-Object-Sysmeta-EC-Etag': etag,
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Container-Update-Override-Etag': etag,
'X-Object-Sysmeta-EC-Segment-Size': str(segment_size),
}
'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size),
'Etag': md5(obj_part.get_payload()).hexdigest()})
for header, value in expected.items():
self.assertEqual(footer_metadata[header], value)
@ -1504,6 +1842,118 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
self.assertEqual(len(test_body), len(expected_body))
self.assertEqual(test_body, expected_body)
def test_PUT_with_footers(self):
# verify footers supplied by a footers callback being added to
# trailing metadata
segment_size = self.policy.ec_segment_size
test_body = ('asdf' * segment_size)[:-10]
etag = md5(test_body).hexdigest()
size = len(test_body)
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
def do_test(footers_to_add, expect_added):
put_requests = defaultdict(
lambda: {'boundary': None, 'chunks': []})
def capture_body(conn, chunk):
put_requests[conn.connection_id]['chunks'].append(chunk)
def capture_headers(ip, port, device, part, method, path, headers,
**kwargs):
conn_id = kwargs['connection_id']
put_requests[conn_id]['boundary'] = headers[
'X-Backend-Obj-Multipart-Mime-Boundary']
def footers_callback(footers):
footers.update(footers_to_add)
env = {'swift.callback.update_footers': footers_callback}
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT', environ=env, body=test_body)
with set_http_connect(*codes, expect_headers=expect_headers,
give_send=capture_body,
give_connect=capture_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
for connection_id, info in put_requests.items():
body = unchunk_body(''.join(info['chunks']))
# email.parser.FeedParser doesn't know how to take a multipart
# message and boundary together and parse it; it only knows how
# to take a string, parse the headers, and figure out the
# boundary on its own.
parser = email.parser.FeedParser()
parser.feed(
"Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n"
% info['boundary'])
parser.feed(body)
message = parser.close()
self.assertTrue(message.is_multipart()) # sanity check
mime_parts = message.get_payload()
self.assertEqual(len(mime_parts), 3)
obj_part, footer_part, commit_part = mime_parts
# validate EC footer metadata - should always be present
self.assertEqual(footer_part['X-Document'], 'object metadata')
footer_metadata = json.loads(footer_part.get_payload())
self.assertIsNotNone(
footer_metadata.pop('X-Object-Sysmeta-Ec-Frag-Index'))
expected = {
'X-Object-Sysmeta-Ec-Scheme':
self.policy.ec_scheme_description,
'X-Object-Sysmeta-Ec-Content-Length': str(size),
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size),
'Etag': md5(obj_part.get_payload()).hexdigest()}
expected.update(expect_added)
for header, value in expected.items():
self.assertIn(header, footer_metadata)
self.assertEqual(value, footer_metadata[header])
footer_metadata.pop(header)
self.assertFalse(footer_metadata)
# sanity check - middleware sets no footer, expect EC overrides
footers_to_add = {}
expect_added = {
'X-Backend-Container-Update-Override-Size': str(size),
'X-Backend-Container-Update-Override-Etag': etag}
do_test(footers_to_add, expect_added)
# middleware cannot overwrite any EC sysmeta
footers_to_add = {
'X-Object-Sysmeta-Ec-Content-Length': str(size + 1),
'X-Object-Sysmeta-Ec-Etag': 'other etag',
'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size + 1),
'X-Object-Sysmeta-Ec-Unused-But-Reserved': 'ignored'}
do_test(footers_to_add, expect_added)
# middleware can add x-object-sysmeta- headers including
# x-object-sysmeta-container-update-override headers
footers_to_add = {
'X-Object-Sysmeta-Foo': 'bar',
'X-Object-Sysmeta-Container-Update-Override-Size':
str(size + 1),
'X-Object-Sysmeta-Container-Update-Override-Etag': 'other etag',
'X-Object-Sysmeta-Container-Update-Override-Ping': 'pong'
}
expect_added.update(footers_to_add)
do_test(footers_to_add, expect_added)
# middleware can also overwrite x-backend-container-update-override
# headers
override_footers = {
'X-Backend-Container-Update-Override-Wham': 'bam',
'X-Backend-Container-Update-Override-Size': str(size + 2),
'X-Backend-Container-Update-Override-Etag': 'another etag'}
footers_to_add.update(override_footers)
expect_added.update(override_footers)
do_test(footers_to_add, expect_added)
def test_PUT_old_obj_server(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
body='')

View File

@ -2011,7 +2011,7 @@ class TestObjectController(unittest.TestCase):
call_count[0] += 1
commit_confirmation = \
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation'
with mock.patch('swift.obj.server.md5', busted_md5_constructor), \
mock.patch(commit_confirmation, mock_committer):
@ -2062,7 +2062,7 @@ class TestObjectController(unittest.TestCase):
read_footer = \
'swift.obj.server.ObjectController._read_metadata_footer'
commit_confirmation = \
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation'
with mock.patch(read_footer) as read_footer_call, \
mock.patch(commit_confirmation, mock_committer):