Better optimistic lock in get_hashes

mtime and force_rewrite have a *long*, tangled history, starting back in
lp bug #1089140, that has been carried through many refactors.

Using force_rewrite on errors reading from the pickle has always been a
read-modify-write race; but maybe less bad than the infinite recursion
bug it fixed?

Using getmtime has always had somewhat dubious resolution for race
detection - the only way to be sure the file's content is the same as
when we read it without locking is to open the file back up and check.
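
For illustration, here's a minimal standalone sketch (not Swift code;
the temp file and the snapshot helper are made up) contrasting the
mtime heuristic with the conclusive read-back-and-compare check:

    import os
    import pickle
    import tempfile

    # Standalone illustration: an equal mtime is only a heuristic (its
    # granularity depends on the filesystem); re-reading the file and
    # comparing content is the only conclusive "unchanged?" test.
    tmpdir = tempfile.mkdtemp()
    pkl_path = os.path.join(tmpdir, 'hashes.pkl')

    def snapshot(path):
        with open(path, 'rb') as f:
            return pickle.load(f)

    with open(pkl_path, 'wb') as f:
        pickle.dump({'abc': 'hash-of-abc'}, f)
    orig = snapshot(pkl_path)
    orig_mtime = os.path.getmtime(pkl_path)

    # ... unlocked rehashing work would happen here; another process
    # may rewrite hashes.pkl in the meantime ...

    unchanged_by_mtime = os.path.getmtime(pkl_path) == orig_mtime  # heuristic
    unchanged_by_content = snapshot(pkl_path) == orig              # conclusive
    print(unchanged_by_mtime, unchanged_by_content)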

Unfortunately, the ondisk data wasn't rich enough to disambiguate
whether the state it represents may have changed (e.g. when an
invalidation for a suffix currently being hashed is consolidated, or
when all hashes are invalid, such as after an error reading the
hashes.pkl) - so we also add a timestamp key for race detection and
record whether the dictionary holds any valid suffix hashes.
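
Concretely, the pickled dict now looks something like the following
(the suffix names and values here are illustrative examples, not real
hashes):

    # Illustrative shape of hashes.pkl after this change; 'valid' and
    # 'updated' are the new bookkeeping keys, the rest are suffix hashes.
    hashes = {
        'valid': True,            # False if hashes.pkl was unreadable/missing
        'updated': 1484292672.3,  # time.time() stamped by write_hashes()
        'abc': 'd41d8cd98f00b204e9800998ecf8427e',  # hashed suffix (example)
        'def': None,              # invalidated suffix awaiting rehash
    }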

Along the way, we accidentally fix a serious performance regression with
hash invalidations...

We currently rehash all invalid suffixes twice on REPLICATE calls.

First we consolidate hashes, marking all invalid suffixes as None, and
then perform the first suffix rehashing.

Then, *every time*, as soon as we finish that first pass we throw all
the work we just did on the floor and rehash ALL the invalid suffixes
*again*, because the race detector erroneously notices that the
hashes.pkl file has been "modified while we were hashing".

But we're not in a race.  We took the mtime before calling consolidate
hashes, and consolidate hashes modified the pickle when it wrote back the
invalid suffixes.
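
A tiny standalone simulation (not Swift code; the temp path is made up)
of that pre-patch sequence shows why the old detector always fires when
there are invalidations to consolidate:

    import os
    import pickle
    import tempfile
    import time

    # Simulate the old flow: snapshot the mtime, then "consolidate"
    # (which rewrites hashes.pkl), then check the mtime again - it has
    # "changed", even though nobody else touched the file.
    tmpdir = tempfile.mkdtemp()
    pkl_path = os.path.join(tmpdir, 'hashes.pkl')
    with open(pkl_path, 'wb') as f:
        pickle.dump({'abc': 'stale-hash'}, f)

    mtime_before = os.path.getmtime(pkl_path)  # snapshot taken first
    time.sleep(0.01)  # let the rewrite get a newer sub-second timestamp
    with open(pkl_path, 'wb') as f:            # consolidation rewrites the pickle
        pickle.dump({'abc': None}, f)
    # ... suffix rehashing would happen here, without the lock ...

    looks_raced = os.path.getmtime(pkl_path) != mtime_before
    print(looks_raced)  # True: "modified while we were hashing" - by ourselves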

FWIW, since consolidate hashes operates under the directory lock it
can't race - but we don't want suffix rehashing to hold the directory
lock that long, so we use optimistic locking: we optimistically perform
the rehash w/o a lock and write back the update only if the on-disk
state hasn't changed since we read it; if it has, we retry the whole
operation.
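
In rough pseudocode, the loop now works like this (the helper names
read_state, write_state, rehash and lock are placeholders for
illustration, not the real Swift APIs):

    import copy

    # Sketch of the optimistic read/work/compare-and-write pattern:
    # do the expensive rehash without the lock, then only commit under
    # the lock if the on-disk state is still exactly what we originally
    # read; otherwise start over.
    def optimistic_rehash(partition, read_state, write_state, rehash, lock):
        while True:
            orig = read_state(partition)           # unlocked read
            updated = rehash(copy.deepcopy(orig))  # expensive, unlocked work
            with lock(partition):
                if read_state(partition) == orig:  # nothing changed since read
                    write_state(partition, updated)
                    return updated
            # state changed while we were hashing - retry the whole operation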

UpgradeImpact:

If you upgrade and need to rollback - delete all hashes.pkl:

    rm /srv/node*/*/object*/*/hashes.pkl

Anything of significance achieved here was blatantly plagiarised from
the work of others:

Co-Author: Pavel Kvasnička <pavel.kvasnicka@firma.seznam.cz>
Related-Change-Id: I64cadb1a3feb4d819d545137eecfc295389794f0
Co-Author: Alistair Coles <alistair.coles@hpe.com>
Related-Change-Id: I8f6bb89beaaca3beec2e6063299189f52e9eee51
Related-Change-Id: I08c8cf09282f737103e580c1f57923b399abe58c

Change-Id: Ia43ec2cf7ab715ec37f0044625a10aeb6420f6e3
Author: Clay Gerrard
Date: 2017-01-12 22:31:12 -08:00
Parent: 80f550f80e
Commit: a1f263c1b4
2 changed files with 431 additions and 103 deletions

swift/obj/diskfile.py

@@ -31,6 +31,7 @@ are also not considered part of the backend API.
"""
import six.moves.cPickle as pickle
import copy
import errno
import fcntl
import json
@@ -41,7 +42,7 @@ import hashlib
import logging
import traceback
import xattr
from os.path import basename, dirname, exists, getmtime, join, splitext
from os.path import basename, dirname, exists, join, splitext
from random import shuffle
from tempfile import mkstemp
from contextlib import contextmanager
@@ -228,6 +229,48 @@ def quarantine_renamer(device_path, corrupted_file_path):
return to_dir
def read_hashes(partition_dir):
"""
Read the existing hashes.pkl
:returns: a dict, the suffix hashes (if any), the key 'valid' will be False
if hashes.pkl is corrupt, cannot be read or does not exist
"""
hashes_file = join(partition_dir, HASH_FILE)
hashes = {'valid': False}
try:
with open(hashes_file, 'rb') as hashes_fp:
pickled_hashes = hashes_fp.read()
except (IOError, OSError):
pass
else:
try:
hashes = pickle.loads(pickled_hashes)
except Exception:
# pickle.loads() can raise a wide variety of exceptions when
# given invalid input depending on the way in which the
# input is invalid.
pass
# hashes.pkl w/o valid updated key is "valid" but "forever old"
hashes.setdefault('valid', True)
hashes.setdefault('updated', -1)
return hashes
def write_hashes(partition_dir, hashes):
"""
Write hashes to hashes.pkl
The updated key is added to hashes before it is written.
"""
hashes_file = join(partition_dir, HASH_FILE)
# 'valid' key should always be set by the caller; however, if there's a bug
# setting invalid is most safe
hashes.setdefault('valid', False)
hashes['updated'] = time.time()
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
def consolidate_hashes(partition_dir):
"""
Take what's in hashes.pkl and hashes.invalid, combine them, write the
@@ -254,41 +297,23 @@ def consolidate_hashes(partition_dir):
return None
with lock_path(partition_dir):
try:
with open(hashes_file, 'rb') as hashes_fp:
pickled_hashes = hashes_fp.read()
except (IOError, OSError):
hashes = {}
else:
try:
hashes = pickle.loads(pickled_hashes)
except Exception:
# pickle.loads() can raise a wide variety of exceptions when
# given invalid input depending on the way in which the
# input is invalid.
hashes = None
hashes = read_hashes(partition_dir)
modified = False
found_invalidation_entry = False
try:
with open(invalidations_file, 'rb') as inv_fh:
for line in inv_fh:
found_invalidation_entry = True
suffix = line.strip()
if hashes is not None and \
hashes.get(suffix, '') is not None:
hashes[suffix] = None
modified = True
hashes[suffix] = None
except (IOError, OSError) as e:
if e.errno != errno.ENOENT:
raise
if modified:
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
# Now that all the invalidations are reflected in hashes.pkl, it's
# safe to clear out the invalidations file.
if found_invalidation_entry:
write_hashes(partition_dir, hashes)
# Now that all the invalidations are reflected in hashes.pkl, it's
# safe to clear out the invalidations file.
with open(invalidations_file, 'wb') as inv_fh:
pass
@@ -997,8 +1022,14 @@ class BaseDiskFileManager(object):
"""
raise NotImplementedError
def _get_hashes(self, partition_path, recalculate=None, do_listdir=False,
reclaim_age=None):
def _get_hashes(self, *args, **kwargs):
hashed, hashes = self.__get_hashes(*args, **kwargs)
hashes.pop('updated', None)
hashes.pop('valid', None)
return hashed, hashes
def __get_hashes(self, partition_path, recalculate=None, do_listdir=False,
reclaim_age=None):
"""
Get a list of hashes for the suffix dir. do_listdir causes it to
mistrust the hash cache for suffix existence at the (unexpectedly high)
@@ -1017,31 +1048,39 @@ class BaseDiskFileManager(object):
hashed = 0
hashes_file = join(partition_path, HASH_FILE)
modified = False
force_rewrite = False
hashes = {}
mtime = -1
orig_hashes = {'valid': False}
if recalculate is None:
recalculate = []
try:
mtime = getmtime(hashes_file)
except OSError as e:
if e.errno != errno.ENOENT:
raise
try:
hashes = self.consolidate_hashes(partition_path)
orig_hashes = self.consolidate_hashes(partition_path)
except Exception:
self.logger.warning('Unable to read %r', hashes_file,
exc_info=True)
if orig_hashes is None:
# consolidate_hashes returns None if hashes.pkl does not exist
orig_hashes = {'valid': False}
if not orig_hashes['valid']:
# This is the only path to a valid hashes from invalid read (e.g.
# does not exist, corrupt, etc.). Moreover, in order to write this
# valid hashes we must read *the exact same* invalid state or we'll
# trigger race detection.
do_listdir = True
force_rewrite = True
hashes = {'valid': True}
# If the exception handling around consolidate_hashes fired we're
# going to do a full rehash regardless; but we need to avoid
# needless recursion if the on-disk hashes.pkl is actually readable
# (worst case is consolidate_hashes keeps raising exceptions and we
# eventually run out of stack).
# N.B. orig_hashes invalid only effects new parts and error/edge
# conditions - so try not to get overly caught up trying to
# optimize it out unless you manage to convince yourself there's a
# bad behavior.
orig_hashes = read_hashes(partition_path)
else:
if hashes is None: # no hashes.pkl file; let's build it
do_listdir = True
force_rewrite = True
hashes = {}
hashes = copy.deepcopy(orig_hashes)
if do_listdir:
for suff in os.listdir(partition_path):
@@ -1063,13 +1102,11 @@ class BaseDiskFileManager(object):
modified = True
if modified:
with lock_path(partition_path):
if force_rewrite or not exists(hashes_file) or \
getmtime(hashes_file) == mtime:
write_pickle(
hashes, hashes_file, partition_path, PICKLE_PROTOCOL)
if read_hashes(partition_path) == orig_hashes:
write_hashes(partition_path, hashes)
return hashed, hashes
return self._get_hashes(partition_path, recalculate, do_listdir,
reclaim_age)
return self.__get_hashes(partition_path, recalculate, do_listdir,
reclaim_age)
else:
return hashed, hashes

test/unit/obj/test_diskfile.py

@@ -40,8 +40,7 @@ from gzip import GzipFile
import pyeclib.ec_iface
from eventlet import hubs, timeout, tpool
from swift.obj.diskfile import (MD5_OF_EMPTY_STRING, update_auditor_status,
write_pickle)
from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status
from test.unit import (FakeLogger, mock as unit_mock, temptree,
patch_policies, debug_logger, EMPTY_ETAG,
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
@@ -5190,7 +5189,7 @@ class TestSuffixHashes(unittest.TestCase):
filename += '.meta'
return filename
def get_different_suffix_df(self, df):
def get_different_suffix_df(self, df, **kwargs):
# returns diskfile in the same partition with different suffix
suffix_dir = os.path.dirname(df._datadir)
for i in itertools.count():
@@ -5200,7 +5199,8 @@ class TestSuffixHashes(unittest.TestCase):
df._account,
df._container,
'o%d' % i,
policy=df.policy)
policy=df.policy,
**kwargs)
suffix_dir2 = os.path.dirname(df2._datadir)
if suffix_dir != suffix_dir2:
return df2
@@ -5508,7 +5508,10 @@ class TestSuffixHashes(unittest.TestCase):
self.assertTrue(os.path.exists(hashes_file))
self.assertIn(os.path.basename(suffix_dir), hashes)
with open(hashes_file) as f:
self.assertEqual(hashes, pickle.load(f))
found_hashes = pickle.load(f)
found_hashes.pop('updated')
self.assertTrue(found_hashes.pop('valid'))
self.assertEqual(hashes, found_hashes)
# ... and truncates the invalidations file
with open(inv_file) as f:
self.assertEqual('', f.read().strip('\n'))
@@ -5605,29 +5608,11 @@ class TestSuffixHashes(unittest.TestCase):
self.assertIn(suffix, hashes)
self.assertIn(suffix2, hashes)
@mock.patch('swift.obj.diskfile.getmtime')
@mock.patch('swift.obj.diskfile.write_pickle')
def test_contains_hashes_of_existing_partition(self, mock_write_pickle,
mock_getmtime):
def test_hash_invalidations_survive_racing_get_hashes_diff_suffix(self):
# get_hashes must repeat path listing and return all hashes when
# another concurrent process created new pkl before hashes are stored
# by the first process
non_local = {}
def mock_write_pickle_def(*args, **kwargs):
if 'mtime' not in non_local:
non_local['mtime'] = time()
non_local['mtime'] += 1
write_pickle(*args, **kwargs)
def mock_getmtime_def(filename):
if 'mtime' not in non_local:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
return non_local['mtime']
mock_write_pickle.side_effect = mock_write_pickle_def
mock_getmtime.side_effect = mock_getmtime_def
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
# force hashes.pkl to exist; when it does not exist that's fine,
@@ -5654,18 +5639,122 @@ class TestSuffixHashes(unittest.TestCase):
if not non_local['df2touched']:
non_local['df2touched'] = True
df2.delete(self.ts())
# simulate pkl update by other process - mtime is updated
self.assertIn('mtime', non_local, "hashes.pkl must exist")
non_local['mtime'] += 1
return result
with mock.patch('swift.obj.diskfile.os.listdir',
mock_listdir):
# creates pkl file but leaves invalidation alone
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
# suffix2 just sits in the invalidations file
self.assertIn(suffix, hashes)
self.assertNotIn(suffix2, hashes)
# it'll show up next hash
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
self.assertIn(suffix, hashes)
self.assertIn(suffix2, hashes)
def test_hash_invalidations_survive_racing_get_hashes_same_suffix(self):
# verify that when two processes concurrently call get_hashes, then any
# concurrent hash invalidation will survive and be consolidated on a
# subsequent call to get_hashes (i.e. ensure first get_hashes process
# does not ignore the concurrent hash invalidation that second
# get_hashes might have consolidated to hashes.pkl)
non_local = {}
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
orig_hash_suffix = df_mgr._hash_suffix
# create hashes.pkl
df_mgr.get_hashes('sda1', '0', [], policy)
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
policy=policy)
suffix_dir = os.path.dirname(df._datadir)
suffix = os.path.basename(suffix_dir)
part_dir = os.path.dirname(suffix_dir)
invalidations_file = os.path.join(
part_dir, diskfile.HASH_INVALIDATIONS_FILE)
non_local['hash'] = None
non_local['called'] = False
# delete will append suffix to hashes.invalid
df.delete(self.ts())
with open(invalidations_file) as f:
self.assertEqual(suffix, f.read().strip('\n')) # sanity
hash1 = df_mgr._hash_suffix(suffix_dir, diskfile.ONE_WEEK)
def mock_hash_suffix(*args, **kwargs):
# after first get_hashes has called _hash_suffix, simulate a
# second process invalidating the same suffix, followed by a
# third process calling get_hashes and failing (or yielding)
# after consolidate_hashes has completed
result = orig_hash_suffix(*args, **kwargs)
if not non_local['called']:
non_local['called'] = True
# appends suffix to hashes.invalid
df.delete(self.ts())
# simulate another process calling get_hashes but failing
# after hash invalidation have been consolidated
hashes = df_mgr.consolidate_hashes(part_dir)
self.assertTrue(hashes['valid'])
# get the updated suffix hash...
non_local['hash'] = orig_hash_suffix(suffix_dir,
diskfile.ONE_WEEK)
return result
with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix):
# creates pkl file and repeats listing when pkl modified
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
# first get_hashes should complete with suffix1 state
self.assertIn(suffix, hashes)
self.assertIn(suffix2, hashes)
# sanity check - the suffix hash has changed...
self.assertNotEqual(hash1, non_local['hash'])
# the invalidation file has been truncated...
with open(invalidations_file, 'r') as f:
self.assertEqual('', f.read())
# so hashes should have the latest suffix hash...
self.assertEqual(hashes[suffix], non_local['hash'])
def _check_unpickle_error_and_get_hashes_failure(self, existing):
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
policy=policy)
suffix = os.path.basename(os.path.dirname(df._datadir))
if existing:
df.delete(self.ts())
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
df.delete(self.ts())
part_path = os.path.join(self.devices, 'sda1',
diskfile.get_data_dir(policy), '0')
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
# write a corrupt hashes.pkl
open(hashes_file, 'w')
# simulate first call to get_hashes failing after attempting to
# consolidate hashes
with mock.patch('swift.obj.diskfile.os.listdir',
side_effect=Exception()):
self.assertRaises(
Exception, df_mgr.get_hashes, 'sda1', '0', [], policy)
# sanity on-disk state is invalid
with open(hashes_file) as f:
found_hashes = pickle.load(f)
found_hashes.pop('updated')
self.assertEqual(False, found_hashes.pop('valid'))
# verify subsequent call to get_hashes reaches correct outcome
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
self.assertIn(suffix, hashes)
self.assertEqual([], df_mgr.logger.get_lines_for_level('warning'))
def test_unpickle_error_and_get_hashes_failure_new_part(self):
self._check_unpickle_error_and_get_hashes_failure(False)
def test_unpickle_error_and_get_hashes_failure_existing_part(self):
self._check_unpickle_error_and_get_hashes_failure(True)
def test_invalidate_hash_consolidation(self):
def assert_consolidation(suffixes):
@@ -5677,7 +5766,9 @@ class TestSuffixHashes(unittest.TestCase):
self.assertIn(suffix, hashes)
self.assertIsNone(hashes[suffix])
with open(hashes_file, 'rb') as f:
self.assertEqual(hashes, pickle.load(f))
found_hashes = pickle.load(f)
self.assertTrue(hashes['valid'])
self.assertEqual(hashes, found_hashes)
with open(invalidations_file, 'rb') as f:
self.assertEqual("", f.read())
return hashes
@@ -5701,7 +5792,10 @@ class TestSuffixHashes(unittest.TestCase):
invalidations_file = os.path.join(
part_path, diskfile.HASH_INVALIDATIONS_FILE)
with open(hashes_file, 'rb') as f:
self.assertEqual(original_hashes, pickle.load(f))
found_hashes = pickle.load(f)
found_hashes.pop('updated')
self.assertTrue(found_hashes.pop('valid'))
self.assertEqual(original_hashes, found_hashes)
# invalidate the hash
with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
@@ -5712,7 +5806,10 @@ class TestSuffixHashes(unittest.TestCase):
self.assertEqual(suffix + "\n", f.read())
# hashes file is unchanged
with open(hashes_file, 'rb') as f:
self.assertEqual(original_hashes, pickle.load(f))
found_hashes = pickle.load(f)
found_hashes.pop('updated')
self.assertTrue(found_hashes.pop('valid'))
self.assertEqual(original_hashes, found_hashes)
# consolidate the hash and the invalidations
hashes = assert_consolidation([suffix])
@@ -5728,7 +5825,9 @@ class TestSuffixHashes(unittest.TestCase):
self.assertEqual(suffix2 + "\n", f.read())
# hashes file is not yet changed
with open(hashes_file, 'rb') as f:
self.assertEqual(hashes, pickle.load(f))
found_hashes = pickle.load(f)
self.assertTrue(hashes['valid'])
self.assertEqual(hashes, found_hashes)
# consolidate hashes
hashes = assert_consolidation([suffix, suffix2])
@@ -5741,10 +5840,43 @@ class TestSuffixHashes(unittest.TestCase):
self.assertEqual("%s\n%s\n" % (suffix2, suffix2), f.read())
# hashes file is not yet changed
with open(hashes_file, 'rb') as f:
self.assertEqual(hashes, pickle.load(f))
found_hashes = pickle.load(f)
self.assertTrue(hashes['valid'])
self.assertEqual(hashes, found_hashes)
# consolidate hashes
assert_consolidation([suffix, suffix2])
def test_get_hashes_consolidates_suffix_rehash_once(self):
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
policy=policy)
df.delete(self.ts())
suffix_dir = os.path.dirname(df._datadir)
with mock.patch.object(df_mgr, 'consolidate_hashes',
side_effect=df_mgr.consolidate_hashes
) as mock_consolidate_hashes, \
mock.patch.object(df_mgr, '_hash_suffix',
side_effect=df_mgr._hash_suffix
) as mock_hash_suffix:
# creates pkl file
df_mgr.get_hashes('sda1', '0', [], policy)
mock_consolidate_hashes.assert_called_once()
self.assertEqual([mock.call(suffix_dir, diskfile.ONE_WEEK)],
mock_hash_suffix.call_args_list)
# second object in path
df2 = self.get_different_suffix_df(df)
df2.delete(self.ts())
suffix_dir2 = os.path.dirname(df2._datadir)
mock_consolidate_hashes.reset_mock()
mock_hash_suffix.reset_mock()
# updates pkl file
df_mgr.get_hashes('sda1', '0', [], policy)
mock_consolidate_hashes.assert_called_once()
self.assertEqual([mock.call(suffix_dir2, diskfile.ONE_WEEK)],
mock_hash_suffix.call_args_list)
def test_consolidate_hashes_raises_exception(self):
# verify that if consolidate_hashes raises an exception then suffixes
# are rehashed and a hashes.pkl is written
@@ -5771,7 +5903,10 @@ class TestSuffixHashes(unittest.TestCase):
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
with open(hashes_file, 'rb') as f:
self.assertEqual(hashes, pickle.load(f))
found_hashes = pickle.load(f)
found_hashes.pop('updated')
self.assertTrue(found_hashes.pop('valid'))
self.assertEqual(hashes, found_hashes)
# sanity check log warning
warnings = self.logger.get_lines_for_level('warning')
@@ -5790,7 +5925,10 @@ class TestSuffixHashes(unittest.TestCase):
diskfile.get_data_dir(policy), '0')
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
with open(hashes_file, 'rb') as f:
self.assertEqual(hashes, pickle.load(f))
found_hashes = pickle.load(f)
found_hashes.pop('updated')
self.assertTrue(found_hashes.pop('valid'))
self.assertEqual(hashes, found_hashes)
# invalidate_hash tests - error handling
@@ -6812,6 +6950,71 @@ class TestSuffixHashes(unittest.TestCase):
policy)
self.assertEqual(hashes, {})
def _test_get_hashes_race(self, hash_breaking_function):
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c',
'o', policy=policy, frag_index=3)
suffix = os.path.basename(os.path.dirname(df._datadir))
df2 = self.get_different_suffix_df(df, frag_index=5)
suffix2 = os.path.basename(os.path.dirname(df2._datadir))
part_path = os.path.dirname(os.path.dirname(
os.path.join(df._datadir)))
hashfile_path = os.path.join(part_path, diskfile.HASH_FILE)
# create hashes.pkl
hashes = df_mgr.get_hashes(self.existing_device, '0', [],
policy)
self.assertEqual(hashes, {}) # sanity
self.assertTrue(os.path.exists(hashfile_path))
# and optionally tamper with the hashes.pkl...
hash_breaking_function(hashfile_path)
non_local = {'called': False}
orig_hash_suffix = df_mgr._hash_suffix
# then create a suffix
df.delete(self.ts())
def mock_hash_suffix(*args, **kwargs):
# capture first call to mock_hash
if not non_local['called']:
non_local['called'] = True
df2.delete(self.ts())
non_local['other_hashes'] = df_mgr.get_hashes(
self.existing_device, '0', [], policy)
return orig_hash_suffix(*args, **kwargs)
with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix):
hashes = df_mgr.get_hashes(self.existing_device, '0', [],
policy)
self.assertTrue(non_local['called'])
self.assertIn(suffix, hashes)
self.assertIn(suffix2, hashes)
def test_get_hashes_race_invalid_pickle(self):
def hash_breaking_function(hashfile_path):
# create a garbage invalid zero-byte file which can not unpickle
open(hashfile_path, 'w').close()
self._test_get_hashes_race(hash_breaking_function)
def test_get_hashes_race_new_partition(self):
def hash_breaking_function(hashfile_path):
# simulate rebalanced part doing post-rsync REPLICATE
os.unlink(hashfile_path)
part_dir = os.path.dirname(hashfile_path)
os.unlink(os.path.join(part_dir, '.lock'))
# sanity
self.assertEqual([], os.listdir(os.path.dirname(hashfile_path)))
self._test_get_hashes_race(hash_breaking_function)
def test_get_hashes_race_existing_partition(self):
def hash_breaking_function(hashfile_path):
# no-op - simulate ok existing partition
self.assertTrue(os.path.exists(hashfile_path))
self._test_get_hashes_race(hash_breaking_function)
def test_get_hashes_hash_suffix_enotdir(self):
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
@@ -6865,37 +7068,125 @@ class TestSuffixHashes(unittest.TestCase):
df_mgr = self.df_router[policy]
# first create an empty pickle
df_mgr.get_hashes(self.existing_device, '0', [], policy)
hashes_file = os.path.join(
self.devices, self.existing_device,
diskfile.get_data_dir(policy), '0', diskfile.HASH_FILE)
mtime = os.path.getmtime(hashes_file)
non_local = {'mtime': mtime}
non_local = {'suffix_count': 1}
calls = []
def mock_getmtime(filename):
t = non_local['mtime']
def mock_read_hashes(filename):
rv = {'%03x' % i: 'fake'
for i in range(non_local['suffix_count'])}
if len(calls) <= 3:
# this will make the *next* call get a slightly
# newer mtime than the last
non_local['mtime'] += 1
# this will make the *next* call get slightly
# different content
non_local['suffix_count'] += 1
# track exactly the value for every return
calls.append(t)
return t
with mock.patch('swift.obj.diskfile.getmtime',
mock_getmtime):
calls.append(dict(rv))
rv['valid'] = True
return rv
with mock.patch('swift.obj.diskfile.read_hashes',
mock_read_hashes):
df_mgr.get_hashes(self.existing_device, '0', ['123'],
policy)
self.assertEqual(calls, [
mtime + 0, # read
mtime + 1, # modified
mtime + 2, # read
mtime + 3, # modifed
mtime + 4, # read
mtime + 4, # not modifed
{'000': 'fake'}, # read
{'000': 'fake', '001': 'fake'}, # modification
{'000': 'fake', '001': 'fake', '002': 'fake'}, # read
{'000': 'fake', '001': 'fake', '002': 'fake',
'003': 'fake'}, # modifed
{'000': 'fake', '001': 'fake', '002': 'fake',
'003': 'fake', '004': 'fake'}, # read
{'000': 'fake', '001': 'fake', '002': 'fake',
'003': 'fake', '004': 'fake'}, # not modifed
])
class TestHashesHelpers(unittest.TestCase):
def setUp(self):
self.testdir = tempfile.mkdtemp()
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
def test_read_legacy_hashes(self):
hashes = {'stub': 'fake'}
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
with open(hashes_file, 'w') as f:
pickle.dump(hashes, f)
expected = {
'stub': 'fake',
'updated': -1,
'valid': True,
}
self.assertEqual(expected, diskfile.read_hashes(self.testdir))
def test_write_hashes_valid_updated(self):
hashes = {'stub': 'fake', 'valid': True}
now = time()
with mock.patch('swift.obj.diskfile.time.time', return_value=now):
diskfile.write_hashes(self.testdir, hashes)
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
with open(hashes_file) as f:
data = pickle.load(f)
expected = {
'stub': 'fake',
'updated': now,
'valid': True,
}
self.assertEqual(expected, data)
def test_write_hashes_invalid_updated(self):
hashes = {'valid': False}
now = time()
with mock.patch('swift.obj.diskfile.time.time', return_value=now):
diskfile.write_hashes(self.testdir, hashes)
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
with open(hashes_file) as f:
data = pickle.load(f)
expected = {
'updated': now,
'valid': False,
}
self.assertEqual(expected, data)
def test_write_hashes_safe_default(self):
hashes = {}
now = time()
with mock.patch('swift.obj.diskfile.time.time', return_value=now):
diskfile.write_hashes(self.testdir, hashes)
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
with open(hashes_file) as f:
data = pickle.load(f)
expected = {
'updated': now,
'valid': False,
}
self.assertEqual(expected, data)
def test_read_write_valid_hashes_mutation_and_transative_equality(self):
hashes = {'stub': 'fake', 'valid': True}
diskfile.write_hashes(self.testdir, hashes)
# write_hashes mutates the passed in hashes, it adds the updated key
self.assertIn('updated', hashes)
self.assertTrue(hashes['valid'])
result = diskfile.read_hashes(self.testdir)
# unpickling result in a new object
self.assertNotEqual(id(hashes), id(result))
# with the exactly the same value mutation from write_hashes
self.assertEqual(hashes, result)
def test_read_write_invalid_hashes_mutation_and_transative_equality(self):
hashes = {'valid': False}
diskfile.write_hashes(self.testdir, hashes)
# write_hashes mutates the passed in hashes, it adds the updated key
self.assertIn('updated', hashes)
self.assertFalse(hashes['valid'])
result = diskfile.read_hashes(self.testdir)
# unpickling result in a new object
self.assertNotEqual(id(hashes), id(result))
# with the exactly the same value mutation from write_hashes
self.assertEqual(hashes, result)
if __name__ == '__main__':
unittest.main()