Fix inconsistent suffix hashes after ssync of tombstone

Consider two replicas of the same object whose ondisk files
have diverged due to failures:

  A has t2.ts
  B has t1.data, t4.meta

(The DELETE at t2 did not make it to B. The POST at t4 was
rejected by A.)

After ssync replication the two ondisk file sets will not be
consistent:

  A has t2.ts  (ssync cannot POST t4.meta to this node)
  B has t2.ts, t4.meta (ssync should not delete t4.meta,
                        there may be a t3.data somewhere)

Consequently the two nodes will report different hashes for the
object's suffix, and replication will repeat, always with the same
inconsistent outcome. This scenario is reproduced by the probe
test added in this patch.

(Note that rsync replication does result in (t2.ts, t4.meta)
on both nodes.)
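
For illustration, here is a minimal sketch (not part of this
patch, using made-up but representative timestamps) of the
current filename-based hashing, showing why the two dirs above
can never report the same suffix hash:

  from hashlib import md5

  def old_suffix_hash(filenames):
      # pre-patch behaviour (simplified): every file name in the
      # object dir feeds the suffix hash, including B's t4.meta
      hasher = md5()
      for name in sorted(filenames):
          hasher.update(name.encode('utf-8'))
      return hasher.hexdigest()

  node_a = ['0000000002.00000.ts']                           # t2.ts
  node_b = ['0000000002.00000.ts', '0000000004.00000.meta']  # t2.ts, t4.meta
  assert old_suffix_hash(node_a) != old_suffix_hash(node_b)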

The solution is to change the way that suffix hashes are
calculated. Currently the names of *all* files found in each
object dir are added to the hash. With this patch, only the
timestamps of those files that could be used to construct a
valid diskfile are added to the hash, with each file's extension
appended to its timestamp so that in most 'normal' situations
the result of the hashing is the same as before this patch.
That avoids a storm of hash mismatches when this patch is
deployed in an existing cluster.

In the problem case described above, t4.meta is no longer
added to the hash, since it is not useful for constructing
a diskfile. (Note that t4.meta is not deleted because it
may become useful should a t3.data be replicated in future).
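
For comparison, a minimal sketch (again not part of this patch)
of the post-patch behaviour: only the timestamp plus extension
of files that determine the diskfile's state is hashed, so
t4.meta contributes nothing and both nodes report the same
suffix hash:

  from hashlib import md5

  def new_suffix_hash(tokens):
      # post-patch behaviour (simplified): hash timestamp + extension
      # of only those files that determine the opened diskfile's state
      hasher = md5()
      for token in sorted(tokens):
          hasher.update(token.encode('utf-8'))
      return hasher.hexdigest()

  # the effective state on both nodes is just the t2 tombstone; B's
  # t4.meta stays on disk but is excluded from the hash
  node_a_tokens = ['0000000002.00000.ts']
  node_b_tokens = ['0000000002.00000.ts']
  assert new_suffix_hash(node_a_tokens) == new_suffix_hash(node_b_tokens)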

Closes-Bug: 1534276
Change-Id: I99e88b8d5f5d9bc22b42112a99634ba942415e05
Author: Alistair Coles, 2016-01-14 18:31:21 +00:00
Parent: ebe61381c2
Commit: 2d55960a22
4 changed files with 200 additions and 24 deletions


@@ -25,6 +25,7 @@ from time import time
from eventlet import sleep, Timeout
import six
import six.moves.cPickle as pickle
from six.moves.http_client import HTTPException
from swift.common.bufferedhttp import http_connect
@@ -496,6 +497,34 @@ def direct_delete_object(node, part, account, container, obj,
node, part, path, resp)
def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5,
response_timeout=15, headers=None):
"""
Get suffix hashes directly from the object server.
:param node: node dictionary from the ring
:param part: partition the object is on
:param suffixes: a list of suffixes to get hashes for
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:param headers: dict to be passed into HTTPConnection headers
:returns: dict of suffix hashes
:raises ClientException: HTTP REPLICATE request failed
"""
if headers is None:
headers = {}
path = '/%s' % '-'.join(suffixes)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'REPLICATE', path, headers=gen_headers(headers))
with Timeout(response_timeout):
resp = conn.getresponse()
if not is_success(resp.status):
raise DirectClientException('Object', 'REPLICATE',
node, part, path, resp)
return pickle.loads(resp.read())
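# Illustrative usage only -- not part of this patch. Assuming an object
# ring loaded from the usual /etc/swift location, the suffix hashes for
# a single object's suffix could be fetched from its first primary:
#
#   from swift.common.ring import Ring
#   from swift.common.utils import hash_path
#
#   ring = Ring('/etc/swift', ring_name='object')
#   part, nodes = ring.get_nodes('AUTH_test', 'cont', 'obj')
#   suffix = hash_path('AUTH_test', 'cont', 'obj')[-3:]
#   hashes = direct_get_suffix_hashes(nodes[0], part, [suffix])
#
# For a replication policy the result maps each suffix to an md5
# hexdigest; for an EC policy each suffix maps to a dict of hexdigests
# keyed by fragment index (or None).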
def retry(func, *args, **kwargs):
"""
Helper function to retry a given function a number of times.


@@ -678,7 +678,18 @@ class BaseDiskFileManager(object):
return self.cleanup_ondisk_files(
hsh_path, reclaim_age=reclaim_age)['files']
def _hash_suffix_dir(self, path, mapper, reclaim_age):
def _update_suffix_hashes(self, hashes, ondisk_info):
"""
Applies policy specific updates to the given dict of md5 hashes for
the given ondisk_info.
:param hashes: a dict of md5 hashes to be updated
:param ondisk_info: a dict describing the state of ondisk files, as
returned by get_ondisk_files
"""
raise NotImplementedError
def _hash_suffix_dir(self, path, reclaim_age):
"""
:param path: full path to directory
@@ -694,7 +705,7 @@ class BaseDiskFileManager(object):
for hsh in path_contents:
hsh_path = join(path, hsh)
try:
files = self.hash_cleanup_listdir(hsh_path, reclaim_age)
ondisk_info = self.cleanup_ondisk_files(hsh_path, reclaim_age)
except OSError as err:
if err.errno == errno.ENOTDIR:
partition_path = dirname(path)
@@ -707,14 +718,30 @@ class BaseDiskFileManager(object):
'quar_path': quar_path})
continue
raise
if not files:
if not ondisk_info['files']:
try:
os.rmdir(hsh_path)
except OSError:
pass
for filename in files:
key, value = mapper(filename)
hashes[key].update(value)
continue
# ondisk_info has info dicts containing timestamps for those
# files that could determine the state of the diskfile if it were
# to be opened. We update the suffix hash with the concatenation of
# each file's timestamp and extension. The extension is added to
# guarantee distinct hash values from two object dirs that have
# different file types at the same timestamp(s).
#
# Files that may be in the object dir but would have no effect on
# the state of the diskfile are not used to update the hash.
for key in (k for k in ('meta_info', 'ts_info')
if k in ondisk_info):
info = ondisk_info[key]
hashes[None].update(info['timestamp'].internal + info['ext'])
# delegate to subclass for data file related updates...
self._update_suffix_hashes(hashes, ondisk_info)
try:
os.rmdir(path)
except OSError as e:
@@ -2195,6 +2222,20 @@ class DiskFileManager(BaseDiskFileManager):
# set results
results['data_info'] = exts['.data'][0]
def _update_suffix_hashes(self, hashes, ondisk_info):
"""
Applies policy specific updates to the given dict of md5 hashes for
the given ondisk_info.
:param hashes: a dict of md5 hashes to be updated
:param ondisk_info: a dict describing the state of ondisk files, as
returned by get_ondisk_files
"""
if 'data_info' in ondisk_info:
file_info = ondisk_info['data_info']
hashes[None].update(
file_info['timestamp'].internal + file_info['ext'])
def _hash_suffix(self, path, reclaim_age):
"""
Performs reclamation and returns an md5 of all (remaining) files.
@@ -2203,9 +2244,9 @@ class DiskFileManager(BaseDiskFileManager):
:param reclaim_age: age in seconds at which to remove tombstones
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
:returns: md5 of files in suffix
"""
mapper = lambda filename: (None, filename)
hashes = self._hash_suffix_dir(path, mapper, reclaim_age)
hashes = self._hash_suffix_dir(path, reclaim_age)
return hashes[None].hexdigest()
@@ -2544,28 +2585,41 @@ class ECDiskFileManager(BaseDiskFileManager):
return have_data_file == have_durable
return False
def _update_suffix_hashes(self, hashes, ondisk_info):
"""
Applies policy specific updates to the given dict of md5 hashes for
the given ondisk_info.
The only difference between this method and the replication policy
method is the way that data files update the hashes dict. Instead of
all filenames being hashed into a single hasher, each data file name
falls into a bucket keyed by its fragment index.
:param hashes: a dict of md5 hashes to be updated
:param ondisk_info: a dict describing the state of ondisk files, as
returned by get_ondisk_files
"""
for frag_set in ondisk_info['frag_sets'].values():
for file_info in frag_set:
fi = file_info['frag_index']
hashes[fi].update(file_info['timestamp'].internal)
if 'durable_frag_set' in ondisk_info:
file_info = ondisk_info['durable_frag_set'][0]
hashes[None].update(file_info['timestamp'].internal + '.durable')
def _hash_suffix(self, path, reclaim_age):
"""
The only difference between this method and the replication policy
function is the way that files are updated on the returned hash.
Instead of all filenames hashed into a single hasher, each file name
will fall into a bucket either by fragment index for datafiles, or
None (indicating a durable, metadata or tombstone).
Performs reclamation and returns an md5 of all (remaining) files.
:param path: full path to directory
:param reclaim_age: age in seconds at which to remove tombstones
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
:returns: dict of md5 hex digests
"""
# hash_per_fi instead of single hash for whole suffix
# here we flatten out the hashers' hexdigests into a dictionary instead
# of just returning the one hexdigest for the whole suffix
def mapper(filename):
info = self.parse_on_disk_filename(filename)
fi = info['frag_index']
if fi is None:
return None, filename
else:
return fi, info['timestamp'].internal
hash_per_fi = self._hash_suffix_dir(path, mapper, reclaim_age)
hash_per_fi = self._hash_suffix_dir(path, reclaim_age)
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())


@@ -21,12 +21,15 @@ import unittest
import os
import shutil
import uuid
from swift.common.exceptions import DiskFileDeleted
from swift.common.direct_client import direct_get_suffix_hashes
from swift.common.exceptions import DiskFileDeleted
from swift.common.internal_client import UnexpectedResponse
from swift.container.backend import ContainerBroker
from swift.common import internal_client, utils
from swiftclient import client
from swift.common.ring import Ring
from swift.common.utils import Timestamp, get_logger
from swift.common.utils import Timestamp, get_logger, hash_path
from swift.obj.diskfile import DiskFileManager
from swift.common.storage_policy import POLICIES
@@ -186,6 +189,20 @@ class Test(ReplProbeTest):
self.container_name,
self.object_name)
def _assert_consistent_suffix_hashes(self):
opart, onodes = self.object_ring.get_nodes(
self.account, self.container_name, self.object_name)
name_hash = hash_path(
self.account, self.container_name, self.object_name)
results = []
for node in onodes:
results.append(
(node,
direct_get_suffix_hashes(node, opart, [name_hash[-3:]])))
for (node, hashes) in results[1:]:
self.assertEqual(results[0][1], hashes,
'Inconsistent suffix hashes found: %s' % results)
def test_object_delete_is_replicated(self):
self.brain.put_container(policy_index=int(self.policy))
# put object
@@ -419,6 +436,51 @@ class Test(ReplProbeTest):
self._assert_consistent_object_metadata()
self._assert_consistent_container_dbs()
def test_post_trumped_by_prior_delete(self):
# new metadata and content-type posted to a subset of nodes should not
# cause the object to persist after replication of an earlier delete
# on other nodes.
self.brain.put_container(policy_index=0)
# incomplete put
self.brain.stop_primary_half()
self._put_object(headers={'Content-Type': 'oldest',
'X-Object-Sysmeta-Test': 'oldest',
'X-Object-Meta-Test': 'oldest'})
self.brain.start_primary_half()
# incomplete put then delete
self.brain.stop_handoff_half()
self._put_object(headers={'Content-Type': 'oldest',
'X-Object-Sysmeta-Test': 'oldest',
'X-Object-Meta-Test': 'oldest'})
self._delete_object()
self.brain.start_handoff_half()
# handoff post
self.brain.stop_primary_half()
self._post_object(headers={'Content-Type': 'newest',
'X-Object-Sysmeta-Test': 'ignored',
'X-Object-Meta-Test': 'newest'})
# check object metadata
metadata = self._get_object_metadata()
self.assertEqual(metadata['x-object-sysmeta-test'], 'oldest')
self.assertEqual(metadata['x-object-meta-test'], 'newest')
self.assertEqual(metadata['content-type'], 'oldest')
self.brain.start_primary_half()
# delete trumps later post
self.get_to_final_state()
# check object is now deleted
self.assertRaises(UnexpectedResponse, self._get_object_metadata)
container_metadata, objs = client.get_container(self.url, self.token,
self.container_name)
self.assertEqual(0, len(objs))
self._assert_consistent_container_dbs()
self._assert_consistent_deleted_object()
self._assert_consistent_suffix_hashes()
if __name__ == "__main__":
unittest.main()


@@ -4568,6 +4568,37 @@ class TestSuffixHashes(unittest.TestCase):
}[policy.policy_type]
self.assertEqual(hashes, expected)
def test_hash_suffix_one_tombstone_and_one_meta(self):
# A tombstone plus a newer meta file can happen if a tombstone is
# replicated to a node with a newer meta file but older data file. The
# meta file will be ignored when the diskfile is opened so the
# effective state of the disk files is equivalent to only having the
# tombstone. Replication cannot remove the meta file, and the meta file
# cannot be ssync replicated to a node with only the tombstone, so
# we want the get_hashes result to be the same as if the meta file was
# not there.
for policy in self.iter_policies():
df_mgr = self.df_router[policy]
df = df_mgr.get_diskfile(
'sda1', '0', 'a', 'c', 'o', policy=policy)
suffix = os.path.basename(os.path.dirname(df._datadir))
# write a tombstone
timestamp = self.ts()
df.delete(timestamp)
# write a meta file
df.write_metadata({'X-Timestamp': self.ts().internal})
# sanity check
self.assertEqual(2, len(os.listdir(df._datadir)))
tombstone_hash = md5(timestamp.internal + '.ts').hexdigest()
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
expected = {
REPL_POLICY: {suffix: tombstone_hash},
EC_POLICY: {suffix: {
# fi is None here because we have a tombstone
None: tombstone_hash}},
}[policy.policy_type]
self.assertEqual(hashes, expected)
def test_hash_suffix_one_reclaim_tombstone(self):
for policy in self.iter_policies():
df_mgr = self.df_router[policy]