Merge "Fix for CooperativeReader to process read length"

This commit is contained in:
Jenkins 2015-02-05 21:43:01 +00:00 committed by Gerrit Code Review
commit 556062c5a7
2 changed files with 99 additions and 7 deletions

View File

@ -1,6 +1,7 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2014 SoftLayer Technologies, Inc.
# Copyright 2015 Mirantis, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -128,6 +129,9 @@ def cooperative_read(fd):
return readfn
MAX_COOP_READER_BUFFER_SIZE = 134217728 # 128M seems like a sane buffer limit
class CooperativeReader(object):
"""
An eventlet thread friendly class for reading in image data.
@ -149,19 +153,68 @@ class CooperativeReader(object):
# is more straightforward
if hasattr(fd, 'read'):
self.read = cooperative_read(fd)
else:
self.iterator = None
self.buffer = ''
self.position = 0
def read(self, length=None):
"""Return the next chunk of the underlying iterator.
"""Return the requested amount of bytes, fetching the next chunk of
the underlying iterator when needed.
This is replaced with cooperative_read in __init__ if the underlying
fd already supports read().
"""
if self.iterator is None:
self.iterator = self.__iter__()
try:
return self.iterator.next()
except StopIteration:
return ''
if length is None:
if len(self.buffer) - self.position > 0:
# if no length specified but some data exists in buffer,
# return that data and clear the buffer
result = self.buffer[self.position:]
self.buffer = ''
self.position = 0
return str(result)
else:
# otherwise read the next chunk from the underlying iterator
# and return it as a whole. Reset the buffer, as subsequent
# calls may specify the length
try:
if self.iterator is None:
self.iterator = self.__iter__()
return self.iterator.next()
except StopIteration:
return ''
finally:
self.buffer = ''
self.position = 0
else:
result = bytearray()
while len(result) < length:
if self.position < len(self.buffer):
to_read = length - len(result)
chunk = self.buffer[self.position:self.position + to_read]
result.extend(chunk)
# This check is here to prevent potential OOM issues if
# this code is called with unreasonably high values of read
# size. Currently it is only called from the HTTP clients
# of Glance backend stores, which use httplib for data
# streaming, which has readsize hardcoded to 8K, so this
# check should never fire. Regardless it still worths to
# make the check, as the code may be reused somewhere else.
if len(result) >= MAX_COOP_READER_BUFFER_SIZE:
raise exception.LimitExceeded()
self.position += len(chunk)
else:
try:
if self.iterator is None:
self.iterator = self.__iter__()
self.buffer = self.iterator.next()
self.position = 0
except StopIteration:
self.buffer = ''
self.position = 0
return str(result)
return str(result)
def __iter__(self):
return cooperative_iter(self.fd.__iter__())

View File

@ -1,4 +1,5 @@
# Copyright 2011 OpenStack Foundation
# Copyright 2015 Mirantis, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -74,6 +75,44 @@ class TestUtils(test_utils.BaseTestCase):
meat = ''.join(chunks)
self.assertEqual('', meat)
def _create_generator(self, chunk_size, max_iterations):
chars = 'abc'
iteration = 0
while True:
chunk = chars[iteration % len(chars)] * chunk_size
yield chunk
iteration += 1
if iteration >= max_iterations:
raise StopIteration()
def _test_reader_chunked(self, chunk_size, read_size, max_iterations=5):
generator = self._create_generator(chunk_size, max_iterations)
reader = utils.CooperativeReader(generator)
result = ''
while True:
data = reader.read(read_size)
if len(data) == 0:
break
self.assertLessEqual(len(data), read_size)
result += data
expected = ('a' * chunk_size +
'b' * chunk_size +
'c' * chunk_size +
'a' * chunk_size +
'b' * chunk_size)
self.assertEqual(expected, result)
def test_cooperative_reader_preserves_size_chunk_less_then_read(self):
self._test_reader_chunked(43, 101)
def test_cooperative_reader_preserves_size_chunk_equals_read(self):
self._test_reader_chunked(1024, 1024)
def test_cooperative_reader_preserves_size_chunk_more_then_read(self):
chunk_size = 16 * 1024 * 1024 # 16 Mb, as in remote http source
read_size = 8 * 1024 # 8k, as in httplib
self._test_reader_chunked(chunk_size, read_size)
def test_limiting_reader(self):
"""Ensure limiting reader class accesses all bytes of file"""
BYTES = 1024