Merge "Erase object header if we fail to register it to the index-server" into feature/losf
This commit is contained in:
commit
e8a51cc80e
|
@ -24,6 +24,10 @@ PICKLE_PROTOCOL = 2
|
|||
OBJECT_HEADER_VERSION = 4
|
||||
VOLUME_HEADER_VERSION = 1
|
||||
|
||||
# maximum serialized header length
|
||||
MAX_OBJECT_HEADER_LEN = 512
|
||||
MAX_VOLUME_HEADER_LEN = 128
|
||||
|
||||
OBJECT_START_MARKER = b"SWIFTOBJ"
|
||||
VOLUME_START_MARKER = b"SWIFTVOL"
|
||||
|
||||
|
@ -350,7 +354,7 @@ class VolumeHeader(object):
|
|||
|
||||
# Read volume header. Expects fp to be positionned at header offset
|
||||
def read_volume_header(fp):
|
||||
buf = fp.read(128)
|
||||
buf = fp.read(MAX_VOLUME_HEADER_LEN)
|
||||
header = VolumeHeader.unpack(buf)
|
||||
return header
|
||||
|
||||
|
@ -365,7 +369,7 @@ def read_object_header(fp):
|
|||
:param fp: opened file, positioned at header start
|
||||
:return: an ObjectHeader
|
||||
"""
|
||||
buf = fp.read(512)
|
||||
buf = fp.read(MAX_OBJECT_HEADER_LEN)
|
||||
header = ObjectHeader.unpack(buf)
|
||||
return header
|
||||
|
||||
|
@ -378,3 +382,13 @@ def write_object_header(header, fp):
|
|||
"""
|
||||
fp.write(header.pack())
|
||||
fdatasync(fp.fileno())
|
||||
|
||||
|
||||
def erase_object_header(fd, offset):
|
||||
"""
|
||||
Erase an object header by writing null bytes over it
|
||||
:param fd: volume file descriptor
|
||||
:param offset: absolute header offset
|
||||
"""
|
||||
os.lseek(fd, offset, os.SEEK_SET)
|
||||
os.write(fd, b"\x00" * MAX_OBJECT_HEADER_LEN)
|
||||
|
|
|
@ -29,7 +29,8 @@ from eventlet.green import os
|
|||
from swift.obj.header import ObjectHeader, VolumeHeader, ALIGNMENT, \
|
||||
read_volume_header, HeaderException, STATE_OBJ_QUARANTINED, \
|
||||
STATE_OBJ_FILE, write_object_header, \
|
||||
read_object_header, OBJECT_HEADER_VERSION, write_volume_header
|
||||
read_object_header, OBJECT_HEADER_VERSION, write_volume_header, \
|
||||
erase_object_header, MAX_OBJECT_HEADER_LEN
|
||||
from swift.common.exceptions import DiskFileNoSpace, \
|
||||
DiskFileBadMetadataChecksum
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
@ -370,14 +371,17 @@ class VFileWriter(object):
|
|||
fdatasync(self.fd)
|
||||
|
||||
# register object
|
||||
# TODO: if that fails, we want to remove the data that was written
|
||||
# in the volume: an exception is raised, the caller does not expect
|
||||
# the file to have been written. However, if we crash before
|
||||
# the volume is written to again, the file that was written will
|
||||
# be recovered and an entry created for it in the KV.
|
||||
full_name = "{}{}".format(self.header.ohash, filename)
|
||||
rpc.register_object(self.socket_path, full_name, self.volume_index,
|
||||
self.offset, object_end)
|
||||
try:
|
||||
rpc.register_object(self.socket_path, full_name, self.volume_index,
|
||||
self.offset, object_end)
|
||||
except RpcError:
|
||||
# If we failed to register the object, erase the header so that it
|
||||
# will not be picked up by the volume checker if there is a crash
|
||||
# or power failure before it gets overwritten by another object.
|
||||
erase_object_header(self.fd, self.offset)
|
||||
raise
|
||||
|
||||
increment(self.logger, 'vfile.vfile_creation')
|
||||
increment(self.logger, 'vfile.total_space_used',
|
||||
self.header.total_size)
|
||||
|
@ -908,7 +912,7 @@ def set_header_state(socket_path, name, quarantine):
|
|||
# if we find a hole instead of the header, remove entry from
|
||||
# kv and return.
|
||||
fp.seek(obj.offset)
|
||||
data = fp.read(512)
|
||||
data = fp.read(MAX_OBJECT_HEADER_LEN)
|
||||
if all(c == '\x00' for c in data):
|
||||
# unregister the object here
|
||||
rpc.unregister_object(socket_path, name)
|
||||
|
@ -1076,7 +1080,7 @@ def delete_vfile_from_path(filepath):
|
|||
# if we find a hole instead of the header, remove entry from
|
||||
# kv and return.
|
||||
fp.seek(obj.offset)
|
||||
data = fp.read(512)
|
||||
data = fp.read(MAX_OBJECT_HEADER_LEN)
|
||||
if all(c == '\x00' for c in data):
|
||||
# unregister the object here
|
||||
_unregister_object(si.socket_path, full_name,
|
||||
|
|
|
@ -23,7 +23,8 @@ from random import randint
|
|||
import six
|
||||
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from swift.obj.header import ObjectHeader, STATE_OBJ_FILE
|
||||
from swift.obj.header import ObjectHeader, STATE_OBJ_FILE, \
|
||||
MAX_OBJECT_HEADER_LEN
|
||||
from swift.obj.meta_pb2 import Metadata
|
||||
from swift.obj.vfile import VFileWriter
|
||||
from swift.obj.vfile_utils import VOSError, next_aligned_offset
|
||||
|
@ -400,7 +401,7 @@ class TestVFileWriter(unittest.TestCase):
|
|||
|
||||
# check header
|
||||
vol_file.seek(t["offset"])
|
||||
serialized_header = vol_file.read(512)
|
||||
serialized_header = vol_file.read(MAX_OBJECT_HEADER_LEN)
|
||||
header = ObjectHeader.unpack(serialized_header)
|
||||
self.assertEqual(header.version, vfile.OBJECT_HEADER_VERSION)
|
||||
self.assertEqual(header.ohash, "d41d8cd98f00b204e9800998ecf8427e")
|
||||
|
@ -464,6 +465,40 @@ class TestVFileWriter(unittest.TestCase):
|
|||
vfile_writer, _ = self._get_vfile_writer()
|
||||
self.assertRaises(vfile.VIOError, vfile_writer.commit, "", {})
|
||||
|
||||
@mock.patch("swift.obj.rpc_http.register_object")
|
||||
def test_commit_register_fail(self, m_register_object):
|
||||
"""
|
||||
Check that the header object is erased if commit() fails to register
|
||||
the object on the index server.
|
||||
"""
|
||||
m_register_object.side_effect = RpcError("failed to register object",
|
||||
StatusCode.Unavailable)
|
||||
offset = 4096
|
||||
metadata_reserve = 500
|
||||
vfile_writer, vol_file = self._get_vfile_writer(
|
||||
offset=offset, metadata_reserve=metadata_reserve)
|
||||
content = b"dummy data"
|
||||
os.write(vfile_writer.fd, content)
|
||||
|
||||
filename = "dummy-filename"
|
||||
metadata = {"dummy": "metadata"}
|
||||
|
||||
self.assertRaises(RpcError, vfile_writer.commit, filename, metadata)
|
||||
|
||||
# check the header was erased
|
||||
vol_file.seek(offset)
|
||||
serialized_header = vol_file.read(MAX_OBJECT_HEADER_LEN)
|
||||
self.assertEqual(serialized_header, b"\x00" * MAX_OBJECT_HEADER_LEN)
|
||||
|
||||
# check we did not write past the header by checking the file data
|
||||
data_offset = (offset +
|
||||
len(ObjectHeader(
|
||||
version=header.OBJECT_HEADER_VERSION)) +
|
||||
metadata_reserve)
|
||||
vol_file.seek(data_offset)
|
||||
data = vol_file.read(len(content))
|
||||
self.assertEqual(data, content)
|
||||
|
||||
@mock.patch("swift.obj.vfile.open", new_callable=mock.mock_open)
|
||||
@mock.patch("swift.obj.vfile.fcntl.flock")
|
||||
@mock.patch("swift.obj.vfile.get_next_volume_index")
|
||||
|
|
Loading…
Reference in New Issue