Fix UnicodeDecodeError in reconstructor _full_path function

Object paths can have non-ascii characters. Device dicts will
have unicode values. Forming a string using both will cause the
object path to be coerced to UTF8, which currently causes a
UnicodeDecodeError. This causes _get_response() to never return
and the reconstructor hangs.

The call to _full_path() is moved outside of _get_response()
(where its result is used in the exception handler logging)
so that _get_response() will always return even if _full_path()
raises an exception.

Unit tests are refactored to split out a new class with those
tests using an object name and the _full_path method, so that
the class can be subclassed to use an object name with non-ascii
characters.

Existing probe tests are subclassed to repeat using non-ascii
chars in object paths.

Change-Id: I4c570c08c770636d57b1157e19d5b7034fd9ed4e
Closes-Bug: 1679175
This commit is contained in:
Alistair Coles 2017-04-03 14:01:26 +01:00
parent bcd0eb70af
commit 83750cf79c
3 changed files with 76 additions and 123 deletions

View File

@ -80,6 +80,8 @@ def _full_path(node, part, relative_path, policy):
:class:`~swift.common.storage_policy.BaseStoragePolicy`
:return: string representation of absolute path on node plus policy index
"""
if not isinstance(relative_path, six.text_type):
relative_path = relative_path.decode('utf8')
return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d' % {
@ -215,7 +217,7 @@ class ObjectReconstructor(Daemon):
return False
return True
def _get_response(self, node, part, path, headers, policy):
def _get_response(self, node, part, path, headers, full_path):
"""
Helper method for reconstruction that GETs a single EC fragment
archive
@ -224,11 +226,9 @@ class ObjectReconstructor(Daemon):
:param part: the partition
:param path: path of the desired EC archive relative to partition dir
:param headers: the headers to send
:param policy: an instance of
:class:`~swift.common.storage_policy.BaseStoragePolicy`
:param full_path: full path to desired EC archive
:returns: response
"""
full_path = _full_path(node, part, path, policy)
resp = None
try:
with ConnectionTimeout(self.conn_timeout):
@ -285,8 +285,10 @@ class ObjectReconstructor(Daemon):
pile = GreenAsyncPile(len(part_nodes))
path = datafile_metadata['name']
for _node in part_nodes:
full_get_path = _full_path(
_node, job['partition'], path, job['policy'])
pile.spawn(self._get_response, _node, job['partition'],
path, headers, job['policy'])
path, headers, full_get_path)
buckets = defaultdict(dict)
etag_buckets = {}

View File

@ -63,10 +63,13 @@ class Body(object):
class TestReconstructorRebuild(ECProbeTest):
def _make_name(self, prefix):
return '%s%s' % (prefix, uuid.uuid4())
def setUp(self):
super(TestReconstructorRebuild, self).setUp()
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.container_name = self._make_name('container-')
self.object_name = self._make_name('object-')
# sanity
self.assertEqual(self.policy.policy_type, EC_POLICY)
self.reconstructor = Manager(["object-reconstructor"])
@ -115,9 +118,12 @@ class TestReconstructorRebuild(ECProbeTest):
if not require_durable:
req_headers.update(
{'X-Backend-Fragment-Preferences': json.dumps([])})
# node dict has unicode values so utf8 decode our path parts too in
# case they have non-ascii characters
headers, data = direct_client.direct_get_object(
node, part, self.account, self.container_name,
self.object_name, headers=req_headers,
node, part, self.account.decode('utf8'),
self.container_name.decode('utf8'),
self.object_name.decode('utf8'), headers=req_headers,
resp_chunk_size=64 * 2 ** 20)
hasher = md5()
for chunk in data:
@ -325,5 +331,11 @@ class TestReconstructorRebuild(ECProbeTest):
self.revive_drive(device_path)
class TestReconstructorRebuildUTF8(TestReconstructorRebuild):
    """Repeat the rebuild probe tests using non-ASCII container/object names."""

    def _make_name(self, prefix):
        # '\xc3\xa8' is the UTF-8 byte sequence for 'e-grave'
        unique = uuid.uuid4()
        return '%s\xc3\xa8-%s' % (prefix, unique)
if __name__ == "__main__":
unittest.main()

View File

@ -652,7 +652,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
resp = self.reconstructor._get_response(node, part,
path='nada',
headers={},
policy=self.policy)
full_path='nada/nada')
return resp
resp = do_test(200)
@ -1161,8 +1161,7 @@ class TestGlobalSetupObjectReconstructorLegacyDurable(
@patch_policies(with_ec_default=True)
class TestObjectReconstructor(unittest.TestCase):
class BaseTestObjectReconstructor(unittest.TestCase):
def setUp(self):
self.policy = POLICIES.default
self.policy.object_ring._rtime = time.time() + 3600
@ -1205,6 +1204,8 @@ class TestObjectReconstructor(unittest.TestCase):
def ts(self):
return next(self.ts_iter)
class TestObjectReconstructor(BaseTestObjectReconstructor):
def test_handoffs_only_default(self):
# sanity neither option added to default conf
self.conf.pop('handoffs_first', None)
@ -2723,6 +2724,20 @@ class TestObjectReconstructor(unittest.TestCase):
# hashpath is still there, but it's empty
self.assertEqual([], os.listdir(df._datadir))
class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
obj_path = '/a/c/o' # subclass overrides this
def setUp(self):
    """Prepare a timestamp and datafile metadata for self.obj_path."""
    super(TestReconstructFragmentArchive, self).setUp()
    timestamp = self.ts()
    self.obj_timestamp = timestamp
    metadata = {}
    metadata['name'] = self.obj_path
    metadata['Content-Length'] = '0'
    metadata['ETag'] = 'etag'
    metadata['X-Timestamp'] = timestamp.normal
    self.obj_metadata = metadata
def test_reconstruct_fa_no_errors(self):
job = {
'partition': 0,
@ -2730,12 +2745,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': '0',
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -2764,7 +2773,7 @@ class TestObjectReconstructor(unittest.TestCase):
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
job, node, self.obj_metadata)
self.assertEqual(0, df.content_length)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
@ -2783,7 +2792,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.policy)
self.assertIn('X-Backend-Fragment-Preferences', called_header)
self.assertEqual(
[{'timestamp': '1234567890.12345', 'exclude': []}],
[{'timestamp': self.obj_timestamp.normal, 'exclude': []}],
json.loads(called_header['X-Backend-Fragment-Preferences']))
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
@ -2796,12 +2805,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -2825,7 +2828,7 @@ class TestObjectReconstructor(unittest.TestCase):
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -2838,12 +2841,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -2875,7 +2872,7 @@ class TestObjectReconstructor(unittest.TestCase):
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
# ... this bad response should be ignored like any other failure
self.assertEqual(len(fixed_body), len(broken_body))
@ -2889,12 +2886,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[-4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
# make up some data (trim some amount to make it unaligned with
# segment size)
@ -2918,7 +2909,7 @@ class TestObjectReconstructor(unittest.TestCase):
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -2932,19 +2923,13 @@ class TestObjectReconstructor(unittest.TestCase):
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
policy = self.policy
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
possible_errors = [Timeout(), Exception('kaboom!')]
codes = [random.choice(possible_errors) for i in
range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
job, node, self.obj_metadata)
error_lines = self.logger.get_lines_for_level('error')
# # of replicas failed and one more error log to report not enough
# responses to reconstruct.
@ -2967,17 +2952,11 @@ class TestObjectReconstructor(unittest.TestCase):
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
policy = self.policy
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
codes = [404 for i in range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
job, node, self.obj_metadata)
error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report not enough responses
self.assertEqual(1, len(error_lines))
@ -2996,12 +2975,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3035,7 +3008,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3052,12 +3025,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3079,7 +3046,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3095,7 +3062,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3112,12 +3079,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3136,7 +3097,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3156,7 +3117,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3166,8 +3127,9 @@ class TestObjectReconstructor(unittest.TestCase):
error_log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_log_lines))
self.assertIn(
'Mixed Etag (some garbage, %s) for 10.0.0.1:1001/sdb/0/a/c/o '
'policy#%s frag#1' % (etag, int(self.policy)),
'Mixed Etag (some garbage, %s) for 10.0.0.1:1001/sdb/0%s '
'policy#%s frag#1' %
(etag, self.obj_path.decode('utf8'), int(self.policy)),
error_log_lines[0])
self.assertFalse(self.logger.get_lines_for_level('warning'))
@ -3178,12 +3140,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
@ -3220,7 +3176,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
job, node, self.obj_metadata)
error_lines = self.logger.get_lines_for_level('error')
# 1 error log per etag to report not enough responses
@ -3238,8 +3194,10 @@ class TestObjectReconstructor(unittest.TestCase):
del ec_archive_dict[(expected_etag, ts)]
expected = 'Unable to get enough responses (%s/10) to ' \
'reconstruct 10.0.0.1:1001/sdb/0/a/c/o policy#0 ' \
'frag#1 with ETag' % etag_count[expected_etag]
'reconstruct 10.0.0.1:1001/sdb/0%s policy#0 ' \
'frag#1 with ETag' % \
(etag_count[expected_etag],
self.obj_path.decode('utf8'))
self.assertIn(
expected, error_line,
"Unexpected error line found: Expected: %s Got: %s"
@ -3257,12 +3215,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
broken_node = random.randint(0, self.policy.ec_ndata - 1)
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3283,7 +3235,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, part_nodes[broken_node], metadata)
job, part_nodes[broken_node], self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3321,12 +3273,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3348,7 +3294,7 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3376,14 +3322,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
ts = make_timestamp_iter()
timestamp = next(ts)
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': timestamp.normal
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3395,7 +3333,7 @@ class TestObjectReconstructor(unittest.TestCase):
headers = get_header_frag_index(self, body)
headers.update(
{'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': timestamp.internal})
'X-Backend-Timestamp': self.obj_timestamp.internal})
return headers
def test_missing_header(missing_header, expected_warning):
@ -3413,7 +3351,7 @@ class TestObjectReconstructor(unittest.TestCase):
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3427,7 +3365,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertIn(expected_warning, warning_log_lines)
message_base = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0"
"Invalid resp from 10.0.0.0:1000/sda/0%s policy#0" % \
self.obj_path.decode('utf-8')
test_missing_header(
'X-Object-Sysmeta-Ec-Frag-Index',
@ -3453,12 +3392,6 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -3483,7 +3416,7 @@ class TestObjectReconstructor(unittest.TestCase):
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -3495,15 +3428,21 @@ class TestObjectReconstructor(unittest.TestCase):
warning_log_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_log_lines))
expected_message = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
"Invalid resp from 10.0.0.0:1000/sda/0%s " \
"policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index: %r)" % \
invalid_frag_index
(self.obj_path.decode('utf8'), invalid_frag_index)
self.assertIn(expected_message, warning_log_lines)
for value in ('None', 'invalid'):
test_invalid_ec_frag_index_header(value)
@patch_policies(with_ec_default=True)
class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive):
    """Repeat superclass tests with an object path containing non-ASCII chars."""
    # repeat superclass tests with an object path that contains non-ascii chars
    # '\xc3\xa8' is the UTF-8 encoding of 'e-grave', so this path exercises
    # the utf8-decode handling in _full_path
    obj_path = '/a/c/o\xc3\xa8'
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,