diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 0e8d11f966..555ca41fb4 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -407,6 +407,7 @@ class ObjectReconstructor(Daemon): # need to be durable. headers = self.headers.copy() headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) + headers['X-Backend-Replication'] = 'True' frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'], 'exclude': []}] headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs) diff --git a/swift/obj/server.py b/swift/obj/server.py index 1a370571c8..9a50b66bbb 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -908,7 +908,9 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy, frag_prefs=frag_prefs) + policy=policy, frag_prefs=frag_prefs, + open_expired=config_true_value( + request.headers.get('x-backend-replication', 'false'))) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index fcdc0496c9..3689a13648 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -329,7 +329,8 @@ class Sender(object): try: df = self.df_mgr.get_diskfile_from_hash( self.job['device'], self.job['partition'], object_hash, - self.job['policy'], frag_index=self.job.get('frag_index')) + self.job['policy'], frag_index=self.job.get('frag_index'), + open_expired=True) except exceptions.DiskFileNotExist: continue url_path = urllib.parse.quote( diff --git a/test/probe/test_reconstructor_rebuild.py b/test/probe/test_reconstructor_rebuild.py index cd4ee1cf0d..fd29c41b7f 100644 --- a/test/probe/test_reconstructor_rebuild.py +++ b/test/probe/test_reconstructor_rebuild.py @@ -24,7 +24,9 @@ import shutil import random from collections import defaultdict import os +import time +from swift.common.direct_client import DirectClientException from test.probe.common import ECProbeTest from swift.common import direct_client @@ -80,19 +82,10 @@ class TestReconstructorRebuild(ECProbeTest): headers=headers) # PUT object and POST some metadata - contents = Body() - headers = { - self._make_name('x-object-meta-').decode('utf8'): - self._make_name('meta-foo-').decode('utf8'), - } + self.proxy_put() self.headers_post = { self._make_name('x-object-meta-').decode('utf8'): self._make_name('meta-bar-').decode('utf8')} - - self.etag = client.put_object(self.url, self.token, - self.container_name, - self.object_name, - contents=contents, headers=headers) client.post_object(self.url, self.token, self.container_name, self.object_name, headers=dict(self.headers_post)) @@ -107,6 +100,19 @@ class TestReconstructorRebuild(ECProbeTest): 'X-Backend-Durable-Timestamp', hdrs, 'Missing durable timestamp in %r' % self.frag_headers) + def proxy_put(self, extra_headers=None): + contents = Body() + headers = { + self._make_name('x-object-meta-').decode('utf8'): + self._make_name('meta-foo-').decode('utf8'), + } + if extra_headers: + headers.update(extra_headers) + self.etag = client.put_object(self.url, self.token, + self.container_name, + self.object_name, + contents=contents, headers=headers) + def proxy_get(self): # GET object headers, body = client.get_object(self.url, self.token, @@ -118,8 +124,10 @@ class TestReconstructorRebuild(ECProbeTest): resp_checksum.update(chunk) return headers, resp_checksum.hexdigest() - def direct_get(self, node, part, require_durable=True): + def direct_get(self, node, part, require_durable=True, extra_headers=None): req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)} + if extra_headers: + req_headers.update(extra_headers) if not require_durable: req_headers.update( {'X-Backend-Fragment-Preferences': json.dumps([])}) @@ -166,14 +174,15 @@ class TestReconstructorRebuild(ECProbeTest): def _format_node(self, node): return '%s#%s' % (node['device'], node['index']) - def _assert_all_nodes_have_frag(self): + def _assert_all_nodes_have_frag(self, extra_headers=None): # check all frags are in place failures = [] frag_etags = {} frag_headers = {} for node in self.onodes: try: - headers, etag = self.direct_get(node, self.opart) + headers, etag = self.direct_get(node, self.opart, + extra_headers=extra_headers) frag_etags[node['index']] = etag del headers['Date'] # Date header will vary so remove it frag_headers[node['index']] = headers @@ -335,6 +344,51 @@ class TestReconstructorRebuild(ECProbeTest): # just to be nice self.revive_drive(device_path) + def test_sync_expired_object(self): + # verify that missing frag can be rebuilt for an expired object + delete_at = int(time.time() + 3) + self.proxy_put(extra_headers={'x-delete-at': delete_at}) + self.proxy_get() # sanity check + orig_frag_headers, orig_frag_etags = self._assert_all_nodes_have_frag( + extra_headers={'X-Backend-Replication': 'True'}) + + # wait for object to expire + time.sleep(3) + + # sanity check - object has now expired, proxy get fails + with self.assertRaises(ClientException) as cm: + self.proxy_get() + self.assertEqual(404, cm.exception.http_status) + + # sanity check - X-Backend-Replication let's us get expired frag... + fail_node = random.choice(self.onodes) + self.direct_get(fail_node, self.opart, + extra_headers={'X-Backend-Replication': 'True'}) + # ...until we remove the frag from fail_node + self._break_nodes([self.onodes.index(fail_node)], []) + # ...now it's really gone + with self.assertRaises(DirectClientException) as cm: + self.direct_get(fail_node, self.opart, + extra_headers={'X-Backend-Replication': 'True'}) + self.assertEqual(404, cm.exception.http_status) + self.assertNotIn('X-Backend-Timestamp', cm.exception.http_headers) + + # run the reconstructor + self.reconstructor.once() + + # the missing frag is now in place but expired + with self.assertRaises(DirectClientException) as cm: + self.direct_get(fail_node, self.opart) + self.assertEqual(404, cm.exception.http_status) + self.assertIn('X-Backend-Timestamp', cm.exception.http_headers) + + # check all frags are intact, durable and have expected metadata + frag_headers, frag_etags = self._assert_all_nodes_have_frag( + extra_headers={'X-Backend-Replication': 'True'}) + self.assertEqual(orig_frag_etags, frag_etags) + self.maxDiff = None + self.assertEqual(orig_frag_headers, frag_headers) + class TestReconstructorRebuildUTF8(TestReconstructorRebuild): diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 942703f11e..b9f4d34097 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -3964,6 +3964,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): self.assertEqual( [{'timestamp': self.obj_timestamp.normal, 'exclude': []}], json.loads(called_header['X-Backend-Fragment-Preferences'])) + self.assertIn('X-Backend-Replication', called_header) # no error and warning self.assertFalse(self.logger.get_lines_for_level('error')) self.assertFalse(self.logger.get_lines_for_level('warning')) diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 17a6dbc559..888eb22867 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -5711,7 +5711,8 @@ class TestObjectController(unittest.TestCase): given_args[5], 'sda1', policy]) def test_GET_but_expired(self): - test_time = time() + 10000 + now = time() + test_time = now + 10000 delete_at_timestamp = int(test_time + 100) delete_at_container = str( delete_at_timestamp / @@ -5734,50 +5735,52 @@ class TestObjectController(unittest.TestCase): resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 200) - orig_time = object_server.time.time - try: - t = time() - object_server.time.time = lambda: t - delete_at_timestamp = int(t + 1) - delete_at_container = str( - delete_at_timestamp / - self.object_controller.expiring_objects_container_divisor * - self.object_controller.expiring_objects_container_divisor) - put_timestamp = normalize_timestamp(test_time - 1000) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': put_timestamp, - 'X-Delete-At': str(delete_at_timestamp), - 'X-Delete-At-Container': delete_at_container, - 'Content-Length': '4', - 'Content-Type': 'application/octet-stream'}) - req.body = 'TEST' - resp = req.get_response(self.object_controller) - self.assertEqual(resp.status_int, 201) + delete_at_timestamp = int(now + 1) + delete_at_container = str( + delete_at_timestamp / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + put_timestamp = normalize_timestamp(test_time - 1000) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': put_timestamp, + 'X-Delete-At': str(delete_at_timestamp), + 'X-Delete-At-Container': delete_at_container, + 'Content-Length': '4', + 'Content-Type': 'application/octet-stream'}) + req.body = 'TEST' + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # fix server time to now: delete-at is in future, verify GET is ok + with mock.patch('swift.obj.server.time.time', return_value=now): req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'}, headers={'X-Timestamp': normalize_timestamp(test_time)}) resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 200) - finally: - object_server.time.time = orig_time - orig_time = object_server.time.time - try: - t = time() + 2 - object_server.time.time = lambda: t + # fix server time to now + 2: delete-at is in past, verify GET fails... + with mock.patch('swift.obj.server.time.time', return_value=now + 2): req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'}, - headers={'X-Timestamp': normalize_timestamp(t)}) + headers={'X-Timestamp': normalize_timestamp(now + 2)}) resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 404) self.assertEqual(resp.headers['X-Backend-Timestamp'], utils.Timestamp(put_timestamp)) - finally: - object_server.time.time = orig_time + # ...unless X-Backend-Replication is sent + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'X-Timestamp': normalize_timestamp(now + 2), + 'X-Backend-Replication': 'True'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual('TEST', resp.body) def test_HEAD_but_expired(self): test_time = time() + 10000 diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index abfb71467f..130e583d22 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -147,7 +147,7 @@ class TestBaseSsync(BaseTest): def _open_rx_diskfile(self, obj_name, policy, frag_index=None): df = self.rx_controller.get_diskfile( self.device, self.partition, 'a', 'c', obj_name, policy=policy, - frag_index=frag_index) + frag_index=frag_index, open_expired=True) df.open() return df @@ -1346,6 +1346,41 @@ class TestSsyncReplication(TestBaseSsync): self.assertEqual(metadata['X-Object-Meta-Test'], oname) self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname) + def test_expired_object(self): + # verify that expired objects sync + policy = POLICIES.default + rx_node_index = 0 + tx_df_mgr = self.daemon._df_router[policy] + t1 = next(self.ts_iter) + obj_name = 'o1' + metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'} + df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=tx_df_mgr, verify=False) + with self.assertRaises(DiskFileExpired): + df.open() # sanity check - expired + + # create ssync sender instance... + suffixes = [os.path.basename(os.path.dirname(df._datadir))] + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... + success, in_sync_objs = sender() + + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + # allow the expired sender diskfile to be opened for verification + df._open_expired = True + self._verify_ondisk_files({obj_name: [df]}, policy) + def _check_no_longer_expired_object(self, obj_name, df, policy): # verify that objects with x-delete-at metadata that are not expired # can be sync'd