Merge "Buffered reader: Upload recovery for swift store"
This commit is contained in:
commit
6a0c147c5d
|
@ -0,0 +1,169 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import tempfile
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import encodeutils
|
||||
|
||||
from glance_store import exceptions
|
||||
from glance_store.i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
READ_SZ = 65536
|
||||
|
||||
BUFFERING_OPTS = [
|
||||
cfg.StrOpt('swift_upload_buffer_dir',
|
||||
help=_("""
|
||||
Directory to buffer image segments before upload to Swift.
|
||||
|
||||
Provide a string value representing the absolute path to the
|
||||
directory on the glance node where image segments will be
|
||||
buffered briefly before they are uploaded to swift.
|
||||
|
||||
NOTES:
|
||||
* This is required only when the configuration option
|
||||
``swift_buffer_on_upload`` is set to True.
|
||||
* This directory should be provisioned keeping in mind the
|
||||
``swift_store_large_object_chunk_size`` and the maximum
|
||||
number of images that could be uploaded simultaneously by
|
||||
a given glance node.
|
||||
|
||||
Possible values:
|
||||
* String value representing an absolute directory path
|
||||
|
||||
Related options:
|
||||
* swift_buffer_on_upload
|
||||
* swift_store_large_object_chunk_size
|
||||
|
||||
""")),
|
||||
]
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def validate_buffering(buffer_dir):
|
||||
if buffer_dir is None:
|
||||
msg = _('Configuration option "swift_upload_buffer_dir" is '
|
||||
'not set. Please set it to a valid path to buffer '
|
||||
'during Swift uploads.')
|
||||
raise exceptions.BadStoreConfiguration(store_name='swift',
|
||||
reason=msg)
|
||||
|
||||
# NOTE(dharinic): Ensure that the provided directory path for
|
||||
# buffering is valid
|
||||
try:
|
||||
_tmpfile = tempfile.TemporaryFile(dir=buffer_dir)
|
||||
except OSError as err:
|
||||
msg = (_('Unable to use buffer directory set with '
|
||||
'"swift_upload_buffer_dir". Error: %s') %
|
||||
encodeutils.exception_to_unicode(err))
|
||||
raise exceptions.BadStoreConfiguration(store_name='swift',
|
||||
reason=msg)
|
||||
else:
|
||||
_tmpfile.close()
|
||||
return True
|
||||
|
||||
|
||||
class BufferedReader(object):
|
||||
"""Buffer a chunk (segment) worth of data to disk before sending it swift.
|
||||
This creates the ability to back the input stream up and re-try put object
|
||||
requests. (Swiftclient will try to reset the file pointer on any upload
|
||||
failure if seek and tell methods are provided on the input file.)
|
||||
|
||||
Chunks are temporarily buffered to disk. Disk space consumed will be
|
||||
roughly (segment size * number of in-flight upload requests).
|
||||
|
||||
There exists a possibility where the disk space consumed for buffering MAY
|
||||
eat into the disk space available for glance cache. This may affect image
|
||||
download performance. So, extra care should be taken while deploying this
|
||||
to ensure there is enough disk space available.
|
||||
"""
|
||||
|
||||
def __init__(self, fd, checksum, total, verifier=None):
|
||||
self.fd = fd
|
||||
self.total = total
|
||||
self.checksum = checksum
|
||||
self.verifier = verifier
|
||||
# maintain a pointer to use to update checksum and verifier
|
||||
self.update_position = 0
|
||||
|
||||
buffer_dir = CONF.glance_store.swift_upload_buffer_dir
|
||||
self._tmpfile = tempfile.TemporaryFile(dir=buffer_dir)
|
||||
|
||||
self._buffered = False
|
||||
self.is_zero_size = True
|
||||
self._buffer()
|
||||
# Setting the file pointer back to the beginning of file
|
||||
self._tmpfile.seek(0)
|
||||
|
||||
def read(self, sz):
|
||||
"""Read up to a chunk's worth of data from the input stream into a
|
||||
file buffer. Then return data out of that buffer.
|
||||
"""
|
||||
remaining = self.total - self._tmpfile.tell()
|
||||
read_size = min(remaining, sz)
|
||||
# read out of the buffered chunk
|
||||
result = self._tmpfile.read(read_size)
|
||||
# update the checksum and verifier with only the bytes
|
||||
# they have not seen
|
||||
update = self.update_position - self._tmpfile.tell()
|
||||
if update < 0:
|
||||
self.checksum.update(result[update:])
|
||||
if self.verifier:
|
||||
self.verifier.update(result[update:])
|
||||
self.update_position += abs(update)
|
||||
return result
|
||||
|
||||
def _buffer(self):
|
||||
to_buffer = self.total
|
||||
LOG.debug("Buffering %s bytes of image segment" % to_buffer)
|
||||
|
||||
while not self._buffered:
|
||||
read_sz = min(to_buffer, READ_SZ)
|
||||
try:
|
||||
buf = self.fd.read(read_sz)
|
||||
except IOError as e:
|
||||
# We actually don't know what exactly self.fd is. And as a
|
||||
# result we don't know which exception it may raise. To pass
|
||||
# the retry mechanism inside swift client we must limit the
|
||||
# possible set of errors.
|
||||
raise socket.error(*e.args)
|
||||
if len(buf) == 0:
|
||||
self._tmpfile.seek(0)
|
||||
self._buffered = True
|
||||
self.is_zero_size = False
|
||||
break
|
||||
self._tmpfile.write(buf)
|
||||
to_buffer -= len(buf)
|
||||
|
||||
# NOTE(belliott) seek and tell get used by python-swiftclient to "reset"
|
||||
# if there is a put_object error
|
||||
def seek(self, offset):
|
||||
LOG.debug("Seek from %s to %s" % (self._tmpfile.tell(), offset))
|
||||
self._tmpfile.seek(offset)
|
||||
|
||||
def tell(self):
|
||||
return self._tmpfile.tell()
|
||||
|
||||
@property
|
||||
def bytes_read(self):
|
||||
return self.tell()
|
||||
|
||||
def __enter__(self):
|
||||
self._tmpfile.__enter__()
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
# close and delete the temporary file used to buffer data
|
||||
self._tmpfile.__exit__(type, value, traceback)
|
|
@ -35,8 +35,8 @@ try:
|
|||
except ImportError:
|
||||
swiftclient = None
|
||||
|
||||
|
||||
import glance_store
|
||||
from glance_store._drivers.swift import buffered
|
||||
from glance_store._drivers.swift import connection_manager
|
||||
from glance_store._drivers.swift import utils as sutils
|
||||
from glance_store import capabilities
|
||||
|
@ -447,6 +447,32 @@ Possible values:
|
|||
Related options:
|
||||
* swift_store_multi_tenant
|
||||
|
||||
""")),
|
||||
cfg.BoolOpt('swift_buffer_on_upload',
|
||||
default=False,
|
||||
help=_("""
|
||||
Buffer image segments before upload to Swift.
|
||||
|
||||
Provide a boolean value to indicate whether or not Glance should
|
||||
buffer image data to disk while uploading to swift. This enables
|
||||
Glance to resume uploads on error.
|
||||
|
||||
NOTES:
|
||||
When enabling this option, one should take great care as this
|
||||
increases disk usage on the API node. Be aware that depending
|
||||
upon how the file system is configured, the disk space used
|
||||
for buffering may decrease the actual disk space available for
|
||||
the glance image cache. Disk utilization will cap according to
|
||||
the following equation:
|
||||
(``swift_store_large_object_chunk_size`` * ``workers`` * 1000)
|
||||
|
||||
Possible values:
|
||||
* True
|
||||
* False
|
||||
|
||||
Related options:
|
||||
* swift_upload_buffer_dir
|
||||
|
||||
"""))
|
||||
]
|
||||
|
||||
|
@ -715,8 +741,8 @@ def Store(conf):
|
|||
raise exceptions.BadStoreConfiguration(store_name="swift",
|
||||
reason=msg)
|
||||
try:
|
||||
conf.register_opts(_SWIFT_OPTS + sutils.swift_opts,
|
||||
group='glance_store')
|
||||
conf.register_opts(_SWIFT_OPTS + sutils.swift_opts +
|
||||
buffered.BUFFERING_OPTS, group='glance_store')
|
||||
except cfg.DuplicateOptError:
|
||||
pass
|
||||
|
||||
|
@ -724,7 +750,7 @@ def Store(conf):
|
|||
return MultiTenantStore(conf)
|
||||
return SingleTenantStore(conf)
|
||||
|
||||
Store.OPTIONS = _SWIFT_OPTS + sutils.swift_opts
|
||||
Store.OPTIONS = _SWIFT_OPTS + sutils.swift_opts + buffered.BUFFERING_OPTS
|
||||
|
||||
|
||||
def _is_slo(slo_header):
|
||||
|
@ -762,6 +788,14 @@ class BaseStore(driver.Store):
|
|||
msg = _("Missing dependency python_swiftclient.")
|
||||
raise exceptions.BadStoreConfiguration(store_name="swift",
|
||||
reason=msg)
|
||||
|
||||
if glance_conf.swift_buffer_on_upload:
|
||||
buffer_dir = glance_conf.swift_upload_buffer_dir
|
||||
if buffered.validate_buffering(buffer_dir):
|
||||
self.reader_class = buffered.BufferedReader
|
||||
else:
|
||||
self.reader_class = ChunkReader
|
||||
|
||||
super(BaseStore, self).configure(re_raise_bsc=re_raise_bsc)
|
||||
|
||||
def _get_object(self, location, manager, start=None):
|
||||
|
@ -905,42 +939,45 @@ class BaseStore(driver.Store):
|
|||
content_length = chunk_size
|
||||
|
||||
chunk_name = "%s-%05d" % (location.obj, chunk_id)
|
||||
reader = ChunkReader(image_file, checksum, chunk_size,
|
||||
verifier)
|
||||
if reader.is_zero_size is True:
|
||||
LOG.debug('Not writing zero-length chunk.')
|
||||
break
|
||||
try:
|
||||
chunk_etag = manager.get_connection().put_object(
|
||||
location.container, chunk_name, reader,
|
||||
content_length=content_length)
|
||||
written_chunks.append(chunk_name)
|
||||
except Exception:
|
||||
# Delete orphaned segments from swift backend
|
||||
with excutils.save_and_reraise_exception():
|
||||
reason = _LE("Error during chunked upload to "
|
||||
"backend, deleting stale chunks")
|
||||
LOG.error(reason)
|
||||
self._delete_stale_chunks(
|
||||
manager.get_connection(),
|
||||
location.container,
|
||||
written_chunks)
|
||||
|
||||
bytes_read = reader.bytes_read
|
||||
msg = ("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
|
||||
"%(total_chunks)s) of length %(bytes_read)d "
|
||||
"to Swift returning MD5 of content: "
|
||||
"%(chunk_etag)s" %
|
||||
{'chunk_name': chunk_name,
|
||||
'chunk_id': chunk_id,
|
||||
'total_chunks': total_chunks,
|
||||
'bytes_read': bytes_read,
|
||||
'chunk_etag': chunk_etag})
|
||||
LOG.debug(msg)
|
||||
with self.reader_class(image_file, checksum,
|
||||
chunk_size, verifier) as reader:
|
||||
if reader.is_zero_size is True:
|
||||
LOG.debug('Not writing zero-length chunk.')
|
||||
break
|
||||
|
||||
try:
|
||||
chunk_etag = \
|
||||
manager.get_connection().put_object(
|
||||
location.container,
|
||||
chunk_name, reader,
|
||||
content_length=content_length)
|
||||
written_chunks.append(chunk_name)
|
||||
except Exception:
|
||||
# Delete orphaned segments from swift backend
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Error during chunked upload "
|
||||
"to backend, deleting stale "
|
||||
"chunks."))
|
||||
self._delete_stale_chunks(
|
||||
manager.get_connection(),
|
||||
location.container,
|
||||
written_chunks)
|
||||
|
||||
bytes_read = reader.bytes_read
|
||||
msg = ("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
|
||||
"%(total_chunks)s) of length %(bytes_read)"
|
||||
"d to Swift returning MD5 of content: "
|
||||
"%(chunk_etag)s" %
|
||||
{'chunk_name': chunk_name,
|
||||
'chunk_id': chunk_id,
|
||||
'total_chunks': total_chunks,
|
||||
'bytes_read': bytes_read,
|
||||
'chunk_etag': chunk_etag})
|
||||
LOG.debug(msg)
|
||||
|
||||
chunk_id += 1
|
||||
combined_chunks_size += bytes_read
|
||||
|
||||
# In the case we have been given an unknown image size,
|
||||
# set the size to the total size of the combined chunks.
|
||||
if image_size == 0:
|
||||
|
@ -1501,3 +1538,9 @@ class ChunkReader(object):
|
|||
if self.verifier:
|
||||
self.verifier.update(result)
|
||||
return result
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
pass
|
||||
|
|
|
@ -122,6 +122,8 @@ class OptsTestCase(base.StoreBaseTest):
|
|||
'swift_store_ssl_compression',
|
||||
'swift_store_use_trusts',
|
||||
'swift_store_user',
|
||||
'swift_buffer_on_upload',
|
||||
'swift_upload_buffer_dir',
|
||||
'vmware_insecure',
|
||||
'vmware_ca_file',
|
||||
'vmware_api_retry_count',
|
||||
|
|
|
@ -34,6 +34,7 @@ from six.moves import http_client
|
|||
from six.moves import range
|
||||
import swiftclient
|
||||
|
||||
from glance_store._drivers.swift import buffered
|
||||
from glance_store._drivers.swift import connection_manager as manager
|
||||
from glance_store._drivers.swift import store as swift
|
||||
from glance_store._drivers.swift import utils as sutils
|
||||
|
@ -354,6 +355,28 @@ class SwiftTests(object):
|
|||
self.store.get,
|
||||
loc)
|
||||
|
||||
def test_buffered_reader_opts(self):
|
||||
self.config(swift_buffer_on_upload=True)
|
||||
self.config(swift_upload_buffer_dir=self.test_dir)
|
||||
try:
|
||||
self.store = Store(self.conf)
|
||||
except exceptions.BadStoreConfiguration:
|
||||
self.fail("Buffered Reader exception raised when it "
|
||||
"should not have been")
|
||||
|
||||
def test_buffered_reader_with_invalid_path(self):
|
||||
self.config(swift_buffer_on_upload=True)
|
||||
self.config(swift_upload_buffer_dir="/some/path")
|
||||
self.store = Store(self.conf)
|
||||
self.assertRaises(exceptions.BadStoreConfiguration,
|
||||
self.store.configure)
|
||||
|
||||
def test_buffered_reader_with_no_path_given(self):
|
||||
self.config(swift_buffer_on_upload=True)
|
||||
self.store = Store(self.conf)
|
||||
self.assertRaises(exceptions.BadStoreConfiguration,
|
||||
self.store.configure)
|
||||
|
||||
@mock.patch('glance_store._drivers.swift.utils'
|
||||
'.is_multiple_swift_store_accounts_enabled',
|
||||
mock.Mock(return_value=False))
|
||||
|
@ -455,7 +478,8 @@ class SwiftTests(object):
|
|||
service_catalog=service_catalog)
|
||||
store = swift.MultiTenantStore(self.conf)
|
||||
store.configure()
|
||||
loc, size, checksum, _ = store.add(expected_image_id, image_swift,
|
||||
loc, size, checksum, _ = store.add(expected_image_id,
|
||||
image_swift,
|
||||
expected_swift_size,
|
||||
context=ctxt)
|
||||
# ensure that image add uses user's context
|
||||
|
@ -496,7 +520,8 @@ class SwiftTests(object):
|
|||
self.mock_keystone_client()
|
||||
self.store = Store(self.conf)
|
||||
self.store.configure()
|
||||
loc, size, checksum, _ = self.store.add(image_id, image_swift,
|
||||
loc, size, checksum, _ = self.store.add(image_id,
|
||||
image_swift,
|
||||
expected_swift_size)
|
||||
|
||||
self.assertEqual(expected_location, loc)
|
||||
|
@ -803,7 +828,8 @@ class SwiftTests(object):
|
|||
service_catalog=service_catalog)
|
||||
store = swift.MultiTenantStore(self.conf)
|
||||
store.configure()
|
||||
location, size, checksum, _ = store.add(expected_image_id, image_swift,
|
||||
location, size, checksum, _ = store.add(expected_image_id,
|
||||
image_swift,
|
||||
expected_swift_size,
|
||||
context=ctxt)
|
||||
self.assertEqual(expected_location, location)
|
||||
|
@ -895,7 +921,8 @@ class SwiftTests(object):
|
|||
self.store.large_object_size = units.Ki
|
||||
self.store.large_object_chunk_size = units.Ki
|
||||
loc, size, checksum, _ = self.store.add(expected_image_id,
|
||||
image_swift, 0)
|
||||
image_swift,
|
||||
0)
|
||||
finally:
|
||||
self.store.large_object_chunk_size = orig_temp_size
|
||||
self.store.large_object_size = orig_max_size
|
||||
|
@ -1946,3 +1973,179 @@ class TestMultipleContainers(base.StoreBaseTest):
|
|||
'default_container')
|
||||
expected = 'default_container'
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
|
||||
class TestBufferedReader(base.StoreBaseTest):
|
||||
|
||||
_CONF = cfg.CONF
|
||||
|
||||
def setUp(self):
|
||||
super(TestBufferedReader, self).setUp()
|
||||
self.config(swift_upload_buffer_dir=self.test_dir)
|
||||
s = b'1234567890'
|
||||
self.infile = six.BytesIO(s)
|
||||
self.infile.seek(0)
|
||||
|
||||
self.checksum = hashlib.md5()
|
||||
self.verifier = mock.MagicMock(name='mock_verifier')
|
||||
total = 7 # not the full 10 byte string - defines segment boundary
|
||||
self.reader = buffered.BufferedReader(self.infile, self.checksum,
|
||||
total, self.verifier)
|
||||
self.addCleanup(self.conf.reset)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestBufferedReader, self).tearDown()
|
||||
self.reader.__exit__(None, None, None)
|
||||
|
||||
def test_buffer(self):
|
||||
self.reader.read(4)
|
||||
self.assertTrue(self.reader._buffered, True)
|
||||
|
||||
# test buffer position
|
||||
self.assertEqual(4, self.reader.tell())
|
||||
|
||||
# also test buffer contents
|
||||
buf = self.reader._tmpfile
|
||||
buf.seek(0)
|
||||
self.assertEqual(b'1234567', buf.read())
|
||||
|
||||
def test_read(self):
|
||||
buf = self.reader.read(4) # buffer and return 1234
|
||||
self.assertEqual(b'1234', buf)
|
||||
|
||||
buf = self.reader.read(4) # return 567
|
||||
self.assertEqual(b'567', buf)
|
||||
self.assertEqual(7, self.reader.tell())
|
||||
|
||||
def test_read_limited(self):
|
||||
# read should not exceed the segment boundary described
|
||||
# by 'total'
|
||||
self.assertEqual(b'1234567', self.reader.read(100))
|
||||
|
||||
def test_reset(self):
|
||||
# test a reset like what swiftclient would do
|
||||
# if a segment upload failed.
|
||||
self.assertEqual(0, self.reader.tell())
|
||||
self.reader.read(4)
|
||||
self.assertEqual(4, self.reader.tell())
|
||||
|
||||
self.reader.seek(0)
|
||||
self.assertEqual(0, self.reader.tell())
|
||||
|
||||
# confirm a read after reset
|
||||
self.assertEqual(b'1234', self.reader.read(4))
|
||||
|
||||
def test_partial_reset(self):
|
||||
# reset, but not all the way to the beginning
|
||||
self.reader.read(4)
|
||||
self.reader.seek(2)
|
||||
self.assertEqual(b'34567', self.reader.read(10))
|
||||
|
||||
def test_checksum(self):
|
||||
# the md5 checksum is updated only once on a full segment read
|
||||
expected_csum = hashlib.md5()
|
||||
expected_csum.update(b'1234567')
|
||||
self.reader.read(7)
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
|
||||
def test_checksum_updated_only_once_w_full_segment_read(self):
|
||||
# Test that the checksum is updated only once when a full segment read
|
||||
# is followed by a seek and partial reads.
|
||||
expected_csum = hashlib.md5()
|
||||
expected_csum.update(b'1234567')
|
||||
self.reader.read(7) # attempted read of the entire chunk
|
||||
self.reader.seek(4) # seek back due to possible partial failure
|
||||
self.reader.read(1) # read one more byte
|
||||
# checksum was updated just once during the first attempted full read
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
|
||||
def test_checksum_updates_during_partial_segment_reads(self):
|
||||
# Test to check that checksum is updated with only the bytes it has
|
||||
# not seen when the number of bytes being read is changed
|
||||
expected_csum = hashlib.md5()
|
||||
self.reader.read(4)
|
||||
expected_csum.update(b'1234')
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
self.reader.seek(0) # possible failure
|
||||
self.reader.read(2)
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
self.reader.read(4) # checksum missing two bytes
|
||||
expected_csum.update(b'56')
|
||||
# checksum updated with only the bytes it did not see
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
|
||||
def test_checksum_rolling_calls(self):
|
||||
# Test that the checksum continues on to the next segment
|
||||
expected_csum = hashlib.md5()
|
||||
self.reader.read(7)
|
||||
expected_csum.update(b'1234567')
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
# another reader to complete reading the image file
|
||||
reader1 = buffered.BufferedReader(self.infile, self.checksum, 3,
|
||||
self.reader.verifier)
|
||||
reader1.read(3)
|
||||
expected_csum.update(b'890')
|
||||
self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
|
||||
|
||||
def test_verifier(self):
|
||||
# Test that the verifier is updated only once on a full segment read.
|
||||
self.reader.read(7)
|
||||
self.verifier.update.assert_called_once_with(b'1234567')
|
||||
|
||||
def test_verifier_updated_only_once_w_full_segment_read(self):
|
||||
# Test that the verifier is updated only once when a full segment read
|
||||
# is followed by a seek and partial reads.
|
||||
self.reader.read(7) # attempted read of the entire chunk
|
||||
self.reader.seek(4) # seek back due to possible partial failure
|
||||
self.reader.read(5) # continue reading
|
||||
# verifier was updated just once during the first attempted full read
|
||||
self.verifier.update.assert_called_once_with(b'1234567')
|
||||
|
||||
def test_verifier_updates_during_partial_segment_reads(self):
|
||||
# Test to check that verifier is updated with only the bytes it has
|
||||
# not seen when the number of bytes being read is changed
|
||||
self.reader.read(4)
|
||||
self.verifier.update.assert_called_once_with(b'1234')
|
||||
self.reader.seek(0) # possible failure
|
||||
self.reader.read(2) # verifier knows ahead
|
||||
self.verifier.update.assert_called_once_with(b'1234')
|
||||
self.reader.read(4) # verify missing 2 bytes
|
||||
# verifier updated with only the bytes it did not see
|
||||
self.verifier.update.assert_called_with(b'56') # verifier updated
|
||||
self.assertEqual(2, self.verifier.update.call_count)
|
||||
|
||||
def test_verifier_rolling_calls(self):
|
||||
# Test that the verifier continues on to the next segment
|
||||
self.reader.read(7)
|
||||
self.verifier.update.assert_called_once_with(b'1234567')
|
||||
self.assertEqual(1, self.verifier.update.call_count)
|
||||
# another reader to complete reading the image file
|
||||
reader1 = buffered.BufferedReader(self.infile, self.checksum, 3,
|
||||
self.reader.verifier)
|
||||
reader1.read(3)
|
||||
self.verifier.update.assert_called_with(b'890')
|
||||
self.assertEqual(2, self.verifier.update.call_count)
|
||||
|
||||
def test_light_buffer(self):
|
||||
# eventlet nonblocking fds means sometimes the buffer won't fill.
|
||||
# simulate testing where there is less in the buffer than a
|
||||
# full segment
|
||||
s = b'12'
|
||||
infile = six.BytesIO(s)
|
||||
infile.seek(0)
|
||||
total = 7
|
||||
checksum = hashlib.md5()
|
||||
self.reader = buffered.BufferedReader(infile, checksum, total)
|
||||
|
||||
self.reader.read(0) # read into buffer
|
||||
self.assertEqual(b'12', self.reader.read(7))
|
||||
self.assertEqual(2, self.reader.tell())
|
||||
|
||||
def test_context_exit(self):
|
||||
# should close tempfile on context exit
|
||||
with self.reader:
|
||||
pass
|
||||
|
||||
# file objects are not required to have a 'close' attribute
|
||||
if getattr(self.reader._tmpfile, 'closed'):
|
||||
self.assertTrue(self.reader._tmpfile.closed)
|
||||
|
|
Loading…
Reference in New Issue