From a1f263c1b4e8837f930f43fc47deaf15fb02b072 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Thu, 12 Jan 2017 22:31:12 -0800 Subject: [PATCH] Better optimistic lock in get_hashes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mtime and force_rewrite have a *long* tangled history starting back in lp bug #1089140 that's been carried through many refactors. Using force_rewrite on errors reading from the pickle has always been a read-modify-write race; but maybe less bad than the infinite recursion bug it fixed? Using getmtime has always had somewhat dubious resolution for race detection - the only way to be sure the content of the file is the same as when we read it without locking is to open the file up and check. Unfortunately, the ondisk data wasn't rich enough to disambiguate when the ondisk state represented may have changed (e.g. when an invalidation for a suffix currently being hashed is consolidated, or if all hashes are invalid like after an error reading the hashes.pkl) - so we also add a key with a timestamp for race detection and write down if the dictionary has any valid suffix hashes. Along the way, we accidentally fix a serious performance regression with hash invalidations... We currently rehash all invalid suffixes twice on REPLICATE calls. First we consolidating hashes, marking all invalid suffixes as None and then perform the first suffix rehashing. And then also *every time* one more time again immediately as soon as we get done with the first one we throw all that work we just did on the floor and rehash ALL the invalid suffixes *again* a second time because the race detector erroneously notices the hashes.pkl file has been "modified while we were hashing". But we're not in a race. We took the mtime before calling consolidate hashes, and consolidate hashes modified the pickle when it wrote back the invalid suffixes. FWIW, since consolidate hashes operates under directory lock it can't race - but we don't want suffix rehashing to hold the directory lock that long so we use optimistic locking - i.e. we optimistically perform the rehash w/o a lock and write back the update iif it hasn't changed since read; if it has we retry the whole operation UpgradeImpact: If you upgrade and need to rollback - delete all hashes.pkl: rm /srv/node*/*/object*/*/hashes.pkl Anything of significance achived here was blatently plagerised from the work of others: Co-Author: Pavel Kvasnička Related-Change-Id: I64cadb1a3feb4d819d545137eecfc295389794f0 Co-Author: Alistair Coles Related-Change-Id: I8f6bb89beaaca3beec2e6063299189f52e9eee51 Related-Change-Id: I08c8cf09282f737103e580c1f57923b399abe58c Change-Id: Ia43ec2cf7ab715ec37f0044625a10aeb6420f6e3 --- swift/obj/diskfile.py | 131 +++++++---- test/unit/obj/test_diskfile.py | 403 ++++++++++++++++++++++++++++----- 2 files changed, 431 insertions(+), 103 deletions(-) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 2b87fb48d7..a1fc92f9de 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -31,6 +31,7 @@ are also not considered part of the backend API. """ import six.moves.cPickle as pickle +import copy import errno import fcntl import json @@ -41,7 +42,7 @@ import hashlib import logging import traceback import xattr -from os.path import basename, dirname, exists, getmtime, join, splitext +from os.path import basename, dirname, exists, join, splitext from random import shuffle from tempfile import mkstemp from contextlib import contextmanager @@ -228,6 +229,48 @@ def quarantine_renamer(device_path, corrupted_file_path): return to_dir +def read_hashes(partition_dir): + """ + Read the existing hashes.pkl + + :returns: a dict, the suffix hashes (if any), the key 'valid' will be False + if hashes.pkl is corrupt, cannot be read or does not exist + """ + hashes_file = join(partition_dir, HASH_FILE) + hashes = {'valid': False} + try: + with open(hashes_file, 'rb') as hashes_fp: + pickled_hashes = hashes_fp.read() + except (IOError, OSError): + pass + else: + try: + hashes = pickle.loads(pickled_hashes) + except Exception: + # pickle.loads() can raise a wide variety of exceptions when + # given invalid input depending on the way in which the + # input is invalid. + pass + # hashes.pkl w/o valid updated key is "valid" but "forever old" + hashes.setdefault('valid', True) + hashes.setdefault('updated', -1) + return hashes + + +def write_hashes(partition_dir, hashes): + """ + Write hashes to hashes.pkl + + The updated key is added to hashes before it is written. + """ + hashes_file = join(partition_dir, HASH_FILE) + # 'valid' key should always be set by the caller; however, if there's a bug + # setting invalid is most safe + hashes.setdefault('valid', False) + hashes['updated'] = time.time() + write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) + + def consolidate_hashes(partition_dir): """ Take what's in hashes.pkl and hashes.invalid, combine them, write the @@ -254,41 +297,23 @@ def consolidate_hashes(partition_dir): return None with lock_path(partition_dir): - try: - with open(hashes_file, 'rb') as hashes_fp: - pickled_hashes = hashes_fp.read() - except (IOError, OSError): - hashes = {} - else: - try: - hashes = pickle.loads(pickled_hashes) - except Exception: - # pickle.loads() can raise a wide variety of exceptions when - # given invalid input depending on the way in which the - # input is invalid. - hashes = None + hashes = read_hashes(partition_dir) - modified = False found_invalidation_entry = False try: with open(invalidations_file, 'rb') as inv_fh: for line in inv_fh: found_invalidation_entry = True suffix = line.strip() - if hashes is not None and \ - hashes.get(suffix, '') is not None: - hashes[suffix] = None - modified = True + hashes[suffix] = None except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise - if modified: - write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) - - # Now that all the invalidations are reflected in hashes.pkl, it's - # safe to clear out the invalidations file. if found_invalidation_entry: + write_hashes(partition_dir, hashes) + # Now that all the invalidations are reflected in hashes.pkl, it's + # safe to clear out the invalidations file. with open(invalidations_file, 'wb') as inv_fh: pass @@ -997,8 +1022,14 @@ class BaseDiskFileManager(object): """ raise NotImplementedError - def _get_hashes(self, partition_path, recalculate=None, do_listdir=False, - reclaim_age=None): + def _get_hashes(self, *args, **kwargs): + hashed, hashes = self.__get_hashes(*args, **kwargs) + hashes.pop('updated', None) + hashes.pop('valid', None) + return hashed, hashes + + def __get_hashes(self, partition_path, recalculate=None, do_listdir=False, + reclaim_age=None): """ Get a list of hashes for the suffix dir. do_listdir causes it to mistrust the hash cache for suffix existence at the (unexpectedly high) @@ -1017,31 +1048,39 @@ class BaseDiskFileManager(object): hashed = 0 hashes_file = join(partition_path, HASH_FILE) modified = False - force_rewrite = False - hashes = {} - mtime = -1 + orig_hashes = {'valid': False} if recalculate is None: recalculate = [] try: - mtime = getmtime(hashes_file) - except OSError as e: - if e.errno != errno.ENOENT: - raise - - try: - hashes = self.consolidate_hashes(partition_path) + orig_hashes = self.consolidate_hashes(partition_path) except Exception: self.logger.warning('Unable to read %r', hashes_file, exc_info=True) + + if orig_hashes is None: + # consolidate_hashes returns None if hashes.pkl does not exist + orig_hashes = {'valid': False} + if not orig_hashes['valid']: + # This is the only path to a valid hashes from invalid read (e.g. + # does not exist, corrupt, etc.). Moreover, in order to write this + # valid hashes we must read *the exact same* invalid state or we'll + # trigger race detection. do_listdir = True - force_rewrite = True + hashes = {'valid': True} + # If the exception handling around consolidate_hashes fired we're + # going to do a full rehash regardless; but we need to avoid + # needless recursion if the on-disk hashes.pkl is actually readable + # (worst case is consolidate_hashes keeps raising exceptions and we + # eventually run out of stack). + # N.B. orig_hashes invalid only effects new parts and error/edge + # conditions - so try not to get overly caught up trying to + # optimize it out unless you manage to convince yourself there's a + # bad behavior. + orig_hashes = read_hashes(partition_path) else: - if hashes is None: # no hashes.pkl file; let's build it - do_listdir = True - force_rewrite = True - hashes = {} + hashes = copy.deepcopy(orig_hashes) if do_listdir: for suff in os.listdir(partition_path): @@ -1063,13 +1102,11 @@ class BaseDiskFileManager(object): modified = True if modified: with lock_path(partition_path): - if force_rewrite or not exists(hashes_file) or \ - getmtime(hashes_file) == mtime: - write_pickle( - hashes, hashes_file, partition_path, PICKLE_PROTOCOL) + if read_hashes(partition_path) == orig_hashes: + write_hashes(partition_path, hashes) return hashed, hashes - return self._get_hashes(partition_path, recalculate, do_listdir, - reclaim_age) + return self.__get_hashes(partition_path, recalculate, do_listdir, + reclaim_age) else: return hashed, hashes diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 86bfbf52a4..22ba9807ad 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -40,8 +40,7 @@ from gzip import GzipFile import pyeclib.ec_iface from eventlet import hubs, timeout, tpool -from swift.obj.diskfile import (MD5_OF_EMPTY_STRING, update_auditor_status, - write_pickle) +from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status from test.unit import (FakeLogger, mock as unit_mock, temptree, patch_policies, debug_logger, EMPTY_ETAG, make_timestamp_iter, DEFAULT_TEST_EC_TYPE, @@ -5190,7 +5189,7 @@ class TestSuffixHashes(unittest.TestCase): filename += '.meta' return filename - def get_different_suffix_df(self, df): + def get_different_suffix_df(self, df, **kwargs): # returns diskfile in the same partition with different suffix suffix_dir = os.path.dirname(df._datadir) for i in itertools.count(): @@ -5200,7 +5199,8 @@ class TestSuffixHashes(unittest.TestCase): df._account, df._container, 'o%d' % i, - policy=df.policy) + policy=df.policy, + **kwargs) suffix_dir2 = os.path.dirname(df2._datadir) if suffix_dir != suffix_dir2: return df2 @@ -5508,7 +5508,10 @@ class TestSuffixHashes(unittest.TestCase): self.assertTrue(os.path.exists(hashes_file)) self.assertIn(os.path.basename(suffix_dir), hashes) with open(hashes_file) as f: - self.assertEqual(hashes, pickle.load(f)) + found_hashes = pickle.load(f) + found_hashes.pop('updated') + self.assertTrue(found_hashes.pop('valid')) + self.assertEqual(hashes, found_hashes) # ... and truncates the invalidations file with open(inv_file) as f: self.assertEqual('', f.read().strip('\n')) @@ -5605,29 +5608,11 @@ class TestSuffixHashes(unittest.TestCase): self.assertIn(suffix, hashes) self.assertIn(suffix2, hashes) - @mock.patch('swift.obj.diskfile.getmtime') - @mock.patch('swift.obj.diskfile.write_pickle') - def test_contains_hashes_of_existing_partition(self, mock_write_pickle, - mock_getmtime): + def test_hash_invalidations_survive_racing_get_hashes_diff_suffix(self): # get_hashes must repeat path listing and return all hashes when # another concurrent process created new pkl before hashes are stored # by the first process non_local = {} - - def mock_write_pickle_def(*args, **kwargs): - if 'mtime' not in non_local: - non_local['mtime'] = time() - non_local['mtime'] += 1 - write_pickle(*args, **kwargs) - - def mock_getmtime_def(filename): - if 'mtime' not in non_local: - raise OSError(errno.ENOENT, os.strerror(errno.ENOENT)) - return non_local['mtime'] - - mock_write_pickle.side_effect = mock_write_pickle_def - mock_getmtime.side_effect = mock_getmtime_def - for policy in self.iter_policies(): df_mgr = self.df_router[policy] # force hashes.pkl to exist; when it does not exist that's fine, @@ -5654,18 +5639,122 @@ class TestSuffixHashes(unittest.TestCase): if not non_local['df2touched']: non_local['df2touched'] = True df2.delete(self.ts()) - # simulate pkl update by other process - mtime is updated - self.assertIn('mtime', non_local, "hashes.pkl must exist") - non_local['mtime'] += 1 return result with mock.patch('swift.obj.diskfile.os.listdir', mock_listdir): + # creates pkl file but leaves invalidation alone + hashes = df_mgr.get_hashes('sda1', '0', [], policy) + + # suffix2 just sits in the invalidations file + self.assertIn(suffix, hashes) + self.assertNotIn(suffix2, hashes) + + # it'll show up next hash + hashes = df_mgr.get_hashes('sda1', '0', [], policy) + self.assertIn(suffix, hashes) + self.assertIn(suffix2, hashes) + + def test_hash_invalidations_survive_racing_get_hashes_same_suffix(self): + # verify that when two processes concurrently call get_hashes, then any + # concurrent hash invalidation will survive and be consolidated on a + # subsequent call to get_hashes (i.e. ensure first get_hashes process + # does not ignore the concurrent hash invalidation that second + # get_hashes might have consolidated to hashes.pkl) + non_local = {} + + for policy in self.iter_policies(): + df_mgr = self.df_router[policy] + orig_hash_suffix = df_mgr._hash_suffix + # create hashes.pkl + df_mgr.get_hashes('sda1', '0', [], policy) + + df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o', + policy=policy) + suffix_dir = os.path.dirname(df._datadir) + suffix = os.path.basename(suffix_dir) + part_dir = os.path.dirname(suffix_dir) + invalidations_file = os.path.join( + part_dir, diskfile.HASH_INVALIDATIONS_FILE) + + non_local['hash'] = None + non_local['called'] = False + + # delete will append suffix to hashes.invalid + df.delete(self.ts()) + with open(invalidations_file) as f: + self.assertEqual(suffix, f.read().strip('\n')) # sanity + hash1 = df_mgr._hash_suffix(suffix_dir, diskfile.ONE_WEEK) + + def mock_hash_suffix(*args, **kwargs): + # after first get_hashes has called _hash_suffix, simulate a + # second process invalidating the same suffix, followed by a + # third process calling get_hashes and failing (or yielding) + # after consolidate_hashes has completed + result = orig_hash_suffix(*args, **kwargs) + if not non_local['called']: + non_local['called'] = True + # appends suffix to hashes.invalid + df.delete(self.ts()) + # simulate another process calling get_hashes but failing + # after hash invalidation have been consolidated + hashes = df_mgr.consolidate_hashes(part_dir) + self.assertTrue(hashes['valid']) + # get the updated suffix hash... + non_local['hash'] = orig_hash_suffix(suffix_dir, + diskfile.ONE_WEEK) + return result + + with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix): # creates pkl file and repeats listing when pkl modified hashes = df_mgr.get_hashes('sda1', '0', [], policy) + # first get_hashes should complete with suffix1 state self.assertIn(suffix, hashes) - self.assertIn(suffix2, hashes) + # sanity check - the suffix hash has changed... + self.assertNotEqual(hash1, non_local['hash']) + # the invalidation file has been truncated... + with open(invalidations_file, 'r') as f: + self.assertEqual('', f.read()) + # so hashes should have the latest suffix hash... + self.assertEqual(hashes[suffix], non_local['hash']) + + def _check_unpickle_error_and_get_hashes_failure(self, existing): + for policy in self.iter_policies(): + df_mgr = self.df_router[policy] + df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o', + policy=policy) + suffix = os.path.basename(os.path.dirname(df._datadir)) + if existing: + df.delete(self.ts()) + hashes = df_mgr.get_hashes('sda1', '0', [], policy) + df.delete(self.ts()) + part_path = os.path.join(self.devices, 'sda1', + diskfile.get_data_dir(policy), '0') + hashes_file = os.path.join(part_path, diskfile.HASH_FILE) + # write a corrupt hashes.pkl + open(hashes_file, 'w') + # simulate first call to get_hashes failing after attempting to + # consolidate hashes + with mock.patch('swift.obj.diskfile.os.listdir', + side_effect=Exception()): + self.assertRaises( + Exception, df_mgr.get_hashes, 'sda1', '0', [], policy) + # sanity on-disk state is invalid + with open(hashes_file) as f: + found_hashes = pickle.load(f) + found_hashes.pop('updated') + self.assertEqual(False, found_hashes.pop('valid')) + # verify subsequent call to get_hashes reaches correct outcome + hashes = df_mgr.get_hashes('sda1', '0', [], policy) + self.assertIn(suffix, hashes) + self.assertEqual([], df_mgr.logger.get_lines_for_level('warning')) + + def test_unpickle_error_and_get_hashes_failure_new_part(self): + self._check_unpickle_error_and_get_hashes_failure(False) + + def test_unpickle_error_and_get_hashes_failure_existing_part(self): + self._check_unpickle_error_and_get_hashes_failure(True) def test_invalidate_hash_consolidation(self): def assert_consolidation(suffixes): @@ -5677,7 +5766,9 @@ class TestSuffixHashes(unittest.TestCase): self.assertIn(suffix, hashes) self.assertIsNone(hashes[suffix]) with open(hashes_file, 'rb') as f: - self.assertEqual(hashes, pickle.load(f)) + found_hashes = pickle.load(f) + self.assertTrue(hashes['valid']) + self.assertEqual(hashes, found_hashes) with open(invalidations_file, 'rb') as f: self.assertEqual("", f.read()) return hashes @@ -5701,7 +5792,10 @@ class TestSuffixHashes(unittest.TestCase): invalidations_file = os.path.join( part_path, diskfile.HASH_INVALIDATIONS_FILE) with open(hashes_file, 'rb') as f: - self.assertEqual(original_hashes, pickle.load(f)) + found_hashes = pickle.load(f) + found_hashes.pop('updated') + self.assertTrue(found_hashes.pop('valid')) + self.assertEqual(original_hashes, found_hashes) # invalidate the hash with mock.patch('swift.obj.diskfile.lock_path') as mock_lock: @@ -5712,7 +5806,10 @@ class TestSuffixHashes(unittest.TestCase): self.assertEqual(suffix + "\n", f.read()) # hashes file is unchanged with open(hashes_file, 'rb') as f: - self.assertEqual(original_hashes, pickle.load(f)) + found_hashes = pickle.load(f) + found_hashes.pop('updated') + self.assertTrue(found_hashes.pop('valid')) + self.assertEqual(original_hashes, found_hashes) # consolidate the hash and the invalidations hashes = assert_consolidation([suffix]) @@ -5728,7 +5825,9 @@ class TestSuffixHashes(unittest.TestCase): self.assertEqual(suffix2 + "\n", f.read()) # hashes file is not yet changed with open(hashes_file, 'rb') as f: - self.assertEqual(hashes, pickle.load(f)) + found_hashes = pickle.load(f) + self.assertTrue(hashes['valid']) + self.assertEqual(hashes, found_hashes) # consolidate hashes hashes = assert_consolidation([suffix, suffix2]) @@ -5741,10 +5840,43 @@ class TestSuffixHashes(unittest.TestCase): self.assertEqual("%s\n%s\n" % (suffix2, suffix2), f.read()) # hashes file is not yet changed with open(hashes_file, 'rb') as f: - self.assertEqual(hashes, pickle.load(f)) + found_hashes = pickle.load(f) + self.assertTrue(hashes['valid']) + self.assertEqual(hashes, found_hashes) # consolidate hashes assert_consolidation([suffix, suffix2]) + def test_get_hashes_consolidates_suffix_rehash_once(self): + for policy in self.iter_policies(): + df_mgr = self.df_router[policy] + df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o', + policy=policy) + df.delete(self.ts()) + suffix_dir = os.path.dirname(df._datadir) + + with mock.patch.object(df_mgr, 'consolidate_hashes', + side_effect=df_mgr.consolidate_hashes + ) as mock_consolidate_hashes, \ + mock.patch.object(df_mgr, '_hash_suffix', + side_effect=df_mgr._hash_suffix + ) as mock_hash_suffix: + # creates pkl file + df_mgr.get_hashes('sda1', '0', [], policy) + mock_consolidate_hashes.assert_called_once() + self.assertEqual([mock.call(suffix_dir, diskfile.ONE_WEEK)], + mock_hash_suffix.call_args_list) + # second object in path + df2 = self.get_different_suffix_df(df) + df2.delete(self.ts()) + suffix_dir2 = os.path.dirname(df2._datadir) + mock_consolidate_hashes.reset_mock() + mock_hash_suffix.reset_mock() + # updates pkl file + df_mgr.get_hashes('sda1', '0', [], policy) + mock_consolidate_hashes.assert_called_once() + self.assertEqual([mock.call(suffix_dir2, diskfile.ONE_WEEK)], + mock_hash_suffix.call_args_list) + def test_consolidate_hashes_raises_exception(self): # verify that if consolidate_hashes raises an exception then suffixes # are rehashed and a hashes.pkl is written @@ -5771,7 +5903,10 @@ class TestSuffixHashes(unittest.TestCase): hashes_file = os.path.join(part_path, diskfile.HASH_FILE) with open(hashes_file, 'rb') as f: - self.assertEqual(hashes, pickle.load(f)) + found_hashes = pickle.load(f) + found_hashes.pop('updated') + self.assertTrue(found_hashes.pop('valid')) + self.assertEqual(hashes, found_hashes) # sanity check log warning warnings = self.logger.get_lines_for_level('warning') @@ -5790,7 +5925,10 @@ class TestSuffixHashes(unittest.TestCase): diskfile.get_data_dir(policy), '0') hashes_file = os.path.join(part_path, diskfile.HASH_FILE) with open(hashes_file, 'rb') as f: - self.assertEqual(hashes, pickle.load(f)) + found_hashes = pickle.load(f) + found_hashes.pop('updated') + self.assertTrue(found_hashes.pop('valid')) + self.assertEqual(hashes, found_hashes) # invalidate_hash tests - error handling @@ -6812,6 +6950,71 @@ class TestSuffixHashes(unittest.TestCase): policy) self.assertEqual(hashes, {}) + def _test_get_hashes_race(self, hash_breaking_function): + for policy in self.iter_policies(): + df_mgr = self.df_router[policy] + + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', + 'o', policy=policy, frag_index=3) + suffix = os.path.basename(os.path.dirname(df._datadir)) + + df2 = self.get_different_suffix_df(df, frag_index=5) + suffix2 = os.path.basename(os.path.dirname(df2._datadir)) + part_path = os.path.dirname(os.path.dirname( + os.path.join(df._datadir))) + hashfile_path = os.path.join(part_path, diskfile.HASH_FILE) + # create hashes.pkl + hashes = df_mgr.get_hashes(self.existing_device, '0', [], + policy) + self.assertEqual(hashes, {}) # sanity + self.assertTrue(os.path.exists(hashfile_path)) + # and optionally tamper with the hashes.pkl... + hash_breaking_function(hashfile_path) + non_local = {'called': False} + orig_hash_suffix = df_mgr._hash_suffix + + # then create a suffix + df.delete(self.ts()) + + def mock_hash_suffix(*args, **kwargs): + # capture first call to mock_hash + if not non_local['called']: + non_local['called'] = True + df2.delete(self.ts()) + non_local['other_hashes'] = df_mgr.get_hashes( + self.existing_device, '0', [], policy) + return orig_hash_suffix(*args, **kwargs) + + with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix): + hashes = df_mgr.get_hashes(self.existing_device, '0', [], + policy) + + self.assertTrue(non_local['called']) + self.assertIn(suffix, hashes) + self.assertIn(suffix2, hashes) + + def test_get_hashes_race_invalid_pickle(self): + def hash_breaking_function(hashfile_path): + # create a garbage invalid zero-byte file which can not unpickle + open(hashfile_path, 'w').close() + self._test_get_hashes_race(hash_breaking_function) + + def test_get_hashes_race_new_partition(self): + def hash_breaking_function(hashfile_path): + # simulate rebalanced part doing post-rsync REPLICATE + os.unlink(hashfile_path) + part_dir = os.path.dirname(hashfile_path) + os.unlink(os.path.join(part_dir, '.lock')) + # sanity + self.assertEqual([], os.listdir(os.path.dirname(hashfile_path))) + self._test_get_hashes_race(hash_breaking_function) + + def test_get_hashes_race_existing_partition(self): + def hash_breaking_function(hashfile_path): + # no-op - simulate ok existing partition + self.assertTrue(os.path.exists(hashfile_path)) + self._test_get_hashes_race(hash_breaking_function) + def test_get_hashes_hash_suffix_enotdir(self): for policy in self.iter_policies(): df_mgr = self.df_router[policy] @@ -6865,37 +7068,125 @@ class TestSuffixHashes(unittest.TestCase): df_mgr = self.df_router[policy] # first create an empty pickle df_mgr.get_hashes(self.existing_device, '0', [], policy) - hashes_file = os.path.join( - self.devices, self.existing_device, - diskfile.get_data_dir(policy), '0', diskfile.HASH_FILE) - mtime = os.path.getmtime(hashes_file) - non_local = {'mtime': mtime} - + non_local = {'suffix_count': 1} calls = [] - def mock_getmtime(filename): - t = non_local['mtime'] + def mock_read_hashes(filename): + rv = {'%03x' % i: 'fake' + for i in range(non_local['suffix_count'])} if len(calls) <= 3: - # this will make the *next* call get a slightly - # newer mtime than the last - non_local['mtime'] += 1 + # this will make the *next* call get slightly + # different content + non_local['suffix_count'] += 1 # track exactly the value for every return - calls.append(t) - return t - with mock.patch('swift.obj.diskfile.getmtime', - mock_getmtime): + calls.append(dict(rv)) + rv['valid'] = True + return rv + with mock.patch('swift.obj.diskfile.read_hashes', + mock_read_hashes): df_mgr.get_hashes(self.existing_device, '0', ['123'], policy) self.assertEqual(calls, [ - mtime + 0, # read - mtime + 1, # modified - mtime + 2, # read - mtime + 3, # modifed - mtime + 4, # read - mtime + 4, # not modifed + {'000': 'fake'}, # read + {'000': 'fake', '001': 'fake'}, # modification + {'000': 'fake', '001': 'fake', '002': 'fake'}, # read + {'000': 'fake', '001': 'fake', '002': 'fake', + '003': 'fake'}, # modifed + {'000': 'fake', '001': 'fake', '002': 'fake', + '003': 'fake', '004': 'fake'}, # read + {'000': 'fake', '001': 'fake', '002': 'fake', + '003': 'fake', '004': 'fake'}, # not modifed ]) +class TestHashesHelpers(unittest.TestCase): + + def setUp(self): + self.testdir = tempfile.mkdtemp() + + def tearDown(self): + rmtree(self.testdir, ignore_errors=1) + + def test_read_legacy_hashes(self): + hashes = {'stub': 'fake'} + hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE) + with open(hashes_file, 'w') as f: + pickle.dump(hashes, f) + expected = { + 'stub': 'fake', + 'updated': -1, + 'valid': True, + } + self.assertEqual(expected, diskfile.read_hashes(self.testdir)) + + def test_write_hashes_valid_updated(self): + hashes = {'stub': 'fake', 'valid': True} + now = time() + with mock.patch('swift.obj.diskfile.time.time', return_value=now): + diskfile.write_hashes(self.testdir, hashes) + hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE) + with open(hashes_file) as f: + data = pickle.load(f) + expected = { + 'stub': 'fake', + 'updated': now, + 'valid': True, + } + self.assertEqual(expected, data) + + def test_write_hashes_invalid_updated(self): + hashes = {'valid': False} + now = time() + with mock.patch('swift.obj.diskfile.time.time', return_value=now): + diskfile.write_hashes(self.testdir, hashes) + hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE) + with open(hashes_file) as f: + data = pickle.load(f) + expected = { + 'updated': now, + 'valid': False, + } + self.assertEqual(expected, data) + + def test_write_hashes_safe_default(self): + hashes = {} + now = time() + with mock.patch('swift.obj.diskfile.time.time', return_value=now): + diskfile.write_hashes(self.testdir, hashes) + hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE) + with open(hashes_file) as f: + data = pickle.load(f) + expected = { + 'updated': now, + 'valid': False, + } + self.assertEqual(expected, data) + + def test_read_write_valid_hashes_mutation_and_transative_equality(self): + hashes = {'stub': 'fake', 'valid': True} + diskfile.write_hashes(self.testdir, hashes) + # write_hashes mutates the passed in hashes, it adds the updated key + self.assertIn('updated', hashes) + self.assertTrue(hashes['valid']) + result = diskfile.read_hashes(self.testdir) + # unpickling result in a new object + self.assertNotEqual(id(hashes), id(result)) + # with the exactly the same value mutation from write_hashes + self.assertEqual(hashes, result) + + def test_read_write_invalid_hashes_mutation_and_transative_equality(self): + hashes = {'valid': False} + diskfile.write_hashes(self.testdir, hashes) + # write_hashes mutates the passed in hashes, it adds the updated key + self.assertIn('updated', hashes) + self.assertFalse(hashes['valid']) + result = diskfile.read_hashes(self.testdir) + # unpickling result in a new object + self.assertNotEqual(id(hashes), id(result)) + # with the exactly the same value mutation from write_hashes + self.assertEqual(hashes, result) + + if __name__ == '__main__': unittest.main()