From 0f3203b18cb62af15ea394fe7f80ef68b1b578da Mon Sep 17 00:00:00 2001 From: James Page Date: Tue, 12 Feb 2019 11:33:38 +0000 Subject: [PATCH] Add support for radosgw upgrades Sync charms.ceph and use helper functions to determine whether any changes in the source configuration option are a supported upgrade path. If an upgrade path is detected then upgrade via apt_install with the full list of required packages for the radosgw to force an upgrade. Change-Id: I48a8b5d14ad6ac11af57ddf0260a4a41744e7e21 Closes-Bug: 1539335 --- .pydevproject | 17 +- Makefile | 9 +- hooks/{ceph.py => ceph_rgw.py} | 0 hooks/hooks.py | 46 +- hooks/install_deps | 2 +- lib/.keep | 3 - lib/ceph/__init__.py | 0 lib/ceph/broker.py | 872 ++++++++++ lib/ceph/crush_utils.py | 154 ++ lib/ceph/utils.py | 2729 ++++++++++++++++++++++++++++++++ unit_tests/test_ceph.py | 2 +- unit_tests/test_hooks.py | 68 +- 12 files changed, 3878 insertions(+), 24 deletions(-) rename hooks/{ceph.py => ceph_rgw.py} (100%) delete mode 100644 lib/.keep create mode 100644 lib/ceph/__init__.py create mode 100644 lib/ceph/broker.py create mode 100644 lib/ceph/crush_utils.py create mode 100644 lib/ceph/utils.py diff --git a/.pydevproject b/.pydevproject index 98cc65d3..03181631 100644 --- a/.pydevproject +++ b/.pydevproject @@ -1,8 +1,15 @@ -python 2.7 -Default - -/ceph-radosgw/hooks - + + python 2.7 + + Default + + + /${PROJECT_DIR_NAME}/lib + /${PROJECT_DIR_NAME}/hooks + /${PROJECT_DIR_NAME}/unit_tests + /${PROJECT_DIR_NAME}/actions + + diff --git a/Makefile b/Makefile index 6813bb22..a0ab412e 100644 --- a/Makefile +++ b/Makefile @@ -16,9 +16,12 @@ bin/charm_helpers_sync.py: @mkdir -p bin @curl -o bin/charm_helpers_sync.py https://raw.githubusercontent.com/juju/charm-helpers/master/tools/charm_helpers_sync/charm_helpers_sync.py +bin/git_sync.py: + @mkdir -p bin + @wget -O bin/git_sync.py https://raw.githubusercontent.com/CanonicalLtd/git-sync/master/git_sync.py + sync: bin/charm_helpers_sync.py @$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml -publish: lint test - bzr push lp:charms/ceph-radosgw - bzr push lp:charms/trusty/ceph-radosgw +ceph-sync: bin/git_sync.py + $(PYTHON) bin/git_sync.py -d lib -s https://github.com/openstack/charms.ceph.git diff --git a/hooks/ceph.py b/hooks/ceph_rgw.py similarity index 100% rename from hooks/ceph.py rename to hooks/ceph_rgw.py diff --git a/hooks/hooks.py b/hooks/hooks.py index b1d79d84..77f42263 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -19,7 +19,10 @@ import subprocess import sys import socket -import ceph +sys.path.append('lib') + +import ceph_rgw as ceph +import ceph.utils as ceph_utils from charmhelpers.core.hookenv import ( relation_get, @@ -39,6 +42,7 @@ from charmhelpers.fetch import ( apt_purge, add_source, filter_installed_packages, + filter_missing_packages, ) from charmhelpers.payload.execd import execd_preinstall from charmhelpers.core.host import ( @@ -115,16 +119,45 @@ APACHE_PACKAGES = [ ] +def upgrade_available(): + """Check for upgrade for ceph + + :returns: whether an upgrade is available + :rtype: boolean + """ + c = config() + old_version = ceph_utils.resolve_ceph_version(c.previous('source') or + 'distro') + new_version = ceph_utils.resolve_ceph_version(c.get('source')) + if (old_version in ceph_utils.UPGRADE_PATHS and + new_version == ceph_utils.UPGRADE_PATHS[old_version]): + return True + return False + + def install_packages(): - add_source(config('source'), config('key')) - apt_update(fatal=True) + c = config() + if c.changed('source') or c.changed('key'): + 
add_source(c.get('source'), c.get('key')) + apt_update(fatal=True) + if is_container(): PACKAGES.remove('ntp') - pkgs = filter_installed_packages(PACKAGES) + + # NOTE: just use full package list if we're in an upgrade + # config-changed execution + pkgs = ( + PACKAGES if upgrade_available() else + filter_installed_packages(PACKAGES) + ) if pkgs: status_set('maintenance', 'Installing radosgw packages') - apt_install(PACKAGES, fatal=True) - apt_purge(APACHE_PACKAGES) + apt_install(pkgs, fatal=True) + + pkgs = filter_missing_packages(APACHE_PACKAGES) + if pkgs: + apt_purge(pkgs) + disable_unused_apache_sites() @@ -153,7 +186,6 @@ def config_changed(): return install_packages() - disable_unused_apache_sites() if config('prefer-ipv6'): status_set('maintenance', 'configuring ipv6') diff --git a/hooks/install_deps b/hooks/install_deps index 4d06619a..0f116166 100755 --- a/hooks/install_deps +++ b/hooks/install_deps @@ -1,7 +1,7 @@ #!/bin/bash -e # Install required dependencies for charm runtime -declare -a DEPS=('apt' 'netaddr' 'netifaces' 'yaml' 'jinja2' 'dnspython') +declare -a DEPS=('apt' 'netaddr' 'netifaces' 'yaml' 'jinja2' 'dnspython' 'pyudev') check_and_install() { pkg="${1}-${2}" diff --git a/lib/.keep b/lib/.keep deleted file mode 100644 index f49b91ae..00000000 --- a/lib/.keep +++ /dev/null @@ -1,3 +0,0 @@ - This file was created by release-tools to ensure that this empty - directory is preserved in vcs re: lint check definitions in global - tox.ini files. This file can be removed if/when this dir is actually in use. diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/ceph/broker.py b/lib/ceph/broker.py new file mode 100644 index 00000000..3e857d21 --- /dev/null +++ b/lib/ceph/broker.py @@ -0,0 +1,872 @@ +# Copyright 2016 Canonical Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import json +import os + +from tempfile import NamedTemporaryFile + +from ceph.utils import ( + get_cephfs, + get_osd_weight +) +from ceph.crush_utils import Crushmap + +from charmhelpers.core.hookenv import ( + log, + DEBUG, + INFO, + ERROR, +) +from charmhelpers.contrib.storage.linux.ceph import ( + create_erasure_profile, + delete_pool, + erasure_profile_exists, + get_osds, + monitor_key_get, + monitor_key_set, + pool_exists, + pool_set, + remove_pool_snapshot, + rename_pool, + set_pool_quota, + snapshot_pool, + validator, + ErasurePool, + Pool, + ReplicatedPool, +) + +# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/ +# This should do a decent job of preventing people from passing in bad values. 
+# It will give a useful error message +from subprocess import check_call, check_output, CalledProcessError + +POOL_KEYS = { + # "Ceph Key Name": [Python type, [Valid Range]] + "size": [int], + "min_size": [int], + "crash_replay_interval": [int], + "pgp_num": [int], # = or < pg_num + "crush_ruleset": [int], + "hashpspool": [bool], + "nodelete": [bool], + "nopgchange": [bool], + "nosizechange": [bool], + "write_fadvise_dontneed": [bool], + "noscrub": [bool], + "nodeep-scrub": [bool], + "hit_set_type": [str, ["bloom", "explicit_hash", + "explicit_object"]], + "hit_set_count": [int, [1, 1]], + "hit_set_period": [int], + "hit_set_fpp": [float, [0.0, 1.0]], + "cache_target_dirty_ratio": [float], + "cache_target_dirty_high_ratio": [float], + "cache_target_full_ratio": [float], + "target_max_bytes": [int], + "target_max_objects": [int], + "cache_min_flush_age": [int], + "cache_min_evict_age": [int], + "fast_read": [bool], + "allow_ec_overwrites": [bool], + "compression_mode": [str, ["none", "passive", "aggressive", "force"]], + "compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]], + "compression_required_ratio": [float, [0.0, 1.0]], +} + +CEPH_BUCKET_TYPES = [ + 'osd', + 'host', + 'chassis', + 'rack', + 'row', + 'pdu', + 'pod', + 'room', + 'datacenter', + 'region', + 'root' +] + + +def decode_req_encode_rsp(f): + """Decorator to decode incoming requests and encode responses.""" + + def decode_inner(req): + return json.dumps(f(json.loads(req))) + + return decode_inner + + +@decode_req_encode_rsp +def process_requests(reqs): + """Process Ceph broker request(s). + + This is a versioned api. API version must be supplied by the client making + the request. + + :param reqs: dict of request parameters. + :returns: dict. exit-code and reason if not 0 + """ + request_id = reqs.get('request-id') + try: + version = reqs.get('api-version') + if version == 1: + log('Processing request {}'.format(request_id), level=DEBUG) + resp = process_requests_v1(reqs['ops']) + if request_id: + resp['request-id'] = request_id + + return resp + + except Exception as exc: + log(str(exc), level=ERROR) + msg = ("Unexpected error occurred while processing requests: %s" % + reqs) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + msg = ("Missing or invalid api version ({})".format(version)) + resp = {'exit-code': 1, 'stderr': msg} + if request_id: + resp['request-id'] = request_id + + return resp + + +def handle_create_erasure_profile(request, service): + """Create an erasure profile. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + # "local" | "shec" or it defaults to "jerasure" + erasure_type = request.get('erasure-type') + # "host" | "rack" or it defaults to "host" # Any valid Ceph bucket + failure_domain = request.get('failure-domain') + name = request.get('name') + k = request.get('k') + m = request.get('m') + l = request.get('l') + + if failure_domain not in CEPH_BUCKET_TYPES: + msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + create_erasure_profile(service=service, erasure_plugin_name=erasure_type, + profile_name=name, failure_domain=failure_domain, + data_chunks=k, coding_chunks=m, locality=l) + + +def handle_add_permissions_to_key(request, service): + """Groups are defined by the key cephx.groups.(namespace-)?-(name). 
This + key will contain a dict serialized to JSON with data about the group, + including pools and members. + + A group can optionally have a namespace defined that will be used to + further restrict pool access. + """ + resp = {'exit-code': 0} + + service_name = request.get('name') + group_name = request.get('group') + group_namespace = request.get('group-namespace') + if group_namespace: + group_name = "{}-{}".format(group_namespace, group_name) + group = get_group(group_name=group_name) + service_obj = get_service_groups(service=service_name, + namespace=group_namespace) + if request.get('object-prefix-permissions'): + service_obj['object_prefix_perms'] = request.get( + 'object-prefix-permissions') + format("Service object: {}".format(service_obj)) + permission = request.get('group-permission') or "rwx" + if service_name not in group['services']: + group['services'].append(service_name) + save_group(group=group, group_name=group_name) + if permission not in service_obj['group_names']: + service_obj['group_names'][permission] = [] + if group_name not in service_obj['group_names'][permission]: + service_obj['group_names'][permission].append(group_name) + save_service(service=service_obj, service_name=service_name) + service_obj['groups'] = _build_service_groups(service_obj, + group_namespace) + update_service_permissions(service_name, service_obj, group_namespace) + + return resp + + +def update_service_permissions(service, service_obj=None, namespace=None): + """Update the key permissions for the named client in Ceph""" + if not service_obj: + service_obj = get_service_groups(service=service, namespace=namespace) + permissions = pool_permission_list_for_service(service_obj) + call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions + try: + check_call(call) + except CalledProcessError as e: + log("Error updating key capabilities: {}".format(e)) + + +def add_pool_to_group(pool, group, namespace=None): + """Add a named pool to a named group""" + group_name = group + if namespace: + group_name = "{}-{}".format(namespace, group_name) + group = get_group(group_name=group_name) + if pool not in group['pools']: + group["pools"].append(pool) + save_group(group, group_name=group_name) + for service in group['services']: + update_service_permissions(service, namespace=namespace) + + +def pool_permission_list_for_service(service): + """Build the permission string for Ceph for a given service""" + permissions = [] + permission_types = collections.OrderedDict() + for permission, group in sorted(service["group_names"].items()): + if permission not in permission_types: + permission_types[permission] = [] + for item in group: + permission_types[permission].append(item) + for permission, groups in permission_types.items(): + permission = "allow {}".format(permission) + for group in groups: + for pool in service['groups'][group].get('pools', []): + permissions.append("{} pool={}".format(permission, pool)) + for permission, prefixes in sorted( + service.get("object_prefix_perms", {}).items()): + for prefix in prefixes: + permissions.append("allow {} object_prefix {}".format(permission, + prefix)) + return ['mon', 'allow r, allow command "osd blacklist"', + 'osd', ', '.join(permissions)] + + +def get_service_groups(service, namespace=None): + """Services are objects stored with some metadata, they look like (for a + service named "nova"): + { + group_names: {'rwx': ['images']}, + groups: {} + } + After populating the group, it looks like: + { + group_names: {'rwx': ['images']}, + groups: { 
+ 'images': { + pools: ['glance'], + services: ['nova'] + } + } + } + """ + service_json = monitor_key_get(service='admin', + key="cephx.services.{}".format(service)) + try: + service = json.loads(service_json) + except (TypeError, ValueError): + service = None + if service: + service['groups'] = _build_service_groups(service, namespace) + else: + service = {'group_names': {}, 'groups': {}} + return service + + +def _build_service_groups(service, namespace=None): + """Rebuild the 'groups' dict for a service group + + :returns: dict: dictionary keyed by group name of the following + format: + + { + 'images': { + pools: ['glance'], + services: ['nova', 'glance] + }, + 'vms':{ + pools: ['nova'], + services: ['nova'] + } + } + """ + all_groups = {} + for groups in service['group_names'].values(): + for group in groups: + name = group + if namespace: + name = "{}-{}".format(namespace, name) + all_groups[group] = get_group(group_name=name) + return all_groups + + +def get_group(group_name): + """A group is a structure to hold data about a named group, structured as: + { + pools: ['glance'], + services: ['nova'] + } + """ + group_key = get_group_key(group_name=group_name) + group_json = monitor_key_get(service='admin', key=group_key) + try: + group = json.loads(group_json) + except (TypeError, ValueError): + group = None + if not group: + group = { + 'pools': [], + 'services': [] + } + return group + + +def save_service(service_name, service): + """Persist a service in the monitor cluster""" + service['groups'] = {} + return monitor_key_set(service='admin', + key="cephx.services.{}".format(service_name), + value=json.dumps(service, sort_keys=True)) + + +def save_group(group, group_name): + """Persist a group in the monitor cluster""" + group_key = get_group_key(group_name=group_name) + return monitor_key_set(service='admin', + key=group_key, + value=json.dumps(group, sort_keys=True)) + + +def get_group_key(group_name): + """Build group key""" + return 'cephx.groups.{}'.format(group_name) + + +def handle_erasure_pool(request, service): + """Create a new erasure coded pool. + + :param request: dict of request operations and params. + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0. + """ + pool_name = request.get('name') + erasure_profile = request.get('erasure-profile') + quota = request.get('max-bytes') + weight = request.get('weight') + group_name = request.get('group') + + if erasure_profile is None: + erasure_profile = "default-canonical" + + app_name = request.get('app-name') + + # Check for missing params + if pool_name is None: + msg = "Missing parameter. name is required for the pool" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + if group_name: + group_namespace = request.get('group-namespace') + # Add the pool to the group named "group_name" + add_pool_to_group(pool=pool_name, + group=group_name, + namespace=group_namespace) + + # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds + if not erasure_profile_exists(service=service, name=erasure_profile): + # TODO: Fail and tell them to create the profile or default + msg = ("erasure-profile {} does not exist. 
Please create it with: " + "create-erasure-profile".format(erasure_profile)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + pool = ErasurePool(service=service, name=pool_name, + erasure_code_profile=erasure_profile, + percent_data=weight, app_name=app_name) + # Ok make the erasure pool + if not pool_exists(service=service, name=pool_name): + log("Creating pool '{}' (erasure_profile={})" + .format(pool.name, erasure_profile), level=INFO) + pool.create() + + # Set a quota if requested + if quota is not None: + set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota) + + +def handle_replicated_pool(request, service): + """Create a new replicated pool. + + :param request: dict of request operations and params. + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0. + """ + pool_name = request.get('name') + replicas = request.get('replicas') + quota = request.get('max-bytes') + weight = request.get('weight') + group_name = request.get('group') + + # Optional params + pg_num = request.get('pg_num') + if pg_num: + # Cap pg_num to max allowed just in case. + osds = get_osds(service) + if osds: + pg_num = min(pg_num, (len(osds) * 100 // replicas)) + + app_name = request.get('app-name') + # Check for missing params + if pool_name is None or replicas is None: + msg = "Missing parameter. name and replicas are required" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + if group_name: + group_namespace = request.get('group-namespace') + # Add the pool to the group named "group_name" + add_pool_to_group(pool=pool_name, + group=group_name, + namespace=group_namespace) + + kwargs = {} + if pg_num: + kwargs['pg_num'] = pg_num + if weight: + kwargs['percent_data'] = weight + if replicas: + kwargs['replicas'] = replicas + if app_name: + kwargs['app_name'] = app_name + + pool = ReplicatedPool(service=service, + name=pool_name, **kwargs) + if not pool_exists(service=service, name=pool_name): + log("Creating pool '{}' (replicas={})".format(pool.name, replicas), + level=INFO) + pool.create() + else: + log("Pool '{}' already exists - skipping create".format(pool.name), + level=DEBUG) + + # Set a quota if requested + if quota is not None: + set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota) + + +def handle_create_cache_tier(request, service): + """Create a cache tier on a cold pool. Modes supported are + "writeback" and "readonly". + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + # mode = "writeback" | "readonly" + storage_pool = request.get('cold-pool') + cache_pool = request.get('hot-pool') + cache_mode = request.get('mode') + + if cache_mode is None: + cache_mode = "writeback" + + # cache and storage pool must exist first + if not pool_exists(service=service, name=storage_pool) or not pool_exists( + service=service, name=cache_pool): + msg = ("cold-pool: {} and hot-pool: {} must exist. Please create " + "them first".format(storage_pool, cache_pool)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + p = Pool(service=service, name=storage_pool) + p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode) + + +def handle_remove_cache_tier(request, service): + """Remove a cache tier from the cold pool. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. 
exit-code and reason if not 0 + """ + storage_pool = request.get('cold-pool') + cache_pool = request.get('hot-pool') + # cache and storage pool must exist first + if not pool_exists(service=service, name=storage_pool) or not pool_exists( + service=service, name=cache_pool): + msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not " + "deleting cache tier".format(storage_pool, cache_pool)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + pool = Pool(name=storage_pool, service=service) + pool.remove_cache_tier(cache_pool=cache_pool) + + +def handle_set_pool_value(request, service): + """Sets an arbitrary pool value. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + # Set arbitrary pool values + params = {'pool': request.get('name'), + 'key': request.get('key'), + 'value': request.get('value')} + if params['key'] not in POOL_KEYS: + msg = "Invalid key '{}'".format(params['key']) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + # Get the validation method + validator_params = POOL_KEYS[params['key']] + if len(validator_params) is 1: + # Validate that what the user passed is actually legal per Ceph's rules + validator(params['value'], validator_params[0]) + else: + # Validate that what the user passed is actually legal per Ceph's rules + validator(params['value'], validator_params[0], validator_params[1]) + + # Set the value + pool_set(service=service, pool_name=params['pool'], key=params['key'], + value=params['value']) + + +def handle_rgw_regionmap_update(request, service): + """Change the radosgw region map. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + name = request.get('client-name') + if not name: + msg = "Missing rgw-region or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + try: + check_output(['radosgw-admin', + '--id', service, + 'regionmap', 'update', '--name', name]) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_rgw_regionmap_default(request, service): + """Create a radosgw region map. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + region = request.get('rgw-region') + name = request.get('client-name') + if not region or not name: + msg = "Missing rgw-region or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + try: + check_output( + [ + 'radosgw-admin', + '--id', service, + 'regionmap', + 'default', + '--rgw-region', region, + '--name', name]) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_rgw_zone_set(request, service): + """Create a radosgw zone. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. 
exit-code and reason if not 0 + """ + json_file = request.get('zone-json') + name = request.get('client-name') + region_name = request.get('region-name') + zone_name = request.get('zone-name') + if not json_file or not name or not region_name or not zone_name: + msg = "Missing json-file or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + infile = NamedTemporaryFile(delete=False) + with open(infile.name, 'w') as infile_handle: + infile_handle.write(json_file) + try: + check_output( + [ + 'radosgw-admin', + '--id', service, + 'zone', + 'set', + '--rgw-zone', zone_name, + '--infile', infile.name, + '--name', name, + ] + ) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + os.unlink(infile.name) + + +def handle_put_osd_in_bucket(request, service): + """Move an osd into a specified crush bucket. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + osd_id = request.get('osd') + target_bucket = request.get('bucket') + if not osd_id or not target_bucket: + msg = "Missing OSD ID or Bucket" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + crushmap = Crushmap() + try: + crushmap.ensure_bucket_is_present(target_bucket) + check_output( + [ + 'ceph', + '--id', service, + 'osd', + 'crush', + 'set', + str(osd_id), + str(get_osd_weight(osd_id)), + "root={}".format(target_bucket) + ] + ) + + except Exception as exc: + msg = "Failed to move OSD " \ + "{} into Bucket {} :: {}".format(osd_id, target_bucket, exc) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + +def handle_rgw_create_user(request, service): + """Create a new rados gateway user. + + :param request: dict of request operations and params + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + user_id = request.get('rgw-uid') + display_name = request.get('display-name') + name = request.get('client-name') + if not name or not display_name or not user_id: + msg = "Missing client-name, display-name or rgw-uid" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + try: + create_output = check_output( + [ + 'radosgw-admin', + '--id', service, + 'user', + 'create', + '--uid', user_id, + '--display-name', display_name, + '--name', name, + '--system' + ] + ) + try: + user_json = json.loads(str(create_output.decode('UTF-8'))) + return {'exit-code': 0, 'user': user_json} + except ValueError as err: + log(err, level=ERROR) + return {'exit-code': 1, 'stderr': err} + + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_create_cephfs(request, service): + """Create a new cephfs. + + :param request: The broker request + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + cephfs_name = request.get('mds_name') + data_pool = request.get('data_pool') + metadata_pool = request.get('metadata_pool') + # Check if the user params were provided + if not cephfs_name or not data_pool or not metadata_pool: + msg = "Missing mds_name, data_pool or metadata_pool params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + # Sanity check that the required pools exist + if not pool_exists(service=service, name=data_pool): + msg = "CephFS data pool does not exist. 
Cannot create CephFS" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + if not pool_exists(service=service, name=metadata_pool): + msg = "CephFS metadata pool does not exist. Cannot create CephFS" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + if get_cephfs(service=service): + # CephFS new has already been called + log("CephFS already created") + return + + # Finally create CephFS + try: + check_output(["ceph", + '--id', service, + "fs", "new", cephfs_name, + metadata_pool, + data_pool]) + except CalledProcessError as err: + if err.returncode == 22: + log("CephFS already created") + return + else: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_rgw_region_set(request, service): + # radosgw-admin region set --infile us.json --name client.radosgw.us-east-1 + """Set the rados gateway region. + + :param request: dict. The broker request. + :param service: The ceph client to run the command under. + :returns: dict. exit-code and reason if not 0 + """ + json_file = request.get('region-json') + name = request.get('client-name') + region_name = request.get('region-name') + zone_name = request.get('zone-name') + if not json_file or not name or not region_name or not zone_name: + msg = "Missing json-file or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + infile = NamedTemporaryFile(delete=False) + with open(infile.name, 'w') as infile_handle: + infile_handle.write(json_file) + try: + check_output( + [ + 'radosgw-admin', + '--id', service, + 'region', + 'set', + '--rgw-zone', zone_name, + '--infile', infile.name, + '--name', name, + ] + ) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + os.unlink(infile.name) + + +def process_requests_v1(reqs): + """Process v1 requests. + + Takes a list of requests (dicts) and processes each one. If an error is + found, processing stops and the client is notified in the response. + + Returns a response dict containing the exit code (non-zero if any + operation failed along with an explanation). + """ + ret = None + log("Processing {} ceph broker requests".format(len(reqs)), level=INFO) + for req in reqs: + op = req.get('op') + log("Processing op='{}'".format(op), level=DEBUG) + # Use admin client since we do not have other client key locations + # setup to use them for these operations. 
+ svc = 'admin' + if op == "create-pool": + pool_type = req.get('pool-type') # "replicated" | "erasure" + + # Default to replicated if pool_type isn't given + if pool_type == 'erasure': + ret = handle_erasure_pool(request=req, service=svc) + else: + ret = handle_replicated_pool(request=req, service=svc) + elif op == "create-cephfs": + ret = handle_create_cephfs(request=req, service=svc) + elif op == "create-cache-tier": + ret = handle_create_cache_tier(request=req, service=svc) + elif op == "remove-cache-tier": + ret = handle_remove_cache_tier(request=req, service=svc) + elif op == "create-erasure-profile": + ret = handle_create_erasure_profile(request=req, service=svc) + elif op == "delete-pool": + pool = req.get('name') + ret = delete_pool(service=svc, name=pool) + elif op == "rename-pool": + old_name = req.get('name') + new_name = req.get('new-name') + ret = rename_pool(service=svc, old_name=old_name, + new_name=new_name) + elif op == "snapshot-pool": + pool = req.get('name') + snapshot_name = req.get('snapshot-name') + ret = snapshot_pool(service=svc, pool_name=pool, + snapshot_name=snapshot_name) + elif op == "remove-pool-snapshot": + pool = req.get('name') + snapshot_name = req.get('snapshot-name') + ret = remove_pool_snapshot(service=svc, pool_name=pool, + snapshot_name=snapshot_name) + elif op == "set-pool-value": + ret = handle_set_pool_value(request=req, service=svc) + elif op == "rgw-region-set": + ret = handle_rgw_region_set(request=req, service=svc) + elif op == "rgw-zone-set": + ret = handle_rgw_zone_set(request=req, service=svc) + elif op == "rgw-regionmap-update": + ret = handle_rgw_regionmap_update(request=req, service=svc) + elif op == "rgw-regionmap-default": + ret = handle_rgw_regionmap_default(request=req, service=svc) + elif op == "rgw-create-user": + ret = handle_rgw_create_user(request=req, service=svc) + elif op == "move-osd-to-bucket": + ret = handle_put_osd_in_bucket(request=req, service=svc) + elif op == "add-permissions-to-key": + ret = handle_add_permissions_to_key(request=req, service=svc) + else: + msg = "Unknown operation '{}'".format(op) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + if type(ret) == dict and 'exit-code' in ret: + return ret + + return {'exit-code': 0} diff --git a/lib/ceph/crush_utils.py b/lib/ceph/crush_utils.py new file mode 100644 index 00000000..8b6876c1 --- /dev/null +++ b/lib/ceph/crush_utils.py @@ -0,0 +1,154 @@ +# Copyright 2014 Canonical Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
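
A note on the broker module added above: process_requests() is the single
entry point, wrapped by decode_req_encode_rsp so it consumes and returns
JSON strings, and process_requests_v1() dispatches each entry in 'ops' to
the matching handler. The sketch below shows the shape of a payload it
would accept; the request-id, pool name and values are illustrative
assumptions, and actually running it needs a bootstrapped Ceph monitor plus
the charm-helpers runtime that broker.py imports.

    # Hedged example of driving the broker added in lib/ceph/broker.py.
    import json

    from ceph.broker import process_requests

    payload = json.dumps({
        'api-version': 1,            # only version 1 is handled
        'request-id': 'example-1',   # optional; echoed back in the response
        'ops': [
            {'op': 'create-pool', 'pool-type': 'replicated',
             'name': 'example-pool', 'replicas': 3},
            {'op': 'set-pool-value', 'name': 'example-pool',
             'key': 'nodelete', 'value': True},
        ],
    })

    response = json.loads(process_requests(payload))
    # On success this returns {'exit-code': 0, 'request-id': 'example-1'};
    # a handler failure short-circuits with exit-code 1 and a stderr message.
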
+ +import re + +from subprocess import check_output, CalledProcessError + +from charmhelpers.core.hookenv import ( + log, + ERROR, +) + +CRUSH_BUCKET = """root {name} {{ + id {id} # do not change unnecessarily + # weight 0.000 + alg straw + hash 0 # rjenkins1 +}} + +rule {name} {{ + ruleset 0 + type replicated + min_size 1 + max_size 10 + step take {name} + step chooseleaf firstn 0 type host + step emit +}}""" + +# This regular expression looks for a string like: +# root NAME { +# id NUMBER +# so that we can extract NAME and ID from the crushmap +CRUSHMAP_BUCKETS_RE = re.compile(r"root\s+(.+)\s+\{\s*id\s+(-?\d+)") + +# This regular expression looks for ID strings in the crushmap like: +# id NUMBER +# so that we can extract the IDs from a crushmap +CRUSHMAP_ID_RE = re.compile(r"id\s+(-?\d+)") + + +class Crushmap(object): + """An object oriented approach to Ceph crushmap management.""" + + def __init__(self): + self._crushmap = self.load_crushmap() + roots = re.findall(CRUSHMAP_BUCKETS_RE, self._crushmap) + buckets = [] + ids = list(map( + lambda x: int(x), + re.findall(CRUSHMAP_ID_RE, self._crushmap))) + ids = sorted(ids) + if roots != []: + for root in roots: + buckets.append(CRUSHBucket(root[0], root[1], True)) + + self._buckets = buckets + if ids != []: + self._ids = ids + else: + self._ids = [0] + + def load_crushmap(self): + try: + crush = str(check_output(['ceph', 'osd', 'getcrushmap']) + .decode('UTF-8')) + return str(check_output(['crushtool', '-d', '-'], + stdin=crush.stdout) + .decode('UTF-8')) + except CalledProcessError as e: + log("Error occured while loading and decompiling CRUSH map:" + "{}".format(e), ERROR) + raise "Failed to read CRUSH map" + + def ensure_bucket_is_present(self, bucket_name): + if bucket_name not in [bucket.name for bucket in self.buckets()]: + self.add_bucket(bucket_name) + self.save() + + def buckets(self): + """Return a list of buckets that are in the Crushmap.""" + return self._buckets + + def add_bucket(self, bucket_name): + """Add a named bucket to Ceph""" + new_id = min(self._ids) - 1 + self._ids.append(new_id) + self._buckets.append(CRUSHBucket(bucket_name, new_id)) + + def save(self): + """Persist Crushmap to Ceph""" + try: + crushmap = self.build_crushmap() + compiled = str(check_output(['crushtool', '-c', '/dev/stdin', '-o', + '/dev/stdout'], stdin=crushmap) + .decode('UTF-8')) + ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i', + '/dev/stdin'], stdin=compiled) + .decode('UTF-8')) + return ceph_output + except CalledProcessError as e: + log("save error: {}".format(e)) + raise "Failed to save CRUSH map." 
+ + def build_crushmap(self): + """Modifies the current CRUSH map to include the new buckets""" + tmp_crushmap = self._crushmap + for bucket in self._buckets: + if not bucket.default: + tmp_crushmap = "{}\n\n{}".format( + tmp_crushmap, + Crushmap.bucket_string(bucket.name, bucket.id)) + + return tmp_crushmap + + @staticmethod + def bucket_string(name, id): + return CRUSH_BUCKET.format(name=name, id=id) + + +class CRUSHBucket(object): + """CRUSH bucket description object.""" + + def __init__(self, name, id, default=False): + self.name = name + self.id = int(id) + self.default = default + + def __repr__(self): + return "Bucket {{Name: {name}, ID: {id}}}".format( + name=self.name, id=self.id) + + def __eq__(self, other): + """Override the default Equals behavior""" + if isinstance(other, self.__class__): + return self.__dict__ == other.__dict__ + return NotImplemented + + def __ne__(self, other): + """Define a non-equality test""" + if isinstance(other, self.__class__): + return not self.__eq__(other) + return NotImplemented diff --git a/lib/ceph/utils.py b/lib/ceph/utils.py new file mode 100644 index 00000000..98320acb --- /dev/null +++ b/lib/ceph/utils.py @@ -0,0 +1,2729 @@ +# Copyright 2017 Canonical Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import glob +import json +import os +import pyudev +import random +import re +import socket +import subprocess +import sys +import time +import uuid + +from datetime import datetime + +from charmhelpers.core import hookenv +from charmhelpers.core import templating +from charmhelpers.core.decorators import retry_on_exception +from charmhelpers.core.host import ( + chownr, + cmp_pkgrevno, + lsb_release, + mkdir, + owner, + service_restart, + service_start, + service_stop, + CompareHostReleases, +) +from charmhelpers.core.hookenv import ( + cached, + config, + log, + status_set, + DEBUG, + ERROR, + WARNING, + storage_get, + storage_list, +) +from charmhelpers.fetch import ( + apt_cache, + add_source, apt_install, apt_update +) +from charmhelpers.contrib.storage.linux.ceph import ( + get_mon_map, + monitor_key_set, + monitor_key_exists, + monitor_key_get, +) +from charmhelpers.contrib.storage.linux.utils import ( + is_block_device, + is_device_mounted, +) +from charmhelpers.contrib.openstack.utils import ( + get_os_codename_install_source, +) +from charmhelpers.contrib.storage.linux import lvm +from charmhelpers.core.unitdata import kv + +CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph') +OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd') +HDPARM_FILE = os.path.join(os.sep, 'etc', 'hdparm.conf') + +LEADER = 'leader' +PEON = 'peon' +QUORUM = [LEADER, PEON] + +PACKAGES = ['ceph', 'gdisk', 'btrfs-tools', + 'radosgw', 'xfsprogs', + 'lvm2', 'parted'] + +CEPH_KEY_MANAGER = 'ceph' +VAULT_KEY_MANAGER = 'vault' +KEY_MANAGERS = [ + CEPH_KEY_MANAGER, + VAULT_KEY_MANAGER, +] + +LinkSpeed = { + "BASE_10": 10, + "BASE_100": 100, + "BASE_1000": 1000, + "GBASE_10": 10000, + "GBASE_40": 40000, + "GBASE_100": 100000, + "UNKNOWN": None +} 
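
Circling back to the hooks.py change at the top of this patch:
install_packages() now consults upgrade_available(), which resolves the
previous and current 'source' config values to Ceph release names and only
treats the change as an upgrade when that pair appears in
ceph_utils.UPGRADE_PATHS (defined further down lib/ceph/utils.py). A rough
standalone sketch of that check follows; the single UPGRADE_PATHS entry and
the example source strings are assumptions for illustration, not the synced
definitions.

    # Sketch of the supported-upgrade check behind hooks.upgrade_available().
    # The mapping below is a stand-in; the real table is synced from charms.ceph.
    UPGRADE_PATHS = {
        'luminous': 'mimic',
    }

    def is_supported_upgrade(old_source, new_source, resolve_ceph_version):
        """Return True when old_source -> new_source is one supported hop.

        resolve_ceph_version stands in for ceph.utils.resolve_ceph_version(),
        which maps a charm 'source' value to a Ceph release name.
        """
        old_version = resolve_ceph_version(old_source or 'distro')
        new_version = resolve_ceph_version(new_source)
        return (old_version in UPGRADE_PATHS and
                new_version == UPGRADE_PATHS[old_version])

    # e.g. a change from source='distro' to 'cloud:bionic-rocky' on Bionic
    # would resolve to ('luminous', 'mimic') and report an upgrade, so
    # install_packages() hands the full PACKAGES list to apt_install() rather
    # than filtering out packages that are already installed.
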
+ +# Mapping of adapter speed to sysctl settings +NETWORK_ADAPTER_SYSCTLS = { + # 10Gb + LinkSpeed["GBASE_10"]: { + 'net.core.rmem_default': 524287, + 'net.core.wmem_default': 524287, + 'net.core.rmem_max': 524287, + 'net.core.wmem_max': 524287, + 'net.core.optmem_max': 524287, + 'net.core.netdev_max_backlog': 300000, + 'net.ipv4.tcp_rmem': '10000000 10000000 10000000', + 'net.ipv4.tcp_wmem': '10000000 10000000 10000000', + 'net.ipv4.tcp_mem': '10000000 10000000 10000000' + }, + # Mellanox 10/40Gb + LinkSpeed["GBASE_40"]: { + 'net.ipv4.tcp_timestamps': 0, + 'net.ipv4.tcp_sack': 1, + 'net.core.netdev_max_backlog': 250000, + 'net.core.rmem_max': 4194304, + 'net.core.wmem_max': 4194304, + 'net.core.rmem_default': 4194304, + 'net.core.wmem_default': 4194304, + 'net.core.optmem_max': 4194304, + 'net.ipv4.tcp_rmem': '4096 87380 4194304', + 'net.ipv4.tcp_wmem': '4096 65536 4194304', + 'net.ipv4.tcp_low_latency': 1, + 'net.ipv4.tcp_adv_win_scale': 1 + } +} + + +class Partition(object): + def __init__(self, name, number, size, start, end, sectors, uuid): + """A block device partition. + + :param name: Name of block device + :param number: Partition number + :param size: Capacity of the device + :param start: Starting block + :param end: Ending block + :param sectors: Number of blocks + :param uuid: UUID of the partition + """ + self.name = name, + self.number = number + self.size = size + self.start = start + self.end = end + self.sectors = sectors + self.uuid = uuid + + def __str__(self): + return "number: {} start: {} end: {} sectors: {} size: {} " \ + "name: {} uuid: {}".format(self.number, self.start, + self.end, + self.sectors, self.size, + self.name, self.uuid) + + def __eq__(self, other): + if isinstance(other, self.__class__): + return self.__dict__ == other.__dict__ + return False + + def __ne__(self, other): + return not self.__eq__(other) + + +def unmounted_disks(): + """List of unmounted block devices on the current host.""" + disks = [] + context = pyudev.Context() + for device in context.list_devices(DEVTYPE='disk'): + if device['SUBSYSTEM'] == 'block': + matched = False + for block_type in [u'dm', u'loop', u'ram', u'nbd']: + if block_type in device.device_node: + matched = True + if matched: + continue + disks.append(device.device_node) + log("Found disks: {}".format(disks)) + return [disk for disk in disks if not is_device_mounted(disk)] + + +def save_sysctls(sysctl_dict, save_location): + """Persist the sysctls to the hard drive. + + :param sysctl_dict: dict + :param save_location: path to save the settings to + :raises: IOError if anything goes wrong with writing. + """ + try: + # Persist the settings for reboots + with open(save_location, "w") as fd: + for key, value in sysctl_dict.items(): + fd.write("{}={}\n".format(key, value)) + + except IOError as e: + log("Unable to persist sysctl settings to {}. Error {}".format( + save_location, e), level=ERROR) + raise + + +def tune_nic(network_interface): + """This will set optimal sysctls for the particular network adapter. + + :param network_interface: string The network adapter name. 
+ """ + speed = get_link_speed(network_interface) + if speed in NETWORK_ADAPTER_SYSCTLS: + status_set('maintenance', 'Tuning device {}'.format( + network_interface)) + sysctl_file = os.path.join( + os.sep, + 'etc', + 'sysctl.d', + '51-ceph-osd-charm-{}.conf'.format(network_interface)) + try: + log("Saving sysctl_file: {} values: {}".format( + sysctl_file, NETWORK_ADAPTER_SYSCTLS[speed]), + level=DEBUG) + save_sysctls(sysctl_dict=NETWORK_ADAPTER_SYSCTLS[speed], + save_location=sysctl_file) + except IOError as e: + log("Write to /etc/sysctl.d/51-ceph-osd-charm-{} " + "failed. {}".format(network_interface, e), + level=ERROR) + + try: + # Apply the settings + log("Applying sysctl settings", level=DEBUG) + subprocess.check_output(["sysctl", "-p", sysctl_file]) + except subprocess.CalledProcessError as err: + log('sysctl -p {} failed with error {}'.format(sysctl_file, + err.output), + level=ERROR) + else: + log("No settings found for network adapter: {}".format( + network_interface), level=DEBUG) + + +def get_link_speed(network_interface): + """This will find the link speed for a given network device. Returns None + if an error occurs. + :param network_interface: string The network adapter interface. + :returns: LinkSpeed + """ + speed_path = os.path.join(os.sep, 'sys', 'class', 'net', + network_interface, 'speed') + # I'm not sure where else we'd check if this doesn't exist + if not os.path.exists(speed_path): + return LinkSpeed["UNKNOWN"] + + try: + with open(speed_path, 'r') as sysfs: + nic_speed = sysfs.readlines() + + # Did we actually read anything? + if not nic_speed: + return LinkSpeed["UNKNOWN"] + + # Try to find a sysctl match for this particular speed + for name, speed in LinkSpeed.items(): + if speed == int(nic_speed[0].strip()): + return speed + # Default to UNKNOWN if we can't find a match + return LinkSpeed["UNKNOWN"] + except IOError as e: + log("Unable to open {path} because of error: {error}".format( + path=speed_path, + error=e), level='error') + return LinkSpeed["UNKNOWN"] + + +def persist_settings(settings_dict): + # Write all settings to /etc/hdparm.conf + """ This will persist the hard drive settings to the /etc/hdparm.conf file + + The settings_dict should be in the form of {"uuid": {"key":"value"}} + + :param settings_dict: dict of settings to save + """ + if not settings_dict: + return + + try: + templating.render(source='hdparm.conf', target=HDPARM_FILE, + context=settings_dict) + except IOError as err: + log("Unable to open {path} because of error: {error}".format( + path=HDPARM_FILE, error=err), level=ERROR) + except Exception as e: + # The templating.render can raise a jinja2 exception if the + # template is not found. Rather than polluting the import + # space of this charm, simply catch Exception + log('Unable to render {path} due to error: {error}'.format( + path=HDPARM_FILE, error=e), level=ERROR) + + +def set_max_sectors_kb(dev_name, max_sectors_size): + """This function sets the max_sectors_kb size of a given block device. + + :param dev_name: Name of the block device to query + :param max_sectors_size: int of the max_sectors_size to save + """ + max_sectors_kb_path = os.path.join('sys', 'block', dev_name, 'queue', + 'max_sectors_kb') + try: + with open(max_sectors_kb_path, 'w') as f: + f.write(max_sectors_size) + except IOError as e: + log('Failed to write max_sectors_kb to {}. Error: {}'.format( + max_sectors_kb_path, e), level=ERROR) + + +def get_max_sectors_kb(dev_name): + """This function gets the max_sectors_kb size of a given block device. 
+ + :param dev_name: Name of the block device to query + :returns: int which is either the max_sectors_kb or 0 on error. + """ + max_sectors_kb_path = os.path.join('sys', 'block', dev_name, 'queue', + 'max_sectors_kb') + + # Read in what Linux has set by default + if os.path.exists(max_sectors_kb_path): + try: + with open(max_sectors_kb_path, 'r') as f: + max_sectors_kb = f.read().strip() + return int(max_sectors_kb) + except IOError as e: + log('Failed to read max_sectors_kb to {}. Error: {}'.format( + max_sectors_kb_path, e), level=ERROR) + # Bail. + return 0 + return 0 + + +def get_max_hw_sectors_kb(dev_name): + """This function gets the max_hw_sectors_kb for a given block device. + + :param dev_name: Name of the block device to query + :returns: int which is either the max_hw_sectors_kb or 0 on error. + """ + max_hw_sectors_kb_path = os.path.join('sys', 'block', dev_name, 'queue', + 'max_hw_sectors_kb') + # Read in what the hardware supports + if os.path.exists(max_hw_sectors_kb_path): + try: + with open(max_hw_sectors_kb_path, 'r') as f: + max_hw_sectors_kb = f.read().strip() + return int(max_hw_sectors_kb) + except IOError as e: + log('Failed to read max_hw_sectors_kb to {}. Error: {}'.format( + max_hw_sectors_kb_path, e), level=ERROR) + return 0 + return 0 + + +def set_hdd_read_ahead(dev_name, read_ahead_sectors=256): + """This function sets the hard drive read ahead. + + :param dev_name: Name of the block device to set read ahead on. + :param read_ahead_sectors: int How many sectors to read ahead. + """ + try: + # Set the read ahead sectors to 256 + log('Setting read ahead to {} for device {}'.format( + read_ahead_sectors, + dev_name)) + subprocess.check_output(['hdparm', + '-a{}'.format(read_ahead_sectors), + dev_name]) + except subprocess.CalledProcessError as e: + log('hdparm failed with error: {}'.format(e.output), + level=ERROR) + + +def get_block_uuid(block_dev): + """This queries blkid to get the uuid for a block device. + + :param block_dev: Name of the block device to query. + :returns: The UUID of the device or None on Error. + """ + try: + block_info = str(subprocess + .check_output(['blkid', '-o', 'export', block_dev]) + .decode('UTF-8')) + for tag in block_info.split('\n'): + parts = tag.split('=') + if parts[0] == 'UUID': + return parts[1] + return None + except subprocess.CalledProcessError as err: + log('get_block_uuid failed with error: {}'.format(err.output), + level=ERROR) + return None + + +def check_max_sectors(save_settings_dict, + block_dev, + uuid): + """Tune the max_hw_sectors if needed. + + make sure that /sys/.../max_sectors_kb matches max_hw_sectors_kb or at + least 1MB for spinning disks + If the box has a RAID card with cache this could go much bigger. 
+ + :param save_settings_dict: The dict used to persist settings + :param block_dev: A block device name: Example: /dev/sda + :param uuid: The uuid of the block device + """ + dev_name = None + path_parts = os.path.split(block_dev) + if len(path_parts) == 2: + dev_name = path_parts[1] + else: + log('Unable to determine the block device name from path: {}'.format( + block_dev)) + # Play it safe and bail + return + max_sectors_kb = get_max_sectors_kb(dev_name=dev_name) + max_hw_sectors_kb = get_max_hw_sectors_kb(dev_name=dev_name) + + if max_sectors_kb < max_hw_sectors_kb: + # OK we have a situation where the hardware supports more than Linux is + # currently requesting + config_max_sectors_kb = hookenv.config('max-sectors-kb') + if config_max_sectors_kb < max_hw_sectors_kb: + # Set the max_sectors_kb to the config.yaml value if it is less + # than the max_hw_sectors_kb + log('Setting max_sectors_kb for device {} to {}'.format( + dev_name, config_max_sectors_kb)) + save_settings_dict[ + "drive_settings"][uuid][ + "read_ahead_sect"] = config_max_sectors_kb + set_max_sectors_kb(dev_name=dev_name, + max_sectors_size=config_max_sectors_kb) + else: + # Set to the max_hw_sectors_kb + log('Setting max_sectors_kb for device {} to {}'.format( + dev_name, max_hw_sectors_kb)) + save_settings_dict[ + "drive_settings"][uuid]['read_ahead_sect'] = max_hw_sectors_kb + set_max_sectors_kb(dev_name=dev_name, + max_sectors_size=max_hw_sectors_kb) + else: + log('max_sectors_kb match max_hw_sectors_kb. No change needed for ' + 'device: {}'.format(block_dev)) + + +def tune_dev(block_dev): + """Try to make some intelligent decisions with HDD tuning. Future work will + include optimizing SSDs. + + This function will change the read ahead sectors and the max write + sectors for each block device. + + :param block_dev: A block device name: Example: /dev/sda + """ + uuid = get_block_uuid(block_dev) + if uuid is None: + log('block device {} uuid is None. 
Unable to save to ' + 'hdparm.conf'.format(block_dev), level=DEBUG) + return + save_settings_dict = {} + log('Tuning device {}'.format(block_dev)) + status_set('maintenance', 'Tuning device {}'.format(block_dev)) + set_hdd_read_ahead(block_dev) + save_settings_dict["drive_settings"] = {} + save_settings_dict["drive_settings"][uuid] = {} + save_settings_dict["drive_settings"][uuid]['read_ahead_sect'] = 256 + + check_max_sectors(block_dev=block_dev, + save_settings_dict=save_settings_dict, + uuid=uuid) + + persist_settings(settings_dict=save_settings_dict) + status_set('maintenance', 'Finished tuning device {}'.format(block_dev)) + + +def ceph_user(): + if get_version() > 1: + return 'ceph' + else: + return "root" + + +class CrushLocation(object): + def __init__(self, + name, + identifier, + host, + rack, + row, + datacenter, + chassis, + root): + self.name = name + self.identifier = identifier + self.host = host + self.rack = rack + self.row = row + self.datacenter = datacenter + self.chassis = chassis + self.root = root + + def __str__(self): + return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \ + "chassis :{} root: {}".format(self.name, self.identifier, + self.host, self.rack, self.row, + self.datacenter, self.chassis, + self.root) + + def __eq__(self, other): + return not self.name < other.name and not other.name < self.name + + def __ne__(self, other): + return self.name < other.name or other.name < self.name + + def __gt__(self, other): + return self.name > other.name + + def __ge__(self, other): + return not self.name < other.name + + def __le__(self, other): + return self.name < other.name + + +def get_osd_weight(osd_id): + """Returns the weight of the specified OSD. + + :returns: Float + :raises: ValueError if the monmap fails to parse. + :raises: CalledProcessError if our ceph command fails. + """ + try: + tree = str(subprocess + .check_output(['ceph', 'osd', 'tree', '--format=json']) + .decode('UTF-8')) + try: + json_tree = json.loads(tree) + # Make sure children are present in the json + if not json_tree['nodes']: + return None + for device in json_tree['nodes']: + if device['type'] == 'osd' and device['name'] == osd_id: + return device['crush_weight'] + except ValueError as v: + log("Unable to parse ceph tree json: {}. Error: {}".format( + tree, v)) + raise + except subprocess.CalledProcessError as e: + log("ceph osd tree command failed with message: {}".format( + e)) + raise + + +def get_osd_tree(service): + """Returns the current osd map in JSON. + + :returns: List. + :raises: ValueError if the monmap fails to parse. + Also raises CalledProcessError if our ceph command fails + """ + try: + tree = str(subprocess + .check_output(['ceph', '--id', service, + 'osd', 'tree', '--format=json']) + .decode('UTF-8')) + try: + json_tree = json.loads(tree) + crush_list = [] + # Make sure children are present in the json + if not json_tree['nodes']: + return None + host_nodes = [ + node for node in json_tree['nodes'] + if node['type'] == 'host' + ] + for host in host_nodes: + crush_list.append( + CrushLocation( + name=host.get('name'), + identifier=host['id'], + host=host.get('host'), + rack=host.get('rack'), + row=host.get('row'), + datacenter=host.get('datacenter'), + chassis=host.get('chassis'), + root=host.get('root') + ) + ) + return crush_list + except ValueError as v: + log("Unable to parse ceph tree json: {}. 
Error: {}".format( + tree, v)) + raise + except subprocess.CalledProcessError as e: + log("ceph osd tree command failed with message: {}".format( + e)) + raise + + +def _get_child_dirs(path): + """Returns a list of directory names in the specified path. + + :param path: a full path listing of the parent directory to return child + directory names + :returns: list. A list of child directories under the parent directory + :raises: ValueError if the specified path does not exist or is not a + directory, + OSError if an error occurs reading the directory listing + """ + if not os.path.exists(path): + raise ValueError('Specfied path "%s" does not exist' % path) + if not os.path.isdir(path): + raise ValueError('Specified path "%s" is not a directory' % path) + + files_in_dir = [os.path.join(path, f) for f in os.listdir(path)] + return list(filter(os.path.isdir, files_in_dir)) + + +def _get_osd_num_from_dirname(dirname): + """Parses the dirname and returns the OSD id. + + Parses a string in the form of 'ceph-{osd#}' and returns the osd number + from the directory name. + + :param dirname: the directory name to return the OSD number from + :return int: the osd number the directory name corresponds to + :raises ValueError: if the osd number cannot be parsed from the provided + directory name. + """ + match = re.search('ceph-(?P\d+)', dirname) + if not match: + raise ValueError("dirname not in correct format: {}".format(dirname)) + + return match.group('osd_id') + + +def get_local_osd_ids(): + """This will list the /var/lib/ceph/osd/* directories and try + to split the ID off of the directory name and return it in + a list. + + :returns: list. A list of osd identifiers + :raises: OSError if something goes wrong with listing the directory. + """ + osd_ids = [] + osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd') + if os.path.exists(osd_path): + try: + dirs = os.listdir(osd_path) + for osd_dir in dirs: + osd_id = osd_dir.split('-')[1] + if _is_int(osd_id): + osd_ids.append(osd_id) + except OSError: + raise + return osd_ids + + +def get_local_mon_ids(): + """This will list the /var/lib/ceph/mon/* directories and try + to split the ID off of the directory name and return it in + a list. + + :returns: list. A list of monitor identifiers + :raises: OSError if something goes wrong with listing the directory. + """ + mon_ids = [] + mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon') + if os.path.exists(mon_path): + try: + dirs = os.listdir(mon_path) + for mon_dir in dirs: + # Basically this takes everything after ceph- as the monitor ID + match = re.search('ceph-(?P.*)', mon_dir) + if match: + mon_ids.append(match.group('mon_id')) + except OSError: + raise + return mon_ids + + +def _is_int(v): + """Return True if the object v can be turned into an integer.""" + try: + int(v) + return True + except ValueError: + return False + + +def get_version(): + """Derive Ceph release from an installed package.""" + import apt_pkg as apt + + cache = apt_cache() + package = "ceph" + try: + pkg = cache[package] + except: + # the package is unknown to the current apt cache. + e = 'Could not determine version of package with no installation ' \ + 'candidate: %s' % package + error_out(e) + + if not pkg.current_ver: + # package is known, but no version is currently installed. 
+ e = 'Could not determine version of uninstalled package: %s' % package + error_out(e) + + vers = apt.upstream_version(pkg.current_ver.ver_str) + + # x.y match only for 20XX.X + # and ignore patch level for other packages + match = re.match('^(\d+)\.(\d+)', vers) + + if match: + vers = match.group(0) + return float(vers) + + +def error_out(msg): + log("FATAL ERROR: {}".format(msg), + level=ERROR) + sys.exit(1) + + +def is_quorum(): + asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname()) + cmd = [ + "sudo", + "-u", + ceph_user(), + "ceph", + "--admin-daemon", + asok, + "mon_status" + ] + if os.path.exists(asok): + try: + result = json.loads(str(subprocess + .check_output(cmd) + .decode('UTF-8'))) + except subprocess.CalledProcessError: + return False + except ValueError: + # Non JSON response from mon_status + return False + if result['state'] in QUORUM: + return True + else: + return False + else: + return False + + +def is_leader(): + asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname()) + cmd = [ + "sudo", + "-u", + ceph_user(), + "ceph", + "--admin-daemon", + asok, + "mon_status" + ] + if os.path.exists(asok): + try: + result = json.loads(str(subprocess + .check_output(cmd) + .decode('UTF-8'))) + except subprocess.CalledProcessError: + return False + except ValueError: + # Non JSON response from mon_status + return False + if result['state'] == LEADER: + return True + else: + return False + else: + return False + + +def wait_for_quorum(): + while not is_quorum(): + log("Waiting for quorum to be reached") + time.sleep(3) + + +def add_bootstrap_hint(peer): + asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname()) + cmd = [ + "sudo", + "-u", + ceph_user(), + "ceph", + "--admin-daemon", + asok, + "add_bootstrap_peer_hint", + peer + ] + if os.path.exists(asok): + # Ignore any errors for this call + subprocess.call(cmd) + + +DISK_FORMATS = [ + 'xfs', + 'ext4', + 'btrfs' +] + +CEPH_PARTITIONS = [ + '89C57F98-2FE5-4DC0-89C1-5EC00CEFF2BE', # ceph encrypted disk in creation + '45B0969E-9B03-4F30-B4C6-5EC00CEFF106', # ceph encrypted journal + '4FBD7E29-9D25-41B8-AFD0-5EC00CEFF05D', # ceph encrypted osd data + '4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D', # ceph osd data + '45B0969E-9B03-4F30-B4C6-B4B80CEFF106', # ceph osd journal + '89C57F98-2FE5-4DC0-89C1-F3AD0CEFF2BE', # ceph disk in creation +] + + +def get_partition_list(dev): + """Lists the partitions of a block device. + + :param dev: Path to a block device. ex: /dev/sda + :returns: Returns a list of Partition objects. + :raises: CalledProcessException if lsblk fails + """ + partitions_list = [] + try: + partitions = get_partitions(dev) + # For each line of output + for partition in partitions: + parts = partition.split() + try: + partitions_list.append( + Partition(number=parts[0], + start=parts[1], + end=parts[2], + sectors=parts[3], + size=parts[4], + name=parts[5], + uuid=parts[6]) + ) + except IndexError: + partitions_list.append( + Partition(number=parts[0], + start=parts[1], + end=parts[2], + sectors=parts[3], + size=parts[4], + name="", + uuid=parts[5]) + ) + + return partitions_list + except subprocess.CalledProcessError: + raise + + +def is_pristine_disk(dev): + """ + Read first 2048 bytes (LBA 0 - 3) of block device to determine whether it + is actually all zeros and safe for us to use. + + Existing partitioning tools does not discern between a failure to read from + block device, failure to understand a partition table and the fact that a + block device has no partition table. 
Since we need to be positive about + which is which we need to read the device directly and confirm ourselves. + + :param dev: Path to block device + :type dev: str + :returns: True all 2048 bytes == 0x0, False if not + :rtype: bool + """ + want_bytes = 2048 + + f = open(dev, 'rb') + data = f.read(want_bytes) + read_bytes = len(data) + if read_bytes != want_bytes: + log('{}: short read, got {} bytes expected {}.' + .format(dev, read_bytes, want_bytes), level=WARNING) + return False + + return all(byte == 0x0 for byte in data) + + +def is_osd_disk(dev): + db = kv() + osd_devices = db.get('osd-devices', []) + if dev in osd_devices: + log('Device {} already processed by charm,' + ' skipping'.format(dev)) + return True + + partitions = get_partition_list(dev) + for partition in partitions: + try: + info = str(subprocess + .check_output(['sgdisk', '-i', partition.number, dev]) + .decode('UTF-8')) + info = info.split("\n") # IGNORE:E1103 + for line in info: + for ptype in CEPH_PARTITIONS: + sig = 'Partition GUID code: {}'.format(ptype) + if line.startswith(sig): + return True + except subprocess.CalledProcessError as e: + log("sgdisk inspection of partition {} on {} failed with " + "error: {}. Skipping".format(partition.minor, dev, e), + level=ERROR) + return False + + +def start_osds(devices): + # Scan for ceph block devices + rescan_osd_devices() + if cmp_pkgrevno('ceph', "0.56.6") >= 0: + # Use ceph-disk activate for directory based OSD's + for dev_or_path in devices: + if os.path.exists(dev_or_path) and os.path.isdir(dev_or_path): + subprocess.check_call(['ceph-disk', 'activate', dev_or_path]) + + +def udevadm_settle(): + cmd = ['udevadm', 'settle'] + subprocess.call(cmd) + + +def rescan_osd_devices(): + cmd = [ + 'udevadm', 'trigger', + '--subsystem-match=block', '--action=add' + ] + + subprocess.call(cmd) + + udevadm_settle() + + +_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring" +_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring" + + +def is_bootstrapped(): + return os.path.exists(_bootstrap_keyring) + + +def wait_for_bootstrap(): + while not is_bootstrapped(): + time.sleep(3) + + +def import_osd_bootstrap_key(key): + if not os.path.exists(_bootstrap_keyring): + cmd = [ + "sudo", + "-u", + ceph_user(), + 'ceph-authtool', + _bootstrap_keyring, + '--create-keyring', + '--name=client.bootstrap-osd', + '--add-key={}'.format(key) + ] + subprocess.check_call(cmd) + + +def import_osd_upgrade_key(key): + if not os.path.exists(_upgrade_keyring): + cmd = [ + "sudo", + "-u", + ceph_user(), + 'ceph-authtool', + _upgrade_keyring, + '--create-keyring', + '--name=client.osd-upgrade', + '--add-key={}'.format(key) + ] + subprocess.check_call(cmd) + + +def generate_monitor_secret(): + cmd = [ + 'ceph-authtool', + '/dev/stdout', + '--name=mon.', + '--gen-key' + ] + res = str(subprocess.check_output(cmd).decode('UTF-8')) + + return "{}==".format(res.split('=')[1].strip()) + +# OSD caps taken from ceph-create-keys +_osd_bootstrap_caps = { + 'mon': [ + 'allow command osd create ...', + 'allow command osd crush set ...', + r'allow command auth add * osd allow\ * mon allow\ rwx', + 'allow command mon getmap' + ] +} + +_osd_bootstrap_caps_profile = { + 'mon': [ + 'allow profile bootstrap-osd' + ] +} + + +def parse_key(raw_key): + # get-or-create appears to have different output depending + # on whether its 'get' or 'create' + # 'create' just returns the key, 'get' is more verbose and + # needs parsing + key = None + if len(raw_key.splitlines()) == 1: + key = raw_key + else: + for 
element in raw_key.splitlines(): + if 'key' in element: + return element.split(' = ')[1].strip() # IGNORE:E1103 + return key + + +def get_osd_bootstrap_key(): + try: + # Attempt to get/create a key using the OSD bootstrap profile first + key = get_named_key('bootstrap-osd', + _osd_bootstrap_caps_profile) + except: + # If that fails try with the older style permissions + key = get_named_key('bootstrap-osd', + _osd_bootstrap_caps) + return key + + +_radosgw_keyring = "/etc/ceph/keyring.rados.gateway" + + +def import_radosgw_key(key): + if not os.path.exists(_radosgw_keyring): + cmd = [ + "sudo", + "-u", + ceph_user(), + 'ceph-authtool', + _radosgw_keyring, + '--create-keyring', + '--name=client.radosgw.gateway', + '--add-key={}'.format(key) + ] + subprocess.check_call(cmd) + +# OSD caps taken from ceph-create-keys +_radosgw_caps = { + 'mon': ['allow rw'], + 'osd': ['allow rwx'] +} +_upgrade_caps = { + 'mon': ['allow rwx'] +} + + +def get_radosgw_key(pool_list=None, name=None): + return get_named_key(name=name or 'radosgw.gateway', + caps=_radosgw_caps, + pool_list=pool_list) + + +def get_mds_key(name): + return create_named_keyring(entity='mds', + name=name, + caps=mds_caps) + + +_mds_bootstrap_caps_profile = { + 'mon': [ + 'allow profile bootstrap-mds' + ] +} + + +def get_mds_bootstrap_key(): + return get_named_key('bootstrap-mds', + _mds_bootstrap_caps_profile) + + +_default_caps = collections.OrderedDict([ + ('mon', ['allow r', + 'allow command "osd blacklist"']), + ('osd', ['allow rwx']), +]) + +admin_caps = collections.OrderedDict([ + ('mds', ['allow *']), + ('mon', ['allow *']), + ('osd', ['allow *']) +]) + +mds_caps = collections.OrderedDict([ + ('osd', ['allow *']), + ('mds', ['allow']), + ('mon', ['allow rwx']), +]) + +osd_upgrade_caps = collections.OrderedDict([ + ('mon', ['allow command "config-key"', + 'allow command "osd tree"', + 'allow command "config-key list"', + 'allow command "config-key put"', + 'allow command "config-key get"', + 'allow command "config-key exists"', + 'allow command "osd out"', + 'allow command "osd in"', + 'allow command "osd rm"', + 'allow command "auth del"', + ]) +]) + + +def create_named_keyring(entity, name, caps=None): + caps = caps or _default_caps + cmd = [ + "sudo", + "-u", + ceph_user(), + 'ceph', + '--name', 'mon.', + '--keyring', + '/var/lib/ceph/mon/ceph-{}/keyring'.format( + socket.gethostname() + ), + 'auth', 'get-or-create', '{entity}.{name}'.format(entity=entity, + name=name), + ] + for subsystem, subcaps in caps.items(): + cmd.extend([subsystem, '; '.join(subcaps)]) + log("Calling check_output: {}".format(cmd), level=DEBUG) + return (parse_key(str(subprocess + .check_output(cmd) + .decode('UTF-8')) + .strip())) # IGNORE:E1103 + + +def get_upgrade_key(): + return get_named_key('upgrade-osd', _upgrade_caps) + + +def get_named_key(name, caps=None, pool_list=None): + """Retrieve a specific named cephx key. + + :param name: String Name of key to get. + :param pool_list: The list of pools to give access to + :param caps: dict of cephx capabilities + :returns: Returns a cephx key + """ + key_name = 'client.{}'.format(name) + try: + # Does the key already exist? + output = str(subprocess.check_output( + [ + 'sudo', + '-u', ceph_user(), + 'ceph', + '--name', 'mon.', + '--keyring', + '/var/lib/ceph/mon/ceph-{}/keyring'.format( + socket.gethostname() + ), + 'auth', + 'get', + key_name, + ]).decode('UTF-8')).strip() + # NOTE(jamespage); + # Apply any changes to key capabilities, dealing with + # upgrades which requires new caps for operation. 
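+        # The existing key is kept and only its caps are reconciled here,
+        # so callers keep a stable secret across charm upgrades.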
+ upgrade_key_caps(key_name, + caps or _default_caps, + pool_list) + return parse_key(output) + except subprocess.CalledProcessError: + # Couldn't get the key, time to create it! + log("Creating new key for {}".format(name), level=DEBUG) + caps = caps or _default_caps + cmd = [ + "sudo", + "-u", + ceph_user(), + 'ceph', + '--name', 'mon.', + '--keyring', + '/var/lib/ceph/mon/ceph-{}/keyring'.format( + socket.gethostname() + ), + 'auth', 'get-or-create', key_name, + ] + # Add capabilities + for subsystem, subcaps in caps.items(): + if subsystem == 'osd': + if pool_list: + # This will output a string similar to: + # "pool=rgw pool=rbd pool=something" + pools = " ".join(['pool={0}'.format(i) for i in pool_list]) + subcaps[0] = subcaps[0] + " " + pools + cmd.extend([subsystem, '; '.join(subcaps)]) + + log("Calling check_output: {}".format(cmd), level=DEBUG) + return parse_key(str(subprocess + .check_output(cmd) + .decode('UTF-8')) + .strip()) # IGNORE:E1103 + + +def upgrade_key_caps(key, caps, pool_list=None): + """ Upgrade key to have capabilities caps """ + if not is_leader(): + # Not the MON leader OR not clustered + return + cmd = [ + "sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key + ] + for subsystem, subcaps in caps.items(): + if subsystem == 'osd': + if pool_list: + # This will output a string similar to: + # "pool=rgw pool=rbd pool=something" + pools = " ".join(['pool={0}'.format(i) for i in pool_list]) + subcaps[0] = subcaps[0] + " " + pools + cmd.extend([subsystem, '; '.join(subcaps)]) + subprocess.check_call(cmd) + + +@cached +def systemd(): + return CompareHostReleases(lsb_release()['DISTRIB_CODENAME']) >= 'vivid' + + +def bootstrap_monitor_cluster(secret): + hostname = socket.gethostname() + path = '/var/lib/ceph/mon/ceph-{}'.format(hostname) + done = '{}/done'.format(path) + if systemd(): + init_marker = '{}/systemd'.format(path) + else: + init_marker = '{}/upstart'.format(path) + + keyring = '/var/lib/ceph/tmp/{}.mon.keyring'.format(hostname) + + if os.path.exists(done): + log('bootstrap_monitor_cluster: mon already initialized.') + else: + # Ceph >= 0.61.3 needs this for ceph-mon fs creation + mkdir('/var/run/ceph', owner=ceph_user(), + group=ceph_user(), perms=0o755) + mkdir(path, owner=ceph_user(), group=ceph_user(), + perms=0o755) + # end changes for Ceph >= 0.61.3 + try: + add_keyring_to_ceph(keyring, + secret, + hostname, + path, + done, + init_marker) + + except: + raise + finally: + os.unlink(keyring) + + +@retry_on_exception(3, base_delay=5) +def add_keyring_to_ceph(keyring, secret, hostname, path, done, init_marker): + subprocess.check_call(['ceph-authtool', keyring, + '--create-keyring', '--name=mon.', + '--add-key={}'.format(secret), + '--cap', 'mon', 'allow *']) + subprocess.check_call(['ceph-mon', '--mkfs', + '-i', hostname, + '--keyring', keyring]) + chownr('/var/log/ceph', ceph_user(), ceph_user()) + chownr(path, ceph_user(), ceph_user()) + with open(done, 'w'): + pass + with open(init_marker, 'w'): + pass + + if systemd(): + subprocess.check_call(['systemctl', 'enable', 'ceph-mon']) + service_restart('ceph-mon') + else: + service_restart('ceph-mon-all') + + # NOTE(jamespage): Later ceph releases require explicit + # call to ceph-create-keys to setup the + # admin keys for the cluster; this command + # will wait for quorum in the cluster before + # returning. + # NOTE(fnordahl): Explicitly run `ceph-crate-keys` for older + # ceph releases too. 
This improves bootstrap + # resilience as the charm will wait for + # presence of peer units before attempting + # to bootstrap. Note that charms deploying + # ceph-mon service should disable running of + # `ceph-create-keys` service in init system. + cmd = ['ceph-create-keys', '--id', hostname] + if cmp_pkgrevno('ceph', '12.0.0') >= 0: + # NOTE(fnordahl): The default timeout in ceph-create-keys of 600 + # seconds is not adequate. Increase timeout when + # timeout parameter available. For older releases + # we rely on retry_on_exception decorator. + # LP#1719436 + cmd.extend(['--timeout', '1800']) + subprocess.check_call(cmd) + _client_admin_keyring = '/etc/ceph/ceph.client.admin.keyring' + osstat = os.stat(_client_admin_keyring) + if not osstat.st_size: + # NOTE(fnordahl): Retry will fail as long as this file exists. + # LP#1719436 + os.remove(_client_admin_keyring) + raise Exception + + +def update_monfs(): + hostname = socket.gethostname() + monfs = '/var/lib/ceph/mon/ceph-{}'.format(hostname) + if systemd(): + init_marker = '{}/systemd'.format(monfs) + else: + init_marker = '{}/upstart'.format(monfs) + if os.path.exists(monfs) and not os.path.exists(init_marker): + # Mark mon as managed by upstart so that + # it gets start correctly on reboots + with open(init_marker, 'w'): + pass + + +def get_partitions(dev): + cmd = ['partx', '--raw', '--noheadings', dev] + try: + out = str(subprocess.check_output(cmd).decode('UTF-8')).splitlines() + log("get partitions: {}".format(out), level=DEBUG) + return out + except subprocess.CalledProcessError as e: + log("Can't get info for {0}: {1}".format(dev, e.output)) + return [] + + +def get_lvs(dev): + """ + List logical volumes for the provided block device + + :param: dev: Full path to block device. + :raises subprocess.CalledProcessError: in the event that any supporting + operation failed. + :returns: list: List of logical volumes provided by the block device + """ + if not lvm.is_lvm_physical_volume(dev): + return [] + vg_name = lvm.list_lvm_volume_group(dev) + return lvm.list_logical_volumes('vg_name={}'.format(vg_name)) + + +def find_least_used_utility_device(utility_devices, lvs=False): + """ + Find a utility device which has the smallest number of partitions + among other devices in the supplied list. + + :utility_devices: A list of devices to be used for filestore journal + or bluestore wal or db. 
+ :lvs: flag to indicate whether inspection should be based on LVM LV's + :return: string device name + """ + if lvs: + usages = map(lambda a: (len(get_lvs(a)), a), utility_devices) + else: + usages = map(lambda a: (len(get_partitions(a)), a), utility_devices) + least = min(usages, key=lambda t: t[0]) + return least[1] + + +def get_devices(name): + """ Merge config and juju storage based devices + + :name: THe name of the device type, eg: wal, osd, journal + :returns: Set(device names), which are strings + """ + if config(name): + devices = [l.strip() for l in config(name).split(' ')] + else: + devices = [] + storage_ids = storage_list(name) + devices.extend((storage_get('location', s) for s in storage_ids)) + devices = filter(os.path.exists, devices) + + return set(devices) + + +def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False, + bluestore=False, key_manager=CEPH_KEY_MANAGER): + if dev.startswith('/dev'): + osdize_dev(dev, osd_format, osd_journal, + ignore_errors, encrypt, + bluestore, key_manager) + else: + osdize_dir(dev, encrypt, bluestore) + + +def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False, + encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER): + """ + Prepare a block device for use as a Ceph OSD + + A block device will only be prepared once during the lifetime + of the calling charm unit; future executions will be skipped. + + :param: dev: Full path to block device to use + :param: osd_format: Format for OSD filesystem + :param: osd_journal: List of block devices to use for OSD journals + :param: ignore_errors: Don't fail in the event of any errors during + processing + :param: encrypt: Encrypt block devices using 'key_manager' + :param: bluestore: Use bluestore native ceph block device format + :param: key_manager: Key management approach for encryption keys + :raises subprocess.CalledProcessError: in the event that any supporting + subprocess operation failed + :raises ValueError: if an invalid key_manager is provided + """ + if key_manager not in KEY_MANAGERS: + raise ValueError('Unsupported key manager: {}'.format(key_manager)) + + db = kv() + osd_devices = db.get('osd-devices', []) + try: + if dev in osd_devices: + log('Device {} already processed by charm,' + ' skipping'.format(dev)) + return + + if not os.path.exists(dev): + log('Path {} does not exist - bailing'.format(dev)) + return + + if not is_block_device(dev): + log('Path {} is not a block device - bailing'.format(dev)) + return + + if is_osd_disk(dev): + log('Looks like {} is already an' + ' OSD data or journal, skipping.'.format(dev)) + if is_device_mounted(dev): + osd_devices.append(dev) + return + + if is_device_mounted(dev): + log('Looks like {} is in use, skipping.'.format(dev)) + return + + if is_active_bluestore_device(dev): + log('{} is in use as an active bluestore block device,' + ' skipping.'.format(dev)) + osd_devices.append(dev) + return + + if is_mapped_luks_device(dev): + log('{} is a mapped LUKS device,' + ' skipping.'.format(dev)) + return + + if cmp_pkgrevno('ceph', '12.2.4') >= 0: + cmd = _ceph_volume(dev, + osd_journal, + encrypt, + bluestore, + key_manager) + else: + cmd = _ceph_disk(dev, + osd_format, + osd_journal, + encrypt, + bluestore) + + try: + status_set('maintenance', 'Initializing device {}'.format(dev)) + log("osdize cmd: {}".format(cmd)) + subprocess.check_call(cmd) + except subprocess.CalledProcessError: + try: + lsblk_output = subprocess.check_output( + ['lsblk', '-P']).decode('UTF-8') + except subprocess.CalledProcessError as e: 
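+                # lsblk output is gathered purely to enrich the error logs
+                # below; a failure here is logged and must not hide the
+                # original ceph-disk/ceph-volume failure.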
+ log("Couldn't get lsblk output: {}".format(e), ERROR) + if ignore_errors: + log('Unable to initialize device: {}'.format(dev), WARNING) + if lsblk_output: + log('lsblk output: {}'.format(lsblk_output), DEBUG) + else: + log('Unable to initialize device: {}'.format(dev), ERROR) + if lsblk_output: + log('lsblk output: {}'.format(lsblk_output), WARNING) + raise + + # NOTE: Record processing of device only on success to ensure that + # the charm only tries to initialize a device of OSD usage + # once during its lifetime. + osd_devices.append(dev) + finally: + db.set('osd-devices', osd_devices) + db.flush() + + +def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False): + """ + Prepare a device for usage as a Ceph OSD using ceph-disk + + :param: dev: Full path to use for OSD block device setup, + The function looks up realpath of the device + :param: osd_journal: List of block devices to use for OSD journals + :param: encrypt: Use block device encryption (unsupported) + :param: bluestore: Use bluestore storage for OSD + :returns: list. 'ceph-disk' command and required parameters for + execution by check_call + """ + cmd = ['ceph-disk', 'prepare'] + + if encrypt: + cmd.append('--dmcrypt') + + if osd_format and not bluestore: + cmd.append('--fs-type') + cmd.append(osd_format) + + # NOTE(jamespage): enable experimental bluestore support + if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore: + cmd.append('--bluestore') + wal = get_devices('bluestore-wal') + if wal: + cmd.append('--block.wal') + least_used_wal = find_least_used_utility_device(wal) + cmd.append(least_used_wal) + db = get_devices('bluestore-db') + if db: + cmd.append('--block.db') + least_used_db = find_least_used_utility_device(db) + cmd.append(least_used_db) + elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore: + cmd.append('--filestore') + + cmd.append(os.path.realpath(dev)) + + if osd_journal: + least_used = find_least_used_utility_device(osd_journal) + cmd.append(least_used) + + return cmd + + +def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False, + key_manager=CEPH_KEY_MANAGER): + """ + Prepare and activate a device for usage as a Ceph OSD using ceph-volume. + + This also includes creation of all PV's, VG's and LV's required to + support the initialization of the OSD. + + :param: dev: Full path to use for OSD block device setup + :param: osd_journal: List of block devices to use for OSD journals + :param: encrypt: Use block device encryption + :param: bluestore: Use bluestore storage for OSD + :param: key_manager: dm-crypt Key Manager to use + :raises subprocess.CalledProcessError: in the event that any supporting + LVM operation failed. + :returns: list. 
'ceph-volume' command and required parameters for + execution by check_call + """ + cmd = ['ceph-volume', 'lvm', 'create'] + + osd_fsid = str(uuid.uuid4()) + cmd.append('--osd-fsid') + cmd.append(osd_fsid) + + if bluestore: + cmd.append('--bluestore') + main_device_type = 'block' + else: + cmd.append('--filestore') + main_device_type = 'data' + + if encrypt and key_manager == CEPH_KEY_MANAGER: + cmd.append('--dmcrypt') + + # On-disk journal volume creation + if not osd_journal and not bluestore: + journal_lv_type = 'journal' + cmd.append('--journal') + cmd.append(_allocate_logical_volume( + dev=dev, + lv_type=journal_lv_type, + osd_fsid=osd_fsid, + size='{}M'.format(calculate_volume_size('journal')), + encrypt=encrypt, + key_manager=key_manager) + ) + + cmd.append('--data') + cmd.append(_allocate_logical_volume(dev=dev, + lv_type=main_device_type, + osd_fsid=osd_fsid, + encrypt=encrypt, + key_manager=key_manager)) + + if bluestore: + for extra_volume in ('wal', 'db'): + devices = get_devices('bluestore-{}'.format(extra_volume)) + if devices: + cmd.append('--block.{}'.format(extra_volume)) + least_used = find_least_used_utility_device(devices, + lvs=True) + cmd.append(_allocate_logical_volume( + dev=least_used, + lv_type=extra_volume, + osd_fsid=osd_fsid, + size='{}M'.format(calculate_volume_size(extra_volume)), + shared=True, + encrypt=encrypt, + key_manager=key_manager) + ) + + elif osd_journal: + cmd.append('--journal') + least_used = find_least_used_utility_device(osd_journal, + lvs=True) + cmd.append(_allocate_logical_volume( + dev=least_used, + lv_type='journal', + osd_fsid=osd_fsid, + size='{}M'.format(calculate_volume_size('journal')), + shared=True, + encrypt=encrypt, + key_manager=key_manager) + ) + + return cmd + + +def _partition_name(dev): + """ + Derive the first partition name for a block device + + :param: dev: Full path to block device. + :returns: str: Full path to first partition on block device. + """ + if dev[-1].isdigit(): + return '{}p1'.format(dev) + else: + return '{}1'.format(dev) + + +def is_active_bluestore_device(dev): + """ + Determine whether provided device is part of an active + bluestore based OSD (as its block component). + + :param: dev: Full path to block device to check for Bluestore usage. + :returns: boolean: indicating whether device is in active use. + """ + if not lvm.is_lvm_physical_volume(dev): + return False + + vg_name = lvm.list_lvm_volume_group(dev) + lv_name = lvm.list_logical_volumes('vg_name={}'.format(vg_name))[0] + + block_symlinks = glob.glob('/var/lib/ceph/osd/ceph-*/block') + for block_candidate in block_symlinks: + if os.path.islink(block_candidate): + target = os.readlink(block_candidate) + if target.endswith(lv_name): + return True + + return False + + +def is_luks_device(dev): + """ + Determine if dev is a LUKS-formatted block device. + + :param: dev: A full path to a block device to check for LUKS header + presence + :returns: boolean: indicates whether a device is used based on LUKS header. 
+ """ + return True if _luks_uuid(dev) else False + + +def is_mapped_luks_device(dev): + """ + Determine if dev is a mapped LUKS device + :param: dev: A full path to a block device to be checked + :returns: boolean: indicates whether a device is mapped + """ + _, dirs, _ = next(os.walk( + '/sys/class/block/{}/holders/' + .format(os.path.basename(os.path.realpath(dev)))) + ) + is_held = len(dirs) > 0 + return is_held and is_luks_device(dev) + + +def get_conf(variable): + """ + Get the value of the given configuration variable from the + cluster. + + :param variable: ceph configuration variable + :returns: str. configured value for provided variable + + """ + return subprocess.check_output([ + 'ceph-osd', + '--show-config-value={}'.format(variable), + '--no-mon-config', + ]).strip() + + +def calculate_volume_size(lv_type): + """ + Determine the configured size for Bluestore DB/WAL or + Filestore Journal devices + + :param lv_type: volume type (db, wal or journal) + :raises KeyError: if invalid lv_type is supplied + :returns: int. Configured size in megabytes for volume type + """ + # lv_type -> ceph configuration option + _config_map = { + 'db': 'bluestore_block_db_size', + 'wal': 'bluestore_block_wal_size', + 'journal': 'osd_journal_size', + } + + # default sizes in MB + _default_size = { + 'db': 1024, + 'wal': 576, + 'journal': 1024, + } + + # conversion of ceph config units to MB + _units = { + 'db': 1048576, # Bytes -> MB + 'wal': 1048576, # Bytes -> MB + 'journal': 1, # Already in MB + } + + configured_size = get_conf(_config_map[lv_type]) + + if configured_size is None or int(configured_size) == 0: + return _default_size[lv_type] + else: + return int(configured_size) / _units[lv_type] + + +def _luks_uuid(dev): + """ + Check to see if dev is a LUKS encrypted volume, returning the UUID + of volume if it is. + + :param: dev: path to block device to check. + :returns: str. UUID of LUKS device or None if not a LUKS device + """ + try: + cmd = ['cryptsetup', 'luksUUID', dev] + return subprocess.check_output(cmd).decode('UTF-8').strip() + except subprocess.CalledProcessError: + return None + + +def _initialize_disk(dev, dev_uuid, encrypt=False, + key_manager=CEPH_KEY_MANAGER): + """ + Initialize a raw block device consuming 100% of the avaliable + disk space. + + Function assumes that block device has already been wiped. + + :param: dev: path to block device to initialize + :param: dev_uuid: UUID to use for any dm-crypt operations + :param: encrypt: Encrypt OSD devices using dm-crypt + :param: key_manager: Key management approach for dm-crypt keys + :raises: subprocess.CalledProcessError: if any parted calls fail + :returns: str: Full path to new partition. + """ + use_vaultlocker = encrypt and key_manager == VAULT_KEY_MANAGER + + if use_vaultlocker: + # NOTE(jamespage): Check to see if already initialized as a LUKS + # volume, which indicates this is a shared block + # device for journal, db or wal volumes. 
+ luks_uuid = _luks_uuid(dev) + if luks_uuid: + return '/dev/mapper/crypt-{}'.format(luks_uuid) + + dm_crypt = '/dev/mapper/crypt-{}'.format(dev_uuid) + + if use_vaultlocker and not os.path.exists(dm_crypt): + subprocess.check_call([ + 'vaultlocker', + 'encrypt', + '--uuid', dev_uuid, + dev, + ]) + subprocess.check_call([ + 'dd', + 'if=/dev/zero', + 'of={}'.format(dm_crypt), + 'bs=512', + 'count=1', + ]) + + if use_vaultlocker: + return dm_crypt + else: + return dev + + +def _allocate_logical_volume(dev, lv_type, osd_fsid, + size=None, shared=False, + encrypt=False, + key_manager=CEPH_KEY_MANAGER): + """ + Allocate a logical volume from a block device, ensuring any + required initialization and setup of PV's and VG's to support + the LV. + + :param: dev: path to block device to allocate from. + :param: lv_type: logical volume type to create + (data, block, journal, wal, db) + :param: osd_fsid: UUID of the OSD associate with the LV + :param: size: Size in LVM format for the device; + if unset 100% of VG + :param: shared: Shared volume group (journal, wal, db) + :param: encrypt: Encrypt OSD devices using dm-crypt + :param: key_manager: dm-crypt Key Manager to use + :raises subprocess.CalledProcessError: in the event that any supporting + LVM or parted operation fails. + :returns: str: String in the format 'vg_name/lv_name'. + """ + lv_name = "osd-{}-{}".format(lv_type, osd_fsid) + current_volumes = lvm.list_logical_volumes() + if shared: + dev_uuid = str(uuid.uuid4()) + else: + dev_uuid = osd_fsid + pv_dev = _initialize_disk(dev, dev_uuid, encrypt, key_manager) + + vg_name = None + if not lvm.is_lvm_physical_volume(pv_dev): + lvm.create_lvm_physical_volume(pv_dev) + if shared: + vg_name = 'ceph-{}-{}'.format(lv_type, + str(uuid.uuid4())) + else: + vg_name = 'ceph-{}'.format(osd_fsid) + lvm.create_lvm_volume_group(vg_name, pv_dev) + else: + vg_name = lvm.list_lvm_volume_group(pv_dev) + + if lv_name not in current_volumes: + lvm.create_logical_volume(lv_name, vg_name, size) + + return "{}/{}".format(vg_name, lv_name) + + +def osdize_dir(path, encrypt=False, bluestore=False): + """Ask ceph-disk to prepare a directory to become an osd. + + :param path: str. The directory to osdize + :param encrypt: bool. Should the OSD directory be encrypted at rest + :returns: None + """ + + db = kv() + osd_devices = db.get('osd-devices', []) + if path in osd_devices: + log('Device {} already processed by charm,' + ' skipping'.format(path)) + return + + if os.path.exists(os.path.join(path, 'upstart')): + log('Path {} is already configured as an OSD - bailing'.format(path)) + return + + if cmp_pkgrevno('ceph', "0.56.6") < 0: + log('Unable to use directories for OSDs with ceph < 0.56.6', + level=ERROR) + return + + mkdir(path, owner=ceph_user(), group=ceph_user(), perms=0o755) + chownr('/var/lib/ceph', ceph_user(), ceph_user()) + cmd = [ + 'sudo', '-u', ceph_user(), + 'ceph-disk', + 'prepare', + '--data-dir', + path + ] + if cmp_pkgrevno('ceph', '0.60') >= 0: + if encrypt: + cmd.append('--dmcrypt') + + # NOTE(icey): enable experimental bluestore support + if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore: + cmd.append('--bluestore') + elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore: + cmd.append('--filestore') + log("osdize dir cmd: {}".format(cmd)) + subprocess.check_call(cmd) + + # NOTE: Record processing of device only on success to ensure that + # the charm only tries to initialize a device of OSD usage + # once during its lifetime. 
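+    #       The list is persisted in the charm's local unitdata KV store,
+    #       so later hook invocations skip this path early on.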
+ osd_devices.append(path) + db.set('osd-devices', osd_devices) + db.flush() + + +def filesystem_mounted(fs): + return subprocess.call(['grep', '-wqs', fs, '/proc/mounts']) == 0 + + +def get_running_osds(): + """Returns a list of the pids of the current running OSD daemons""" + cmd = ['pgrep', 'ceph-osd'] + try: + result = str(subprocess.check_output(cmd).decode('UTF-8')) + return result.split() + except subprocess.CalledProcessError: + return [] + + +def get_cephfs(service): + """List the Ceph Filesystems that exist. + + :param service: The service name to run the ceph command under + :returns: list. Returns a list of the ceph filesystems + """ + if get_version() < 0.86: + # This command wasn't introduced until 0.86 ceph + return [] + try: + output = str(subprocess + .check_output(["ceph", '--id', service, "fs", "ls"]) + .decode('UTF-8')) + if not output: + return [] + """ + Example subprocess output: + 'name: ip-172-31-23-165, metadata pool: ip-172-31-23-165_metadata, + data pools: [ip-172-31-23-165_data ]\n' + output: filesystems: ['ip-172-31-23-165'] + """ + filesystems = [] + for line in output.splitlines(): + parts = line.split(',') + for part in parts: + if "name" in part: + filesystems.append(part.split(' ')[1]) + except subprocess.CalledProcessError: + return [] + + +def wait_for_all_monitors_to_upgrade(new_version, upgrade_key): + """Fairly self explanatory name. This function will wait + for all monitors in the cluster to upgrade or it will + return after a timeout period has expired. + + :param new_version: str of the version to watch + :param upgrade_key: the cephx key name to use + """ + done = False + start_time = time.time() + monitor_list = [] + + mon_map = get_mon_map('admin') + if mon_map['monmap']['mons']: + for mon in mon_map['monmap']['mons']: + monitor_list.append(mon['name']) + while not done: + try: + done = all(monitor_key_exists(upgrade_key, "{}_{}_{}_done".format( + "mon", mon, new_version + )) for mon in monitor_list) + current_time = time.time() + if current_time > (start_time + 10 * 60): + raise Exception + else: + # Wait 30 seconds and test again if all monitors are upgraded + time.sleep(30) + except subprocess.CalledProcessError: + raise + + +# Edge cases: +# 1. Previous node dies on upgrade, can we retry? +def roll_monitor_cluster(new_version, upgrade_key): + """This is tricky to get right so here's what we're going to do. + + There's 2 possible cases: Either I'm first in line or not. + If I'm not first in line I'll wait a random time between 5-30 seconds + and test to see if the previous monitor is upgraded yet. + + :param new_version: str of the version to upgrade to + :param upgrade_key: the cephx key name to use when upgrading + """ + log('roll_monitor_cluster called with {}'.format(new_version)) + my_name = socket.gethostname() + monitor_list = [] + mon_map = get_mon_map('admin') + if mon_map['monmap']['mons']: + for mon in mon_map['monmap']['mons']: + monitor_list.append(mon['name']) + else: + status_set('blocked', 'Unable to get monitor cluster information') + sys.exit(1) + log('monitor_list: {}'.format(monitor_list)) + + # A sorted list of osd unit names + mon_sorted_list = sorted(monitor_list) + + try: + position = mon_sorted_list.index(my_name) + log("upgrade position: {}".format(position)) + if position == 0: + # I'm first! 
Roll + # First set a key to inform others I'm about to roll + lock_and_roll(upgrade_key=upgrade_key, + service='mon', + my_name=my_name, + version=new_version) + else: + # Check if the previous node has finished + status_set('waiting', + 'Waiting on {} to finish upgrading'.format( + mon_sorted_list[position - 1])) + wait_on_previous_node(upgrade_key=upgrade_key, + service='mon', + previous_node=mon_sorted_list[position - 1], + version=new_version) + lock_and_roll(upgrade_key=upgrade_key, + service='mon', + my_name=my_name, + version=new_version) + # NOTE(jamespage): + # Wait until all monitors have upgraded before bootstrapping + # the ceph-mgr daemons due to use of new mgr keyring profiles + if new_version == 'luminous': + wait_for_all_monitors_to_upgrade(new_version=new_version, + upgrade_key=upgrade_key) + bootstrap_manager() + except ValueError: + log("Failed to find {} in list {}.".format( + my_name, mon_sorted_list)) + status_set('blocked', 'failed to upgrade monitor') + + +# TODO(jamespage): +# Mimic support will need to ensure that ceph-mgr daemons are also +# restarted during upgrades - probably through use of one of the +# high level systemd targets shipped by the packaging. +def upgrade_monitor(new_version): + """Upgrade the current ceph monitor to the new version + + :param new_version: String version to upgrade to. + """ + current_version = get_version() + status_set("maintenance", "Upgrading monitor") + log("Current ceph version is {}".format(current_version)) + log("Upgrading to: {}".format(new_version)) + + try: + add_source(config('source'), config('key')) + apt_update(fatal=True) + except subprocess.CalledProcessError as err: + log("Adding the ceph source failed with message: {}".format( + err)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + try: + if systemd(): + service_stop('ceph-mon') + else: + service_stop('ceph-mon-all') + apt_install(packages=determine_packages(), fatal=True) + + owner = ceph_user() + + # Ensure the files and directories under /var/lib/ceph is chowned + # properly as part of the move to the Jewel release, which moved the + # ceph daemons to running as ceph:ceph instead of root:root. + if new_version == 'jewel': + # Ensure the ownership of Ceph's directories is correct + chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), + owner=owner, + group=owner, + follow_links=True) + + # Ensure that mon directory is user writable + hostname = socket.gethostname() + path = '/var/lib/ceph/mon/ceph-{}'.format(hostname) + mkdir(path, owner=ceph_user(), group=ceph_user(), + perms=0o755) + + if systemd(): + service_start('ceph-mon') + else: + service_start('ceph-mon-all') + except subprocess.CalledProcessError as err: + log("Stopping ceph and upgrading packages failed " + "with message: {}".format(err)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + + +def lock_and_roll(upgrade_key, service, my_name, version): + """Create a lock on the ceph monitor cluster and upgrade. + + :param upgrade_key: str. The cephx key to use + :param service: str. The cephx id to use + :param my_name: str. The current hostname + :param version: str. 
The version we are upgrading to + """ + start_timestamp = time.time() + + log('monitor_key_set {}_{}_{}_start {}'.format( + service, + my_name, + version, + start_timestamp)) + monitor_key_set(upgrade_key, "{}_{}_{}_start".format( + service, my_name, version), start_timestamp) + log("Rolling") + + # This should be quick + if service == 'osd': + upgrade_osd(version) + elif service == 'mon': + upgrade_monitor(version) + else: + log("Unknown service {}. Unable to upgrade".format(service), + level=ERROR) + log("Done") + + stop_timestamp = time.time() + # Set a key to inform others I am finished + log('monitor_key_set {}_{}_{}_done {}'.format(service, + my_name, + version, + stop_timestamp)) + status_set('maintenance', 'Finishing upgrade') + monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service, + my_name, + version), + stop_timestamp) + + +def wait_on_previous_node(upgrade_key, service, previous_node, version): + """A lock that sleeps the current thread while waiting for the previous + node to finish upgrading. + + :param upgrade_key: + :param service: str. the cephx id to use + :param previous_node: str. The name of the previous node to wait on + :param version: str. The version we are upgrading to + :returns: None + """ + log("Previous node is: {}".format(previous_node)) + + previous_node_finished = monitor_key_exists( + upgrade_key, + "{}_{}_{}_done".format(service, previous_node, version)) + + while previous_node_finished is False: + log("{} is not finished. Waiting".format(previous_node)) + # Has this node been trying to upgrade for longer than + # 10 minutes? + # If so then move on and consider that node dead. + + # NOTE: This assumes the clusters clocks are somewhat accurate + # If the hosts clock is really far off it may cause it to skip + # the previous node even though it shouldn't. + current_timestamp = time.time() + previous_node_start_time = monitor_key_get( + upgrade_key, + "{}_{}_{}_start".format(service, previous_node, version)) + if (previous_node_start_time is not None and + ((current_timestamp - (10 * 60)) > + float(previous_node_start_time))): + # NOTE(jamespage): + # Previous node is probably dead as we've been waiting + # for 10 minutes - lets move on and upgrade + log("Waited 10 mins on node {}. current time: {} > " + "previous node start time: {} Moving on".format( + previous_node, + (current_timestamp - (10 * 60)), + previous_node_start_time)) + return + # NOTE(jamespage) + # Previous node has not started, or started less than + # 10 minutes ago - sleep a random amount of time and + # then check again. + wait_time = random.randrange(5, 30) + log('waiting for {} seconds'.format(wait_time)) + time.sleep(wait_time) + previous_node_finished = monitor_key_exists( + upgrade_key, + "{}_{}_{}_done".format(service, previous_node, version)) + + +def get_upgrade_position(osd_sorted_list, match_name): + """Return the upgrade position for the given osd. + + :param osd_sorted_list: list. Osds sorted + :param match_name: str. The osd name to match + :returns: int. The position or None if not found + """ + for index, item in enumerate(osd_sorted_list): + if item.name == match_name: + return index + return None + + +# Edge cases: +# 1. Previous node dies on upgrade, can we retry? +# 2. This assumes that the osd failure domain is not set to osd. +# It rolls an entire server at a time. +def roll_osd_cluster(new_version, upgrade_key): + """This is tricky to get right so here's what we're going to do. + + There's 2 possible cases: Either I'm first in line or not. 
+ If I'm not first in line I'll wait a random time between 5-30 seconds + and test to see if the previous osd is upgraded yet. + + TODO: If you're not in the same failure domain it's safe to upgrade + 1. Examine all pools and adopt the most strict failure domain policy + Example: Pool 1: Failure domain = rack + Pool 2: Failure domain = host + Pool 3: Failure domain = row + + outcome: Failure domain = host + + :param new_version: str of the version to upgrade to + :param upgrade_key: the cephx key name to use when upgrading + """ + log('roll_osd_cluster called with {}'.format(new_version)) + my_name = socket.gethostname() + osd_tree = get_osd_tree(service=upgrade_key) + # A sorted list of osd unit names + osd_sorted_list = sorted(osd_tree) + log("osd_sorted_list: {}".format(osd_sorted_list)) + + try: + position = get_upgrade_position(osd_sorted_list, my_name) + log("upgrade position: {}".format(position)) + if position == 0: + # I'm first! Roll + # First set a key to inform others I'm about to roll + lock_and_roll(upgrade_key=upgrade_key, + service='osd', + my_name=my_name, + version=new_version) + else: + # Check if the previous node has finished + status_set('waiting', + 'Waiting on {} to finish upgrading'.format( + osd_sorted_list[position - 1].name)) + wait_on_previous_node( + upgrade_key=upgrade_key, + service='osd', + previous_node=osd_sorted_list[position - 1].name, + version=new_version) + lock_and_roll(upgrade_key=upgrade_key, + service='osd', + my_name=my_name, + version=new_version) + except ValueError: + log("Failed to find name {} in list {}".format( + my_name, osd_sorted_list)) + status_set('blocked', 'failed to upgrade osd') + + +def upgrade_osd(new_version): + """Upgrades the current osd + + :param new_version: str. The new version to upgrade to + """ + current_version = get_version() + status_set("maintenance", "Upgrading osd") + log("Current ceph version is {}".format(current_version)) + log("Upgrading to: {}".format(new_version)) + + try: + add_source(config('source'), config('key')) + apt_update(fatal=True) + except subprocess.CalledProcessError as err: + log("Adding the ceph sources failed with message: {}".format( + err)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + + try: + # Upgrade the packages before restarting the daemons. + status_set('maintenance', 'Upgrading packages to %s' % new_version) + apt_install(packages=determine_packages(), fatal=True) + + # If the upgrade does not need an ownership update of any of the + # directories in the osd service directory, then simply restart + # all of the OSDs at the same time as this will be the fastest + # way to update the code on the node. + if not dirs_need_ownership_update('osd'): + log('Restarting all OSDs to load new binaries', DEBUG) + if systemd(): + service_restart('ceph-osd.target') + else: + service_restart('ceph-osd-all') + return + + # Need to change the ownership of all directories which are not OSD + # directories as well. + # TODO - this should probably be moved to the general upgrade function + # and done before mon/osd. + update_owner(CEPH_BASE_DIR, recurse_dirs=False) + non_osd_dirs = filter(lambda x: not x == 'osd', + os.listdir(CEPH_BASE_DIR)) + non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x), + non_osd_dirs) + for path in non_osd_dirs: + update_owner(path) + + # Fast service restart wasn't an option because each of the OSD + # directories need the ownership updated for all the files on + # the OSD. Walk through the OSDs one-by-one upgrading the OSD. 
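+        # Each OSD is stopped, disabled, chowned, re-enabled and restarted
+        # in turn (see _upgrade_single_osd) so only one OSD on this unit is
+        # down at any given time.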
+ for osd_dir in _get_child_dirs(OSD_BASE_DIR): + try: + osd_num = _get_osd_num_from_dirname(osd_dir) + _upgrade_single_osd(osd_num, osd_dir) + except ValueError as ex: + # Directory could not be parsed - junk directory? + log('Could not parse osd directory %s: %s' % (osd_dir, ex), + WARNING) + continue + + except (subprocess.CalledProcessError, IOError) as err: + log("Stopping ceph and upgrading packages failed " + "with message: {}".format(err)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + + +def _upgrade_single_osd(osd_num, osd_dir): + """Upgrades the single OSD directory. + + :param osd_num: the num of the OSD + :param osd_dir: the directory of the OSD to upgrade + :raises CalledProcessError: if an error occurs in a command issued as part + of the upgrade process + :raises IOError: if an error occurs reading/writing to a file as part + of the upgrade process + """ + stop_osd(osd_num) + disable_osd(osd_num) + update_owner(osd_dir) + enable_osd(osd_num) + start_osd(osd_num) + + +def stop_osd(osd_num): + """Stops the specified OSD number. + + :param osd_num: the osd number to stop + """ + if systemd(): + service_stop('ceph-osd@{}'.format(osd_num)) + else: + service_stop('ceph-osd', id=osd_num) + + +def start_osd(osd_num): + """Starts the specified OSD number. + + :param osd_num: the osd number to start. + """ + if systemd(): + service_start('ceph-osd@{}'.format(osd_num)) + else: + service_start('ceph-osd', id=osd_num) + + +def disable_osd(osd_num): + """Disables the specified OSD number. + + Ensures that the specified osd will not be automatically started at the + next reboot of the system. Due to differences between init systems, + this method cannot make any guarantees that the specified osd cannot be + started manually. + + :param osd_num: the osd id which should be disabled. + :raises CalledProcessError: if an error occurs invoking the systemd cmd + to disable the OSD + :raises IOError, OSError: if the attempt to read/remove the ready file in + an upstart enabled system fails + """ + if systemd(): + # When running under systemd, the individual ceph-osd daemons run as + # templated units and can be directly addressed by referring to the + # templated service name ceph-osd@. Additionally, systemd + # allows one to disable a specific templated unit by running the + # 'systemctl disable ceph-osd@' command. When disabled, the + # OSD should remain disabled until re-enabled via systemd. + # Note: disabling an already disabled service in systemd returns 0, so + # no need to check whether it is enabled or not. + cmd = ['systemctl', 'disable', 'ceph-osd@{}'.format(osd_num)] + subprocess.check_call(cmd) + else: + # Neither upstart nor the ceph-osd upstart script provides for + # disabling the starting of an OSD automatically. The specific OSD + # cannot be prevented from running manually, however it can be + # prevented from running automatically on reboot by removing the + # 'ready' file in the OSD's root directory. This is due to the + # ceph-osd-all upstart script checking for the presence of this file + # before starting the OSD. + ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num), + 'ready') + if os.path.exists(ready_file): + os.unlink(ready_file) + + +def enable_osd(osd_num): + """Enables the specified OSD number. + + Ensures that the specified osd_num will be enabled and ready to start + automatically in the event of a reboot. + + :param osd_num: the osd id which should be enabled. 
+ :raises CalledProcessError: if the call to the systemd command issued + fails when enabling the service + :raises IOError: if the attempt to write the ready file in an usptart + enabled system fails + """ + if systemd(): + cmd = ['systemctl', 'enable', 'ceph-osd@{}'.format(osd_num)] + subprocess.check_call(cmd) + else: + # When running on upstart, the OSDs are started via the ceph-osd-all + # upstart script which will only start the osd if it has a 'ready' + # file. Make sure that file exists. + ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num), + 'ready') + with open(ready_file, 'w') as f: + f.write('ready') + + # Make sure the correct user owns the file. It shouldn't be necessary + # as the upstart script should run with root privileges, but its better + # to have all the files matching ownership. + update_owner(ready_file) + + +def update_owner(path, recurse_dirs=True): + """Changes the ownership of the specified path. + + Changes the ownership of the specified path to the new ceph daemon user + using the system's native chown functionality. This may take awhile, + so this method will issue a set_status for any changes of ownership which + recurses into directory structures. + + :param path: the path to recursively change ownership for + :param recurse_dirs: boolean indicating whether to recursively change the + ownership of all the files in a path's subtree or to + simply change the ownership of the path. + :raises CalledProcessError: if an error occurs issuing the chown system + command + """ + user = ceph_user() + user_group = '{ceph_user}:{ceph_user}'.format(ceph_user=user) + cmd = ['chown', user_group, path] + if os.path.isdir(path) and recurse_dirs: + status_set('maintenance', ('Updating ownership of %s to %s' % + (path, user))) + cmd.insert(1, '-R') + + log('Changing ownership of {path} to {user}'.format( + path=path, user=user_group), DEBUG) + start = datetime.now() + subprocess.check_call(cmd) + elapsed_time = (datetime.now() - start) + + log('Took {secs} seconds to change the ownership of path: {path}'.format( + secs=elapsed_time.total_seconds(), path=path), DEBUG) + + +def list_pools(service): + """This will list the current pools that Ceph has + + :param service: String service id to run under + :returns: list. Returns a list of the ceph pools. + :raises: CalledProcessError if the subprocess fails to run. + """ + try: + pool_list = [] + pools = str(subprocess + .check_output(['rados', '--id', service, 'lspools']) + .decode('UTF-8')) + for pool in pools.splitlines(): + pool_list.append(pool) + return pool_list + except subprocess.CalledProcessError as err: + log("rados lspools failed with error: {}".format(err.output)) + raise + + +def dirs_need_ownership_update(service): + """Determines if directories still need change of ownership. + + Examines the set of directories under the /var/lib/ceph/{service} directory + and determines if they have the correct ownership or not. This is + necessary due to the upgrade from Hammer to Jewel where the daemon user + changes from root: to ceph:. + + :param service: the name of the service folder to check (e.g. osd, mon) + :returns: boolean. True if the directories need a change of ownership, + False otherwise. + :raises IOError: if an error occurs reading the file stats from one of + the child directories. 
+ :raises OSError: if the specified path does not exist or some other error + """ + expected_owner = expected_group = ceph_user() + path = os.path.join(CEPH_BASE_DIR, service) + for child in _get_child_dirs(path): + curr_owner, curr_group = owner(child) + + if (curr_owner == expected_owner) and (curr_group == expected_group): + continue + + log('Directory "%s" needs its ownership updated' % child, DEBUG) + return True + + # All child directories had the expected ownership + return False + +# A dict of valid ceph upgrade paths. Mapping is old -> new +UPGRADE_PATHS = collections.OrderedDict([ + ('firefly', 'hammer'), + ('hammer', 'jewel'), + ('jewel', 'luminous'), + ('luminous', 'mimic'), +]) + +# Map UCA codenames to ceph codenames +UCA_CODENAME_MAP = { + 'icehouse': 'firefly', + 'juno': 'firefly', + 'kilo': 'hammer', + 'liberty': 'hammer', + 'mitaka': 'jewel', + 'newton': 'jewel', + 'ocata': 'jewel', + 'pike': 'luminous', + 'queens': 'luminous', + 'rocky': 'mimic', + 'stein': 'mimic', +} + + +def pretty_print_upgrade_paths(): + """Pretty print supported upgrade paths for ceph""" + return ["{} -> {}".format(key, value) + for key, value in UPGRADE_PATHS.items()] + + +def resolve_ceph_version(source): + """Resolves a version of ceph based on source configuration + based on Ubuntu Cloud Archive pockets. + + @param: source: source configuration option of charm + :returns: ceph release codename or None if not resolvable + """ + os_release = get_os_codename_install_source(source) + return UCA_CODENAME_MAP.get(os_release) + + +def get_ceph_pg_stat(): + """Returns the result of ceph pg stat. + + :returns: dict + """ + try: + tree = str(subprocess + .check_output(['ceph', 'pg', 'stat', '--format=json']) + .decode('UTF-8')) + try: + json_tree = json.loads(tree) + if not json_tree['num_pg_by_state']: + return None + return json_tree + except ValueError as v: + log("Unable to parse ceph pg stat json: {}. Error: {}".format( + tree, v)) + raise + except subprocess.CalledProcessError as e: + log("ceph pg stat command failed with message: {}".format(e)) + raise + + +def get_ceph_health(): + """Returns the health of the cluster from a 'ceph status' + + :returns: dict tree of ceph status + :raises: CalledProcessError if our ceph command fails to get the overall + status, use get_ceph_health()['overall_status']. + """ + try: + tree = str(subprocess + .check_output(['ceph', 'status', '--format=json']) + .decode('UTF-8')) + try: + json_tree = json.loads(tree) + # Make sure children are present in the json + if not json_tree['overall_status']: + return None + + return json_tree + except ValueError as v: + log("Unable to parse ceph tree json: {}. Error: {}".format( + tree, v)) + raise + except subprocess.CalledProcessError as e: + log("ceph status command failed with message: {}".format(e)) + raise + + +def reweight_osd(osd_num, new_weight): + """Changes the crush weight of an OSD to the value specified. + + :param osd_num: the osd id which should be changed + :param new_weight: the new weight for the OSD + :returns: bool. True if output looks right, else false. 
+ :raises CalledProcessError: if an error occurs invoking the systemd cmd + """ + try: + cmd_result = str(subprocess + .check_output(['ceph', 'osd', 'crush', + 'reweight', "osd.{}".format(osd_num), + new_weight], + stderr=subprocess.STDOUT) + .decode('UTF-8')) + expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format( + ID=osd_num) + " to {}".format(new_weight) + log(cmd_result) + if expected_result in cmd_result: + return True + return False + except subprocess.CalledProcessError as e: + log("ceph osd crush reweight command failed" + " with message: {}".format(e)) + raise + + +def determine_packages(): + """Determines packages for installation. + + :returns: list of ceph packages + """ + return PACKAGES + + +def bootstrap_manager(): + hostname = socket.gethostname() + path = '/var/lib/ceph/mgr/ceph-{}'.format(hostname) + keyring = os.path.join(path, 'keyring') + + if os.path.exists(keyring): + log('bootstrap_manager: mgr already initialized.') + else: + mkdir(path, owner=ceph_user(), group=ceph_user()) + subprocess.check_call(['ceph', 'auth', 'get-or-create', + 'mgr.{}'.format(hostname), 'mon', + 'allow profile mgr', 'osd', 'allow *', + 'mds', 'allow *', '--out-file', + keyring]) + chownr(path, ceph_user(), ceph_user()) + + unit = 'ceph-mgr@{}'.format(hostname) + subprocess.check_call(['systemctl', 'enable', unit]) + service_restart(unit) + + +def osd_noout(enable): + """Sets or unsets 'noout' + + :param enable: bool. True to set noout, False to unset. + :returns: bool. True if output looks right. + :raises CalledProcessError: if an error occurs invoking the systemd cmd + """ + operation = { + True: 'set', + False: 'unset', + } + try: + subprocess.check_call(['ceph', '--id', 'admin', + 'osd', operation[enable], + 'noout']) + log('running ceph osd {} noout'.format(operation[enable])) + return True + except subprocess.CalledProcessError as e: + log(e) + raise diff --git a/unit_tests/test_ceph.py b/unit_tests/test_ceph.py index 044d24e2..e13d7da1 100644 --- a/unit_tests/test_ceph.py +++ b/unit_tests/test_ceph.py @@ -23,7 +23,7 @@ mock_apt.apt_pkg = MagicMock() sys.modules['apt'] = mock_apt sys.modules['apt_pkg'] = mock_apt.apt_pkg -import ceph # noqa +import ceph_rgw as ceph # noqa import utils # noqa from test_utils import CharmTestCase # noqa diff --git a/unit_tests/test_hooks.py b/unit_tests/test_hooks.py index 7a9e2675..a6ddb431 100644 --- a/unit_tests/test_hooks.py +++ b/unit_tests/test_hooks.py @@ -13,7 +13,7 @@ # limitations under the License. 
from mock import ( - patch, call + patch, call, MagicMock ) from test_utils import ( @@ -63,6 +63,9 @@ TO_PATCH = [ 'request_per_unit_key', 'get_certificate_request', 'process_certificates', + 'filter_installed_packages', + 'filter_missing_packages', + 'ceph_utils', ] @@ -78,12 +81,69 @@ class CephRadosGWTests(CharmTestCase): self.service_name.return_value = 'radosgw' self.request_per_unit_key.return_value = False self.systemd_based_radosgw.return_value = False + self.filter_installed_packages.side_effect = lambda pkgs: pkgs + self.filter_missing_packages.side_effect = lambda pkgs: pkgs - def test_install_packages(self): + def test_upgrade_available(self): + _vers = { + 'distro': 'luminous', + 'cloud:bionic-rocky': 'mimic', + } + mock_config = MagicMock() + self.test_config.set('source', 'cloud:bionic-rocky') + mock_config.get.side_effect = self.test_config.get + mock_config.previous.return_value = 'distro' + self.config.side_effect = None + self.config.return_value = mock_config + self.ceph_utils.UPGRADE_PATHS = { + 'luminous': 'mimic', + } + self.ceph_utils.resolve_ceph_version.side_effect = ( + lambda v: _vers.get(v) + ) + self.assertTrue(ceph_hooks.upgrade_available()) + + @patch.object(ceph_hooks, 'upgrade_available') + def test_install_packages(self, upgrade_available): + mock_config = MagicMock() + mock_config.get.side_effect = self.test_config.get + mock_config.changed.return_value = True + self.config.side_effect = None + self.config.return_value = mock_config + upgrade_available.return_value = False ceph_hooks.install_packages() self.add_source.assert_called_with('distro', 'secretkey') - self.assertTrue(self.apt_update.called) - self.apt_purge.assert_called_with(['libapache2-mod-fastcgi']) + self.apt_update.assert_called_with(fatal=True) + self.apt_purge.assert_called_with(ceph_hooks.APACHE_PACKAGES) + self.apt_install.assert_called_with(ceph_hooks.PACKAGES, + fatal=True) + mock_config.changed.assert_called_with('source') + self.filter_installed_packages.assert_called_with( + ceph_hooks.PACKAGES + ) + self.filter_missing_packages.assert_called_with( + ceph_hooks.APACHE_PACKAGES + ) + + @patch.object(ceph_hooks, 'upgrade_available') + def test_install_packages_upgrades(self, upgrade_available): + mock_config = MagicMock() + mock_config.get.side_effect = self.test_config.get + mock_config.changed.return_value = True + self.config.side_effect = None + self.config.return_value = mock_config + upgrade_available.return_value = True + ceph_hooks.install_packages() + self.add_source.assert_called_with('distro', 'secretkey') + self.apt_update.assert_called_with(fatal=True) + self.apt_purge.assert_called_with(ceph_hooks.APACHE_PACKAGES) + self.apt_install.assert_called_with(ceph_hooks.PACKAGES, + fatal=True) + mock_config.changed.assert_called_with('source') + self.filter_installed_packages.assert_not_called() + self.filter_missing_packages.assert_called_with( + ceph_hooks.APACHE_PACKAGES + ) def test_install(self): _install_packages = self.patch('install_packages')