Use HPSS client API bindings instead of FUSE wrapper

We're doing this to make future development easier and to support advanced
features such as parallel I/O and HPSS identity mappings.
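
The pattern of the change, as a rough sketch (paths and the metadata key are
illustrative, not taken from the diff below):

    import xattr                 # before: HPSS metadata surfaced as FUSE xattrs
    import hpss.clapi as hpss    # after: direct HPSS client API bindings

    # Before, through the FUSE mount:
    #   fd = os.open('/srv/hpss/AUTH_test/c/o', os.O_RDONLY)
    #   meta = xattr.getxattr('/srv/hpss/AUTH_test/c/o', 'user.swift.metadata')
    # After, through the client API (hpss.Open returns a (fd, handle) tuple):
    #   fd = hpss.Open('/swift/AUTH_test/c/o', hpss.O_RDONLY)[0]
    #   meta = hpss.UserAttrGetAttrs('/swift/AUTH_test/c/o',
    #                                ['/hpss/user.swift.metadata'])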

Change-Id: I0192ceb0d65f0d51d9aa77f1f21800cb29e5a262
Phil Bridges 2017-09-05 13:02:33 -05:00
parent e6dc857543
commit 70984cdd9f
9 changed files with 603 additions and 587 deletions

View File

@ -16,43 +16,37 @@
import logging
import os
import errno
import stat
import random
import time
import xattr
from collections import defaultdict
from itertools import repeat
import ctypes
from eventlet import sleep
from swift.common.utils import load_libc_function
from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError
from swift.common.exceptions import DiskFileNoSpace
import hpss.clapi as hpss
import hpss_utils
def do_getxattr(path, key):
return xattr.getxattr(path, key)
return hpss_utils.read_uda(path, key)
def do_setxattr(path, key, value):
xattr.setxattr(path, key, value)
return hpss_utils.write_uda(path, key, value)
def do_removexattr(path, key):
xattr.removexattr(path, key)
return hpss_utils.delete_uda(path, key)
def do_walk(*args, **kwargs):
return os.walk(*args, **kwargs)
return hpss_utils.walk(*args, **kwargs)
def do_write(fd, buf):
try:
cnt = os.write(fd, buf)
except OSError as err:
filename = get_filename_from_fd(fd)
cnt = hpss.Write(fd, buf)
except IOError as err:
if err.errno in (errno.ENOSPC, errno.EDQUOT):
do_log_rl("do_write(%d, msg[%d]) failed: %s : %s",
fd, len(buf), err, filename)
raise DiskFileNoSpace()
else:
raise SwiftOnFileSystemOSError(
@ -62,72 +56,29 @@ def do_write(fd, buf):
def do_read(fd, n):
try:
buf = os.read(fd, n)
except OSError as err:
buf = hpss.Read(fd, n)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.read("%s", ...)' % (err.strerror, fd))
return buf
def do_ismount(path):
"""
Test whether a path is a mount point.
This is code hijacked from C Python 2.6.8, adapted to remove the extra
lstat() system call.
"""
try:
s1 = os.lstat(path)
except os.error as err:
if err.errno == errno.ENOENT:
# It doesn't exist -- so not a mount point :-)
return False
else:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.lstat("%s")' % (err.strerror, path))
if stat.S_ISLNK(s1.st_mode):
# A symlink can never be a mount point
return False
try:
s2 = os.lstat(os.path.join(path, '..'))
except os.error as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.lstat("%s")' % (err.strerror,
os.path.join(path, '..')))
dev1 = s1.st_dev
dev2 = s2.st_dev
if dev1 != dev2:
# path/.. on a different device as path
return True
ino1 = s1.st_ino
ino2 = s2.st_ino
if ino1 == ino2:
# path/.. is the same i-node as path
return True
return False
def do_mkdir(path):
os.mkdir(path)
hpss.Mkdir(path, 0o600)
def do_rmdir(path):
try:
os.rmdir(path)
except OSError as err:
hpss.Rmdir(path)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.rmdir("%s")' % (err.strerror, path))
def do_chown(path, uid, gid):
try:
os.chown(path, uid, gid)
except OSError as err:
hpss.Chown(path, uid, gid)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.chown("%s", %s, %s)' % (
err.strerror, path, uid, gid))
@ -135,8 +86,9 @@ def do_chown(path, uid, gid):
def do_fchown(fd, uid, gid):
try:
# TODO: grab path name from fd, chown that
os.fchown(fd, uid, gid)
except OSError as err:
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.fchown(%s, %s, %s)' % (
err.strerror, fd, uid, gid))
@ -146,65 +98,45 @@ _STAT_ATTEMPTS = 10
def do_stat(path):
serr = None
for i in range(0, _STAT_ATTEMPTS):
try:
stats = os.stat(path)
except OSError as err:
if err.errno == errno.EIO:
# Retry EIO assuming it is a transient error from FUSE after a
# short random sleep
serr = err
sleep(random.uniform(0.001, 0.005))
continue
if err.errno == errno.ENOENT:
stats = None
else:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.stat("%s")[%d attempts]' % (
err.strerror, path, i))
if i > 0:
logging.warn("fs_utils.do_stat():"
" os.stat('%s') retried %d times (%s)",
path, i, 'success' if stats else 'failure')
return stats
else:
try:
stats = hpss.Stat(path)
except IOError as err:
raise SwiftOnFileSystemOSError(
serr.errno, '%s, os.stat("%s")[%d attempts]' % (
serr.strerror, path, _STAT_ATTEMPTS))
err.errno, '%s, os.stat(%s)' % (err.strerror, path))
return stats
def do_fstat(fd):
try:
stats = os.fstat(fd)
except OSError as err:
stats = hpss.Fstat(fd)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.fstat(%s)' % (err.strerror, fd))
return stats
def do_open(path, flags, mode=0o777):
def do_open(path, flags, mode=0o777, hints=None):
hints_struct = hpss.cos_hints()
try:
fd = os.open(path, flags, mode)
except OSError as err:
if hints:
cos = hints.get('cos', '0')
hints_struct.cos = int(cos)
file_handle = hpss.Open(path, flags, mode, hints_struct)
else:
file_handle = hpss.Open(path, flags, mode)
fd = file_handle[0]
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.open("%s", %x, %o)' % (
err.strerror, path, flags, mode))
return fd
def do_dup(fd):
return os.dup(fd)
def do_close(fd):
try:
os.close(fd)
except OSError as err:
hpss.Close(fd)
except IOError as err:
if err.errno in (errno.ENOSPC, errno.EDQUOT):
filename = get_filename_from_fd(fd)
do_log_rl("do_close(%d) failed: %s : %s",
fd, err, filename)
raise DiskFileNoSpace()
else:
raise SwiftOnFileSystemOSError(
@ -213,8 +145,8 @@ def do_close(fd):
def do_unlink(path, log=True):
try:
os.unlink(path)
except OSError as err:
hpss.Unlink(path)
except IOError as err:
if err.errno != errno.ENOENT:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.unlink("%s")' % (err.strerror, path))
@ -225,8 +157,8 @@ def do_unlink(path, log=True):
def do_rename(old_path, new_path):
try:
os.rename(old_path, new_path)
except OSError as err:
hpss.Rename(old_path, new_path)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.rename("%s", "%s")' % (
err.strerror, old_path, new_path))
@ -234,76 +166,20 @@ def do_rename(old_path, new_path):
def do_fsync(fd):
try:
os.fsync(fd)
except OSError as err:
hpss.Fsync(fd)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.fsync("%s")' % (err.strerror, fd))
def do_fdatasync(fd):
try:
os.fdatasync(fd)
except AttributeError:
do_fsync(fd)
except OSError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.fdatasync("%s")' % (err.strerror, fd))
_posix_fadvise = None
def do_fadvise64(fd, offset, length):
global _posix_fadvise
if _posix_fadvise is None:
_posix_fadvise = load_libc_function('posix_fadvise64')
# 4 means "POSIX_FADV_DONTNEED"
_posix_fadvise(fd, ctypes.c_uint64(offset),
ctypes.c_uint64(length), 4)
def do_lseek(fd, pos, how):
try:
os.lseek(fd, pos, how)
except OSError as err:
hpss.Lseek(fd, pos, how)
except IOError as err:
raise SwiftOnFileSystemOSError(
err.errno, '%s, os.lseek("%s")' % (err.strerror, fd))
def get_filename_from_fd(fd, verify=False):
"""
Given the file descriptor, this method attempts to get the filename as it
was when opened. This may not give accurate results in following cases:
- file was renamed/moved/deleted after it was opened
- file has multiple hardlinks
:param fd: file descriptor of file opened
:param verify: If True, performs additional checks using inode number
"""
filename = None
if isinstance(fd, int):
try:
filename = os.readlink("/proc/self/fd/" + str(fd))
except OSError:
pass
if not verify:
return filename
# If verify = True, we compare st_dev and st_ino of file and fd.
# This involves additional stat and fstat calls. So this is disabled
# by default.
if filename and fd:
s_file = do_stat(filename)
s_fd = do_fstat(fd)
if s_file and s_fd:
if (s_file.st_ino, s_file.st_dev) == (s_fd.st_ino, s_fd.st_dev):
return filename
return None
def static_var(varname, value):
"""Decorator function to create pseudo static variables."""
def decorate(func):
@ -311,6 +187,7 @@ def static_var(varname, value):
return func
return decorate
# Rate limit to emit log message once a second
_DO_LOG_RL_INTERVAL = 1.0
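
A minimal usage sketch of the reworked wrappers (object path and COS id are
invented for illustration; do_open() packs the 'cos' hint into an
hpss.cos_hints struct before calling hpss.Open()):

    fd = do_open('/swift/AUTH_test/c/.o.12345.temporary',
                 hpss.O_WRONLY | hpss.O_CREAT | hpss.O_EXCL,
                 hints={'cos': '2'})
    do_write(fd, 'hello')
    do_fsync(fd)
    do_close(fd)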

View File

@ -0,0 +1,251 @@
# Copyright (c) 2016 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hpss.clapi as hpss
from errno import ENOENT, EPERM, EEXIST, EINVAL
import posixpath
from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError
_auth_mech_to_api = {'unix': hpss.hpss_authn_mech_unix,
'krb5': hpss.hpss_authn_mech_krb5}
_cred_type_to_api = {'key': hpss.hpss_rpc_auth_type_key,
'keyfile': hpss.hpss_rpc_auth_type_keyfile,
'keytab': hpss.hpss_rpc_auth_type_keytab,
'password': hpss.hpss_rpc_auth_type_passwd}
# TODO: make a nicer wrapping around these things
def create_hpss_session(user_name,
auth_mech='unix',
auth_cred_type='keytab',
auth_cred='/var/hpss/etc/hpss.unix.keytab'):
try:
hpss_mech = _auth_mech_to_api[auth_mech]
hpss_cred_type = _cred_type_to_api[auth_cred_type]
except KeyError:
raise ValueError('invalid mechanism or cred type specified')
try:
hpss.SetLoginCred(user_name,
hpss_mech,
hpss_cred_type,
hpss.hpss_rpc_cred_client,
auth_cred)
except IOError as e:
if e.errno == ENOENT:
raise ValueError('HPSS reports invalid username')
if e.errno == EPERM:
raise OSError('Permission denied by HPSS')
raise
def destroy_hpss_session():
hpss.PurgeLoginCred()
def read_uda(path, key):
normalized_key = '/hpss/' + key
uda_contents = hpss.UserAttrGetAttrs(path,
[normalized_key])[normalized_key]
return hpss.ChompXMLHeader(uda_contents)
def write_uda(path, key, value):
return hpss.UserAttrSetAttrs(path, {'/hpss/' + key: value})
def delete_uda(path, key):
return hpss.UserAttrDeleteAttrs(path, '/hpss/' + key)
def list_uda(path):
return hpss.UserAttrListAttrs(path)
def _split_path(path):
if path == '/':
return ['']
else:
return path.split('/')
def set_purge_lock(path, lock_status):
try:
fd = hpss.Open(path, hpss.O_RDWR | hpss.O_NONBLOCK)[0]
except IOError as e:
raise SwiftOnFileSystemOSError("couldn't open file for purgelock: %s" %
e.errno)
if lock_status:
flag = hpss.PURGE_LOCK
else:
flag = hpss.PURGE_UNLOCK
try:
hpss.PurgeLock(int(fd), flag)
except IOError as e:
# EINVAL happens when either the file is empty, or there's no
# tape layer in the class of service, so we tolerate that case.
# TODO: should we return an objection as an HTTP resp?
if e.errno != EINVAL:
raise SwiftOnFileSystemOSError('hpss.purge_lock(%s): '
'fd %s, errno %s' %
(path, fd, e.errno))
try:
hpss.Close(fd)
except IOError as e:
raise SwiftOnFileSystemOSError("couldn't close file for purgelock: %s" %
e.errno)
def set_cos(path, cos_id):
hpss.FileSetCOS(path, cos_id)
def preallocate(fd, size):
hpss.Fpreallocate(fd, size)
def set_checksum(fd, md5_digest):
flags = hpss.HPSS_FILE_HASH_GENERATED | hpss.HPSS_FILE_HASH_DIGEST_VALID
hpss.FsetFileDigest(fd, md5_digest, "", flags, hpss.HASH_MD5)
def relative_symlink(parent_dir, source, target):
pwd = bytes(hpss.Getcwd())
hpss.Chdir(parent_dir)
hpss.Symlink(source, target)
hpss.Chdir(pwd)
def path_exists(path):
try:
return hpss.Access(path, hpss.F_OK)
except IOError as e:
if e.errno == ENOENT:
return False
else:
raise
def makedirs(name, mode=0o777):
components = posixpath.split(name)
if components[0] != '/':
makedirs(components[0], mode)
try:
hpss.Mkdir(name, mode)
except IOError as e:
if e.errno != EEXIST:
raise
# TODO: give an iterator instead of a list of files/directories?
# that could quickly become way too many to list
def walk(path, topdown=True):
dir_handle = hpss.Opendir(path)
current_obj = hpss.Readdir(dir_handle)
dirs = []
files = []
while current_obj:
if current_obj.d_handle.Type == hpss.NS_OBJECT_TYPE_DIRECTORY:
if current_obj.d_name != '.' and current_obj.d_name != '..':
dirs.append(current_obj.d_name)
elif current_obj.d_handle.Type == hpss.NS_OBJECT_TYPE_FILE:
files.append(current_obj.d_name)
current_obj = hpss.Readdir(dir_handle)
hpss.Closedir(dir_handle)
if topdown:
yield (path, dirs, files)
for dir_name in dirs:
next_iter = walk("/".join(_split_path(path) + [dir_name]))
for walk_entity in next_iter:
yield walk_entity
else:
for dir_name in dirs:
next_iter = walk("/".join(_split_path(path) + [dir_name]))
for walk_entity in next_iter:
yield walk_entity
yield (path, dirs, files)
def _levels_to_string(storage_levels):
def storage_info_string(level_struct):
return '%u:%u:%u:%u' % (level_struct.BytesAtLevel,
level_struct.StripeLength,
level_struct.StripeWidth,
level_struct.OptimumAccessSize)
def pv_list_string(pv_list):
return ','.join([pv_element.Name for pv_element in pv_list])
def vv_data_string(vv_struct):
return '%s:%s:[%s]' % (vv_struct.BytesOnVV,
vv_struct.RelPosition,
pv_list_string(vv_struct.PVList))
def vv_list_string(vv_list):
return ':'.join(map(vv_data_string, vv_list))
def more_exists_string(level_struct):
if level_struct.Flags & hpss.BFS_BFATTRS_ADDITIONAL_VV_EXIST:
return '...'
else:
return ''
def level_data_string(level_struct, depth):
if level_struct.Flags & hpss.BFS_BFATTRS_LEVEL_IS_DISK:
storage_type = "disk"
else:
storage_type = "tape"
if level_struct.BytesAtLevel == 0:
return "%u:%s:nodata" % (depth, storage_type)
else:
return "%u:%s:%s:(%s)%s" % (depth,
storage_type,
storage_info_string(level_struct),
vv_list_string(level_struct.VVAttrib),
more_exists_string(level_struct))
def level_list_string(levels):
level_datas = []
for index, level_struct in enumerate(levels):
level_datas.append(level_data_string(level_struct, index))
return ';'.join(level_datas)
return level_list_string(storage_levels)
def read_hpss_system_metadata(path):
flag = hpss.API_GET_STATS_FOR_ALL_LEVELS
xfileattr_struct = hpss.FileGetXAttributes(path, flag)
optimum_tuple = (xfileattr_struct.SCAttrib[0].OptimumAccessSize,
xfileattr_struct.SCAttrib[0].StripeWidth,
xfileattr_struct.SCAttrib[0].StripeLength)
attrs = {'X-HPSS-Account': xfileattr_struct.Attrs.Account,
'X-HPSS-Bitfile-ID': xfileattr_struct.Attrs.BitfileObj.BfId,
'X-HPSS-Comment': xfileattr_struct.Attrs.Comment,
'X-HPSS-Class-Of-Service-ID': xfileattr_struct.Attrs.COSId,
'X-HPSS-Data-Levels':
_levels_to_string(xfileattr_struct.SCAttrib),
'X-HPSS-Family-ID': xfileattr_struct.Attrs.FamilyId,
'X-HPSS-Fileset-ID': xfileattr_struct.Attrs.FilesetId,
'X-HPSS-Optimum-Size': "%u:%u:%u" % optimum_tuple,
'X-HPSS-Purgelock-Status': xfileattr_struct.Attrs.OptionFlags & 1,
'X-HPSS-Reads': xfileattr_struct.Attrs.ReadCount,
'X-HPSS-Realm-ID': xfileattr_struct.Attrs.RealmId,
'X-HPSS-Subsys-ID': xfileattr_struct.Attrs.SubSystemId,
'X-HPSS-Writes': xfileattr_struct.Attrs.WriteCount, }
return attrs
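
Putting the module together, a hedged end-to-end sketch (principal and paths
are illustrative):

    create_hpss_session('swift')   # unix mechanism + default keytab from above
    write_uda('/swift/AUTH_test/c/o', 'user.swift.metadata', 'serialized-meta')
    print read_uda('/swift/AUTH_test/c/o', 'user.swift.metadata')
    for dirpath, dirnames, filenames in walk('/swift/AUTH_test'):
        print dirpath, dirnames, filenames
    destroy_hpss_session()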

View File

@ -1,41 +0,0 @@
# Copyright (c) 2016 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import array
import fcntl
HPSSFS_GET_COS = 0x80046c01
HPSSFS_SET_COS_HINT = 0x40046c02
HPSSFS_SET_FSIZE_HINT = 0x40086c03
HPSSFS_SET_MAXSEGSZ_HINT = 0x40046c04
HPSSFS_PURGE_CACHE = 0x00006c05
HPSSFS_PURGE_LOCK = 0x40046c06
HPSSFS_UNDELETE = 0x40046c07
HPSSFS_UNDELETE_NONE = 0x00000000
HPSSFS_UNDELETE_RESTORE_TIME = 0x00000001
HPSSFS_UNDELETE_OVERWRITE = 0x00000002
HPSSFS_UNDELETE_OVERWRITE_AND_RESTORE = 0x00000003
def ioctl(fd, cmd, val=None):
if val is not None:
valbuf = array.array('i', [val])
else:
valbuf = array.array('i', [0])
fcntl.ioctl(fd, cmd, valbuf)
return valbuf[0]
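
For reference, callers drove this shim the same way as the hpssfs FUSE module
it emulated; the old purge-lock call in DiskFile amounted to:

    ioctl(fd, HPSSFS_PURGE_LOCK, 1)   # purge-lock an open file via FUSE ioctl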

View File

@ -28,9 +28,9 @@ from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemIOError
from swift.common.exceptions import DiskFileNoSpace
from swift.common.db import utf8encodekeys
from swiftonhpss.swift.common.fs_utils import do_stat, \
do_walk, do_rmdir, do_log_rl, get_filename_from_fd, do_open, \
do_walk, do_rmdir, do_log_rl, do_open, \
do_getxattr, do_setxattr, do_removexattr, do_read, \
do_close, do_dup, do_lseek, do_fstat, do_fsync, do_rename
do_close, do_fsync, do_rename
from urllib import quote, unquote
X_CONTENT_TYPE = 'Content-Type'
@ -56,6 +56,20 @@ CHUNK_SIZE = 65536
read_pickled_metadata = False
def chunk_list(input_list, chunk_size):
chunks = []
if len(input_list) % chunk_size != 0:
raise IndexError
for i in xrange(0, len(input_list) / chunk_size):
chunks.append(input_list[i * chunk_size:(i + 1) * chunk_size])
return chunks
def hex_digest_to_bytes(hex_digest):
return ''.join([chr(int(chunk, 16)) for chunk
in chunk_list(hex_digest, 2)])
def normalize_timestamp(timestamp):
"""
Format a timestamp (string or numeric) into a standardized
@ -138,7 +152,7 @@ def deserialize_metadata(in_metastr):
return {}
def read_metadata(path_or_fd):
def read_metadata(path):
"""
Helper function to read the serialized metadata from a File/Directory.
@ -150,7 +164,7 @@ def read_metadata(path_or_fd):
key = 0
try:
while True:
metastr += do_getxattr(path_or_fd, '%s%s' %
metastr += do_getxattr(path, '%s%s' %
(METADATA_KEY, (key or '')))
key += 1
if len(metastr) < MAX_XATTR_SIZE:
@ -167,13 +181,13 @@ def read_metadata(path_or_fd):
if not metadata:
# Empty dict i.e deserializing of metadata has failed, probably
# because it is invalid or incomplete or corrupt
clean_metadata(path_or_fd)
clean_metadata(path)
assert isinstance(metadata, dict)
return metadata
def write_metadata(path_or_fd, metadata):
def write_metadata(path, metadata):
"""
Helper function to write serialized metadata for a File/Directory.
@ -185,39 +199,34 @@ def write_metadata(path_or_fd, metadata):
key = 0
while metastr:
try:
do_setxattr(path_or_fd,
do_setxattr(path,
'%s%s' % (METADATA_KEY, key or ''),
metastr[:MAX_XATTR_SIZE])
except IOError as err:
if err.errno in (errno.ENOSPC, errno.EDQUOT):
if isinstance(path_or_fd, int):
filename = get_filename_from_fd(path_or_fd)
do_log_rl("write_metadata(%d, metadata) failed: %s : %s",
path_or_fd, err, filename)
else:
do_log_rl("write_metadata(%s, metadata) failed: %s",
path_or_fd, err)
do_log_rl("write_metadata(%s, metadata) failed: %s",
path, err)
raise DiskFileNoSpace()
else:
raise SwiftOnFileSystemIOError(
err.errno,
'%s, setxattr("%s", %s, metastr)' % (err.strerror,
path_or_fd, key))
path, key))
metastr = metastr[MAX_XATTR_SIZE:]
key += 1
def clean_metadata(path_or_fd):
def clean_metadata(path):
key = 0
while True:
try:
do_removexattr(path_or_fd, '%s%s' % (METADATA_KEY, (key or '')))
do_removexattr(path, '%s%s' % (METADATA_KEY, (key or '')))
except IOError as err:
if err.errno == errno.ENODATA:
break
raise SwiftOnFileSystemIOError(
err.errno, '%s, removexattr("%s", %s)' % (err.strerror,
path_or_fd, key))
path, key))
key += 1
@ -238,7 +247,7 @@ def _read_for_etag(fp):
return etag.hexdigest()
def get_etag(path_or_fd):
def get_etag(path):
"""
FIXME: It would be great to have a translator that returns the md5sum() of
the file as an xattr that can be simply fetched.
@ -246,43 +255,22 @@ def get_etag(path_or_fd):
Since we don't have that we should yield after each chunk read and
computed so that we don't consume the worker thread.
"""
etag = ''
if isinstance(path_or_fd, int):
# We are given a file descriptor, so this is an invocation from the
# DiskFile.open() method.
fd = path_or_fd
dup_fd = do_dup(fd)
try:
etag = _read_for_etag(dup_fd)
do_lseek(fd, 0, os.SEEK_SET)
finally:
do_close(dup_fd)
else:
# We are given a path to the object when the DiskDir.list_objects_iter
# method invokes us.
path = path_or_fd
fd = do_open(path, os.O_RDONLY)
try:
etag = _read_for_etag(fd)
finally:
do_close(fd)
# FIXME: really do need to grab precomputed checksum instead
fd = do_open(path, os.O_RDONLY)
try:
etag = _read_for_etag(fd)
finally:
do_close(fd)
return etag
def get_object_metadata(obj_path_or_fd, stats=None):
def get_object_metadata(obj_path, stats=None):
"""
Return metadata of object.
"""
if not stats:
if isinstance(obj_path_or_fd, int):
# We are given a file descriptor, so this is an invocation from the
# DiskFile.open() method.
stats = do_fstat(obj_path_or_fd)
else:
# We are given a path to the object when the
# DiskDir.list_objects_iter method invokes us.
stats = do_stat(obj_path_or_fd)
stats = do_stat(obj_path)
if not stats:
metadata = {}
@ -290,12 +278,12 @@ def get_object_metadata(obj_path_or_fd, stats=None):
is_dir = stat.S_ISDIR(stats.st_mode)
metadata = {
X_TYPE: OBJECT,
X_TIMESTAMP: normalize_timestamp(stats.st_ctime),
X_TIMESTAMP: normalize_timestamp(stats.hpss_st_ctime),
X_CONTENT_TYPE: DIR_TYPE if is_dir else FILE_TYPE,
X_OBJECT_TYPE: DIR_NON_OBJECT if is_dir else FILE,
X_CONTENT_LENGTH: 0 if is_dir else stats.st_size,
X_MTIME: 0 if is_dir else normalize_timestamp(stats.st_mtime),
X_ETAG: md5().hexdigest() if is_dir else get_etag(obj_path_or_fd)}
X_MTIME: 0 if is_dir else normalize_timestamp(stats.hpss_st_mtime),
X_ETAG: md5().hexdigest() if is_dir else get_etag(obj_path)}
return metadata
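
The new chunk_list()/hex_digest_to_bytes() helpers above turn a hex ETag into
the raw 16-byte MD5 digest, presumably for handing to
hpss_utils.set_checksum(); for example:

    hex_digest_to_bytes('d41d8cd98f00b204e9800998ecf8427e')
    # -> '\xd4\x1d\x8c\xd9\x8f\x00\xb2\x04\xe9\x80\t\x98\xec\xf8B~'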

View File

@ -23,29 +23,21 @@ except ImportError:
import random
import logging
import time
try:
import hpssfs
except ImportError:
import swiftonhpss.swift.common.hpssfs_ioctl as hpssfs
import xattr
from uuid import uuid4
from hashlib import md5
from eventlet import sleep
from contextlib import contextmanager
from swiftonhpss.swift.common.exceptions import AlreadyExistsAsFile, \
AlreadyExistsAsDir
from swift.common.utils import ThreadPool, hash_path, \
normalize_timestamp, fallocate, Timestamp
from swift.common.utils import hash_path, \
normalize_timestamp, Timestamp
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen, \
DiskFileExpired
DiskFileNoSpace, DiskFileNotOpen, DiskFileExpired
from swift.common.swob import multi_range_iterator
from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError, \
SwiftOnFileSystemIOError
from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemOSError
from swiftonhpss.swift.common.fs_utils import do_fstat, do_open, do_close, \
do_unlink, do_chown, do_fsync, do_fchown, do_stat, do_write, do_read, \
do_fadvise64, do_rename, do_fdatasync, do_lseek, do_mkdir
do_unlink, do_chown, do_fsync, do_stat, do_write, do_read, \
do_rename, do_lseek, do_mkdir
from swiftonhpss.swift.common.utils import read_metadata, write_metadata,\
rmobjdir, dir_is_object, \
get_object_metadata, write_pickle, get_etag
@ -56,6 +48,10 @@ from swiftonhpss.swift.common.utils import X_CONTENT_TYPE, \
from swift.obj.diskfile import DiskFileManager as SwiftDiskFileManager
from swift.obj.diskfile import get_async_dir
import swiftonhpss.swift.common.hpss_utils as hpss_utils
import hpss.clapi as hpss
# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
# be back ported. See http://www.python.org/dev/peps/pep-0433/
O_CLOEXEC = 0o02000000
@ -201,7 +197,7 @@ def _adjust_metadata(fd, metadata):
# detect if the object was changed from filesystem interface (non Swift)
statinfo = do_fstat(fd)
if stat.S_ISREG(statinfo.st_mode):
metadata[X_MTIME] = normalize_timestamp(statinfo.st_mtime)
metadata[X_MTIME] = normalize_timestamp(statinfo.hpss_st_mtime)
metadata[X_TYPE] = OBJECT
return metadata
@ -226,22 +222,19 @@ class DiskFileManager(SwiftDiskFileManager):
"""
def get_diskfile(self, device, partition, account, container, obj,
policy=None, **kwargs):
dev_path = self.get_dev_path(device, self.mount_check)
if not dev_path:
raise DiskFileDeviceUnavailable()
return DiskFile(self, dev_path, self.threadpools[device],
hpss_dir_name = self.conf.get('hpss_swift_dir', '/swift')
return DiskFile(self, hpss_dir_name,
partition, account, container, obj,
policy=policy, **kwargs)
def pickle_async_update(self, device, account, container, obj, data,
timestamp, policy):
# This method invokes swiftonhpss's writepickle method.
# Is patching just write_pickle and calling parent method better ?
# This should be using the JSON blob stuff instead of a pickle.
# Didn't we deprecate it?
device_path = self.construct_dev_path(device)
async_dir = os.path.join(device_path, get_async_dir(policy))
ohash = hash_path(account, container, obj)
self.threadpools[device].run_in_thread(
write_pickle,
write_pickle(
data,
os.path.join(async_dir, ohash[-3:], ohash + '-' +
normalize_timestamp(timestamp)),
@ -267,6 +260,8 @@ class DiskFileWriter(object):
self._upload_size = 0
self._last_sync = 0
self._logger = self._disk_file._logger
def _write_entire_chunk(self, chunk):
bytes_per_sync = self._disk_file._mgr.bytes_per_sync
while chunk:
@ -276,8 +271,7 @@ class DiskFileWriter(object):
# For large files sync every 512MB (by default) written
diff = self._upload_size - self._last_sync
if diff >= bytes_per_sync:
do_fdatasync(self._fd)
do_fadvise64(self._fd, self._last_sync, diff)
do_fsync(self._fd)
self._last_sync = self._upload_size
def close(self):
@ -298,35 +292,17 @@ class DiskFileWriter(object):
:returns: the total number of bytes written to an object
"""
df = self._disk_file
df._threadpool.run_in_thread(self._write_entire_chunk, chunk)
self._write_entire_chunk(chunk)
return self._upload_size
def _finalize_put(self, metadata, purgelock=False):
def _finalize_put(self, metadata):
# Write out metadata before fsync() to ensure it is also forced to
# disk.
write_metadata(self._fd, metadata)
write_metadata(self._tmppath, metadata)
# We call fsync() before calling drop_cache() to lower the
# amount of redundant work the drop cache code will perform on
# the pages (now that after fsync the pages will be all
# clean).
do_fsync(self._fd)
# (HPSS) Purge lock the file now if we're asked to.
if purgelock:
try:
hpssfs.ioctl(self._fd, hpssfs.HPSSFS_PURGE_LOCK,
int(purgelock))
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno,
'%s, hpssfs.ioctl("%s", ...)' % (err.strerror, self._fd))
# From the Department of the Redundancy Department, make sure
# we call drop_cache() after fsync() to avoid redundant work
# (pages all clean).
do_fadvise64(self._fd, self._last_sync, self._upload_size)
self.set_checksum(metadata['ETag'])
# At this point we know that the object's full directory path
# exists, so we can just rename it directly without using Swift's
@ -400,7 +376,7 @@ class DiskFileWriter(object):
# in a thread.
self.close()
def put(self, metadata, purgelock=False):
def put(self, metadata):
"""
Finalize writing the file on disk, and renames it from the temp file
to the real location. This should be called after the data has been
@ -416,8 +392,7 @@ class DiskFileWriter(object):
df = self._disk_file
if dir_is_object(metadata):
df._threadpool.force_run_in_thread(
df._create_dir_object, df._data_file, metadata)
df._create_dir_object(df._data_file, metadata)
return
if df._is_dir:
@ -429,8 +404,7 @@ class DiskFileWriter(object):
' since the target, %s, already exists'
' as a directory' % df._data_file)
df._threadpool.force_run_in_thread(self._finalize_put, metadata,
purgelock)
self._finalize_put(metadata)
# Avoid the unlink() system call as part of the mkstemp context
# cleanup
@ -446,6 +420,9 @@ class DiskFileWriter(object):
"""
pass
def set_checksum(self, checksum):
hpss_utils.set_checksum(self._fd, checksum)
class DiskFileReader(object):
"""
@ -465,18 +442,16 @@ class DiskFileReader(object):
specific. The API does not define the constructor arguments.
:param fp: open file descriptor, -1 for a directory object
:param threadpool: thread pool to use for read operations
:param disk_chunk_size: size of reads from disk in bytes
:param obj_size: size of object on disk
:param keep_cache_size: maximum object size that will be kept in cache
:param iter_hook: called when __iter__ returns a chunk
:param keep_cache: should resulting reads be kept in the buffer cache
"""
def __init__(self, fd, threadpool, disk_chunk_size, obj_size,
def __init__(self, fd, disk_chunk_size, obj_size,
keep_cache_size, iter_hook=None, keep_cache=False):
# Parameter tracking
self._fd = fd
self._threadpool = threadpool
self._disk_chunk_size = disk_chunk_size
self._iter_hook = iter_hook
if keep_cache:
@ -496,8 +471,7 @@ class DiskFileReader(object):
bytes_read = 0
while True:
if self._fd != -1:
chunk = self._threadpool.run_in_thread(
do_read, self._fd, self._disk_chunk_size)
chunk = do_read(self._fd, self._disk_chunk_size)
else:
chunk = None
if chunk:
@ -555,9 +529,7 @@ class DiskFileReader(object):
self.close()
def _drop_cache(self, offset, length):
"""Method for no-oping buffer cache drop method."""
if not self._keep_cache and self._fd > -1:
do_fadvise64(self._fd, offset, length)
pass
def close(self):
"""
@ -581,25 +553,24 @@ class DiskFile(object):
:param mgr: associated on-disk manager instance
:param dev_path: device name/account_name for UFO.
:param threadpool: thread pool in which to do blocking operations
:param account: account name for the object
:param container: container name for the object
:param obj: object name for the object
:param uid: user ID disk object should assume (file or directory)
:param gid: group ID disk object should assume (file or directory)
"""
def __init__(self, mgr, dev_path, threadpool, partition,
def __init__(self, mgr, dev_path, partition,
account=None, container=None, obj=None,
policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID, **kwargs):
# Variables partition and policy is currently unused.
self._mgr = mgr
self._logger = mgr.logger
self._device_path = dev_path
self._threadpool = threadpool or ThreadPool(nthreads=0)
self._uid = int(uid)
self._gid = int(gid)
self._uid = uid
self._gid = gid
self._is_dir = False
self._metadata = None
self._fd = None
self._fd = -1
# Save stat info as internal variable to avoid multiple stat() calls
self._stat = None
# Save md5sum of object as internal variable to avoid reading the
@ -609,9 +580,11 @@ class DiskFile(object):
# Don't store a value for data_file until we know it exists.
self._data_file = None
# Account name contains resller_prefix which is retained and not
self._obj_size = 0
# Account name contains reseller_prefix which is retained and not
# stripped. This is to conform to Swift's behavior where account name
# entry in Account DBs contain resller_prefix.
# entry in Account DBs contain reseller_prefix.
self._account = account
self._container = container
@ -628,7 +601,6 @@ class DiskFile(object):
self._put_datadir = self._container_path
self._data_file = os.path.join(self._put_datadir, self._obj)
self._stat = do_stat(self._data_file)
@property
def timestamp(self):
@ -662,7 +634,15 @@ class DiskFile(object):
"""
# Writes are always performed to a temporary file
try:
self._fd = do_open(self._data_file, os.O_RDONLY | O_CLOEXEC)
self._logger.debug("DiskFile: Opening %s" % self._data_file)
self._stat = do_stat(self._data_file)
self._is_dir = stat.S_ISDIR(self._stat.st_mode)
if not self._is_dir:
self._fd = do_open(self._data_file, hpss.O_RDONLY)
self._obj_size = self._stat.st_size
else:
self._fd = -1
self._obj_size = 0
except SwiftOnFileSystemOSError as err:
if err.errno in (errno.ENOENT, errno.ENOTDIR):
# If the file does exist, or some part of the path does not
@ -670,25 +650,18 @@ class DiskFile(object):
raise DiskFileNotExist
raise
try:
self._stat = do_fstat(self._fd)
self._is_dir = stat.S_ISDIR(self._stat.st_mode)
obj_size = self._stat.st_size
self._metadata = read_metadata(self._fd)
self._logger.debug("DiskFile: Reading metadata")
self._metadata = read_metadata(self._data_file)
if not self._validate_object_metadata():
self._create_object_metadata(self._fd)
self._create_object_metadata(self._data_file)
assert self._metadata is not None
self._filter_metadata()
if self._is_dir:
do_close(self._fd)
obj_size = 0
self._fd = -1
else:
if not self._is_dir:
if self._is_object_expired(self._metadata):
raise DiskFileExpired(metadata=self._metadata)
self._obj_size = obj_size
except (OSError, IOError, DiskFileExpired) as err:
# Something went wrong. Context manager will not call
# __exit__. So we close the fd manually here.
@ -730,9 +703,9 @@ class DiskFile(object):
# Check if the file has been modified through filesystem
# interface by comparing mtime stored in xattr during PUT
# and current mtime of file.
obj_stat = os.stat(self._data_file)
obj_stat = do_stat(self._data_file)
if normalize_timestamp(self._metadata[X_MTIME]) != \
normalize_timestamp(obj_stat.st_mtime):
normalize_timestamp(obj_stat.hpss_st_mtime):
self._file_has_changed = True
return False
else:
@ -755,10 +728,10 @@ class DiskFile(object):
return False
def _create_object_metadata(self, fd):
def _create_object_metadata(self, file_path):
if self._etag is None:
self._etag = md5().hexdigest() if self._is_dir \
else get_etag(fd)
else get_etag(file_path)
if self._file_has_changed or (X_TIMESTAMP not in self._metadata):
timestamp = normalize_timestamp(self._stat.st_mtime)
@ -775,12 +748,12 @@ class DiskFile(object):
# Add X_MTIME key if object is a file
if not self._is_dir:
metadata[X_MTIME] = normalize_timestamp(self._stat.st_mtime)
metadata[X_MTIME] = normalize_timestamp(self._stat.hpss_st_mtime)
meta_new = self._metadata.copy()
meta_new.update(metadata)
if self._metadata != meta_new:
write_metadata(fd, meta_new)
write_metadata(file_path, meta_new)
# Avoid additional read_metadata() later
self._metadata = meta_new
@ -799,7 +772,7 @@ class DiskFile(object):
pass
except ValueError:
# x-delete-at key is present but not an integer.
# TODO: Openstack Swift "quarrantines" the object.
# TODO: Openstack Swift "quarantines" the object.
# We just let it pass
pass
else:
@ -807,58 +780,24 @@ class DiskFile(object):
return True
return False
# TODO: don't parse this, just grab data structure
def is_offline(self):
try:
raw_file_levels = xattr.getxattr(self._data_file,
"system.hpss.level")
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno,
'%s, xattr.getxattr("system.hpss.level", ...)' % err.strerror
)
try:
file_levels = raw_file_levels.split(";")
top_level = file_levels[0].split(':')
bytes_on_disk = top_level[2].rstrip(' ')
if bytes_on_disk == 'nodata':
bytes_on_disk = '0'
except ValueError:
raise SwiftOnFileSystemIOError("Couldn't get system.hpss.level!")
meta = hpss_utils.read_hpss_system_metadata(self._data_file)
raw_file_levels = meta['X-HPSS-Data-Levels']
file_levels = raw_file_levels.split(";")
top_level = file_levels[0].split(':')
bytes_on_disk = top_level[2].rstrip(' ')
if bytes_on_disk == 'nodata':
bytes_on_disk = '0'
return int(bytes_on_disk) != self._stat.st_size
def read_hpss_system_metadata(self):
header_to_xattr = {'X-HPSS-Account': 'account',
'X-HPSS-Bitfile-ID': 'bitfile',
'X-HPSS-Comment': 'comment',
'X-HPSS-Class-Of-Service-ID': 'cos',
'X-HPSS-Data-Levels': 'level',
'X-HPSS-Family-ID': 'family',
'X-HPSS-Fileset-ID': 'fileset',
'X-HPSS-Optimum-Size': 'optimum',
'X-HPSS-Purgelock-Status': 'purgelock',
'X-HPSS-Reads': 'reads',
'X-HPSS-Realm-ID': 'realm',
'X-HPSS-Subsys-ID': 'subsys',
'X-HPSS-Writes': 'writes', }
result = {}
for header in header_to_xattr:
xattr_to_get = 'system.hpss.%s' % header_to_xattr[header]
try:
result[header] = xattr.getxattr(self._data_file,
xattr_to_get)
except IOError as err:
error_message = "Couldn't get HPSS xattr %s from file %s" \
% (xattr_to_get, self._data_file)
raise SwiftOnFileSystemIOError(err.errno, error_message)
return result
def __enter__(self):
"""
Context enter.
.. note::
An implemenation shall raise `DiskFileNotOpen` when has not
An implementation shall raise `DiskFileNotOpen` when has not
previously invoked the :func:`swift.obj.diskfile.DiskFile.open`
method.
"""
@ -913,15 +852,20 @@ class DiskFile(object):
# validation and resulting timeouts
# This means we do a few things DiskFile.open() does.
try:
self._is_dir = os.path.isdir(self._data_file)
if not self._stat:
self._stat = do_stat(self._data_file)
self._is_dir = stat.S_ISDIR(self._stat.st_mode)
self._metadata = read_metadata(self._data_file)
except IOError:
except SwiftOnFileSystemOSError:
raise DiskFileNotExist
if not self._validate_object_metadata():
self._create_object_metadata(self._data_file)
self._filter_metadata()
return self._metadata
def read_hpss_system_metadata(self):
return hpss_utils.read_hpss_system_metadata(self._data_file)
def reader(self, iter_hook=None, keep_cache=False):
"""
Return a :class:`swift.common.swob.Response` class compatible
@ -939,7 +883,7 @@ class DiskFile(object):
if self._metadata is None:
raise DiskFileNotOpen()
dr = DiskFileReader(
self._fd, self._threadpool, self._mgr.disk_chunk_size,
self._fd, self._mgr.disk_chunk_size,
self._obj_size, self._mgr.keep_cache_size,
iter_hook=iter_hook, keep_cache=keep_cache)
# At this point the reader object is now responsible for closing
@ -1004,7 +948,7 @@ class DiskFile(object):
return True, newmd
@contextmanager
def create(self, size, cos):
def create(self, hpss_hints):
"""
Context manager to create a file. We create a temporary file first, and
then return a DiskFileWriter object to encapsulate the state.
@ -1021,67 +965,68 @@ class DiskFile(object):
preallocations even if the parameter is specified. But if it does
and it fails, it must raise a `DiskFileNoSpace` exception.
:param cos:
:param size: optional initial size of file to explicitly allocate on
disk
:param hpss_hints: dict containing HPSS class of service hints
:raises DiskFileNoSpace: if a size is specified and allocation fails
:raises AlreadyExistsAsFile: if path or part of a path is not a \
directory
"""
# Create /account/container directory structure on mount point root
try:
os.makedirs(self._container_path)
self._logger.debug("DiskFile: Creating directories for %s" %
self._container_path)
hpss_utils.makedirs(self._container_path)
except OSError as err:
if err.errno != errno.EEXIST:
raise
# Create directories to object name, if they don't exist.
self._logger.debug("DiskFile: creating directories in obj name %s" %
self._obj)
object_dirs = os.path.split(self._obj)[0]
self._logger.debug("object_dirs: %s" % repr(object_dirs))
self._logger.debug("data_dir: %s" % self._put_datadir)
hpss_utils.makedirs(os.path.join(self._put_datadir, object_dirs))
data_file = os.path.join(self._put_datadir, self._obj)
# Assume the full directory path exists to the file already, and
# construct the proper name for the temporary file.
# Create the temporary file
attempts = 1
while True:
# To know more about why following temp file naming convention is
# used, please read this GlusterFS doc:
# https://github.com/gluster/glusterfs/blob/master/doc/features/dht.md#rename-optimizations # noqa
tmpfile = '.' + self._obj + '.' + uuid4().hex
tmpfile = '.%s.%s.temporary' % (self._obj,
random.randint(0, 65536))
tmppath = os.path.join(self._put_datadir, tmpfile)
self._logger.debug("DiskFile: Creating temporary file %s" %
tmppath)
try:
# TODO: figure out how to create hints struct
self._logger.debug("DiskFile: creating file")
fd = do_open(tmppath,
os.O_WRONLY | os.O_CREAT | os.O_EXCL | O_CLOEXEC)
if size:
try:
hpssfs.ioctl(fd, hpssfs.HPSSFS_SET_FSIZE_HINT,
long(size))
except IOError as err:
message = '%s, hpssfs.ioctl("%s", SET_FSIZE)'
raise SwiftOnFileSystemIOError(
err.errno,
message % (err.strerror, fd))
if cos:
try:
hpssfs.ioctl(fd, hpssfs.HPSSFS_SET_COS_HINT, int(cos))
except IOError as err:
message = '%s, hpssfs.ioctl("%s", SET_COS)'
raise SwiftOnFileSystemIOError(
err.errno,
message % (err.strerror, fd))
hpss.O_WRONLY | hpss.O_CREAT | hpss.O_EXCL)
self._logger.debug("DiskFile: setting COS")
if hpss_hints['cos']:
hpss_utils.set_cos(tmppath, int(hpss_hints['cos']))
except SwiftOnFileSystemOSError as gerr:
if gerr.errno in (errno.ENOSPC, errno.EDQUOT):
# Raise DiskFileNoSpace to be handled by upper layers when
# there is no space on disk OR when quota is exceeded
self._logger.error("DiskFile: no space")
raise DiskFileNoSpace()
if gerr.errno == errno.EACCES:
self._logger.error("DiskFile: permission denied")
raise DiskFileNoSpace()
if gerr.errno == errno.ENOTDIR:
self._logger.error("DiskFile: not a directory")
raise AlreadyExistsAsFile('do_open(): failed on %s,'
' path or part of a'
' path is not a directory'
% (tmppath))
% data_file)
if gerr.errno not in (errno.ENOENT, errno.EEXIST, errno.EIO):
# FIXME: Other cases we should handle?
self._logger.error("DiskFile: unknown error %s"
% gerr.errno)
raise
if attempts >= MAX_OPEN_ATTEMPTS:
# We failed after N attempts to create the temporary
@ -1093,50 +1038,18 @@ class DiskFile(object):
attempts, MAX_OPEN_ATTEMPTS,
data_file))
if gerr.errno == errno.EEXIST:
self._logger.debug("DiskFile: file exists already")
# Retry with a different random number.
attempts += 1
elif gerr.errno == errno.EIO:
# FIXME: Possible FUSE issue or race condition, let's
# sleep on it and retry the operation.
_random_sleep()
logging.warn("DiskFile.mkstemp(): %s ... retrying in"
" 0.1 secs", gerr)
attempts += 1
elif not self._obj_path:
# No directory hierarchy and the create failed telling us
# the container or volume directory does not exist. This
# could be a FUSE issue or some race condition, so let's
# sleep a bit and retry.
_random_sleep()
logging.warn("DiskFile.mkstemp(): %s ... retrying in"
" 0.1 secs", gerr)
attempts += 1
elif attempts > 1:
# Got ENOENT after previously making the path. This could
# also be a FUSE issue or some race condition, nap and
# retry.
_random_sleep()
logging.warn("DiskFile.mkstemp(): %s ... retrying in"
" 0.1 secs" % gerr)
attempts += 1
else:
# It looks like the path to the object does not already
# exist; don't count this as an attempt, though, since
# we perform the open() system call optimistically.
self._create_dir_object(self._obj_path)
else:
break
dw = None
self._logger.debug("DiskFile: created file")
try:
if size is not None and size > 0:
try:
fallocate(fd, size)
except OSError as err:
if err.errno in (errno.ENOSPC, errno.EDQUOT):
raise DiskFileNoSpace()
raise
# Ensure it is properly owned before we make it available.
do_fchown(fd, self._uid, self._gid)
do_chown(tmppath, self._uid, self._gid)
dw = DiskFileWriter(fd, tmppath, self)
yield dw
finally:
@ -1157,8 +1070,7 @@ class DiskFile(object):
"""
metadata = self._keep_sys_metadata(metadata)
data_file = os.path.join(self._put_datadir, self._obj)
self._threadpool.run_in_thread(
write_metadata, data_file, metadata)
write_metadata(data_file, metadata)
def _keep_sys_metadata(self, metadata):
"""
@ -1221,6 +1133,12 @@ class DiskFile(object):
else:
dirname = os.path.dirname(dirname)
def set_cos(self, cos_id):
hpss_utils.set_cos(self._data_file, cos_id)
def set_purge_lock(self, purgelock):
hpss_utils.set_purge_lock(self._data_file, purgelock)
def delete(self, timestamp):
"""
Delete the object.
@ -1247,7 +1165,7 @@ class DiskFile(object):
if metadata[X_TIMESTAMP] >= timestamp:
return
self._threadpool.run_in_thread(self._unlinkold)
self._unlinkold()
self._metadata = None
self._data_file = None
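
A hedged sketch of how the object server drives the reworked DiskFile
(constructor arguments and metadata values are illustrative):

    from hashlib import md5

    df = DiskFile(mgr, '/swift', partition,
                  account='AUTH_test', container='c', obj='o')
    with df.create(hpss_hints={'cos': '1', 'purgelock': False}) as writer:
        writer.write('hello')
        writer.put({'X-Timestamp': '0000000001.00000',
                    'ETag': md5('hello').hexdigest(),
                    'Content-Type': 'text/plain',
                    'Content-Length': '5'})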

View File

@ -17,12 +17,7 @@
import math
import logging
import xattr
import os
try:
import hpssfs
except ImportError:
import swiftonhpss.swift.common.hpssfs_ioctl as hpssfs
import time
from hashlib import md5
@ -47,7 +42,7 @@ from swift.common.ring import Ring
from swiftonhpss.swift.obj.diskfile import DiskFileManager
from swiftonhpss.swift.common.constraints import check_object_creation
from swiftonhpss.swift.common import utils
from swiftonhpss.swift.common import utils, hpss_utils
class SwiftOnFileDiskFileRouter(object):
@ -82,13 +77,30 @@ class ObjectController(server.ObjectController):
self._diskfile_router = SwiftOnFileDiskFileRouter(conf, self.logger)
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.container_ring = None
# This conf option will be deprecated and eventualy removed in
# This conf option will be deprecated and eventually removed in
# future releases
utils.read_pickled_metadata = \
config_true_value(conf.get('read_pickled_metadata', 'no'))
# HPSS-specific conf options
self.allow_purgelock = \
config_true_value(conf.get('allow_purgelock', True))
self.default_cos_id = conf.get('default_cos_id')
self.default_cos_id = conf.get('default_cos_id', '1')
self.hpss_principal = conf.get('hpss_user', 'swift')
self.hpss_auth_mech = conf.get('hpss_auth_mechanism', 'unix')
self.hpss_auth_cred_type = conf.get('hpss_auth_credential_type',
'keytab')
self.hpss_auth_creds = conf.get('hpss_auth_credential',
'/var/hpss/etc/hpss.unix.keytab')
self.hpss_uid = conf.get('hpss_uid', '300')
self.hpss_gid = conf.get('hpss_gid', '300')
self.logger.debug("Creating HPSS session")
hpss_utils.create_hpss_session(self.hpss_principal,
self.hpss_auth_mech,
self.hpss_auth_cred_type,
self.hpss_auth_creds)
def get_container_ring(self):
"""Get the container ring. Load it, if it hasn't been yet."""
@ -100,6 +112,7 @@ class ObjectController(server.ObjectController):
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift on File object server"""
try:
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
@ -128,10 +141,15 @@ class ObjectController(server.ObjectController):
request=request,
content_type='text/plain')
self.logger.debug("DiskFile @ %s/%s/%s/%s" %
(device, account, container, obj))
# Try to get DiskFile
try:
disk_file = self.get_diskfile(device, partition, account,
container, obj, policy=policy)
container, obj, policy=policy,
uid=int(self.hpss_uid),
gid=int(self.hpss_gid))
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
@ -158,22 +176,23 @@ class ObjectController(server.ObjectController):
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
upload_expiration = time.time() + self.max_upload_time
self.logger.debug("Receiving and writing object")
etag = md5()
elapsed_time = 0
# (HPSS) Check for HPSS-specific metadata headers
cos = request.headers.get('X-Hpss-Class-Of-Service-Id',
self.default_cos_id)
hints = {'cos': request.headers.get('X-Hpss-Class-Of-Service-Id',
self.default_cos_id),
'purgelock':
request.headers.get('X-Hpss-Purgelock-Status', False)}
if self.allow_purgelock:
purgelock = config_true_value(
request.headers.get('X-Hpss-Purgelock-Status', 'false'))
else:
purgelock = False
if request.headers['content-type'] == 'application/directory':
# TODO: handle directories differently
pass
try:
# Feed DiskFile our HPSS-specific stuff
with disk_file.create(size=fsize, cos=cos) as writer:
with disk_file.create(hpss_hints=hints) as writer:
upload_size = 0
def timeout_reader():
@ -203,6 +222,8 @@ class ObjectController(server.ObjectController):
and request.headers['etag'].lower() != etag:
return HTTPUnprocessableEntity(request=request)
self.logger.debug("Writing object metadata")
# Update object metadata
content_type = request.headers['content-type']
metadata = {'X-Timestamp': request.timestamp.internal,
@ -222,43 +243,18 @@ class ObjectController(server.ObjectController):
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
self.logger.debug("Finalizing object")
# (HPSS) Write the file, with added options
writer.put(metadata, purgelock=purgelock)
writer.put(metadata)
except DiskFileNoSpace:
return HTTPInsufficientStorage(drive=device, request=request)
except SwiftOnFileSystemIOError as e:
self.logger.error(e)
return HTTPServiceUnavailable(request=request)
# FIXME: this stuff really should be handled in DiskFile somehow?
# we set the hpss checksum in here, so both systems have valid
# and current checksum metadata
# (HPSS) Set checksum on file ourselves, if hpssfs won't do it
# for us.
data_file = disk_file._data_file
try:
xattr.setxattr(data_file, 'system.hpss.hash',
"md5:%s" % etag)
except IOError:
logging.debug("Could not write ETag to system.hpss.hash,"
" trying user.hash.checksum")
try:
xattr.setxattr(data_file,
'user.hash.checksum', etag)
xattr.setxattr(data_file,
'user.hash.algorithm', 'md5')
xattr.setxattr(data_file,
'user.hash.state', 'Valid')
xattr.setxattr(data_file,
'user.hash.filesize', str(upload_size))
xattr.setxattr(data_file,
'user.hash.app', 'swiftonhpss')
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno,
'Could not write MD5 checksum to HPSS filesystem: '
'%s' % err.strerror)
self.logger.debug("Writing container metadata")
# Update container metadata
if orig_delete_at != new_delete_at:
@ -277,13 +273,15 @@ class ObjectController(server.ObjectController):
self.container_update('PUT', account, container, obj, request,
HeaderKeyDict(container_headers),
device, policy)
self.logger.debug("Done!")
# Create convenience symlink
try:
self._object_symlink(request, disk_file._data_file, device,
account)
self._project_symlink(request, disk_file, account)
except SwiftOnFileSystemOSError:
logging.debug('could not make account symlink')
return HTTPServiceUnavailable(request=request)
return HTTPCreated(request=request, etag=etag)
except (AlreadyExistsAsFile, AlreadyExistsAsDir):
@ -323,26 +321,35 @@ class ObjectController(server.ObjectController):
'x-etag': resp.headers['ETag']}),
device, policy_idx)
def _object_symlink(self, request, diskfile, device, account):
mount = diskfile.split(device)[0]
dev = "%s%s" % (mount, device)
# FIXME: this should be in diskfile?
def _project_symlink(self, request, diskfile, account):
project = None
diskfile_path = diskfile._data_file
hpss_root = diskfile_path.split(account)[0]
if 'X-Project-Name' in request.headers:
project = request.headers.get('X-Project-Name')
elif 'X-Tenant-Name' in request.headers:
project = request.headers.get('X-Tenant-Name')
if project:
if project is not account:
accdir = "%s/%s" % (dev, account)
projdir = "%s%s" % (mount, project)
if not os.path.exists(projdir):
if project != account:
symlink_location = os.path.join(hpss_root, project)
account_location = os.path.join(hpss_root, account)
self.logger.debug('symlink_location: %s' % symlink_location)
self.logger.debug('account_location: %s' % account_location)
if not hpss_utils.path_exists(symlink_location):
try:
os.symlink(accdir, projdir)
except OSError as err:
hpss_utils.relative_symlink(hpss_root, account,
project)
except IOError as err:
self.logger.debug('symlink failed, errno %s'
% err.errno)
raise SwiftOnFileSystemOSError(
err.errno,
('%s, os.symlink("%s", ...)' %
err.strerror, account))
('os.symlink("%s", ...)' % account))
@public
@timing_stats()
@ -354,7 +361,9 @@ class ObjectController(server.ObjectController):
# Get DiskFile
try:
disk_file = self.get_diskfile(device, partition, account,
container, obj, policy=policy)
container, obj, policy=policy,
uid=int(self.hpss_uid),
gid=int(self.hpss_gid))
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
@ -365,7 +374,7 @@ class ObjectController(server.ObjectController):
try:
with disk_file.open():
metadata = disk_file.get_metadata()
want_hpss_metadata = request.headers.get('X-HPSS-Get-Metadata',
want_hpss_metadata = request.headers.get('X-Hpss-Get-Metadata',
False)
if config_true_value(want_hpss_metadata):
try:
@ -422,7 +431,9 @@ class ObjectController(server.ObjectController):
# Get Diskfile
try:
disk_file = self.get_diskfile(device, partition, account,
container, obj, policy)
container, obj, policy,
uid=int(self.hpss_uid),
gid=int(self.hpss_gid))
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
@ -493,36 +504,22 @@ class ObjectController(server.ObjectController):
# Get DiskFile
try:
disk_file = self.get_diskfile(device, partition, account,
container, obj, policy)
container, obj, policy,
uid=int(self.hpss_uid),
gid=int(self.hpss_gid))
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
# Set Purgelock status if we got it
if self.allow_purgelock:
purgelock = request.headers.get('X-HPSS-Purgelock-Status')
else:
purgelock = False
if purgelock:
try:
hpssfs.ioctl(disk_file._fd, hpssfs.HPSSFS_PURGE_LOCK,
int(purgelock))
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno,
'%s, xattr.getxattr("%s", ...)' %
(err.strerror, disk_file._fd))
# Set class of service if we got it
cos = request.headers.get('X-HPSS-Class-Of-Service-ID')
if cos:
try:
xattr.setxattr(disk_file._fd, 'system.hpss.cos', int(cos))
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno,
'%s, xattr.setxattr("%s", ...)' %
(err.strerror, disk_file._fd))
new_cos = request.headers.get('X-HPSS-Class-Of-Service-Id', None)
if new_cos:
disk_file.set_cos(int(new_cos))
# Set purge lock status if we got it
if self.allow_purgelock:
purge_lock = request.headers.get('X-HPSS-Purgelock-Status', None)
if purge_lock is not None:
disk_file.set_purge_lock(purge_lock)
# Update metadata from request
try:
@ -561,7 +558,9 @@ class ObjectController(server.ObjectController):
req_timestamp = valid_timestamp(request)
try:
disk_file = self.get_diskfile(device, partition, account,
container, obj, policy)
container, obj, policy,
uid=int(self.hpss_uid),
gid=int(self.hpss_gid))
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
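
On the wire, the HPSS-specific knobs are ordinary request headers; a
hypothetical client PUT against a proxy (host, token, and path are invented,
though the functional tests below use the 'KEY_' reseller prefix):

    import httplib

    conn = httplib.HTTPConnection('proxy.example.com', 8080)
    conn.request('PUT', '/v1/KEY_test/c/o', 'hello',
                 {'X-Auth-Token': 'token-from-auth',
                  'X-Hpss-Class-Of-Service-Id': '1',
                  'X-Hpss-Purgelock-Status': 'true',
                  'Content-Type': 'text/plain'})
    print conn.getresponse().status   # expect 201 Created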

View File

@ -25,10 +25,8 @@ from test.functional.swift_test_client import Account, Connection, \
ResponseError
import test.functional as tf
# PGB - changed 'AUTH_' hardcoded reseller prefix to 'KEY_'.
# TODO: read Swift proxy config for this
class TestSwiftOnFileEnv:
@classmethod
def setUp(cls):
@ -37,14 +35,13 @@ class TestSwiftOnFileEnv:
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
# PGB - change hardcoded SoF mountpoint
#cls.root_dir = os.path.join('/mnt/swiftonhpss/test')
cls.root_dir = os.path.join('/srv/swift/hpss')
cls.root_dir = os.path.join('/srv/hpss')
cls.account.delete_containers()
cls.file_size = 8
cls.container = cls.account.container(Utils.create_name())
if not cls.container.create(None, None):
if not cls.container.create(hdrs={'X-Storage-Policy': 'swiftonhpss'}):
raise ResponseError(cls.conn.response)
cls.dirs = [
@ -134,6 +131,15 @@ class TestSwiftOnFile(Base):
self.assert_status(201)
file_info = file_item.info()
print self.env.root_dir
print self.env.account.name
print self.env.container.name
print file_name
print os.listdir(self.env.root_dir)
print os.listdir(os.path.join(self.env.root_dir,
self.env.account.name))
with open(os.path.join(self.env.root_dir,
self.env.account.name,
self.env.container.name,

View File

@ -34,8 +34,8 @@ class TestSwiftOnHPSS(unittest.TestCase):
tf.config.get('account',
tf.config['username']))
cls.container = cls.account.container('swiftonhpss_test')
cls.container.create(hdrs={'X-Storage-Policy': 'hpss'})
cls.hpss_dir = '/srv/swift/hpss'
cls.container.create(hdrs={'X-Storage-Policy': 'swiftonhpss'})
cls.hpss_dir = '/srv/hpss'
@classmethod
def tearDownClass(cls):
@ -45,48 +45,65 @@ class TestSwiftOnHPSS(unittest.TestCase):
self.test_file = self.container.file('testfile')
def tearDown(self):
self.test_file.delete()
try:
self.test_file.delete()
except ResponseError as e:
if e.message != "Not Found":
raise
def test_purge_lock(self):
self.test_file.write(data='test',
hdrs={'X-Hpss-Purgelock-Status': 'true',
'X-Hpss-Class-Of-Service-Id': '3'})
resp = self.test_file.write(data='test',
hdrs={'X-Hpss-Purgelock-Status': 'true',
'X-Hpss-Class-Of-Service-Id': '1'},
return_resp=True)
print resp.status
print resp.getheaders()
print resp.read()
test_file_name = os.path.join(self.hpss_dir,
self.account.name,
self.container.name,
'testfile')
xattrs = dict(xattr.xattr(test_file_name))
self.assertEqual(xattrs['system.hpss.purgelock'], '1')
print test_file_name
print os.stat(test_file_name)
print xattr.listxattr(test_file_name)
self.assertEqual(xattr.get(test_file_name,
'system.hpss.purgelock'),
'1')
self.test_file.post(hdrs={'X-Hpss-Purgelock-Status': 'false'})
xattrs = dict(xattr.xattr(test_file_name))
self.assertEqual(xattrs['system.hpss.purgelock'], '0')
self.assertEqual(xattr.get(test_file_name,
'system.hpss.purgelock'), '0')
def test_change_cos(self):
self.test_file.write(data='asdfasdf',
hdrs={'X-Hpss-Class-Of-Service-Id': '3'})
hdrs={'X-Hpss-Class-Of-Service-Id': '2'})
test_file_name = os.path.join(self.hpss_dir,
self.account.name,
self.container.name,
'testfile')
time.sleep(30) # It takes a long time for HPSS to get around to it.
xattrs = dict(xattr.xattr(test_file_name))
self.assertEqual(xattrs['system.hpss.cos'], '3')
print test_file_name
self.test_file.post(hdrs={'X-HPSS-Class-Of-Service-ID': '1'})
time.sleep(30)
xattrs = dict(xattr.xattr(test_file_name))
self.assertEqual(xattrs['system.hpss.cos'], '1')
time.sleep(10) # It takes a long time for HPSS to get around to it.
self.assertEqual(xattr.get(test_file_name, 'system.hpss.cos'), '2')
self.test_file.post(hdrs={'X-Hpss-Class-Of-Service-Id': '1'})
time.sleep(10)
self.assertEqual(xattr.get(test_file_name, 'system.hpss.cos'), '1')
def test_hpss_metadata(self):
# header is X-HPSS-Get-Metadata
self.test_file.write(data='test')
self.connection.make_request('HEAD', self.test_file.path,
hdrs={'X-HPSS-Get-Metadata': 'true'})
hdrs={'X-Hpss-Get-Metadata': 'true'})
md = {t[0]: t[1] for t in self.connection.response.getheaders()}
print md
self.assertTrue('x-hpss-account' in md)
@ -101,4 +118,4 @@ class TestSwiftOnHPSS(unittest.TestCase):
self.assertTrue('x-hpss-reads' in md)
self.assertTrue('x-hpss-realm-id' in md)
self.assertTrue('x-hpss-subsys-id' in md)
self.assertTrue('x-hpss-writes' in md)
self.assertTrue('x-hpss-writes' in md)

View File

@ -1,6 +1,5 @@
[tox]
#envlist = py27,pep8,functest
envlist = py27,pep8
envlist = py27,pep8,functest
minversion = 1.6
skipsdist = True
@ -20,7 +19,9 @@ deps =
PyECLib==1.0.7
-r{toxinidir}/test-requirements.txt
changedir = {toxinidir}/test/unit
commands = nosetests -v {posargs}
# TODO: re-enable this when we have our own test setup ready
#commands = nosetests -v {posargs}
passenv = SWIFT_* *_proxy
[testenv:cover]