From e071486acf8e90f293ce5ce8880047384527c9f0 Mon Sep 17 00:00:00 2001 From: James Page Date: Fri, 8 Jun 2018 12:02:06 +0100 Subject: [PATCH] Add 'osd blacklist' to default mon perms Ensure that the default permissions for clients include the 'osd blacklist' command; This ensures that in the event of a client crashing (due to power outage or segfault), the client and re-connect and write to any devices on reboot. This is a safe permission for all supported Ceph releases. Depends-On: I0b43dece4e1c56fb838b0147bfb75fb9906e6657 Change-Id: Ib1f1e8d7ed54528603b8b08051dafeec075a3232 Closes-Bug: 1773449 --- hooks/ceph_hooks.py | 5 + lib/ceph/broker.py | 7 +- lib/ceph/utils.py | 660 +++++++++++++++++++++++++--------- unit_tests/test_ceph_hooks.py | 8 +- 4 files changed, 502 insertions(+), 178 deletions(-) diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py index 24095b33..e192d846 100755 --- a/hooks/ceph_hooks.py +++ b/hooks/ceph_hooks.py @@ -658,6 +658,11 @@ def upgrade_charm(): if is_relation_made("nrpe-external-master"): update_nrpe_config() + # NOTE(jamespage): + # Reprocess broker requests to ensure that any cephx + # key permission changes are applied + notify_client() + @hooks.hook('start') def start(): diff --git a/lib/ceph/broker.py b/lib/ceph/broker.py index 0b6d3e24..3e857d21 100644 --- a/lib/ceph/broker.py +++ b/lib/ceph/broker.py @@ -81,6 +81,10 @@ POOL_KEYS = { "cache_min_flush_age": [int], "cache_min_evict_age": [int], "fast_read": [bool], + "allow_ec_overwrites": [bool], + "compression_mode": [str, ["none", "passive", "aggressive", "force"]], + "compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]], + "compression_required_ratio": [float, [0.0, 1.0]], } CEPH_BUCKET_TYPES = [ @@ -251,7 +255,8 @@ def pool_permission_list_for_service(service): for prefix in prefixes: permissions.append("allow {} object_prefix {}".format(permission, prefix)) - return ["mon", "allow r", "osd", ', '.join(permissions)] + return ['mon', 'allow r, allow command "osd blacklist"', + 'osd', ', '.join(permissions)] def get_service_groups(service, namespace=None): diff --git a/lib/ceph/utils.py b/lib/ceph/utils.py index f412032b..7bb951ca 100644 --- a/lib/ceph/utils.py +++ b/lib/ceph/utils.py @@ -13,8 +13,7 @@ # limitations under the License. import collections -import ctypes -import errno +import glob import json import os import pyudev @@ -24,7 +23,7 @@ import socket import subprocess import sys import time -import shutil +import uuid from datetime import datetime @@ -36,7 +35,6 @@ from charmhelpers.core.host import ( cmp_pkgrevno, lsb_release, mkdir, - mounts, owner, service_restart, service_start, @@ -68,11 +66,12 @@ from charmhelpers.contrib.storage.linux.ceph import ( from charmhelpers.contrib.storage.linux.utils import ( is_block_device, is_device_mounted, - zap_disk, ) from charmhelpers.contrib.openstack.utils import ( get_os_codename_install_source, ) +from charmhelpers.contrib.storage.linux import lvm +from charmhelpers.core.unitdata import kv CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph') OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd') @@ -83,7 +82,15 @@ PEON = 'peon' QUORUM = [LEADER, PEON] PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph', - 'radosgw', 'xfsprogs', 'python-pyudev'] + 'radosgw', 'xfsprogs', 'python-pyudev', + 'lvm2', 'parted'] + +CEPH_KEY_MANAGER = 'ceph' +VAULT_KEY_MANAGER = 'vault' +KEY_MANAGERS = [ + CEPH_KEY_MANAGER, + VAULT_KEY_MANAGER, +] LinkSpeed = { "BASE_10": 10, @@ -823,114 +830,6 @@ CEPH_PARTITIONS = [ ] -def umount(mount_point): - """This function unmounts a mounted directory forcibly. This will - be used for unmounting broken hard drive mounts which may hang. - - If umount returns EBUSY this will lazy unmount. - - :param mount_point: str. A String representing the filesystem mount point - :returns: int. Returns 0 on success. errno otherwise. - """ - libc_path = ctypes.util.find_library("c") - libc = ctypes.CDLL(libc_path, use_errno=True) - - # First try to umount with MNT_FORCE - ret = libc.umount(mount_point, 1) - if ret < 0: - err = ctypes.get_errno() - if err == errno.EBUSY: - # Detach from try. IE lazy umount - ret = libc.umount(mount_point, 2) - if ret < 0: - err = ctypes.get_errno() - return err - return 0 - else: - return err - return 0 - - -def replace_osd(dead_osd_number, - dead_osd_device, - new_osd_device, - osd_format, - osd_journal, - reformat_osd=False, - ignore_errors=False): - """This function will automate the replacement of a failed osd disk as much - as possible. It will revoke the keys for the old osd, remove it from the - crush map and then add a new osd into the cluster. - - :param dead_osd_number: The osd number found in ceph osd tree. Example: 99 - :param dead_osd_device: The physical device. Example: /dev/sda - :param osd_format: - :param osd_journal: - :param reformat_osd: - :param ignore_errors: - """ - host_mounts = mounts() - mount_point = None - for mount in host_mounts: - if mount[1] == dead_osd_device: - mount_point = mount[0] - # need to convert dev to osd number - # also need to get the mounted drive so we can tell the admin to - # replace it - try: - # Drop this osd out of the cluster. This will begin a - # rebalance operation - status_set('maintenance', 'Removing osd {}'.format(dead_osd_number)) - subprocess.check_output([ - 'ceph', - '--id', - 'osd-upgrade', - 'osd', 'out', - 'osd.{}'.format(dead_osd_number)]) - - # Kill the osd process if it's not already dead - if systemd(): - service_stop('ceph-osd@{}'.format(dead_osd_number)) - else: - subprocess.check_output(['stop', 'ceph-osd', 'id={}'.format( - dead_osd_number)]) - # umount if still mounted - ret = umount(mount_point) - if ret < 0: - raise RuntimeError('umount {} failed with error: {}'.format( - mount_point, os.strerror(ret))) - # Clean up the old mount point - shutil.rmtree(mount_point) - subprocess.check_output([ - 'ceph', - '--id', - 'osd-upgrade', - 'osd', 'crush', 'remove', - 'osd.{}'.format(dead_osd_number)]) - # Revoke the OSDs access keys - subprocess.check_output([ - 'ceph', - '--id', - 'osd-upgrade', - 'auth', 'del', - 'osd.{}'.format(dead_osd_number)]) - subprocess.check_output([ - 'ceph', - '--id', - 'osd-upgrade', - 'osd', 'rm', - 'osd.{}'.format(dead_osd_number)]) - status_set('maintenance', 'Setting up replacement osd {}'.format( - new_osd_device)) - osdize(new_osd_device, - osd_format, - osd_journal, - reformat_osd, - ignore_errors) - except subprocess.CalledProcessError as e: - log('replace_osd failed with error: ' + e.output) - - def get_partition_list(dev): """Lists the partitions of a block device. @@ -970,7 +869,42 @@ def get_partition_list(dev): raise +def is_pristine_disk(dev): + """ + Read first 2048 bytes (LBA 0 - 3) of block device to determine whether it + is actually all zeros and safe for us to use. + + Existing partitioning tools does not discern between a failure to read from + block device, failure to understand a partition table and the fact that a + block device has no partition table. Since we need to be positive about + which is which we need to read the device directly and confirm ourselves. + + :param dev: Path to block device + :type dev: str + :returns: True all 2048 bytes == 0x0, False if not + :rtype: bool + """ + want_bytes = 2048 + + f = open(dev, 'rb') + data = f.read(want_bytes) + read_bytes = len(data) + if read_bytes != want_bytes: + log('{}: short read, got {} bytes expected {}.' + .format(dev, read_bytes, want_bytes), level=WARNING) + return False + + return all(byte == 0x0 for byte in data) + + def is_osd_disk(dev): + db = kv() + osd_devices = db.get('osd-devices', []) + if dev in osd_devices: + log('Device {} already processed by charm,' + ' skipping'.format(dev)) + return True + partitions = get_partition_list(dev) for partition in partitions: try: @@ -1008,6 +942,9 @@ def rescan_osd_devices(): subprocess.call(cmd) + cmd = ['udevadm', 'settle'] + subprocess.call(cmd) + _bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring" _upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring" @@ -1159,7 +1096,8 @@ def get_mds_bootstrap_key(): _default_caps = collections.OrderedDict([ - ('mon', ['allow r']), + ('mon', ['allow r', + 'allow command "osd blacklist"']), ('osd', ['allow rwx']), ]) @@ -1226,6 +1164,7 @@ def get_named_key(name, caps=None, pool_list=None): :param caps: dict of cephx capabilities :returns: Returns a cephx key """ + key_name = 'client.{}'.format(name) try: # Does the key already exist? output = str(subprocess.check_output( @@ -1240,8 +1179,14 @@ def get_named_key(name, caps=None, pool_list=None): ), 'auth', 'get', - 'client.{}'.format(name), + key_name, ]).decode('UTF-8')).strip() + # NOTE(jamespage); + # Apply any changes to key capabilities, dealing with + # upgrades which requires new caps for operation. + upgrade_key_caps(key_name, + caps or _default_caps, + pool_list) return parse_key(output) except subprocess.CalledProcessError: # Couldn't get the key, time to create it! @@ -1257,7 +1202,7 @@ def get_named_key(name, caps=None, pool_list=None): '/var/lib/ceph/mon/ceph-{}/keyring'.format( socket.gethostname() ), - 'auth', 'get-or-create', 'client.{}'.format(name), + 'auth', 'get-or-create', key_name, ] # Add capabilities for subsystem, subcaps in caps.items(): @@ -1276,7 +1221,7 @@ def get_named_key(name, caps=None, pool_list=None): .strip()) # IGNORE:E1103 -def upgrade_key_caps(key, caps): +def upgrade_key_caps(key, caps, pool_list=None): """ Upgrade key to have capabilities caps """ if not is_leader(): # Not the MON leader OR not clustered @@ -1285,6 +1230,12 @@ def upgrade_key_caps(key, caps): "sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key ] for subsystem, subcaps in caps.items(): + if subsystem == 'osd': + if pool_list: + # This will output a string similar to: + # "pool=rgw pool=rbd pool=something" + pools = " ".join(['pool={0}'.format(i) for i in pool_list]) + subcaps[0] = subcaps[0] + " " + pools cmd.extend([subsystem, '; '.join(subcaps)]) subprocess.check_call(cmd) @@ -1393,15 +1344,6 @@ def update_monfs(): pass -def maybe_zap_journal(journal_dev): - if is_osd_disk(journal_dev): - log('Looks like {} is already an OSD data' - ' or journal, skipping.'.format(journal_dev)) - return - zap_disk(journal_dev) - log("Zapped journal device {}".format(journal_dev)) - - def get_partitions(dev): cmd = ['partx', '--raw', '--noheadings', dev] try: @@ -1413,17 +1355,35 @@ def get_partitions(dev): return [] -def find_least_used_utility_device(utility_devices): +def get_lvs(dev): + """ + List logical volumes for the provided block device + + :param: dev: Full path to block device. + :raises subprocess.CalledProcessError: in the event that any supporting + operation failed. + :returns: list: List of logical volumes provided by the block device + """ + if not lvm.is_lvm_physical_volume(dev): + return [] + vg_name = lvm.list_lvm_volume_group(dev) + return lvm.list_logical_volumes('vg_name={}'.format(vg_name)) + + +def find_least_used_utility_device(utility_devices, lvs=False): """ Find a utility device which has the smallest number of partitions among other devices in the supplied list. :utility_devices: A list of devices to be used for filestore journal or bluestore wal or db. + :lvs: flag to indicate whether inspection should be based on LVM LV's :return: string device name """ - - usages = map(lambda a: (len(get_partitions(a)), a), utility_devices) + if lvs: + usages = map(lambda a: (len(get_lvs(a)), a), utility_devices) + else: + usages = map(lambda a: (len(get_partitions(a)), a), utility_devices) least = min(usages, key=lambda t: t[0]) return least[1] @@ -1445,18 +1405,46 @@ def get_devices(name): return set(devices) -def osdize(dev, osd_format, osd_journal, reformat_osd=False, - ignore_errors=False, encrypt=False, bluestore=False): +def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False, + bluestore=False, key_manager=CEPH_KEY_MANAGER): if dev.startswith('/dev'): osdize_dev(dev, osd_format, osd_journal, - reformat_osd, ignore_errors, encrypt, - bluestore) + ignore_errors, encrypt, + bluestore, key_manager) else: osdize_dir(dev, encrypt, bluestore) -def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False, - ignore_errors=False, encrypt=False, bluestore=False): +def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False, + encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER): + """ + Prepare a block device for use as a Ceph OSD + + A block device will only be prepared once during the lifetime + of the calling charm unit; future executions will be skipped. + + :param: dev: Full path to block device to use + :param: osd_format: Format for OSD filesystem + :param: osd_journal: List of block devices to use for OSD journals + :param: ignore_errors: Don't fail in the event of any errors during + processing + :param: encrypt: Encrypt block devices using 'key_manager' + :param: bluestore: Use bluestore native ceph block device format + :param: key_manager: Key management approach for encryption keys + :raises subprocess.CalledProcessError: in the event that any supporting + subprocess operation failed + :raises ValueError: if an invalid key_manager is provided + """ + if key_manager not in KEY_MANAGERS: + raise ValueError('Unsupported key manager: {}'.format(key_manager)) + + db = kv() + osd_devices = db.get('osd-devices', []) + if dev in osd_devices: + log('Device {} already processed by charm,' + ' skipping'.format(dev)) + return + if not os.path.exists(dev): log('Path {} does not exist - bailing'.format(dev)) return @@ -1465,7 +1453,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False, log('Path {} is not a block device - bailing'.format(dev)) return - if is_osd_disk(dev) and not reformat_osd: + if is_osd_disk(dev): log('Looks like {} is already an' ' OSD data or journal, skipping.'.format(dev)) return @@ -1474,58 +1462,378 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False, log('Looks like {} is in use, skipping.'.format(dev)) return - status_set('maintenance', 'Initializing device {}'.format(dev)) - cmd = ['ceph-disk', 'prepare'] - # Later versions of ceph support more options - if cmp_pkgrevno('ceph', '0.60') >= 0: - if encrypt: - cmd.append('--dmcrypt') - if cmp_pkgrevno('ceph', '0.48.3') >= 0: - if osd_format and not bluestore: - cmd.append('--fs-type') - cmd.append(osd_format) + if is_active_bluestore_device(dev): + log('{} is in use as an active bluestore block device,' + ' skipping.'.format(dev)) + return - if reformat_osd: - cmd.append('--zap-disk') - - # NOTE(jamespage): enable experimental bluestore support - if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore: - cmd.append('--bluestore') - wal = get_devices('bluestore-wal') - if wal: - cmd.append('--block.wal') - least_used_wal = find_least_used_utility_device(wal) - cmd.append(least_used_wal) - db = get_devices('bluestore-db') - if db: - cmd.append('--block.db') - least_used_db = find_least_used_utility_device(db) - cmd.append(least_used_db) - elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore: - cmd.append('--filestore') - - cmd.append(dev) - - if osd_journal: - least_used = find_least_used_utility_device(osd_journal) - cmd.append(least_used) + if cmp_pkgrevno('ceph', '12.2.4') >= 0: + cmd = _ceph_volume(dev, + osd_journal, + encrypt, + bluestore, + key_manager) else: - # Just provide the device - no other options - # for older versions of ceph - cmd.append(dev) - if reformat_osd: - zap_disk(dev) + cmd = _ceph_disk(dev, + osd_format, + osd_journal, + encrypt, + bluestore) try: + status_set('maintenance', 'Initializing device {}'.format(dev)) log("osdize cmd: {}".format(cmd)) subprocess.check_call(cmd) except subprocess.CalledProcessError: + try: + lsblk_output = subprocess.check_output( + ['lsblk', '-P']).decode('UTF-8') + except subprocess.CalledProcessError as e: + log("Couldn't get lsblk output: {}".format(e), ERROR) if ignore_errors: log('Unable to initialize device: {}'.format(dev), WARNING) + if lsblk_output: + log('lsblk output: {}'.format(lsblk_output), DEBUG) else: log('Unable to initialize device: {}'.format(dev), ERROR) + if lsblk_output: + log('lsblk output: {}'.format(lsblk_output), WARNING) raise + # NOTE: Record processing of device only on success to ensure that + # the charm only tries to initialize a device of OSD usage + # once during its lifetime. + osd_devices.append(dev) + db.set('osd-devices', osd_devices) + db.flush() + + +def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False): + """ + Prepare a device for usage as a Ceph OSD using ceph-disk + + :param: dev: Full path to use for OSD block device setup + :param: osd_journal: List of block devices to use for OSD journals + :param: encrypt: Use block device encryption (unsupported) + :param: bluestore: Use bluestore storage for OSD + :returns: list. 'ceph-disk' command and required parameters for + execution by check_call + """ + cmd = ['ceph-disk', 'prepare'] + + if encrypt: + cmd.append('--dmcrypt') + + if osd_format and not bluestore: + cmd.append('--fs-type') + cmd.append(osd_format) + + # NOTE(jamespage): enable experimental bluestore support + if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore: + cmd.append('--bluestore') + wal = get_devices('bluestore-wal') + if wal: + cmd.append('--block.wal') + least_used_wal = find_least_used_utility_device(wal) + cmd.append(least_used_wal) + db = get_devices('bluestore-db') + if db: + cmd.append('--block.db') + least_used_db = find_least_used_utility_device(db) + cmd.append(least_used_db) + elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore: + cmd.append('--filestore') + + cmd.append(dev) + + if osd_journal: + least_used = find_least_used_utility_device(osd_journal) + cmd.append(least_used) + + return cmd + + +def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False, + key_manager=CEPH_KEY_MANAGER): + """ + Prepare and activate a device for usage as a Ceph OSD using ceph-volume. + + This also includes creation of all PV's, VG's and LV's required to + support the initialization of the OSD. + + :param: dev: Full path to use for OSD block device setup + :param: osd_journal: List of block devices to use for OSD journals + :param: encrypt: Use block device encryption + :param: bluestore: Use bluestore storage for OSD + :param: key_manager: dm-crypt Key Manager to use + :raises subprocess.CalledProcessError: in the event that any supporting + LVM operation failed. + :returns: list. 'ceph-volume' command and required parameters for + execution by check_call + """ + cmd = ['ceph-volume', 'lvm', 'create'] + + osd_fsid = str(uuid.uuid4()) + cmd.append('--osd-fsid') + cmd.append(osd_fsid) + + if bluestore: + cmd.append('--bluestore') + main_device_type = 'block' + else: + cmd.append('--filestore') + main_device_type = 'data' + + if encrypt and key_manager == CEPH_KEY_MANAGER: + cmd.append('--dmcrypt') + + # On-disk journal volume creation + if not osd_journal and not bluestore: + journal_lv_type = 'journal' + cmd.append('--journal') + cmd.append(_allocate_logical_volume( + dev=dev, + lv_type=journal_lv_type, + osd_fsid=osd_fsid, + size='{}M'.format(calculate_volume_size('journal')), + encrypt=encrypt, + key_manager=key_manager) + ) + + cmd.append('--data') + cmd.append(_allocate_logical_volume(dev=dev, + lv_type=main_device_type, + osd_fsid=osd_fsid, + encrypt=encrypt, + key_manager=key_manager)) + + if bluestore: + for extra_volume in ('wal', 'db'): + devices = get_devices('bluestore-{}'.format(extra_volume)) + if devices: + cmd.append('--block.{}'.format(extra_volume)) + least_used = find_least_used_utility_device(devices, + lvs=True) + cmd.append(_allocate_logical_volume( + dev=least_used, + lv_type=extra_volume, + osd_fsid=osd_fsid, + size='{}M'.format(calculate_volume_size(extra_volume)), + shared=True, + encrypt=encrypt, + key_manager=key_manager) + ) + + elif osd_journal: + cmd.append('--journal') + least_used = find_least_used_utility_device(osd_journal, + lvs=True) + cmd.append(_allocate_logical_volume( + dev=least_used, + lv_type='journal', + osd_fsid=osd_fsid, + size='{}M'.format(calculate_volume_size('journal')), + shared=True, + encrypt=encrypt, + key_manager=key_manager) + ) + + return cmd + + +def _partition_name(dev): + """ + Derive the first partition name for a block device + + :param: dev: Full path to block device. + :returns: str: Full path to first partition on block device. + """ + if dev[-1].isdigit(): + return '{}p1'.format(dev) + else: + return '{}1'.format(dev) + + +def is_active_bluestore_device(dev): + """ + Determine whether provided device is part of an active + bluestore based OSD (as its block component). + + :param: dev: Full path to block device to check for Bluestore usage. + :returns: boolean: indicating whether device is in active use. + """ + if not lvm.is_lvm_physical_volume(dev): + return False + + vg_name = lvm.list_lvm_volume_group(dev) + lv_name = lvm.list_logical_volumes('vg_name={}'.format(vg_name))[0] + + block_symlinks = glob.glob('/var/lib/ceph/osd/ceph-*/block') + for block_candidate in block_symlinks: + if os.path.islink(block_candidate): + target = os.readlink(block_candidate) + if target.endswith(lv_name): + return True + + return False + + +def get_conf(variable): + """ + Get the value of the given configuration variable from the + cluster. + + :param variable: ceph configuration variable + :returns: str. configured value for provided variable + + """ + return subprocess.check_output([ + 'ceph-osd', + '--show-config-value={}'.format(variable), + ]).strip() + + +def calculate_volume_size(lv_type): + """ + Determine the configured size for Bluestore DB/WAL or + Filestore Journal devices + + :param lv_type: volume type (db, wal or journal) + :raises KeyError: if invalid lv_type is supplied + :returns: int. Configured size in megabytes for volume type + """ + # lv_type -> ceph configuration option + _config_map = { + 'db': 'bluestore_block_db_size', + 'wal': 'bluestore_block_wal_size', + 'journal': 'osd_journal_size', + } + + # default sizes in MB + _default_size = { + 'db': 1024, + 'wal': 576, + 'journal': 1024, + } + + # conversion of ceph config units to MB + _units = { + 'db': 1048576, # Bytes -> MB + 'wal': 1048576, # Bytes -> MB + 'journal': 1, # Already in MB + } + + configured_size = get_conf(_config_map[lv_type]) + + if configured_size is None or int(configured_size) == 0: + return _default_size[lv_type] + else: + return int(configured_size) / _units[lv_type] + + +def _luks_uuid(dev): + """ + Check to see if dev is a LUKS encrypted volume, returning the UUID + of volume if it is. + + :param: dev: path to block device to check. + :returns: str. UUID of LUKS device or None if not a LUKS device + """ + try: + cmd = ['cryptsetup', 'luksUUID', dev] + return subprocess.check_output(cmd).decode('UTF-8').strip() + except subprocess.CalledProcessError: + return None + + +def _initialize_disk(dev, dev_uuid, encrypt=False, + key_manager=CEPH_KEY_MANAGER): + """ + Initialize a raw block device consuming 100% of the avaliable + disk space. + + Function assumes that block device has already been wiped. + + :param: dev: path to block device to initialize + :param: dev_uuid: UUID to use for any dm-crypt operations + :param: encrypt: Encrypt OSD devices using dm-crypt + :param: key_manager: Key management approach for dm-crypt keys + :raises: subprocess.CalledProcessError: if any parted calls fail + :returns: str: Full path to new partition. + """ + use_vaultlocker = encrypt and key_manager == VAULT_KEY_MANAGER + + if use_vaultlocker: + # NOTE(jamespage): Check to see if already initialized as a LUKS + # volume, which indicates this is a shared block + # device for journal, db or wal volumes. + luks_uuid = _luks_uuid(dev) + if luks_uuid: + return '/dev/mapper/crypt-{}'.format(luks_uuid) + + dm_crypt = '/dev/mapper/crypt-{}'.format(dev_uuid) + + if use_vaultlocker and not os.path.exists(dm_crypt): + subprocess.check_call([ + 'vaultlocker', + 'encrypt', + '--uuid', dev_uuid, + dev, + ]) + + if use_vaultlocker: + return dm_crypt + else: + return dev + + +def _allocate_logical_volume(dev, lv_type, osd_fsid, + size=None, shared=False, + encrypt=False, + key_manager=CEPH_KEY_MANAGER): + """ + Allocate a logical volume from a block device, ensuring any + required initialization and setup of PV's and VG's to support + the LV. + + :param: dev: path to block device to allocate from. + :param: lv_type: logical volume type to create + (data, block, journal, wal, db) + :param: osd_fsid: UUID of the OSD associate with the LV + :param: size: Size in LVM format for the device; + if unset 100% of VG + :param: shared: Shared volume group (journal, wal, db) + :param: encrypt: Encrypt OSD devices using dm-crypt + :param: key_manager: dm-crypt Key Manager to use + :raises subprocess.CalledProcessError: in the event that any supporting + LVM or parted operation fails. + :returns: str: String in the format 'vg_name/lv_name'. + """ + lv_name = "osd-{}-{}".format(lv_type, osd_fsid) + current_volumes = lvm.list_logical_volumes() + if shared: + dev_uuid = str(uuid.uuid4()) + else: + dev_uuid = osd_fsid + pv_dev = _initialize_disk(dev, dev_uuid, encrypt, key_manager) + + vg_name = None + if not lvm.is_lvm_physical_volume(pv_dev): + lvm.create_lvm_physical_volume(pv_dev) + if shared: + vg_name = 'ceph-{}-{}'.format(lv_type, + str(uuid.uuid4())) + else: + vg_name = 'ceph-{}'.format(osd_fsid) + lvm.create_lvm_volume_group(vg_name, pv_dev) + else: + vg_name = lvm.list_lvm_volume_group(pv_dev) + + if lv_name not in current_volumes: + lvm.create_logical_volume(lv_name, vg_name, size) + + return "{}/{}".format(vg_name, lv_name) + def osdize_dir(path, encrypt=False, bluestore=False): """Ask ceph-disk to prepare a directory to become an osd. diff --git a/unit_tests/test_ceph_hooks.py b/unit_tests/test_ceph_hooks.py index ad955f86..83cb38ae 100644 --- a/unit_tests/test_ceph_hooks.py +++ b/unit_tests/test_ceph_hooks.py @@ -186,10 +186,14 @@ class CephHooksTestCase(unittest.TestCase): mocks["apt_install"].assert_called_once_with( ["python-dbus", "lockfile-progs"]) + @patch.object(ceph_hooks, 'ceph') + @patch.object(ceph_hooks, 'notify_client') @patch.object(ceph_hooks, 'config') def test_upgrade_charm_with_nrpe_relation_installs_dependencies( self, - mock_config): + mock_config, + mock_notify_client, + mock_ceph): config = copy.deepcopy(CHARM_CONFIG) mock_config.side_effect = lambda key: config[key] with patch.multiple( @@ -207,6 +211,8 @@ class CephHooksTestCase(unittest.TestCase): ceph_hooks.upgrade_charm() mocks["apt_install"].assert_called_with( ["python-dbus", "lockfile-progs"]) + mock_notify_client.assert_called_once_with() + mock_ceph.update_monfs.assert_called_once_with() class RelatedUnitsTestCase(unittest.TestCase):