From 270ec44a890f4bd2917310e2fcab9ad2bb9413b7 Mon Sep 17 00:00:00 2001 From: Alexander Tivelkov Date: Tue, 20 Jan 2015 17:25:07 +0300 Subject: [PATCH] Fix for CooperativeReader to process read length CooperativeReader, being an eventlet-friendly wrapper around the generator- based reader of image data, actually transforms chunk-by-chunk iteration into the readable stream. It is used when the image is being copied from the remote source: some generator-based image data representing the remote source acts as its underlying object, and the instance of CooperativeReader is passed as a data stream to the backend client which uses it to read the data. Before this patch, the CooperativeReader was ignoring the "length" parameter of the read method, always returning the whole chunk returned by the underlying generator (in case of HTTP source the size of this chunk is 16 M). This was causing problems for the clients attempting to read data from it, and - under some circumstances - the loss of data. For chunked storage of files in Swift a special class (ChunkReader, declared in the swift store driver) is used to reduce the requested read length so no extra data is read and transferred. However, this was not working as the CooperativeReader (which was the underlying stream for the ChunkReader) was ignoring the requested size. This was causing the data to be lost when reading behind the boundaries of the Chunks. This patchset introduces a buffer in the CooperativeReader to store the most recently fetched iterator chunk. The reads are independent from requests to iterator, so the CooperativeReader is able to return the exact requested amount of bytes and no data is lost due to extra-reads. SecurityImpact Change-Id: Ief37d1e29487bb03e612320f5cc06910cfd1c23a Closes-bug: #1412802 --- glance/common/utils.py | 67 +++++++++++++++++++++++--- glance/tests/unit/common/test_utils.py | 39 +++++++++++++++ 2 files changed, 99 insertions(+), 7 deletions(-) diff --git a/glance/common/utils.py b/glance/common/utils.py index ac379a5b73..4291e8bf2b 100644 --- a/glance/common/utils.py +++ b/glance/common/utils.py @@ -1,6 +1,7 @@ # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2014 SoftLayer Technologies, Inc. +# Copyright 2015 Mirantis, Inc # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -128,6 +129,9 @@ def cooperative_read(fd): return readfn +MAX_COOP_READER_BUFFER_SIZE = 134217728 # 128M seems like a sane buffer limit + + class CooperativeReader(object): """ An eventlet thread friendly class for reading in image data. @@ -149,19 +153,68 @@ class CooperativeReader(object): # is more straightforward if hasattr(fd, 'read'): self.read = cooperative_read(fd) + else: + self.iterator = None + self.buffer = '' + self.position = 0 def read(self, length=None): - """Return the next chunk of the underlying iterator. + """Return the requested amount of bytes, fetching the next chunk of + the underlying iterator when needed. This is replaced with cooperative_read in __init__ if the underlying fd already supports read(). """ - if self.iterator is None: - self.iterator = self.__iter__() - try: - return self.iterator.next() - except StopIteration: - return '' + if length is None: + if len(self.buffer) - self.position > 0: + # if no length specified but some data exists in buffer, + # return that data and clear the buffer + result = self.buffer[self.position:] + self.buffer = '' + self.position = 0 + return str(result) + else: + # otherwise read the next chunk from the underlying iterator + # and return it as a whole. Reset the buffer, as subsequent + # calls may specify the length + try: + if self.iterator is None: + self.iterator = self.__iter__() + return self.iterator.next() + except StopIteration: + return '' + finally: + self.buffer = '' + self.position = 0 + else: + result = bytearray() + while len(result) < length: + if self.position < len(self.buffer): + to_read = length - len(result) + chunk = self.buffer[self.position:self.position + to_read] + result.extend(chunk) + + # This check is here to prevent potential OOM issues if + # this code is called with unreasonably high values of read + # size. Currently it is only called from the HTTP clients + # of Glance backend stores, which use httplib for data + # streaming, which has readsize hardcoded to 8K, so this + # check should never fire. Regardless it still worths to + # make the check, as the code may be reused somewhere else. + if len(result) >= MAX_COOP_READER_BUFFER_SIZE: + raise exception.LimitExceeded() + self.position += len(chunk) + else: + try: + if self.iterator is None: + self.iterator = self.__iter__() + self.buffer = self.iterator.next() + self.position = 0 + except StopIteration: + self.buffer = '' + self.position = 0 + return str(result) + return str(result) def __iter__(self): return cooperative_iter(self.fd.__iter__()) diff --git a/glance/tests/unit/common/test_utils.py b/glance/tests/unit/common/test_utils.py index 27ae32d1f5..f67fc35541 100644 --- a/glance/tests/unit/common/test_utils.py +++ b/glance/tests/unit/common/test_utils.py @@ -1,4 +1,5 @@ # Copyright 2011 OpenStack Foundation +# Copyright 2015 Mirantis, Inc # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -74,6 +75,44 @@ class TestUtils(test_utils.BaseTestCase): meat = ''.join(chunks) self.assertEqual('', meat) + def _create_generator(self, chunk_size, max_iterations): + chars = 'abc' + iteration = 0 + while True: + chunk = chars[iteration % len(chars)] * chunk_size + yield chunk + iteration += 1 + if iteration >= max_iterations: + raise StopIteration() + + def _test_reader_chunked(self, chunk_size, read_size, max_iterations=5): + generator = self._create_generator(chunk_size, max_iterations) + reader = utils.CooperativeReader(generator) + result = '' + while True: + data = reader.read(read_size) + if len(data) == 0: + break + self.assertLessEqual(len(data), read_size) + result += data + expected = ('a' * chunk_size + + 'b' * chunk_size + + 'c' * chunk_size + + 'a' * chunk_size + + 'b' * chunk_size) + self.assertEqual(expected, result) + + def test_cooperative_reader_preserves_size_chunk_less_then_read(self): + self._test_reader_chunked(43, 101) + + def test_cooperative_reader_preserves_size_chunk_equals_read(self): + self._test_reader_chunked(1024, 1024) + + def test_cooperative_reader_preserves_size_chunk_more_then_read(self): + chunk_size = 16 * 1024 * 1024 # 16 Mb, as in remote http source + read_size = 8 * 1024 # 8k, as in httplib + self._test_reader_chunked(chunk_size, read_size) + def test_limiting_reader(self): """Ensure limiting reader class accesses all bytes of file""" BYTES = 1024