diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 3758906ef4..007652a7ab 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -800,12 +800,13 @@ class ObjectReconstructor(Daemon): override_devices = override_devices or [] override_partitions = override_partitions or [] ips = whataremyips(self.bind_ip) - for policy in POLICIES: - if policy.policy_type != EC_POLICY: - continue - self._diskfile_mgr = self._df_router[policy] + ec_policies = (policy for policy in POLICIES + if policy.policy_type == EC_POLICY) + + policy2devices = {} + + for policy in ec_policies: self.load_object_ring(policy) - data_dir = get_data_dir(policy) local_devices = list(six.moves.filter( lambda dev: dev and is_local_device( ips, self.port, @@ -813,21 +814,23 @@ class ObjectReconstructor(Daemon): policy.object_ring.devs)) if override_devices: - self.device_count = len(override_devices) - else: - self.device_count = len(local_devices) + local_devices = list(six.moves.filter( + lambda dev_info: dev_info['device'] in override_devices, + local_devices)) + policy2devices[policy] = local_devices + self.device_count += len(local_devices) + + for policy, local_devices in policy2devices.items(): + df_mgr = self._df_router[policy] for local_dev in local_devices: - if override_devices and (local_dev['device'] not in - override_devices): - continue self.reconstruction_device_count += 1 - dev_path = self._df_router[policy].get_dev_path( - local_dev['device']) + dev_path = df_mgr.get_dev_path(local_dev['device']) if not dev_path: self.logger.warning(_('%s is not mounted'), local_dev['device']) continue + data_dir = get_data_dir(policy) obj_path = join(dev_path, data_dir) tmp_path = join(dev_path, get_tmp_dir(int(policy))) unlink_older_than(tmp_path, time.time() - diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 0aee76bfeb..8c72c3359c 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE from swift.obj import ssync_sender -from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir +from swift.obj.diskfile import get_data_dir, get_tmp_dir, DiskFileRouter from swift.common.storage_policy import POLICIES, REPL_POLICY DEFAULT_RSYNC_TIMEOUT = 900 @@ -121,7 +121,7 @@ class ObjectReplicator(Daemon): 'operation, please disable handoffs_first and ' 'handoff_delete before the next ' 'normal rebalance') - self._diskfile_mgr = DiskFileManager(conf, self.logger) + self._df_router = DiskFileRouter(conf, self.logger) def _zero_stats(self): """Zero out the stats.""" @@ -406,9 +406,10 @@ class ObjectReplicator(Daemon): target_devs_info = set() failure_devs_info = set() begin = time.time() + df_mgr = self._df_router[job['policy']] try: hashed, local_hash = tpool_reraise( - self._diskfile_mgr._get_hashes, job['path'], + df_mgr._get_hashes, job['path'], do_listdir=_do_listdir( int(job['partition']), self.replication_cycle), @@ -462,7 +463,7 @@ class ObjectReplicator(Daemon): self.stats['hashmatch'] += 1 continue hashed, recalc_hash = tpool_reraise( - self._diskfile_mgr._get_hashes, + df_mgr._get_hashes, job['path'], recalculate=suffixes, reclaim_age=self.reclaim_age) self.logger.update_stats('suffix.hashes', hashed) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 692dd45c5b..5bf7f153d9 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -80,7 +80,7 @@ class Sender(object): def __init__(self, daemon, node, job, suffixes, remote_check_objs=None): self.daemon = daemon - self.df_mgr = self.daemon._diskfile_mgr + self.df_mgr = self.daemon._df_router[job['policy']] self.node = node self.job = job self.suffixes = suffixes diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py index 48d91f1003..3fc3dc2be7 100644 --- a/test/unit/obj/common.py +++ b/test/unit/obj/common.py @@ -19,28 +19,9 @@ import tempfile import unittest import time +from swift.common import utils from swift.common.storage_policy import POLICIES from swift.common.utils import Timestamp -from swift.obj import diskfile - -from test.unit import debug_logger - - -class FakeReplicator(object): - def __init__(self, testdir, policy=None): - self.logger = debug_logger('test-ssync-sender') - self.conn_timeout = 1 - self.node_timeout = 2 - self.http_timeout = 3 - self.network_chunk_size = 65536 - self.disk_chunk_size = 4096 - conf = { - 'devices': testdir, - 'mount_check': 'false', - } - policy = POLICIES.default if policy is None else policy - self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger) - self._diskfile_mgr = self._diskfile_router[policy] def write_diskfile(df, timestamp, data='test data', frag_index=None, @@ -74,9 +55,18 @@ def write_diskfile(df, timestamp, data='test data', frag_index=None, class BaseTest(unittest.TestCase): def setUp(self): + self.device = 'dev' + self.partition = '9' + self.tmpdir = tempfile.mkdtemp() + # sender side setup + self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') + utils.mkdirs(os.path.join(self.tx_testdir, self.device)) + self.daemon_conf = { + 'devices': self.tx_testdir, + 'mount_check': 'false', + } # daemon will be set in subclass setUp self.daemon = None - self.tmpdir = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.tmpdir, ignore_errors=True) @@ -90,7 +80,7 @@ class BaseTest(unittest.TestCase): object_parts = account, container, obj timestamp = Timestamp(time.time()) if timestamp is None else timestamp if df_mgr is None: - df_mgr = self.daemon._diskfile_router[policy] + df_mgr = self.daemon._df_router[policy] df = df_mgr.get_diskfile( device, partition, *object_parts, policy=policy, frag_index=frag_index) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index cec08ce506..353e7c1c7a 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -862,7 +862,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.suffixes = suffixes self.daemon = daemon self.job = job - hash_gen = self.daemon._diskfile_mgr.yield_hashes( + hash_gen = self.daemon._df_router[job['policy']].yield_hashes( self.job['device'], self.job['partition'], self.job['policy'], self.suffixes, frag_index=self.job.get('frag_index')) diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index c863e45a93..2f6cef18c2 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -12,7 +12,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import unittest import os import mock @@ -26,10 +26,10 @@ from collections import defaultdict from errno import ENOENT, ENOTEMPTY, ENOTDIR from eventlet.green import subprocess -from eventlet import Timeout, tpool +from eventlet import Timeout from test.unit import (debug_logger, patch_policies, make_timestamp_iter, - mocked_http_conn) + mocked_http_conn, FakeLogger) from swift.common import utils from swift.common.utils import (hash_path, mkdirs, normalize_timestamp, storage_directory) @@ -1623,68 +1623,80 @@ class TestObjectReplicator(unittest.TestCase): object_replicator.http_connect = was_connector def test_run_once_recover_from_timeout(self): + # verify that replicator will pass over all policies' partitions even + # if a timeout occurs while replicating one partition to one node. + timeouts = [Timeout()] + + def fake_get_hashes(df_mgr, part_path, **kwargs): + self.get_hash_count += 1 + # Simulate a REPLICATE timeout by raising Timeout for second call + # to get_hashes (with recalculate suffixes) for a specific + # partition + if (timeouts and '/objects/' in part_path and + part_path.endswith('0') and 'recalculate' in kwargs): + raise timeouts.pop(0) + return 1, {'abc': 'def'} + + # map partition_path -> [nodes] + sync_paths = collections.defaultdict(list) + + def fake_sync(node, job, suffixes, *args, **kwargs): + sync_paths[job['path']].append(node) + return True, {} + conf = dict(swift_dir=self.testdir, devices=self.devices, - bind_ips=_ips()[0], + bind_ip=_ips()[0], # local dev has id=0 mount_check='false', timeout='300', stats_interval='1') - replicator = object_replicator.ObjectReplicator(conf) - was_connector = object_replicator.http_connect - was_get_hashes = object_replicator.DiskFileManager._get_hashes - was_execute = tpool.execute - self.get_hash_count = 0 - try: + with mock.patch('swift.obj.diskfile.DiskFileManager._get_hashes', + fake_get_hashes): + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)): + with mock.patch('swift.obj.replicator.dump_recon_cache'): + replicator = object_replicator.ObjectReplicator( + conf, logger=FakeLogger()) - def fake_get_hashes(*args, **kwargs): - self.get_hash_count += 1 - if self.get_hash_count == 3: - # raise timeout on last call to get hashes - raise Timeout() - return 2, {'abc': 'def'} + self.get_hash_count = 0 + with mock.patch.object(replicator, 'sync', fake_sync): + replicator.run_once() - def fake_exc(tester, *args, **kwargs): - if 'Error syncing partition timeout' in args[0]: - tester.i_failed = True + log_lines = replicator.logger.get_lines_for_level('error') + self.assertIn("Error syncing with node:", log_lines[0]) + self.assertFalse(log_lines[1:]) + # setup creates 4 partitions; partition 1 does not map to local dev id + # 0 so will be handled by update_delete(); partitions 0, 2, 3 are + # handled by update() for each of two policies, so expect 6 paths to be + # sync'd + self.assertEqual(6, len(sync_paths)) + # partition 3 has 2 nodes in remote region, only first node is sync'd. + # partition 0 in policy 0 has fake_get_hashes timeout before first + # sync, so only second node is sync'd. + # other partitions are sync'd to 2 nodes in same region. + expected_node_count = { # map path_end -> expected sync node count + '/objects/0': 1, + '/objects/1': 2, + '/objects/2': 2, + '/objects/3': 1, + '/objects-1/0': 2, + '/objects-1/1': 2, + '/objects-1/2': 2, + '/objects-1/3': 1 + } + for path, nodes in sync_paths.items(): + path_end = path[path.index('/objects'):] + self.assertEqual(expected_node_count[path_end], len(nodes), + 'Expected %s but got %s for path %s' % + (expected_node_count[path_end], len(nodes), path)) + # partitions 0 and 2 attempt 3 calls each per policy to get_hashes = 12 + # partitions 3 attempts 2 calls per policy to get_hashes = 4 + # partitions 1 dosn't get_hashes because of update_deleted + self.assertEqual(16, self.get_hash_count) - self.i_failed = False - object_replicator.http_connect = mock_http_connect(200) - object_replicator.DiskFileManager._get_hashes = fake_get_hashes - replicator.logger.exception = \ - lambda *args, **kwargs: fake_exc(self, *args, **kwargs) - # Write some files into '1' and run replicate- they should be moved - # to the other partitions and then node should get deleted. - cur_part = '1' - df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o', - policy=POLICIES.legacy) - mkdirs(df._datadir) - f = open(os.path.join(df._datadir, - normalize_timestamp(time.time()) + '.data'), - 'wb') - f.write('1234567890') - f.close() - ohash = hash_path('a', 'c', 'o') - data_dir = ohash[-3:] - whole_path_from = os.path.join(self.objects, cur_part, data_dir) - process_arg_checker = [] - ring = replicator.load_object_ring(POLICIES[0]) - nodes = [node for node in - ring.get_part_nodes(int(cur_part)) - if node['ip'] not in _ips()] - - for node in nodes: - rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], - cur_part) - process_arg_checker.append( - (0, '', ['rsync', whole_path_from, rsync_mod])) - self.assertTrue(os.access(os.path.join(self.objects, - '1', data_dir, ohash), - os.F_OK)) - with _mock_process(process_arg_checker): - replicator.run_once() - self.assertFalse(process_errors) - self.assertFalse(self.i_failed) - finally: - object_replicator.http_connect = was_connector - object_replicator.DiskFileManager._get_hashes = was_get_hashes - tpool.execute = was_execute + # attempt to 16 times but succeeded only 15 times due to Timeout + suffix_hashes = sum( + count for (metric, count), _junk in + replicator.logger.log_dict['update_stats'] + if metric == 'suffix.hashes') + self.assertEqual(15, suffix_hashes) def test_run(self): with _mock_process([(0, '')] * 100): @@ -1737,7 +1749,8 @@ class TestObjectReplicator(unittest.TestCase): do_listdir_results = [False, False, True, False, True, False] mock_do_listdir.side_effect = do_listdir_results expected_tpool_calls = [ - mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'], + mock.call(self.replicator._df_router[job['policy']]._get_hashes, + job['path'], do_listdir=do_listdir, reclaim_age=self.replicator.reclaim_age) for job, do_listdir in zip(jobs, do_listdir_results) diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index d933bc2f04..41a2a8872d 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -31,9 +31,10 @@ from swift.common.utils import Timestamp from swift.obj import ssync_sender, server from swift.obj.reconstructor import RebuildingECDiskFileStream, \ ObjectReconstructor +from swift.obj.replicator import ObjectReplicator from test.unit import patch_policies, debug_logger, encode_frag_archive_bodies -from test.unit.obj.common import BaseTest, FakeReplicator +from test.unit.obj.common import BaseTest class TestBaseSsync(BaseTest): @@ -46,13 +47,6 @@ class TestBaseSsync(BaseTest): """ def setUp(self): super(TestBaseSsync, self).setUp() - self.device = 'dev' - self.partition = '9' - # sender side setup - self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') - utils.mkdirs(os.path.join(self.tx_testdir, self.device)) - self.daemon = FakeReplicator(self.tx_testdir) - # rx side setup self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver') utils.mkdirs(os.path.join(self.rx_testdir, self.device)) @@ -142,7 +136,7 @@ class TestBaseSsync(BaseTest): return diskfiles def _open_tx_diskfile(self, obj_name, policy, frag_index=None): - df_mgr = self.daemon._diskfile_router[policy] + df_mgr = self.daemon._df_router[policy] df = df_mgr.get_diskfile( self.device, self.partition, account='a', container='c', obj=obj_name, policy=policy, frag_index=frag_index) @@ -310,6 +304,8 @@ class TestBaseSsyncEC(TestBaseSsync): def setUp(self): super(TestBaseSsyncEC, self).setUp() self.policy = POLICIES.default + self.daemon = ObjectReconstructor(self.daemon_conf, + debug_logger('test-ssync-sender')) def _get_object_data(self, path, frag_index=None, **kwargs): # return a frag archive for given object name and frag index. @@ -337,7 +333,7 @@ class TestSsyncEC(TestBaseSsyncEC): tx_objs = {} rx_objs = {} tx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # o1 has primary and handoff fragment archives t1 = next(self.ts_iter) @@ -421,7 +417,7 @@ class TestSsyncEC(TestBaseSsyncEC): # create sender side diskfiles... tx_objs = {} rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] expected_subreqs = defaultdict(list) @@ -531,7 +527,7 @@ class TestSsyncEC(TestBaseSsyncEC): tx_objs = {} tx_tombstones = {} rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # o1 only has primary t1 = next(self.ts_iter) @@ -631,7 +627,7 @@ class TestSsyncEC(TestBaseSsyncEC): def test_send_with_frag_index_none(self): policy = POLICIES.default - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # create an ec fragment on the remote node ts1 = next(self.ts_iter) @@ -692,7 +688,7 @@ class TestSsyncEC(TestBaseSsyncEC): # create non durable tx obj by not committing, then create a legacy # .durable file tx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] t1 = next(self.ts_iter) tx_objs['o1'] = self._create_ondisk_files( @@ -791,7 +787,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): # create sender side diskfiles... self.tx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[self.policy] + tx_df_mgr = self.daemon._df_router[self.policy] t1 = next(self.ts_iter) self.tx_objs['o1'] = self._create_ondisk_files( tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,)) @@ -1073,6 +1069,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): @patch_policies class TestSsyncReplication(TestBaseSsync): + def setUp(self): + super(TestSsyncReplication, self).setUp() + self.daemon = ObjectReplicator(self.daemon_conf, + debug_logger('test-ssync-sender')) + def test_sync(self): policy = POLICIES.default rx_node_index = 0 @@ -1082,7 +1083,7 @@ class TestSsyncReplication(TestBaseSsync): rx_objs = {} tx_tombstones = {} rx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # o1 and o2 are on tx only t1 = next(self.ts_iter) @@ -1204,7 +1205,7 @@ class TestSsyncReplication(TestBaseSsync): rx_objs = {} tx_tombstones = {} rx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] expected_subreqs = defaultdict(list) @@ -1349,7 +1350,7 @@ class TestSsyncReplication(TestBaseSsync): rx_node_index = 0 # create diskfiles... - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # rx has data at t1 but no meta @@ -1434,7 +1435,7 @@ class TestSsyncReplication(TestBaseSsync): # create diskfiles... tx_objs = {} rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] expected_subreqs = defaultdict(list) diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index c7968c68f0..ab9053bd13 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -24,9 +24,10 @@ from swift.common import exceptions, utils from swift.common.storage_policy import POLICIES from swift.common.utils import Timestamp from swift.obj import ssync_sender, diskfile, ssync_receiver +from swift.obj.replicator import ObjectReplicator -from test.unit import patch_policies, make_timestamp_iter -from test.unit.obj.common import FakeReplicator, BaseTest +from test.unit import patch_policies, make_timestamp_iter, debug_logger +from test.unit.obj.common import BaseTest class NullBufferedHTTPConnection(object): @@ -84,10 +85,10 @@ class TestSender(BaseTest): def setUp(self): super(TestSender, self).setUp() - self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') - utils.mkdirs(os.path.join(self.testdir, 'dev')) - self.daemon = FakeReplicator(self.testdir) - self.sender = ssync_sender.Sender(self.daemon, None, None, None) + self.daemon = ObjectReplicator(self.daemon_conf, + debug_logger('test-ssync-sender')) + job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__ + self.sender = ssync_sender.Sender(self.daemon, None, job, None) def test_call_catches_MessageTimeout(self): @@ -146,8 +147,7 @@ class TestSender(BaseTest): '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:')) def test_call_catches_exception_handling_exception(self): - job = node = None # Will cause inside exception handler to fail - self.sender = ssync_sender.Sender(self.daemon, node, job, None) + self.sender.node = None # Will cause inside exception handler to fail self.sender.suffixes = ['abc'] self.sender.connect = 'cause exception' success, candidates = self.sender() @@ -459,7 +459,7 @@ class TestSender(BaseTest): ':UPDATES: START\r\n' ':UPDATES: END\r\n' )) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() df = mock.MagicMock() df.content_length = 0 @@ -505,7 +505,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -541,7 +541,7 @@ class TestSender(BaseTest): chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -578,7 +578,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -743,7 +743,7 @@ class TestSender(BaseTest): chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual( ''.join(self.sender.connection.sent), @@ -791,7 +791,7 @@ class TestSender(BaseTest): chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual( ''.join(self.sender.connection.sent), @@ -836,7 +836,7 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse(chunk_body='\r\n') exc = None try: @@ -875,7 +875,7 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse( chunk_body=':MISSING_CHECK: START\r\n') exc = None @@ -915,7 +915,7 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse(chunk_body='OH HAI\r\n') exc = None try: @@ -959,7 +959,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '0123abc dm\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual( ''.join(self.sender.connection.sent), @@ -1001,7 +1001,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '0123abc d extra response parts\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual(self.sender.send_map, {'0123abc': {'data': True}}) @@ -1307,7 +1307,7 @@ class TestSender(BaseTest): self.assertEqual(path, '/a/c/o') self.assertTrue(isinstance(df, diskfile.DiskFile)) self.assertEqual(expected, df.get_metadata()) - self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/', + self.assertEqual(os.path.join(self.tx_testdir, 'dev/objects/9/', object_hash[-3:], object_hash), df._datadir)