Merge "Add support for data segments to SLO and SegmentedIterable"

This commit is contained in:
Zuul 2018-02-01 12:52:55 +00:00 committed by Gerrit Code Review
commit 82844a3211
6 changed files with 706 additions and 148 deletions

View File

@ -742,7 +742,7 @@ use = egg:swift#bulk
[filter:slo]
use = egg:swift#slo
# max_manifest_segments = 1000
# max_manifest_size = 2097152
# max_manifest_size = 8388608
#
# Rate limiting applies only to segments smaller than this size (bytes).
# rate_limit_under_size = 1048576

View File

@ -188,16 +188,20 @@ class GetContext(WSGIContext):
if isinstance(seg_name, six.text_type):
seg_name = seg_name.encode("utf-8")
# (obj path, etag, size, first byte, last byte)
yield ("/" + "/".join((version, account, container,
seg_name)),
# We deliberately omit the etag and size here;
# SegmentedIterable will check size and etag if
# specified, but we don't want it to. DLOs only care
# that the objects' names match the specified prefix.
None, None,
(None if first_byte <= 0 else first_byte),
(None if last_byte >= seg_length - 1 else last_byte))
# We deliberately omit the etag and size here;
# SegmentedIterable will check size and etag if
# specified, but we don't want it to. DLOs only care
# that the objects' names match the specified prefix.
# SegmentedIterable will instead check that the data read
# from each segment matches the response headers.
_path = "/".join(["", version, account, container, seg_name])
_first = None if first_byte <= 0 else first_byte
_last = None if last_byte >= seg_length - 1 else last_byte
yield {
'path': _path,
'first_byte': _first,
'last_byte': _last
}
first_byte = max(first_byte - seg_length, -1)
last_byte = max(last_byte - seg_length, -1)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013 OpenStack Foundation
# Copyright (c) 2018 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -32,7 +32,7 @@ uploaded. The request must be a ``PUT`` with the query parameter::
?multipart-manifest=put
The body of this request will be an ordered list of segment descriptions in
JSON format. The data to be supplied for each segment is:
JSON format. The data to be supplied for each segment is either:
=========== ========================================================
Key Description
@ -44,27 +44,47 @@ etag (optional) the ETag given back when the segment object
size_bytes (optional) the size of the complete segment object in
bytes
range (optional) the (inclusive) range within the object to
use as a segment. If omitted, the entire object is used.
use as a segment. If omitted, the entire object is used
=========== ========================================================
Or:
=========== ========================================================
Key Description
=========== ========================================================
data base64-encoded data to be returned
=========== ========================================================
.. note::
At least one object-backed segment must be included. If you'd like
to create a manifest consisting purely of data segments, consider
uploading a normal object instead.
The format of the list will be::
[{"path": "/cont/object",
"etag": "etagoftheobjectsegment",
"size_bytes": 10485760,
"range": "1048576-2097151"},
{"data": base64.b64encode("interstitial data")},
{"path": "/cont/another-object", ...},
...]
The number of object segments is limited to a configurable amount, default
1000. Each segment must be at least 1 byte. On upload, the middleware will
head every segment passed in to verify:
The number of object-backed segments is limited to ``max_manifest_segments``
(configurable in proxy-server.conf, default 1000). Each segment must be at
least 1 byte. On upload, the middleware will head every object-backed segment
passed in to verify:
1. the segment exists (i.e. the ``HEAD`` was successful);
2. the segment meets minimum size requirements;
3. if the user provided a non-null ``etag``, the etag matches;
4. if the user provided a non-null ``size_bytes``, the size_bytes matches; and
5. if the user provided a ``range``, it is a singular, syntactically correct
range that is satisfiable given the size of the object.
range that is satisfiable given the size of the object referenced.
For inlined data segments, the middleware verifies each is valid, non-empty
base64-encoded binary data. Note that data segments *do not* count against
``max_manifest_segments``.
Note that the ``etag`` and ``size_bytes`` keys are optional; if omitted, the
verification is not performed. If any of the objects fail to verify (not
@ -148,13 +168,16 @@ above manifest would be::
echo -n 'etagoftheobjectsegmentone:1-2;etagoftheobjectsegmenttwo:3-4;' \
| md5sum
For the purposes of Etag computations, inlined data segments are considered to
have an etag of the md5 of the raw data (i.e., *not* base64-encoded).
-------------------
Range Specification
-------------------
Users now have the ability to specify ranges for SLO segments.
Users can now include an optional ``range`` field in segment descriptions
Users can include an optional ``range`` field in segment descriptions
to specify which bytes from the underlying object should be used for the
segment data. Only one range may be specified per segment.
@ -177,11 +200,28 @@ finally bytes 2095104 through 2097152 (i.e., the last 2048 bytes) of
.. note::
The minimum sized range is 1 byte. This is the same as the minimum
segment size.
-------------------------
Inline Data Specification
-------------------------
When uploading a manifest, users can include 'data' segments that should
be included along with objects. The data in these segments must be
base64-encoded binary data and will be included in the etag of the
resulting large object exactly as if that data had been uploaded and
referenced as separate objects.
.. note::
This feature is primarily aimed at reducing the need for storing
many tiny objects, and as such any supplied data must fit within
the maximum manifest size (default is 8MiB). This maximum size
can be configured via ``max_manifest_size`` in proxy-server.conf.
-------------------------
Retrieving a Large Object
-------------------------
@ -272,6 +312,7 @@ the manifest and the segments it's referring to) in the container and account
metadata which can be used for stats and billing purposes.
"""
import base64
from collections import defaultdict
from datetime import datetime
import json
@ -289,7 +330,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
closing_if_possible, LRUCache, StreamingPile
closing_if_possible, LRUCache, StreamingPile, strict_b64decode
from swift.common.request_helpers import SegmentedIterable, \
get_sys_meta_prefix, update_etag_is_at_header
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
@ -299,15 +340,17 @@ from swift.common.middleware.bulk import get_response_body, \
ACCEPTABLE_FORMATS, Bulk
DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 * 1024 # 1 MiB
DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 ** 2 # 1 MiB
DEFAULT_MAX_MANIFEST_SEGMENTS = 1000
DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB
DEFAULT_MAX_MANIFEST_SIZE = 8 * (1024 ** 2) # 8 MiB
DEFAULT_YIELD_FREQUENCY = 10
REQUIRED_SLO_KEYS = set(['path'])
OPTIONAL_SLO_KEYS = set(['range', 'etag', 'size_bytes'])
ALLOWED_SLO_KEYS = REQUIRED_SLO_KEYS | OPTIONAL_SLO_KEYS
SLO_KEYS = {
# required: optional
'data': set(),
'path': {'range', 'etag', 'size_bytes'},
}
SYSMETA_SLO_ETAG = get_sys_meta_prefix('object') + 'slo-etag'
SYSMETA_SLO_SIZE = get_sys_meta_prefix('object') + 'slo-size'
@ -318,8 +361,8 @@ def parse_and_validate_input(req_body, req_path):
Given a request body, parses it and returns a list of dictionaries.
The output structure is nearly the same as the input structure, but it
is not an exact copy. Given a valid input dictionary ``d_in``, its
corresponding output dictionary ``d_out`` will be as follows:
is not an exact copy. Given a valid object-backed input dictionary
``d_in``, its corresponding output dictionary ``d_out`` will be as follows:
* d_out['etag'] == d_in['etag']
@ -333,8 +376,10 @@ def parse_and_validate_input(req_body, req_path):
corresponding swob.Range object. If d_in does not have a key
'range', neither will d_out.
:raises HTTPException: on parse errors or semantic errors (e.g. bogus
JSON structure, syntactically invalid ranges)
Inlined data dictionaries will have any extraneous padding stripped.
:raises: HTTPException on parse errors or semantic errors (e.g. bogus
JSON structure, syntactically invalid ranges)
:returns: a list of dictionaries on success
"""
@ -356,15 +401,19 @@ def parse_and_validate_input(req_body, req_path):
errors.append("Index %d: not a JSON object" % seg_index)
continue
missing_keys = [k for k in REQUIRED_SLO_KEYS if k not in seg_dict]
if missing_keys:
for required in SLO_KEYS:
if required in seg_dict:
segment_type = required
break
else:
errors.append(
"Index %d: missing keys %s"
"Index %d: expected keys to include one of %s"
% (seg_index,
", ".join('"%s"' % (mk,) for mk in sorted(missing_keys))))
" or ".join(repr(required) for required in SLO_KEYS)))
continue
extraneous_keys = [k for k in seg_dict if k not in ALLOWED_SLO_KEYS]
allowed_keys = SLO_KEYS[segment_type].union([segment_type])
extraneous_keys = [k for k in seg_dict if k not in allowed_keys]
if extraneous_keys:
errors.append(
"Index %d: extraneous keys %s"
@ -373,61 +422,84 @@ def parse_and_validate_input(req_body, req_path):
for ek in sorted(extraneous_keys))))
continue
if not isinstance(seg_dict['path'], six.string_types):
errors.append("Index %d: \"path\" must be a string" % seg_index)
continue
if not (seg_dict.get('etag') is None or
isinstance(seg_dict['etag'], six.string_types)):
errors.append('Index %d: "etag" must be a string or null '
'(if provided)' % seg_index)
continue
if '/' not in seg_dict['path'].strip('/'):
errors.append(
"Index %d: path does not refer to an object. Path must be of "
"the form /container/object." % seg_index)
continue
seg_size = seg_dict.get('size_bytes')
if seg_size is not None:
try:
seg_size = int(seg_size)
seg_dict['size_bytes'] = seg_size
except (TypeError, ValueError):
errors.append("Index %d: invalid size_bytes" % seg_index)
if segment_type == 'path':
if not isinstance(seg_dict['path'], six.string_types):
errors.append("Index %d: \"path\" must be a string" %
seg_index)
continue
if seg_size < 1 and seg_index != (len(parsed_data) - 1):
if not (seg_dict.get('etag') is None or
isinstance(seg_dict['etag'], six.string_types)):
errors.append('Index %d: "etag" must be a string or null '
'(if provided)' % seg_index)
continue
if '/' not in seg_dict['path'].strip('/'):
errors.append(
"Index %d: path does not refer to an object. Path must "
"be of the form /container/object." % seg_index)
continue
seg_size = seg_dict.get('size_bytes')
if seg_size is not None:
try:
seg_size = int(seg_size)
seg_dict['size_bytes'] = seg_size
except (TypeError, ValueError):
errors.append("Index %d: invalid size_bytes" % seg_index)
continue
if seg_size < 1 and seg_index != (len(parsed_data) - 1):
errors.append("Index %d: too small; each segment must be "
"at least 1 byte."
% (seg_index,))
continue
obj_path = '/'.join(['', vrs, account,
seg_dict['path'].lstrip('/')])
if req_path == quote(obj_path):
errors.append(
"Index %d: manifest must not include itself as a segment"
% seg_index)
continue
if seg_dict.get('range'):
try:
seg_dict['range'] = Range('bytes=%s' % seg_dict['range'])
except ValueError:
errors.append("Index %d: invalid range" % seg_index)
continue
if len(seg_dict['range'].ranges) > 1:
errors.append("Index %d: multiple ranges "
"(only one allowed)" % seg_index)
continue
# If the user *told* us the object's size, we can check range
# satisfiability right now. If they lied about the size, we'll
# fail that validation later.
if (seg_size is not None and 1 != len(
seg_dict['range'].ranges_for_length(seg_size))):
errors.append("Index %d: unsatisfiable range" % seg_index)
continue
elif segment_type == 'data':
# Validate that the supplied data is non-empty and base64-encoded
try:
data = strict_b64decode(seg_dict['data'])
except ValueError:
errors.append(
"Index %d: data must be valid base64" % seg_index)
continue
if len(data) < 1:
errors.append("Index %d: too small; each segment must be "
"at least 1 byte."
% (seg_index,))
continue
# re-encode to normalize padding
seg_dict['data'] = base64.b64encode(data)
obj_path = '/'.join(['', vrs, account, seg_dict['path'].lstrip('/')])
if req_path == quote(obj_path):
errors.append(
"Index %d: manifest must not include itself as a segment"
% seg_index)
continue
if seg_dict.get('range'):
try:
seg_dict['range'] = Range('bytes=%s' % seg_dict['range'])
except ValueError:
errors.append("Index %d: invalid range" % seg_index)
continue
if len(seg_dict['range'].ranges) > 1:
errors.append("Index %d: multiple ranges (only one allowed)"
% seg_index)
continue
# If the user *told* us the object's size, we can check range
# satisfiability right now. If they lied about the size, we'll
# fail that validation later.
if (seg_size is not None and
len(seg_dict['range'].ranges_for_length(seg_size)) != 1):
errors.append("Index %d: unsatisfiable range" % seg_index)
continue
if parsed_data and all('data' in d for d in parsed_data):
errors.append("Inline data segments require at least one "
"object-backed segment.")
if errors:
error_message = "".join(e + "\n" for e in errors)
@ -472,11 +544,20 @@ class SloGetContext(WSGIContext):
'while fetching %s, JSON-decoding of submanifest %s '
'failed with %s' % (req.path, sub_req.path, err))
def _segment_path(self, version, account, seg_dict):
return "/{ver}/{acc}/{conobj}".format(
ver=version, acc=account,
conobj=seg_dict['name'].lstrip('/')
)
def _segment_length(self, seg_dict):
"""
Returns the number of bytes that will be fetched from the specified
segment on a plain GET request for this SLO manifest.
"""
if 'raw_data' in seg_dict:
return len(seg_dict['raw_data'])
seg_range = seg_dict.get('range')
if seg_range is not None:
# The range is of the form N-M, where N and M are both positive
@ -484,7 +565,7 @@ class SloGetContext(WSGIContext):
# only thing that creates the SLO manifests stored in the
# cluster.
range_start, range_end = [int(x) for x in seg_range.split('-')]
return range_end - range_start + 1
return (range_end - range_start) + 1
else:
return int(seg_dict['bytes'])
@ -533,6 +614,9 @@ class SloGetContext(WSGIContext):
recursion_depth=1):
last_sub_path = None
for seg_dict in segments:
if 'data' in seg_dict:
seg_dict['raw_data'] = strict_b64decode(seg_dict.pop('data'))
seg_length = self._segment_length(seg_dict)
if first_byte >= seg_length:
# don't need any bytes from this segment
@ -544,16 +628,25 @@ class SloGetContext(WSGIContext):
# no bytes are needed from this or any future segment
return
if 'raw_data' in seg_dict:
yield dict(seg_dict,
first_byte=max(0, first_byte),
last_byte=min(seg_length - 1, last_byte))
first_byte -= seg_length
last_byte -= seg_length
continue
seg_range = seg_dict.get('range')
if seg_range is None:
range_start, range_end = 0, seg_length - 1
else:
# We already validated and supplied concrete values
# for the range on upload
# This simple parsing of the range is valid because we already
# validated and supplied concrete values for the range
# during SLO manifest creation
range_start, range_end = map(int, seg_range.split('-'))
if config_true_value(seg_dict.get('sub_slo')):
# do this check here so that we can avoid fetching this last
# Do this check here so that we can avoid fetching this last
# manifest before raising the exception
if recursion_depth >= self.max_slo_recursion_depth:
raise ListingIterError(
@ -568,7 +661,7 @@ class SloGetContext(WSGIContext):
last_sub_path = sub_path
# Use the existing machinery to slice into the sub-SLO.
for sub_seg_dict, sb, eb in self._byterange_listing_iterator(
for sub_seg_dict in self._byterange_listing_iterator(
req, version, account, sub_segments,
# This adjusts first_byte and last_byte to be
# relative to the sub-SLO.
@ -577,13 +670,13 @@ class SloGetContext(WSGIContext):
cached_fetch_sub_slo_segments,
recursion_depth=recursion_depth + 1):
yield sub_seg_dict, sb, eb
yield sub_seg_dict
else:
if isinstance(seg_dict['name'], six.text_type):
seg_dict['name'] = seg_dict['name'].encode("utf-8")
yield (seg_dict,
max(0, first_byte) + range_start,
min(range_end, range_start + last_byte))
yield dict(seg_dict,
first_byte=max(0, first_byte) + range_start,
last_byte=min(range_end, range_start + last_byte))
first_byte -= seg_length
last_byte -= seg_length
@ -741,6 +834,8 @@ class SloGetContext(WSGIContext):
segments = self._get_manifest_read(resp_iter)
for seg_dict in segments:
if 'data' in seg_dict:
continue
seg_dict.pop('content_type', None)
seg_dict.pop('last_modified', None)
seg_dict.pop('sub_slo', None)
@ -774,7 +869,6 @@ class SloGetContext(WSGIContext):
def get_or_head_response(self, req, resp_headers, resp_iter):
segments = self._get_manifest_read(resp_iter)
slo_etag = None
content_length = None
response_headers = []
@ -789,21 +883,38 @@ class SloGetContext(WSGIContext):
elif lheader not in ('etag', 'content-length'):
response_headers.append((header, value))
if slo_etag is None or content_length is None:
etag = md5()
content_length = 0
for seg_dict in segments:
if seg_dict.get('range'):
etag.update('%s:%s;' % (seg_dict['hash'],
seg_dict['range']))
else:
etag.update(seg_dict['hash'])
# Prep to calculate content_length & etag if necessary
if slo_etag is None:
calculated_etag = md5()
if content_length is None:
calculated_content_length = 0
for seg_dict in segments:
# Decode any inlined data; it's important that we do this *before*
# calculating the segment length and etag
if 'data' in seg_dict:
seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data'))
if slo_etag is None:
if 'raw_data' in seg_dict:
calculated_etag.update(
md5(seg_dict['raw_data']).hexdigest())
elif seg_dict.get('range'):
calculated_etag.update(
'%s:%s;' % (seg_dict['hash'], seg_dict['range']))
else:
calculated_etag.update(seg_dict['hash'])
if content_length is None:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=self.slo.logger)
content_length += self._segment_length(seg_dict)
slo_etag = etag.hexdigest()
calculated_content_length += self._segment_length(seg_dict)
if slo_etag is None:
slo_etag = calculated_etag.hexdigest()
if content_length is None:
content_length = calculated_content_length
response_headers.append(('Content-Length', str(content_length)))
response_headers.append(('Etag', '"%s"' % slo_etag))
@ -833,9 +944,13 @@ class SloGetContext(WSGIContext):
plain_listing_iter = self._segment_listing_iterator(
req, ver, account, segments, byteranges)
def is_small_segment((seg_dict, start_byte, end_byte)):
start = 0 if start_byte is None else start_byte
end = int(seg_dict['bytes']) - 1 if end_byte is None else end_byte
def ratelimit_predicate(seg_dict):
if 'raw_data' in seg_dict:
return False # it's already in memory anyway
start = seg_dict.get('start_byte') or 0
end = seg_dict.get('end_byte')
if end is None:
end = int(seg_dict['bytes']) - 1
is_small = (end - start + 1) < self.slo.rate_limit_under_size
return is_small
@ -843,17 +958,14 @@ class SloGetContext(WSGIContext):
plain_listing_iter,
self.slo.rate_limit_segments_per_sec,
limit_after=self.slo.rate_limit_after_segment,
ratelimit_if=is_small_segment)
ratelimit_if=ratelimit_predicate)
# self._segment_listing_iterator gives us 3-tuples of (segment dict,
# start byte, end byte), but SegmentedIterable wants (obj path, etag,
# size, start byte, end byte), so we clean that up here
# data segments are already in the correct format, but object-backed
# segments need a path key added
segment_listing_iter = (
("/{ver}/{acc}/{conobj}".format(
ver=ver, acc=account, conobj=seg_dict['name'].lstrip('/')),
seg_dict['hash'], int(seg_dict['bytes']),
start_byte, end_byte)
for seg_dict, start_byte, end_byte in ratelimited_listing_iter)
seg_dict if 'raw_data' in seg_dict else
dict(seg_dict, path=self._segment_path(ver, account, seg_dict))
for seg_dict in ratelimited_listing_iter)
segmented_iter = SegmentedIterable(
req, self.slo.app, segment_listing_iter,
@ -964,9 +1076,10 @@ class StaticLargeObject(object):
req.path)
problem_segments = []
if len(parsed_data) > self.max_manifest_segments:
object_segments = [seg for seg in parsed_data if 'path' in seg]
if len(object_segments) > self.max_manifest_segments:
raise HTTPRequestEntityTooLarge(
'Number of segments must be <= %d' %
'Number of object-backed segments must be <= %d' %
self.max_manifest_segments)
try:
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
@ -974,10 +1087,15 @@ class StaticLargeObject(object):
out_content_type = 'text/plain' # Ignore invalid header
if not out_content_type:
out_content_type = 'text/plain'
data_for_storage = []
data_for_storage = [None] * len(parsed_data)
total_size = 0
path2indices = defaultdict(list)
for index, seg_dict in enumerate(parsed_data):
path2indices[seg_dict['path']].append(index)
if 'data' in seg_dict:
data_for_storage[index] = seg_dict
total_size += len(base64.b64decode(seg_dict['data']))
else:
path2indices[seg_dict['path']].append(index)
def do_head(obj_name):
obj_path = '/'.join(['', vrs, account,
@ -1023,30 +1141,45 @@ class StaticLargeObject(object):
problem_segments.append(
[quote(obj_name),
'Too small; each segment must be at least 1 byte.'])
if seg_dict.get('size_bytes') is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
_size_bytes = seg_dict.get('size_bytes')
size_mismatch = (
_size_bytes is not None and
_size_bytes != head_seg_resp.content_length
)
if size_mismatch:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict.get('etag') is not None and \
seg_dict['etag'] != head_seg_resp.etag:
_etag = seg_dict.get('etag')
etag_mismatch = (
_etag is not None and
_etag != head_seg_resp.etag
)
if etag_mismatch:
problem_segments.append([quote(obj_name), 'Etag Mismatch'])
if head_seg_resp.last_modified:
last_modified = head_seg_resp.last_modified
else:
# shouldn't happen
last_modified = datetime.now()
last_modified_formatted = \
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted}
last_modified_formatted = last_modified.strftime(
'%Y-%m-%dT%H:%M:%S.%f'
)
seg_data = {
'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted
}
if seg_dict.get('range'):
seg_data['range'] = seg_dict['range']
if config_true_value(
head_seg_resp.headers.get('X-Static-Large-Object')):
seg_data['sub_slo'] = True
return segment_length, seg_data
heartbeat = config_true_value(req.params.get('heartbeat'))
@ -1059,10 +1192,8 @@ class StaticLargeObject(object):
('Content-Type', out_content_type),
])
separator = '\r\n\r\n'
data_for_storage = [None] * len(parsed_data)
def resp_iter():
total_size = 0
def resp_iter(total_size=total_size):
# wsgi won't propagate start_response calls until some data has
# been yielded so make sure first heartbeat is sent immediately
if heartbeat:
@ -1102,7 +1233,10 @@ class StaticLargeObject(object):
slo_etag = md5()
for seg_data in data_for_storage:
if seg_data.get('range'):
if 'data' in seg_data:
raw_data = base64.b64decode(seg_data['data'])
slo_etag.update(md5(raw_data).hexdigest())
elif seg_data.get('range'):
slo_etag.update('%s:%s;' % (seg_data['hash'],
seg_data['range']))
else:
@ -1183,6 +1317,8 @@ class StaticLargeObject(object):
raise HTTPBadRequest(
'Too many buffered slo segments to delete.')
seg_data = segments.pop(0)
if 'data' in seg_data:
continue
if seg_data.get('sub_slo'):
try:
segments.extend(

View File

@ -354,12 +354,25 @@ class SegmentedIterable(object):
def _coalesce_requests(self):
start_time = time.time()
pending_req = None
pending_etag = None
pending_size = None
pending_req = pending_etag = pending_size = None
try:
for seg_path, seg_etag, seg_size, first_byte, last_byte \
in self.listing_iter:
for seg_dict in self.listing_iter:
if 'raw_data' in seg_dict:
if pending_req:
yield pending_req, pending_etag, pending_size
to_yield = seg_dict['raw_data'][
seg_dict['first_byte']:seg_dict['last_byte'] + 1]
yield to_yield, None, len(seg_dict['raw_data'])
pending_req = pending_etag = pending_size = None
continue
seg_path, seg_etag, seg_size, first_byte, last_byte = (
seg_dict['path'], seg_dict.get('hash'),
seg_dict.get('bytes'),
seg_dict['first_byte'], seg_dict['last_byte'])
if seg_size is not None:
seg_size = int(seg_size)
first_byte = first_byte or 0
go_to_end = last_byte is None or (
seg_size is not None and last_byte == seg_size - 1)
@ -441,7 +454,18 @@ class SegmentedIterable(object):
bytes_left = self.response_body_length
try:
for seg_req, seg_etag, seg_size in self._coalesce_requests():
for data_or_req, seg_etag, seg_size in self._coalesce_requests():
if isinstance(data_or_req, bytes):
chunk = data_or_req # ugly, awful overloading
if bytes_left is None:
yield chunk
elif bytes_left >= len(chunk):
yield chunk
bytes_left -= len(chunk)
else:
yield chunk[:bytes_left]
continue
seg_req = data_or_req
seg_resp = seg_req.get_response(self.app)
if not is_success(seg_resp.status_int):
close_if_possible(seg_resp.app_iter)

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import email.parser
import hashlib
import itertools
@ -205,6 +206,28 @@ class TestSloEnv(BaseEnv):
'size_bytes': None, 'range': '-1048578'},
]), parms={'multipart-manifest': 'put'})
file_item = cls.container.file("mixed-object-data-manifest")
file_item.write(
json.dumps([
{'data': base64.b64encode('APRE' * 8)},
{'path': seg_info['seg_a']['path']},
{'data': base64.b64encode('APOS' * 16)},
{'path': seg_info['seg_b']['path']},
{'data': base64.b64encode('BPOS' * 32)},
{'data': base64.b64encode('CPRE' * 64)},
{'path': seg_info['seg_c']['path']},
{'data': base64.b64encode('CPOS' * 8)},
]), parms={'multipart-manifest': 'put'}
)
file_item = cls.container.file("nested-data-manifest")
file_item.write(
json.dumps([
{'path': '%s/%s' % (cls.container.name,
"mixed-object-data-manifest")}
]), parms={'multipart-manifest': 'put'}
)
class TestSlo(Base):
env = TestSloEnv
@ -681,6 +704,25 @@ class TestSlo(Base):
self.assertEqual('application/octet-stream', actual['content_type'])
self.assertEqual(copied.etag, actual['hash'])
# Test copy manifest including data segments
source = self.env.container.file("mixed-object-data-manifest")
source_contents = source.read(parms={'multipart-manifest': 'get'})
source_json = json.loads(source_contents)
source.copy(
self.env.container.name,
"copied-mixed-object-data-manifest",
parms={'multipart-manifest': 'get'})
copied = self.env.container.file("copied-mixed-object-data-manifest")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
copied_json = json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
self.assertEqual(source_contents, copied_contents)
self.assertEqual(copied_json[0],
{'data': base64.b64encode('APRE' * 8)})
def test_slo_copy_the_manifest_updating_metadata(self):
source = self.env.container.file("manifest-abcde")
source.content_type = 'application/octet-stream'
@ -1115,6 +1157,56 @@ class TestSlo(Base):
self.assertEqual('d', contents[-2])
self.assertEqual('e', contents[-1])
def test_slo_data_segments(self):
# len('APRE' * 8) == 32
# len('APOS' * 16) == 64
# len('BPOS' * 32) == 128
# len('CPRE' * 64) == 256
# len(a_pre + seg_a + post_a) == 32 + 1024 ** 2 + 64
# len(seg_b + post_b) == 1024 ** 2 + 128
# len(c_pre + seg_c) == 256 + 1024 ** 2
# len(total) == 3146208
for file_name in ("mixed-object-data-manifest",
"nested-data-manifest"):
file_item = self.env.container.file(file_name)
file_contents = file_item.read(size=3 * 1024 ** 2 + 456,
offset=28)
grouped_file_contents = [
(char, sum(1 for _char in grp))
for char, grp in itertools.groupby(file_contents)]
self.assertEqual([
('A', 1),
('P', 1),
('R', 1),
('E', 1),
('a', 1024 * 1024),
] + [
('A', 1),
('P', 1),
('O', 1),
('S', 1),
] * 16 + [
('b', 1024 * 1024),
] + [
('B', 1),
('P', 1),
('O', 1),
('S', 1),
] * 32 + [
('C', 1),
('P', 1),
('R', 1),
('E', 1),
] * 64 + [
('c', 1024 * 1024),
] + [
('C', 1),
('P', 1),
('O', 1),
('S', 1),
], grouped_file_contents)
class TestSloUTF8(Base2, TestSlo):
pass

View File

@ -16,12 +16,15 @@
from six.moves import range
import base64
import hashlib
import json
import time
import unittest
from mock import patch
from StringIO import StringIO
from swift.common import swob, utils
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.middleware import slo
@ -707,6 +710,29 @@ class TestSloPutManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '201 Created')
def test_handle_multipart_put_invalid_data(self):
def do_test(bad_data):
test_json_data = json.dumps([{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'data': bad_data}])
req = Request.blank('/v1/a/c/o', body=test_json_data)
with self.assertRaises(HTTPException) as catcher:
self.slo.handle_multipart_put(req, fake_start_response)
self.assertEqual(catcher.exception.status_int, 400)
do_test('invalid') # insufficient padding
do_test(12345)
do_test(0)
do_test(True)
do_test(False)
do_test(None)
do_test({})
do_test([])
# Empties are no good, either
do_test('')
do_test('====')
def test_handle_multipart_put_success_unicode(self):
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
@ -2367,8 +2393,10 @@ class TestSloGetManifest(SloTestCase):
'bytes=0-3,8-11'])
# we set swift.source for everything but the first request
self.assertIsNone(self.app.swift_sources[0])
self.assertEqual(self.app.swift_sources[1:],
['SLO'] * (len(self.app.swift_sources) - 1))
self.assertEqual(
self.app.swift_sources[1:],
['SLO'] * (len(self.app.swift_sources) - 1)
)
self.assertEqual(md5hex(''.join([
md5hex('a' * 5), ':0-3;',
md5hex('a' * 5), ':1-4;',
@ -2654,18 +2682,21 @@ class TestSloGetManifest(SloTestCase):
'Etag': 'man%d' % i},
manifest_json)
submanifest_bytes = 6
for i in range(19, 0, -1):
manifest_data = [
{'name': '/gettest/obj%d' % i,
'hash': md5hex('body%02d' % i),
'bytes': '6',
'content_type': 'text/plain'},
{'data': base64.b64encode('-' * 3)},
{'name': '/gettest/man%d' % (i + 1),
'hash': 'man%d' % (i + 1),
'sub_slo': True,
'bytes': len(manifest_json),
'bytes': submanifest_bytes,
'content_type': 'application/json'}]
submanifest_bytes += 9
manifest_json = json.dumps(manifest_data)
self.app.register(
'GET', '/v1/AUTH_test/gettest/man%d' % i,
@ -2683,8 +2714,10 @@ class TestSloGetManifest(SloTestCase):
# we don't know at header-sending time that things are going to go
# wrong, so we end up with a 200 and a truncated body
self.assertEqual(status, '200 OK')
self.assertEqual(body, ('body01body02body03body04body05' +
'body06body07body08body09body10'))
self.assertEqual(headers['Content-Length'], str(9 * 19 + 6))
self.assertEqual(body, (
'body01---body02---body03---body04---body05---' +
'body06---body07---body08---body09---body10---'))
# but the error shows up in logs
self.assertEqual(self.slo.logger.get_lines_for_level('error'), [
"While processing manifest '/v1/AUTH_test/gettest/man1', "
@ -3057,6 +3090,275 @@ class TestSloGetManifest(SloTestCase):
'gettest/not_exists_obj'
])
def test_leading_data_segment(self):
slo_etag = md5hex(
md5hex('preamble') +
md5hex('a' * 5)
)
preamble = base64.b64encode('preamble')
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-single-preamble',
swob.HTTPOk,
{
'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'
},
json.dumps([{
'data': preamble
}, {
'name': '/gettest/a_5',
'hash': md5hex('a' * 5),
'content_type': 'text/plain',
'bytes': '5',
}])
)
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-preamble',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('200 OK', status)
self.assertEqual(body, 'preambleaaaaa')
self.assertIn(('Etag', '"%s"' % slo_etag), headers)
self.assertIn(('Content-Length', '13'), headers)
def test_trailing_data_segment(self):
slo_etag = md5hex(
md5hex('a' * 5) +
md5hex('postamble')
)
postamble = base64.b64encode('postamble')
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-single-postamble',
swob.HTTPOk,
{
'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'
},
json.dumps([{
'name': '/gettest/a_5',
'hash': md5hex('a' * 5),
'content_type': 'text/plain',
'bytes': '5',
}, {
'data': postamble
}])
)
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-postamble',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('200 OK', status)
self.assertEqual(body, 'aaaaapostamble')
self.assertIn(('Etag', '"%s"' % slo_etag), headers)
self.assertIn(('Content-Length', '14'), headers)
def test_data_segment_sandwich(self):
slo_etag = md5hex(
md5hex('preamble') +
md5hex('a' * 5) +
md5hex('postamble')
)
preamble = base64.b64encode('preamble')
postamble = base64.b64encode('postamble')
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-single-prepostamble',
swob.HTTPOk,
{
'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'
},
json.dumps([{
'data': preamble,
}, {
'name': '/gettest/a_5',
'hash': md5hex('a' * 5),
'content_type': 'text/plain',
'bytes': '5',
}, {
'data': postamble
}])
)
# Test the whole SLO
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('200 OK', status)
self.assertEqual(body, 'preambleaaaaapostamble')
self.assertIn(('Etag', '"%s"' % slo_etag), headers)
self.assertIn(('Content-Length', '22'), headers)
# Test complete preamble only
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=0-7'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'preamble')
# Test range within preamble only
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=1-5'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'reamb')
# Test complete postamble only
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=13-21'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'postamble')
# Test partial pre and postamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=4-16'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'mbleaaaaapost')
# Test partial preamble and first byte of data
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=1-8'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'reamblea')
# Test last byte of segment data and partial postamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-single-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=12-16'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'apost')
def test_bunches_of_data_segments(self):
slo_etag = md5hex(
md5hex('ABCDEF') +
md5hex('a' * 5) +
md5hex('123456') +
md5hex('GHIJKL') +
md5hex('b' * 10) +
md5hex('7890@#')
)
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-multi-prepostamble',
swob.HTTPOk,
{
'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'
},
json.dumps([
{
'data': base64.b64encode('ABCDEF'),
},
{
'name': '/gettest/a_5',
'hash': md5hex('a' * 5),
'content_type': 'text/plain',
'bytes': '5',
},
{
'data': base64.b64encode('123456')
},
{
'data': base64.b64encode('GHIJKL'),
},
{
'name': '/gettest/b_10',
'hash': md5hex('b' * 10),
'content_type': 'text/plain',
'bytes': '10',
},
{
'data': base64.b64encode('7890@#')
}
])
)
# Test the whole SLO
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-multi-prepostamble',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('200 OK', status)
self.assertEqual(body, 'ABCDEFaaaaa123456GHIJKLbbbbbbbbbb7890@#')
self.assertIn(('Etag', '"%s"' % slo_etag), headers)
self.assertIn(('Content-Length', '39'), headers)
# Test last byte first pre-amble to first byte of second postamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-multi-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=5-33'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'Faaaaa123456GHIJKLbbbbbbbbbb7')
# Test only second complete preamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-multi-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=17-22'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, 'GHIJKL')
# Test only first complete postamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-multi-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=11-16'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, '123456')
# Test only range within first postamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-multi-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=12-15'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, '2345')
# Test only range within first postamble and second preamble
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-multi-prepostamble',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=12-18'})
status, headers, body = self.call_slo(req)
self.assertEqual('206 Partial Content', status)
self.assertEqual(body, '23456GH')
class TestSloConditionalGetOldManifest(SloTestCase):
slo_data = [
@ -3324,7 +3626,7 @@ class TestSwiftInfo(unittest.TestCase):
self.assertEqual(swift_info['slo'].get('max_manifest_size'),
mware.max_manifest_size)
self.assertEqual(1000, mware.max_manifest_segments)
self.assertEqual(2097152, mware.max_manifest_size)
self.assertEqual(8388608, mware.max_manifest_size)
self.assertEqual(1048576, mware.rate_limit_under_size)
self.assertEqual(10, mware.rate_limit_after_segment)
self.assertEqual(1, mware.rate_limit_segments_per_sec)