diff --git a/actions.yaml b/actions.yaml
index 7c405bad..f4233aed 100644
--- a/actions.yaml
+++ b/actions.yaml
@@ -41,6 +41,24 @@ add-disk:
     bucket:
       type: string
       description: The name of the bucket in Ceph to add these devices into
+    osd-ids:
+      type: string
+      description: |
+        The OSD ids to recycle. If specified, the number of elements in this
+        list must be the same as the number of 'osd-devices'.
+    cache-devices:
+      type: string
+      description: |
+        A list of devices to act as caching devices for 'bcache', using the
+        'osd-devices' as backing. If the number of elements in this list is
+        less than the number of 'osd-devices', then the caching ones will be
+        distributed in a round-robin fashion.
+    partition-size:
+      type: integer
+      description: |
+        The size of the partitions to create for the caching devices. If left
+        unspecified, then the full size of the devices will be split evenly
+        across partitions.
   required:
     - osd-devices
 blacklist-add-disk:
diff --git a/actions/add_disk.py b/actions/add_disk.py
index b725c9b0..6f2f9819 100755
--- a/actions/add_disk.py
+++ b/actions/add_disk.py
@@ -23,20 +23,51 @@ sys.path.append('hooks')
 
 import charmhelpers.contrib.storage.linux.ceph as ch_ceph
 import charmhelpers.core.hookenv as hookenv
+from charmhelpers.core.hookenv import function_fail
 from charmhelpers.core.unitdata import kv
+
+from utils import (PartitionIter, device_size, DeviceError)
 
 import ceph_hooks
 import charms_ceph.utils
 
 
-def add_device(request, device_path, bucket=None):
-    charms_ceph.utils.osdize(device_path, hookenv.config('osd-format'),
+def add_device(request, device_path, bucket=None,
+               osd_id=None, part_iter=None):
+    """Add a new device to be used by the OSD unit.
+
+    :param request: A broker request to notify monitors of changes.
+    :type request: CephBrokerRq
+
+    :param device_path: The absolute path to the device to be added.
+    :type device_path: str
+
+    :param bucket: The bucket name in ceph to add the device into, or None.
+    :type bucket: Option[str, None]
+
+    :param osd_id: The OSD Id to use, or None.
+    :type osd_id: Option[str, None]
+
+    :param part_iter: The partition iterator that will create partitions on
+                      demand, to service bcache creation, or None, if no
+                      partitions need to be created.
+    :type part_iter: Option[PartitionIter, None]
+    """
+    if part_iter is not None:
+        effective_dev = part_iter.create_bcache(device_path)
+        if not effective_dev:
+            raise DeviceError(
+                'Failed to create bcache for device {}'.format(device_path))
+    else:
+        effective_dev = device_path
+
+    charms_ceph.utils.osdize(effective_dev, hookenv.config('osd-format'),
                              ceph_hooks.get_journal_devices(),
                              hookenv.config('ignore-device-errors'),
                              hookenv.config('osd-encrypt'),
                              hookenv.config('bluestore'),
-                             hookenv.config('osd-encrypt-keymanager'))
+                             hookenv.config('osd-encrypt-keymanager'),
+                             osd_id)
     # Make it fast!
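+    # Note: when a bcache device was created above, the tuning below is
+    # still applied to the original backing device path, not the bcache
+    # device.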
     if hookenv.config('autotune'):
         charms_ceph.utils.tune_dev(device_path)
@@ -63,9 +94,10 @@ def add_device(request, device_path, bucket=None):
     return request
 
 
-def get_devices():
+def get_devices(key):
+    """Get a list of the devices passed for this action, for a key."""
     devices = []
-    for path in hookenv.action_get('osd-devices').split(' '):
+    for path in (hookenv.action_get(key) or '').split():
         path = path.strip()
         if os.path.isabs(path):
             devices.append(path)
@@ -73,10 +105,83 @@ def add_device(request, device_path, bucket=None):
     return devices
 
 
+def cache_storage():
+    """Return a list of Juju storage locations for the cache devices."""
+    cache_ids = hookenv.storage_list('cache-devices')
+    return [hookenv.storage_get('location', cid) for cid in cache_ids]
+
+
+def validate_osd_id(osd_id):
+    """Test that an OSD id is actually valid."""
+    if isinstance(osd_id, str):
+        if osd_id.startswith('osd.'):
+            osd_id = osd_id[4:]
+        try:
+            return int(osd_id) >= 0
+        except ValueError:
+            return False
+    elif isinstance(osd_id, int):
+        return osd_id >= 0
+    return False
+
+
+def validate_partition_size(psize, devices, caches):
+    """Test that the cache devices have enough room."""
+    sizes = [device_size(cache) for cache in caches]
+    n_caches = len(caches)
+    for idx in range(len(devices)):
+        cache_idx = idx % n_caches
+        prev = sizes[cache_idx] - psize
+        if prev < 0:
+            function_fail('Cache device {} does not have enough room '
+                          'to provide {} {}GB partitions'.format(
+                              caches[cache_idx], (idx + 1) // n_caches, psize))
+            sys.exit(1)
+        sizes[cache_idx] = prev
+
+
 if __name__ == "__main__":
     request = ch_ceph.CephBrokerRq()
-    for dev in get_devices():
-        request = add_device(request=request,
-                             device_path=dev,
-                             bucket=hookenv.action_get("bucket"))
+    devices = get_devices('osd-devices')
+    caches = get_devices('cache-devices') or cache_storage()
+    if caches:
+        psize = hookenv.action_get('partition-size')
+        if psize:
+            validate_partition_size(psize, devices, caches)
+
+        part_iter = PartitionIter(caches, psize, devices)
+    else:
+        part_iter = None
+
+    osd_ids = hookenv.action_get('osd-ids')
+    if osd_ids:
+        # Validate number and format for OSD ids.
+        osd_ids = osd_ids.split()
+        if len(osd_ids) != len(devices):
+            function_fail('The number of osd-ids and osd-devices must match')
+            sys.exit(1)
+        for osd_id in osd_ids:
+            if not validate_osd_id(osd_id):
+                function_fail('Invalid OSD ID passed: {}'.format(osd_id))
+                sys.exit(1)
+    else:
+        osd_ids = [None] * len(devices)
+
+    errors = []
+    for dev, osd_id in zip(devices, osd_ids):
+        try:
+            request = add_device(request=request,
+                                 device_path=dev,
+                                 bucket=hookenv.action_get("bucket"),
+                                 osd_id=osd_id, part_iter=part_iter)
+        except Exception:
+            errors.append(dev)
     ch_ceph.send_request_if_needed(request, relation='mon')
+    if errors:
+        if part_iter is not None:
+            for error in errors:
+                part_iter.cleanup(error)
+
+        function_fail('Failed to add devices: {}'.format(','.join(errors)))
+        sys.exit(1)
diff --git a/hooks/utils.py b/hooks/utils.py
index 8ac8ff1b..26f5f836 100644
--- a/hooks/utils.py
+++ b/hooks/utils.py
@@ -12,11 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import re
 import os
 import socket
 import subprocess
 import sys
+import time
 
 sys.path.append('lib')
 import charms_ceph.utils as ceph
@@ -336,3 +338,229 @@ def parse_osds_arguments():
             "explicitly defined OSD IDs", WARNING)
 
     return args
+
+
+class DeviceError(Exception):
+
+    """Exception type used to signal errors raised by calling
+    external commands that manipulate devices.
+ """ + pass + + +def _check_output(args): + try: + return subprocess.check_output(args).decode('UTF-8') + except subprocess.CalledProcessError as e: + raise DeviceError(str(e)) + + +def _check_call(args): + try: + return subprocess.check_call(args) + except subprocess.CalledProcessError as e: + raise DeviceError(str(e)) + + +def setup_bcache(backing, cache): + """Create a bcache device out of the backing storage and caching device. + + :param backing: The path to the backing device. + :type backing: str + + :param cache: The path to the caching device. + :type cache: str + + :returns: The full path of the newly created bcache device. + :rtype: str + """ + _check_call(['sudo', 'make-bcache', '-B', backing, + '-C', cache, '--writeback']) + + def bcache_name(dev): + rv = _check_output(['lsblk', '-p', '-b', cache, '-J', '-o', 'NAME']) + for x in json.loads(rv)['blockdevices'][0].get('children', []): + if x['name'] != dev: + return x['name'] + + for _ in range(100): + rv = bcache_name(cache) + if rv is not None: + return rv + + # Tell the kernel to refresh the partitions. + time.sleep(0.3) + _check_call(['sudo', 'partprobe']) + + +def get_partition_names(dev): + """Given a raw device, return a set of the partitions it contains. + + :param dev: The path to the device. + :type dev: str + + :returns: A set with the partitions of the passed device. + :rtype: set[str] + """ + rv = _check_output(['lsblk', '-b', dev, '-J', '-p', '-o', 'NAME']) + rv = json.loads(rv)['blockdevices'][0].get('children', {}) + return set(x['name'] for x in rv) + + +def create_partition(cache, size, n_iter): + """Create a partition of a specific size in a device. If needed, + make sure the device has a GPT ready. + + :param cache: The path to the caching device from which to create + the partition. + :type cache: str + + :param size: The size (in GB) of the partition to create. + :type size: int + + :param n_iter: The iteration number. If zero, this function will + also create the GPT on the caching device. + :type n_iter: int + + :returns: The full path of the newly created partition. + :rtype: str + """ + if not n_iter: + # In our first iteration, make sure the device has a GPT. + _check_call(['sudo', 'parted', '-s', cache, 'mklabel', 'gpt']) + prev_partitions = get_partition_names(cache) + cmd = ['sudo', 'parted', '-s', cache, 'mkpart', 'primary', + str(n_iter * size) + 'GB', str((n_iter + 1) * size) + 'GB'] + + _check_call(cmd) + for _ in range(100): + ret = get_partition_names(cache) - prev_partitions + if ret: + return next(iter(ret)) + + time.sleep(0.3) + _check_call(['sudo', 'partprobe']) + + raise DeviceError('Failed to create partition') + + +def device_size(dev): + """Compute the size of a device, in GB. + + :param dev: The full path to the device. + :type dev: str + + :returns: The size in GB of the specified device. + :rtype: int + """ + ret = _check_output(['lsblk', '-b', '-d', dev, '-J', '-o', 'SIZE']) + ret = int(json.loads(ret)['blockdevices'][0]['size']) + return ret / (1024 * 1024 * 1024) # Return size in GB. + + +def bcache_remove(bcache, cache_dev): + """Remove a bcache kernel device, given its caching. + + :param bache: The path of the bcache device. + :type bcache: str + + :param cache_dev: The caching device used for the bcache name. + :type cache_dev: str + """ + rv = _check_output(['sudo', 'bcache-super-show', cache_dev]) + uuid = None + # Fetch the UUID for the caching device. 
+    for line in rv.split('\n'):
+        idx = line.find('cset.uuid')
+        if idx >= 0:
+            uuid = line[idx + 9:].strip()
+            break
+    else:
+        return
+    bcache_name = bcache[bcache.rfind('/') + 1:]
+    with open('/sys/block/{}/bcache/stop'.format(bcache_name), 'wb') as f:
+        f.write(b'1')
+    with open('/sys/fs/bcache/{}/stop'.format(uuid), 'wb') as f:
+        f.write(b'1')
+
+
+def wipe_disk(dev):
+    """Destroy all data in a specific device, including partition tables."""
+    _check_call(['sudo', 'wipefs', '-a', dev])
+
+
+class PartitionIter:
+
+    """Class used to create partitions iteratively.
+
+    Objects of this type are used to create partitions out of
+    the specified cache devices, either with a specific size,
+    or with a size proportional to what is needed."""
+
+    def __init__(self, caches, psize, devices):
+        """Construct a partition iterator.
+
+        :param caches: The list of cache devices to use.
+        :type caches: iterable
+
+        :param psize: The size of the partitions (in GB), or None
+        :type psize: Option[int, None]
+
+        :param devices: The backing devices. Only used to get their length.
+        :type devices: iterable
+        """
+        self.caches = [[cache, 0] for cache in caches]
+        self.idx = 0
+        if not psize:
+            factor = min(1.0, len(caches) / len(devices))
+            self.psize = [factor * device_size(cache) for cache in caches]
+        else:
+            self.psize = psize
+        self.created = {}
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        """Return a newly created partition.
+
+        The object keeps track of the currently used caching device,
+        so upon creating a new partition, it will move to the next one,
+        distributing the load among them in a round-robin fashion.
+        """
+        cache, n_iter = self.caches[self.idx]
+        size = self.psize
+        if not isinstance(size, (int, float)):
+            size = self.psize[self.idx]
+
+        self.caches[self.idx][1] += 1
+        self.idx = (self.idx + 1) % len(self.caches)
+        log('Creating partition in device {} of size {}'.format(cache, size))
+        return create_partition(cache, size, n_iter)
+
+    def create_bcache(self, backing):
+        """Create a bcache device, using the internal caching device,
+        and an external backing one.
+
+        :param backing: The path to the backing device.
+        :type backing: str
+
+        :returns: The name for the newly created bcache device.
+        :rtype: str
+        """
+        cache = next(self)
+        ret = setup_bcache(backing, cache)
+        if ret is not None:
+            self.created[backing] = (ret, cache)
+            log('Bcache device created: {}'.format(cache))
+        return ret
+
+    def cleanup(self, device):
+        """Remove the bcache device created for a backing device, if any."""
+        args = self.created.get(device)
+        if not args:
+            return
+
+        try:
+            bcache_remove(*args)
+        except DeviceError:
+            log('Failed to cleanup bcache device: {}'.format(args[0]))
diff --git a/lib/charms_ceph/utils.py b/lib/charms_ceph/utils.py
index de917a08..643f2e03 100644
--- a/lib/charms_ceph/utils.py
+++ b/lib/charms_ceph/utils.py
@@ -1514,11 +1514,11 @@ def get_devices(name):
 
 
 def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
-           bluestore=False, key_manager=CEPH_KEY_MANAGER):
+           bluestore=False, key_manager=CEPH_KEY_MANAGER, osd_id=None):
     if dev.startswith('/dev'):
         osdize_dev(dev, osd_format, osd_journal,
                    ignore_errors, encrypt,
-                   bluestore, key_manager)
+                   bluestore, key_manager, osd_id)
     else:
         if cmp_pkgrevno('ceph', '14.0.0') >= 0:
             log("Directory backed OSDs can not be created on Nautilus",
@@ -1528,7 +1528,8 @@ def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
 
 
 def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
-               encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER):
+               encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER,
+               osd_id=None):
     """
     Prepare a block device for use as a Ceph OSD
 
@@ -1593,7 +1594,8 @@ def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
                                osd_journal,
                                encrypt,
                                bluestore,
-                               key_manager)
+                               key_manager,
+                               osd_id)
     else:
         cmd = _ceph_disk(dev,
                          osd_format,
@@ -1677,7 +1679,7 @@ def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False):
 
 
 def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
-                 key_manager=CEPH_KEY_MANAGER):
+                 key_manager=CEPH_KEY_MANAGER, osd_id=None):
     """
     Prepare and activate a device for usage as a Ceph OSD using ceph-volume.
 
@@ -1689,6 +1691,7 @@ def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
     :param: encrypt: Use block device encryption
     :param: bluestore: Use bluestore storage for OSD
     :param: key_manager: dm-crypt Key Manager to use
+    :param: osd_id: The OSD-id to recycle, or None to create a new one
     :raises subprocess.CalledProcessError: in the event that any supporting
                                            LVM operation failed.
     :returns: list. 'ceph-volume' command and required parameters for
@@ -1710,6 +1713,9 @@ def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
     if encrypt and key_manager == CEPH_KEY_MANAGER:
         cmd.append('--dmcrypt')
 
+    if osd_id is not None:
+        cmd.extend(['--osd-id', str(osd_id)])
+
     # On-disk journal volume creation
     if not osd_journal and not bluestore:
         journal_lv_type = 'journal'
diff --git a/metadata.yaml b/metadata.yaml
index 7069f780..cbefb053 100644
--- a/metadata.yaml
+++ b/metadata.yaml
@@ -48,3 +48,8 @@ storage:
     type: block
     multiple:
       range: 0-
+  cache-devices:
+    type: block
+    multiple:
+      range: 0-
+    minimum-size: 10G
diff --git a/test-requirements.txt b/test-requirements.txt
index 7ce4d17c..cb6913c5 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -42,3 +42,5 @@ git+https://opendev.org/openstack/tempest.git#egg=tempest;python_version>='3.6'
 tempest<24.0.0;python_version<'3.6'
 croniter  # needed for charm-rabbitmq-server unit tests
+pyparsing<3.0.0  # aodhclient is pinned in zaza and needs pyparsing < 3.0.0, but cffi also needs it, so pin here.
+cffi==1.14.6; python_version < '3.6'  # cffi 1.15.0 drops support for py35.
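Before the unit tests below, a minimal standalone sketch of the behaviour that the PartitionIter class added in hooks/utils.py implements: backing devices are paired with caching devices round-robin, and when 'partition-size' is not given each cache is split evenly across the backing devices it serves. Device names and sizes are hypothetical and nothing here touches real block devices:

    # Hypothetical devices; sizes in GB.
    caches = ['/dev/nvme0n1', '/dev/nvme0n2']
    devices = ['/dev/sdb', '/dev/sdc', '/dev/sdd', '/dev/sde']
    sizes_gb = {'/dev/nvme0n1': 300, '/dev/nvme0n2': 300}

    # Default sizing when 'partition-size' is unset: split each cache evenly.
    factor = min(1.0, len(caches) / len(devices))
    psize = {cache: factor * sizes_gb[cache] for cache in caches}

    # Round-robin assignment of backing devices to caches.
    for idx, dev in enumerate(devices):
        cache = caches[idx % len(caches)]
        print('{} -> ~{:.0f}GB partition on {}'.format(dev, psize[cache], cache))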
diff --git a/unit_tests/test_actions_add_disk.py b/unit_tests/test_actions_add_disk.py
index e29b99ef..dd2bb64d 100644
--- a/unit_tests/test_actions_add_disk.py
+++ b/unit_tests/test_actions_add_disk.py
@@ -54,4 +54,50 @@ class AddDiskActionTests(CharmTestCase):
         self.hookenv.relation_set.assert_has_calls([call])
         mock_osdize.assert_has_calls([mock.call('/dev/myosddev',
                                                 None, '', True, True, True,
-                                                True)])
+                                                True, None)])
+
+        piter = add_disk.PartitionIter(['/dev/cache'], 100, ['/dev/myosddev'])
+        mock_create_bcache = mock.MagicMock(side_effect=lambda b: b)
+        with mock.patch.object(add_disk.PartitionIter, 'create_bcache',
+                               mock_create_bcache) as mock_call:
+            add_disk.add_device(request, '/dev/myosddev', part_iter=piter)
+            mock_call.assert_called()
+
+        mock_create_bcache.side_effect = lambda b: None
+        with mock.patch.object(add_disk.PartitionIter, 'create_bcache',
+                               mock_create_bcache) as mock_call:
+            with self.assertRaises(add_disk.DeviceError):
+                add_disk.add_device(request, '/dev/myosddev', part_iter=piter)
+
+    def test_get_devices(self):
+        self.hookenv.action_get.return_value = '/dev/foo bar'
+        rv = add_disk.get_devices('')
+        self.assertEqual(rv, ['/dev/foo'])
+        self.hookenv.action_get.return_value = None
+        rv = add_disk.get_devices('')
+        self.assertEqual(rv, [])
+
+    @mock.patch.object(add_disk, 'device_size')
+    @mock.patch.object(add_disk, 'function_fail')
+    def test_validate_psize(self, function_fail, device_size):
+        caches = {'cache1': 100, 'cache2': 200}
+        device_size.side_effect = lambda c: caches[c]
+        function_fail.return_value = None
+        with self.assertRaises(SystemExit):
+            add_disk.validate_partition_size(
+                60, ['a', 'b', 'c'], list(caches.keys()))
+        self.assertIsNone(add_disk.validate_partition_size(
+            60, ['a', 'b'], list(caches.keys())))
+
+    def test_cache_storage(self):
+        self.hookenv.storage_list.return_value = [{'location': 'a', 'key': 1},
+                                                  {'location': 'b'}]
+        self.hookenv.storage_get.side_effect = lambda k, elem: elem.get(k)
+        rv = add_disk.cache_storage()
+        self.assertEqual(['a', 'b'], rv)
+
+    def test_validate_osd_id(self):
+        for elem in ('osd.1', '1', 0, 113):
+            self.assertTrue(add_disk.validate_osd_id(elem))
+        for elem in ('osd.-1', '-3', '???', -100, 3.4, {}):
+            self.assertFalse(add_disk.validate_osd_id(elem))
diff --git a/unit_tests/test_ceph_utils.py b/unit_tests/test_ceph_utils.py
index 722558b2..f338eb3a 100644
--- a/unit_tests/test_ceph_utils.py
+++ b/unit_tests/test_ceph_utils.py
@@ -15,7 +15,7 @@
 
 import unittest
 
-from unittest.mock import patch
+from unittest.mock import patch, mock_open
 
 with patch('charmhelpers.contrib.hardening.harden.harden') as mock_dec:
     mock_dec.side_effect = (lambda *dargs, **dkwargs: lambda f:
@@ -138,3 +138,123 @@ class CephUtilsTestCase(unittest.TestCase):
 
         parsed = utils.parse_osds_arguments()
         self.assertEqual(parsed, expected_id)
+
+    @patch('subprocess.check_call')
+    @patch('subprocess.check_output')
+    def test_setup_bcache(self, check_output, check_call):
+        check_output.return_value = b'''
+        {
+           "blockdevices": [
+              {"name": "/dev/nvme0n1",
+                 "children": [
+                    {"name": "/dev/bcache0"}
+                 ]
+              }
+           ]
+        }
+        '''
+        self.assertEqual(utils.setup_bcache('', ''), '/dev/bcache0')
+
+    @patch('subprocess.check_output')
+    def test_get_partition_names(self, check_output):
+        check_output.return_value = b'''
+        {
+           "blockdevices": [
+              {"name": "/dev/sdd",
+                 "children": [
+                    {"name": "/dev/sdd1"}
+                 ]
+              }
+           ]
+        }
+        '''
+        partitions = utils.get_partition_names('')
+        self.assertEqual(partitions, set(['/dev/sdd1']))
+        # Check for a raw device with no partitions.
+        check_output.return_value = b'''
+        {"blockdevices": [{"name": "/dev/sdd"}]}
+        '''
+        self.assertEqual(set(), utils.get_partition_names(''))
+
+    @patch.object(utils, 'get_partition_names')
+    @patch('subprocess.check_call')
+    def test_create_partition(self, check_call, get_partition_names):
+        first_call = True
+
+        def gpn(dev):
+            nonlocal first_call
+            if first_call:
+                first_call = False
+                return set()
+            return set(['/dev/nvm0n1p1'])
+        get_partition_names.side_effect = gpn
+        partition_name = utils.create_partition('/dev/nvm0n1', 101, 0)
+        self.assertEqual(partition_name, '/dev/nvm0n1p1')
+        args = check_call.call_args[0][0]
+        self.assertIn('/dev/nvm0n1', args)
+        self.assertIn('101GB', args)
+
+    @patch('subprocess.check_output')
+    def test_device_size(self, check_output):
+        check_output.return_value = b'''
+        {
+           "blockdevices": [{"size": 800166076416}]
+        }
+        '''
+        self.assertEqual(745, int(utils.device_size('')))
+
+    @patch('subprocess.check_output')
+    def test_bcache_remove(self, check_output):
+        check_output.return_value = b'''
+        sb.magic                ok
+        sb.first_sector         8 [match]
+        sb.csum                 63F23B706BA0FE6A [match]
+        sb.version              3 [cache device]
+        dev.label               (empty)
+        dev.uuid                ca4ce5e1-4cf3-4330-b1c9-2c735b14cd0b
+        dev.sectors_per_block   1
+        dev.sectors_per_bucket  1024
+        dev.cache.first_sector  1024
+        dev.cache.cache_sectors 1562822656
+        dev.cache.total_sectors 1562823680
+        dev.cache.ordered       yes
+        dev.cache.discard       no
+        dev.cache.pos           0
+        dev.cache.replacement   0 [lru]
+        cset.uuid               424242
+        '''
+        mo = mock_open()
+        with patch('builtins.open', mo):
+            utils.bcache_remove('/dev/bcache0', '/dev/nvme0n1p1')
+        mo.assert_any_call('/sys/block/bcache0/bcache/stop', 'wb')
+        mo.assert_any_call('/sys/fs/bcache/424242/stop', 'wb')
+
+    @patch.object(utils, 'create_partition')
+    @patch.object(utils, 'setup_bcache')
+    def test_partition_iter(self, setup_bcache, create_partition):
+        create_partition.side_effect = \
+            lambda c, s, n: c + '|' + str(s) + '|' + str(n)
+        setup_bcache.side_effect = lambda *args: args
+        piter = utils.PartitionIter(['/dev/nvm0n1', '/dev/nvm0n2'],
+                                    200, ['dev1', 'dev2', 'dev3'])
+        piter.create_bcache('dev1')
+        setup_bcache.assert_called_with('dev1', '/dev/nvm0n1|200|0')
+        setup_bcache.reset_mock()
+        piter.create_bcache('dev2')
+        setup_bcache.assert_called_with('dev2', '/dev/nvm0n2|200|0')
+        piter.create_bcache('dev3')
+        setup_bcache.assert_called_with('dev3', '/dev/nvm0n1|200|1')
+
+    @patch.object(utils, 'device_size')
+    @patch.object(utils, 'create_partition')
+    @patch.object(utils, 'setup_bcache')
+    def test_partition_iter_no_size(self, setup_bcache, create_partition,
+                                    device_size):
+        device_size.return_value = 300
+        piter = utils.PartitionIter(['/dev/nvm0n1'], 0,
+                                    ['dev1', 'dev2', 'dev3'])
+        create_partition.side_effect = lambda c, sz, g: sz
+
+        # 300GB across 3 devices, i.e. 100 for each.
+        self.assertEqual(100, next(piter))
+        self.assertEqual(100, next(piter))
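As a companion to test_validate_psize above, a minimal standalone sketch of the arithmetic performed by validate_partition_size in actions/add_disk.py: each backing device consumes one partition worth of space from its round-robin assigned cache, and the action fails as soon as a cache runs out of room. Sizes are hypothetical; the two example calls mirror the failing and passing cases exercised by the unit test:

    # Standalone re-statement of the capacity check (sizes in GB, hypothetical).
    def enough_room(psize, n_devices, cache_sizes):
        remaining = list(cache_sizes)
        for idx in range(n_devices):
            cache_idx = idx % len(remaining)
            remaining[cache_idx] -= psize
            if remaining[cache_idx] < 0:
                return False
        return True

    print(enough_room(60, 3, [100, 200]))  # False: first cache needs 2 x 60GB > 100GB
    print(enough_room(60, 2, [100, 200]))  # True: one 60GB partition per cache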