Enhance the 'add-disk' action for disk replacement

As part of the work to improve disk replacement, the 'add-disk' action
needs some changes (an example invocation is sketched below). This includes:
- Creating 'bcache' devices to accelerate disk access.
- Creating caching partitions of a specified size.
- Recycling previously deactivated OSD ids.
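
As a sketch, the extended action could be invoked as follows (the unit
name and device paths are placeholders, not taken from this change):

    juju run-action --wait ceph-osd/0 add-disk \
        osd-devices='/dev/sdb /dev/sdc' \
        cache-devices='/dev/nvme0n1' \
        partition-size=100 \
        osd-ids='osd.1 osd.2'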

Change-Id: Id5027f30d51c23d2be4c34f82867d65a50b35137
Depends-On: I43d0a0bc11664c37532c0117711affc93c9d1ad1
func-test-pr: https://github.com/openstack-charmers/zaza-openstack-tests/pull/675
Author: Luciano Lo Giudice
Date:   2021-11-19 18:24:39 -03:00
Parent: 73fe60b3df
Commit: 65207967cf

8 changed files with 546 additions and 16 deletions

actions.yaml

@@ -41,6 +41,24 @@ add-disk:
     bucket:
       type: string
       description: The name of the bucket in Ceph to add these devices into
+    osd-ids:
+      type: string
+      description: |
+        The OSD ids to recycle. If specified, the number of elements in this
+        list must be the same as the number of 'osd-devices'.
+    cache-devices:
+      type: string
+      description: |
+        A list of devices to act as caching devices for 'bcache', using the
+        'osd-devices' as backing. If the number of elements in this list is
+        less than the number of 'osd-devices', then the caching ones will be
+        distributed in a round-robin fashion.
+    partition-size:
+      type: integer
+      description: |
+        The size of the partitions to create for the caching devices. If left
+        unspecified, then the full size of the devices will be split evenly
+        across partitions.
   required:
     - osd-devices
 blacklist-add-disk:
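
To illustrate the round-robin distribution described for 'cache-devices'
above, a minimal Python sketch (device names are hypothetical):

    # Backing devices are paired with caching devices round-robin.
    devices = ['/dev/sdb', '/dev/sdc', '/dev/sdd']
    caches = ['/dev/nvme0n1', '/dev/nvme1n1']
    pairs = [(dev, caches[i % len(caches)]) for i, dev in enumerate(devices)]
    # pairs == [('/dev/sdb', '/dev/nvme0n1'), ('/dev/sdc', '/dev/nvme1n1'),
    #           ('/dev/sdd', '/dev/nvme0n1')]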

actions/add_disk.py

@@ -23,20 +23,51 @@ sys.path.append('hooks')
 
 import charmhelpers.contrib.storage.linux.ceph as ch_ceph
 import charmhelpers.core.hookenv as hookenv
+from charmhelpers.core.hookenv import function_fail
 from charmhelpers.core.unitdata import kv
 
+from utils import (PartitionIter, device_size, DeviceError)
+
 import ceph_hooks
 import charms_ceph.utils
 
 
-def add_device(request, device_path, bucket=None):
-    charms_ceph.utils.osdize(device_path, hookenv.config('osd-format'),
+def add_device(request, device_path, bucket=None,
+               osd_id=None, part_iter=None):
+    """Add a new device to be used by the OSD unit.
+
+    :param request: A broker request to notify monitors of changes.
+    :type request: CephBrokerRq
+    :param device_path: The absolute path to the device to be added.
+    :type device_path: str
+    :param bucket: The bucket name in ceph to add the device into, or None.
+    :type bucket: Option[str, None]
+    :param osd_id: The OSD Id to use, or None.
+    :type osd_id: Option[str, None]
+    :param part_iter: The partition iterator that will create partitions on
+                      demand, to service bcache creation, or None, if no
+                      partitions need to be created.
+    :type part_iter: Option[PartitionIter, None]
+    """
+    if part_iter is not None:
+        effective_dev = part_iter.create_bcache(device_path)
+        if not effective_dev:
+            raise DeviceError(
+                'Failed to create bcache for device {}'.format(device_path))
+    else:
+        effective_dev = device_path
+
+    charms_ceph.utils.osdize(effective_dev, hookenv.config('osd-format'),
                              ceph_hooks.get_journal_devices(),
                              hookenv.config('ignore-device-errors'),
                              hookenv.config('osd-encrypt'),
                              hookenv.config('bluestore'),
-                             hookenv.config('osd-encrypt-keymanager'))
+                             hookenv.config('osd-encrypt-keymanager'),
+                             osd_id)
     # Make it fast!
     if hookenv.config('autotune'):
         charms_ceph.utils.tune_dev(device_path)
@@ -63,9 +94,10 @@ def add_device(request, device_path, bucket=None):
 
     return request
 
 
-def get_devices():
+def get_devices(key):
+    """Get a list of the devices passed for this action, for a key."""
     devices = []
-    for path in hookenv.action_get('osd-devices').split(' '):
+    for path in (hookenv.action_get(key) or '').split():
         path = path.strip()
         if os.path.isabs(path):
             devices.append(path)
@@ -73,10 +105,83 @@ def get_devices():
 
     return devices
 
 
+def cache_storage():
+    """Return a list of Juju storage for caches."""
+    cache_ids = hookenv.storage_list('cache-devices')
+    return [hookenv.storage_get('location', cid) for cid in cache_ids]
+
+
+def validate_osd_id(osd_id):
+    """Test that an OSD id is actually valid."""
+    if isinstance(osd_id, str):
+        if osd_id.startswith('osd.'):
+            osd_id = osd_id[4:]
+        try:
+            return int(osd_id) >= 0
+        except ValueError:
+            return False
+    elif isinstance(osd_id, int):
+        return osd_id >= 0
+    return False
+
+
+def validate_partition_size(psize, devices, caches):
+    """Test that the cache devices have enough room."""
+    sizes = [device_size(cache) for cache in caches]
+    n_caches = len(caches)
+    for idx in range(len(devices)):
+        cache_idx = idx % n_caches
+        prev = sizes[cache_idx] - psize
+        if prev < 0:
+            function_fail('''Cache device {} does not have enough
+                room to provide {} {}GB partitions'''.format(
+                caches[cache_idx], (idx + 1) // n_caches, psize))
+            sys.exit(1)
+        sizes[cache_idx] = prev
+
+
 if __name__ == "__main__":
     request = ch_ceph.CephBrokerRq()
-    for dev in get_devices():
-        request = add_device(request=request,
-                             device_path=dev,
-                             bucket=hookenv.action_get("bucket"))
+    devices = get_devices('osd-devices')
+    caches = get_devices('cache-devices') or cache_storage()
+    if caches:
+        psize = hookenv.action_get('partition-size')
+        if psize:
+            validate_partition_size(psize, devices, caches)
+        part_iter = PartitionIter(caches, psize, devices)
+    else:
+        part_iter = None
+
+    osd_ids = hookenv.action_get('osd-ids')
+    if osd_ids:
+        # Validate number and format for OSD ids.
+        osd_ids = osd_ids.split()
+        if len(osd_ids) != len(devices):
+            function_fail('The number of osd-ids and osd-devices must match')
+            sys.exit(1)
+        for osd_id in osd_ids:
+            if not validate_osd_id(osd_id):
+                function_fail('Invalid OSD ID passed: {}'.format(osd_id))
+                sys.exit(1)
+    else:
+        osd_ids = [None] * len(devices)
+
+    errors = []
+    for dev, osd_id in zip(devices, osd_ids):
+        try:
+            request = add_device(request=request,
+                                 device_path=dev,
+                                 bucket=hookenv.action_get("bucket"),
+                                 osd_id=osd_id, part_iter=part_iter)
+        except Exception:
+            errors.append(dev)
+
     ch_ceph.send_request_if_needed(request, relation='mon')
+    if errors:
+        if part_iter is not None:
+            for error in errors:
+                part_iter.cleanup(error)
+        function_fail('Failed to add devices: {}'.format(','.join(errors)))
+        sys.exit(1)

hooks/utils.py

@@ -12,11 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import re
 import os
 import socket
 import subprocess
 import sys
+import time
 
 sys.path.append('lib')
 import charms_ceph.utils as ceph
@@ -336,3 +338,229 @@ def parse_osds_arguments():
                 "explicitly defined OSD IDs", WARNING)
 
     return args
+
+
+class DeviceError(Exception):
+    """Exception type used to signal errors raised by calling
+    external commands that manipulate devices.
+    """
+    pass
+
+
+def _check_output(args):
+    try:
+        return subprocess.check_output(args).decode('UTF-8')
+    except subprocess.CalledProcessError as e:
+        raise DeviceError(str(e))
+
+
+def _check_call(args):
+    try:
+        return subprocess.check_call(args)
+    except subprocess.CalledProcessError as e:
+        raise DeviceError(str(e))
+
+
+def setup_bcache(backing, cache):
+    """Create a bcache device out of the backing storage and caching device.
+
+    :param backing: The path to the backing device.
+    :type backing: str
+    :param cache: The path to the caching device.
+    :type cache: str
+    :returns: The full path of the newly created bcache device.
+    :rtype: str
+    """
+    _check_call(['sudo', 'make-bcache', '-B', backing,
+                 '-C', cache, '--writeback'])
+
+    def bcache_name(dev):
+        rv = _check_output(['lsblk', '-p', '-b', dev, '-J', '-o', 'NAME'])
+        for x in json.loads(rv)['blockdevices'][0].get('children', []):
+            if x['name'] != dev:
+                return x['name']
+
+    for _ in range(100):
+        rv = bcache_name(cache)
+        if rv is not None:
+            return rv
+
+        # Tell the kernel to refresh the partitions.
+        time.sleep(0.3)
+        _check_call(['sudo', 'partprobe'])
+
+
+def get_partition_names(dev):
+    """Given a raw device, return a set of the partitions it contains.
+
+    :param dev: The path to the device.
+    :type dev: str
+    :returns: A set with the partitions of the passed device.
+    :rtype: set[str]
+    """
+    rv = _check_output(['lsblk', '-b', dev, '-J', '-p', '-o', 'NAME'])
+    rv = json.loads(rv)['blockdevices'][0].get('children', {})
+    return set(x['name'] for x in rv)
+
+
+def create_partition(cache, size, n_iter):
+    """Create a partition of a specific size in a device. If needed,
+    make sure the device has a GPT ready.
+
+    :param cache: The path to the caching device from which to create
+                  the partition.
+    :type cache: str
+    :param size: The size (in GB) of the partition to create.
+    :type size: int
+    :param n_iter: The iteration number. If zero, this function will
+                   also create the GPT on the caching device.
+    :type n_iter: int
+    :returns: The full path of the newly created partition.
+    :rtype: str
+    """
+    if not n_iter:
+        # In our first iteration, make sure the device has a GPT.
+        _check_call(['sudo', 'parted', '-s', cache, 'mklabel', 'gpt'])
+    prev_partitions = get_partition_names(cache)
+    cmd = ['sudo', 'parted', '-s', cache, 'mkpart', 'primary',
+           str(n_iter * size) + 'GB', str((n_iter + 1) * size) + 'GB']
+    _check_call(cmd)
+
+    for _ in range(100):
+        ret = get_partition_names(cache) - prev_partitions
+        if ret:
+            return next(iter(ret))
+        time.sleep(0.3)
+        _check_call(['sudo', 'partprobe'])
+
+    raise DeviceError('Failed to create partition')
+
+
+def device_size(dev):
+    """Compute the size of a device, in GB.
+
+    :param dev: The full path to the device.
+    :type dev: str
+    :returns: The size in GB of the specified device.
+    :rtype: float
+    """
+    ret = _check_output(['lsblk', '-b', '-d', dev, '-J', '-o', 'SIZE'])
+    ret = int(json.loads(ret)['blockdevices'][0]['size'])
+    return ret / (1024 * 1024 * 1024)   # Return size in GB.
+
+
+def bcache_remove(bcache, cache_dev):
+    """Remove a bcache kernel device, given its caching device.
+
+    :param bcache: The path of the bcache device.
+    :type bcache: str
+    :param cache_dev: The caching device used for the bcache name.
+    :type cache_dev: str
+    """
+    rv = _check_output(['sudo', 'bcache-super-show', cache_dev])
+    uuid = None
+    # Fetch the UUID for the caching device.
+    for line in rv.split('\n'):
+        idx = line.find('cset.uuid')
+        if idx >= 0:
+            uuid = line[idx + 9:].strip()
+            break
+    else:
+        return
+    bcache_name = bcache[bcache.rfind('/') + 1:]
+    with open('/sys/block/{}/bcache/stop'.format(bcache_name), 'wb') as f:
+        f.write(b'1')
+    with open('/sys/fs/bcache/{}/stop'.format(uuid), 'wb') as f:
+        f.write(b'1')
+
+
+def wipe_disk(dev):
+    """Destroy all data in a specific device, including partition tables."""
+    _check_call(['sudo', 'wipefs', '-a', dev])
+
+
+class PartitionIter:
+    """Class used to create partitions iteratively.
+
+    Objects of this type are used to create partitions out of
+    the specified cache devices, either with a specific size,
+    or with a size proportional to what is needed."""
+
+    def __init__(self, caches, psize, devices):
+        """Construct a partition iterator.
+
+        :param caches: The list of cache devices to use.
+        :type caches: iterable
+        :param psize: The size of the partitions (in GB), or None.
+        :type psize: Option[int, None]
+        :param devices: The backing devices. Only used to get their length.
+        :type devices: iterable
+        """
+        self.caches = [[cache, 0] for cache in caches]
+        self.idx = 0
+        if not psize:
+            factor = min(1.0, len(caches) / len(devices))
+            self.psize = [factor * device_size(cache) for cache in caches]
+        else:
+            self.psize = psize
+        self.created = {}
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        """Return a newly created partition.
+
+        The object keeps track of the currently used caching device,
+        so upon creating a new partition, it will move on to the next one,
+        distributing the load among them in a round-robin fashion.
+        """
+        cache, n_iter = self.caches[self.idx]
+        size = self.psize
+        if not isinstance(size, (int, float)):
+            size = self.psize[self.idx]
+        self.caches[self.idx][1] += 1
+        self.idx = (self.idx + 1) % len(self.caches)
+        log('Creating partition in device {} of size {}'.format(cache, size))
+        return create_partition(cache, size, n_iter)
+
+    def create_bcache(self, backing):
+        """Create a bcache device, using an internal caching device
+        and an external backing one.
+
+        :param backing: The path to the backing device.
+        :type backing: str
+        :returns: The name of the newly created bcache device.
+        :rtype: str
+        """
+        cache = next(self)
+        ret = setup_bcache(backing, cache)
+        if ret is not None:
+            self.created[backing] = (ret, cache)
+            log('Bcache device created: {}'.format(cache))
+        return ret
+
+    def cleanup(self, device):
+        """Remove the bcache device previously created for a backing
+        device, if any."""
+        args = self.created.get(device)
+        if not args:
+            return
+        try:
+            bcache_remove(*args)
+        except DeviceError:
+            log('Failed to cleanup bcache device: {}'.format(args[0]))
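
As a usage sketch of PartitionIter (device paths hypothetical), mirroring
what the unit tests further below exercise:

    # Two caches shared by three backing disks; 200GB partitions each.
    piter = PartitionIter(['/dev/nvme0n1', '/dev/nvme1n1'], 200,
                          ['/dev/sdb', '/dev/sdc', '/dev/sdd'])
    bcache0 = piter.create_bcache('/dev/sdb')  # partition on nvme0n1
    bcache1 = piter.create_bcache('/dev/sdc')  # partition on nvme1n1
    bcache2 = piter.create_bcache('/dev/sdd')  # nvme0n1 again (round-robin)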

lib/charms_ceph/utils.py

@@ -1514,11 +1514,11 @@ def get_devices(name):
 
 
 def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
-           bluestore=False, key_manager=CEPH_KEY_MANAGER):
+           bluestore=False, key_manager=CEPH_KEY_MANAGER, osd_id=None):
     if dev.startswith('/dev'):
         osdize_dev(dev, osd_format, osd_journal,
                    ignore_errors, encrypt,
-                   bluestore, key_manager)
+                   bluestore, key_manager, osd_id)
     else:
         if cmp_pkgrevno('ceph', '14.0.0') >= 0:
             log("Directory backed OSDs can not be created on Nautilus",
@@ -1528,7 +1528,8 @@ def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
 
 
 def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
-               encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER):
+               encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER,
+               osd_id=None):
     """
     Prepare a block device for use as a Ceph OSD
 
@@ -1593,7 +1594,8 @@ def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
                               osd_journal,
                               encrypt,
                               bluestore,
-                              key_manager)
+                              key_manager,
+                              osd_id)
     else:
         cmd = _ceph_disk(dev,
                          osd_format,
@@ -1677,7 +1679,7 @@ def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False):
 
 
 def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
-                 key_manager=CEPH_KEY_MANAGER):
+                 key_manager=CEPH_KEY_MANAGER, osd_id=None):
     """
     Prepare and activate a device for usage as a Ceph OSD using ceph-volume.
 
@@ -1689,6 +1691,7 @@ def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
     :param: encrypt: Use block device encryption
     :param: bluestore: Use bluestore storage for OSD
     :param: key_manager: dm-crypt Key Manager to use
+    :param: osd_id: The OSD-id to recycle, or None to create a new one
     :raises subprocess.CalledProcessError: in the event that any supporting
                                            LVM operation failed.
     :returns: list. 'ceph-volume' command and required parameters for
@@ -1710,6 +1713,9 @@ def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
     if encrypt and key_manager == CEPH_KEY_MANAGER:
         cmd.append('--dmcrypt')
 
+    if osd_id is not None:
+        cmd.extend(['--osd-id', str(osd_id)])
+
     # On-disk journal volume creation
     if not osd_journal and not bluestore:
         journal_lv_type = 'journal'
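
For reference, when osd_id is supplied, the assembled 'ceph-volume' command
picks up the extra flag; a hypothetical rendering (device path and the other
flags depend on charm config) could look like:

    ceph-volume lvm create --osd-id 1 --data /dev/sdb --bluestore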

metadata.yaml

@@ -48,3 +48,8 @@ storage:
       type: block
       multiple:
         range: 0-
+  cache-devices:
+    type: block
+    multiple:
+      range: 0-
+    minimum-size: 10G
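
Since caches are also modelled as Juju storage ('cache-devices' above), they
could alternatively be attached through the storage framework; a hypothetical
example (pool, size, and count are placeholders):

    juju add-storage ceph-osd/0 cache-devices=cinder,10G,1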

test-requirements.txt

@@ -42,3 +42,5 @@ git+https://opendev.org/openstack/tempest.git#egg=tempest;python_version>='3.6'
 tempest<24.0.0;python_version<'3.6'
 
 croniter  # needed for charm-rabbitmq-server unit tests
+pyparsing<3.0.0  # aodhclient is pinned in zaza and needs pyparsing < 3.0.0, but cffi also needs it, so pin here.
+cffi==1.14.6; python_version < '3.6'  # cffi 1.15.0 drops support for py35.

unit_tests/test_actions_add_disk.py

@@ -54,4 +54,50 @@ class AddDiskActionTests(CharmTestCase):
         self.hookenv.relation_set.assert_has_calls([call])
         mock_osdize.assert_has_calls([mock.call('/dev/myosddev',
                                                 None, '', True, True, True,
-                                                True)])
+                                                True, None)])
+
+        piter = add_disk.PartitionIter(['/dev/cache'], 100, ['/dev/myosddev'])
+        mock_create_bcache = mock.MagicMock(side_effect=lambda b: b)
+        with mock.patch.object(add_disk.PartitionIter, 'create_bcache',
+                               mock_create_bcache) as mock_call:
+            add_disk.add_device(request, '/dev/myosddev', part_iter=piter)
+            mock_call.assert_called()
+
+        mock_create_bcache.side_effect = lambda b: None
+        with mock.patch.object(add_disk.PartitionIter, 'create_bcache',
+                               mock_create_bcache) as mock_call:
+            with self.assertRaises(add_disk.DeviceError):
+                add_disk.add_device(request, '/dev/myosddev',
+                                    part_iter=piter)
+
+    def test_get_devices(self):
+        self.hookenv.action_get.return_value = '/dev/foo bar'
+        rv = add_disk.get_devices('')
+        self.assertEqual(rv, ['/dev/foo'])
+
+        self.hookenv.action_get.return_value = None
+        rv = add_disk.get_devices('')
+        self.assertEqual(rv, [])
+
+    @mock.patch.object(add_disk, 'device_size')
+    @mock.patch.object(add_disk, 'function_fail')
+    def test_validate_psize(self, function_fail, device_size):
+        caches = {'cache1': 100, 'cache2': 200}
+        device_size.side_effect = lambda c: caches[c]
+        function_fail.return_value = None
+
+        with self.assertRaises(SystemExit):
+            add_disk.validate_partition_size(
+                60, ['a', 'b', 'c'], list(caches.keys()))
+
+        self.assertIsNone(add_disk.validate_partition_size(
+            60, ['a', 'b'], list(caches.keys())))
+
+    def test_cache_storage(self):
+        self.hookenv.storage_list.return_value = [{'location': 'a', 'key': 1},
+                                                  {'location': 'b'}]
+        self.hookenv.storage_get.side_effect = lambda k, elem: elem.get(k)
+        rv = add_disk.cache_storage()
+        self.assertEqual(['a', 'b'], rv)
+
+    def test_validate_osd_id(self):
+        for elem in ('osd.1', '1', 0, 113):
+            self.assertTrue(add_disk.validate_osd_id(elem))
+
+        for elem in ('osd.-1', '-3', '???', -100, 3.4, {}):
+            self.assertFalse(add_disk.validate_osd_id(elem))

unit_tests/test_ceph_utils.py

@@ -15,7 +15,7 @@
 
 import unittest
 
-from unittest.mock import patch
+from unittest.mock import patch, mock_open
 
 with patch('charmhelpers.contrib.hardening.harden.harden') as mock_dec:
     mock_dec.side_effect = (lambda *dargs, **dkwargs: lambda f:
@@ -138,3 +138,123 @@ class CephUtilsTestCase(unittest.TestCase):
         parsed = utils.parse_osds_arguments()
 
         self.assertEqual(parsed, expected_id)
+
+    @patch('subprocess.check_call')
+    @patch('subprocess.check_output')
+    def test_setup_bcache(self, check_output, check_call):
+        check_output.return_value = b'''
+        {
+            "blockdevices": [
+                {"name": "/dev/nvme0n1",
+                 "children": [
+                     {"name": "/dev/bcache0"}
+                 ]}
+            ]
+        }'''
+        self.assertEqual(utils.setup_bcache('', ''), '/dev/bcache0')
+
+    @patch('subprocess.check_output')
+    def test_get_partition_names(self, check_output):
+        check_output.return_value = b'''
+        {
+            "blockdevices": [
+                {"name": "/dev/sdd",
+                 "children": [
+                     {"name": "/dev/sdd1"}
+                 ]}
+            ]
+        }'''
+        partitions = utils.get_partition_names('')
+        self.assertEqual(partitions, set(['/dev/sdd1']))
+
+        # Check for a raw device with no partitions.
+        check_output.return_value = b'''
+        {"blockdevices": [{"name": "/dev/sdd"}]}'''
+        self.assertEqual(set(), utils.get_partition_names(''))
+
+    @patch.object(utils, 'get_partition_names')
+    @patch('subprocess.check_call')
+    def test_create_partition(self, check_call, get_partition_names):
+        first_call = True
+
+        def gpn(dev):
+            nonlocal first_call
+            if first_call:
+                first_call = False
+                return set()
+            return set(['/dev/nvm0n1p1'])
+
+        get_partition_names.side_effect = gpn
+        partition_name = utils.create_partition('/dev/nvm0n1', 101, 0)
+        self.assertEqual(partition_name, '/dev/nvm0n1p1')
+        args = check_call.call_args[0][0]
+        self.assertIn('/dev/nvm0n1', args)
+        self.assertIn('101GB', args)
+
+    @patch('subprocess.check_output')
+    def test_device_size(self, check_output):
+        check_output.return_value = b'''
+        {
+            "blockdevices": [{"size": 800166076416}]
+        }'''
+        self.assertEqual(745, int(utils.device_size('')))
+
+    @patch('subprocess.check_output')
+    def test_bcache_remove(self, check_output):
+        check_output.return_value = b'''
+        sb.magic                ok
+        sb.first_sector         8 [match]
+        sb.csum                 63F23B706BA0FE6A [match]
+        sb.version              3 [cache device]
+        dev.label               (empty)
+        dev.uuid                ca4ce5e1-4cf3-4330-b1c9-2c735b14cd0b
+        dev.sectors_per_block   1
+        dev.sectors_per_bucket  1024
+        dev.cache.first_sector  1024
+        dev.cache.cache_sectors 1562822656
+        dev.cache.total_sectors 1562823680
+        dev.cache.ordered       yes
+        dev.cache.discard       no
+        dev.cache.pos           0
+        dev.cache.replacement   0 [lru]
+        cset.uuid               424242'''
+        mo = mock_open()
+        with patch('builtins.open', mo):
+            utils.bcache_remove('/dev/bcache0', '/dev/nvme0n1p1')
+        mo.assert_any_call('/sys/block/bcache0/bcache/stop', 'wb')
+        mo.assert_any_call('/sys/fs/bcache/424242/stop', 'wb')
+
+    @patch.object(utils, 'create_partition')
+    @patch.object(utils, 'setup_bcache')
+    def test_partition_iter(self, setup_bcache, create_partition):
+        create_partition.side_effect = \
+            lambda c, s, n: c + '|' + str(s) + '|' + str(n)
+        setup_bcache.side_effect = lambda *args: args
+        piter = utils.PartitionIter(['/dev/nvm0n1', '/dev/nvm0n2'],
+                                    200, ['dev1', 'dev2', 'dev3'])
+
+        piter.create_bcache('dev1')
+        setup_bcache.assert_called_with('dev1', '/dev/nvm0n1|200|0')
+        setup_bcache.reset_mock()
+
+        piter.create_bcache('dev2')
+        setup_bcache.assert_called_with('dev2', '/dev/nvm0n2|200|0')
+        piter.create_bcache('dev3')
+        setup_bcache.assert_called_with('dev3', '/dev/nvm0n1|200|1')
+
+    @patch.object(utils, 'device_size')
+    @patch.object(utils, 'create_partition')
+    @patch.object(utils, 'setup_bcache')
+    def test_partition_iter_no_size(self, setup_bcache, create_partition,
+                                    device_size):
+        device_size.return_value = 300
+        piter = utils.PartitionIter(['/dev/nvm0n1'], 0,
+                                    ['dev1', 'dev2', 'dev3'])
+        create_partition.side_effect = lambda c, sz, g: sz
+        # 300GB across 3 devices, i.e. 100GB for each.
+        self.assertEqual(100, next(piter))
+        self.assertEqual(100, next(piter))