Add 'osd blacklist' to default mon perms
Ensure that the default permissions for clients include the 'osd blacklist' command; This ensures that in the event of a client crashing (due to power outage or segfault), the client and re-connect and write to any devices on reboot. This is a safe permission for all supported Ceph releases. Depends-On: I0b43dece4e1c56fb838b0147bfb75fb9906e6657 Change-Id: Ib1f1e8d7ed54528603b8b08051dafeec075a3232 Closes-Bug: 1773449
This commit is contained in:
parent
54c3f21221
commit
e071486acf
|
@ -658,6 +658,11 @@ def upgrade_charm():
|
|||
if is_relation_made("nrpe-external-master"):
|
||||
update_nrpe_config()
|
||||
|
||||
# NOTE(jamespage):
|
||||
# Reprocess broker requests to ensure that any cephx
|
||||
# key permission changes are applied
|
||||
notify_client()
|
||||
|
||||
|
||||
@hooks.hook('start')
|
||||
def start():
|
||||
|
|
|
@ -81,6 +81,10 @@ POOL_KEYS = {
|
|||
"cache_min_flush_age": [int],
|
||||
"cache_min_evict_age": [int],
|
||||
"fast_read": [bool],
|
||||
"allow_ec_overwrites": [bool],
|
||||
"compression_mode": [str, ["none", "passive", "aggressive", "force"]],
|
||||
"compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]],
|
||||
"compression_required_ratio": [float, [0.0, 1.0]],
|
||||
}
|
||||
|
||||
CEPH_BUCKET_TYPES = [
|
||||
|
@ -251,7 +255,8 @@ def pool_permission_list_for_service(service):
|
|||
for prefix in prefixes:
|
||||
permissions.append("allow {} object_prefix {}".format(permission,
|
||||
prefix))
|
||||
return ["mon", "allow r", "osd", ', '.join(permissions)]
|
||||
return ['mon', 'allow r, allow command "osd blacklist"',
|
||||
'osd', ', '.join(permissions)]
|
||||
|
||||
|
||||
def get_service_groups(service, namespace=None):
|
||||
|
|
|
@ -13,8 +13,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import ctypes
|
||||
import errno
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import pyudev
|
||||
|
@ -24,7 +23,7 @@ import socket
|
|||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import shutil
|
||||
import uuid
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
@ -36,7 +35,6 @@ from charmhelpers.core.host import (
|
|||
cmp_pkgrevno,
|
||||
lsb_release,
|
||||
mkdir,
|
||||
mounts,
|
||||
owner,
|
||||
service_restart,
|
||||
service_start,
|
||||
|
@ -68,11 +66,12 @@ from charmhelpers.contrib.storage.linux.ceph import (
|
|||
from charmhelpers.contrib.storage.linux.utils import (
|
||||
is_block_device,
|
||||
is_device_mounted,
|
||||
zap_disk,
|
||||
)
|
||||
from charmhelpers.contrib.openstack.utils import (
|
||||
get_os_codename_install_source,
|
||||
)
|
||||
from charmhelpers.contrib.storage.linux import lvm
|
||||
from charmhelpers.core.unitdata import kv
|
||||
|
||||
CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
|
||||
OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
|
||||
|
@ -83,7 +82,15 @@ PEON = 'peon'
|
|||
QUORUM = [LEADER, PEON]
|
||||
|
||||
PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph',
|
||||
'radosgw', 'xfsprogs', 'python-pyudev']
|
||||
'radosgw', 'xfsprogs', 'python-pyudev',
|
||||
'lvm2', 'parted']
|
||||
|
||||
CEPH_KEY_MANAGER = 'ceph'
|
||||
VAULT_KEY_MANAGER = 'vault'
|
||||
KEY_MANAGERS = [
|
||||
CEPH_KEY_MANAGER,
|
||||
VAULT_KEY_MANAGER,
|
||||
]
|
||||
|
||||
LinkSpeed = {
|
||||
"BASE_10": 10,
|
||||
|
@ -823,114 +830,6 @@ CEPH_PARTITIONS = [
|
|||
]
|
||||
|
||||
|
||||
def umount(mount_point):
|
||||
"""This function unmounts a mounted directory forcibly. This will
|
||||
be used for unmounting broken hard drive mounts which may hang.
|
||||
|
||||
If umount returns EBUSY this will lazy unmount.
|
||||
|
||||
:param mount_point: str. A String representing the filesystem mount point
|
||||
:returns: int. Returns 0 on success. errno otherwise.
|
||||
"""
|
||||
libc_path = ctypes.util.find_library("c")
|
||||
libc = ctypes.CDLL(libc_path, use_errno=True)
|
||||
|
||||
# First try to umount with MNT_FORCE
|
||||
ret = libc.umount(mount_point, 1)
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
if err == errno.EBUSY:
|
||||
# Detach from try. IE lazy umount
|
||||
ret = libc.umount(mount_point, 2)
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
return err
|
||||
return 0
|
||||
else:
|
||||
return err
|
||||
return 0
|
||||
|
||||
|
||||
def replace_osd(dead_osd_number,
|
||||
dead_osd_device,
|
||||
new_osd_device,
|
||||
osd_format,
|
||||
osd_journal,
|
||||
reformat_osd=False,
|
||||
ignore_errors=False):
|
||||
"""This function will automate the replacement of a failed osd disk as much
|
||||
as possible. It will revoke the keys for the old osd, remove it from the
|
||||
crush map and then add a new osd into the cluster.
|
||||
|
||||
:param dead_osd_number: The osd number found in ceph osd tree. Example: 99
|
||||
:param dead_osd_device: The physical device. Example: /dev/sda
|
||||
:param osd_format:
|
||||
:param osd_journal:
|
||||
:param reformat_osd:
|
||||
:param ignore_errors:
|
||||
"""
|
||||
host_mounts = mounts()
|
||||
mount_point = None
|
||||
for mount in host_mounts:
|
||||
if mount[1] == dead_osd_device:
|
||||
mount_point = mount[0]
|
||||
# need to convert dev to osd number
|
||||
# also need to get the mounted drive so we can tell the admin to
|
||||
# replace it
|
||||
try:
|
||||
# Drop this osd out of the cluster. This will begin a
|
||||
# rebalance operation
|
||||
status_set('maintenance', 'Removing osd {}'.format(dead_osd_number))
|
||||
subprocess.check_output([
|
||||
'ceph',
|
||||
'--id',
|
||||
'osd-upgrade',
|
||||
'osd', 'out',
|
||||
'osd.{}'.format(dead_osd_number)])
|
||||
|
||||
# Kill the osd process if it's not already dead
|
||||
if systemd():
|
||||
service_stop('ceph-osd@{}'.format(dead_osd_number))
|
||||
else:
|
||||
subprocess.check_output(['stop', 'ceph-osd', 'id={}'.format(
|
||||
dead_osd_number)])
|
||||
# umount if still mounted
|
||||
ret = umount(mount_point)
|
||||
if ret < 0:
|
||||
raise RuntimeError('umount {} failed with error: {}'.format(
|
||||
mount_point, os.strerror(ret)))
|
||||
# Clean up the old mount point
|
||||
shutil.rmtree(mount_point)
|
||||
subprocess.check_output([
|
||||
'ceph',
|
||||
'--id',
|
||||
'osd-upgrade',
|
||||
'osd', 'crush', 'remove',
|
||||
'osd.{}'.format(dead_osd_number)])
|
||||
# Revoke the OSDs access keys
|
||||
subprocess.check_output([
|
||||
'ceph',
|
||||
'--id',
|
||||
'osd-upgrade',
|
||||
'auth', 'del',
|
||||
'osd.{}'.format(dead_osd_number)])
|
||||
subprocess.check_output([
|
||||
'ceph',
|
||||
'--id',
|
||||
'osd-upgrade',
|
||||
'osd', 'rm',
|
||||
'osd.{}'.format(dead_osd_number)])
|
||||
status_set('maintenance', 'Setting up replacement osd {}'.format(
|
||||
new_osd_device))
|
||||
osdize(new_osd_device,
|
||||
osd_format,
|
||||
osd_journal,
|
||||
reformat_osd,
|
||||
ignore_errors)
|
||||
except subprocess.CalledProcessError as e:
|
||||
log('replace_osd failed with error: ' + e.output)
|
||||
|
||||
|
||||
def get_partition_list(dev):
|
||||
"""Lists the partitions of a block device.
|
||||
|
||||
|
@ -970,7 +869,42 @@ def get_partition_list(dev):
|
|||
raise
|
||||
|
||||
|
||||
def is_pristine_disk(dev):
|
||||
"""
|
||||
Read first 2048 bytes (LBA 0 - 3) of block device to determine whether it
|
||||
is actually all zeros and safe for us to use.
|
||||
|
||||
Existing partitioning tools does not discern between a failure to read from
|
||||
block device, failure to understand a partition table and the fact that a
|
||||
block device has no partition table. Since we need to be positive about
|
||||
which is which we need to read the device directly and confirm ourselves.
|
||||
|
||||
:param dev: Path to block device
|
||||
:type dev: str
|
||||
:returns: True all 2048 bytes == 0x0, False if not
|
||||
:rtype: bool
|
||||
"""
|
||||
want_bytes = 2048
|
||||
|
||||
f = open(dev, 'rb')
|
||||
data = f.read(want_bytes)
|
||||
read_bytes = len(data)
|
||||
if read_bytes != want_bytes:
|
||||
log('{}: short read, got {} bytes expected {}.'
|
||||
.format(dev, read_bytes, want_bytes), level=WARNING)
|
||||
return False
|
||||
|
||||
return all(byte == 0x0 for byte in data)
|
||||
|
||||
|
||||
def is_osd_disk(dev):
|
||||
db = kv()
|
||||
osd_devices = db.get('osd-devices', [])
|
||||
if dev in osd_devices:
|
||||
log('Device {} already processed by charm,'
|
||||
' skipping'.format(dev))
|
||||
return True
|
||||
|
||||
partitions = get_partition_list(dev)
|
||||
for partition in partitions:
|
||||
try:
|
||||
|
@ -1008,6 +942,9 @@ def rescan_osd_devices():
|
|||
|
||||
subprocess.call(cmd)
|
||||
|
||||
cmd = ['udevadm', 'settle']
|
||||
subprocess.call(cmd)
|
||||
|
||||
|
||||
_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
|
||||
_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring"
|
||||
|
@ -1159,7 +1096,8 @@ def get_mds_bootstrap_key():
|
|||
|
||||
|
||||
_default_caps = collections.OrderedDict([
|
||||
('mon', ['allow r']),
|
||||
('mon', ['allow r',
|
||||
'allow command "osd blacklist"']),
|
||||
('osd', ['allow rwx']),
|
||||
])
|
||||
|
||||
|
@ -1226,6 +1164,7 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
:param caps: dict of cephx capabilities
|
||||
:returns: Returns a cephx key
|
||||
"""
|
||||
key_name = 'client.{}'.format(name)
|
||||
try:
|
||||
# Does the key already exist?
|
||||
output = str(subprocess.check_output(
|
||||
|
@ -1240,8 +1179,14 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
),
|
||||
'auth',
|
||||
'get',
|
||||
'client.{}'.format(name),
|
||||
key_name,
|
||||
]).decode('UTF-8')).strip()
|
||||
# NOTE(jamespage);
|
||||
# Apply any changes to key capabilities, dealing with
|
||||
# upgrades which requires new caps for operation.
|
||||
upgrade_key_caps(key_name,
|
||||
caps or _default_caps,
|
||||
pool_list)
|
||||
return parse_key(output)
|
||||
except subprocess.CalledProcessError:
|
||||
# Couldn't get the key, time to create it!
|
||||
|
@ -1257,7 +1202,7 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
'/var/lib/ceph/mon/ceph-{}/keyring'.format(
|
||||
socket.gethostname()
|
||||
),
|
||||
'auth', 'get-or-create', 'client.{}'.format(name),
|
||||
'auth', 'get-or-create', key_name,
|
||||
]
|
||||
# Add capabilities
|
||||
for subsystem, subcaps in caps.items():
|
||||
|
@ -1276,7 +1221,7 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
.strip()) # IGNORE:E1103
|
||||
|
||||
|
||||
def upgrade_key_caps(key, caps):
|
||||
def upgrade_key_caps(key, caps, pool_list=None):
|
||||
""" Upgrade key to have capabilities caps """
|
||||
if not is_leader():
|
||||
# Not the MON leader OR not clustered
|
||||
|
@ -1285,6 +1230,12 @@ def upgrade_key_caps(key, caps):
|
|||
"sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key
|
||||
]
|
||||
for subsystem, subcaps in caps.items():
|
||||
if subsystem == 'osd':
|
||||
if pool_list:
|
||||
# This will output a string similar to:
|
||||
# "pool=rgw pool=rbd pool=something"
|
||||
pools = " ".join(['pool={0}'.format(i) for i in pool_list])
|
||||
subcaps[0] = subcaps[0] + " " + pools
|
||||
cmd.extend([subsystem, '; '.join(subcaps)])
|
||||
subprocess.check_call(cmd)
|
||||
|
||||
|
@ -1393,15 +1344,6 @@ def update_monfs():
|
|||
pass
|
||||
|
||||
|
||||
def maybe_zap_journal(journal_dev):
|
||||
if is_osd_disk(journal_dev):
|
||||
log('Looks like {} is already an OSD data'
|
||||
' or journal, skipping.'.format(journal_dev))
|
||||
return
|
||||
zap_disk(journal_dev)
|
||||
log("Zapped journal device {}".format(journal_dev))
|
||||
|
||||
|
||||
def get_partitions(dev):
|
||||
cmd = ['partx', '--raw', '--noheadings', dev]
|
||||
try:
|
||||
|
@ -1413,17 +1355,35 @@ def get_partitions(dev):
|
|||
return []
|
||||
|
||||
|
||||
def find_least_used_utility_device(utility_devices):
|
||||
def get_lvs(dev):
|
||||
"""
|
||||
List logical volumes for the provided block device
|
||||
|
||||
:param: dev: Full path to block device.
|
||||
:raises subprocess.CalledProcessError: in the event that any supporting
|
||||
operation failed.
|
||||
:returns: list: List of logical volumes provided by the block device
|
||||
"""
|
||||
if not lvm.is_lvm_physical_volume(dev):
|
||||
return []
|
||||
vg_name = lvm.list_lvm_volume_group(dev)
|
||||
return lvm.list_logical_volumes('vg_name={}'.format(vg_name))
|
||||
|
||||
|
||||
def find_least_used_utility_device(utility_devices, lvs=False):
|
||||
"""
|
||||
Find a utility device which has the smallest number of partitions
|
||||
among other devices in the supplied list.
|
||||
|
||||
:utility_devices: A list of devices to be used for filestore journal
|
||||
or bluestore wal or db.
|
||||
:lvs: flag to indicate whether inspection should be based on LVM LV's
|
||||
:return: string device name
|
||||
"""
|
||||
|
||||
usages = map(lambda a: (len(get_partitions(a)), a), utility_devices)
|
||||
if lvs:
|
||||
usages = map(lambda a: (len(get_lvs(a)), a), utility_devices)
|
||||
else:
|
||||
usages = map(lambda a: (len(get_partitions(a)), a), utility_devices)
|
||||
least = min(usages, key=lambda t: t[0])
|
||||
return least[1]
|
||||
|
||||
|
@ -1445,18 +1405,46 @@ def get_devices(name):
|
|||
return set(devices)
|
||||
|
||||
|
||||
def osdize(dev, osd_format, osd_journal, reformat_osd=False,
|
||||
ignore_errors=False, encrypt=False, bluestore=False):
|
||||
def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
|
||||
bluestore=False, key_manager=CEPH_KEY_MANAGER):
|
||||
if dev.startswith('/dev'):
|
||||
osdize_dev(dev, osd_format, osd_journal,
|
||||
reformat_osd, ignore_errors, encrypt,
|
||||
bluestore)
|
||||
ignore_errors, encrypt,
|
||||
bluestore, key_manager)
|
||||
else:
|
||||
osdize_dir(dev, encrypt, bluestore)
|
||||
|
||||
|
||||
def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
|
||||
ignore_errors=False, encrypt=False, bluestore=False):
|
||||
def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
|
||||
encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER):
|
||||
"""
|
||||
Prepare a block device for use as a Ceph OSD
|
||||
|
||||
A block device will only be prepared once during the lifetime
|
||||
of the calling charm unit; future executions will be skipped.
|
||||
|
||||
:param: dev: Full path to block device to use
|
||||
:param: osd_format: Format for OSD filesystem
|
||||
:param: osd_journal: List of block devices to use for OSD journals
|
||||
:param: ignore_errors: Don't fail in the event of any errors during
|
||||
processing
|
||||
:param: encrypt: Encrypt block devices using 'key_manager'
|
||||
:param: bluestore: Use bluestore native ceph block device format
|
||||
:param: key_manager: Key management approach for encryption keys
|
||||
:raises subprocess.CalledProcessError: in the event that any supporting
|
||||
subprocess operation failed
|
||||
:raises ValueError: if an invalid key_manager is provided
|
||||
"""
|
||||
if key_manager not in KEY_MANAGERS:
|
||||
raise ValueError('Unsupported key manager: {}'.format(key_manager))
|
||||
|
||||
db = kv()
|
||||
osd_devices = db.get('osd-devices', [])
|
||||
if dev in osd_devices:
|
||||
log('Device {} already processed by charm,'
|
||||
' skipping'.format(dev))
|
||||
return
|
||||
|
||||
if not os.path.exists(dev):
|
||||
log('Path {} does not exist - bailing'.format(dev))
|
||||
return
|
||||
|
@ -1465,7 +1453,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
|
|||
log('Path {} is not a block device - bailing'.format(dev))
|
||||
return
|
||||
|
||||
if is_osd_disk(dev) and not reformat_osd:
|
||||
if is_osd_disk(dev):
|
||||
log('Looks like {} is already an'
|
||||
' OSD data or journal, skipping.'.format(dev))
|
||||
return
|
||||
|
@ -1474,58 +1462,378 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
|
|||
log('Looks like {} is in use, skipping.'.format(dev))
|
||||
return
|
||||
|
||||
status_set('maintenance', 'Initializing device {}'.format(dev))
|
||||
cmd = ['ceph-disk', 'prepare']
|
||||
# Later versions of ceph support more options
|
||||
if cmp_pkgrevno('ceph', '0.60') >= 0:
|
||||
if encrypt:
|
||||
cmd.append('--dmcrypt')
|
||||
if cmp_pkgrevno('ceph', '0.48.3') >= 0:
|
||||
if osd_format and not bluestore:
|
||||
cmd.append('--fs-type')
|
||||
cmd.append(osd_format)
|
||||
if is_active_bluestore_device(dev):
|
||||
log('{} is in use as an active bluestore block device,'
|
||||
' skipping.'.format(dev))
|
||||
return
|
||||
|
||||
if reformat_osd:
|
||||
cmd.append('--zap-disk')
|
||||
|
||||
# NOTE(jamespage): enable experimental bluestore support
|
||||
if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore:
|
||||
cmd.append('--bluestore')
|
||||
wal = get_devices('bluestore-wal')
|
||||
if wal:
|
||||
cmd.append('--block.wal')
|
||||
least_used_wal = find_least_used_utility_device(wal)
|
||||
cmd.append(least_used_wal)
|
||||
db = get_devices('bluestore-db')
|
||||
if db:
|
||||
cmd.append('--block.db')
|
||||
least_used_db = find_least_used_utility_device(db)
|
||||
cmd.append(least_used_db)
|
||||
elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore:
|
||||
cmd.append('--filestore')
|
||||
|
||||
cmd.append(dev)
|
||||
|
||||
if osd_journal:
|
||||
least_used = find_least_used_utility_device(osd_journal)
|
||||
cmd.append(least_used)
|
||||
if cmp_pkgrevno('ceph', '12.2.4') >= 0:
|
||||
cmd = _ceph_volume(dev,
|
||||
osd_journal,
|
||||
encrypt,
|
||||
bluestore,
|
||||
key_manager)
|
||||
else:
|
||||
# Just provide the device - no other options
|
||||
# for older versions of ceph
|
||||
cmd.append(dev)
|
||||
if reformat_osd:
|
||||
zap_disk(dev)
|
||||
cmd = _ceph_disk(dev,
|
||||
osd_format,
|
||||
osd_journal,
|
||||
encrypt,
|
||||
bluestore)
|
||||
|
||||
try:
|
||||
status_set('maintenance', 'Initializing device {}'.format(dev))
|
||||
log("osdize cmd: {}".format(cmd))
|
||||
subprocess.check_call(cmd)
|
||||
except subprocess.CalledProcessError:
|
||||
try:
|
||||
lsblk_output = subprocess.check_output(
|
||||
['lsblk', '-P']).decode('UTF-8')
|
||||
except subprocess.CalledProcessError as e:
|
||||
log("Couldn't get lsblk output: {}".format(e), ERROR)
|
||||
if ignore_errors:
|
||||
log('Unable to initialize device: {}'.format(dev), WARNING)
|
||||
if lsblk_output:
|
||||
log('lsblk output: {}'.format(lsblk_output), DEBUG)
|
||||
else:
|
||||
log('Unable to initialize device: {}'.format(dev), ERROR)
|
||||
if lsblk_output:
|
||||
log('lsblk output: {}'.format(lsblk_output), WARNING)
|
||||
raise
|
||||
|
||||
# NOTE: Record processing of device only on success to ensure that
|
||||
# the charm only tries to initialize a device of OSD usage
|
||||
# once during its lifetime.
|
||||
osd_devices.append(dev)
|
||||
db.set('osd-devices', osd_devices)
|
||||
db.flush()
|
||||
|
||||
|
||||
def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False):
|
||||
"""
|
||||
Prepare a device for usage as a Ceph OSD using ceph-disk
|
||||
|
||||
:param: dev: Full path to use for OSD block device setup
|
||||
:param: osd_journal: List of block devices to use for OSD journals
|
||||
:param: encrypt: Use block device encryption (unsupported)
|
||||
:param: bluestore: Use bluestore storage for OSD
|
||||
:returns: list. 'ceph-disk' command and required parameters for
|
||||
execution by check_call
|
||||
"""
|
||||
cmd = ['ceph-disk', 'prepare']
|
||||
|
||||
if encrypt:
|
||||
cmd.append('--dmcrypt')
|
||||
|
||||
if osd_format and not bluestore:
|
||||
cmd.append('--fs-type')
|
||||
cmd.append(osd_format)
|
||||
|
||||
# NOTE(jamespage): enable experimental bluestore support
|
||||
if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore:
|
||||
cmd.append('--bluestore')
|
||||
wal = get_devices('bluestore-wal')
|
||||
if wal:
|
||||
cmd.append('--block.wal')
|
||||
least_used_wal = find_least_used_utility_device(wal)
|
||||
cmd.append(least_used_wal)
|
||||
db = get_devices('bluestore-db')
|
||||
if db:
|
||||
cmd.append('--block.db')
|
||||
least_used_db = find_least_used_utility_device(db)
|
||||
cmd.append(least_used_db)
|
||||
elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore:
|
||||
cmd.append('--filestore')
|
||||
|
||||
cmd.append(dev)
|
||||
|
||||
if osd_journal:
|
||||
least_used = find_least_used_utility_device(osd_journal)
|
||||
cmd.append(least_used)
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
|
||||
key_manager=CEPH_KEY_MANAGER):
|
||||
"""
|
||||
Prepare and activate a device for usage as a Ceph OSD using ceph-volume.
|
||||
|
||||
This also includes creation of all PV's, VG's and LV's required to
|
||||
support the initialization of the OSD.
|
||||
|
||||
:param: dev: Full path to use for OSD block device setup
|
||||
:param: osd_journal: List of block devices to use for OSD journals
|
||||
:param: encrypt: Use block device encryption
|
||||
:param: bluestore: Use bluestore storage for OSD
|
||||
:param: key_manager: dm-crypt Key Manager to use
|
||||
:raises subprocess.CalledProcessError: in the event that any supporting
|
||||
LVM operation failed.
|
||||
:returns: list. 'ceph-volume' command and required parameters for
|
||||
execution by check_call
|
||||
"""
|
||||
cmd = ['ceph-volume', 'lvm', 'create']
|
||||
|
||||
osd_fsid = str(uuid.uuid4())
|
||||
cmd.append('--osd-fsid')
|
||||
cmd.append(osd_fsid)
|
||||
|
||||
if bluestore:
|
||||
cmd.append('--bluestore')
|
||||
main_device_type = 'block'
|
||||
else:
|
||||
cmd.append('--filestore')
|
||||
main_device_type = 'data'
|
||||
|
||||
if encrypt and key_manager == CEPH_KEY_MANAGER:
|
||||
cmd.append('--dmcrypt')
|
||||
|
||||
# On-disk journal volume creation
|
||||
if not osd_journal and not bluestore:
|
||||
journal_lv_type = 'journal'
|
||||
cmd.append('--journal')
|
||||
cmd.append(_allocate_logical_volume(
|
||||
dev=dev,
|
||||
lv_type=journal_lv_type,
|
||||
osd_fsid=osd_fsid,
|
||||
size='{}M'.format(calculate_volume_size('journal')),
|
||||
encrypt=encrypt,
|
||||
key_manager=key_manager)
|
||||
)
|
||||
|
||||
cmd.append('--data')
|
||||
cmd.append(_allocate_logical_volume(dev=dev,
|
||||
lv_type=main_device_type,
|
||||
osd_fsid=osd_fsid,
|
||||
encrypt=encrypt,
|
||||
key_manager=key_manager))
|
||||
|
||||
if bluestore:
|
||||
for extra_volume in ('wal', 'db'):
|
||||
devices = get_devices('bluestore-{}'.format(extra_volume))
|
||||
if devices:
|
||||
cmd.append('--block.{}'.format(extra_volume))
|
||||
least_used = find_least_used_utility_device(devices,
|
||||
lvs=True)
|
||||
cmd.append(_allocate_logical_volume(
|
||||
dev=least_used,
|
||||
lv_type=extra_volume,
|
||||
osd_fsid=osd_fsid,
|
||||
size='{}M'.format(calculate_volume_size(extra_volume)),
|
||||
shared=True,
|
||||
encrypt=encrypt,
|
||||
key_manager=key_manager)
|
||||
)
|
||||
|
||||
elif osd_journal:
|
||||
cmd.append('--journal')
|
||||
least_used = find_least_used_utility_device(osd_journal,
|
||||
lvs=True)
|
||||
cmd.append(_allocate_logical_volume(
|
||||
dev=least_used,
|
||||
lv_type='journal',
|
||||
osd_fsid=osd_fsid,
|
||||
size='{}M'.format(calculate_volume_size('journal')),
|
||||
shared=True,
|
||||
encrypt=encrypt,
|
||||
key_manager=key_manager)
|
||||
)
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
def _partition_name(dev):
|
||||
"""
|
||||
Derive the first partition name for a block device
|
||||
|
||||
:param: dev: Full path to block device.
|
||||
:returns: str: Full path to first partition on block device.
|
||||
"""
|
||||
if dev[-1].isdigit():
|
||||
return '{}p1'.format(dev)
|
||||
else:
|
||||
return '{}1'.format(dev)
|
||||
|
||||
|
||||
def is_active_bluestore_device(dev):
|
||||
"""
|
||||
Determine whether provided device is part of an active
|
||||
bluestore based OSD (as its block component).
|
||||
|
||||
:param: dev: Full path to block device to check for Bluestore usage.
|
||||
:returns: boolean: indicating whether device is in active use.
|
||||
"""
|
||||
if not lvm.is_lvm_physical_volume(dev):
|
||||
return False
|
||||
|
||||
vg_name = lvm.list_lvm_volume_group(dev)
|
||||
lv_name = lvm.list_logical_volumes('vg_name={}'.format(vg_name))[0]
|
||||
|
||||
block_symlinks = glob.glob('/var/lib/ceph/osd/ceph-*/block')
|
||||
for block_candidate in block_symlinks:
|
||||
if os.path.islink(block_candidate):
|
||||
target = os.readlink(block_candidate)
|
||||
if target.endswith(lv_name):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_conf(variable):
|
||||
"""
|
||||
Get the value of the given configuration variable from the
|
||||
cluster.
|
||||
|
||||
:param variable: ceph configuration variable
|
||||
:returns: str. configured value for provided variable
|
||||
|
||||
"""
|
||||
return subprocess.check_output([
|
||||
'ceph-osd',
|
||||
'--show-config-value={}'.format(variable),
|
||||
]).strip()
|
||||
|
||||
|
||||
def calculate_volume_size(lv_type):
|
||||
"""
|
||||
Determine the configured size for Bluestore DB/WAL or
|
||||
Filestore Journal devices
|
||||
|
||||
:param lv_type: volume type (db, wal or journal)
|
||||
:raises KeyError: if invalid lv_type is supplied
|
||||
:returns: int. Configured size in megabytes for volume type
|
||||
"""
|
||||
# lv_type -> ceph configuration option
|
||||
_config_map = {
|
||||
'db': 'bluestore_block_db_size',
|
||||
'wal': 'bluestore_block_wal_size',
|
||||
'journal': 'osd_journal_size',
|
||||
}
|
||||
|
||||
# default sizes in MB
|
||||
_default_size = {
|
||||
'db': 1024,
|
||||
'wal': 576,
|
||||
'journal': 1024,
|
||||
}
|
||||
|
||||
# conversion of ceph config units to MB
|
||||
_units = {
|
||||
'db': 1048576, # Bytes -> MB
|
||||
'wal': 1048576, # Bytes -> MB
|
||||
'journal': 1, # Already in MB
|
||||
}
|
||||
|
||||
configured_size = get_conf(_config_map[lv_type])
|
||||
|
||||
if configured_size is None or int(configured_size) == 0:
|
||||
return _default_size[lv_type]
|
||||
else:
|
||||
return int(configured_size) / _units[lv_type]
|
||||
|
||||
|
||||
def _luks_uuid(dev):
|
||||
"""
|
||||
Check to see if dev is a LUKS encrypted volume, returning the UUID
|
||||
of volume if it is.
|
||||
|
||||
:param: dev: path to block device to check.
|
||||
:returns: str. UUID of LUKS device or None if not a LUKS device
|
||||
"""
|
||||
try:
|
||||
cmd = ['cryptsetup', 'luksUUID', dev]
|
||||
return subprocess.check_output(cmd).decode('UTF-8').strip()
|
||||
except subprocess.CalledProcessError:
|
||||
return None
|
||||
|
||||
|
||||
def _initialize_disk(dev, dev_uuid, encrypt=False,
|
||||
key_manager=CEPH_KEY_MANAGER):
|
||||
"""
|
||||
Initialize a raw block device consuming 100% of the avaliable
|
||||
disk space.
|
||||
|
||||
Function assumes that block device has already been wiped.
|
||||
|
||||
:param: dev: path to block device to initialize
|
||||
:param: dev_uuid: UUID to use for any dm-crypt operations
|
||||
:param: encrypt: Encrypt OSD devices using dm-crypt
|
||||
:param: key_manager: Key management approach for dm-crypt keys
|
||||
:raises: subprocess.CalledProcessError: if any parted calls fail
|
||||
:returns: str: Full path to new partition.
|
||||
"""
|
||||
use_vaultlocker = encrypt and key_manager == VAULT_KEY_MANAGER
|
||||
|
||||
if use_vaultlocker:
|
||||
# NOTE(jamespage): Check to see if already initialized as a LUKS
|
||||
# volume, which indicates this is a shared block
|
||||
# device for journal, db or wal volumes.
|
||||
luks_uuid = _luks_uuid(dev)
|
||||
if luks_uuid:
|
||||
return '/dev/mapper/crypt-{}'.format(luks_uuid)
|
||||
|
||||
dm_crypt = '/dev/mapper/crypt-{}'.format(dev_uuid)
|
||||
|
||||
if use_vaultlocker and not os.path.exists(dm_crypt):
|
||||
subprocess.check_call([
|
||||
'vaultlocker',
|
||||
'encrypt',
|
||||
'--uuid', dev_uuid,
|
||||
dev,
|
||||
])
|
||||
|
||||
if use_vaultlocker:
|
||||
return dm_crypt
|
||||
else:
|
||||
return dev
|
||||
|
||||
|
||||
def _allocate_logical_volume(dev, lv_type, osd_fsid,
|
||||
size=None, shared=False,
|
||||
encrypt=False,
|
||||
key_manager=CEPH_KEY_MANAGER):
|
||||
"""
|
||||
Allocate a logical volume from a block device, ensuring any
|
||||
required initialization and setup of PV's and VG's to support
|
||||
the LV.
|
||||
|
||||
:param: dev: path to block device to allocate from.
|
||||
:param: lv_type: logical volume type to create
|
||||
(data, block, journal, wal, db)
|
||||
:param: osd_fsid: UUID of the OSD associate with the LV
|
||||
:param: size: Size in LVM format for the device;
|
||||
if unset 100% of VG
|
||||
:param: shared: Shared volume group (journal, wal, db)
|
||||
:param: encrypt: Encrypt OSD devices using dm-crypt
|
||||
:param: key_manager: dm-crypt Key Manager to use
|
||||
:raises subprocess.CalledProcessError: in the event that any supporting
|
||||
LVM or parted operation fails.
|
||||
:returns: str: String in the format 'vg_name/lv_name'.
|
||||
"""
|
||||
lv_name = "osd-{}-{}".format(lv_type, osd_fsid)
|
||||
current_volumes = lvm.list_logical_volumes()
|
||||
if shared:
|
||||
dev_uuid = str(uuid.uuid4())
|
||||
else:
|
||||
dev_uuid = osd_fsid
|
||||
pv_dev = _initialize_disk(dev, dev_uuid, encrypt, key_manager)
|
||||
|
||||
vg_name = None
|
||||
if not lvm.is_lvm_physical_volume(pv_dev):
|
||||
lvm.create_lvm_physical_volume(pv_dev)
|
||||
if shared:
|
||||
vg_name = 'ceph-{}-{}'.format(lv_type,
|
||||
str(uuid.uuid4()))
|
||||
else:
|
||||
vg_name = 'ceph-{}'.format(osd_fsid)
|
||||
lvm.create_lvm_volume_group(vg_name, pv_dev)
|
||||
else:
|
||||
vg_name = lvm.list_lvm_volume_group(pv_dev)
|
||||
|
||||
if lv_name not in current_volumes:
|
||||
lvm.create_logical_volume(lv_name, vg_name, size)
|
||||
|
||||
return "{}/{}".format(vg_name, lv_name)
|
||||
|
||||
|
||||
def osdize_dir(path, encrypt=False, bluestore=False):
|
||||
"""Ask ceph-disk to prepare a directory to become an osd.
|
||||
|
|
|
@ -186,10 +186,14 @@ class CephHooksTestCase(unittest.TestCase):
|
|||
mocks["apt_install"].assert_called_once_with(
|
||||
["python-dbus", "lockfile-progs"])
|
||||
|
||||
@patch.object(ceph_hooks, 'ceph')
|
||||
@patch.object(ceph_hooks, 'notify_client')
|
||||
@patch.object(ceph_hooks, 'config')
|
||||
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(
|
||||
self,
|
||||
mock_config):
|
||||
mock_config,
|
||||
mock_notify_client,
|
||||
mock_ceph):
|
||||
config = copy.deepcopy(CHARM_CONFIG)
|
||||
mock_config.side_effect = lambda key: config[key]
|
||||
with patch.multiple(
|
||||
|
@ -207,6 +211,8 @@ class CephHooksTestCase(unittest.TestCase):
|
|||
ceph_hooks.upgrade_charm()
|
||||
mocks["apt_install"].assert_called_with(
|
||||
["python-dbus", "lockfile-progs"])
|
||||
mock_notify_client.assert_called_once_with()
|
||||
mock_ceph.update_monfs.assert_called_once_with()
|
||||
|
||||
|
||||
class RelatedUnitsTestCase(unittest.TestCase):
|
||||
|
|
Loading…
Reference in New Issue