Merge "Close file handle after upload job"

This commit is contained in:
Jenkins 2017-03-20 03:07:52 +00:00 committed by Gerrit Code Review
commit cf214393af
2 changed files with 104 additions and 68 deletions

View File

@ -1714,6 +1714,7 @@ class SwiftService(object):
segment_name),
'log_line': '%s segment %s' % (obj_name, segment_index),
}
fp = None
try:
fp = open(path, 'rb')
fp.seek(segment_start)
@ -1761,6 +1762,9 @@ class SwiftService(object):
if results_queue is not None:
results_queue.put(res)
return res
finally:
if fp is not None:
fp.close()
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = []
@ -2008,29 +2012,36 @@ class SwiftService(object):
else:
res['large_object'] = False
obr = {}
if path is not None:
content_length = getsize(path)
contents = LengthWrapper(open(path, 'rb'),
content_length,
md5=options['checksum'])
else:
content_length = None
contents = ReadableToIterable(stream,
md5=options['checksum'])
fp = None
try:
if path is not None:
content_length = getsize(path)
fp = open(path, 'rb')
contents = LengthWrapper(fp,
content_length,
md5=options['checksum'])
else:
content_length = None
contents = ReadableToIterable(stream,
md5=options['checksum'])
etag = conn.put_object(
container, obj, contents,
content_length=content_length, headers=put_headers,
response_dict=obr
)
res['response_dict'] = obr
etag = conn.put_object(
container, obj, contents,
content_length=content_length, headers=put_headers,
response_dict=obr
)
res['response_dict'] = obr
if (options['checksum'] and
etag and etag != contents.get_md5sum()):
raise SwiftError('Object upload verification failed: '
'md5 mismatch, local {0} != remote {1} '
'(remote object has not been removed)'
.format(contents.get_md5sum(), etag))
if (options['checksum'] and
etag and etag != contents.get_md5sum()):
raise SwiftError(
'Object upload verification failed: '
'md5 mismatch, local {0} != remote {1} '
'(remote object has not been removed)'
.format(contents.get_md5sum(), etag))
finally:
if fp is not None:
fp.close()
if old_manifest or old_slo_manifest_paths:
drs = []

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
import contextlib
import mock
import os
import six
@ -1002,7 +1003,6 @@ class TestService(unittest.TestCase):
@mock.patch('swiftclient.service.stat')
@mock.patch('swiftclient.service.getmtime', return_value=1.0)
@mock.patch('swiftclient.service.getsize', return_value=4)
@mock.patch.object(builtins, 'open', return_value=six.StringIO('asdf'))
def test_upload_with_relative_path(self, *args, **kwargs):
service = SwiftService({})
objects = [{'path': "./test",
@ -1012,7 +1012,9 @@ class TestService(unittest.TestCase):
{'path': ".\\test",
'strt_indx': 2}]
for obj in objects:
with mock.patch('swiftclient.service.Connection') as mock_conn:
with mock.patch('swiftclient.service.Connection') as mock_conn, \
mock.patch.object(builtins, 'open') as mock_open:
mock_open.return_value = six.StringIO('asdf')
mock_conn.return_value.head_object.side_effect = \
ClientException('Not Found', http_status=404)
mock_conn.return_value.put_object.return_value =\
@ -1032,10 +1034,29 @@ class TestService(unittest.TestCase):
self.assertEqual(upload_obj_resp['object'],
obj['path'][obj['strt_indx']:])
self.assertEqual(upload_obj_resp['path'], obj['path'])
self.assertTrue(mock_open.return_value.closed)
class TestServiceUpload(_TestServiceBase):
@contextlib.contextmanager
def assert_open_results_are_closed(self):
opened_files = []
builtin_open = builtins.open
def open_wrapper(*a, **kw):
opened_files.append((builtin_open(*a, **kw), a, kw))
return opened_files[-1][0]
with mock.patch.object(builtins, 'open', open_wrapper):
yield
for fp, args, kwargs in opened_files:
formatted_args = [repr(a) for a in args]
formatted_args.extend('%s=%r' % kv for kv in kwargs.items())
formatted_args = ', '.join(formatted_args)
self.assertTrue(fp.closed,
'Failed to close open(%s)' % formatted_args)
def test_upload_object_job_file_with_unicode_path(self):
# Uploading a file results in the file object being wrapped in a
# LengthWrapper. This test sets the options in such a way that much
@ -1109,11 +1130,14 @@ class TestServiceUpload(_TestServiceBase):
f.write(b'c' * 10)
f.flush()
# Mock the connection to return an empty etag. This
# skips etag validation which would fail as the LengthWrapper
# isn't read from.
# run read() when put_object is called to calculate md5sum
def _consuming_conn(*a, **kw):
contents = a[2]
contents.read() # Force md5 calculation
return contents.get_md5sum()
mock_conn = mock.Mock()
mock_conn.put_object.return_value = ''
mock_conn.put_object.side_effect = _consuming_conn
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
expected_r = {
'action': 'upload_segment',
@ -1125,21 +1149,22 @@ class TestServiceUpload(_TestServiceBase):
'log_line': 'test_o segment 2',
'success': True,
'response_dict': {},
'segment_etag': '',
'segment_etag': md5(b'b' * 10).hexdigest(),
'attempts': 2,
}
s = SwiftService()
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})
with self.assert_open_results_are_closed():
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})
self.assertEqual(r, expected_r)
@ -1153,10 +1178,6 @@ class TestServiceUpload(_TestServiceBase):
contents = mock_conn.put_object.call_args[0][2]
self.assertIsInstance(contents, utils.LengthWrapper)
self.assertEqual(len(contents), 10)
# This read forces the LengthWrapper to calculate the md5
# for the read content.
self.assertEqual(contents.read(), b'b' * 10)
self.assertEqual(contents.get_md5sum(), md5(b'b' * 10).hexdigest())
def test_etag_mismatch_with_ignore_checksum(self):
def _consuming_conn(*a, **kw):
@ -1215,16 +1236,17 @@ class TestServiceUpload(_TestServiceBase):
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
s = SwiftService()
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})
with self.assert_open_results_are_closed():
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})
self.assertIn('md5 mismatch', str(r.get('error')))
@ -1259,21 +1281,29 @@ class TestServiceUpload(_TestServiceBase):
}
expected_mtime = '%f' % os.path.getmtime(f.name)
# run read() when put_object is called to calculate md5sum
# md5sum is verified in _upload_object_job.
def _consuming_conn(*a, **kw):
contents = a[2]
contents.read() # Force md5 calculation
return contents.get_md5sum()
mock_conn = mock.Mock()
mock_conn.put_object.return_value = ''
mock_conn.put_object.side_effect = _consuming_conn
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
s = SwiftService()
r = s._upload_object_job(conn=mock_conn,
container='test_c',
source=f.name,
obj='test_o',
options={'changed': False,
'skip_identical': False,
'leave_segments': True,
'header': '',
'segment_size': 0,
'checksum': True})
with self.assert_open_results_are_closed():
r = s._upload_object_job(conn=mock_conn,
container='test_c',
source=f.name,
obj='test_o',
options={'changed': False,
'skip_identical': False,
'leave_segments': True,
'header': '',
'segment_size': 0,
'checksum': True})
mtime = r['headers']['x-object-meta-mtime']
self.assertEqual(expected_mtime, mtime)
@ -1292,11 +1322,6 @@ class TestServiceUpload(_TestServiceBase):
contents = mock_conn.put_object.call_args[0][2]
self.assertIsInstance(contents, utils.LengthWrapper)
self.assertEqual(len(contents), 30)
# This read forces the LengthWrapper to calculate the md5
# for the read content. This also checks that LengthWrapper was
# initialized with md5=True
self.assertEqual(contents.read(), b'a' * 30)
self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
@mock.patch('swiftclient.service.time', return_value=1400000000)
def test_upload_object_job_stream(self, time_mock):