From 11bf9e45884aa5122e76a97cf62062d7b0bdfe54 Mon Sep 17 00:00:00 2001 From: Joel Wright Date: Sun, 4 Sep 2016 23:51:53 +0100 Subject: [PATCH] Add support for data segments to SLO and SegmentedIterable This patch updates the SLO middleware and SegmentedIterable to add support for user-specified inlined-data segments. Such segments will contain base64-encoded data to be added before/after an object-backed segment within an SLO. To accommodate the potential extra data we increase the default SLO maximum manifest size from 2MiB to 8MiB. The default maximum number of segments remains 1000, but this will only be enforced for object-backed segments. This patch is a prerequisite for a future patch enabling the download of large objects as tarballs. The TLO patch will be added as a dependent patch later. UpgradeImpact ============= During a rolling upgrade, an updated proxy may write a manifest that out-of-date proxies will not be able to read. This will resolve itself once the upgrade completes on all nodes. Change-Id: Ib8dc216a84d370e6da7d6b819af79582b671d699 --- etc/proxy-server.conf-sample | 2 +- swift/common/middleware/dlo.py | 24 +- swift/common/middleware/slo.py | 386 ++++++++++++++++-------- swift/common/request_helpers.py | 36 ++- test/functional/test_slo.py | 92 ++++++ test/unit/common/middleware/test_slo.py | 314 ++++++++++++++++++- 6 files changed, 706 insertions(+), 148 deletions(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index d4a81fab54..fb23c06bab 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -738,7 +738,7 @@ use = egg:swift#bulk [filter:slo] use = egg:swift#slo # max_manifest_segments = 1000 -# max_manifest_size = 2097152 +# max_manifest_size = 8388608 # # Rate limiting applies only to segments smaller than this size (bytes). # rate_limit_under_size = 1048576 diff --git a/swift/common/middleware/dlo.py b/swift/common/middleware/dlo.py index 91728a5989..4c4ce00bff 100644 --- a/swift/common/middleware/dlo.py +++ b/swift/common/middleware/dlo.py @@ -188,16 +188,20 @@ class GetContext(WSGIContext): if isinstance(seg_name, six.text_type): seg_name = seg_name.encode("utf-8") - # (obj path, etag, size, first byte, last byte) - yield ("/" + "/".join((version, account, container, - seg_name)), - # We deliberately omit the etag and size here; - # SegmentedIterable will check size and etag if - # specified, but we don't want it to. DLOs only care - # that the objects' names match the specified prefix. - None, None, - (None if first_byte <= 0 else first_byte), - (None if last_byte >= seg_length - 1 else last_byte)) + # We deliberately omit the etag and size here; + # SegmentedIterable will check size and etag if + # specified, but we don't want it to. DLOs only care + # that the objects' names match the specified prefix. + # SegmentedIterable will instead check that the data read + # from each segment matches the response headers. + _path = "/".join(["", version, account, container, seg_name]) + _first = None if first_byte <= 0 else first_byte + _last = None if last_byte >= seg_length - 1 else last_byte + yield { + 'path': _path, + 'first_byte': _first, + 'last_byte': _last + } first_byte = max(first_byte - seg_length, -1) last_byte = max(last_byte - seg_length, -1) diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index 433c9d6c3c..aaf6fe0381 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 OpenStack Foundation +# Copyright (c) 2018 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ uploaded. The request must be a ``PUT`` with the query parameter:: ?multipart-manifest=put The body of this request will be an ordered list of segment descriptions in -JSON format. The data to be supplied for each segment is: +JSON format. The data to be supplied for each segment is either: =========== ======================================================== Key Description @@ -44,27 +44,47 @@ etag (optional) the ETag given back when the segment object size_bytes (optional) the size of the complete segment object in bytes range (optional) the (inclusive) range within the object to - use as a segment. If omitted, the entire object is used. + use as a segment. If omitted, the entire object is used =========== ======================================================== +Or: + +=========== ======================================================== +Key Description +=========== ======================================================== +data base64-encoded data to be returned +=========== ======================================================== + +.. note:: + At least one object-backed segment must be included. If you'd like + to create a manifest consisting purely of data segments, consider + uploading a normal object instead. + The format of the list will be:: [{"path": "/cont/object", "etag": "etagoftheobjectsegment", "size_bytes": 10485760, "range": "1048576-2097151"}, + {"data": base64.b64encode("interstitial data")}, + {"path": "/cont/another-object", ...}, ...] -The number of object segments is limited to a configurable amount, default -1000. Each segment must be at least 1 byte. On upload, the middleware will -head every segment passed in to verify: +The number of object-backed segments is limited to ``max_manifest_segments`` +(configurable in proxy-server.conf, default 1000). Each segment must be at +least 1 byte. On upload, the middleware will head every object-backed segment +passed in to verify: 1. the segment exists (i.e. the ``HEAD`` was successful); 2. the segment meets minimum size requirements; 3. if the user provided a non-null ``etag``, the etag matches; 4. if the user provided a non-null ``size_bytes``, the size_bytes matches; and 5. if the user provided a ``range``, it is a singular, syntactically correct - range that is satisfiable given the size of the object. + range that is satisfiable given the size of the object referenced. + +For inlined data segments, the middleware verifies each is valid, non-empty +base64-encoded binary data. Note that data segments *do not* count against +``max_manifest_segments``. Note that the ``etag`` and ``size_bytes`` keys are optional; if omitted, the verification is not performed. If any of the objects fail to verify (not @@ -148,13 +168,16 @@ above manifest would be:: echo -n 'etagoftheobjectsegmentone:1-2;etagoftheobjectsegmenttwo:3-4;' \ | md5sum +For the purposes of Etag computations, inlined data segments are considered to +have an etag of the md5 of the raw data (i.e., *not* base64-encoded). + ------------------- Range Specification ------------------- Users now have the ability to specify ranges for SLO segments. -Users can now include an optional ``range`` field in segment descriptions +Users can include an optional ``range`` field in segment descriptions to specify which bytes from the underlying object should be used for the segment data. Only one range may be specified per segment. @@ -177,11 +200,28 @@ finally bytes 2095104 through 2097152 (i.e., the last 2048 bytes) of .. note:: - The minimum sized range is 1 byte. This is the same as the minimum segment size. +------------------------- +Inline Data Specification +------------------------- + +When uploading a manifest, users can include 'data' segments that should +be included along with objects. The data in these segments must be +base64-encoded binary data and will be included in the etag of the +resulting large object exactly as if that data had been uploaded and +referenced as separate objects. + +.. note:: + + This feature is primarily aimed at reducing the need for storing + many tiny objects, and as such any supplied data must fit within + the maximum manifest size (default is 8MiB). This maximum size + can be configured via ``max_manifest_size`` in proxy-server.conf. + + ------------------------- Retrieving a Large Object ------------------------- @@ -272,6 +312,7 @@ the manifest and the segments it's referring to) in the container and account metadata which can be used for stats and billing purposes. """ +import base64 from collections import defaultdict from datetime import datetime import json @@ -289,7 +330,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ from swift.common.utils import get_logger, config_true_value, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \ register_swift_info, RateLimitedIterator, quote, close_if_possible, \ - closing_if_possible, LRUCache, StreamingPile + closing_if_possible, LRUCache, StreamingPile, strict_b64decode from swift.common.request_helpers import SegmentedIterable, \ get_sys_meta_prefix, update_etag_is_at_header from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS @@ -299,15 +340,17 @@ from swift.common.middleware.bulk import get_response_body, \ ACCEPTABLE_FORMATS, Bulk -DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 * 1024 # 1 MiB +DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 ** 2 # 1 MiB DEFAULT_MAX_MANIFEST_SEGMENTS = 1000 -DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB +DEFAULT_MAX_MANIFEST_SIZE = 8 * (1024 ** 2) # 8 MiB DEFAULT_YIELD_FREQUENCY = 10 -REQUIRED_SLO_KEYS = set(['path']) -OPTIONAL_SLO_KEYS = set(['range', 'etag', 'size_bytes']) -ALLOWED_SLO_KEYS = REQUIRED_SLO_KEYS | OPTIONAL_SLO_KEYS +SLO_KEYS = { + # required: optional + 'data': set(), + 'path': {'range', 'etag', 'size_bytes'}, +} SYSMETA_SLO_ETAG = get_sys_meta_prefix('object') + 'slo-etag' SYSMETA_SLO_SIZE = get_sys_meta_prefix('object') + 'slo-size' @@ -318,8 +361,8 @@ def parse_and_validate_input(req_body, req_path): Given a request body, parses it and returns a list of dictionaries. The output structure is nearly the same as the input structure, but it - is not an exact copy. Given a valid input dictionary ``d_in``, its - corresponding output dictionary ``d_out`` will be as follows: + is not an exact copy. Given a valid object-backed input dictionary + ``d_in``, its corresponding output dictionary ``d_out`` will be as follows: * d_out['etag'] == d_in['etag'] @@ -333,8 +376,10 @@ def parse_and_validate_input(req_body, req_path): corresponding swob.Range object. If d_in does not have a key 'range', neither will d_out. - :raises HTTPException: on parse errors or semantic errors (e.g. bogus - JSON structure, syntactically invalid ranges) + Inlined data dictionaries will have any extraneous padding stripped. + + :raises: HTTPException on parse errors or semantic errors (e.g. bogus + JSON structure, syntactically invalid ranges) :returns: a list of dictionaries on success """ @@ -356,15 +401,19 @@ def parse_and_validate_input(req_body, req_path): errors.append("Index %d: not a JSON object" % seg_index) continue - missing_keys = [k for k in REQUIRED_SLO_KEYS if k not in seg_dict] - if missing_keys: + for required in SLO_KEYS: + if required in seg_dict: + segment_type = required + break + else: errors.append( - "Index %d: missing keys %s" + "Index %d: expected keys to include one of %s" % (seg_index, - ", ".join('"%s"' % (mk,) for mk in sorted(missing_keys)))) + " or ".join(repr(required) for required in SLO_KEYS))) continue - extraneous_keys = [k for k in seg_dict if k not in ALLOWED_SLO_KEYS] + allowed_keys = SLO_KEYS[segment_type].union([segment_type]) + extraneous_keys = [k for k in seg_dict if k not in allowed_keys] if extraneous_keys: errors.append( "Index %d: extraneous keys %s" @@ -373,61 +422,84 @@ def parse_and_validate_input(req_body, req_path): for ek in sorted(extraneous_keys)))) continue - if not isinstance(seg_dict['path'], six.string_types): - errors.append("Index %d: \"path\" must be a string" % seg_index) - continue - if not (seg_dict.get('etag') is None or - isinstance(seg_dict['etag'], six.string_types)): - errors.append('Index %d: "etag" must be a string or null ' - '(if provided)' % seg_index) - continue - - if '/' not in seg_dict['path'].strip('/'): - errors.append( - "Index %d: path does not refer to an object. Path must be of " - "the form /container/object." % seg_index) - continue - - seg_size = seg_dict.get('size_bytes') - if seg_size is not None: - try: - seg_size = int(seg_size) - seg_dict['size_bytes'] = seg_size - except (TypeError, ValueError): - errors.append("Index %d: invalid size_bytes" % seg_index) + if segment_type == 'path': + if not isinstance(seg_dict['path'], six.string_types): + errors.append("Index %d: \"path\" must be a string" % + seg_index) continue - if seg_size < 1 and seg_index != (len(parsed_data) - 1): + if not (seg_dict.get('etag') is None or + isinstance(seg_dict['etag'], six.string_types)): + errors.append('Index %d: "etag" must be a string or null ' + '(if provided)' % seg_index) + continue + + if '/' not in seg_dict['path'].strip('/'): + errors.append( + "Index %d: path does not refer to an object. Path must " + "be of the form /container/object." % seg_index) + continue + + seg_size = seg_dict.get('size_bytes') + if seg_size is not None: + try: + seg_size = int(seg_size) + seg_dict['size_bytes'] = seg_size + except (TypeError, ValueError): + errors.append("Index %d: invalid size_bytes" % seg_index) + continue + if seg_size < 1 and seg_index != (len(parsed_data) - 1): + errors.append("Index %d: too small; each segment must be " + "at least 1 byte." + % (seg_index,)) + continue + + obj_path = '/'.join(['', vrs, account, + seg_dict['path'].lstrip('/')]) + if req_path == quote(obj_path): + errors.append( + "Index %d: manifest must not include itself as a segment" + % seg_index) + continue + + if seg_dict.get('range'): + try: + seg_dict['range'] = Range('bytes=%s' % seg_dict['range']) + except ValueError: + errors.append("Index %d: invalid range" % seg_index) + continue + + if len(seg_dict['range'].ranges) > 1: + errors.append("Index %d: multiple ranges " + "(only one allowed)" % seg_index) + continue + + # If the user *told* us the object's size, we can check range + # satisfiability right now. If they lied about the size, we'll + # fail that validation later. + if (seg_size is not None and 1 != len( + seg_dict['range'].ranges_for_length(seg_size))): + errors.append("Index %d: unsatisfiable range" % seg_index) + continue + + elif segment_type == 'data': + # Validate that the supplied data is non-empty and base64-encoded + try: + data = strict_b64decode(seg_dict['data']) + except ValueError: + errors.append( + "Index %d: data must be valid base64" % seg_index) + continue + if len(data) < 1: errors.append("Index %d: too small; each segment must be " "at least 1 byte." % (seg_index,)) continue + # re-encode to normalize padding + seg_dict['data'] = base64.b64encode(data) - obj_path = '/'.join(['', vrs, account, seg_dict['path'].lstrip('/')]) - if req_path == quote(obj_path): - errors.append( - "Index %d: manifest must not include itself as a segment" - % seg_index) - continue - - if seg_dict.get('range'): - try: - seg_dict['range'] = Range('bytes=%s' % seg_dict['range']) - except ValueError: - errors.append("Index %d: invalid range" % seg_index) - continue - - if len(seg_dict['range'].ranges) > 1: - errors.append("Index %d: multiple ranges (only one allowed)" - % seg_index) - continue - - # If the user *told* us the object's size, we can check range - # satisfiability right now. If they lied about the size, we'll - # fail that validation later. - if (seg_size is not None and - len(seg_dict['range'].ranges_for_length(seg_size)) != 1): - errors.append("Index %d: unsatisfiable range" % seg_index) - continue + if parsed_data and all('data' in d for d in parsed_data): + errors.append("Inline data segments require at least one " + "object-backed segment.") if errors: error_message = "".join(e + "\n" for e in errors) @@ -472,11 +544,20 @@ class SloGetContext(WSGIContext): 'while fetching %s, JSON-decoding of submanifest %s ' 'failed with %s' % (req.path, sub_req.path, err)) + def _segment_path(self, version, account, seg_dict): + return "/{ver}/{acc}/{conobj}".format( + ver=version, acc=account, + conobj=seg_dict['name'].lstrip('/') + ) + def _segment_length(self, seg_dict): """ Returns the number of bytes that will be fetched from the specified segment on a plain GET request for this SLO manifest. """ + if 'raw_data' in seg_dict: + return len(seg_dict['raw_data']) + seg_range = seg_dict.get('range') if seg_range is not None: # The range is of the form N-M, where N and M are both positive @@ -484,7 +565,7 @@ class SloGetContext(WSGIContext): # only thing that creates the SLO manifests stored in the # cluster. range_start, range_end = [int(x) for x in seg_range.split('-')] - return range_end - range_start + 1 + return (range_end - range_start) + 1 else: return int(seg_dict['bytes']) @@ -533,6 +614,9 @@ class SloGetContext(WSGIContext): recursion_depth=1): last_sub_path = None for seg_dict in segments: + if 'data' in seg_dict: + seg_dict['raw_data'] = strict_b64decode(seg_dict.pop('data')) + seg_length = self._segment_length(seg_dict) if first_byte >= seg_length: # don't need any bytes from this segment @@ -544,16 +628,25 @@ class SloGetContext(WSGIContext): # no bytes are needed from this or any future segment return + if 'raw_data' in seg_dict: + yield dict(seg_dict, + first_byte=max(0, first_byte), + last_byte=min(seg_length - 1, last_byte)) + first_byte -= seg_length + last_byte -= seg_length + continue + seg_range = seg_dict.get('range') if seg_range is None: range_start, range_end = 0, seg_length - 1 else: - # We already validated and supplied concrete values - # for the range on upload + # This simple parsing of the range is valid because we already + # validated and supplied concrete values for the range + # during SLO manifest creation range_start, range_end = map(int, seg_range.split('-')) if config_true_value(seg_dict.get('sub_slo')): - # do this check here so that we can avoid fetching this last + # Do this check here so that we can avoid fetching this last # manifest before raising the exception if recursion_depth >= self.max_slo_recursion_depth: raise ListingIterError( @@ -568,7 +661,7 @@ class SloGetContext(WSGIContext): last_sub_path = sub_path # Use the existing machinery to slice into the sub-SLO. - for sub_seg_dict, sb, eb in self._byterange_listing_iterator( + for sub_seg_dict in self._byterange_listing_iterator( req, version, account, sub_segments, # This adjusts first_byte and last_byte to be # relative to the sub-SLO. @@ -577,13 +670,13 @@ class SloGetContext(WSGIContext): cached_fetch_sub_slo_segments, recursion_depth=recursion_depth + 1): - yield sub_seg_dict, sb, eb + yield sub_seg_dict else: if isinstance(seg_dict['name'], six.text_type): seg_dict['name'] = seg_dict['name'].encode("utf-8") - yield (seg_dict, - max(0, first_byte) + range_start, - min(range_end, range_start + last_byte)) + yield dict(seg_dict, + first_byte=max(0, first_byte) + range_start, + last_byte=min(range_end, range_start + last_byte)) first_byte -= seg_length last_byte -= seg_length @@ -741,6 +834,8 @@ class SloGetContext(WSGIContext): segments = self._get_manifest_read(resp_iter) for seg_dict in segments: + if 'data' in seg_dict: + continue seg_dict.pop('content_type', None) seg_dict.pop('last_modified', None) seg_dict.pop('sub_slo', None) @@ -774,7 +869,6 @@ class SloGetContext(WSGIContext): def get_or_head_response(self, req, resp_headers, resp_iter): segments = self._get_manifest_read(resp_iter) - slo_etag = None content_length = None response_headers = [] @@ -789,21 +883,38 @@ class SloGetContext(WSGIContext): elif lheader not in ('etag', 'content-length'): response_headers.append((header, value)) - if slo_etag is None or content_length is None: - etag = md5() - content_length = 0 - for seg_dict in segments: - if seg_dict.get('range'): - etag.update('%s:%s;' % (seg_dict['hash'], - seg_dict['range'])) - else: - etag.update(seg_dict['hash']) + # Prep to calculate content_length & etag if necessary + if slo_etag is None: + calculated_etag = md5() + if content_length is None: + calculated_content_length = 0 + for seg_dict in segments: + # Decode any inlined data; it's important that we do this *before* + # calculating the segment length and etag + if 'data' in seg_dict: + seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data')) + + if slo_etag is None: + if 'raw_data' in seg_dict: + calculated_etag.update( + md5(seg_dict['raw_data']).hexdigest()) + elif seg_dict.get('range'): + calculated_etag.update( + '%s:%s;' % (seg_dict['hash'], seg_dict['range'])) + else: + calculated_etag.update(seg_dict['hash']) + + if content_length is None: if config_true_value(seg_dict.get('sub_slo')): override_bytes_from_content_type( seg_dict, logger=self.slo.logger) - content_length += self._segment_length(seg_dict) - slo_etag = etag.hexdigest() + calculated_content_length += self._segment_length(seg_dict) + + if slo_etag is None: + slo_etag = calculated_etag.hexdigest() + if content_length is None: + content_length = calculated_content_length response_headers.append(('Content-Length', str(content_length))) response_headers.append(('Etag', '"%s"' % slo_etag)) @@ -833,9 +944,13 @@ class SloGetContext(WSGIContext): plain_listing_iter = self._segment_listing_iterator( req, ver, account, segments, byteranges) - def is_small_segment((seg_dict, start_byte, end_byte)): - start = 0 if start_byte is None else start_byte - end = int(seg_dict['bytes']) - 1 if end_byte is None else end_byte + def ratelimit_predicate(seg_dict): + if 'raw_data' in seg_dict: + return False # it's already in memory anyway + start = seg_dict.get('start_byte') or 0 + end = seg_dict.get('end_byte') + if end is None: + end = int(seg_dict['bytes']) - 1 is_small = (end - start + 1) < self.slo.rate_limit_under_size return is_small @@ -843,17 +958,14 @@ class SloGetContext(WSGIContext): plain_listing_iter, self.slo.rate_limit_segments_per_sec, limit_after=self.slo.rate_limit_after_segment, - ratelimit_if=is_small_segment) + ratelimit_if=ratelimit_predicate) - # self._segment_listing_iterator gives us 3-tuples of (segment dict, - # start byte, end byte), but SegmentedIterable wants (obj path, etag, - # size, start byte, end byte), so we clean that up here + # data segments are already in the correct format, but object-backed + # segments need a path key added segment_listing_iter = ( - ("/{ver}/{acc}/{conobj}".format( - ver=ver, acc=account, conobj=seg_dict['name'].lstrip('/')), - seg_dict['hash'], int(seg_dict['bytes']), - start_byte, end_byte) - for seg_dict, start_byte, end_byte in ratelimited_listing_iter) + seg_dict if 'raw_data' in seg_dict else + dict(seg_dict, path=self._segment_path(ver, account, seg_dict)) + for seg_dict in ratelimited_listing_iter) segmented_iter = SegmentedIterable( req, self.slo.app, segment_listing_iter, @@ -966,9 +1078,10 @@ class StaticLargeObject(object): req.path) problem_segments = [] - if len(parsed_data) > self.max_manifest_segments: + object_segments = [seg for seg in parsed_data if 'path' in seg] + if len(object_segments) > self.max_manifest_segments: raise HTTPRequestEntityTooLarge( - 'Number of segments must be <= %d' % + 'Number of object-backed segments must be <= %d' % self.max_manifest_segments) try: out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS) @@ -976,10 +1089,15 @@ class StaticLargeObject(object): out_content_type = 'text/plain' # Ignore invalid header if not out_content_type: out_content_type = 'text/plain' - data_for_storage = [] + data_for_storage = [None] * len(parsed_data) + total_size = 0 path2indices = defaultdict(list) for index, seg_dict in enumerate(parsed_data): - path2indices[seg_dict['path']].append(index) + if 'data' in seg_dict: + data_for_storage[index] = seg_dict + total_size += len(base64.b64decode(seg_dict['data'])) + else: + path2indices[seg_dict['path']].append(index) def do_head(obj_name): obj_path = '/'.join(['', vrs, account, @@ -1025,30 +1143,45 @@ class StaticLargeObject(object): problem_segments.append( [quote(obj_name), 'Too small; each segment must be at least 1 byte.']) - if seg_dict.get('size_bytes') is not None and \ - seg_dict['size_bytes'] != head_seg_resp.content_length: + + _size_bytes = seg_dict.get('size_bytes') + size_mismatch = ( + _size_bytes is not None and + _size_bytes != head_seg_resp.content_length + ) + if size_mismatch: problem_segments.append([quote(obj_name), 'Size Mismatch']) - if seg_dict.get('etag') is not None and \ - seg_dict['etag'] != head_seg_resp.etag: + + _etag = seg_dict.get('etag') + etag_mismatch = ( + _etag is not None and + _etag != head_seg_resp.etag + ) + if etag_mismatch: problem_segments.append([quote(obj_name), 'Etag Mismatch']) + if head_seg_resp.last_modified: last_modified = head_seg_resp.last_modified else: # shouldn't happen last_modified = datetime.now() - last_modified_formatted = \ - last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f') - seg_data = {'name': '/' + seg_dict['path'].lstrip('/'), - 'bytes': head_seg_resp.content_length, - 'hash': head_seg_resp.etag, - 'content_type': head_seg_resp.content_type, - 'last_modified': last_modified_formatted} + last_modified_formatted = last_modified.strftime( + '%Y-%m-%dT%H:%M:%S.%f' + ) + seg_data = { + 'name': '/' + seg_dict['path'].lstrip('/'), + 'bytes': head_seg_resp.content_length, + 'hash': head_seg_resp.etag, + 'content_type': head_seg_resp.content_type, + 'last_modified': last_modified_formatted + } if seg_dict.get('range'): seg_data['range'] = seg_dict['range'] if config_true_value( head_seg_resp.headers.get('X-Static-Large-Object')): seg_data['sub_slo'] = True + return segment_length, seg_data heartbeat = config_true_value(req.params.get('heartbeat')) @@ -1061,10 +1194,8 @@ class StaticLargeObject(object): ('Content-Type', out_content_type), ]) separator = '\r\n\r\n' - data_for_storage = [None] * len(parsed_data) - def resp_iter(): - total_size = 0 + def resp_iter(total_size=total_size): # wsgi won't propagate start_response calls until some data has # been yielded so make sure first heartbeat is sent immediately if heartbeat: @@ -1104,7 +1235,10 @@ class StaticLargeObject(object): slo_etag = md5() for seg_data in data_for_storage: - if seg_data.get('range'): + if 'data' in seg_data: + raw_data = base64.b64decode(seg_data['data']) + slo_etag.update(md5(raw_data).hexdigest()) + elif seg_data.get('range'): slo_etag.update('%s:%s;' % (seg_data['hash'], seg_data['range'])) else: @@ -1185,6 +1319,8 @@ class StaticLargeObject(object): raise HTTPBadRequest( 'Too many buffered slo segments to delete.') seg_data = segments.pop(0) + if 'data' in seg_data: + continue if seg_data.get('sub_slo'): try: segments.extend( diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 1ac348dec5..ec6ceb2c92 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -354,12 +354,25 @@ class SegmentedIterable(object): def _coalesce_requests(self): start_time = time.time() - pending_req = None - pending_etag = None - pending_size = None + pending_req = pending_etag = pending_size = None try: - for seg_path, seg_etag, seg_size, first_byte, last_byte \ - in self.listing_iter: + for seg_dict in self.listing_iter: + if 'raw_data' in seg_dict: + if pending_req: + yield pending_req, pending_etag, pending_size + + to_yield = seg_dict['raw_data'][ + seg_dict['first_byte']:seg_dict['last_byte'] + 1] + yield to_yield, None, len(seg_dict['raw_data']) + pending_req = pending_etag = pending_size = None + continue + + seg_path, seg_etag, seg_size, first_byte, last_byte = ( + seg_dict['path'], seg_dict.get('hash'), + seg_dict.get('bytes'), + seg_dict['first_byte'], seg_dict['last_byte']) + if seg_size is not None: + seg_size = int(seg_size) first_byte = first_byte or 0 go_to_end = last_byte is None or ( seg_size is not None and last_byte == seg_size - 1) @@ -441,7 +454,18 @@ class SegmentedIterable(object): bytes_left = self.response_body_length try: - for seg_req, seg_etag, seg_size in self._coalesce_requests(): + for data_or_req, seg_etag, seg_size in self._coalesce_requests(): + if isinstance(data_or_req, bytes): + chunk = data_or_req # ugly, awful overloading + if bytes_left is None: + yield chunk + elif bytes_left >= len(chunk): + yield chunk + bytes_left -= len(chunk) + else: + yield chunk[:bytes_left] + continue + seg_req = data_or_req seg_resp = seg_req.get_response(self.app) if not is_success(seg_resp.status_int): close_if_possible(seg_resp.app_iter) diff --git a/test/functional/test_slo.py b/test/functional/test_slo.py index a4bcd8d9eb..3d4d1e7c41 100644 --- a/test/functional/test_slo.py +++ b/test/functional/test_slo.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import email.parser import hashlib import itertools @@ -205,6 +206,28 @@ class TestSloEnv(BaseEnv): 'size_bytes': None, 'range': '-1048578'}, ]), parms={'multipart-manifest': 'put'}) + file_item = cls.container.file("mixed-object-data-manifest") + file_item.write( + json.dumps([ + {'data': base64.b64encode('APRE' * 8)}, + {'path': seg_info['seg_a']['path']}, + {'data': base64.b64encode('APOS' * 16)}, + {'path': seg_info['seg_b']['path']}, + {'data': base64.b64encode('BPOS' * 32)}, + {'data': base64.b64encode('CPRE' * 64)}, + {'path': seg_info['seg_c']['path']}, + {'data': base64.b64encode('CPOS' * 8)}, + ]), parms={'multipart-manifest': 'put'} + ) + + file_item = cls.container.file("nested-data-manifest") + file_item.write( + json.dumps([ + {'path': '%s/%s' % (cls.container.name, + "mixed-object-data-manifest")} + ]), parms={'multipart-manifest': 'put'} + ) + class TestSlo(Base): env = TestSloEnv @@ -671,6 +694,25 @@ class TestSlo(Base): self.assertEqual('application/octet-stream', actual['content_type']) self.assertEqual(copied.etag, actual['hash']) + # Test copy manifest including data segments + source = self.env.container.file("mixed-object-data-manifest") + source_contents = source.read(parms={'multipart-manifest': 'get'}) + source_json = json.loads(source_contents) + source.copy( + self.env.container.name, + "copied-mixed-object-data-manifest", + parms={'multipart-manifest': 'get'}) + + copied = self.env.container.file("copied-mixed-object-data-manifest") + copied_contents = copied.read(parms={'multipart-manifest': 'get'}) + try: + copied_json = json.loads(copied_contents) + except ValueError: + self.fail("COPY didn't copy the manifest (invalid json on GET)") + self.assertEqual(source_contents, copied_contents) + self.assertEqual(copied_json[0], + {'data': base64.b64encode('APRE' * 8)}) + def test_slo_copy_the_manifest_updating_metadata(self): source = self.env.container.file("manifest-abcde") source.content_type = 'application/octet-stream' @@ -1105,6 +1147,56 @@ class TestSlo(Base): self.assertEqual('d', contents[-2]) self.assertEqual('e', contents[-1]) + def test_slo_data_segments(self): + # len('APRE' * 8) == 32 + # len('APOS' * 16) == 64 + # len('BPOS' * 32) == 128 + # len('CPRE' * 64) == 256 + # len(a_pre + seg_a + post_a) == 32 + 1024 ** 2 + 64 + # len(seg_b + post_b) == 1024 ** 2 + 128 + # len(c_pre + seg_c) == 256 + 1024 ** 2 + # len(total) == 3146208 + + for file_name in ("mixed-object-data-manifest", + "nested-data-manifest"): + file_item = self.env.container.file(file_name) + file_contents = file_item.read(size=3 * 1024 ** 2 + 456, + offset=28) + grouped_file_contents = [ + (char, sum(1 for _char in grp)) + for char, grp in itertools.groupby(file_contents)] + self.assertEqual([ + ('A', 1), + ('P', 1), + ('R', 1), + ('E', 1), + ('a', 1024 * 1024), + ] + [ + ('A', 1), + ('P', 1), + ('O', 1), + ('S', 1), + ] * 16 + [ + ('b', 1024 * 1024), + ] + [ + ('B', 1), + ('P', 1), + ('O', 1), + ('S', 1), + ] * 32 + [ + ('C', 1), + ('P', 1), + ('R', 1), + ('E', 1), + ] * 64 + [ + ('c', 1024 * 1024), + ] + [ + ('C', 1), + ('P', 1), + ('O', 1), + ('S', 1), + ], grouped_file_contents) + class TestSloUTF8(Base2, TestSlo): pass diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index d1434ec645..8fa452df10 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -16,12 +16,15 @@ from six.moves import range +import base64 import hashlib import json import time import unittest + from mock import patch from StringIO import StringIO + from swift.common import swob, utils from swift.common.header_key_dict import HeaderKeyDict from swift.common.middleware import slo @@ -704,6 +707,29 @@ class TestSloPutManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '201 Created') + def test_handle_multipart_put_invalid_data(self): + def do_test(bad_data): + test_json_data = json.dumps([{'path': '/cont/object', + 'etag': 'etagoftheobjectsegment', + 'size_bytes': 100}, + {'data': bad_data}]) + req = Request.blank('/v1/a/c/o', body=test_json_data) + with self.assertRaises(HTTPException) as catcher: + self.slo.handle_multipart_put(req, fake_start_response) + self.assertEqual(catcher.exception.status_int, 400) + + do_test('invalid') # insufficient padding + do_test(12345) + do_test(0) + do_test(True) + do_test(False) + do_test(None) + do_test({}) + do_test([]) + # Empties are no good, either + do_test('') + do_test('====') + def test_handle_multipart_put_success_unicode(self): test_json_data = json.dumps([{'path': u'/cont/object\u2661', 'etag': 'etagoftheobjectsegment', @@ -2368,8 +2394,10 @@ class TestSloGetManifest(SloTestCase): 'bytes=0-3,8-11']) # we set swift.source for everything but the first request self.assertIsNone(self.app.swift_sources[0]) - self.assertEqual(self.app.swift_sources[1:], - ['SLO'] * (len(self.app.swift_sources) - 1)) + self.assertEqual( + self.app.swift_sources[1:], + ['SLO'] * (len(self.app.swift_sources) - 1) + ) self.assertEqual(md5hex(''.join([ md5hex('a' * 5), ':0-3;', md5hex('a' * 5), ':1-4;', @@ -2655,18 +2683,21 @@ class TestSloGetManifest(SloTestCase): 'Etag': 'man%d' % i}, manifest_json) + submanifest_bytes = 6 for i in range(19, 0, -1): manifest_data = [ {'name': '/gettest/obj%d' % i, 'hash': md5hex('body%02d' % i), 'bytes': '6', 'content_type': 'text/plain'}, + {'data': base64.b64encode('-' * 3)}, {'name': '/gettest/man%d' % (i + 1), 'hash': 'man%d' % (i + 1), 'sub_slo': True, - 'bytes': len(manifest_json), + 'bytes': submanifest_bytes, 'content_type': 'application/json'}] + submanifest_bytes += 9 manifest_json = json.dumps(manifest_data) self.app.register( 'GET', '/v1/AUTH_test/gettest/man%d' % i, @@ -2684,8 +2715,10 @@ class TestSloGetManifest(SloTestCase): # we don't know at header-sending time that things are going to go # wrong, so we end up with a 200 and a truncated body self.assertEqual(status, '200 OK') - self.assertEqual(body, ('body01body02body03body04body05' + - 'body06body07body08body09body10')) + self.assertEqual(headers['Content-Length'], str(9 * 19 + 6)) + self.assertEqual(body, ( + 'body01---body02---body03---body04---body05---' + + 'body06---body07---body08---body09---body10---')) # but the error shows up in logs self.assertEqual(self.slo.logger.get_lines_for_level('error'), [ "While processing manifest '/v1/AUTH_test/gettest/man1', " @@ -3058,6 +3091,275 @@ class TestSloGetManifest(SloTestCase): 'gettest/not_exists_obj' ]) + def test_leading_data_segment(self): + slo_etag = md5hex( + md5hex('preamble') + + md5hex('a' * 5) + ) + preamble = base64.b64encode('preamble') + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-single-preamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([{ + 'data': preamble + }, { + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }]) + ) + + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-preamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'preambleaaaaa') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '13'), headers) + + def test_trailing_data_segment(self): + slo_etag = md5hex( + md5hex('a' * 5) + + md5hex('postamble') + ) + postamble = base64.b64encode('postamble') + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-single-postamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([{ + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }, { + 'data': postamble + }]) + ) + + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-postamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'aaaaapostamble') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '14'), headers) + + def test_data_segment_sandwich(self): + slo_etag = md5hex( + md5hex('preamble') + + md5hex('a' * 5) + + md5hex('postamble') + ) + preamble = base64.b64encode('preamble') + postamble = base64.b64encode('postamble') + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-single-prepostamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([{ + 'data': preamble, + }, { + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }, { + 'data': postamble + }]) + ) + + # Test the whole SLO + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'preambleaaaaapostamble') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '22'), headers) + + # Test complete preamble only + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=0-7'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'preamble') + + # Test range within preamble only + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-5'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'reamb') + + # Test complete postamble only + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=13-21'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'postamble') + + # Test partial pre and postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=4-16'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'mbleaaaaapost') + + # Test partial preamble and first byte of data + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-8'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'reamblea') + + # Test last byte of segment data and partial postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=12-16'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'apost') + + def test_bunches_of_data_segments(self): + slo_etag = md5hex( + md5hex('ABCDEF') + + md5hex('a' * 5) + + md5hex('123456') + + md5hex('GHIJKL') + + md5hex('b' * 10) + + md5hex('7890@#') + ) + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([ + { + 'data': base64.b64encode('ABCDEF'), + }, + { + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }, + { + 'data': base64.b64encode('123456') + }, + { + 'data': base64.b64encode('GHIJKL'), + }, + { + 'name': '/gettest/b_10', + 'hash': md5hex('b' * 10), + 'content_type': 'text/plain', + 'bytes': '10', + }, + { + 'data': base64.b64encode('7890@#') + } + ]) + ) + + # Test the whole SLO + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'ABCDEFaaaaa123456GHIJKLbbbbbbbbbb7890@#') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '39'), headers) + + # Test last byte first pre-amble to first byte of second postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=5-33'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'Faaaaa123456GHIJKLbbbbbbbbbb7') + + # Test only second complete preamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=17-22'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'GHIJKL') + + # Test only first complete postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=11-16'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, '123456') + + # Test only range within first postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=12-15'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, '2345') + + # Test only range within first postamble and second preamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=12-18'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, '23456GH') + class TestSloConditionalGetOldManifest(SloTestCase): slo_data = [ @@ -3327,7 +3629,7 @@ class TestSwiftInfo(unittest.TestCase): self.assertEqual(swift_info['slo'].get('max_manifest_size'), mware.max_manifest_size) self.assertEqual(1000, mware.max_manifest_segments) - self.assertEqual(2097152, mware.max_manifest_size) + self.assertEqual(8388608, mware.max_manifest_size) self.assertEqual(1048576, mware.rate_limit_under_size) self.assertEqual(10, mware.rate_limit_after_segment) self.assertEqual(1, mware.rate_limit_segments_per_sec)