diff --git a/glance_store/_drivers/swift/buffered.py b/glance_store/_drivers/swift/buffered.py
new file mode 100644
index 00000000..950c95f8
--- /dev/null
+++ b/glance_store/_drivers/swift/buffered.py
@@ -0,0 +1,169 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import socket
+import tempfile
+
+from oslo_config import cfg
+from oslo_utils import encodeutils
+
+from glance_store import exceptions
+from glance_store.i18n import _
+
+LOG = logging.getLogger(__name__)
+READ_SZ = 65536
+
+BUFFERING_OPTS = [
+    cfg.StrOpt('swift_upload_buffer_dir',
+               help=_("""
+Directory to buffer image segments before upload to Swift.
+
+Provide a string value representing the absolute path to the
+directory on the glance node where image segments will be
+buffered briefly before they are uploaded to swift.
+
+NOTES:
+    * This is required only when the configuration option
+      ``swift_buffer_on_upload`` is set to True.
+    * This directory should be provisioned keeping in mind the
+      ``swift_store_large_object_chunk_size`` and the maximum
+      number of images that could be uploaded simultaneously by
+      a given glance node.
+
+Possible values:
+    * String value representing an absolute directory path
+
+Related options:
+    * swift_buffer_on_upload
+    * swift_store_large_object_chunk_size
+
+""")),
+]
+CONF = cfg.CONF
+
+
+def validate_buffering(buffer_dir):
+    if buffer_dir is None:
+        msg = _('Configuration option "swift_upload_buffer_dir" is '
+                'not set. Please set it to a valid directory path to '
+                'buffer image segments during Swift uploads.')
+        raise exceptions.BadStoreConfiguration(store_name='swift',
+                                               reason=msg)
+
+    # NOTE(dharinic): Ensure that the provided directory path for
+    # buffering is valid
+    try:
+        _tmpfile = tempfile.TemporaryFile(dir=buffer_dir)
+    except OSError as err:
+        msg = (_('Unable to use buffer directory set with '
+                 '"swift_upload_buffer_dir". Error: %s') %
+               encodeutils.exception_to_unicode(err))
+        raise exceptions.BadStoreConfiguration(store_name='swift',
+                                               reason=msg)
+    else:
+        _tmpfile.close()
+        return True
+
+
+class BufferedReader(object):
+    """Buffer a chunk (segment) worth of data to disk before sending it
+    to Swift. This gives the input stream the ability to back up and
+    retry a failed put object request. (Swiftclient will try to reset
+    the file pointer on any upload failure if seek and tell methods are
+    provided on the input file.)
+
+    Chunks are temporarily buffered to disk. Disk space consumed will be
+    roughly (segment size * number of in-flight upload requests).
+
+    The disk space consumed for buffering MAY eat into the disk space
+    available for the glance image cache, which can affect image download
+    performance. So, take extra care when deploying this feature to
+    ensure there is enough disk space available.
+    """
+
+    def __init__(self, fd, checksum, total, verifier=None):
+        self.fd = fd
+        self.total = total
+        self.checksum = checksum
+        self.verifier = verifier
+        # maintain a pointer to use to update checksum and verifier
+        self.update_position = 0
+
+        buffer_dir = CONF.glance_store.swift_upload_buffer_dir
+        self._tmpfile = tempfile.TemporaryFile(dir=buffer_dir)
+
+        self._buffered = False
+        self.is_zero_size = True
+        self._buffer()
+        # Setting the file pointer back to the beginning of file
+        self._tmpfile.seek(0)
+
+    def read(self, sz):
+        """Read up to a chunk's worth of data from the input stream into a
+        file buffer. Then return data out of that buffer.
+        """
+        remaining = self.total - self._tmpfile.tell()
+        read_size = min(remaining, sz)
+        # read out of the buffered chunk
+        result = self._tmpfile.read(read_size)
+        # update the checksum and verifier with only the bytes
+        # they have not seen
+        update = self.update_position - self._tmpfile.tell()
+        if update < 0:
+            self.checksum.update(result[update:])
+            if self.verifier:
+                self.verifier.update(result[update:])
+            self.update_position += abs(update)
+        return result
+
+    def _buffer(self):
+        to_buffer = self.total
+        LOG.debug("Buffering %s bytes of image segment" % to_buffer)
+
+        while not self._buffered:
+            read_sz = min(to_buffer, READ_SZ)
+            try:
+                buf = self.fd.read(read_sz)
+            except IOError as e:
+                # We actually don't know what exactly self.fd is, so we
+                # don't know which exceptions it may raise. To use the
+                # retry mechanism inside swiftclient we must limit the
+                # possible set of errors to socket.error.
+                raise socket.error(*e.args)
+            if len(buf) == 0:
+                self._tmpfile.seek(0)
+                self._buffered = True
+                break
+            # some data was buffered, so the segment is not zero-sized
+            self.is_zero_size = False
+            self._tmpfile.write(buf)
+            to_buffer -= len(buf)
+
+    # NOTE(belliott) seek and tell get used by python-swiftclient to "reset"
+    # if there is a put_object error
+    def seek(self, offset):
+        LOG.debug("Seek from %s to %s" % (self._tmpfile.tell(), offset))
+        self._tmpfile.seek(offset)
+
+    def tell(self):
+        return self._tmpfile.tell()
+
+    @property
+    def bytes_read(self):
+        return self.tell()
+
+    def __enter__(self):
+        self._tmpfile.__enter__()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        # close and delete the temporary file used to buffer data
+        self._tmpfile.__exit__(type, value, traceback)
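The subtlest part of buffered.py is the digest bookkeeping in read(): after swiftclient seeks backwards and re-reads, the checksum and verifier must be fed only the bytes they have not already seen. Below is a minimal standalone sketch of that arithmetic, for illustration only; it mirrors the patch's logic but is not part of it, and uses module-level state for brevity:

    import hashlib
    import io

    buf = io.BytesIO(b'1234567')  # stands in for the on-disk tempfile
    checksum = hashlib.md5()
    update_position = 0

    def read(sz):
        global update_position
        result = buf.read(sz)
        # A negative 'update' means this read advanced past
        # update_position, i.e. result[update:] holds bytes the
        # checksum has not seen yet.
        update = update_position - buf.tell()
        if update < 0:
            checksum.update(result[update:])
            update_position += abs(update)
        return result

    read(7)      # digest sees b'1234567'
    buf.seek(4)  # simulate a swiftclient retry reset
    read(1)      # digest unchanged: byte b'5' was already seen
    assert checksum.hexdigest() == hashlib.md5(b'1234567').hexdigest()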
+ """ + + def __init__(self, fd, checksum, total, verifier=None): + self.fd = fd + self.total = total + self.checksum = checksum + self.verifier = verifier + # maintain a pointer to use to update checksum and verifier + self.update_position = 0 + + buffer_dir = CONF.glance_store.swift_upload_buffer_dir + self._tmpfile = tempfile.TemporaryFile(dir=buffer_dir) + + self._buffered = False + self.is_zero_size = True + self._buffer() + # Setting the file pointer back to the beginning of file + self._tmpfile.seek(0) + + def read(self, sz): + """Read up to a chunk's worth of data from the input stream into a + file buffer. Then return data out of that buffer. + """ + remaining = self.total - self._tmpfile.tell() + read_size = min(remaining, sz) + # read out of the buffered chunk + result = self._tmpfile.read(read_size) + # update the checksum and verifier with only the bytes + # they have not seen + update = self.update_position - self._tmpfile.tell() + if update < 0: + self.checksum.update(result[update:]) + if self.verifier: + self.verifier.update(result[update:]) + self.update_position += abs(update) + return result + + def _buffer(self): + to_buffer = self.total + LOG.debug("Buffering %s bytes of image segment" % to_buffer) + + while not self._buffered: + read_sz = min(to_buffer, READ_SZ) + try: + buf = self.fd.read(read_sz) + except IOError as e: + # We actually don't know what exactly self.fd is. And as a + # result we don't know which exception it may raise. To pass + # the retry mechanism inside swift client we must limit the + # possible set of errors. + raise socket.error(*e.args) + if len(buf) == 0: + self._tmpfile.seek(0) + self._buffered = True + self.is_zero_size = False + break + self._tmpfile.write(buf) + to_buffer -= len(buf) + + # NOTE(belliott) seek and tell get used by python-swiftclient to "reset" + # if there is a put_object error + def seek(self, offset): + LOG.debug("Seek from %s to %s" % (self._tmpfile.tell(), offset)) + self._tmpfile.seek(offset) + + def tell(self): + return self._tmpfile.tell() + + @property + def bytes_read(self): + return self.tell() + + def __enter__(self): + self._tmpfile.__enter__() + return self + + def __exit__(self, type, value, traceback): + # close and delete the temporary file used to buffer data + self._tmpfile.__exit__(type, value, traceback) diff --git a/glance_store/_drivers/swift/store.py b/glance_store/_drivers/swift/store.py index 7801be38..27b240c1 100644 --- a/glance_store/_drivers/swift/store.py +++ b/glance_store/_drivers/swift/store.py @@ -35,8 +35,8 @@ try: except ImportError: swiftclient = None - import glance_store +from glance_store._drivers.swift import buffered from glance_store._drivers.swift import connection_manager from glance_store._drivers.swift import utils as sutils from glance_store import capabilities @@ -447,6 +447,32 @@ Possible values: Related options: * swift_store_multi_tenant +""")), + cfg.BoolOpt('swift_buffer_on_upload', + default=False, + help=_(""" +Buffer image segments before upload to Swift. + +Provide a boolean value to indicate whether or not Glance should +buffer image data to disk while uploading to swift. This enables +Glance to resume uploads on error. + +NOTES: +When enabling this option, one should take great care as this +increases disk usage on the API node. Be aware that depending +upon how the file system is configured, the disk space used +for buffering may decrease the actual disk space available for +the glance image cache. 
@@ -715,8 +741,8 @@ def Store(conf):
         raise exceptions.BadStoreConfiguration(store_name="swift",
                                                reason=msg)
     try:
-        conf.register_opts(_SWIFT_OPTS + sutils.swift_opts,
-                           group='glance_store')
+        conf.register_opts(_SWIFT_OPTS + sutils.swift_opts +
+                           buffered.BUFFERING_OPTS, group='glance_store')
     except cfg.DuplicateOptError:
         pass
@@ -724,7 +750,7 @@ def Store(conf):
         return MultiTenantStore(conf)
     return SingleTenantStore(conf)
 
-Store.OPTIONS = _SWIFT_OPTS + sutils.swift_opts
+Store.OPTIONS = _SWIFT_OPTS + sutils.swift_opts + buffered.BUFFERING_OPTS
 
 
 def _is_slo(slo_header):
@@ -762,6 +788,14 @@ class BaseStore(driver.Store):
             msg = _("Missing dependency python_swiftclient.")
             raise exceptions.BadStoreConfiguration(store_name="swift",
                                                    reason=msg)
+
+        if glance_conf.swift_buffer_on_upload:
+            buffer_dir = glance_conf.swift_upload_buffer_dir
+            if buffered.validate_buffering(buffer_dir):
+                self.reader_class = buffered.BufferedReader
+        else:
+            self.reader_class = ChunkReader
+
         super(BaseStore, self).configure(re_raise_bsc=re_raise_bsc)
 
     def _get_object(self, location, manager, start=None):
@@ -905,42 +939,45 @@ class BaseStore(driver.Store):
                     content_length = chunk_size
 
                 chunk_name = "%s-%05d" % (location.obj, chunk_id)
-                reader = ChunkReader(image_file, checksum, chunk_size,
-                                     verifier)
-                if reader.is_zero_size is True:
-                    LOG.debug('Not writing zero-length chunk.')
-                    break
-                try:
-                    chunk_etag = manager.get_connection().put_object(
-                        location.container, chunk_name, reader,
-                        content_length=content_length)
-                    written_chunks.append(chunk_name)
-                except Exception:
-                    # Delete orphaned segments from swift backend
-                    with excutils.save_and_reraise_exception():
-                        reason = _LE("Error during chunked upload to "
-                                     "backend, deleting stale chunks")
-                        LOG.error(reason)
-                        self._delete_stale_chunks(
-                            manager.get_connection(),
-                            location.container,
-                            written_chunks)
-
-                bytes_read = reader.bytes_read
-                msg = ("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
-                       "%(total_chunks)s) of length %(bytes_read)d "
-                       "to Swift returning MD5 of content: "
-                       "%(chunk_etag)s" %
-                       {'chunk_name': chunk_name,
-                        'chunk_id': chunk_id,
-                        'total_chunks': total_chunks,
-                        'bytes_read': bytes_read,
-                        'chunk_etag': chunk_etag})
-                LOG.debug(msg)
+                with self.reader_class(image_file, checksum,
+                                       chunk_size, verifier) as reader:
+                    if reader.is_zero_size is True:
+                        LOG.debug('Not writing zero-length chunk.')
+                        break
+
+                    try:
+                        chunk_etag = \
+                            manager.get_connection().put_object(
+                                location.container,
+                                chunk_name, reader,
+                                content_length=content_length)
+                        written_chunks.append(chunk_name)
+                    except Exception:
+                        # Delete orphaned segments from swift backend
+                        with excutils.save_and_reraise_exception():
+                            LOG.error(_("Error during chunked upload "
+                                        "to backend, deleting stale "
+                                        "chunks."))
+                            self._delete_stale_chunks(
+                                manager.get_connection(),
+                                location.container,
+                                written_chunks)
+
+                    bytes_read = reader.bytes_read
+                    msg = ("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
+                           "%(total_chunks)s) of length %(bytes_read)d "
+                           "to Swift returning MD5 of content: "
+                           "%(chunk_etag)s" %
+                           {'chunk_name': chunk_name,
+                            'chunk_id': chunk_id,
+                            'total_chunks': total_chunks,
+                            'bytes_read': bytes_read,
+                            'chunk_etag': chunk_etag})
+                    LOG.debug(msg)
 
                 chunk_id += 1
                 combined_chunks_size += bytes_read
 
-            # In the case we have been given an unknown image size,
             # set the size to the total size of the combined chunks.
             if image_size == 0:
@@ -1501,3 +1538,9 @@ class ChunkReader(object):
         if self.verifier:
             self.verifier.update(result)
         return result
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        pass
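For context, BufferedReader exposes seek and tell because python-swiftclient rewinds the input stream and retries when a put_object call fails. A simplified model of that interaction, for illustration only (this sketches the behaviour glance relies on, not swiftclient's actual code):

    import socket

    def put_object_with_retries(reader, upload, retries=5):
        # 'upload' stands in for a call like connection.put_object(...).
        for attempt in range(retries):
            if attempt > 0:
                # A retry is only possible if the input can be rewound.
                # BufferedReader rewinds its on-disk tempfile instead of
                # the original client socket, which cannot be replayed.
                if not (hasattr(reader, 'seek') and hasattr(reader, 'tell')):
                    raise IOError('input is not seekable, cannot retry')
                reader.seek(0)
            try:
                return upload(reader)
            except socket.error:
                if attempt == retries - 1:
                    raise

This is also why _buffer() converts unknown IOErrors into socket.error: it keeps failures within the set of errors the retry path is prepared to handle.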
diff --git a/glance_store/tests/unit/test_opts.py b/glance_store/tests/unit/test_opts.py
index b00c9d58..c0e0f8af 100644
--- a/glance_store/tests/unit/test_opts.py
+++ b/glance_store/tests/unit/test_opts.py
@@ -122,6 +122,8 @@ class OptsTestCase(base.StoreBaseTest):
             'swift_store_ssl_compression',
             'swift_store_use_trusts',
             'swift_store_user',
+            'swift_buffer_on_upload',
+            'swift_upload_buffer_dir',
             'vmware_insecure',
             'vmware_ca_file',
             'vmware_api_retry_count',
diff --git a/glance_store/tests/unit/test_swift_store.py b/glance_store/tests/unit/test_swift_store.py
index 6a36e37e..dd055799 100644
--- a/glance_store/tests/unit/test_swift_store.py
+++ b/glance_store/tests/unit/test_swift_store.py
@@ -34,6 +34,7 @@ from six.moves import http_client
 from six.moves import range
 import swiftclient
 
+from glance_store._drivers.swift import buffered
 from glance_store._drivers.swift import connection_manager as manager
 from glance_store._drivers.swift import store as swift
 from glance_store._drivers.swift import utils as sutils
@@ -354,6 +355,28 @@ class SwiftTests(object):
                           self.store.get,
                           loc)
 
+    def test_buffered_reader_opts(self):
+        self.config(swift_buffer_on_upload=True)
+        self.config(swift_upload_buffer_dir=self.test_dir)
+        try:
+            self.store = Store(self.conf)
+        except exceptions.BadStoreConfiguration:
+            self.fail("Buffered Reader exception raised when it "
+                      "should not have been")
+
+    def test_buffered_reader_with_invalid_path(self):
+        self.config(swift_buffer_on_upload=True)
+        self.config(swift_upload_buffer_dir="/some/path")
+        self.store = Store(self.conf)
+        self.assertRaises(exceptions.BadStoreConfiguration,
+                          self.store.configure)
+
+    def test_buffered_reader_with_no_path_given(self):
+        self.config(swift_buffer_on_upload=True)
+        self.store = Store(self.conf)
+        self.assertRaises(exceptions.BadStoreConfiguration,
+                          self.store.configure)
+
     @mock.patch('glance_store._drivers.swift.utils'
                 '.is_multiple_swift_store_accounts_enabled',
                 mock.Mock(return_value=False))
@@ -455,7 +478,8 @@ class SwiftTests(object):
                                          service_catalog=service_catalog)
         store = swift.MultiTenantStore(self.conf)
         store.configure()
-        loc, size, checksum, _ = store.add(expected_image_id, image_swift,
+        loc, size, checksum, _ = store.add(expected_image_id,
+                                           image_swift,
                                            expected_swift_size,
                                            context=ctxt)
         # ensure that image add uses user's context
@@ -496,7 +520,8 @@ class SwiftTests(object):
         self.mock_keystone_client()
         self.store = Store(self.conf)
         self.store.configure()
-        loc, size, checksum, _ = self.store.add(image_id, image_swift,
+        loc, size, checksum, _ = self.store.add(image_id,
+                                                image_swift,
                                                 expected_swift_size)
 
         self.assertEqual(expected_location, loc)
@@ -803,7 +828,8 @@ class SwiftTests(object):
                                          service_catalog=service_catalog)
         store = swift.MultiTenantStore(self.conf)
         store.configure()
-        location, size, checksum, _ = store.add(expected_image_id, image_swift,
+        location, size, checksum, _ = store.add(expected_image_id,
+                                                image_swift,
                                                 expected_swift_size,
                                                 context=ctxt)
         self.assertEqual(expected_location, location)
@@ -895,7 +921,8 @@ class SwiftTests(object):
             self.store.large_object_size = units.Ki
             self.store.large_object_chunk_size = units.Ki
             loc, size, checksum, _ = self.store.add(expected_image_id,
-                                                    image_swift, 0)
+                                                    image_swift,
+                                                    0)
         finally:
             self.store.large_object_chunk_size = orig_temp_size
             self.store.large_object_size = orig_max_size
@@ -1946,3 +1973,179 @@ class TestMultipleContainers(base.StoreBaseTest):
                                                       'default_container')
         expected = 'default_container'
         self.assertEqual(expected, actual)
+
+
+class TestBufferedReader(base.StoreBaseTest):
+
+    _CONF = cfg.CONF
+
+    def setUp(self):
+        super(TestBufferedReader, self).setUp()
+        self.config(swift_upload_buffer_dir=self.test_dir)
+        s = b'1234567890'
+        self.infile = six.BytesIO(s)
+        self.infile.seek(0)
+
+        self.checksum = hashlib.md5()
+        self.verifier = mock.MagicMock(name='mock_verifier')
+        total = 7  # not the full 10 byte string - defines segment boundary
+        self.reader = buffered.BufferedReader(self.infile, self.checksum,
+                                              total, self.verifier)
+        self.addCleanup(self.conf.reset)
+
+    def tearDown(self):
+        super(TestBufferedReader, self).tearDown()
+        self.reader.__exit__(None, None, None)
+
+    def test_buffer(self):
+        self.reader.read(4)
+        self.assertTrue(self.reader._buffered)
+
+        # test buffer position
+        self.assertEqual(4, self.reader.tell())
+
+        # also test buffer contents
+        buf = self.reader._tmpfile
+        buf.seek(0)
+        self.assertEqual(b'1234567', buf.read())
+
+    def test_read(self):
+        buf = self.reader.read(4)  # buffer and return 1234
+        self.assertEqual(b'1234', buf)
+
+        buf = self.reader.read(4)  # return 567
+        self.assertEqual(b'567', buf)
+        self.assertEqual(7, self.reader.tell())
+
+    def test_read_limited(self):
+        # read should not exceed the segment boundary described
+        # by 'total'
+        self.assertEqual(b'1234567', self.reader.read(100))
+
+    def test_reset(self):
+        # test a reset like what swiftclient would do
+        # if a segment upload failed.
+        self.assertEqual(0, self.reader.tell())
+        self.reader.read(4)
+        self.assertEqual(4, self.reader.tell())
+
+        self.reader.seek(0)
+        self.assertEqual(0, self.reader.tell())
+
+        # confirm a read after reset
+        self.assertEqual(b'1234', self.reader.read(4))
+
+    def test_partial_reset(self):
+        # reset, but not all the way to the beginning
+        self.reader.read(4)
+        self.reader.seek(2)
+        self.assertEqual(b'34567', self.reader.read(10))
+
+    def test_checksum(self):
+        # the md5 checksum is updated only once on a full segment read
+        expected_csum = hashlib.md5()
+        expected_csum.update(b'1234567')
+        self.reader.read(7)
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+
+    def test_checksum_updated_only_once_w_full_segment_read(self):
+        # Test that the checksum is updated only once when a full segment
+        # read is followed by a seek and partial reads.
+        expected_csum = hashlib.md5()
+        expected_csum.update(b'1234567')
+        self.reader.read(7)  # attempted read of the entire chunk
+        self.reader.seek(4)  # seek back due to possible partial failure
+        self.reader.read(1)  # read one more byte
+        # checksum was updated just once during the first attempted
+        # full read
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+
+    def test_checksum_updates_during_partial_segment_reads(self):
+        # Test to check that checksum is updated with only the bytes
+        # it has not seen when the number of bytes being read is changed
+        expected_csum = hashlib.md5()
+        self.reader.read(4)
+        expected_csum.update(b'1234')
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+        self.reader.seek(0)  # possible failure
+        self.reader.read(2)
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+        self.reader.read(4)  # checksum missing two bytes
+        expected_csum.update(b'56')
+        # checksum updated with only the bytes it did not see
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+
+    def test_checksum_rolling_calls(self):
+        # Test that the checksum continues on to the next segment
+        expected_csum = hashlib.md5()
+        self.reader.read(7)
+        expected_csum.update(b'1234567')
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+        # another reader to complete reading the image file
+        reader1 = buffered.BufferedReader(self.infile, self.checksum, 3,
+                                          self.reader.verifier)
+        reader1.read(3)
+        expected_csum.update(b'890')
+        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
+
+    def test_verifier(self):
+        # Test that the verifier is updated only once on a full segment read.
+        self.reader.read(7)
+        self.verifier.update.assert_called_once_with(b'1234567')
+
+    def test_verifier_updated_only_once_w_full_segment_read(self):
+        # Test that the verifier is updated only once when a full segment
+        # read is followed by a seek and partial reads.
+        self.reader.read(7)  # attempted read of the entire chunk
+        self.reader.seek(4)  # seek back due to possible partial failure
+        self.reader.read(5)  # continue reading
+        # verifier was updated just once during the first attempted
+        # full read
+        self.verifier.update.assert_called_once_with(b'1234567')
+
+    def test_verifier_updates_during_partial_segment_reads(self):
+        # Test to check that verifier is updated with only the bytes
+        # it has not seen when the number of bytes being read is changed
+        self.reader.read(4)
+        self.verifier.update.assert_called_once_with(b'1234')
+        self.reader.seek(0)  # possible failure
+        self.reader.read(2)  # verifier has already seen these bytes
+        self.verifier.update.assert_called_once_with(b'1234')
+        self.reader.read(4)  # verifier missing 2 bytes
+        # verifier updated with only the bytes it did not see
+        self.verifier.update.assert_called_with(b'56')
+        self.assertEqual(2, self.verifier.update.call_count)
+
+    def test_verifier_rolling_calls(self):
+        # Test that the verifier continues on to the next segment
+        self.reader.read(7)
+        self.verifier.update.assert_called_once_with(b'1234567')
+        self.assertEqual(1, self.verifier.update.call_count)
+        # another reader to complete reading the image file
+        reader1 = buffered.BufferedReader(self.infile, self.checksum, 3,
+                                          self.reader.verifier)
+        reader1.read(3)
+        self.verifier.update.assert_called_with(b'890')
+        self.assertEqual(2, self.verifier.update.call_count)
+
+    def test_light_buffer(self):
+        # eventlet nonblocking fds mean the buffer sometimes won't fill.
+        # simulate a case where there is less data in the buffer
+        # than a full segment
+        s = b'12'
+        infile = six.BytesIO(s)
+        infile.seek(0)
+        total = 7
+        checksum = hashlib.md5()
+        self.reader = buffered.BufferedReader(infile, checksum, total)
+
+        self.reader.read(0)  # read into buffer
+        self.assertEqual(b'12', self.reader.read(7))
+        self.assertEqual(2, self.reader.tell())
+
+    def test_context_exit(self):
+        # should close tempfile on context exit
+        with self.reader:
+            pass
+
+        # file objects are not required to have a 'closed' attribute
+        if hasattr(self.reader._tmpfile, 'closed'):
+            self.assertTrue(self.reader._tmpfile.closed)
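Taken together, the upload loop in store.py now depends only on a small duck-typed reader contract, satisfied by both ChunkReader and BufferedReader. The class below summarizes that contract for illustration; it is not a real glance_store class:

    class ReaderContract(object):
        # True until the reader has buffered/returned any data; the
        # upload loop uses it to stop writing zero-length chunks.
        is_zero_size = True

        def read(self, size):
            # return at most 'size' bytes of the current segment
            raise NotImplementedError

        @property
        def bytes_read(self):
            # bytes of the segment consumed so far (used for logging
            # and for accumulating combined_chunks_size)
            return 0

        # Both readers are context managers so the upload loop can use
        # 'with self.reader_class(...) as reader:'. BufferedReader's
        # __exit__ closes (and thereby deletes) its tempfile, while
        # ChunkReader's is a no-op.
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            pass

        # seek(offset) and tell() are optional: only BufferedReader
        # provides them, and their presence is what allows
        # python-swiftclient to rewind and retry a failed upload.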