Fix stats calculation in object-reconstructor

This patch fixes the object-reconstructor to calculate device_count
as the total number of local devices in all policies. Previously
Swift counts it for each policy but reconstruction_device_count
which means the number of devices actually swift needs to reconstruct
is counted as sum of ones for all polices.

With this patch, Swift will gather all local devices for all policies
at first, and then, collect parts for each devices as well as current.
To do so, we can see the statuses for remaining job/disks percentage via
stats_line output.

To enable this change, this patch also touchs the object replicator
to get a DiskFileManager via the DiskFileRouter class so that
DiskFileManager instances are policy specific. Currently the same
replication policy DiskFileManager class is always used, but this
change future proofs the replicator for possible other DiskFileManager
implementations.

The change also gives the ObjectReplicator a _df_router variable,
making it consistent with the ObjectReconstructor, and allowing a
common way for ssync.Sender to access DiskFileManager instances via
it's daemon's _df_router instance.

Also, remove the use of FakeReplicator from the ssync test suite. It
was not necessary and risked masking divergence between ssync and the
replicator and reconstructor daemon implementations.

Co-Author: Alistair Coles <alistair.coles@hpe.com>

Closes-Bug: #1488608
Change-Id: Ic7a4c932b59158d21a5fb4de9ed3ed57f249d068
This commit is contained in:
Kota Tsuyuzaki 2016-10-30 22:24:18 -07:00
parent a32b411f22
commit b09360d447
8 changed files with 149 additions and 141 deletions

View File

@ -800,12 +800,13 @@ class ObjectReconstructor(Daemon):
override_devices = override_devices or []
override_partitions = override_partitions or []
ips = whataremyips(self.bind_ip)
for policy in POLICIES:
if policy.policy_type != EC_POLICY:
continue
self._diskfile_mgr = self._df_router[policy]
ec_policies = (policy for policy in POLICIES
if policy.policy_type == EC_POLICY)
policy2devices = {}
for policy in ec_policies:
self.load_object_ring(policy)
data_dir = get_data_dir(policy)
local_devices = list(six.moves.filter(
lambda dev: dev and is_local_device(
ips, self.port,
@ -813,21 +814,23 @@ class ObjectReconstructor(Daemon):
policy.object_ring.devs))
if override_devices:
self.device_count = len(override_devices)
else:
self.device_count = len(local_devices)
local_devices = list(six.moves.filter(
lambda dev_info: dev_info['device'] in override_devices,
local_devices))
policy2devices[policy] = local_devices
self.device_count += len(local_devices)
for policy, local_devices in policy2devices.items():
df_mgr = self._df_router[policy]
for local_dev in local_devices:
if override_devices and (local_dev['device'] not in
override_devices):
continue
self.reconstruction_device_count += 1
dev_path = self._df_router[policy].get_dev_path(
local_dev['device'])
dev_path = df_mgr.get_dev_path(local_dev['device'])
if not dev_path:
self.logger.warning(_('%s is not mounted'),
local_dev['device'])
continue
data_dir = get_data_dir(policy)
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
unlink_older_than(tmp_path, time.time() -

View File

@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.obj import ssync_sender
from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
from swift.obj.diskfile import get_data_dir, get_tmp_dir, DiskFileRouter
from swift.common.storage_policy import POLICIES, REPL_POLICY
DEFAULT_RSYNC_TIMEOUT = 900
@ -121,7 +121,7 @@ class ObjectReplicator(Daemon):
'operation, please disable handoffs_first and '
'handoff_delete before the next '
'normal rebalance')
self._diskfile_mgr = DiskFileManager(conf, self.logger)
self._df_router = DiskFileRouter(conf, self.logger)
def _zero_stats(self):
"""Zero out the stats."""
@ -406,9 +406,10 @@ class ObjectReplicator(Daemon):
target_devs_info = set()
failure_devs_info = set()
begin = time.time()
df_mgr = self._df_router[job['policy']]
try:
hashed, local_hash = tpool_reraise(
self._diskfile_mgr._get_hashes, job['path'],
df_mgr._get_hashes, job['path'],
do_listdir=_do_listdir(
int(job['partition']),
self.replication_cycle),
@ -462,7 +463,7 @@ class ObjectReplicator(Daemon):
self.stats['hashmatch'] += 1
continue
hashed, recalc_hash = tpool_reraise(
self._diskfile_mgr._get_hashes,
df_mgr._get_hashes,
job['path'], recalculate=suffixes,
reclaim_age=self.reclaim_age)
self.logger.update_stats('suffix.hashes', hashed)

View File

@ -80,7 +80,7 @@ class Sender(object):
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
self.daemon = daemon
self.df_mgr = self.daemon._diskfile_mgr
self.df_mgr = self.daemon._df_router[job['policy']]
self.node = node
self.job = job
self.suffixes = suffixes

View File

@ -19,28 +19,9 @@ import tempfile
import unittest
import time
from swift.common import utils
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from swift.obj import diskfile
from test.unit import debug_logger
class FakeReplicator(object):
def __init__(self, testdir, policy=None):
self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
self.network_chunk_size = 65536
self.disk_chunk_size = 4096
conf = {
'devices': testdir,
'mount_check': 'false',
}
policy = POLICIES.default if policy is None else policy
self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
self._diskfile_mgr = self._diskfile_router[policy]
def write_diskfile(df, timestamp, data='test data', frag_index=None,
@ -74,9 +55,18 @@ def write_diskfile(df, timestamp, data='test data', frag_index=None,
class BaseTest(unittest.TestCase):
def setUp(self):
self.device = 'dev'
self.partition = '9'
self.tmpdir = tempfile.mkdtemp()
# sender side setup
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
self.daemon_conf = {
'devices': self.tx_testdir,
'mount_check': 'false',
}
# daemon will be set in subclass setUp
self.daemon = None
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)
@ -90,7 +80,7 @@ class BaseTest(unittest.TestCase):
object_parts = account, container, obj
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
if df_mgr is None:
df_mgr = self.daemon._diskfile_router[policy]
df_mgr = self.daemon._df_router[policy]
df = df_mgr.get_diskfile(
device, partition, *object_parts, policy=policy,
frag_index=frag_index)

View File

@ -862,7 +862,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.suffixes = suffixes
self.daemon = daemon
self.job = job
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
hash_gen = self.daemon._df_router[job['policy']].yield_hashes(
self.job['device'], self.job['partition'],
self.job['policy'], self.suffixes,
frag_index=self.job.get('frag_index'))

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import unittest
import os
import mock
@ -26,10 +26,10 @@ from collections import defaultdict
from errno import ENOENT, ENOTEMPTY, ENOTDIR
from eventlet.green import subprocess
from eventlet import Timeout, tpool
from eventlet import Timeout
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
mocked_http_conn)
mocked_http_conn, FakeLogger)
from swift.common import utils
from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
storage_directory)
@ -1623,68 +1623,80 @@ class TestObjectReplicator(unittest.TestCase):
object_replicator.http_connect = was_connector
def test_run_once_recover_from_timeout(self):
# verify that replicator will pass over all policies' partitions even
# if a timeout occurs while replicating one partition to one node.
timeouts = [Timeout()]
def fake_get_hashes(df_mgr, part_path, **kwargs):
self.get_hash_count += 1
# Simulate a REPLICATE timeout by raising Timeout for second call
# to get_hashes (with recalculate suffixes) for a specific
# partition
if (timeouts and '/objects/' in part_path and
part_path.endswith('0') and 'recalculate' in kwargs):
raise timeouts.pop(0)
return 1, {'abc': 'def'}
# map partition_path -> [nodes]
sync_paths = collections.defaultdict(list)
def fake_sync(node, job, suffixes, *args, **kwargs):
sync_paths[job['path']].append(node)
return True, {}
conf = dict(swift_dir=self.testdir, devices=self.devices,
bind_ips=_ips()[0],
bind_ip=_ips()[0], # local dev has id=0
mount_check='false', timeout='300', stats_interval='1')
replicator = object_replicator.ObjectReplicator(conf)
was_connector = object_replicator.http_connect
was_get_hashes = object_replicator.DiskFileManager._get_hashes
was_execute = tpool.execute
self.get_hash_count = 0
try:
with mock.patch('swift.obj.diskfile.DiskFileManager._get_hashes',
fake_get_hashes):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
with mock.patch('swift.obj.replicator.dump_recon_cache'):
replicator = object_replicator.ObjectReplicator(
conf, logger=FakeLogger())
def fake_get_hashes(*args, **kwargs):
self.get_hash_count += 1
if self.get_hash_count == 3:
# raise timeout on last call to get hashes
raise Timeout()
return 2, {'abc': 'def'}
self.get_hash_count = 0
with mock.patch.object(replicator, 'sync', fake_sync):
replicator.run_once()
def fake_exc(tester, *args, **kwargs):
if 'Error syncing partition timeout' in args[0]:
tester.i_failed = True
log_lines = replicator.logger.get_lines_for_level('error')
self.assertIn("Error syncing with node:", log_lines[0])
self.assertFalse(log_lines[1:])
# setup creates 4 partitions; partition 1 does not map to local dev id
# 0 so will be handled by update_delete(); partitions 0, 2, 3 are
# handled by update() for each of two policies, so expect 6 paths to be
# sync'd
self.assertEqual(6, len(sync_paths))
# partition 3 has 2 nodes in remote region, only first node is sync'd.
# partition 0 in policy 0 has fake_get_hashes timeout before first
# sync, so only second node is sync'd.
# other partitions are sync'd to 2 nodes in same region.
expected_node_count = { # map path_end -> expected sync node count
'/objects/0': 1,
'/objects/1': 2,
'/objects/2': 2,
'/objects/3': 1,
'/objects-1/0': 2,
'/objects-1/1': 2,
'/objects-1/2': 2,
'/objects-1/3': 1
}
for path, nodes in sync_paths.items():
path_end = path[path.index('/objects'):]
self.assertEqual(expected_node_count[path_end], len(nodes),
'Expected %s but got %s for path %s' %
(expected_node_count[path_end], len(nodes), path))
# partitions 0 and 2 attempt 3 calls each per policy to get_hashes = 12
# partitions 3 attempts 2 calls per policy to get_hashes = 4
# partitions 1 dosn't get_hashes because of update_deleted
self.assertEqual(16, self.get_hash_count)
self.i_failed = False
object_replicator.http_connect = mock_http_connect(200)
object_replicator.DiskFileManager._get_hashes = fake_get_hashes
replicator.logger.exception = \
lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
# Write some files into '1' and run replicate- they should be moved
# to the other partitions and then node should get deleted.
cur_part = '1'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
ring = replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
self.assertTrue(os.access(os.path.join(self.objects,
'1', data_dir, ohash),
os.F_OK))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
self.assertFalse(self.i_failed)
finally:
object_replicator.http_connect = was_connector
object_replicator.DiskFileManager._get_hashes = was_get_hashes
tpool.execute = was_execute
# attempt to 16 times but succeeded only 15 times due to Timeout
suffix_hashes = sum(
count for (metric, count), _junk in
replicator.logger.log_dict['update_stats']
if metric == 'suffix.hashes')
self.assertEqual(15, suffix_hashes)
def test_run(self):
with _mock_process([(0, '')] * 100):
@ -1737,7 +1749,8 @@ class TestObjectReplicator(unittest.TestCase):
do_listdir_results = [False, False, True, False, True, False]
mock_do_listdir.side_effect = do_listdir_results
expected_tpool_calls = [
mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'],
mock.call(self.replicator._df_router[job['policy']]._get_hashes,
job['path'],
do_listdir=do_listdir,
reclaim_age=self.replicator.reclaim_age)
for job, do_listdir in zip(jobs, do_listdir_results)

View File

@ -31,9 +31,10 @@ from swift.common.utils import Timestamp
from swift.obj import ssync_sender, server
from swift.obj.reconstructor import RebuildingECDiskFileStream, \
ObjectReconstructor
from swift.obj.replicator import ObjectReplicator
from test.unit import patch_policies, debug_logger, encode_frag_archive_bodies
from test.unit.obj.common import BaseTest, FakeReplicator
from test.unit.obj.common import BaseTest
class TestBaseSsync(BaseTest):
@ -46,13 +47,6 @@ class TestBaseSsync(BaseTest):
"""
def setUp(self):
super(TestBaseSsync, self).setUp()
self.device = 'dev'
self.partition = '9'
# sender side setup
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
self.daemon = FakeReplicator(self.tx_testdir)
# rx side setup
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
@ -142,7 +136,7 @@ class TestBaseSsync(BaseTest):
return diskfiles
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
df_mgr = self.daemon._diskfile_router[policy]
df_mgr = self.daemon._df_router[policy]
df = df_mgr.get_diskfile(
self.device, self.partition, account='a', container='c',
obj=obj_name, policy=policy, frag_index=frag_index)
@ -310,6 +304,8 @@ class TestBaseSsyncEC(TestBaseSsync):
def setUp(self):
super(TestBaseSsyncEC, self).setUp()
self.policy = POLICIES.default
self.daemon = ObjectReconstructor(self.daemon_conf,
debug_logger('test-ssync-sender'))
def _get_object_data(self, path, frag_index=None, **kwargs):
# return a frag archive for given object name and frag index.
@ -337,7 +333,7 @@ class TestSsyncEC(TestBaseSsyncEC):
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 has primary and handoff fragment archives
t1 = next(self.ts_iter)
@ -421,7 +417,7 @@ class TestSsyncEC(TestBaseSsyncEC):
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)
@ -531,7 +527,7 @@ class TestSsyncEC(TestBaseSsyncEC):
tx_objs = {}
tx_tombstones = {}
rx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 only has primary
t1 = next(self.ts_iter)
@ -631,7 +627,7 @@ class TestSsyncEC(TestBaseSsyncEC):
def test_send_with_frag_index_none(self):
policy = POLICIES.default
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# create an ec fragment on the remote node
ts1 = next(self.ts_iter)
@ -692,7 +688,7 @@ class TestSsyncEC(TestBaseSsyncEC):
# create non durable tx obj by not committing, then create a legacy
# .durable file
tx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(
@ -791,7 +787,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
# create sender side diskfiles...
self.tx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[self.policy]
tx_df_mgr = self.daemon._df_router[self.policy]
t1 = next(self.ts_iter)
self.tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,))
@ -1073,6 +1069,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
@patch_policies
class TestSsyncReplication(TestBaseSsync):
def setUp(self):
super(TestSsyncReplication, self).setUp()
self.daemon = ObjectReplicator(self.daemon_conf,
debug_logger('test-ssync-sender'))
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
@ -1082,7 +1083,7 @@ class TestSsyncReplication(TestBaseSsync):
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 and o2 are on tx only
t1 = next(self.ts_iter)
@ -1204,7 +1205,7 @@ class TestSsyncReplication(TestBaseSsync):
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)
@ -1349,7 +1350,7 @@ class TestSsyncReplication(TestBaseSsync):
rx_node_index = 0
# create diskfiles...
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# rx has data at t1 but no meta
@ -1434,7 +1435,7 @@ class TestSsyncReplication(TestBaseSsync):
# create diskfiles...
tx_objs = {}
rx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)

View File

@ -24,9 +24,10 @@ from swift.common import exceptions, utils
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from swift.obj import ssync_sender, diskfile, ssync_receiver
from swift.obj.replicator import ObjectReplicator
from test.unit import patch_policies, make_timestamp_iter
from test.unit.obj.common import FakeReplicator, BaseTest
from test.unit import patch_policies, make_timestamp_iter, debug_logger
from test.unit.obj.common import BaseTest
class NullBufferedHTTPConnection(object):
@ -84,10 +85,10 @@ class TestSender(BaseTest):
def setUp(self):
super(TestSender, self).setUp()
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.testdir, 'dev'))
self.daemon = FakeReplicator(self.testdir)
self.sender = ssync_sender.Sender(self.daemon, None, None, None)
self.daemon = ObjectReplicator(self.daemon_conf,
debug_logger('test-ssync-sender'))
job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__
self.sender = ssync_sender.Sender(self.daemon, None, job, None)
def test_call_catches_MessageTimeout(self):
@ -146,8 +147,7 @@ class TestSender(BaseTest):
'1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
def test_call_catches_exception_handling_exception(self):
job = node = None # Will cause inside exception handler to fail
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.node = None # Will cause inside exception handler to fail
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
@ -459,7 +459,7 @@ class TestSender(BaseTest):
':UPDATES: START\r\n'
':UPDATES: END\r\n'
))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
df = mock.MagicMock()
df.content_length = 0
@ -505,7 +505,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
@ -541,7 +541,7 @@ class TestSender(BaseTest):
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
@ -578,7 +578,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
@ -743,7 +743,7 @@ class TestSender(BaseTest):
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(
''.join(self.sender.connection.sent),
@ -791,7 +791,7 @@ class TestSender(BaseTest):
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(
''.join(self.sender.connection.sent),
@ -836,7 +836,7 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None
try:
@ -875,7 +875,7 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n')
exc = None
@ -915,7 +915,7 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
exc = None
try:
@ -959,7 +959,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
'0123abc dm\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(
''.join(self.sender.connection.sent),
@ -1001,7 +1001,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
'0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(self.sender.send_map,
{'0123abc': {'data': True}})
@ -1307,7 +1307,7 @@ class TestSender(BaseTest):
self.assertEqual(path, '/a/c/o')
self.assertTrue(isinstance(df, diskfile.DiskFile))
self.assertEqual(expected, df.get_metadata())
self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/',
self.assertEqual(os.path.join(self.tx_testdir, 'dev/objects/9/',
object_hash[-3:], object_hash),
df._datadir)