fixed _check_node() in the container sharder

Previously, _check_node() wouldn't catch the ValueError raised when
a drive was unmounted. Therefore the error would bubble up, uncaught,
and stop the shard cycle. The practical effect is that an unmounted
drive on a node would prevent sharding from happening.

This patch updates _check_node() to properly use the check_drive()
method. Furthermore, the _check_node() return value has been modified
to be more similar to what check_drive() actually returns. This
should help prevent similar errors from being introduced in the future.

Closes-Bug: #1806500

Change-Id: I3da9b5b120a5980e77ef5c4dc8fa1697e462ce0d
This commit is contained in:
John Dickinson 2018-12-03 14:22:59 -08:00
parent 333ae3086f
commit c26d67efcf
3 changed files with 87 additions and 17 deletions

View File

@ -493,18 +493,22 @@ class ContainerSharder(ContainerReplicator):
self._report_stats()
def _check_node(self, node):
    """
    Check whether a ring node is local to this process and usable.

    :param node: a dict describing a ring device (or None).
    :return: the path to the device if the node is local and its drive
        passes ``check_drive``; ``False`` otherwise.
    """
    if not node:
        return False
    if not is_local_device(self.ips, self.port,
                           node['replication_ip'],
                           node['replication_port']):
        # Not one of this process's devices; some other sharder owns it.
        return False
    try:
        # check_drive returns the device path on success and raises
        # ValueError when the drive fails the mount check; catch it here
        # so a single unmounted drive cannot abort the whole shard cycle.
        return check_drive(self.root, node['device'], self.mount_check)
    except ValueError:
        self.logger.warning(
            'Skipping %(device)s as it is not mounted' % node)
        return False
def _fetch_shard_ranges(self, broker, newest=False, params=None,
include_deleted=False):
@ -1485,9 +1489,10 @@ class ContainerSharder(ContainerReplicator):
dirs = []
self.ips = whataremyips(bind_ip=self.bind_ip)
for node in self.ring.devs:
if not self._check_node(node):
device_path = self._check_node(node)
if not device_path:
continue
datadir = os.path.join(self.root, node['device'], self.datadir)
datadir = os.path.join(device_path, self.datadir)
if os.path.isdir(datadir):
# Populate self._local_device_ids so we can find devices for
# shard containers later

View File

@ -245,6 +245,10 @@ class FakeRing(Ring):
self._device_char_iter = itertools.cycle(
['sd%s' % chr(ord('a') + x) for x in range(26)])
def add_node(self, dev):
    """Append *dev* to the ring's device list.

    The dev dict is serialized to JSON and parsed back so its string
    values come out as unicode, mirroring what a real ring file yields.
    """
    serialized = json.dumps(dev)
    self._devs.append(json.loads(serialized))
def set_replicas(self, replicas):
self.replicas = replicas
self._devs = []
@ -252,8 +256,7 @@ class FakeRing(Ring):
for x in range(self.replicas):
ip = '10.0.0.%s' % x
port = self._base_port + x
# round trip through json to ensure unicode like real rings
self._devs.append(json.loads(json.dumps({
dev = {
'ip': ip,
'replication_ip': ip,
'port': port,
@ -262,7 +265,8 @@ class FakeRing(Ring):
'zone': x % 3,
'region': x % 2,
'id': x,
})))
}
self.add_node(dev)
@property
def replica_count(self):

View File

@ -293,7 +293,8 @@ class TestSharder(BaseTestSharder):
conf = {'recon_cache_path': self.tempdir,
'devices': self.tempdir}
with self._mock_sharder(conf) as sharder:
sharder._check_node = lambda *args: True
sharder._check_node = lambda node: os.path.join(
sharder.conf['devices'], node['device'])
sharder.logger.clear()
brokers = []
for container in ('c1', 'c2'):
@ -468,12 +469,29 @@ class TestSharder(BaseTestSharder):
conf = {'recon_cache_path': self.tempdir,
'devices': self.tempdir,
'shard_container_threshold': 9}
with self._mock_sharder(conf) as sharder:
sharder._check_node = lambda *args: True
def fake_ismount(path):
# unmounted_dev is defined from .get_more_nodes() below
unmounted_path = os.path.join(conf['devices'],
unmounted_dev['device'])
if path == unmounted_path:
return False
else:
return True
with self._mock_sharder(conf) as sharder, \
mock.patch('swift.common.utils.ismount', fake_ismount), \
mock.patch('swift.container.sharder.is_local_device',
return_value=True):
sharder.reported = time.time()
sharder.logger = debug_logger()
brokers = []
device_ids = set(range(3))
device_ids = set(d['id'] for d in sharder.ring.devs)
sharder.ring.max_more_nodes = 1
unmounted_dev = next(sharder.ring.get_more_nodes(1))
unmounted_dev['device'] = 'xxxx'
sharder.ring.add_node(unmounted_dev)
for device_id in device_ids:
brokers.append(self._make_broker(
container='c%s' % device_id, hash_='c%shash' % device_id,
@ -494,6 +512,10 @@ class TestSharder(BaseTestSharder):
sharder._local_device_ids = {'stale_node_id'}
sharder._one_shard_cycle(Everything(), Everything())
lines = sharder.logger.get_lines_for_level('warning')
expected = 'Skipping %s as it is not mounted' % \
unmounted_dev['device']
self.assertIn(expected, lines[0])
self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(2, mock_process_broker.call_count)
processed_paths = [call[0][0].path
@ -542,12 +564,17 @@ class TestSharder(BaseTestSharder):
"for %s" % broker.path)
# check exceptions are handled
sharder.logger.clear()
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker', side_effect=mock_processing
) as mock_process_broker:
sharder._local_device_ids = {'stale_node_id'}
sharder._one_shard_cycle(Everything(), Everything())
lines = sharder.logger.get_lines_for_level('warning')
expected = 'Skipping %s as it is not mounted' % \
unmounted_dev['device']
self.assertIn(expected, lines[0])
self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
@ -697,6 +724,35 @@ class TestSharder(BaseTestSharder):
expected_dict.pop('meta_timestamp'))
self.assertEqual(expected_dict, actual_dict)
def test_check_node(self):
    """_check_node() returns the device path for a mounted local drive,
    and False (plus a logged warning) when the drive is unmounted."""
    node = {
        'replication_ip': '127.0.0.1',
        'replication_port': 5000,
        'device': 'd100',
    }
    with self._mock_sharder() as sharder:
        # Make the sharder consider this node local so only the
        # mount check decides the outcome.
        sharder.mount_check = True
        sharder.ips = ['127.0.0.1']
        sharder.port = 5000

        # normal behavior: mounted drive -> the device path is returned
        with mock.patch(
                'swift.common.utils.ismount',
                lambda *args: True):
            r = sharder._check_node(node)
        expected = os.path.join(sharder.conf['devices'], node['device'])
        self.assertEqual(r, expected)

        # test with an unmounted drive: False is returned and a warning
        # naming the skipped device is logged
        with mock.patch(
                'swift.common.utils.ismount',
                lambda *args: False):
            r = sharder._check_node(node)
        self.assertEqual(r, False)
        lines = sharder.logger.get_lines_for_level('warning')
        expected = 'Skipping %s as it is not mounted' % node['device']
        self.assertIn(expected, lines[0])
def test_fetch_shard_ranges_unexpected_response(self):
broker = self._make_broker()
exc = internal_client.UnexpectedResponse(
@ -4368,7 +4424,8 @@ class TestSharder(BaseTestSharder):
with self._mock_sharder() as sharder:
sharder.ring = ring
sharder._check_node = lambda *args: True
sharder._check_node = lambda node: os.path.join(
sharder.conf['devices'], node['device'])
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once()
@ -4379,7 +4436,8 @@ class TestSharder(BaseTestSharder):
with self._mock_sharder() as sharder:
sharder.ring = ring
sharder._check_node = lambda *args: True
sharder._check_node = lambda node: os.path.join(
sharder.conf['devices'], node['device'])
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='0')
@ -4390,7 +4448,8 @@ class TestSharder(BaseTestSharder):
with self._mock_sharder() as sharder:
sharder.ring = ring
sharder._check_node = lambda *args: True
sharder._check_node = lambda node: os.path.join(
sharder.conf['devices'], node['device'])
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='2,0')
@ -4401,7 +4460,8 @@ class TestSharder(BaseTestSharder):
with self._mock_sharder() as sharder:
sharder.ring = ring
sharder._check_node = lambda *args: True
sharder._check_node = lambda node: os.path.join(
sharder.conf['devices'], node['device'])
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='2,0', devices='sdc')
@ -4412,7 +4472,8 @@ class TestSharder(BaseTestSharder):
with self._mock_sharder() as sharder:
sharder.ring = ring
sharder._check_node = lambda *args: True
sharder._check_node = lambda node: os.path.join(
sharder.conf['devices'], node['device'])
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(devices='sdb,sdc')