Fix for CooperativeReader to process read length

CooperativeReader, being an eventlet-friendly wrapper around the
generator-based reader of image data, actually transforms
chunk-by-chunk iteration into a readable stream. It is used when the
image is being copied from a remote source: some generator-based
image data representing the remote source acts as its underlying
object, and the instance of CooperativeReader is passed as a data
stream to the backend client which uses it to read the data.

Before this patch, the CooperativeReader was ignoring the "length"
parameter of the read method, always returning the whole chunk returned
by the underlying generator (in the case of an HTTP source the size of
this chunk is 16 MB). This was causing problems for the clients
attempting to read data from it, and - under some circumstances - the
loss of data.

For chunked storage of files in Swift a special class (ChunkReader,
declared in the swift store driver) is used to reduce the requested
read length so no extra data is read and transferred. However, this was
not working as the CooperativeReader (which was the underlying stream
for the ChunkReader) was ignoring the requested size. This was causing
data to be lost when reading beyond the boundaries of the chunks.

This patchset introduces a buffer in the CooperativeReader to store the
most recently fetched iterator chunk. Reads are independent of
requests to the iterator, so the CooperativeReader is able to return
the exact requested amount of bytes and no data is lost due to
over-reads.

SecurityImpact

Change-Id: Ief37d1e29487bb03e612320f5cc06910cfd1c23a
Closes-bug: #1412802
This commit is contained in:
Alexander Tivelkov 2015-01-20 17:25:07 +03:00
parent 808fa29ce2
commit 270ec44a89
2 changed files with 99 additions and 7 deletions

View File

@ -1,6 +1,7 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2014 SoftLayer Technologies, Inc.
# Copyright 2015 Mirantis, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -128,6 +129,9 @@ def cooperative_read(fd):
return readfn
# Upper bound, in bytes, on what a single sized CooperativeReader.read()
# call may accumulate; reaching it raises exception.LimitExceeded.
MAX_COOP_READER_BUFFER_SIZE = 134217728 # 128M seems like a sane buffer limit
class CooperativeReader(object):
"""
An eventlet thread friendly class for reading in image data.
@ -149,19 +153,68 @@ class CooperativeReader(object):
# is more straightforward
if hasattr(fd, 'read'):
self.read = cooperative_read(fd)
else:
self.iterator = None
self.buffer = ''
self.position = 0
def read(self, length=None):
    """Return the requested amount of bytes, fetching the next chunk of
    the underlying iterator when needed.

    This is replaced with cooperative_read in __init__ if the underlying
    fd already supports read().

    :param length: maximum number of bytes to return. If None, return
                   whatever is left in the internal buffer or, when the
                   buffer is empty, the next whole chunk produced by the
                   underlying iterator.
    :returns: a byte string of at most ``length`` bytes; it is shorter
              (possibly empty) only when the underlying iterator is
              exhausted.
    :raises exception.LimitExceeded: if a single sized read accumulates
            MAX_COOP_READER_BUFFER_SIZE bytes or more.
    """
    # NOTE: bytes(...) and b'' are identical to str(...) and '' on
    # Python 2 (where bytes is str) and stay correct on Python 3.
    if length is None:
        if len(self.buffer) - self.position > 0:
            # No length specified but some data is still buffered:
            # hand out the remainder and clear the buffer.
            result = self.buffer[self.position:]
            self.buffer = b''
            self.position = 0
            return bytes(result)

        # Otherwise read the next chunk from the underlying iterator and
        # return it as a whole. The buffer is reset in any case, as
        # subsequent calls may specify the length.
        try:
            if self.iterator is None:
                self.iterator = self.__iter__()
            return next(self.iterator)
        except StopIteration:
            return b''
        finally:
            self.buffer = b''
            self.position = 0

    result = bytearray()
    while len(result) < length:
        if self.position < len(self.buffer):
            # Copy as much of the buffered chunk as is still needed.
            to_read = length - len(result)
            chunk = self.buffer[self.position:self.position + to_read]
            result.extend(chunk)

            # This check is here to prevent potential OOM issues if
            # this code is called with unreasonably high values of read
            # size. Currently it is only called from the HTTP clients
            # of Glance backend stores, which use httplib for data
            # streaming, which has readsize hardcoded to 8K, so this
            # check should never fire. Regardless, it is still worth
            # making the check, as the code may be reused elsewhere.
            if len(result) >= MAX_COOP_READER_BUFFER_SIZE:
                raise exception.LimitExceeded()
            # The position is advanced only after the limit check, so
            # the buffered data is not consumed when the check fires.
            self.position += len(chunk)
        else:
            # Buffer exhausted: pull the next chunk from the iterator.
            try:
                if self.iterator is None:
                    self.iterator = self.__iter__()
                self.buffer = next(self.iterator)
                self.position = 0
            except StopIteration:
                # Nothing more to read: return what has been collected.
                self.buffer = b''
                self.position = 0
                return bytes(result)
    return bytes(result)
def __iter__(self):
    """Return the underlying fd's iterator wrapped by cooperative_iter."""
    chunks = self.fd.__iter__()
    return cooperative_iter(chunks)

View File

@ -1,4 +1,5 @@
# Copyright 2011 OpenStack Foundation
# Copyright 2015 Mirantis, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -74,6 +75,44 @@ class TestUtils(test_utils.BaseTestCase):
meat = ''.join(chunks)
self.assertEqual('', meat)
def _create_generator(self, chunk_size, max_iterations):
    """Yield test chunks cycling through the characters 'a', 'b', 'c'.

    Chunk number ``i`` (counting from zero) is the character
    ``'abc'[i % 3]`` repeated ``chunk_size`` times. Generation stops
    after ``max_iterations`` chunks; at least one chunk is always
    produced because the counter is only checked after a yield.

    :param chunk_size: length of every yielded chunk
    :param max_iterations: number of chunks to yield
    """
    chars = 'abc'
    iteration = 0
    while True:
        chunk = chars[iteration % len(chars)] * chunk_size
        yield chunk
        iteration += 1
        if iteration >= max_iterations:
            # A plain return ends the generator. Raising StopIteration
            # inside a generator body is converted to RuntimeError
            # under PEP 479.
            return
def _test_reader_chunked(self, chunk_size, read_size, max_iterations=5):
    """Read a generated stream through CooperativeReader in sized reads.

    Builds a generator of ``max_iterations`` chunks of ``chunk_size``
    characters, wraps it in utils.CooperativeReader and reads it with
    ``read(read_size)`` until an empty result is returned. Asserts that
    no read returns more than ``read_size`` and that the concatenated
    reads equal the generated data.

    :param chunk_size: size of each chunk yielded by the generator
    :param read_size: length passed to every reader.read() call
    :param max_iterations: number of chunks the generator produces
    """
    generator = self._create_generator(chunk_size, max_iterations)
    reader = utils.CooperativeReader(generator)
    pieces = []
    while True:
        data = reader.read(read_size)
        if not data:
            break
        self.assertLessEqual(len(data), read_size)
        pieces.append(data)
    result = ''.join(pieces)

    # Derive the expected data from max_iterations instead of spelling
    # out five chunks, so non-default values are checked correctly.
    # _create_generator always yields at least one chunk, hence max().
    chars = 'abc'
    expected = ''.join(chars[i % len(chars)] * chunk_size
                       for i in range(max(max_iterations, 1)))
    self.assertEqual(expected, result)
def test_cooperative_reader_preserves_size_chunk_less_then_read(self):
    """Run the chunked-read check with chunks smaller than the read size."""
    self._test_reader_chunked(chunk_size=43, read_size=101)
def test_cooperative_reader_preserves_size_chunk_equals_read(self):
    """Run the chunked-read check with chunk size equal to read size."""
    size = 1024
    self._test_reader_chunked(size, size)
def test_cooperative_reader_preserves_size_chunk_more_then_read(self):
    """Run the chunked-read check with chunks larger than the read size."""
    self._test_reader_chunked(
        chunk_size=16 * 1024 * 1024,  # 16 MiB, as in remote http source
        read_size=8 * 1024,  # 8 KiB, as in httplib
    )
def test_limiting_reader(self):
"""Ensure limiting reader class accesses all bytes of file"""
BYTES = 1024