Merge "Use hashes.pkl files for LOSF and remove list_partition_recursive()" into feature/losf

Zuul 2020-04-09 18:13:34 +00:00 committed by Gerrit Code Review
commit 3dbcbee0e9
5 changed files with 96 additions and 357 deletions


@ -1045,88 +1045,6 @@ func ListPartitions(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error)
return serializePb(response)
}
// ListPartitionRecursive returns a list of files with structured path info (suffix, object hash) within a partition
// The response should really be streamed, but that makes eventlet hang on the Python side...
// This is used to optimize REPLICATE on the object server.
func ListPartitionRecursive(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
in := &pb.ListPartitionInfo{}
if err := proto.Unmarshal(*pbIn, in); err != nil {
logrus.Errorf("failed to unmarshal input: %v", err)
return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
}
reqlog := log.WithFields(logrus.Fields{
"Function": "ListPartitionRecursive",
"Partition": in.Partition,
"PartitionBits": in.PartitionBits,
})
reqlog.Debug("RPC Call")
if !s.isClean {
reqlog.Debug("KV out of sync with volumes")
return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
}
// Partition bits
pBits := int(in.PartitionBits)
partition := uint64(in.Partition)
firstKey, err := getEncodedObjPrefixFromPartition(partition, pBits)
if err != nil {
s.statsd_c.Increment("list_partition_recursive.fail")
return nil, status.Errorf(codes.Internal, "failed to calculate encoded object prefix from partition")
}
// Seek to first key in partition, if any
it := s.kv.NewIterator(objectPrefix)
defer it.Close()
response := &pb.PartitionContent{}
it.Seek(firstKey)
// No object in the KV
if !it.Valid() {
s.statsd_c.Increment("list_partition_recursive.ok")
return serializePb(response)
}
key := make([]byte, 32+len(it.Key()[16:]))
err = DecodeObjectKey(it.Key(), key)
if err != nil {
reqlog.Errorf("failed to decode object key: %v", err)
s.statsd_c.Increment("load_objects_by_prefix.fail")
return nil, status.Errorf(codes.Internal, "unable to decode object key")
}
currentPartition, err := getPartitionFromOhash(key, pBits)
if err != nil {
s.statsd_c.Increment("list_partition_recursive.fail")
return nil, status.Errorf(codes.Internal, "unable to extract partition from object hash")
}
// Iterate over all files within the partition
for currentPartition == partition {
entry := &pb.FullPathEntry{Suffix: key[29:32], Ohash: key[:32], Filename: key[32:]}
response.FileEntries = append(response.FileEntries, entry)
it.Next()
// Check if we're at the end of the KV
if !it.Valid() {
break
}
key = make([]byte, 32+len(it.Key()[16:]))
err = DecodeObjectKey(it.Key(), key)
if err != nil {
reqlog.Errorf("failed to decode object key: %v", err)
s.statsd_c.Increment("load_objects_by_prefix.fail")
return nil, status.Errorf(codes.Internal, "unable to decode object key")
}
currentPartition, err = getPartitionFromOhash(key, pBits)
}
s.statsd_c.Increment("list_partition_recursive.ok")
return serializePb(response)
}
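The removed handler leans on the KV's key ordering: object keys sort by hash, and a partition is the top PartitionBits bits of the 128-bit hash, so one seek followed by a linear scan enumerates the whole partition. A minimal Python sketch of that seek-and-scan pattern; the helper names are illustrative only, not part of the real codebase (partition 428 at 10 partition bits matches the 6b08... hashes in the test below):

from bisect import bisect_left

def partition_of(ohash, part_bits):
    # the partition is the top part_bits bits of the 128-bit md5 hash
    return int(ohash, 16) >> (128 - part_bits)

def first_ohash_in_partition(partition, part_bits):
    # smallest 128-bit value whose top part_bits bits equal `partition`
    return '%032x' % (partition << (128 - part_bits))

def list_partition_recursive(sorted_ohashes, partition, part_bits):
    # "seek" to the first candidate key, then iterate while we stay in
    # the partition -- mirroring the KV iterator loop in the Go code
    i = bisect_left(sorted_ohashes,
                    first_ohash_in_partition(partition, part_bits))
    out = []
    while (i < len(sorted_ohashes) and
           partition_of(sorted_ohashes[i], part_bits) == partition):
        out.append(sorted_ohashes[i])
        i += 1
    return out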
// ListPartition returns a list of suffixes within a partition
func ListPartition(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
in := &pb.ListPartitionInfo{}
@ -1650,7 +1568,6 @@ var strToFunc = map[string]rpcFunc{
"/load_objects_by_prefix": LoadObjectsByPrefix,
"/load_objects_by_volume": LoadObjectsByVolume,
"/list_partitions": ListPartitions,
"/list_partition_recursive": ListPartitionRecursive,
"/list_partition": ListPartition,
"/list_suffix": ListSuffix,
"/list_quarantined_ohashes": ListQuarantinedOHashes,


@ -340,47 +340,6 @@ func TestListPartitions(t *testing.T) {
}
}
func TestListPartitionRecursive(t *testing.T) {
partition := uint32(428)
partPower := uint32(10)
lpInfo := &pb.ListPartitionInfo{Partition: partition, PartitionBits: partPower}
out, err := proto.Marshal(lpInfo)
if err != nil {
t.Error("failed to marshal")
}
body := bytes.NewReader(out)
expEntries := []pb.FullPathEntry{
{Suffix: []byte("845"), Ohash: []byte("6b08eabf5667557c72dc6570aa1fb845"), Filename: []byte("1515750801.08639#4#d.data")},
{Suffix: []byte("845"), Ohash: []byte("6b08eabf5667557c72dc6570aa1fb845"), Filename: []byte("1515750856.77219.meta")},
{Suffix: []byte("845"), Ohash: []byte("6b08eabf5667557c72dc6570abcfb845"), Filename: []byte("1515643210.72429#4#d.data")},
}
response, err := client.Post("http://unix/list_partition_recursive", "application/octet-stream", body)
if err = check_200(response, err); err != nil {
t.Fatalf("RPC call failed: %v", err)
}
defer response.Body.Close()
r := &pb.PartitionContent{}
buf := new(bytes.Buffer)
buf.ReadFrom(response.Body)
if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
t.Error("failed to unmarshal")
}
if len(r.FileEntries) != len(expEntries) {
t.Fatalf("\ngot: %v\nwant: %v", r.FileEntries, expEntries)
}
for i, e := range r.FileEntries {
if !bytes.Equal(e.Suffix, expEntries[i].Suffix) || !bytes.Equal(e.Ohash, expEntries[i].Ohash) || !bytes.Equal(e.Filename, expEntries[i].Filename) {
t.Fatalf("checking individual elements\ngot: %v\nwant: %v", r.FileEntries, expEntries)
}
}
}
// TODO: add more tests, have a suffix with multiple entries
func TestListSuffix(t *testing.T) {
partition := uint32(428)


@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import errno
import os
import time
@ -20,7 +21,7 @@ import json
from hashlib import md5
import logging
import traceback
from os.path import basename, dirname, join, split
from os.path import basename, dirname, join
from random import shuffle
from contextlib import contextmanager
from collections import defaultdict
@ -32,7 +33,7 @@ from swift import gettext_ as _
from swift.common.constraints import check_drive
from swift.common.request_helpers import is_sys_meta
from swift.common.utils import fdatasync, \
config_true_value, listdir, split_path
config_true_value, listdir, split_path, lock_path
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
DiskFileError, PathNotDir, \
@ -48,15 +49,8 @@ from swift.obj.diskfile import BaseDiskFileManager, DiskFileManager, \
BaseDiskFileReader, BaseDiskFileWriter, ECDiskFile, ECDiskFileReader, \
ECDiskFileWriter, AuditLocation, RESERVED_DATAFILE_META, \
DATAFILE_SYSTEM_META, strip_self, DEFAULT_RECLAIM_AGE, _encode_metadata, \
get_part_path, get_data_dir, update_auditor_status, extract_policy
def invalidate_hash(suffix_dir):
pass
def consolidate_hashes(partition_dir):
pass
get_part_path, get_data_dir, update_auditor_status, extract_policy, \
HASH_FILE, read_hashes, write_hashes, consolidate_hashes, invalidate_hash
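These imports replace the local no-op stubs removed above: LOSF now reuses Swift's real hashes.pkl machinery. A simplified, self-contained model of the contract those helpers provide; the real read_hashes/write_hashes/invalidate_hash live in swift.obj.diskfile, so treat this purely as a sketch of the data shapes involved:

import os
import pickle

def read_hashes_sketch(partition_dir):
    # hashes.pkl maps suffix -> md5 hexdigest (None once invalidated),
    # plus a 'valid' flag the caller uses for race detection
    try:
        with open(os.path.join(partition_dir, 'hashes.pkl'), 'rb') as f:
            hashes = pickle.load(f)
        hashes.setdefault('valid', True)
        return hashes
    except (OSError, IOError, EOFError, pickle.UnpicklingError):
        return {'valid': False}

def invalidate_hash_sketch(suffix_dir):
    # the real helper records the suffix in hashes.invalid under a lock;
    # consolidate_hashes later folds it back into hashes.pkl as None
    partition_dir, suffix = os.path.split(suffix_dir)
    with open(os.path.join(partition_dir, 'hashes.invalid'), 'a') as f:
        f.write(suffix + '\n')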
def quarantine_vrenamer(device_path, corrupted_file_path):
@ -739,14 +733,12 @@ class BaseKVFileManager(BaseDiskFileManager):
self.use_splice = False
self.pipe_size = None
# This should not be duplicated (see TODO below), except for cached_..
def cleanup_ondisk_files(self, hsh_path, **kwargs):
"""
Clean up on-disk files that are obsolete and gather the set of valid
on-disk files for an object.
:param hsh_path: object hash path
:param reclaim_age: age in seconds at which to remove tombstones
:param frag_index: if set, search for a specific fragment index .data
file, otherwise accept the first valid .data file
:returns: a dict that may contain: valid on disk files keyed by their
@ -758,14 +750,7 @@ class BaseKVFileManager(BaseDiskFileManager):
def is_reclaimable(timestamp):
return (time.time() - float(timestamp)) > self.reclaim_age
cached_partition_tree = kwargs.get('cached_partition_tree')
if cached_partition_tree is not None:
(suff_path, hsh) = split(hsh_path)
suffix = split(suff_path)[1]
files = cached_partition_tree[suffix][hsh]
else:
files = vfile.listdir(hsh_path)
files = vfile.listdir(hsh_path)
files.sort(reverse=True)
results = self.get_ondisk_files(
@ -775,12 +760,14 @@ class BaseKVFileManager(BaseDiskFileManager):
remove_vfile(join(hsh_path, results['ts_info']['filename']))
files.remove(results.pop('ts_info')['filename'])
for file_info in results.get('possible_reclaim', []):
# stray files are not deleted until reclaim-age
if is_reclaimable(file_info['timestamp']):
results.setdefault('obsolete', []).append(file_info)
for file_info in results.get('obsolete', []):
remove_vfile(join(hsh_path, file_info['filename']))
files.remove(file_info['filename'])
results['files'] = files
return results
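A worked example of the reclaim check above: Swift file names start with the write timestamp, so reclaimability is just an age comparison against reclaim_age (DEFAULT_RECLAIM_AGE, imported above, defaults to one week):

import time

reclaim_age = 7 * 24 * 3600  # DEFAULT_RECLAIM_AGE: one week

def is_reclaimable(timestamp):
    return (time.time() - float(timestamp)) > reclaim_age

# a tombstone named 1515750856.77219.ts became reclaimable one week
# after 2018-01-12, the date encoded in its timestamp
print(is_reclaimable('1515750856.77219'))  # True for any recent run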
def object_audit_location_generator(self, policy, device_dirs=None,
@ -798,35 +785,52 @@ class BaseKVFileManager(BaseDiskFileManager):
self.logger, device_dirs,
auditor_type)
def _hash_suffix_dir(self, path, cached_partition_tree=None):
def _hash_suffix_dir(self, path, policy):
"""
:param path: full path to directory
:param reclaim_age: age in seconds at which to remove tombstones
:param policy: storage policy used
"""
hashes = defaultdict(md5)
if cached_partition_tree:
suffix = split(path)[1]
path_contents = sorted(cached_partition_tree[suffix].keys())
if six.PY2:
hashes = defaultdict(md5)
else:
try:
path_contents = sorted(vfile.listdir(path))
except OSError as err:
if err.errno in (errno.ENOTDIR, errno.ENOENT):
raise PathNotDir()
raise
class shim(object):
def __init__(self):
self.md5 = md5()
def update(self, s):
if isinstance(s, str):
self.md5.update(s.encode('utf-8'))
else:
self.md5.update(s)
def hexdigest(self):
return self.md5.hexdigest()
hashes = defaultdict(shim)
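# The shim is only needed on Python 3, where hashlib's update() rejects
# str: the suffix-hashing code below feeds it timestamp strings such as
# '1515750801.08639', which the shim transparently encodes to UTF-8.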
try:
path_contents = sorted(vfile.listdir(path))
except OSError as err:
if err.errno in (errno.ENOTDIR, errno.ENOENT):
raise PathNotDir()
raise
for hsh in path_contents:
hsh_path = join(path, hsh)
hsh_path = os.path.join(path, hsh)
try:
ondisk_info = self.cleanup_ondisk_files(
hsh_path,
cached_partition_tree=cached_partition_tree)
hsh_path, policy=policy)
except OSError as err:
if err.errno == errno.ENOTDIR:
partition_path = dirname(path)
objects_path = dirname(partition_path)
device_path = dirname(objects_path)
quar_path = quarantine_vrenamer(device_path, hsh_path)
partition_path = os.path.dirname(path)
objects_path = os.path.dirname(partition_path)
device_path = os.path.dirname(objects_path)
# The made-up filename is so that the eventual dirpath()
# will result in this object directory that we care about.
# Some failures will result in an object directory
# becoming a file, thus causing the parent directory to
be quarantined.
quar_path = quarantine_vrenamer(
device_path, os.path.join(
hsh_path, "made-up-filename"))
logging.exception(
_('Quarantined %(hsh_path)s to %(quar_path)s because '
'it is not a directory'), {'hsh_path': hsh_path,
@ -834,14 +838,6 @@ class BaseKVFileManager(BaseDiskFileManager):
continue
raise
if not ondisk_info['files']:
try:
# It should not be possible for us to return an empty hash
# path (the ohash is derived from the KV key which is
# ohash + filename)
# os.rmdir(hsh_path)
pass
except OSError:
pass
continue
# ondisk_info has info dicts containing timestamps for those
@ -860,6 +856,7 @@ class BaseKVFileManager(BaseDiskFileManager):
# delegate to subclass for data file related updates...
self._update_suffix_hashes(hashes, ondisk_info)
if 'ctype_info' in ondisk_info:
# We have a distinct content-type timestamp so update the
# hash. As a precaution, append '_ctype' to differentiate this
@ -870,19 +867,6 @@ class BaseKVFileManager(BaseDiskFileManager):
hashes[None].update(info['ctype_timestamp'].internal
+ '_ctype')
try:
# it should not be possible for us to return an empty suffix
# directory (suffixes computed from existing objects)
# os.rmdir(path)
pass
except OSError as e:
if e.errno == errno.ENOENT:
raise PathNotDir()
else:
# if we remove it, pretend like it wasn't there to begin with so
# that the suffix key gets removed
# raise PathNotDir()
pass
return hashes
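The mapping returned here holds one incremental hasher per key: None accumulates the whole-suffix hash used by replication, and the EC manager additionally keys by fragment index. A toy illustration of the accumulation, mirroring the '_ctype' update above (the fed values are made up):

from collections import defaultdict
from hashlib import md5

hashes = defaultdict(md5)
hashes[None].update(b'1515750801.08639')               # e.g. a data timestamp
hashes[None].update(b'1515750856.77219' + b'_ctype')   # the update shown above
print(hashes[None].hexdigest())  # what _hash_suffix reports for the suffix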
def _get_hashes(self, *args, **kwargs):
@ -892,13 +876,15 @@ class BaseKVFileManager(BaseDiskFileManager):
return hashed, hashes
def __get_hashes(self, device, partition, policy, recalculate=None,
do_listdir=None):
do_listdir=False):
"""
Get hashes for each suffix dir in a partition. do_listdir causes it to
mistrust the hash cache for suffix existence at the (unexpectedly high)
cost of a listdir.
:param partition_path: absolute path of partition to get hashes for
:param device: name of target device
:param partition: partition on the device in which the object lives
:param policy: the StoragePolicy instance
:param recalculate: list of suffixes which should be recalculated when
got
:param do_listdir: force existence check for all hashes in the
@ -907,62 +893,69 @@ class BaseKVFileManager(BaseDiskFileManager):
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
"""
hashed = 0
hashes = {}
dev_path = self.get_dev_path(device)
partition_path = get_part_path(dev_path, policy, partition)
hashes_file = os.path.join(partition_path, HASH_FILE)
modified = False
orig_hashes = {'valid': False}
if recalculate is None:
recalculate = []
# TODO: make this quick on the KV, if possible ? (add a C++ function,
# will avoid the cgo overhead)
# get the whole partition tree cached so we don't hit the KV too much
# (given how keys are ordered, listing object hashes within a suffix
# dir means that the KV has to scan the whole partition)
try:
orig_hashes = self.consolidate_hashes(partition_path)
except Exception:
self.logger.warning('Unable to read %r', hashes_file,
exc_info=True)
# cache suffixes, hashes, files, below the partition
# cached_partition_tree is a nested dict:
# suffix -> object hash -> filename list
cached_partition_tree = vfile.build_partition_tree(partition_path)
# for suff in vfile.listdir(partition_path):
for suff in cached_partition_tree.keys():
if len(suff) == 3:
hashes.setdefault(suff, None)
if not orig_hashes['valid']:
# This is the only path to a valid hashes from invalid read (e.g.
# does not exist, corrupt, etc.). Moreover, in order to write this
# valid hashes we must read *the exact same* invalid state or we'll
# trigger race detection.
do_listdir = True
hashes = {'valid': True}
# If the exception handling around consolidate_hashes fired we're
# going to do a full rehash regardless; but we need to avoid
# needless recursion if the on-disk hashes.pkl is actually readable
# (worst case is consolidate_hashes keeps raising exceptions and we
# eventually run out of stack).
# N.B. orig_hashes invalid only affects new parts and error/edge
# conditions - so try not to get overly caught up trying to
# optimize it out unless you manage to convince yourself there's a
# bad behavior.
orig_hashes = read_hashes(partition_path)
else:
hashes = copy.deepcopy(orig_hashes)
if do_listdir:
for suff in vfile.listdir(partition_path):
if len(suff) == 3:
hashes.setdefault(suff, None)
modified = True
hashes.update((suffix, None) for suffix in recalculate)
for suffix, hash_ in hashes.items():
for suffix, hash_ in list(hashes.items()):
if not hash_:
suffix_dir = join(partition_path, suffix)
suffix_dir = os.path.join(partition_path, suffix)
try:
# vlogger.info("call _vhash_suffix")
hashes[suffix] = self._hash_suffix(suffix_dir,
cached_partition_tree)
hashes[suffix] = self._hash_suffix(
suffix_dir, policy=policy)
hashed += 1
except PathNotDir:
# vlogger.info(" PathNotDir exception")
del hashes[suffix]
except OSError:
# vlogger.info(" OSError exception")
logging.exception(_('Error hashing suffix'))
# modified = True
else:
pass
# Not using pickle files
# if modified:
# with lock_path(partition_path):
# if force_rewrite or not exists(hashes_file) or \
# getmtime(hashes_file) == mtime:
# write_pickle(
# hashes, hashes_file, partition_path, PICKLE_PROTOCOL)
# return hashed, hashes
# return self._get_hashes(partition_path, recalculate, do_listdir,
# reclaim_age)
# pass
# else:
# return hashed, hashes
# vlogger.debug("hashed: {} hashes: {}".format(hashed, hashes))
return hashed, hashes
modified = True
if modified:
with lock_path(partition_path):
if read_hashes(partition_path) == orig_hashes:
write_hashes(partition_path, hashes)
return hashed, hashes
return self.__get_hashes(device, partition, policy,
recalculate=recalculate,
do_listdir=do_listdir)
else:
return hashed, hashes
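The write-back above is effectively a compare-and-swap on hashes.pkl: recompute, then under the partition lock re-read and only publish if nothing changed since the original read; on a lost race the method calls itself to redo the work against the fresh state. A generic sketch of that pattern (looping where the real code recurses):

def cas_update(read, write, lock, compute):
    # read/write/lock/compute stand in for read_hashes, write_hashes,
    # lock_path(partition_path) and the suffix-rehashing loop above
    while True:
        orig = read()
        new = compute(orig)
        with lock():
            if read() == orig:   # nobody raced us: safe to publish
                write(new)
                return new
        # lost the race: recompute from the fresh on-disk state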
def get_diskfile_from_hash(self, device, partition, object_hash,
policy, **kwargs):
@ -1086,91 +1079,11 @@ class BaseKVFileManager(BaseDiskFileManager):
"""
pass
def yield_hashes(self, device, partition, policy,
suffixes=None, **kwargs):
"""
Yields tuples of (hash_only, timestamps) for object
information stored for the given device, partition, and
(optionally) suffixes. If suffixes is None, all stored
suffixes will be searched for object hashes. Note that if
suffixes is not None but empty, such as [], then nothing will
be yielded.
timestamps is a dict which may contain items mapping:
- ts_data -> timestamp of data or tombstone file,
- ts_meta -> timestamp of meta file, if one exists
- ts_ctype -> timestamp of meta file containing most recent
content-type value, if one exists
where timestamps are instances of
:class:`~swift.common.utils.Timestamp`
:param device: name of target device
:param partition: partition name
:param policy: the StoragePolicy instance
:param suffixes: optional list of suffix directories to be searched
"""
dev_path = self.get_dev_path(device)
partition_path = get_part_path(dev_path, policy, partition)
part_tree = vfile.build_partition_tree(partition_path)
if not dev_path:
raise DiskFileDeviceUnavailable()
if suffixes is None:
suffixes = ((os.path.join(partition_path, suffix), suffix)
for suffix in part_tree.keys())
else:
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
key_preference = (
('ts_meta', 'meta_info', 'timestamp'),
('ts_data', 'data_info', 'timestamp'),
('ts_data', 'ts_info', 'timestamp'),
('ts_ctype', 'ctype_info', 'ctype_timestamp'),
)
for suffix_path, suffix in suffixes:
for object_hash in part_tree[suffix]:
object_path = os.path.join(suffix_path, object_hash)
try:
results = self.cleanup_ondisk_files(
object_path, **kwargs)
timestamps = {}
for ts_key, info_key, info_ts_key in key_preference:
if info_key not in results:
continue
timestamps[ts_key] = results[info_key][info_ts_key]
if 'ts_data' not in timestamps:
# file sets that do not include a .data or .ts
# file cannot be opened and therefore cannot
# be ssync'd
continue
yield (object_hash, timestamps)
except AssertionError as err:
self.logger.debug('Invalid file set in %s (%s)' % (
object_path, err))
except DiskFileError as err:
self.logger.debug(
'Invalid diskfile filename in %r (%s)' % (
object_path, err))
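The removed yield_hashes flattened each cleanup_ondisk_files() result into the timestamps dict ssync expects, using key_preference as a priority list. An illustrative run with made-up results (real entries hold Timestamp instances, not strings):

results = {'data_info': {'timestamp': '1515643210.72429'},
           'meta_info': {'timestamp': '1515750856.77219'}}
key_preference = (
    ('ts_meta', 'meta_info', 'timestamp'),
    ('ts_data', 'data_info', 'timestamp'),
    ('ts_data', 'ts_info', 'timestamp'),
    ('ts_ctype', 'ctype_info', 'ctype_timestamp'),
)
timestamps = {}
for ts_key, info_key, info_ts_key in key_preference:
    if info_key in results:
        timestamps[ts_key] = results[info_key][info_ts_key]
# -> {'ts_meta': '1515750856.77219', 'ts_data': '1515643210.72429'}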
class KVFileManager(BaseKVFileManager, DiskFileManager):
diskfile_cls = KVFile
policy_type = REPL_POLICY
def _hash_suffix(self, path, cached_partition_tree=None):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param path: full path to directory
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
:returns: md5 of files in suffix
"""
hashes = self._hash_suffix_dir(path, cached_partition_tree)
return hashes[None].hexdigest()
class ECKVFileReader(BaseKVFileReader, ECDiskFileReader):
def __init__(self, vfr, data_file, obj_size, etag,
@ -1323,22 +1236,6 @@ class ECKVFileManager(BaseKVFileManager, ECDiskFileManager):
diskfile_cls = ECKVFile
policy_type = EC_POLICY
def _hash_suffix(self, path, cached_partition_tree=None):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param path: full path to directory
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
:returns: dict of md5 hex digests
"""
# hash_per_fi instead of single hash for whole suffix
# here we flatten out the hashers hexdigest into a dictionary instead
# of just returning the one hexdigest for the whole suffix
hash_per_fi = self._hash_suffix_dir(path, cached_partition_tree)
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())
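For EC the per-suffix result is flattened into one hexdigest per fragment index, which is the dict the REPLICATE verb compares between nodes. With toy hashers (not real suffix data):

from hashlib import md5

hash_per_fi = {None: md5(b'tombstones'), 4: md5(b'frag index 4 data')}
flat = dict((fi, h.hexdigest()) for fi, h in hash_per_fi.items())
# -> one hexdigest keyed by fragment index (None for non-frag updates)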
def remove_vfile(filepath):
try:


@ -308,16 +308,6 @@ def list_partition(socket_path, partition, partition_bits):
return response.entry
def list_partition_recursive(socket_path, partition, partition_bits):
list_partition_info = pb.ListPartitionInfo(partition=int(partition),
partition_bits=partition_bits)
conn = UnixHTTPConnection(socket_path)
conn.request('POST', '/list_partition_recursive',
list_partition_info.SerializeToString())
response = get_rpc_reply(conn, pb.PartitionContent)
return response.file_entries
def list_suffix(socket_path, partition, suffix, partition_bits):
suffix = str(suffix)
list_suffix_info = pb.ListSuffixInfo(partition=partition,


@ -24,7 +24,6 @@ import fcntl
import six
import hashlib
import re
from collections import defaultdict
from eventlet.green import os
from swift.obj.header import ObjectHeader, VolumeHeader, ALIGNMENT, \
read_volume_header, HeaderException, STATE_OBJ_QUARANTINED, \
@ -833,29 +832,6 @@ def list_quarantined_ohash(quarantined_ohash_path):
return rpc.list_quarantined_ohash(si.socket_path, si.ohash)
def build_partition_tree(path):
"""
:param path: full path to partition
:return: a nested dict describing the partition directory tree:
suffix -> object hash -> list of filenames
"""
path = os.path.normpath(path)
si = SwiftPathInfo.from_path(path)
if not POLICIES[si.policy_idx].object_ring:
POLICIES[si.policy_idx].load_ring('/etc/swift')
part_power = 32 - POLICIES[si.policy_idx].object_ring._part_shift
partition_tree = defaultdict(lambda: defaultdict(list))
full_path_entries = rpc.list_partition_recursive(si.socket_path,
si.partition, part_power)
for entry in full_path_entries:
partition_tree[str(entry.suffix)][str(entry.ohash)].append(
str(entry.filename))
return partition_tree
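The tree this removed helper produced has exactly the shape exercised by TestListPartitionRecursive above; rebuilt by hand from those expected entries:

from collections import defaultdict

partition_tree = defaultdict(lambda: defaultdict(list))
for suffix, ohash, filename in [
        ('845', '6b08eabf5667557c72dc6570aa1fb845',
         '1515750801.08639#4#d.data'),
        ('845', '6b08eabf5667557c72dc6570aa1fb845',
         '1515750856.77219.meta'),
        ('845', '6b08eabf5667557c72dc6570abcfb845',
         '1515643210.72429#4#d.data')]:
    partition_tree[suffix][ohash].append(filename)
# partition_tree['845'] now maps each object hash to its file list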
def _list_ohash(si, part_power):
"""
:param si: SwiftPathInfo object