Allow to rebuild a fragment of an expired object

When a fragment of an expired object was missing, the reconstructor
ssync job would send a DELETE sub-request. This leads to situation
where, for the same object and timestamp, some nodes have a data file,
while others can have a tombstone file.

This patch forces the reconstructor to reconstruct a data file, even
for expired objects. DELETE requests are only sent for tombstoned
objects.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Closes-Bug: #1652323
Change-Id: I7f90b732c3268cb852b64f17555c631d668044a8
This commit is contained in:
Romain LE DISEZ 2017-06-09 14:23:05 +02:00
parent 0b22193718
commit 69df458254
7 changed files with 144 additions and 47 deletions

View File

@ -407,6 +407,7 @@ class ObjectReconstructor(Daemon):
# need to be durable.
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
headers['X-Backend-Replication'] = 'True'
frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'],
'exclude': []}]
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)

View File

@ -908,7 +908,9 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy=policy, frag_prefs=frag_prefs)
policy=policy, frag_prefs=frag_prefs,
open_expired=config_true_value(
request.headers.get('x-backend-replication', 'false')))
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:

View File

@ -329,7 +329,8 @@ class Sender(object):
try:
df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
self.job['policy'], frag_index=self.job.get('frag_index'))
self.job['policy'], frag_index=self.job.get('frag_index'),
open_expired=True)
except exceptions.DiskFileNotExist:
continue
url_path = urllib.parse.quote(

View File

@ -24,7 +24,9 @@ import shutil
import random
from collections import defaultdict
import os
import time
from swift.common.direct_client import DirectClientException
from test.probe.common import ECProbeTest
from swift.common import direct_client
@ -80,19 +82,10 @@ class TestReconstructorRebuild(ECProbeTest):
headers=headers)
# PUT object and POST some metadata
contents = Body()
headers = {
self._make_name('x-object-meta-').decode('utf8'):
self._make_name('meta-foo-').decode('utf8'),
}
self.proxy_put()
self.headers_post = {
self._make_name('x-object-meta-').decode('utf8'):
self._make_name('meta-bar-').decode('utf8')}
self.etag = client.put_object(self.url, self.token,
self.container_name,
self.object_name,
contents=contents, headers=headers)
client.post_object(self.url, self.token, self.container_name,
self.object_name, headers=dict(self.headers_post))
@ -107,6 +100,19 @@ class TestReconstructorRebuild(ECProbeTest):
'X-Backend-Durable-Timestamp', hdrs,
'Missing durable timestamp in %r' % self.frag_headers)
def proxy_put(self, extra_headers=None):
contents = Body()
headers = {
self._make_name('x-object-meta-').decode('utf8'):
self._make_name('meta-foo-').decode('utf8'),
}
if extra_headers:
headers.update(extra_headers)
self.etag = client.put_object(self.url, self.token,
self.container_name,
self.object_name,
contents=contents, headers=headers)
def proxy_get(self):
# GET object
headers, body = client.get_object(self.url, self.token,
@ -118,8 +124,10 @@ class TestReconstructorRebuild(ECProbeTest):
resp_checksum.update(chunk)
return headers, resp_checksum.hexdigest()
def direct_get(self, node, part, require_durable=True):
def direct_get(self, node, part, require_durable=True, extra_headers=None):
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
if extra_headers:
req_headers.update(extra_headers)
if not require_durable:
req_headers.update(
{'X-Backend-Fragment-Preferences': json.dumps([])})
@ -166,14 +174,15 @@ class TestReconstructorRebuild(ECProbeTest):
def _format_node(self, node):
return '%s#%s' % (node['device'], node['index'])
def _assert_all_nodes_have_frag(self):
def _assert_all_nodes_have_frag(self, extra_headers=None):
# check all frags are in place
failures = []
frag_etags = {}
frag_headers = {}
for node in self.onodes:
try:
headers, etag = self.direct_get(node, self.opart)
headers, etag = self.direct_get(node, self.opart,
extra_headers=extra_headers)
frag_etags[node['index']] = etag
del headers['Date'] # Date header will vary so remove it
frag_headers[node['index']] = headers
@ -335,6 +344,51 @@ class TestReconstructorRebuild(ECProbeTest):
# just to be nice
self.revive_drive(device_path)
def test_sync_expired_object(self):
# verify that missing frag can be rebuilt for an expired object
delete_at = int(time.time() + 3)
self.proxy_put(extra_headers={'x-delete-at': delete_at})
self.proxy_get() # sanity check
orig_frag_headers, orig_frag_etags = self._assert_all_nodes_have_frag(
extra_headers={'X-Backend-Replication': 'True'})
# wait for object to expire
time.sleep(3)
# sanity check - object has now expired, proxy get fails
with self.assertRaises(ClientException) as cm:
self.proxy_get()
self.assertEqual(404, cm.exception.http_status)
# sanity check - X-Backend-Replication let's us get expired frag...
fail_node = random.choice(self.onodes)
self.direct_get(fail_node, self.opart,
extra_headers={'X-Backend-Replication': 'True'})
# ...until we remove the frag from fail_node
self._break_nodes([self.onodes.index(fail_node)], [])
# ...now it's really gone
with self.assertRaises(DirectClientException) as cm:
self.direct_get(fail_node, self.opart,
extra_headers={'X-Backend-Replication': 'True'})
self.assertEqual(404, cm.exception.http_status)
self.assertNotIn('X-Backend-Timestamp', cm.exception.http_headers)
# run the reconstructor
self.reconstructor.once()
# the missing frag is now in place but expired
with self.assertRaises(DirectClientException) as cm:
self.direct_get(fail_node, self.opart)
self.assertEqual(404, cm.exception.http_status)
self.assertIn('X-Backend-Timestamp', cm.exception.http_headers)
# check all frags are intact, durable and have expected metadata
frag_headers, frag_etags = self._assert_all_nodes_have_frag(
extra_headers={'X-Backend-Replication': 'True'})
self.assertEqual(orig_frag_etags, frag_etags)
self.maxDiff = None
self.assertEqual(orig_frag_headers, frag_headers)
class TestReconstructorRebuildUTF8(TestReconstructorRebuild):

View File

@ -3964,6 +3964,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
self.assertEqual(
[{'timestamp': self.obj_timestamp.normal, 'exclude': []}],
json.loads(called_header['X-Backend-Fragment-Preferences']))
self.assertIn('X-Backend-Replication', called_header)
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))

View File

@ -5711,7 +5711,8 @@ class TestObjectController(unittest.TestCase):
given_args[5], 'sda1', policy])
def test_GET_but_expired(self):
test_time = time() + 10000
now = time()
test_time = now + 10000
delete_at_timestamp = int(test_time + 100)
delete_at_container = str(
delete_at_timestamp /
@ -5734,50 +5735,52 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
orig_time = object_server.time.time
try:
t = time()
object_server.time.time = lambda: t
delete_at_timestamp = int(t + 1)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
put_timestamp = normalize_timestamp(test_time - 1000)
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': put_timestamp,
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
delete_at_timestamp = int(now + 1)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
put_timestamp = normalize_timestamp(test_time - 1000)
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': put_timestamp,
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# fix server time to now: delete-at is in future, verify GET is ok
with mock.patch('swift.obj.server.time.time', return_value=now):
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(test_time)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
finally:
object_server.time.time = orig_time
orig_time = object_server.time.time
try:
t = time() + 2
object_server.time.time = lambda: t
# fix server time to now + 2: delete-at is in past, verify GET fails...
with mock.patch('swift.obj.server.time.time', return_value=now + 2):
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(t)})
headers={'X-Timestamp': normalize_timestamp(now + 2)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers['X-Backend-Timestamp'],
utils.Timestamp(put_timestamp))
finally:
object_server.time.time = orig_time
# ...unless X-Backend-Replication is sent
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(now + 2),
'X-Backend-Replication': 'True'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual('TEST', resp.body)
def test_HEAD_but_expired(self):
test_time = time() + 10000

View File

@ -147,7 +147,7 @@ class TestBaseSsync(BaseTest):
def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
df = self.rx_controller.get_diskfile(
self.device, self.partition, 'a', 'c', obj_name, policy=policy,
frag_index=frag_index)
frag_index=frag_index, open_expired=True)
df.open()
return df
@ -1346,6 +1346,41 @@ class TestSsyncReplication(TestBaseSsync):
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
def test_expired_object(self):
# verify that expired objects sync
policy = POLICIES.default
rx_node_index = 0
tx_df_mgr = self.daemon._df_router[policy]
t1 = next(self.ts_iter)
obj_name = 'o1'
metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'}
df = self._make_diskfile(
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
extra_metadata=metadata, timestamp=t1, policy=policy,
df_mgr=tx_df_mgr, verify=False)
with self.assertRaises(DiskFileExpired):
df.open() # sanity check - expired
# create ssync sender instance...
suffixes = [os.path.basename(os.path.dirname(df._datadir))]
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
# allow the expired sender diskfile to be opened for verification
df._open_expired = True
self._verify_ondisk_files({obj_name: [df]}, policy)
def _check_no_longer_expired_object(self, obj_name, df, policy):
# verify that objects with x-delete-at metadata that are not expired
# can be sync'd