diff --git a/glance/common/utils.py b/glance/common/utils.py index ac379a5b73..4291e8bf2b 100644 --- a/glance/common/utils.py +++ b/glance/common/utils.py @@ -1,6 +1,7 @@ # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2014 SoftLayer Technologies, Inc. +# Copyright 2015 Mirantis, Inc # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -128,6 +129,9 @@ def cooperative_read(fd): return readfn +MAX_COOP_READER_BUFFER_SIZE = 134217728 # 128M seems like a sane buffer limit + + class CooperativeReader(object): """ An eventlet thread friendly class for reading in image data. @@ -149,19 +153,68 @@ class CooperativeReader(object): # is more straightforward if hasattr(fd, 'read'): self.read = cooperative_read(fd) + else: + self.iterator = None + self.buffer = '' + self.position = 0 def read(self, length=None): - """Return the next chunk of the underlying iterator. + """Return the requested number of bytes, fetching the next chunk of + the underlying iterator when needed. This is replaced with cooperative_read in __init__ if the underlying fd already supports read(). """ - if self.iterator is None: - self.iterator = self.__iter__() - try: - return self.iterator.next() - except StopIteration: - return '' + if length is None: + if len(self.buffer) - self.position > 0: + # if no length specified but some data exists in buffer, + # return that data and clear the buffer + result = self.buffer[self.position:] + self.buffer = '' + self.position = 0 + return str(result) + else: + # otherwise read the next chunk from the underlying iterator + # and return it as a whole. 
Reset the buffer, as subsequent + # calls may specify the length + try: + if self.iterator is None: + self.iterator = self.__iter__() + return self.iterator.next() + except StopIteration: + return '' + finally: + self.buffer = '' + self.position = 0 + else: + result = bytearray() + while len(result) < length: + if self.position < len(self.buffer): + to_read = length - len(result) + chunk = self.buffer[self.position:self.position + to_read] + result.extend(chunk) + + # This check is here to prevent potential OOM issues if + # this code is called with unreasonably high values of read + # size. Currently it is only called from the HTTP clients + # of Glance backend stores, which use httplib for data + # streaming, which has readsize hardcoded to 8K, so this + # check should never fire. Regardless, it is still worth + # making the check, as the code may be reused somewhere else. + if len(result) >= MAX_COOP_READER_BUFFER_SIZE: + raise exception.LimitExceeded() + self.position += len(chunk) + else: + try: + if self.iterator is None: + self.iterator = self.__iter__() + self.buffer = self.iterator.next() + self.position = 0 + except StopIteration: + self.buffer = '' + self.position = 0 + return str(result) + return str(result) def __iter__(self): return cooperative_iter(self.fd.__iter__()) diff --git a/glance/tests/unit/common/test_utils.py b/glance/tests/unit/common/test_utils.py index 27ae32d1f5..f67fc35541 100644 --- a/glance/tests/unit/common/test_utils.py +++ b/glance/tests/unit/common/test_utils.py @@ -1,4 +1,5 @@ # Copyright 2011 OpenStack Foundation +# Copyright 2015 Mirantis, Inc # All Rights Reserved. 
# # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -74,6 +75,44 @@ class TestUtils(test_utils.BaseTestCase): meat = ''.join(chunks) self.assertEqual('', meat) + def _create_generator(self, chunk_size, max_iterations): + chars = 'abc' + iteration = 0 + while True: + chunk = chars[iteration % len(chars)] * chunk_size + yield chunk + iteration += 1 + if iteration >= max_iterations: + raise StopIteration() + + def _test_reader_chunked(self, chunk_size, read_size, max_iterations=5): + generator = self._create_generator(chunk_size, max_iterations) + reader = utils.CooperativeReader(generator) + result = '' + while True: + data = reader.read(read_size) + if len(data) == 0: + break + self.assertLessEqual(len(data), read_size) + result += data + expected = ('a' * chunk_size + + 'b' * chunk_size + + 'c' * chunk_size + + 'a' * chunk_size + + 'b' * chunk_size) + self.assertEqual(expected, result) + + def test_cooperative_reader_preserves_size_chunk_less_then_read(self): + self._test_reader_chunked(43, 101) + + def test_cooperative_reader_preserves_size_chunk_equals_read(self): + self._test_reader_chunked(1024, 1024) + + def test_cooperative_reader_preserves_size_chunk_more_then_read(self): + chunk_size = 16 * 1024 * 1024 # 16 Mb, as in remote http source + read_size = 8 * 1024 # 8k, as in httplib + self._test_reader_chunked(chunk_size, read_size) + def test_limiting_reader(self): """Ensure limiting reader class accesses all bytes of file""" BYTES = 1024