Merge "Fix for CooperativeReader to process read length"
commit 556062c5a7
@@ -1,6 +1,7 @@
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2014 SoftLayer Technologies, Inc.
+# Copyright 2015 Mirantis, Inc
 # All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -128,6 +129,9 @@ def cooperative_read(fd):
     return readfn
 
 
+MAX_COOP_READER_BUFFER_SIZE = 134217728  # 128M seems like a sane buffer limit
+
+
 class CooperativeReader(object):
     """
     An eventlet thread friendly class for reading in image data.
@@ -149,19 +153,68 @@ class CooperativeReader(object):
         # is more straightforward
         if hasattr(fd, 'read'):
             self.read = cooperative_read(fd)
         else:
             self.iterator = None
+            self.buffer = ''
+            self.position = 0
 
     def read(self, length=None):
-        """Return the next chunk of the underlying iterator.
+        """Return the requested amount of bytes, fetching the next chunk of
+        the underlying iterator when needed.
 
         This is replaced with cooperative_read in __init__ if the underlying
         fd already supports read().
         """
-        if self.iterator is None:
-            self.iterator = self.__iter__()
-        try:
-            return self.iterator.next()
-        except StopIteration:
-            return ''
+        if length is None:
+            if len(self.buffer) - self.position > 0:
+                # if no length specified but some data exists in buffer,
+                # return that data and clear the buffer
+                result = self.buffer[self.position:]
+                self.buffer = ''
+                self.position = 0
+                return str(result)
+            else:
+                # otherwise read the next chunk from the underlying iterator
+                # and return it as a whole. Reset the buffer, as subsequent
+                # calls may specify the length
+                try:
+                    if self.iterator is None:
+                        self.iterator = self.__iter__()
+                    return self.iterator.next()
+                except StopIteration:
+                    return ''
+                finally:
+                    self.buffer = ''
+                    self.position = 0
+        else:
+            result = bytearray()
+            while len(result) < length:
+                if self.position < len(self.buffer):
+                    to_read = length - len(result)
+                    chunk = self.buffer[self.position:self.position + to_read]
+                    result.extend(chunk)
+
+                    # This check is here to prevent potential OOM issues if
+                    # this code is called with unreasonably high values of
+                    # read size. Currently it is only called from the HTTP
+                    # clients of Glance backend stores, which use httplib for
+                    # data streaming, which has readsize hardcoded to 8K, so
+                    # this check should never fire. Regardless, it is still
+                    # worth making the check, as the code may be reused
+                    # somewhere else.
+                    if len(result) >= MAX_COOP_READER_BUFFER_SIZE:
+                        raise exception.LimitExceeded()
+                    self.position += len(chunk)
+                else:
+                    try:
+                        if self.iterator is None:
+                            self.iterator = self.__iter__()
+                        self.buffer = self.iterator.next()
+                        self.position = 0
+                    except StopIteration:
+                        self.buffer = ''
+                        self.position = 0
+                        return str(result)
+            return str(result)
 
     def __iter__(self):
         return cooperative_iter(self.fd.__iter__())
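To see what the reworked read() buys, here is a minimal standalone sketch of the same buffered-read algorithm. The class name BufferedIteratorReader is invented for the example; the eventlet yielding and the MAX_COOP_READER_BUFFER_SIZE cap are omitted for brevity, and the Python-3-friendly next(it) replaces the patch's it.next().

# A standalone sketch (not Glance's API) of the buffered-read algorithm
# this patch adds: satisfy a fixed-size read() from an iterator that
# yields chunks of arbitrary size.

class BufferedIteratorReader(object):
    def __init__(self, iterator):
        self.iterator = iterator
        self.buffer = b''
        self.position = 0

    def read(self, length):
        result = bytearray()
        while len(result) < length:
            if self.position < len(self.buffer):
                # Serve as much as possible from the buffered chunk.
                to_read = length - len(result)
                chunk = self.buffer[self.position:self.position + to_read]
                result.extend(chunk)
                self.position += len(chunk)
            else:
                # Buffer exhausted: pull the next chunk, which may be
                # shorter or longer than the requested read size.
                try:
                    self.buffer = next(self.iterator)
                    self.position = 0
                except StopIteration:
                    break
        return bytes(result)

reader = BufferedIteratorReader(iter([b'aaaa', b'bb', b'cccccc']))
assert reader.read(5) == b'aaaab'
assert reader.read(5) == b'bcccc'
assert reader.read(5) == b'cc'  # short read at end of stream
assert reader.read(5) == b''    # stream exhausted

The asserts show the property the change establishes: a read never returns more than the requested length, and leftover bytes stay buffered for the next call.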
@@ -1,4 +1,5 @@
 # Copyright 2011 OpenStack Foundation
+# Copyright 2015 Mirantis, Inc
 # All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -74,6 +75,44 @@ class TestUtils(test_utils.BaseTestCase):
         meat = ''.join(chunks)
         self.assertEqual('', meat)
 
+    def _create_generator(self, chunk_size, max_iterations):
+        chars = 'abc'
+        iteration = 0
+        while True:
+            chunk = chars[iteration % len(chars)] * chunk_size
+            yield chunk
+            iteration += 1
+            if iteration >= max_iterations:
+                raise StopIteration()
+
+    def _test_reader_chunked(self, chunk_size, read_size, max_iterations=5):
+        generator = self._create_generator(chunk_size, max_iterations)
+        reader = utils.CooperativeReader(generator)
+        result = ''
+        while True:
+            data = reader.read(read_size)
+            if len(data) == 0:
+                break
+            self.assertLessEqual(len(data), read_size)
+            result += data
+        expected = ('a' * chunk_size +
+                    'b' * chunk_size +
+                    'c' * chunk_size +
+                    'a' * chunk_size +
+                    'b' * chunk_size)
+        self.assertEqual(expected, result)
+
+    def test_cooperative_reader_preserves_size_chunk_less_then_read(self):
+        self._test_reader_chunked(43, 101)
+
+    def test_cooperative_reader_preserves_size_chunk_equals_read(self):
+        self._test_reader_chunked(1024, 1024)
+
+    def test_cooperative_reader_preserves_size_chunk_more_then_read(self):
+        chunk_size = 16 * 1024 * 1024  # 16 Mb, as in remote http source
+        read_size = 8 * 1024  # 8k, as in httplib
+        self._test_reader_chunked(chunk_size, read_size)
+
     def test_limiting_reader(self):
         """Ensure limiting reader class accesses all bytes of file"""
         BYTES = 1024
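The new tests all reduce to one invariant: however chunk size and read size are mismatched, the reader hands back every byte exactly once and never over-reads. A standalone rendering of that check, reusing the BufferedIteratorReader sketch above; note it replaces the patch's raise StopIteration() inside a generator (a Python 2 idiom later disallowed by PEP 479) with a plain for loop.

def chunk_generator(chunk_size, max_iterations):
    # Yields chunks cycling through b'a', b'b', b'c', like the patch's
    # _create_generator helper.
    chars = [b'a', b'b', b'c']
    for iteration in range(max_iterations):
        yield chars[iteration % len(chars)] * chunk_size

def read_all(reader, read_size):
    # Drain the reader in fixed-size reads, mirroring _test_reader_chunked.
    result = b''
    while True:
        data = reader.read(read_size)
        if not data:
            return result
        assert len(data) <= read_size  # never returns more than requested
        result += data

# Same three chunk/read size pairings as the new test cases.
for chunk_size, read_size in [(43, 101), (1024, 1024),
                              (16 * 1024 * 1024, 8 * 1024)]:
    reader = BufferedIteratorReader(chunk_generator(chunk_size, 5))
    assert len(read_all(reader, read_size)) == 5 * chunk_size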