From 70984cdd9f71febb481f9120e2405fcd3dc002ce Mon Sep 17 00:00:00 2001 From: Phil Bridges Date: Tue, 5 Sep 2017 13:02:33 -0500 Subject: [PATCH] Use HPSS client API bindings instead of FUSE wrapper We're doing this to more easily support future development and advanced features, like parallel I/O and HPSS identity mappings. Change-Id: I0192ceb0d65f0d51d9aa77f1f21800cb29e5a262 --- swiftonhpss/swift/common/fs_utils.py | 215 ++++----------- swiftonhpss/swift/common/hpss_utils.py | 251 +++++++++++++++++ swiftonhpss/swift/common/hpssfs_ioctl.py | 41 --- swiftonhpss/swift/common/utils.py | 90 +++--- swiftonhpss/swift/obj/diskfile.py | 332 +++++++++-------------- swiftonhpss/swift/obj/server.py | 181 ++++++------ test/functional/swift_on_file_tests.py | 16 +- test/functional/swift_on_hpss_tests.py | 57 ++-- tox.ini | 7 +- 9 files changed, 603 insertions(+), 587 deletions(-) create mode 100644 swiftonhpss/swift/common/hpss_utils.py delete mode 100644 swiftonhpss/swift/common/hpssfs_ioctl.py diff --git a/swiftonhpss/swift/common/fs_utils.py b/swiftonhpss/swift/common/fs_utils.py index c6c2f09..4efd55a 100644 --- a/swiftonhpss/swift/common/fs_utils.py +++ b/swiftonhpss/swift/common/fs_utils.py @@ -16,43 +16,37 @@ import logging import os import errno -import stat -import random import time -import xattr from collections import defaultdict from itertools import repeat -import ctypes -from eventlet import sleep -from swift.common.utils import load_libc_function from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError from swift.common.exceptions import DiskFileNoSpace +import hpss.clapi as hpss +import hpss_utils + def do_getxattr(path, key): - return xattr.getxattr(path, key) + return hpss_utils.read_uda(path, key) def do_setxattr(path, key, value): - xattr.setxattr(path, key, value) + return hpss_utils.write_uda(path, key, value) def do_removexattr(path, key): - xattr.removexattr(path, key) + return hpss_utils.delete_uda(path, key) def do_walk(*args, **kwargs): - return os.walk(*args, **kwargs) + return hpss_utils.walk(*args, **kwargs) def do_write(fd, buf): try: - cnt = os.write(fd, buf) - except OSError as err: - filename = get_filename_from_fd(fd) + cnt = hpss.Write(fd, buf) + except IOError as err: if err.errno in (errno.ENOSPC, errno.EDQUOT): - do_log_rl("do_write(%d, msg[%d]) failed: %s : %s", - fd, len(buf), err, filename) raise DiskFileNoSpace() else: raise SwiftOnFileSystemOSError( @@ -62,72 +56,29 @@ def do_write(fd, buf): def do_read(fd, n): try: - buf = os.read(fd, n) - except OSError as err: + buf = hpss.Read(fd, n) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.read("%s", ...)' % (err.strerror, fd)) return buf -def do_ismount(path): - """ - Test whether a path is a mount point. - - This is code hijacked from C Python 2.6.8, adapted to remove the extra - lstat() system call. - """ - try: - s1 = os.lstat(path) - except os.error as err: - if err.errno == errno.ENOENT: - # It doesn't exist -- so not a mount point :-) - return False - else: - raise SwiftOnFileSystemOSError( - err.errno, '%s, os.lstat("%s")' % (err.strerror, path)) - - if stat.S_ISLNK(s1.st_mode): - # A symlink can never be a mount point - return False - - try: - s2 = os.lstat(os.path.join(path, '..')) - except os.error as err: - raise SwiftOnFileSystemOSError( - err.errno, '%s, os.lstat("%s")' % (err.strerror, - os.path.join(path, '..'))) - - dev1 = s1.st_dev - dev2 = s2.st_dev - if dev1 != dev2: - # path/.. on a different device as path - return True - - ino1 = s1.st_ino - ino2 = s2.st_ino - if ino1 == ino2: - # path/.. is the same i-node as path - return True - - return False - - def do_mkdir(path): - os.mkdir(path) + hpss.Mkdir(path, 0o600) def do_rmdir(path): try: - os.rmdir(path) - except OSError as err: + hpss.Rmdir(path) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.rmdir("%s")' % (err.strerror, path)) def do_chown(path, uid, gid): try: - os.chown(path, uid, gid) - except OSError as err: + hpss.Chown(path, uid, gid) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.chown("%s", %s, %s)' % ( err.strerror, path, uid, gid)) @@ -135,8 +86,9 @@ def do_chown(path, uid, gid): def do_fchown(fd, uid, gid): try: + # TODO: grab path name from fd, chown that os.fchown(fd, uid, gid) - except OSError as err: + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.fchown(%s, %s, %s)' % ( err.strerror, fd, uid, gid)) @@ -146,65 +98,45 @@ _STAT_ATTEMPTS = 10 def do_stat(path): - serr = None - for i in range(0, _STAT_ATTEMPTS): - try: - stats = os.stat(path) - except OSError as err: - if err.errno == errno.EIO: - # Retry EIO assuming it is a transient error from FUSE after a - # short random sleep - serr = err - sleep(random.uniform(0.001, 0.005)) - continue - if err.errno == errno.ENOENT: - stats = None - else: - raise SwiftOnFileSystemOSError( - err.errno, '%s, os.stat("%s")[%d attempts]' % ( - err.strerror, path, i)) - if i > 0: - logging.warn("fs_utils.do_stat():" - " os.stat('%s') retried %d times (%s)", - path, i, 'success' if stats else 'failure') - return stats - else: + try: + stats = hpss.Stat(path) + except IOError as err: raise SwiftOnFileSystemOSError( - serr.errno, '%s, os.stat("%s")[%d attempts]' % ( - serr.strerror, path, _STAT_ATTEMPTS)) + err.errno, '%s, os.stat(%s)' % (err.strerror, path)) + return stats def do_fstat(fd): try: - stats = os.fstat(fd) - except OSError as err: + stats = hpss.Fstat(fd) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.fstat(%s)' % (err.strerror, fd)) return stats -def do_open(path, flags, mode=0o777): +def do_open(path, flags, mode=0o777, hints=None): + hints_struct = hpss.cos_hints() try: - fd = os.open(path, flags, mode) - except OSError as err: + if hints: + cos = hints.get('cos', '0') + hints_struct.cos = int(cos) + file_handle = hpss.Open(path, flags, mode, hints_struct) + else: + file_handle = hpss.Open(path, flags, mode) + fd = file_handle[0] + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.open("%s", %x, %o)' % ( err.strerror, path, flags, mode)) return fd -def do_dup(fd): - return os.dup(fd) - - def do_close(fd): try: - os.close(fd) - except OSError as err: + hpss.Close(fd) + except IOError as err: if err.errno in (errno.ENOSPC, errno.EDQUOT): - filename = get_filename_from_fd(fd) - do_log_rl("do_close(%d) failed: %s : %s", - fd, err, filename) raise DiskFileNoSpace() else: raise SwiftOnFileSystemOSError( @@ -213,8 +145,8 @@ def do_close(fd): def do_unlink(path, log=True): try: - os.unlink(path) - except OSError as err: + hpss.Unlink(path) + except IOError as err: if err.errno != errno.ENOENT: raise SwiftOnFileSystemOSError( err.errno, '%s, os.unlink("%s")' % (err.strerror, path)) @@ -225,8 +157,8 @@ def do_unlink(path, log=True): def do_rename(old_path, new_path): try: - os.rename(old_path, new_path) - except OSError as err: + hpss.Rename(old_path, new_path) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.rename("%s", "%s")' % ( err.strerror, old_path, new_path)) @@ -234,76 +166,20 @@ def do_rename(old_path, new_path): def do_fsync(fd): try: - os.fsync(fd) - except OSError as err: + hpss.Fsync(fd) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.fsync("%s")' % (err.strerror, fd)) -def do_fdatasync(fd): - try: - os.fdatasync(fd) - except AttributeError: - do_fsync(fd) - except OSError as err: - raise SwiftOnFileSystemOSError( - err.errno, '%s, os.fdatasync("%s")' % (err.strerror, fd)) - - -_posix_fadvise = None - - -def do_fadvise64(fd, offset, length): - global _posix_fadvise - if _posix_fadvise is None: - _posix_fadvise = load_libc_function('posix_fadvise64') - # 4 means "POSIX_FADV_DONTNEED" - _posix_fadvise(fd, ctypes.c_uint64(offset), - ctypes.c_uint64(length), 4) - - def do_lseek(fd, pos, how): try: - os.lseek(fd, pos, how) - except OSError as err: + hpss.Lseek(fd, pos, how) + except IOError as err: raise SwiftOnFileSystemOSError( err.errno, '%s, os.lseek("%s")' % (err.strerror, fd)) -def get_filename_from_fd(fd, verify=False): - """ - Given the file descriptor, this method attempts to get the filename as it - was when opened. This may not give accurate results in following cases: - - file was renamed/moved/deleted after it was opened - - file has multiple hardlinks - - :param fd: file descriptor of file opened - :param verify: If True, performs additional checks using inode number - """ - filename = None - if isinstance(fd, int): - try: - filename = os.readlink("/proc/self/fd/" + str(fd)) - except OSError: - pass - - if not verify: - return filename - - # If verify = True, we compare st_dev and st_ino of file and fd. - # This involves additional stat and fstat calls. So this is disabled - # by default. - if filename and fd: - s_file = do_stat(filename) - s_fd = do_fstat(fd) - - if s_file and s_fd: - if (s_file.st_ino, s_file.st_dev) == (s_fd.st_ino, s_fd.st_dev): - return filename - - return None - - def static_var(varname, value): """Decorator function to create pseudo static variables.""" def decorate(func): @@ -311,6 +187,7 @@ def static_var(varname, value): return func return decorate + # Rate limit to emit log message once a second _DO_LOG_RL_INTERVAL = 1.0 diff --git a/swiftonhpss/swift/common/hpss_utils.py b/swiftonhpss/swift/common/hpss_utils.py new file mode 100644 index 0000000..554b265 --- /dev/null +++ b/swiftonhpss/swift/common/hpss_utils.py @@ -0,0 +1,251 @@ +# Copyright (c) 2016 IBM Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hpss.clapi as hpss +from errno import ENOENT, EPERM, EEXIST, EINVAL +import posixpath + +from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError + +_auth_mech_to_api = {'unix': hpss.hpss_authn_mech_unix, + 'krb5': hpss.hpss_authn_mech_krb5} + +_cred_type_to_api = {'key': hpss.hpss_rpc_auth_type_key, + 'keyfile': hpss.hpss_rpc_auth_type_keyfile, + 'keytab': hpss.hpss_rpc_auth_type_keytab, + 'password': hpss.hpss_rpc_auth_type_passwd} + + +# TODO: make a nicer wrapping around these things +def create_hpss_session(user_name, + auth_mech='unix', + auth_cred_type='keytab', + auth_cred='/var/hpss/etc/hpss.unix.keytab'): + try: + hpss_mech = _auth_mech_to_api[auth_mech] + hpss_cred_type = _cred_type_to_api[auth_cred_type] + except KeyError: + raise ValueError('invalid mechanism or cred type specified') + try: + hpss.SetLoginCred(user_name, + hpss_mech, + hpss_cred_type, + hpss.hpss_rpc_cred_client, + auth_cred) + except IOError as e: + if e.errno == ENOENT: + raise ValueError('HPSS reports invalid username') + if e.errno == EPERM: + raise OSError('Permission denied by HPSS') + + +def destroy_hpss_session(): + hpss.PurgeLoginCred() + + +def read_uda(path, key): + normalized_key = '/hpss/' + key + uda_contents = hpss.UserAttrGetAttrs(path, + [normalized_key])[normalized_key] + return hpss.ChompXMLHeader(uda_contents) + + +def write_uda(path, key, value): + return hpss.UserAttrSetAttrs(path, {'/hpss/' + key: value}) + + +def delete_uda(path, key): + return hpss.UserAttrDeleteAttrs(path, '/hpss/' + key) + + +def list_uda(path): + return hpss.UserAttrListAttrs(path) + + +def _split_path(path): + if path == '/': + return [''] + else: + return path.split('/') + + +def set_purge_lock(path, lock_status): + try: + fd = hpss.Open(path, hpss.O_RDWR | hpss.O_NONBLOCK)[0] + except IOError as e: + raise SwiftOnFileSystemOSError('couldnt open file for purgelock: %s' % + e.errno) + + if lock_status: + flag = hpss.PURGE_LOCK + else: + flag = hpss.PURGE_UNLOCK + + try: + hpss.PurgeLock(int(fd), flag) + except IOError as e: + if e.errno != EINVAL: + # This happens when either the file is empty, or there's no + # tape layer in the class of service. + # TODO: should we return an objection as an HTTP resp? + raise SwiftOnFileSystemOSError('hpss.purge_lock(%s):' + 'fd %s, errno %s' % + (path, fd, e.errno)) + + try: + hpss.Close(fd) + except IOError as e: + raise SwiftOnFileSystemOSError('couldnt close file for purgelock: %s' % + e.errno) + + +def set_cos(path, cos_id): + hpss.FileSetCOS(path, cos_id) + + +def preallocate(fd, size): + hpss.Fpreallocate(fd, size) + + +def set_checksum(fd, md5_digest): + flags = hpss.HPSS_FILE_HASH_GENERATED | hpss.HPSS_FILE_HASH_DIGEST_VALID + hpss.FsetFileDigest(fd, md5_digest, "", flags, hpss.HASH_MD5) + + +def relative_symlink(parent_dir, source, target): + pwd = bytes(hpss.Getcwd()) + hpss.Chdir(parent_dir) + hpss.Symlink(source, target) + hpss.Chdir(pwd) + + +def path_exists(path): + try: + return hpss.Access(path, hpss.F_OK) + except IOError as e: + if e.errno == ENOENT: + return False + else: + raise + + +def makedirs(name, mode=0o777): + components = posixpath.split(name) + if components[0] != '/': + makedirs(components[0], mode) + try: + hpss.Mkdir(name, mode) + except IOError as e: + if e.errno != EEXIST: + raise + + +# TODO: give an iterator instead of a list of files/directories? +# that could quickly become way too many to list +def walk(path, topdown=True): + dir_handle = hpss.Opendir(path) + current_obj = hpss.Readdir(dir_handle) + dirs = [] + files = [] + while current_obj: + if current_obj.d_handle.Type == hpss.NS_OBJECT_TYPE_DIRECTORY: + if current_obj.d_name != '.' and current_obj.d_name != '..': + dirs.append(current_obj.d_name) + elif current_obj.d_handle.Type == hpss.NS_OBJECT_TYPE_FILE: + files.append(current_obj.d_name) + current_obj = hpss.Readdir(dir_handle) + hpss.Closedir(dir_handle) + if topdown: + yield (path, dirs, files) + for dir_name in dirs: + next_iter = walk("/".join(_split_path(path) + [dir_name])) + for walk_entity in next_iter: + yield walk_entity + else: + for dir_name in dirs: + next_iter = walk("/".join(_split_path(path) + [dir_name])) + for walk_entity in next_iter: + yield walk_entity + yield (path, dirs, files) + + +def _levels_to_string(storage_levels): + def storage_info_string(level_struct): + return '%u:%u:%u:%u' % (level_struct.BytesAtLevel, + level_struct.StripeLength, + level_struct.StripeWidth, + level_struct.OptimumAccessSize) + + def pv_list_string(pv_list): + return ','.join([pv_element.Name for pv_element in pv_list]) + + def vv_data_string(vv_struct): + return '%s:%s:[%s]' % (vv_struct.BytesOnVV, + vv_struct.RelPosition, + pv_list_string(vv_struct.PVList)) + + def vv_list_string(vv_list): + return ':'.join(map(vv_data_string, vv_list)) + + def more_exists_string(level_struct): + if level_struct.Flags & hpss.BFS_BFATTRS_ADDITIONAL_VV_EXIST: + return '...' + else: + return '' + + def level_data_string(level_struct, depth): + if level_struct.Flags & hpss.BFS_BFATTRS_LEVEL_IS_DISK: + storage_type = "disk" + else: + storage_type = "tape" + if level_struct.BytesAtLevel == 0: + return "%u:%s:nodata" % (depth, storage_type) + else: + return "%u:%s:%s:(%s)%s" % (depth, + storage_type, + storage_info_string(level_struct), + vv_list_string(level_struct.VVAttrib), + more_exists_string(level_struct)) + + def level_list_string(levels): + level_datas = [] + for index, level_struct in enumerate(levels): + level_datas.append(level_data_string(level_struct, index)) + return ';'.join(level_datas) + + return level_list_string(storage_levels) + + +def read_hpss_system_metadata(path): + flag = hpss.API_GET_STATS_FOR_ALL_LEVELS + xfileattr_struct = hpss.FileGetXAttributes(path, flag) + optimum_tuple = (xfileattr_struct.SCAttrib[0].OptimumAccessSize, + xfileattr_struct.SCAttrib[0].StripeWidth, + xfileattr_struct.SCAttrib[0].StripeLength) + attrs = {'X-HPSS-Account': xfileattr_struct.Attrs.Account, + 'X-HPSS-Bitfile-ID': xfileattr_struct.Attrs.BitfileObj.BfId, + 'X-HPSS-Comment': xfileattr_struct.Attrs.Comment, + 'X-HPSS-Class-Of-Service-ID': xfileattr_struct.Attrs.COSId, + 'X-HPSS-Data-Levels': + _levels_to_string(xfileattr_struct.SCAttrib), + 'X-HPSS-Family-ID': xfileattr_struct.Attrs.FamilyId, + 'X-HPSS-Fileset-ID': xfileattr_struct.Attrs.FilesetId, + 'X-HPSS-Optimum-Size': "%u:%u:%u" % optimum_tuple, + 'X-HPSS-Purgelock-Status': xfileattr_struct.Attrs.OptionFlags & 1, + 'X-HPSS-Reads': xfileattr_struct.Attrs.ReadCount, + 'X-HPSS-Realm-ID': xfileattr_struct.Attrs.RealmId, + 'X-HPSS-Subsys-ID': xfileattr_struct.Attrs.SubSystemId, + 'X-HPSS-Writes': xfileattr_struct.Attrs.WriteCount, } + return attrs diff --git a/swiftonhpss/swift/common/hpssfs_ioctl.py b/swiftonhpss/swift/common/hpssfs_ioctl.py deleted file mode 100644 index 9dff94f..0000000 --- a/swiftonhpss/swift/common/hpssfs_ioctl.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (c) 2016 IBM Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import array -import fcntl - -HPSSFS_GET_COS = 0x80046c01 -HPSSFS_SET_COS_HINT = 0x40046c02 - -HPSSFS_SET_FSIZE_HINT = 0x40086c03 -HPSSFS_SET_MAXSEGSZ_HINT = 0x40046c04 - -HPSSFS_PURGE_CACHE = 0x00006c05 -HPSSFS_PURGE_LOCK = 0x40046c06 - -HPSSFS_UNDELETE = 0x40046c07 -HPSSFS_UNDELETE_NONE = 0x00000000 -HPSSFS_UNDELETE_RESTORE_TIME = 0x00000001 -HPSSFS_UNDELETE_OVERWRITE = 0x00000002 -HPSSFS_UNDELETE_OVERWRITE_AND_RESTORE = 0x00000003 - - -def ioctl(fd, cmd, val=None): - if val is not None: - valbuf = array.array('i', [0]) - else: - valbuf = array.array('i', [val]) - fcntl.ioctl(fd, cmd, valbuf) - return valbuf[0] diff --git a/swiftonhpss/swift/common/utils.py b/swiftonhpss/swift/common/utils.py index 236026b..e580d55 100644 --- a/swiftonhpss/swift/common/utils.py +++ b/swiftonhpss/swift/common/utils.py @@ -28,9 +28,9 @@ from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemIOError from swift.common.exceptions import DiskFileNoSpace from swift.common.db import utf8encodekeys from swiftonhpss.swift.common.fs_utils import do_stat, \ - do_walk, do_rmdir, do_log_rl, get_filename_from_fd, do_open, \ + do_walk, do_rmdir, do_log_rl, do_open, \ do_getxattr, do_setxattr, do_removexattr, do_read, \ - do_close, do_dup, do_lseek, do_fstat, do_fsync, do_rename + do_close, do_fsync, do_rename from urllib import quote, unquote X_CONTENT_TYPE = 'Content-Type' @@ -56,6 +56,20 @@ CHUNK_SIZE = 65536 read_pickled_metadata = False +def chunk_list(input_list, chunk_size): + chunks = [] + if len(input_list) % chunk_size != 0: + raise IndexError + for i in xrange(0, len(input_list) / chunk_size): + chunks.append(input_list[i * chunk_size:(i + 1) * chunk_size]) + return chunks + + +def hex_digest_to_bytes(hex_digest): + return ''.join([chr(int(chunk, 16)) for chunk + in chunk_list(hex_digest, 2)]) + + def normalize_timestamp(timestamp): """ Format a timestamp (string or numeric) into a standardized @@ -138,7 +152,7 @@ def deserialize_metadata(in_metastr): return {} -def read_metadata(path_or_fd): +def read_metadata(path): """ Helper function to read the serialized metadata from a File/Directory. @@ -150,7 +164,7 @@ def read_metadata(path_or_fd): key = 0 try: while True: - metastr += do_getxattr(path_or_fd, '%s%s' % + metastr += do_getxattr(path, '%s%s' % (METADATA_KEY, (key or ''))) key += 1 if len(metastr) < MAX_XATTR_SIZE: @@ -167,13 +181,13 @@ def read_metadata(path_or_fd): if not metadata: # Empty dict i.e deserializing of metadata has failed, probably # because it is invalid or incomplete or corrupt - clean_metadata(path_or_fd) + clean_metadata(path) assert isinstance(metadata, dict) return metadata -def write_metadata(path_or_fd, metadata): +def write_metadata(path, metadata): """ Helper function to write serialized metadata for a File/Directory. @@ -185,39 +199,34 @@ def write_metadata(path_or_fd, metadata): key = 0 while metastr: try: - do_setxattr(path_or_fd, + do_setxattr(path, '%s%s' % (METADATA_KEY, key or ''), metastr[:MAX_XATTR_SIZE]) except IOError as err: if err.errno in (errno.ENOSPC, errno.EDQUOT): - if isinstance(path_or_fd, int): - filename = get_filename_from_fd(path_or_fd) - do_log_rl("write_metadata(%d, metadata) failed: %s : %s", - path_or_fd, err, filename) - else: - do_log_rl("write_metadata(%s, metadata) failed: %s", - path_or_fd, err) + do_log_rl("write_metadata(%s, metadata) failed: %s", + path, err) raise DiskFileNoSpace() else: raise SwiftOnFileSystemIOError( err.errno, '%s, setxattr("%s", %s, metastr)' % (err.strerror, - path_or_fd, key)) + path, key)) metastr = metastr[MAX_XATTR_SIZE:] key += 1 -def clean_metadata(path_or_fd): +def clean_metadata(path): key = 0 while True: try: - do_removexattr(path_or_fd, '%s%s' % (METADATA_KEY, (key or ''))) + do_removexattr(path, '%s%s' % (METADATA_KEY, (key or ''))) except IOError as err: if err.errno == errno.ENODATA: break raise SwiftOnFileSystemIOError( err.errno, '%s, removexattr("%s", %s)' % (err.strerror, - path_or_fd, key)) + path, key)) key += 1 @@ -238,7 +247,7 @@ def _read_for_etag(fp): return etag.hexdigest() -def get_etag(path_or_fd): +def get_etag(path): """ FIXME: It would be great to have a translator that returns the md5sum() of the file as an xattr that can be simply fetched. @@ -246,43 +255,22 @@ def get_etag(path_or_fd): Since we don't have that we should yield after each chunk read and computed so that we don't consume the worker thread. """ - etag = '' - if isinstance(path_or_fd, int): - # We are given a file descriptor, so this is an invocation from the - # DiskFile.open() method. - fd = path_or_fd - dup_fd = do_dup(fd) - try: - etag = _read_for_etag(dup_fd) - do_lseek(fd, 0, os.SEEK_SET) - finally: - do_close(dup_fd) - else: - # We are given a path to the object when the DiskDir.list_objects_iter - # method invokes us. - path = path_or_fd - fd = do_open(path, os.O_RDONLY) - try: - etag = _read_for_etag(fd) - finally: - do_close(fd) + # FIXME: really do need to grab precomputed checksum instead + fd = do_open(path, os.O_RDONLY) + try: + etag = _read_for_etag(fd) + finally: + do_close(fd) return etag -def get_object_metadata(obj_path_or_fd, stats=None): +def get_object_metadata(obj_path, stats=None): """ Return metadata of object. """ if not stats: - if isinstance(obj_path_or_fd, int): - # We are given a file descriptor, so this is an invocation from the - # DiskFile.open() method. - stats = do_fstat(obj_path_or_fd) - else: - # We are given a path to the object when the - # DiskDir.list_objects_iter method invokes us. - stats = do_stat(obj_path_or_fd) + stats = do_stat(obj_path) if not stats: metadata = {} @@ -290,12 +278,12 @@ def get_object_metadata(obj_path_or_fd, stats=None): is_dir = stat.S_ISDIR(stats.st_mode) metadata = { X_TYPE: OBJECT, - X_TIMESTAMP: normalize_timestamp(stats.st_ctime), + X_TIMESTAMP: normalize_timestamp(stats.hpss_st_ctime), X_CONTENT_TYPE: DIR_TYPE if is_dir else FILE_TYPE, X_OBJECT_TYPE: DIR_NON_OBJECT if is_dir else FILE, X_CONTENT_LENGTH: 0 if is_dir else stats.st_size, - X_MTIME: 0 if is_dir else normalize_timestamp(stats.st_mtime), - X_ETAG: md5().hexdigest() if is_dir else get_etag(obj_path_or_fd)} + X_MTIME: 0 if is_dir else normalize_timestamp(stats.hpss_st_mtime), + X_ETAG: md5().hexdigest() if is_dir else get_etag(obj_path)} return metadata diff --git a/swiftonhpss/swift/obj/diskfile.py b/swiftonhpss/swift/obj/diskfile.py index 76031bf..2460e90 100644 --- a/swiftonhpss/swift/obj/diskfile.py +++ b/swiftonhpss/swift/obj/diskfile.py @@ -23,29 +23,21 @@ except ImportError: import random import logging import time -try: - import hpssfs -except ImportError: - import swiftonhpss.swift.common.hpssfs_ioctl as hpssfs -import xattr -from uuid import uuid4 from hashlib import md5 from eventlet import sleep from contextlib import contextmanager from swiftonhpss.swift.common.exceptions import AlreadyExistsAsFile, \ AlreadyExistsAsDir -from swift.common.utils import ThreadPool, hash_path, \ - normalize_timestamp, fallocate, Timestamp +from swift.common.utils import hash_path, \ + normalize_timestamp, Timestamp from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ - DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen, \ - DiskFileExpired + DiskFileNoSpace, DiskFileNotOpen, DiskFileExpired from swift.common.swob import multi_range_iterator -from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError, \ - SwiftOnFileSystemIOError +from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError from swiftonhpss.swift.common.fs_utils import do_fstat, do_open, do_close, \ - do_unlink, do_chown, do_fsync, do_fchown, do_stat, do_write, do_read, \ - do_fadvise64, do_rename, do_fdatasync, do_lseek, do_mkdir + do_unlink, do_chown, do_fsync, do_stat, do_write, do_read, \ + do_rename, do_lseek, do_mkdir from swiftonhpss.swift.common.utils import read_metadata, write_metadata,\ rmobjdir, dir_is_object, \ get_object_metadata, write_pickle, get_etag @@ -56,6 +48,10 @@ from swiftonhpss.swift.common.utils import X_CONTENT_TYPE, \ from swift.obj.diskfile import DiskFileManager as SwiftDiskFileManager from swift.obj.diskfile import get_async_dir +import swiftonhpss.swift.common.hpss_utils as hpss_utils + +import hpss.clapi as hpss + # FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will # be back ported. See http://www.python.org/dev/peps/pep-0433/ O_CLOEXEC = 0o02000000 @@ -201,7 +197,7 @@ def _adjust_metadata(fd, metadata): # detect if the object was changed from filesystem interface (non Swift) statinfo = do_fstat(fd) if stat.S_ISREG(statinfo.st_mode): - metadata[X_MTIME] = normalize_timestamp(statinfo.st_mtime) + metadata[X_MTIME] = normalize_timestamp(statinfo.hpss_st_mtime) metadata[X_TYPE] = OBJECT return metadata @@ -226,22 +222,19 @@ class DiskFileManager(SwiftDiskFileManager): """ def get_diskfile(self, device, partition, account, container, obj, policy=None, **kwargs): - dev_path = self.get_dev_path(device, self.mount_check) - if not dev_path: - raise DiskFileDeviceUnavailable() - return DiskFile(self, dev_path, self.threadpools[device], + hpss_dir_name = self.conf.get('hpss_swift_dir', '/swift') + return DiskFile(self, hpss_dir_name, partition, account, container, obj, policy=policy, **kwargs) def pickle_async_update(self, device, account, container, obj, data, timestamp, policy): - # This method invokes swiftonhpss's writepickle method. - # Is patching just write_pickle and calling parent method better ? + # This should be using the JSON blob stuff instead of a pickle. + # Didn't we deprecate it? device_path = self.construct_dev_path(device) async_dir = os.path.join(device_path, get_async_dir(policy)) ohash = hash_path(account, container, obj) - self.threadpools[device].run_in_thread( - write_pickle, + write_pickle( data, os.path.join(async_dir, ohash[-3:], ohash + '-' + normalize_timestamp(timestamp)), @@ -267,6 +260,8 @@ class DiskFileWriter(object): self._upload_size = 0 self._last_sync = 0 + self._logger = self._disk_file._logger + def _write_entire_chunk(self, chunk): bytes_per_sync = self._disk_file._mgr.bytes_per_sync while chunk: @@ -276,8 +271,7 @@ class DiskFileWriter(object): # For large files sync every 512MB (by default) written diff = self._upload_size - self._last_sync if diff >= bytes_per_sync: - do_fdatasync(self._fd) - do_fadvise64(self._fd, self._last_sync, diff) + do_fsync(self._fd) self._last_sync = self._upload_size def close(self): @@ -298,35 +292,17 @@ class DiskFileWriter(object): :returns: the total number of bytes written to an object """ - df = self._disk_file - df._threadpool.run_in_thread(self._write_entire_chunk, chunk) + self._write_entire_chunk(chunk) return self._upload_size - def _finalize_put(self, metadata, purgelock=False): + def _finalize_put(self, metadata): # Write out metadata before fsync() to ensure it is also forced to # disk. - write_metadata(self._fd, metadata) + write_metadata(self._tmppath, metadata) - # We call fsync() before calling drop_cache() to lower the - # amount of redundant work the drop cache code will perform on - # the pages (now that after fsync the pages will be all - # clean). do_fsync(self._fd) - # (HPSS) Purge lock the file now if we're asked to. - if purgelock: - try: - hpssfs.ioctl(self._fd, hpssfs.HPSSFS_PURGE_LOCK, - int(purgelock)) - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, - '%s, hpssfs.ioctl("%s", ...)' % (err.strerror, self._fd)) - - # From the Department of the Redundancy Department, make sure - # we call drop_cache() after fsync() to avoid redundant work - # (pages all clean). - do_fadvise64(self._fd, self._last_sync, self._upload_size) + self.set_checksum(metadata['ETag']) # At this point we know that the object's full directory path # exists, so we can just rename it directly without using Swift's @@ -400,7 +376,7 @@ class DiskFileWriter(object): # in a thread. self.close() - def put(self, metadata, purgelock=False): + def put(self, metadata): """ Finalize writing the file on disk, and renames it from the temp file to the real location. This should be called after the data has been @@ -416,8 +392,7 @@ class DiskFileWriter(object): df = self._disk_file if dir_is_object(metadata): - df._threadpool.force_run_in_thread( - df._create_dir_object, df._data_file, metadata) + df._create_dir_object(df._data_file, metadata) return if df._is_dir: @@ -429,8 +404,7 @@ class DiskFileWriter(object): ' since the target, %s, already exists' ' as a directory' % df._data_file) - df._threadpool.force_run_in_thread(self._finalize_put, metadata, - purgelock) + self._finalize_put(metadata) # Avoid the unlink() system call as part of the mkstemp context # cleanup @@ -446,6 +420,9 @@ class DiskFileWriter(object): """ pass + def set_checksum(self, checksum): + hpss_utils.set_checksum(self._fd, checksum) + class DiskFileReader(object): """ @@ -465,18 +442,16 @@ class DiskFileReader(object): specific. The API does not define the constructor arguments. :param fp: open file descriptor, -1 for a directory object - :param threadpool: thread pool to use for read operations :param disk_chunk_size: size of reads from disk in bytes :param obj_size: size of object on disk :param keep_cache_size: maximum object size that will be kept in cache :param iter_hook: called when __iter__ returns a chunk :param keep_cache: should resulting reads be kept in the buffer cache """ - def __init__(self, fd, threadpool, disk_chunk_size, obj_size, + def __init__(self, fd, disk_chunk_size, obj_size, keep_cache_size, iter_hook=None, keep_cache=False): # Parameter tracking self._fd = fd - self._threadpool = threadpool self._disk_chunk_size = disk_chunk_size self._iter_hook = iter_hook if keep_cache: @@ -496,8 +471,7 @@ class DiskFileReader(object): bytes_read = 0 while True: if self._fd != -1: - chunk = self._threadpool.run_in_thread( - do_read, self._fd, self._disk_chunk_size) + chunk = do_read(self._fd, self._disk_chunk_size) else: chunk = None if chunk: @@ -555,9 +529,7 @@ class DiskFileReader(object): self.close() def _drop_cache(self, offset, length): - """Method for no-oping buffer cache drop method.""" - if not self._keep_cache and self._fd > -1: - do_fadvise64(self._fd, offset, length) + pass def close(self): """ @@ -581,25 +553,24 @@ class DiskFile(object): :param mgr: associated on-disk manager instance :param dev_path: device name/account_name for UFO. - :param threadpool: thread pool in which to do blocking operations :param account: account name for the object :param container: container name for the object :param obj: object name for the object :param uid: user ID disk object should assume (file or directory) :param gid: group ID disk object should assume (file or directory) """ - def __init__(self, mgr, dev_path, threadpool, partition, + def __init__(self, mgr, dev_path, partition, account=None, container=None, obj=None, policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID, **kwargs): # Variables partition and policy is currently unused. self._mgr = mgr + self._logger = mgr.logger self._device_path = dev_path - self._threadpool = threadpool or ThreadPool(nthreads=0) - self._uid = int(uid) - self._gid = int(gid) + self._uid = uid + self._gid = gid self._is_dir = False self._metadata = None - self._fd = None + self._fd = -1 # Save stat info as internal variable to avoid multiple stat() calls self._stat = None # Save md5sum of object as internal variable to avoid reading the @@ -609,9 +580,11 @@ class DiskFile(object): # Don't store a value for data_file until we know it exists. self._data_file = None - # Account name contains resller_prefix which is retained and not + self._obj_size = 0 + + # Account name contains reseller_prefix which is retained and not # stripped. This to conform to Swift's behavior where account name - # entry in Account DBs contain resller_prefix. + # entry in Account DBs contain reseller_prefix. self._account = account self._container = container @@ -628,7 +601,6 @@ class DiskFile(object): self._put_datadir = self._container_path self._data_file = os.path.join(self._put_datadir, self._obj) - self._stat = do_stat(self._data_file) @property def timestamp(self): @@ -662,7 +634,15 @@ class DiskFile(object): """ # Writes are always performed to a temporary file try: - self._fd = do_open(self._data_file, os.O_RDONLY | O_CLOEXEC) + self._logger.debug("DiskFile: Opening %s" % self._data_file) + self._stat = do_stat(self._data_file) + self._is_dir = stat.S_ISDIR(self._stat.st_mode) + if not self._is_dir: + self._fd = do_open(self._data_file, hpss.O_RDONLY) + self._obj_size = self._stat.st_size + else: + self._fd = -1 + self._obj_size = 0 except SwiftOnFileSystemOSError as err: if err.errno in (errno.ENOENT, errno.ENOTDIR): # If the file does exist, or some part of the path does not @@ -670,25 +650,18 @@ class DiskFile(object): raise DiskFileNotExist raise try: - self._stat = do_fstat(self._fd) - self._is_dir = stat.S_ISDIR(self._stat.st_mode) - obj_size = self._stat.st_size - - self._metadata = read_metadata(self._fd) + self._logger.debug("DiskFile: Reading metadata") + self._metadata = read_metadata(self._data_file) if not self._validate_object_metadata(): - self._create_object_metadata(self._fd) + self._create_object_metadata(self._data_file) assert self._metadata is not None self._filter_metadata() - if self._is_dir: - do_close(self._fd) - obj_size = 0 - self._fd = -1 - else: + if not self._is_dir: if self._is_object_expired(self._metadata): raise DiskFileExpired(metadata=self._metadata) - self._obj_size = obj_size + except (OSError, IOError, DiskFileExpired) as err: # Something went wrong. Context manager will not call # __exit__. So we close the fd manually here. @@ -730,9 +703,9 @@ class DiskFile(object): # Check if the file has been modified through filesystem # interface by comparing mtime stored in xattr during PUT # and current mtime of file. - obj_stat = os.stat(self._data_file) + obj_stat = do_stat(self._data_file) if normalize_timestamp(self._metadata[X_MTIME]) != \ - normalize_timestamp(obj_stat.st_mtime): + normalize_timestamp(obj_stat.hpss_st_mtime): self._file_has_changed = True return False else: @@ -755,10 +728,10 @@ class DiskFile(object): return False - def _create_object_metadata(self, fd): + def _create_object_metadata(self, file_path): if self._etag is None: self._etag = md5().hexdigest() if self._is_dir \ - else get_etag(fd) + else get_etag(file_path) if self._file_has_changed or (X_TIMESTAMP not in self._metadata): timestamp = normalize_timestamp(self._stat.st_mtime) @@ -775,12 +748,12 @@ class DiskFile(object): # Add X_MTIME key if object is a file if not self._is_dir: - metadata[X_MTIME] = normalize_timestamp(self._stat.st_mtime) + metadata[X_MTIME] = normalize_timestamp(self._stat.hpss_st_mtime) meta_new = self._metadata.copy() meta_new.update(metadata) if self._metadata != meta_new: - write_metadata(fd, meta_new) + write_metadata(file_path, meta_new) # Avoid additional read_metadata() later self._metadata = meta_new @@ -799,7 +772,7 @@ class DiskFile(object): pass except ValueError: # x-delete-at key is present but not an integer. - # TODO: Openstack Swift "quarrantines" the object. + # TODO: Openstack Swift "quarantines" the object. # We just let it pass pass else: @@ -807,58 +780,24 @@ class DiskFile(object): return True return False + # TODO: don't parse this, just grab data structure def is_offline(self): - try: - raw_file_levels = xattr.getxattr(self._data_file, - "system.hpss.level") - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, - '%s, xattr.getxattr("system.hpss.level", ...)' % err.strerror - ) - try: - file_levels = raw_file_levels.split(";") - top_level = file_levels[0].split(':') - bytes_on_disk = top_level[2].rstrip(' ') - if bytes_on_disk == 'nodata': - bytes_on_disk = '0' - except ValueError: - raise SwiftOnFileSystemIOError("Couldn't get system.hpss.level!") + meta = hpss_utils.read_hpss_system_metadata(self._data_file) + raw_file_levels = meta['X-HPSS-Data-Levels'] + file_levels = raw_file_levels.split(";") + top_level = file_levels[0].split(':') + bytes_on_disk = top_level[2].rstrip(' ') + if bytes_on_disk == 'nodata': + bytes_on_disk = '0' return int(bytes_on_disk) != self._stat.st_size - def read_hpss_system_metadata(self): - header_to_xattr = {'X-HPSS-Account': 'account', - 'X-HPSS-Bitfile-ID': 'bitfile', - 'X-HPSS-Comment': 'comment', - 'X-HPSS-Class-Of-Service-ID': 'cos', - 'X-HPSS-Data-Levels': 'level', - 'X-HPSS-Family-ID': 'family', - 'X-HPSS-Fileset-ID': 'fileset', - 'X-HPSS-Optimum-Size': 'optimum', - 'X-HPSS-Purgelock-Status': 'purgelock', - 'X-HPSS-Reads': 'reads', - 'X-HPSS-Realm-ID': 'realm', - 'X-HPSS-Subsys-ID': 'subsys', - 'X-HPSS-Writes': 'writes', } - result = {} - for header in header_to_xattr: - xattr_to_get = 'system.hpss.%s' % header_to_xattr[header] - try: - result[header] = xattr.getxattr(self._data_file, - xattr_to_get) - except IOError as err: - error_message = "Couldn't get HPSS xattr %s from file %s" \ - % (xattr_to_get, self._data_file) - raise SwiftOnFileSystemIOError(err.errno, error_message) - return result - def __enter__(self): """ Context enter. .. note:: - An implemenation shall raise `DiskFileNotOpen` when has not + An implementation shall raise `DiskFileNotOpen` when has not previously invoked the :func:`swift.obj.diskfile.DiskFile.open` method. """ @@ -913,15 +852,20 @@ class DiskFile(object): # validation and resulting timeouts # This means we do a few things DiskFile.open() does. try: - self._is_dir = os.path.isdir(self._data_file) + if not self._stat: + self._stat = do_stat(self._data_file) + self._is_dir = stat.S_ISDIR(self._stat.st_mode) self._metadata = read_metadata(self._data_file) - except IOError: + except SwiftOnFileSystemOSError: raise DiskFileNotExist if not self._validate_object_metadata(): self._create_object_metadata(self._data_file) self._filter_metadata() return self._metadata + def read_hpss_system_metadata(self): + return hpss_utils.read_hpss_system_metadata(self._data_file) + def reader(self, iter_hook=None, keep_cache=False): """ Return a :class:`swift.common.swob.Response` class compatible @@ -939,7 +883,7 @@ class DiskFile(object): if self._metadata is None: raise DiskFileNotOpen() dr = DiskFileReader( - self._fd, self._threadpool, self._mgr.disk_chunk_size, + self._fd, self._mgr.disk_chunk_size, self._obj_size, self._mgr.keep_cache_size, iter_hook=iter_hook, keep_cache=keep_cache) # At this point the reader object is now responsible for closing @@ -1004,7 +948,7 @@ class DiskFile(object): return True, newmd @contextmanager - def create(self, size, cos): + def create(self, hpss_hints): """ Context manager to create a file. We create a temporary file first, and then return a DiskFileWriter object to encapsulate the state. @@ -1021,67 +965,68 @@ class DiskFile(object): preallocations even if the parameter is specified. But if it does and it fails, it must raise a `DiskFileNoSpace` exception. - :param cos: - :param size: optional initial size of file to explicitly allocate on - disk + :param hpss_hints: dict containing HPSS class of service hints :raises DiskFileNoSpace: if a size is specified and allocation fails :raises AlreadyExistsAsFile: if path or part of a path is not a \ directory """ # Create /account/container directory structure on mount point root try: - os.makedirs(self._container_path) + self._logger.debug("DiskFile: Creating directories for %s" % + self._container_path) + hpss_utils.makedirs(self._container_path) except OSError as err: if err.errno != errno.EEXIST: raise + # Create directories to object name, if they don't exist. + self._logger.debug("DiskFile: creating directories in obj name %s" % + self._obj) + object_dirs = os.path.split(self._obj)[0] + self._logger.debug("object_dirs: %s" % repr(object_dirs)) + self._logger.debug("data_dir: %s" % self._put_datadir) + hpss_utils.makedirs(os.path.join(self._put_datadir, object_dirs)) + data_file = os.path.join(self._put_datadir, self._obj) - # Assume the full directory path exists to the file already, and - # construct the proper name for the temporary file. + # Create the temporary file attempts = 1 while True: - # To know more about why following temp file naming convention is - # used, please read this GlusterFS doc: - # https://github.com/gluster/glusterfs/blob/master/doc/features/dht.md#rename-optimizations # noqa - tmpfile = '.' + self._obj + '.' + uuid4().hex + tmpfile = '.%s.%s.temporary' % (self._obj, + random.randint(0, 65536)) tmppath = os.path.join(self._put_datadir, tmpfile) + self._logger.debug("DiskFile: Creating temporary file %s" % + tmppath) + try: + # TODO: figure out how to create hints struct + self._logger.debug("DiskFile: creating file") fd = do_open(tmppath, - os.O_WRONLY | os.O_CREAT | os.O_EXCL | O_CLOEXEC) - - if size: - try: - hpssfs.ioctl(fd, hpssfs.HPSSFS_SET_FSIZE_HINT, - long(size)) - except IOError as err: - message = '%s, hpssfs.ioctl("%s", SET_FSIZE)' - raise SwiftOnFileSystemIOError( - err.errno, - message % (err.strerror, fd)) - - if cos: - try: - hpssfs.ioctl(fd, hpssfs.HPSSFS_SET_COS_HINT, int(cos)) - except IOError as err: - message = '%s, hpssfs.ioctl("%s", SET_COS)' - raise SwiftOnFileSystemIOError( - err.errno, - message % (err.strerror, fd)) + hpss.O_WRONLY | hpss.O_CREAT | hpss.O_EXCL) + self._logger.debug("DiskFile: setting COS") + if hpss_hints['cos']: + hpss_utils.set_cos(tmppath, int(hpss_hints['cos'])) except SwiftOnFileSystemOSError as gerr: if gerr.errno in (errno.ENOSPC, errno.EDQUOT): # Raise DiskFileNoSpace to be handled by upper layers when # there is no space on disk OR when quota is exceeded + self._logger.error("DiskFile: no space") + raise DiskFileNoSpace() + if gerr.errno == errno.EACCES: + self._logger.error("DiskFile: permission denied") raise DiskFileNoSpace() if gerr.errno == errno.ENOTDIR: + self._logger.error("DiskFile: not a directory") raise AlreadyExistsAsFile('do_open(): failed on %s,' ' path or part of a' ' path is not a directory' - % (tmppath)) + % data_file) if gerr.errno not in (errno.ENOENT, errno.EEXIST, errno.EIO): # FIXME: Other cases we should handle? + self._logger.error("DiskFile: unknown error %s" + % gerr.errno) raise if attempts >= MAX_OPEN_ATTEMPTS: # We failed after N attempts to create the temporary @@ -1093,50 +1038,18 @@ class DiskFile(object): attempts, MAX_OPEN_ATTEMPTS, data_file)) if gerr.errno == errno.EEXIST: + self._logger.debug("DiskFile: file exists already") # Retry with a different random number. attempts += 1 - elif gerr.errno == errno.EIO: - # FIXME: Possible FUSE issue or race condition, let's - # sleep on it and retry the operation. - _random_sleep() - logging.warn("DiskFile.mkstemp(): %s ... retrying in" - " 0.1 secs", gerr) - attempts += 1 - elif not self._obj_path: - # No directory hierarchy and the create failed telling us - # the container or volume directory does not exist. This - # could be a FUSE issue or some race condition, so let's - # sleep a bit and retry. - _random_sleep() - logging.warn("DiskFile.mkstemp(): %s ... retrying in" - " 0.1 secs", gerr) - attempts += 1 - elif attempts > 1: - # Got ENOENT after previously making the path. This could - # also be a FUSE issue or some race condition, nap and - # retry. - _random_sleep() - logging.warn("DiskFile.mkstemp(): %s ... retrying in" - " 0.1 secs" % gerr) - attempts += 1 - else: - # It looks like the path to the object does not already - # exist; don't count this as an attempt, though, since - # we perform the open() system call optimistically. - self._create_dir_object(self._obj_path) else: break dw = None + + self._logger.debug("DiskFile: created file") + try: - if size is not None and size > 0: - try: - fallocate(fd, size) - except OSError as err: - if err.errno in (errno.ENOSPC, errno.EDQUOT): - raise DiskFileNoSpace() - raise # Ensure it is properly owned before we make it available. - do_fchown(fd, self._uid, self._gid) + do_chown(tmppath, self._uid, self._gid) dw = DiskFileWriter(fd, tmppath, self) yield dw finally: @@ -1157,8 +1070,7 @@ class DiskFile(object): """ metadata = self._keep_sys_metadata(metadata) data_file = os.path.join(self._put_datadir, self._obj) - self._threadpool.run_in_thread( - write_metadata, data_file, metadata) + write_metadata(data_file, metadata) def _keep_sys_metadata(self, metadata): """ @@ -1221,6 +1133,12 @@ class DiskFile(object): else: dirname = os.path.dirname(dirname) + def set_cos(self, cos_id): + hpss_utils.set_cos(self._data_file, cos_id) + + def set_purge_lock(self, purgelock): + hpss_utils.set_purge_lock(self._data_file, purgelock) + def delete(self, timestamp): """ Delete the object. @@ -1247,7 +1165,7 @@ class DiskFile(object): if metadata[X_TIMESTAMP] >= timestamp: return - self._threadpool.run_in_thread(self._unlinkold) + self._unlinkold() self._metadata = None self._data_file = None diff --git a/swiftonhpss/swift/obj/server.py b/swiftonhpss/swift/obj/server.py index f814db8..ca1115e 100644 --- a/swiftonhpss/swift/obj/server.py +++ b/swiftonhpss/swift/obj/server.py @@ -17,12 +17,7 @@ import math import logging -import xattr import os -try: - import hpssfs -except ImportError: - import swiftonhpss.swift.common.hpssfs_ioctl as hpssfs import time from hashlib import md5 @@ -47,7 +42,7 @@ from swift.common.ring import Ring from swiftonhpss.swift.obj.diskfile import DiskFileManager from swiftonhpss.swift.common.constraints import check_object_creation -from swiftonhpss.swift.common import utils +from swiftonhpss.swift.common import utils, hpss_utils class SwiftOnFileDiskFileRouter(object): @@ -82,13 +77,30 @@ class ObjectController(server.ObjectController): self._diskfile_router = SwiftOnFileDiskFileRouter(conf, self.logger) self.swift_dir = conf.get('swift_dir', '/etc/swift') self.container_ring = None - # This conf option will be deprecated and eventualy removed in + # This conf option will be deprecated and eventually removed in # future releases utils.read_pickled_metadata = \ config_true_value(conf.get('read_pickled_metadata', 'no')) + + # HPSS-specific conf options self.allow_purgelock = \ config_true_value(conf.get('allow_purgelock', True)) - self.default_cos_id = conf.get('default_cos_id') + self.default_cos_id = conf.get('default_cos_id', '1') + self.hpss_principal = conf.get('hpss_user', 'swift') + self.hpss_auth_mech = conf.get('hpss_auth_mechanism', 'unix') + self.hpss_auth_cred_type = conf.get('hpss_auth_credential_type', + 'keytab') + self.hpss_auth_creds = conf.get('hpss_auth_credential', + '/var/hpss/etc/hpss.unix.keytab') + self.hpss_uid = conf.get('hpss_uid', '300') + self.hpss_gid = conf.get('hpss_gid', '300') + + self.logger.debug("Creating HPSS session") + + hpss_utils.create_hpss_session(self.hpss_principal, + self.hpss_auth_mech, + self.hpss_auth_cred_type, + self.hpss_auth_creds) def get_container_ring(self): """Get the container ring. Load it, if it hasn't been yet.""" @@ -100,6 +112,7 @@ class ObjectController(server.ObjectController): @timing_stats() def PUT(self, request): """Handle HTTP PUT requests for the Swift on File object server""" + try: device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) @@ -128,10 +141,15 @@ class ObjectController(server.ObjectController): request=request, content_type='text/plain') + self.logger.debug("DiskFile @ %s/%s/%s/%s" % + (device, account, container, obj)) + # Try to get DiskFile try: disk_file = self.get_diskfile(device, partition, account, - container, obj, policy=policy) + container, obj, policy=policy, + uid=int(self.hpss_uid), + gid=int(self.hpss_gid)) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) @@ -158,22 +176,23 @@ class ObjectController(server.ObjectController): orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0) upload_expiration = time.time() + self.max_upload_time + self.logger.debug("Receiving and writing object") + etag = md5() elapsed_time = 0 - # (HPSS) Check for HPSS-specific metadata headers - cos = request.headers.get('X-Hpss-Class-Of-Service-Id', - self.default_cos_id) + hints = {'cos': request.headers.get('X-Hpss-Class-Of-Service-Id', + self.default_cos_id), + 'purgelock': + request.headers.get('X-Hpss-Purgelock-Status', False)} - if self.allow_purgelock: - purgelock = config_true_value( - request.headers.get('X-Hpss-Purgelock-Status', 'false')) - else: - purgelock = False + if request.headers['content-type'] == 'application/directory': + # TODO: handle directories different + pass try: # Feed DiskFile our HPSS-specific stuff - with disk_file.create(size=fsize, cos=cos) as writer: + with disk_file.create(hpss_hints=hints) as writer: upload_size = 0 def timeout_reader(): @@ -203,6 +222,8 @@ class ObjectController(server.ObjectController): and request.headers['etag'].lower() != etag: return HTTPUnprocessableEntity(request=request) + self.logger.debug("Writing object metadata") + # Update object metadata content_type = request.headers['content-type'] metadata = {'X-Timestamp': request.timestamp.internal, @@ -222,43 +243,18 @@ class ObjectController(server.ObjectController): header_caps = header_key.title() metadata[header_caps] = request.headers[header_key] + self.logger.debug("Finalizing object") + # (HPSS) Write the file, with added options - writer.put(metadata, purgelock=purgelock) + writer.put(metadata) except DiskFileNoSpace: return HTTPInsufficientStorage(drive=device, request=request) except SwiftOnFileSystemIOError as e: + self.logger.error(e) return HTTPServiceUnavailable(request=request) - # FIXME: this stuff really should be handled in DiskFile somehow? - # we set the hpss checksum in here, so both systems have valid - # and current checksum metadata - - # (HPSS) Set checksum on file ourselves, if hpssfs won't do it - # for us. - data_file = disk_file._data_file - try: - xattr.setxattr(data_file, 'system.hpss.hash', - "md5:%s" % etag) - except IOError: - logging.debug("Could not write ETag to system.hpss.hash," - " trying user.hash.checksum") - try: - xattr.setxattr(data_file, - 'user.hash.checksum', etag) - xattr.setxattr(data_file, - 'user.hash.algorithm', 'md5') - xattr.setxattr(data_file, - 'user.hash.state', 'Valid') - xattr.setxattr(data_file, - 'user.hash.filesize', str(upload_size)) - xattr.setxattr(data_file, - 'user.hash.app', 'swiftonhpss') - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, - 'Could not write MD5 checksum to HPSS filesystem: ' - '%s' % err.strerror) + self.logger.debug("Writing container metadata") # Update container metadata if orig_delete_at != new_delete_at: @@ -277,13 +273,15 @@ class ObjectController(server.ObjectController): self.container_update('PUT', account, container, obj, request, HeaderKeyDict(container_headers), device, policy) + + self.logger.debug("Done!") # Create convenience symlink try: - self._object_symlink(request, disk_file._data_file, device, - account) + self._project_symlink(request, disk_file, account) except SwiftOnFileSystemOSError: logging.debug('could not make account symlink') return HTTPServiceUnavailable(request=request) + return HTTPCreated(request=request, etag=etag) except (AlreadyExistsAsFile, AlreadyExistsAsDir): @@ -323,26 +321,35 @@ class ObjectController(server.ObjectController): 'x-etag': resp.headers['ETag']}), device, policy_idx) - def _object_symlink(self, request, diskfile, device, account): - mount = diskfile.split(device)[0] - dev = "%s%s" % (mount, device) + # FIXME: this should be in diskfile? + def _project_symlink(self, request, diskfile, account): project = None + + diskfile_path = diskfile._data_file + + hpss_root = diskfile_path.split(account)[0] + if 'X-Project-Name' in request.headers: project = request.headers.get('X-Project-Name') elif 'X-Tenant-Name' in request.headers: project = request.headers.get('X-Tenant-Name') + if project: - if project is not account: - accdir = "%s/%s" % (dev, account) - projdir = "%s%s" % (mount, project) - if not os.path.exists(projdir): + if project != account: + symlink_location = os.path.join(hpss_root, project) + account_location = os.path.join(hpss_root, account) + self.logger.debug('symlink_location: %s' % symlink_location) + self.logger.debug('account_location: %s' % account_location) + if not hpss_utils.path_exists(symlink_location): try: - os.symlink(accdir, projdir) - except OSError as err: + hpss_utils.relative_symlink(hpss_root, account, + project) + except IOError as err: + self.logger.debug('symlink failed, errno %s' + % err.errno) raise SwiftOnFileSystemOSError( err.errno, - ('%s, os.symlink("%s", ...)' % - err.strerror, account)) + ('os.symlink("%s", ...)' % account)) @public @timing_stats() @@ -354,7 +361,9 @@ class ObjectController(server.ObjectController): # Get DiskFile try: disk_file = self.get_diskfile(device, partition, account, - container, obj, policy=policy) + container, obj, policy=policy, + uid=int(self.hpss_uid), + gid=int(self.hpss_gid)) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) @@ -365,7 +374,7 @@ class ObjectController(server.ObjectController): try: with disk_file.open(): metadata = disk_file.get_metadata() - want_hpss_metadata = request.headers.get('X-HPSS-Get-Metadata', + want_hpss_metadata = request.headers.get('X-Hpss-Get-Metadata', False) if config_true_value(want_hpss_metadata): try: @@ -422,7 +431,9 @@ class ObjectController(server.ObjectController): # Get Diskfile try: disk_file = self.get_diskfile(device, partition, account, - container, obj, policy) + container, obj, policy, + uid=int(self.hpss_uid), + gid=int(self.hpss_gid)) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) @@ -493,36 +504,22 @@ class ObjectController(server.ObjectController): # Get DiskFile try: disk_file = self.get_diskfile(device, partition, account, - container, obj, policy) + container, obj, policy, + uid=int(self.hpss_uid), + gid=int(self.hpss_gid)) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) - # Set Purgelock status if we got it - if self.allow_purgelock: - purgelock = request.headers.get('X-HPSS-Purgelock-Status') - else: - purgelock = False - - if purgelock: - try: - hpssfs.ioctl(disk_file._fd, hpssfs.HPSSFS_PURGE_LOCK, - int(purgelock)) - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, - '%s, xattr.getxattr("%s", ...)' % - (err.strerror, disk_file._fd)) - # Set class of service if we got it - cos = request.headers.get('X-HPSS-Class-Of-Service-ID') - if cos: - try: - xattr.setxattr(disk_file._fd, 'system.hpss.cos', int(cos)) - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, - '%s, xattr.setxattr("%s", ...)' % - (err.strerror, disk_file._fd)) + new_cos = request.headers.get('X-HPSS-Class-Of-Service-Id', None) + if new_cos: + disk_file.set_cos(int(new_cos)) + + # Set purge lock status if we got it + if self.allow_purgelock: + purge_lock = request.headers.get('X-HPSS-Purgelock-Status', None) + if purge_lock is not None: + disk_file.set_purge_lock(purge_lock) # Update metadata from request try: @@ -561,7 +558,9 @@ class ObjectController(server.ObjectController): req_timestamp = valid_timestamp(request) try: disk_file = self.get_diskfile(device, partition, account, - container, obj, policy) + container, obj, policy, + uid=int(self.hpss_uid), + gid=int(self.hpss_gid)) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) diff --git a/test/functional/swift_on_file_tests.py b/test/functional/swift_on_file_tests.py index 92c52e8..60f3040 100644 --- a/test/functional/swift_on_file_tests.py +++ b/test/functional/swift_on_file_tests.py @@ -25,10 +25,8 @@ from test.functional.swift_test_client import Account, Connection, \ ResponseError import test.functional as tf -# PGB - changed 'AUTH_' hardcoded reseller prefix to 'KEY_'. # TODO: read Swift proxy config for this - class TestSwiftOnFileEnv: @classmethod def setUp(cls): @@ -37,14 +35,13 @@ class TestSwiftOnFileEnv: cls.account = Account(cls.conn, tf.config.get('account', tf.config['username'])) - # PGB - change hardcoded SoF mountpoint #cls.root_dir = os.path.join('/mnt/swiftonhpss/test') - cls.root_dir = os.path.join('/srv/swift/hpss') + cls.root_dir = os.path.join('/srv/hpss') cls.account.delete_containers() cls.file_size = 8 cls.container = cls.account.container(Utils.create_name()) - if not cls.container.create(None, None): + if not cls.container.create(hdrs={'X-Storage-Policy': 'swiftonhpss'}): raise ResponseError(cls.conn.response) cls.dirs = [ @@ -134,6 +131,15 @@ class TestSwiftOnFile(Base): self.assert_status(201) file_info = file_item.info() + print self.env.root_dir + print self.env.account.name + print self.env.container.name + print file_name + + print os.listdir(self.env.root_dir) + print os.listdir(os.path.join(self.env.root_dir, + self.env.account.name)) + with open(os.path.join(self.env.root_dir, self.env.account.name, self.env.container.name, diff --git a/test/functional/swift_on_hpss_tests.py b/test/functional/swift_on_hpss_tests.py index 05a8238..856e7a1 100644 --- a/test/functional/swift_on_hpss_tests.py +++ b/test/functional/swift_on_hpss_tests.py @@ -34,8 +34,8 @@ class TestSwiftOnHPSS(unittest.TestCase): tf.config.get('account', tf.config['username'])) cls.container = cls.account.container('swiftonhpss_test') - cls.container.create(hdrs={'X-Storage-Policy': 'hpss'}) - cls.hpss_dir = '/srv/swift/hpss' + cls.container.create(hdrs={'X-Storage-Policy': 'swiftonhpss'}) + cls.hpss_dir = '/srv/hpss' @classmethod def tearDownClass(cls): @@ -45,48 +45,65 @@ class TestSwiftOnHPSS(unittest.TestCase): self.test_file = self.container.file('testfile') def tearDown(self): - self.test_file.delete() + try: + self.test_file.delete() + except ResponseError as e: + if e.message != "Not Found": + raise def test_purge_lock(self): - self.test_file.write(data='test', - hdrs={'X-Hpss-Purgelock-Status': 'true', - 'X-Hpss-Class-Of-Service-Id': '3'}) + resp = self.test_file.write(data='test', + hdrs={'X-Hpss-Purgelock-Status': 'true', + 'X-Hpss-Class-Of-Service-Id': '1'}, + return_resp=True) + + print resp.status + print resp.getheaders() + print resp.read() test_file_name = os.path.join(self.hpss_dir, self.account.name, self.container.name, 'testfile') - xattrs = dict(xattr.xattr(test_file_name)) - self.assertEqual(xattrs['system.hpss.purgelock'], '1') + print test_file_name + + print os.stat(test_file_name) + + print xattr.listxattr(test_file_name) + + self.assertEqual(xattr.get(test_file_name, + 'system.hpss.purgelock'), + '1') self.test_file.post(hdrs={'X-Hpss-Purgelock-Status': 'false'}) - xattrs = dict(xattr.xattr(test_file_name)) - self.assertEqual(xattrs['system.hpss.purgelock'], '0') + + self.assertEqual(xattr.get(test_file_name, + 'system.hpss.purgelock'), '0') def test_change_cos(self): self.test_file.write(data='asdfasdf', - hdrs={'X-Hpss-Class-Of-Service-Id': '3'}) + hdrs={'X-Hpss-Class-Of-Service-Id': '2'}) test_file_name = os.path.join(self.hpss_dir, self.account.name, self.container.name, 'testfile') - time.sleep(30) # It takes a long time for HPSS to get around to it. - xattrs = dict(xattr.xattr(test_file_name)) - self.assertEqual(xattrs['system.hpss.cos'], '3') + print test_file_name - self.test_file.post(hdrs={'X-HPSS-Class-Of-Service-ID': '1'}) - time.sleep(30) - xattrs = dict(xattr.xattr(test_file_name)) - self.assertEqual(xattrs['system.hpss.cos'], '1') + time.sleep(10) # It takes a long time for HPSS to get around to it. + self.assertEqual(xattr.get(test_file_name, 'system.hpss.cos'), '2') + + self.test_file.post(hdrs={'X-Hpss-Class-Of-Service-Id': '1'}) + time.sleep(10) + self.assertEqual(xattr.get(test_file_name, 'system.hpss.cos'), '1') def test_hpss_metadata(self): # header is X-HPSS-Get-Metadata self.test_file.write(data='test') self.connection.make_request('HEAD', self.test_file.path, - hdrs={'X-HPSS-Get-Metadata': 'true'}) + hdrs={'X-Hpss-Get-Metadata': 'true'}) md = {t[0]: t[1] for t in self.connection.response.getheaders()} print md self.assertTrue('x-hpss-account' in md) @@ -101,4 +118,4 @@ class TestSwiftOnHPSS(unittest.TestCase): self.assertTrue('x-hpss-reads' in md) self.assertTrue('x-hpss-realm-id' in md) self.assertTrue('x-hpss-subsys-id' in md) - self.assertTrue('x-hpss-writes' in md) \ No newline at end of file + self.assertTrue('x-hpss-writes' in md) diff --git a/tox.ini b/tox.ini index c4e4b71..13c6456 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,5 @@ [tox] -#envlist = py27,pep8,functest -envlist = py27,pep8 +envlist = py27,pep8,functest minversion = 1.6 skipsdist = True @@ -20,7 +19,9 @@ deps = PyECLib==1.0.7 -r{toxinidir}/test-requirements.txt changedir = {toxinidir}/test/unit -commands = nosetests -v {posargs} + +# TODO: un-disable this when we have our own test setup ready +#commands = nosetests -v {posargs} passenv = SWIFT_* *_proxy [testenv:cover]