Do not sync suffixes when remote rejects reconstructor revert

SSYNC is designed to limit concurrent incoming connections in order to
prevent IO contention.  The reconstructor should therefore expect remote
replication servers to fail ssync_sender when the remote is too busy.
When the remote rejects SSYNC, the reconstructor should avoid forcing
additional IO against the remote with a REPLICATE request, which causes
suffix rehashing.
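
As a minimal illustration (not Swift's actual receiver code; the
semaphore, tunable, and helper here are assumptions for the sketch), a
busy remote can bound concurrent SSYNC conversations and answer 503
rather than queueing up more disk IO:

import threading

REPLICATION_CONCURRENCY = 4  # hypothetical tunable for this sketch
_slots = threading.BoundedSemaphore(REPLICATION_CONCURRENCY)

def run_ssync_conversation(request):
    return 200  # stand-in for the real suffix-by-suffix sync

def handle_ssync(request):
    # when all slots are busy, reject instead of piling on more IO
    if not _slots.acquire(blocking=False):
        return 503  # the sender should back off and try another node
    try:
        return run_ssync_conversation(request)
    finally:
        _slots.release()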

Suffix rehashing via the REPLICATE verb takes two forms:

1) an initial pre-flight call to REPLICATE /dev/part will cause a remote
primary to rehash any invalid suffixes and return a map for the local
sender to compare, so that a sync can be performed on any mismatched
suffixes (see the first helper in the sketch after this list).

2) a final call to REPLICATE /dev/part/suf1-suf2-suf3[-sufX[...]] will
cause the remote primary to rehash the *given* suffixes even if they are
*not* invalid.  This is a requirement for rsync replication because
after a suffix is synced via rsync the contents of the suffix dir will
likely have changed, and the remote server needs to update its
hashes.pkl to reflect the new data (see the second helper in the sketch
after this list).
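
A hedged sketch of both REPLICATE forms, assuming an
http.client.HTTPConnection-like conn object; the helper names are
illustrative, not Swift's actual internals, though the pickled-map body
and the dash-joined suffix path follow the behavior described above:

import pickle

def preflight_remote_hashes(conn, device, part):
    # form 1: ask the remote primary to rehash any invalid suffixes
    # and return its full suffix -> hash map for local comparison
    conn.request('REPLICATE', '/%s/%s' % (device, part))
    return pickle.loads(conn.getresponse().read())

def suffixes_to_sync(local_hashes, remote_hashes):
    # sync only the suffixes whose hashes disagree
    return [suf for suf, h in local_hashes.items()
            if remote_hashes.get(suf) != h]

def rehash_remote_suffixes(conn, device, part, suffixes):
    # form 2: after an rsync, force a rehash of exactly these suffixes,
    # even though nothing marked them invalid on the remote
    path = '/%s/%s/%s' % (device, part, '-'.join(sorted(suffixes)))
    conn.request('REPLICATE', path)
    conn.getresponse().read()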

SSYNC does not *need* to send a post-sync REPLICATE request.  Every
object modified by the SSYNC protocol passes through _finalize_put under
the hood as it is synced, which already invalidates the affected
suffixes.  It is, however, not harmful and potentially useful to go
ahead and refresh hashes after an SSYNC while the inodes of those
suffixes are warm in the cache.
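
Roughly, as a sketch (invalidate_hash and _finalize_put are real names
in swift.obj.diskfile, but these bodies are illustrative stand-ins, not
the real code):

import os

def invalidate_hash(suffix_dir):
    # stand-in: the real function records the suffix in the partition's
    # hashes.invalid so the next rehash recomputes it
    print('invalidated %s' % suffix_dir)

def _finalize_put(tmp_path, data_path):
    # every object PUT completed during an SSYNC conversation passes
    # through a step like this, so the affected suffix is already
    # marked dirty with no REPLICATE round-trip required
    os.rename(tmp_path, data_path)
    invalidate_hash(os.path.dirname(os.path.dirname(data_path)))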

However, that only makes sense if the SSYNC conversation actually synced
any suffixes; if SSYNC is rejected for concurrency before it ever gets
started, there is no value in the remote performing a rehash.  It may be
that *another* reconstructor is pushing data into that same partition,
in which case the suffixes would immediately be invalidated again.

If ssync_sender does not successfully finish a sync, the reconstructor
should skip the REPLICATE call entirely and move on to the next
partition without causing any useless remote IO.

Closes-Bug: #1665141

Change-Id: Ia72c407247e4525ef071a1728750850807ae8231
Clay Gerrard, 2017-02-16 14:14:09 -08:00 (committed by Ondřej Nový)
parent 01156e7f93
commit e127f2277c
3 changed files with 85 additions and 23 deletions

swift/obj/reconstructor.py

@@ -665,8 +665,8 @@ class ObjectReconstructor(Daemon):
         for node in job['sync_to']:
             success, in_sync_objs = ssync_sender(
                 self, node, job, job['suffixes'])()
-            self.rehash_remote(node, job, job['suffixes'])
             if success:
+                self.rehash_remote(node, job, job['suffixes'])
                 syncd_with += 1
                 reverted_objs.update(in_sync_objs)
         if syncd_with >= len(job['sync_to']):

test/unit/__init__.py

@@ -999,6 +999,7 @@ def fake_http_connect(*code_iter, **kwargs):
     body_iter = kwargs.get('body_iter', None)
     if body_iter:
         body_iter = iter(body_iter)
+    unexpected_requests = []

     def connect(*args, **ckwargs):
         if kwargs.get('slow_connect', False):
@@ -1008,7 +1009,15 @@ def fake_http_connect(*code_iter, **kwargs):
                 kwargs['give_content_type'](args[6]['Content-Type'])
             else:
                 kwargs['give_content_type']('')
-        i, status = next(conn_id_and_code_iter)
+        try:
+            i, status = next(conn_id_and_code_iter)
+        except StopIteration:
+            # the code under test may swallow the StopIteration, so by
+            # logging unexpected requests here we allow the test framework
+            # to check for them after the connect function has been used.
+            unexpected_requests.append((args, kwargs))
+            raise
         if 'give_connect' in kwargs:
             give_conn_fn = kwargs['give_connect']
             argspec = inspect.getargspec(give_conn_fn)
@@ -1031,6 +1040,7 @@ def fake_http_connect(*code_iter, **kwargs):
                         connection_id=i, give_send=kwargs.get('give_send'),
                         give_expect=kwargs.get('give_expect'))

+    connect.unexpected_requests = unexpected_requests
     connect.code_iter = code_iter
     return connect
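
A hedged usage fragment (names other than fake_http_connect are
stand-ins for a real test body): because the fake is primed with exactly
the expected status codes, any extra request both raises StopIteration
and is recorded, so a test can assert afterwards that nothing unexpected
was issued:

def exercise_code_under_test(connect):
    # stand-in: the real tests drive the reconstructor, which ends up
    # invoking connect(...) once per HTTP request it issues
    pass

connect = fake_http_connect(200, 200)  # exactly two requests expected
exercise_code_under_test(connect)
assert connect.unexpected_requests == []  # no extra IO was attempted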

test/unit/obj/test_reconstructor.py

@@ -876,7 +876,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         for status_path in status_paths:
             self.assertTrue(os.path.exists(status_path))

-    def _make_fake_ssync(self, ssync_calls):
+    def _make_fake_ssync(self, ssync_calls, fail_jobs=None):
+        """
+        Replace SsyncSender with a thin Fake.
+
+        :param ssync_calls: an empty list, a non_local, all calls to ssync
+            will be captured for assertion in the caller.
+        :param fail_jobs: optional iter of dicts, any job passed into Fake
+            that matches a failure dict will return success == False.
+        """
         class _fake_ssync(object):
             def __init__(self, daemon, node, job, suffixes, **kwargs):
                 # capture context and generate an available_map of objs
@@ -896,9 +904,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                     self.available_map[hash_] = timestamps
                 context['available_map'] = self.available_map
                 ssync_calls.append(context)
+                self.success = True
+                for failure in (fail_jobs or []):
+                    if all(job.get(k) == v for (k, v) in failure.items()):
+                        self.success = False
+                        break
+                context['success'] = self.success

             def __call__(self, *args, **kwargs):
-                return True, self.available_map
+                return self.success, self.available_map if self.success else {}

         return _fake_ssync
@@ -957,6 +971,57 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         # sanity check that some files should were deleted
         self.assertTrue(n_files > n_files_after)

+    def test_no_delete_failed_revert(self):
+        # test will only process revert jobs
+        self.reconstructor.handoffs_only = True
+        captured_ssync = []
+        # fail all jobs on part 2 on sda1
+        fail_jobs = [
+            {'device': 'sda1', 'partition': 2},
+        ]
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(
+                            captured_ssync, fail_jobs=fail_jobs)), \
+                mocked_http_conn(*[200, 200],
+                                 body=pickle.dumps({})) as request_log:
+            self.reconstructor.reconstruct()
+
+        # global setup has four revert jobs
+        self.assertEqual(len(captured_ssync), 4)
+        expected_ssync_calls = set([
+            # device, part, frag_index
+            ('sda1', 2, 2),
+            ('sda1', 2, 0),
+            ('sda1', 0, 2),
+            ('sda1', 1, 1),
+        ])
+        self.assertEqual(expected_ssync_calls, set([
+            (context['job']['device'],
+             context['job']['partition'],
+             context['job']['frag_index'])
+            for context in captured_ssync
+        ]))
+
+        self.assertEqual(2, len(request_log.requests))
+        expected_suffix_calls = []
+        for context in captured_ssync:
+            if not context['success']:
+                # only successful jobs generate suffix rehash calls
+                continue
+            job = context['job']
+            expected_suffix_calls.append(
+                (job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % (
+                    job['device'], job['partition'],
+                    '-'.join(sorted(job['suffixes']))))
+            )
+        self.assertEqual(set(expected_suffix_calls),
+                         set((r['ip'], r['path'])
+                             for r in request_log.requests))
+
+        self.assertFalse(
+            self.reconstructor.logger.get_lines_for_level('error'))
+        self.assertFalse(request_log.unexpected_requests)
+
     def test_get_part_jobs(self):
         # yeah, this test code expects a specific setup
         self.assertEqual(len(self.part_nums), 3)
@@ -2407,15 +2472,11 @@ class TestObjectReconstructor(unittest.TestCase):
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback), \
-                mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
-                           return_value=(None, stub_hashes)), \
-                mocked_http_conn(*[200] * len(expected_suffix_calls),
-                                 body=pickle.dumps({})) as request_log:
+                mocked_http_conn() as request_log:
             self.reconstructor.process_job(job)

-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
+        # failed ssync job should not generate a suffix rehash
+        self.assertEqual([], request_log.requests)

         self.assertEqual(len(ssync_calls), len(expected_suffix_calls))
         call = ssync_calls[0]
@@ -2456,23 +2517,14 @@ class TestObjectReconstructor(unittest.TestCase):
             # should increment
             return False, {}

-        expected_suffix_calls = set([
-            (sync_to[0]['replication_ip'],
-             '/%s/0/123-abc' % sync_to[0]['device'])
-        ])
-
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback), \
-                mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
-                           return_value=(None, stub_hashes)), \
-                mocked_http_conn(*[200] * len(expected_suffix_calls),
-                                 body=pickle.dumps({})) as request_log:
+                mocked_http_conn() as request_log:
             self.reconstructor.process_job(job)

-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
+        # failed ssync job should not generate a suffix rehash
+        self.assertEqual([], request_log.requests)

         # this is ssync call to primary (which fails) and nothing else!
         self.assertEqual(len(ssync_calls), 1)