Fix race in new partitions detecting new/invalid suffixes.

The assumption that we don't need to write an entry in the invalidations
file when the hashes.pkl does not exist turned out to be a premature
optimization and also wrong.

Primarily we should recognize the creation of hashes.pkl is the first
thing that happens in a part when it lands on a new primary.  The code
should be optimized toward the assumption of the most common disk state.

Also, in this case the extra stat calls to check if the hashes.pkl exists
were not only un-optimized - but introducing a race.

Consider the common case:

proc 1                         | proc 2
-------------------------------|---------------------------
a) read then truncate journal  |
b) do work                     | c) append to journal
d) apply "a" to index          |

The index written at "d" may not (yet) reflect the entry writen by proc
2 at "c"; however, it's clearly in the journal so it's easy to see we're
safe.

Adding in the extra stat call for the index existence check increases
the state which can effect correctness.

proc 1                        | proc 2
------------------------------|---------------------------
a) no index, truncate journal |
b) do work                    | b) iff index exists
                              | c) append to journal
d) apply (or create) index    |

If step "c" doesn't happen because the index does not yet exist - the
update is clearly lost.

In our case we'd skip marking a suffix as invalid when the hashes.pkl
does not exist because we know "the next time we rehash" we'll have to
os.listdir suffixes anyway.  But if another process is *currently*
rehashing (and has already done it's os.listdir) instead we've just
dropped an invalidation on the floor.

Don't do that.

Instead - write down the invalidation.  The running rehash is welcome to
proceed on outdated information - as long as the next pass will grab and
hash the new suffix.

Known-Issue(s):

If the suffix already exists there's an even chance the running rehash
will hash in the very update for which we want to invalidate the suffix,
but that's ok it's idempotent.

Co-Author: Pavel Kvasnička <pavel.kvasnicka@firma.seznam.cz>
Co-Author: Alistair Coles <alistair.coles@hpe.com>
Co-Author: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp>
Related-Change-Id: I64cadb1a3feb4d819d545137eecfc295389794f0
Change-Id: I2b48238d9d684e831d9777a7b18f91a3cef57cd1
Closes-Bug: #1651530
This commit is contained in:
Clay Gerrard 2017-01-10 18:53:08 -08:00 committed by Alistair Coles
parent ffd099c26a
commit 442cc1d16d
4 changed files with 175 additions and 22 deletions

View File

@ -309,10 +309,6 @@ def invalidate_hash(suffix_dir):
suffix = basename(suffix_dir)
partition_dir = dirname(suffix_dir)
hashes_file = join(partition_dir, HASH_FILE)
if not os.path.exists(hashes_file):
return
invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
with lock_path(partition_dir):
with open(invalidations_file, 'ab') as inv_fh:

View File

@ -830,7 +830,9 @@ class TestAuditor(unittest.TestCase):
# create tombstone and hashes.pkl file, ensuring the tombstone is not
# reclaimed by mocking time to be the tombstone time
with mock.patch('time.time', return_value=float(ts_tomb)):
# this delete will create a invalid hashes entry
self.disk_file.delete(ts_tomb)
# this get_hashes call will truncate the invalid hashes entry
self.disk_file.manager.get_hashes(
self.devices + '/sda', '0', [], self.disk_file.policy)
suffix = basename(dirname(self.disk_file._datadir))
@ -839,8 +841,10 @@ class TestAuditor(unittest.TestCase):
self.assertEqual(['%s.ts' % ts_tomb.internal],
os.listdir(self.disk_file._datadir))
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
self.assertTrue(os.path.exists(hash_invalid))
with open(hash_invalid, 'rb') as fp:
self.assertEqual('', fp.read().strip('\n'))
# Run auditor
self.auditor.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
# sanity check - auditor should not remove tombstone file
@ -853,8 +857,10 @@ class TestAuditor(unittest.TestCase):
ts_tomb = Timestamp(time.time() - 55)
part_dir, suffix = self._audit_tombstone(self.conf, ts_tomb)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
self.assertTrue(os.path.exists(hash_invalid))
with open(hash_invalid, 'rb') as fp:
self.assertEqual('', fp.read().strip('\n'))
def test_reclaimable_tombstone(self):
# audit with a reclaimable tombstone
@ -874,8 +880,10 @@ class TestAuditor(unittest.TestCase):
conf['reclaim_age'] = 2 * 604800
part_dir, suffix = self._audit_tombstone(conf, ts_tomb)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
self.assertTrue(os.path.exists(hash_invalid))
with open(hash_invalid, 'rb') as fp:
self.assertEqual('', fp.read().strip('\n'))
def test_reclaimable_tombstone_with_custom_reclaim_age(self):
# audit with a tombstone older than custom reclaim age
@ -897,8 +905,10 @@ class TestAuditor(unittest.TestCase):
part_dir, suffix = self._audit_tombstone(
self.conf, ts_tomb, zero_byte_fps=50)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
self.assertTrue(os.path.exists(hash_invalid))
with open(hash_invalid, 'rb') as fp:
self.assertEqual('', fp.read().strip('\n'))
def _test_expired_object_is_ignored(self, zero_byte_fps):
# verify that an expired object does not get mistaken for a tombstone
@ -910,15 +920,41 @@ class TestAuditor(unittest.TestCase):
extra_metadata={'X-Delete-At': now - 10})
files = os.listdir(self.disk_file._datadir)
self.assertTrue([f for f in files if f.endswith('.data')]) # sanity
# diskfile write appends to invalid hashes file
part_dir = dirname(dirname(self.disk_file._datadir))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
with open(hash_invalid, 'rb') as fp:
self.assertEqual(basename(dirname(self.disk_file._datadir)),
fp.read().strip('\n')) # sanity check
# run the auditor...
with mock.patch.object(auditor, 'dump_recon_cache'):
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
# the auditor doesn't touch anything on the invalidation file
# (i.e. not truncate and add no entry)
with open(hash_invalid, 'rb') as fp:
self.assertEqual(basename(dirname(self.disk_file._datadir)),
fp.read().strip('\n')) # sanity check
# this get_hashes call will truncate the invalid hashes entry
self.disk_file.manager.get_hashes(
self.devices + '/sda', '0', [], self.disk_file.policy)
with open(hash_invalid, 'rb') as fp:
self.assertEqual('', fp.read().strip('\n')) # sanity check
# run the auditor, again...
with mock.patch.object(auditor, 'dump_recon_cache'):
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
# verify nothing changed
self.assertTrue(os.path.exists(self.disk_file._datadir))
part_dir = dirname(dirname(self.disk_file._datadir))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
self.assertEqual(files, os.listdir(self.disk_file._datadir))
self.assertFalse(audit.logger.get_lines_for_level('error'))
self.assertFalse(audit.logger.get_lines_for_level('warning'))
# and there was no hash invalidation
with open(hash_invalid, 'rb') as fp:
self.assertEqual('', fp.read().strip('\n'))
def test_expired_object_is_ignored(self):
self._test_expired_object_is_ignored(0)

View File

@ -39,7 +39,8 @@ from gzip import GzipFile
import pyeclib.ec_iface
from eventlet import hubs, timeout, tpool
from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status
from swift.obj.diskfile import (MD5_OF_EMPTY_STRING, update_auditor_status,
write_pickle)
from test.unit import (FakeLogger, mock as unit_mock, temptree,
patch_policies, debug_logger, EMPTY_ETAG,
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
@ -6057,18 +6058,37 @@ class TestSuffixHashes(unittest.TestCase):
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
policy=policy)
suffix_dir = os.path.dirname(df._datadir)
suffix = os.path.basename(suffix_dir)
part_path = os.path.join(self.devices, 'sda1',
diskfile.get_data_dir(policy), '0')
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
inv_file = os.path.join(
part_path, diskfile.HASH_INVALIDATIONS_FILE)
self.assertFalse(os.path.exists(hashes_file)) # sanity
with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
df_mgr.invalidate_hash(suffix_dir)
self.assertFalse(mock_lock.called)
# does not create files
# sanity, new partition has no suffix hashing artifacts
self.assertFalse(os.path.exists(hashes_file))
self.assertFalse(os.path.exists(inv_file))
# invalidating a hash does not create the hashes_file
with mock.patch(
'swift.obj.diskfile.BaseDiskFileManager.invalidate_hash',
side_effect=diskfile.invalidate_hash) \
as mock_invalidate_hash:
df.delete(self.ts())
self.assertFalse(os.path.exists(hashes_file))
# ... but does invalidate the suffix
self.assertEqual([mock.call(suffix_dir)],
mock_invalidate_hash.call_args_list)
with open(inv_file) as f:
self.assertEqual(suffix, f.read().strip('\n'))
# ... and hashing suffixes finds (and hashes) the new suffix
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
self.assertIn(suffix, hashes)
self.assertTrue(os.path.exists(hashes_file))
self.assertIn(os.path.basename(suffix_dir), hashes)
with open(hashes_file) as f:
self.assertEqual(hashes, pickle.load(f))
# ... and truncates the invalidations file
with open(inv_file) as f:
self.assertEqual('', f.read().strip('\n'))
def test_invalidate_hash_empty_file_exists(self):
for policy in self.iter_policies():
@ -6125,6 +6145,105 @@ class TestSuffixHashes(unittest.TestCase):
}
self.assertEqual(open_log, expected)
def test_invalidates_hashes_of_new_partition(self):
# a suffix can be changed or created by second process when new pkl
# is calculated - that suffix must be correct on next get_hashes call
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
orig_listdir = os.listdir
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
policy=policy)
suffix = os.path.basename(os.path.dirname(df._datadir))
df2 = self.get_different_suffix_df(df)
suffix2 = os.path.basename(os.path.dirname(df2._datadir))
non_local = {'df2touched': False}
df.delete(self.ts())
def mock_listdir(*args, **kwargs):
# simulating an invalidation occuring in another process while
# get_hashes is executing
result = orig_listdir(*args, **kwargs)
if not non_local['df2touched']:
non_local['df2touched'] = True
# other process creates new suffix
df2.delete(self.ts())
return result
with mock.patch('swift.obj.diskfile.os.listdir',
mock_listdir):
# creates pkl file
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
# second suffix added after directory listing, it's added later
self.assertIn(suffix, hashes)
self.assertNotIn(suffix2, hashes)
# updates pkl file
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
self.assertIn(suffix, hashes)
self.assertIn(suffix2, hashes)
@mock.patch('swift.obj.diskfile.getmtime')
@mock.patch('swift.obj.diskfile.write_pickle')
def test_contains_hashes_of_existing_partition(self, mock_write_pickle,
mock_getmtime):
# get_hashes must repeat path listing and return all hashes when
# another concurrent process created new pkl before hashes are stored
# by the first process
non_local = {}
def mock_write_pickle_def(*args, **kwargs):
if 'mtime' not in non_local:
non_local['mtime'] = time()
non_local['mtime'] += 1
write_pickle(*args, **kwargs)
def mock_getmtime_def(filename):
if 'mtime' not in non_local:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
return non_local['mtime']
mock_write_pickle.side_effect = mock_write_pickle_def
mock_getmtime.side_effect = mock_getmtime_def
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
# force hashes.pkl to exist; when it does not exist that's fine,
# it's just a different race; in that case the invalidation file
# gets appended, but we don't restart hashing suffixes (the
# invalidation get's squashed in and the suffix gets rehashed on
# the next REPLICATE call)
df_mgr.get_hashes('sda1', '0', [], policy)
orig_listdir = os.listdir
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
policy=policy)
suffix = os.path.basename(os.path.dirname(df._datadir))
df2 = self.get_different_suffix_df(df)
suffix2 = os.path.basename(os.path.dirname(df2._datadir))
non_local['df2touched'] = False
df.delete(self.ts())
def mock_listdir(*args, **kwargs):
# simulating hashes.pkl modification by another process while
# get_hashes is executing
# df2 is created to check path hashes recalculation
result = orig_listdir(*args, **kwargs)
if not non_local['df2touched']:
non_local['df2touched'] = True
df2.delete(self.ts())
# simulate pkl update by other process - mtime is updated
self.assertIn('mtime', non_local, "hashes.pkl must exist")
non_local['mtime'] += 1
return result
with mock.patch('swift.obj.diskfile.os.listdir',
mock_listdir):
# creates pkl file and repeats listing when pkl modified
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
self.assertIn(suffix, hashes)
self.assertIn(suffix2, hashes)
def test_invalidate_hash_consolidation(self):
def assert_consolidation(suffixes):
# verify that suffixes are invalidated after consolidation
@ -6160,7 +6279,6 @@ class TestSuffixHashes(unittest.TestCase):
part_path, diskfile.HASH_INVALIDATIONS_FILE)
with open(hashes_file, 'rb') as f:
self.assertEqual(original_hashes, pickle.load(f))
self.assertFalse(os.path.exists(invalidations_file))
# invalidate the hash
with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:

View File

@ -6968,9 +6968,12 @@ class TestObjectServer(unittest.TestCase):
self.assertIn(' 499 ', line)
def find_files(self):
ignore_files = {'.lock', 'hashes.invalid'}
found_files = defaultdict(list)
for root, dirs, files in os.walk(self.devices):
for filename in files:
if filename in ignore_files:
continue
_name, ext = os.path.splitext(filename)
file_path = os.path.join(root, filename)
found_files[ext].append(file_path)