diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py index c6b925fd..003c6a89 100755 --- a/hooks/ceph_hooks.py +++ b/hooks/ceph_hooks.py @@ -513,6 +513,7 @@ def prepare_disks_and_activate(): log('ceph bootstrapped, rescanning disks') emit_cephconf() bluestore = use_bluestore() + ceph.udevadm_settle() for dev in get_devices(): ceph.osdize(dev, config('osd-format'), osd_journal, diff --git a/lib/ceph/broker.py b/lib/ceph/broker.py index 3e857d21..3226f4cc 100644 --- a/lib/ceph/broker.py +++ b/lib/ceph/broker.py @@ -85,6 +85,7 @@ POOL_KEYS = { "compression_mode": [str, ["none", "passive", "aggressive", "force"]], "compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]], "compression_required_ratio": [float, [0.0, 1.0]], + "crush_rule": [str], } CEPH_BUCKET_TYPES = [ @@ -368,7 +369,8 @@ def handle_erasure_pool(request, service): """ pool_name = request.get('name') erasure_profile = request.get('erasure-profile') - quota = request.get('max-bytes') + max_bytes = request.get('max-bytes') + max_objects = request.get('max-objects') weight = request.get('weight') group_name = request.get('group') @@ -408,8 +410,9 @@ def handle_erasure_pool(request, service): pool.create() # Set a quota if requested - if quota is not None: - set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota) + if max_bytes or max_objects: + set_pool_quota(service=service, pool_name=pool_name, + max_bytes=max_bytes, max_objects=max_objects) def handle_replicated_pool(request, service): @@ -421,7 +424,8 @@ def handle_replicated_pool(request, service): """ pool_name = request.get('name') replicas = request.get('replicas') - quota = request.get('max-bytes') + max_bytes = request.get('max-bytes') + max_objects = request.get('max-objects') weight = request.get('weight') group_name = request.get('group') @@ -468,8 +472,9 @@ def handle_replicated_pool(request, service): level=DEBUG) # Set a quota if requested - if quota is not None: - set_pool_quota(service=service, pool_name=pool_name, 
max_bytes=quota) + if max_bytes or max_objects: + set_pool_quota(service=service, pool_name=pool_name, + max_bytes=max_bytes, max_objects=max_objects) def handle_create_cache_tier(request, service): diff --git a/lib/ceph/utils.py b/lib/ceph/utils.py index 83388c9b..b4f87907 100644 --- a/lib/ceph/utils.py +++ b/lib/ceph/utils.py @@ -935,6 +935,11 @@ def start_osds(devices): subprocess.check_call(['ceph-disk', 'activate', dev_or_path]) +def udevadm_settle(): + cmd = ['udevadm', 'settle'] + subprocess.call(cmd) + + def rescan_osd_devices(): cmd = [ 'udevadm', 'trigger', @@ -943,8 +948,7 @@ def rescan_osd_devices(): subprocess.call(cmd) - cmd = ['udevadm', 'settle'] - subprocess.call(cmd) + udevadm_settle() _bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring" @@ -1072,8 +1076,8 @@ _upgrade_caps = { } -def get_radosgw_key(pool_list=None): - return get_named_key(name='radosgw.gateway', +def get_radosgw_key(pool_list=None, name=None): + return get_named_key(name=name or 'radosgw.gateway', caps=_radosgw_caps, pool_list=pool_list) @@ -1128,6 +1132,15 @@ osd_upgrade_caps = collections.OrderedDict([ ]) ]) +rbd_mirror_caps = collections.OrderedDict([ + ('mon', ['profile rbd']), + ('osd', ['profile rbd']), +]) + + +def get_rbd_mirror_key(name): + return get_named_key(name=name, caps=rbd_mirror_caps) + def create_named_keyring(entity, name, caps=None): caps = caps or _default_caps @@ -1819,6 +1832,13 @@ def _initialize_disk(dev, dev_uuid, encrypt=False, '--uuid', dev_uuid, dev, ]) + subprocess.check_call([ + 'dd', + 'if=/dev/zero', + 'of={}'.format(dm_crypt), + 'bs=512', + 'count=1', + ]) if use_vaultlocker: return dm_crypt @@ -1899,6 +1919,7 @@ def osdize_dir(path, encrypt=False, bluestore=False): return mkdir(path, owner=ceph_user(), group=ceph_user(), perms=0o755) + chownr('/var/lib/ceph', ceph_user(), ceph_user()) cmd = [ 'sudo', '-u', ceph_user(), 'ceph-disk', @@ -2499,18 +2520,21 @@ def update_owner(path, recurse_dirs=True): secs=elapsed_time.total_seconds(), 
path=path), DEBUG) -def list_pools(service): +def list_pools(client='admin'): """This will list the current pools that Ceph has - :param service: String service id to run under - :returns: list. Returns a list of the ceph pools. - :raises: CalledProcessError if the subprocess fails to run. + :param client: (Optional) client id for ceph key to use + Defaults to ``admin`` + :type client: str + :returns: Returns a list of available pools. + :rtype: list + :raises: subprocess.CalledProcessError if the subprocess fails to run. """ try: pool_list = [] - pools = str(subprocess - .check_output(['rados', '--id', service, 'lspools']) - .decode('UTF-8')) + pools = subprocess.check_output(['rados', '--id', client, 'lspools'], + universal_newlines=True, + stderr=subprocess.STDOUT) for pool in pools.splitlines(): pool_list.append(pool) return pool_list @@ -2519,6 +2543,140 @@ def list_pools(service): raise +def get_pool_param(pool, param, client='admin'): + """Get parameter from pool. + + :param pool: Name of pool to get variable from + :type pool: str + :param param: Name of variable to get + :type param: str + :param client: (Optional) client id for ceph key to use + Defaults to ``admin`` + :type client: str + :returns: Value of variable on pool or None + :rtype: str or None + :raises: subprocess.CalledProcessError + """ + try: + output = subprocess.check_output( + ['ceph', '--id', client, 'osd', 'pool', 'get', pool, param], + universal_newlines=True, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as cp: + if cp.returncode == 2 and 'ENOENT: option' in cp.output: + return None + raise + if ':' in output: + return output.split(':')[1].lstrip().rstrip() + + +def get_pool_erasure_profile(pool, client='admin'): + """Get erasure code profile for pool. 
+ + :param pool: Name of pool to get variable from + :type pool: str + :param client: (Optional) client id for ceph key to use + Defaults to ``admin`` + :type client: str + :returns: Erasure code profile of pool or None + :rtype: str or None + :raises: subprocess.CalledProcessError + """ + try: + return get_pool_param(pool, 'erasure_code_profile', client=client) + except subprocess.CalledProcessError as cp: + if cp.returncode == 13 and 'EACCES: pool' in cp.output: + # Not an Erasure coded pool + return None + raise + + +def get_pool_quota(pool, client='admin'): + """Get pool quota. + + :param pool: Name of pool to get variable from + :type pool: str + :param client: (Optional) client id for ceph key to use + Defaults to ``admin`` + :type client: str + :returns: Dictionary with quota variables + :rtype: dict + :raises: subprocess.CalledProcessError + """ + output = subprocess.check_output( + ['ceph', '--id', client, 'osd', 'pool', 'get-quota', pool], + universal_newlines=True, stderr=subprocess.STDOUT) + rc = re.compile(r'\s+max\s+(\S+)\s*:\s+(\d+)') + result = {} + for line in output.splitlines(): + m = rc.match(line) + if m: + result.update({'max_{}'.format(m.group(1)): m.group(2)}) + return result + + +def get_pool_applications(pool='', client='admin'): + """Get pool applications. 
+ + :param pool: (Optional) Name of pool to get applications for + Defaults to get for all pools + :type pool: str + :param client: (Optional) client id for ceph key to use + Defaults to ``admin`` + :type client: str + :returns: Dictionary with pool name as key + :rtype: dict + :raises: subprocess.CalledProcessError + """ + + cmd = ['ceph', '--id', client, 'osd', 'pool', 'application', 'get'] + if pool: + cmd.append(pool) + try: + output = subprocess.check_output(cmd, + universal_newlines=True, + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as cp: + if cp.returncode == 2 and 'ENOENT' in cp.output: + return {} + raise + return json.loads(output) + + +def list_pools_detail(): + """Get detailed information about pools. + + Structure: + {'pool_name_1': {'applications': {'application': {}}, + 'parameters': {'pg_num': '42', 'size': '42'}, + 'quota': {'max_bytes': '1000', + 'max_objects': '10'}, + }, + 'pool_name_2': ... + } + + :returns: Dictionary with detailed pool information. + :rtype: dict + :raises: subprocess.CalledProcessError + """ + get_params = ['pg_num', 'size'] + result = {} + applications = get_pool_applications() + for pool in list_pools(): + result[pool] = { + 'applications': applications.get(pool, {}), + 'parameters': {}, + 'quota': get_pool_quota(pool), + } + for param in get_params: + result[pool]['parameters'].update({ + param: get_pool_param(pool, param)}) + erasure_profile = get_pool_erasure_profile(pool) + if erasure_profile: + result[pool]['parameters'].update({ + 'erasure_code_profile': erasure_profile}) + return result + + def dirs_need_ownership_update(service): """Determines if directories still need change of ownership.