Fix race when consolidating new partition

Suffix hash invalidations in hashes.invalid can be lost when two
concurrent calls to get_hashes consolidate the hashes of a new
partition with no hashes.pkl (the race is sketched in code after the
timeline below):

- suffix S has been invalidated and is listed in hashes.invalid
- process X calls get_hashes when there is no existing hashes.pkl
- process X empties the hashes.invalid file in consolidate_hashes
- process X calculates the hash of suffix S (note, process X has
  not yet written hashes.pkl)
- process Y invalidates suffix S, appends S to hashes.invalid, so the
  hash of suffix S *should* be recalculated at some point
- process Z calls get_hashes->consolidate_hashes, empties hashes.invalid
  because there is still no hashes.pkl
- process Z fails
- process X writes hashes.pkl with stale hash value for suffix S
- the invalidation of S that was made by process Y is lost
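
The loss can be reproduced in miniature with the following
self-contained sketch. It is a simplified model, not Swift's actual
code: consolidate() mimics the pre-fix consolidate_hashes behaviour of
truncating hashes.invalid when hashes.pkl is absent, and invalidate()
mimics what invalidate_hash() does:

    import os
    import tempfile

    part_dir = tempfile.mkdtemp()
    invalid = os.path.join(part_dir, 'hashes.invalid')
    pkl = os.path.join(part_dir, 'hashes.pkl')

    def consolidate():
        # pre-fix behaviour: with no hashes.pkl on disk, empty
        # hashes.invalid without recording its contents anywhere
        if not os.path.exists(pkl) and os.path.exists(invalid):
            with open(invalid, 'wb'):
                pass

    def invalidate(suffix):
        # append a suffix to hashes.invalid
        with open(invalid, 'a') as f:
            f.write(suffix + '\n')

    invalidate('abc')  # suffix S is invalidated
    consolidate()      # process X consolidates; hashes.invalid emptied
    invalidate('abc')  # process Y invalidates S again
    consolidate()      # process Z consolidates again, then fails
    with open(pkl, 'wb') as f:
        f.write(b'stale hash for S')  # process X writes hashes.pkl late
    with open(invalid) as f:
        print(repr(f.read()))  # '' -> Y's invalidation is lost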

The fix is never to empty hashes.invalid during consolidate_hashes
without first recording any invalidated suffixes in hashes and writing
hashes to disk in hashes.pkl. This is already the behaviour when
hashes.pkl exists. The cost of an additional write to hashes.pkl,
introduced by this patch, is incurred only once, when get_hashes first
encounters a partition with no hashes.pkl.
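
For orientation, the consolidation flow after this patch looks roughly
like the sketch below. This is a paraphrase, not the exact diff;
lock_path, read_hashes, write_hashes and HASH_INVALIDATIONS_FILE are
the existing helpers and constants in swift.common.utils and
swift.obj.diskfile:

    import errno
    from os.path import join

    def consolidate_hashes(partition_dir):
        invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
        with lock_path(partition_dir):
            # read_hashes returns {'valid': False} if hashes.pkl is
            # missing, corrupt or unreadable
            hashes = read_hashes(partition_dir)
            found_invalidation_entry = False
            try:
                with open(invalidations_file, 'rb') as inv_fh:
                    for line in inv_fh:
                        found_invalidation_entry = True
                        hashes[line.strip()] = None  # force rehash
            except (IOError, OSError) as e:
                if e.errno != errno.ENOENT:
                    raise
            if found_invalidation_entry:
                # record the invalidations in hashes.pkl *before*
                # emptying hashes.invalid, even when no hashes.pkl
                # previously existed
                write_hashes(partition_dir, hashes)
                with open(invalidations_file, 'wb'):
                    pass
        return hashes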

Related-Change: Ia43ec2cf7ab715ec37f0044625a10aeb6420f6e3

Change-Id: I08c8cf09282f737103e580c1f57923b399abe58c
Alistair Coles, 2017-01-27 18:26:06 +00:00 (committed by Tim Burke)
commit aad02ad30d, parent a1f263c1b4
2 changed files with 16 additions and 21 deletions


@@ -279,23 +279,11 @@ def consolidate_hashes(partition_dir):
     :param partition_dir: absolute path to partition dir containing
                           hashes.pkl and hashes.invalid

-    :returns: the hashes, or None if there's no hashes.pkl.
+    :returns: a dict of suffix hashes (if any); the key 'valid' will be False
+              if hashes.pkl is corrupt, cannot be read or does not exist
     """
     hashes_file = join(partition_dir, HASH_FILE)
     invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)

-    if not os.path.exists(hashes_file):
-        if os.path.exists(invalidations_file):
-            # no hashes at all -> everything's invalid, so empty the file with
-            # the invalid suffixes in it, if it exists
-            try:
-                with open(invalidations_file, 'wb'):
-                    pass
-            except OSError as e:
-                if e.errno != errno.ENOENT:
-                    raise
-        return None
-
     with lock_path(partition_dir):
         hashes = read_hashes(partition_dir)
@@ -1059,9 +1047,6 @@ class BaseDiskFileManager(object):
                     self.logger.warning('Unable to read %r', hashes_file,
                                         exc_info=True)

-        if orig_hashes is None:
-            # consolidate_hashes returns None if hashes.pkl does not exist
-            orig_hashes = {'valid': False}
         if not orig_hashes['valid']:
             # This is the only path to a valid hashes from invalid read (e.g.
             # does not exist, corrupt, etc.). Moreover, in order to write this
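
The caller-side effect of the hunk above, as a small illustrative stub
(hypothetical, not Swift code): consolidate_hashes now always returns a
dict, so get_hashes no longer needs a None guard and can branch on the
'valid' key alone:

    def rehash_decision(orig_hashes):
        # orig_hashes is always a dict after this patch
        if not orig_hashes['valid']:
            return 'recalculate all suffix hashes'
        return 'rehash only invalidated suffixes'

    print(rehash_decision({'valid': False}))  # hashes.pkl missing/corrupt
    print(rehash_decision({'valid': True}))   # hashes.pkl read successfully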


@@ -5655,7 +5655,7 @@ class TestSuffixHashes(unittest.TestCase):
             self.assertIn(suffix, hashes)
             self.assertIn(suffix2, hashes)

-    def test_hash_invalidations_survive_racing_get_hashes_same_suffix(self):
+    def _check_hash_invalidations_race_get_hashes_same_suffix(self, existing):
         # verify that when two processes concurrently call get_hashes, then any
         # concurrent hash invalidation will survive and be consolidated on a
         # subsequent call to get_hashes (i.e. ensure first get_hashes process
@@ -5666,8 +5666,9 @@ class TestSuffixHashes(unittest.TestCase):
         for policy in self.iter_policies():
             df_mgr = self.df_router[policy]
             orig_hash_suffix = df_mgr._hash_suffix
-            # create hashes.pkl
-            df_mgr.get_hashes('sda1', '0', [], policy)
+            if existing:
+                # create hashes.pkl
+                df_mgr.get_hashes('sda1', '0', [], policy)

             df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
                                      policy=policy)
@@ -5699,7 +5700,10 @@ class TestSuffixHashes(unittest.TestCase):
                 # simulate another process calling get_hashes but failing
                 # after hash invalidations have been consolidated
                 hashes = df_mgr.consolidate_hashes(part_dir)
-                self.assertTrue(hashes['valid'])
+                if existing:
+                    self.assertTrue(hashes['valid'])
+                else:
+                    self.assertFalse(hashes['valid'])

                 # get the updated suffix hash...
                 non_local['hash'] = orig_hash_suffix(suffix_dir,
                                                      diskfile.ONE_WEEK)
@@ -5719,6 +5723,12 @@ class TestSuffixHashes(unittest.TestCase):
                 # so hashes should have the latest suffix hash...
                 self.assertEqual(hashes[suffix], non_local['hash'])

+    def test_hash_invalidations_race_get_hashes_same_suffix_new(self):
+        self._check_hash_invalidations_race_get_hashes_same_suffix(False)
+
+    def test_hash_invalidations_race_get_hashes_same_suffix_existing(self):
+        self._check_hash_invalidations_race_get_hashes_same_suffix(True)
+
     def _check_unpickle_error_and_get_hashes_failure(self, existing):
         for policy in self.iter_policies():
             df_mgr = self.df_router[policy]