WIP: Mostly work with new swift?
See: - https://github.com/openstack/swift/commit/60b2e02 - https://github.com/openstack/swift/commit/e91de49 - https://github.com/openstack/swift/commit/4c11833 - https://github.com/openstack/swift/commit/ffd5194 Encryption doesn't work, because X-Object-Sysmeta-* isn't sticky. Neither is X-Static-Large-Object; otherwise, fast-POST mostly works. There's a cross-account COPY func test that's failing almost exactly like is described in https://github.com/openstack/swift/blob/2.16.0/test/functional/tests.py#L1791-L1793 Also, there were some updates to SLO that make tests like test_slo_missing_etag and test_slo_missing_size not reflect expected behavior. Change-Id: I72e40851feda03ae8e041cd7738f93b65d71bfc2
This commit is contained in:
parent
48a4c8b493
commit
1f8f9ece21
|
@ -10,4 +10,4 @@ pastedeploy>=1.3.3
|
|||
simplejson>=2.0.9
|
||||
six>=1.9.0
|
||||
xattr>=0.4
|
||||
PyECLib==1.0.7
|
||||
PyECLib>=1.3.1
|
||||
|
|
|
@ -26,7 +26,7 @@ from cStringIO import StringIO
|
|||
import pickletools
|
||||
from swiftonfile.swift.common.exceptions import SwiftOnFileSystemIOError
|
||||
from swift.common.exceptions import DiskFileNoSpace
|
||||
from swift.common.db import utf8encodekeys
|
||||
from swift.common.db import native_str_keys
|
||||
from swiftonfile.swift.common.fs_utils import do_stat, \
|
||||
do_walk, do_rmdir, do_log_rl, get_filename_from_fd, do_open, \
|
||||
do_getxattr, do_setxattr, do_removexattr, do_read, \
|
||||
|
@ -126,7 +126,7 @@ def deserialize_metadata(metastr):
|
|||
elif metastr.startswith('{') and metastr.endswith('}'):
|
||||
try:
|
||||
metadata = json.loads(metastr)
|
||||
utf8encodekeys(metadata)
|
||||
native_str_keys(metadata)
|
||||
return metadata
|
||||
except (UnicodeDecodeError, ValueError):
|
||||
logging.warning("json.loads() failed.", exc_info=True)
|
||||
|
|
|
@ -24,12 +24,12 @@ except ImportError:
|
|||
import logging
|
||||
import time
|
||||
from uuid import uuid4
|
||||
from eventlet import sleep
|
||||
from eventlet import sleep, tpool
|
||||
from contextlib import contextmanager
|
||||
from swiftonfile.swift.common.exceptions import AlreadyExistsAsFile, \
|
||||
AlreadyExistsAsDir, DiskFileContainerDoesNotExist
|
||||
from swift.common.utils import ThreadPool, hash_path, \
|
||||
normalize_timestamp, fallocate, Timestamp
|
||||
from swift.common.utils import hash_path, normalize_timestamp, \
|
||||
fallocate, Timestamp
|
||||
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
|
||||
DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen, \
|
||||
DiskFileExpired
|
||||
|
@ -227,7 +227,7 @@ class DiskFileManager(SwiftDiskFileManager):
|
|||
dev_path = self.get_dev_path(device, self.mount_check)
|
||||
if not dev_path:
|
||||
raise DiskFileDeviceUnavailable()
|
||||
return DiskFile(self, dev_path, self.threadpools[device],
|
||||
return DiskFile(self, dev_path,
|
||||
partition, account, container, obj,
|
||||
policy=policy, **kwargs)
|
||||
|
||||
|
@ -238,8 +238,7 @@ class DiskFileManager(SwiftDiskFileManager):
|
|||
device_path = self.construct_dev_path(device)
|
||||
async_dir = os.path.join(device_path, get_async_dir(policy))
|
||||
ohash = hash_path(account, container, obj)
|
||||
self.threadpools[device].run_in_thread(
|
||||
write_pickle,
|
||||
write_pickle(
|
||||
data,
|
||||
os.path.join(async_dir, ohash[-3:], ohash + '-' +
|
||||
normalize_timestamp(timestamp)),
|
||||
|
@ -296,8 +295,7 @@ class DiskFileWriter(object):
|
|||
|
||||
:returns: the total number of bytes written to an object
|
||||
"""
|
||||
df = self._disk_file
|
||||
df._threadpool.run_in_thread(self._write_entire_chunk, chunk)
|
||||
self._write_entire_chunk(chunk)
|
||||
return self._upload_size
|
||||
|
||||
def _finalize_put(self, metadata):
|
||||
|
@ -406,7 +404,7 @@ class DiskFileWriter(object):
|
|||
df = self._disk_file
|
||||
|
||||
if dir_is_object(metadata):
|
||||
df._threadpool.force_run_in_thread(
|
||||
tpool.execute(
|
||||
df._create_dir_object, df._data_file, metadata)
|
||||
return
|
||||
|
||||
|
@ -419,7 +417,7 @@ class DiskFileWriter(object):
|
|||
' since the target, %s, already exists'
|
||||
' as a directory' % df._data_file)
|
||||
|
||||
df._threadpool.force_run_in_thread(self._finalize_put, metadata)
|
||||
tpool.execute(self._finalize_put, metadata)
|
||||
|
||||
# Avoid the unlink() system call as part of the create context
|
||||
# cleanup
|
||||
|
@ -454,18 +452,16 @@ class DiskFileReader(object):
|
|||
specific. The API does not define the constructor arguments.
|
||||
|
||||
:param fp: open file descriptor, -1 for a directory object
|
||||
:param threadpool: thread pool to use for read operations
|
||||
:param disk_chunk_size: size of reads from disk in bytes
|
||||
:param obj_size: size of object on disk
|
||||
:param keep_cache_size: maximum object size that will be kept in cache
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
"""
|
||||
def __init__(self, fd, threadpool, disk_chunk_size, obj_size,
|
||||
def __init__(self, fd, disk_chunk_size, obj_size,
|
||||
keep_cache_size, iter_hook=None, keep_cache=False):
|
||||
# Parameter tracking
|
||||
self._fd = fd
|
||||
self._threadpool = threadpool
|
||||
self._disk_chunk_size = disk_chunk_size
|
||||
self._iter_hook = iter_hook
|
||||
if keep_cache:
|
||||
|
@ -485,8 +481,7 @@ class DiskFileReader(object):
|
|||
bytes_read = 0
|
||||
while True:
|
||||
if self._fd != -1:
|
||||
chunk = self._threadpool.run_in_thread(
|
||||
do_read, self._fd, self._disk_chunk_size)
|
||||
chunk = do_read(self._fd, self._disk_chunk_size)
|
||||
else:
|
||||
chunk = None
|
||||
if chunk:
|
||||
|
@ -570,20 +565,18 @@ class DiskFile(object):
|
|||
|
||||
:param mgr: associated on-disk manager instance
|
||||
:param dev_path: device name/account_name for UFO.
|
||||
:param threadpool: thread pool in which to do blocking operations
|
||||
:param account: account name for the object
|
||||
:param container: container name for the object
|
||||
:param obj: object name for the object
|
||||
:param uid: user ID disk object should assume (file or directory)
|
||||
:param gid: group ID disk object should assume (file or directory)
|
||||
"""
|
||||
def __init__(self, mgr, dev_path, threadpool, partition,
|
||||
def __init__(self, mgr, dev_path, partition,
|
||||
account=None, container=None, obj=None,
|
||||
policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID, **kwargs):
|
||||
# Variables partition and policy is currently unused.
|
||||
self._mgr = mgr
|
||||
self._device_path = dev_path
|
||||
self._threadpool = threadpool or ThreadPool(nthreads=0)
|
||||
self._uid = int(uid)
|
||||
self._gid = int(gid)
|
||||
self._is_dir = False
|
||||
|
@ -616,19 +609,27 @@ class DiskFile(object):
|
|||
self._data_file = os.path.join(self._put_datadir, self._obj)
|
||||
self._disk_file_open = False
|
||||
|
||||
@property
|
||||
def content_type(self):
|
||||
if self._metadata is None:
|
||||
raise DiskFileNotOpen()
|
||||
return self._metadata.get('Content-Type')
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
if self._metadata is None:
|
||||
raise DiskFileNotOpen()
|
||||
return Timestamp(self._metadata.get('X-Timestamp'))
|
||||
|
||||
@property
|
||||
def data_timestamp(self):
|
||||
if self._metadata is None:
|
||||
raise DiskFileNotOpen()
|
||||
return Timestamp(self._metadata.get('X-Timestamp'))
|
||||
data_timestamp = timestamp
|
||||
|
||||
def open(self):
|
||||
durable_timestamp = timestamp
|
||||
|
||||
content_type_timestamp = timestamp
|
||||
|
||||
fragments = None
|
||||
|
||||
def open(self, modernize=False, current_time=None):
|
||||
"""
|
||||
Open the object.
|
||||
|
||||
|
@ -674,7 +675,7 @@ class DiskFile(object):
|
|||
obj_size = 0
|
||||
self._fd = -1
|
||||
else:
|
||||
if self._is_object_expired(self._metadata):
|
||||
if self._is_object_expired(self._metadata, current_time):
|
||||
raise DiskFileExpired(metadata=self._metadata)
|
||||
self._obj_size = obj_size
|
||||
except (OSError, IOError, DiskFileExpired) as err:
|
||||
|
@ -696,7 +697,7 @@ class DiskFile(object):
|
|||
self._disk_file_open = True
|
||||
return self
|
||||
|
||||
def _is_object_expired(self, metadata):
|
||||
def _is_object_expired(self, metadata, current_time=None):
|
||||
try:
|
||||
x_delete_at = int(metadata['X-Delete-At'])
|
||||
except KeyError:
|
||||
|
@ -707,7 +708,9 @@ class DiskFile(object):
|
|||
# We just let it pass
|
||||
pass
|
||||
else:
|
||||
if x_delete_at <= time.time():
|
||||
if current_time is None:
|
||||
current_time = time.time()
|
||||
if x_delete_at <= current_time:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
@ -767,7 +770,12 @@ class DiskFile(object):
|
|||
raise DiskFileNotOpen()
|
||||
return self._metadata
|
||||
|
||||
def read_metadata(self):
|
||||
def get_datafile_metadata(self):
|
||||
if self._metadata is None:
|
||||
raise DiskFileNotOpen()
|
||||
return self._metadata
|
||||
|
||||
def read_metadata(self, current_time=None):
|
||||
"""
|
||||
Return the metadata for an object without requiring the caller to open
|
||||
the object first.
|
||||
|
@ -792,7 +800,8 @@ class DiskFile(object):
|
|||
raise DiskFileNotExist
|
||||
raise err
|
||||
|
||||
if self._metadata and self._is_object_expired(self._metadata):
|
||||
if self._metadata and self._is_object_expired(
|
||||
self._metadata, current_time):
|
||||
raise DiskFileExpired(metadata=self._metadata)
|
||||
|
||||
try:
|
||||
|
@ -830,7 +839,7 @@ class DiskFile(object):
|
|||
if not self._disk_file_open:
|
||||
raise DiskFileNotOpen()
|
||||
dr = DiskFileReader(
|
||||
self._fd, self._threadpool, self._mgr.disk_chunk_size,
|
||||
self._fd, self._mgr.disk_chunk_size,
|
||||
self._obj_size, self._mgr.keep_cache_size,
|
||||
iter_hook=iter_hook, keep_cache=keep_cache)
|
||||
# At this point the reader object is now responsible for closing
|
||||
|
@ -1054,8 +1063,7 @@ class DiskFile(object):
|
|||
"""
|
||||
metadata = self._keep_sys_metadata(metadata)
|
||||
data_file = os.path.join(self._put_datadir, self._obj)
|
||||
self._threadpool.run_in_thread(
|
||||
write_metadata, data_file, metadata)
|
||||
write_metadata(data_file, metadata)
|
||||
|
||||
def _keep_sys_metadata(self, metadata):
|
||||
"""
|
||||
|
@ -1147,7 +1155,7 @@ class DiskFile(object):
|
|||
if metadata and metadata[X_TIMESTAMP] >= timestamp:
|
||||
return
|
||||
|
||||
self._threadpool.run_in_thread(self._unlinkold)
|
||||
self._unlinkold()
|
||||
|
||||
self._metadata = None
|
||||
self._data_file = None
|
||||
|
|
|
@ -20,7 +20,7 @@ import socket
|
|||
import time
|
||||
import urllib
|
||||
|
||||
import simplejson as json
|
||||
import json
|
||||
from nose import SkipTest
|
||||
from xml.dom import minidom
|
||||
|
||||
|
|
|
@ -643,8 +643,11 @@ def fake_syslog_handler():
|
|||
logging.handlers.SysLogHandler = FakeLogger
|
||||
|
||||
|
||||
if utils.config_true_value(
|
||||
get_config('unit_test').get('fake_syslog', 'False')):
|
||||
try:
|
||||
config = get_config('unit_test')
|
||||
except (ValueError, IOError):
|
||||
config = {}
|
||||
if utils.config_true_value(config.get('fake_syslog', 'False')):
|
||||
fake_syslog_handler()
|
||||
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ from swiftonfile.swift.common.exceptions import AlreadyExistsAsDir, \
|
|||
AlreadyExistsAsFile
|
||||
from swift.common.exceptions import DiskFileNoSpace, DiskFileNotOpen, \
|
||||
DiskFileNotExist, DiskFileExpired
|
||||
from swift.common.utils import ThreadPool
|
||||
|
||||
from swiftonfile.swift.common.exceptions import SwiftOnFileSystemOSError
|
||||
import swiftonfile.swift.common.utils
|
||||
|
@ -143,6 +142,7 @@ class TestDiskFile(unittest.TestCase):
|
|||
self._saved_fallocate = swiftonfile.swift.obj.diskfile.fallocate
|
||||
swiftonfile.swift.obj.diskfile.fallocate = _mock_fallocate
|
||||
self.td = tempfile.mkdtemp()
|
||||
os.mkdir(os.path.join(self.td, 'vol0'))
|
||||
self.conf = dict(devices=self.td, mb_per_sync=2,
|
||||
keep_cache_size=(1024 * 1024), mount_check=False)
|
||||
self.mgr = DiskFileManager(self.conf, self.lg)
|
||||
|
@ -166,7 +166,6 @@ class TestDiskFile(unittest.TestCase):
|
|||
gdf = self._get_diskfile("vol0", "p57", "ufo47", "bar", "z")
|
||||
assert gdf._mgr is self.mgr
|
||||
assert gdf._device_path == os.path.join(self.td, "vol0")
|
||||
assert isinstance(gdf._threadpool, ThreadPool)
|
||||
assert gdf._uid == DEFAULT_UID
|
||||
assert gdf._gid == DEFAULT_GID
|
||||
assert gdf._obj == "z"
|
||||
|
|
4
tox.ini
4
tox.ini
|
@ -15,8 +15,8 @@ deps =
|
|||
# Note: pip supports installing from git repos.
|
||||
# https://pip.pypa.io/en/latest/reference/pip_install.html#git
|
||||
# Example: git+https://github.com/openstack/swift.git@2.0.0
|
||||
https://launchpad.net/swift/liberty/2.5.0/+download/swift-2.5.0.tar.gz
|
||||
PyECLib==1.0.7
|
||||
http://tarballs.openstack.org/swift/swift-2.16.0.tar.gz
|
||||
PyECLib>=1.3.1
|
||||
-r{toxinidir}/test-requirements.txt
|
||||
changedir = {toxinidir}/test/unit
|
||||
commands = nosetests -v {posargs}
|
||||
|
|
Loading…
Reference in New Issue