diff --git a/Makefile b/Makefile
index 46b94807..8255f93b 100644
--- a/Makefile
+++ b/Makefile
@@ -15,12 +15,21 @@ functional_test:
 bin/charm_helpers_sync.py:
 	@mkdir -p bin
 	@bzr cat lp:charm-helpers/tools/charm_helpers_sync/charm_helpers_sync.py \
-        > bin/charm_helpers_sync.py
+		> bin/charm_helpers_sync.py
 
-sync: bin/charm_helpers_sync.py
+bin/git_sync.py:
+	@mkdir -p bin
+	@wget -O bin/git_sync.py https://raw.githubusercontent.com/ChrisMacNaughton/git-sync/master/git_sync.py
+
+ch-sync: bin/charm_helpers_sync.py
 	$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml
 	$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-tests.yaml
 
+git-sync: bin/git_sync.py
+	$(PYTHON) bin/git_sync.py -d lib/ceph -s https://github.com/CanonicalLtd/charms_ceph.git
+
+sync: git-sync ch-sync
+
 publish: lint
 	bzr push lp:charms/ceph-osd
 	bzr push lp:charms/trusty/ceph-osd
diff --git a/actions/__init__.py b/actions/__init__.py
index b7fe4e1b..8d6182b7 100644
--- a/actions/__init__.py
+++ b/actions/__init__.py
@@ -14,3 +14,4 @@
 import sys
 
 sys.path.append('hooks')
+sys.path.append('lib')
diff --git a/actions/pause_resume.py b/actions/pause_resume.py
index 4d10b759..800c8814 100755
--- a/actions/pause_resume.py
+++ b/actions/pause_resume.py
@@ -20,13 +20,14 @@ import os
 import sys
 
 from subprocess import check_call
 
+sys.path.append('lib')
 sys.path.append('hooks')
 
 from charmhelpers.core.hookenv import (
     action_fail,
 )
 
-from ceph import get_local_osd_ids
+from ceph.ceph.ceph import get_local_osd_ids
 from ceph_hooks import assess_status
 from utils import (
diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py
index f2b18bd6..81ae5f6b 100755
--- a/hooks/ceph_hooks.py
+++ b/hooks/ceph_hooks.py
@@ -22,7 +22,8 @@ import socket
 import time
 
 import netifaces
-import ceph
+sys.path.append('lib')
+from ceph.ceph import ceph
 from charmhelpers.core import hookenv
 from charmhelpers.core.hookenv import (
     log,
diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/lib/ceph/ceph/__init__.py b/lib/ceph/ceph/__init__.py
new file mode 100644
index 00000000..9847ec9e
--- /dev/null
+++ b/lib/ceph/ceph/__init__.py
@@ -0,0 +1 @@
+__author__ = 'chris'
diff --git a/hooks/ceph.py b/lib/ceph/ceph/ceph.py
similarity index 93%
rename from hooks/ceph.py
rename to lib/ceph/ceph/ceph.py
index 712ef3f5..4b68e039 100644
--- a/hooks/ceph.py
+++ b/lib/ceph/ceph/ceph.py
@@ -4,52 +4,50 @@
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-#  http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import ctypes
-import ctypes.util
-import errno
 import json
 import subprocess
 import time
 import os
 import re
 import sys
+import errno
 import shutil
 
-from charmhelpers.cli.host import mounts
+
 from charmhelpers.core import hookenv
+
 from charmhelpers.core.host import (
     mkdir,
     chownr,
-    cmp_pkgrevno,
+    service_restart,
     lsb_release,
-    service_stop,
-    service_restart)
+    cmp_pkgrevno, service_stop, mounts)
 from charmhelpers.core.hookenv import (
     log,
     ERROR,
-    WARNING,
-    DEBUG,
     cached,
     status_set,
-)
+    WARNING, DEBUG)
+from charmhelpers.core.services import render_template
 from charmhelpers.fetch import (
     apt_cache
 )
+
 from charmhelpers.contrib.storage.linux.utils import (
-    zap_disk,
     is_block_device,
-    is_device_mounted,
-)
+    zap_disk,
+    is_device_mounted)
 from utils import (
     get_unit_hostname,
-    render_template)
+)
+
 
 LEADER = 'leader'
 PEON = 'peon'
@@ -499,6 +497,30 @@ def get_local_osd_ids():
     return osd_ids
 
 
+def get_local_mon_ids():
+    """
+    This will list the /var/lib/ceph/mon/* directories and try
+    to split the ID off of the directory name and return it in
+    a list
+
+    :return: list. A list of monitor identifiers :raise: OSError if
+        something goes wrong with listing the directory.
+    """
+    mon_ids = []
+    mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon')
+    if os.path.exists(mon_path):
+        try:
+            dirs = os.listdir(mon_path)
+            for mon_dir in dirs:
+                # Basically this takes everything after ceph- as the monitor ID
+                match = re.search('ceph-(?P<mon_id>.*)', mon_dir)
+                if match:
+                    mon_ids.append(match.group('mon_id'))
+        except OSError:
+            raise
+    return mon_ids
+
+
 def _is_int(v):
     """Return True if the object v can be turned into an integer."""
     try:
@@ -509,7 +531,7 @@ def _is_int(v):
 
 
 def get_version():
-    '''Derive Ceph release from an installed package.'''
+    """Derive Ceph release from an installed package."""
    import apt_pkg as apt
 
     cache = apt_cache()
@@ -600,6 +622,7 @@ def is_leader():
 
 def wait_for_quorum():
     while not is_quorum():
+        log("Waiting for quorum to be reached")
         time.sleep(3)
 
 
@@ -782,7 +805,7 @@ def is_bootstrapped():
 
 
 def wait_for_bootstrap():
-    while (not is_bootstrapped()):
+    while not is_bootstrapped():
         time.sleep(3)
 
 
@@ -815,6 +838,18 @@ def import_osd_upgrade_key(key):
     ]
     subprocess.check_call(cmd)
 
+
+def generate_monitor_secret():
+    cmd = [
+        'ceph-authtool',
+        '/dev/stdout',
+        '--name=mon.',
+        '--gen-key'
+    ]
+    res = subprocess.check_output(cmd)
+
+    return "{}==".format(res.split('=')[1].strip())
+
 # OSD caps taken from ceph-create-keys
 _osd_bootstrap_caps = {
     'mon': [
@@ -878,9 +913,12 @@ def import_radosgw_key(key):
 
 # OSD caps taken from ceph-create-keys
 _radosgw_caps = {
-    'mon': ['allow r'],
+    'mon': ['allow rw'],
     'osd': ['allow rwx']
 }
+_upgrade_caps = {
+    'mon': ['allow rwx']
+}
 
 
 def get_radosgw_key():
@@ -888,10 +926,34 @@
 
 
 _default_caps = {
-    'mon': ['allow r'],
+    'mon': ['allow rw'],
     'osd': ['allow rwx']
 }
 
+admin_caps = {
+    'mds': ['allow'],
+    'mon': ['allow *'],
+    'osd': ['allow *']
+}
+
+osd_upgrade_caps = {
+    'mon': ['allow command "config-key"',
+            'allow command "osd tree"',
+            'allow command "config-key list"',
+            'allow command "config-key put"',
+            'allow command "config-key get"',
+            'allow command "config-key exists"',
+            'allow command "osd out"',
+            'allow command "osd in"',
+            'allow command "osd rm"',
+            'allow command "auth del"',
+            ]
+}
+
+
+def get_upgrade_key():
+    return get_named_key('upgrade-osd', _upgrade_caps)
+
 
 def get_named_key(name, caps=None):
     caps = caps or _default_caps
@@ -916,6 +978,19 @@ def get_named_key(name, caps=None):
     return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
 
 
+def upgrade_key_caps(key, caps):
+    """ Upgrade key to have capabilities caps """
+    if not is_leader():
+        # Not the MON leader OR not clustered
+        return
+    cmd = [
+        "sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key
+    ]
+    for subsystem, subcaps in caps.iteritems():
+        cmd.extend([subsystem, '; '.join(subcaps)])
+    subprocess.check_call(cmd)
+
+
 @cached
 def systemd():
     return (lsb_release()['DISTRIB_CODENAME'] >= 'vivid')
diff --git a/lib/ceph/ceph/ceph_broker.py b/lib/ceph/ceph/ceph_broker.py
new file mode 100644
index 00000000..da6c3424
--- /dev/null
+++ b/lib/ceph/ceph/ceph_broker.py
@@ -0,0 +1,352 @@
+#!/usr/bin/python
+#
+# Copyright 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from charmhelpers.core.hookenv import (
+    log,
+    DEBUG,
+    INFO,
+    ERROR,
+)
+from charmhelpers.contrib.storage.linux.ceph import (
+    create_erasure_profile,
+    delete_pool,
+    erasure_profile_exists,
+    get_osds,
+    pool_exists,
+    pool_set,
+    remove_pool_snapshot,
+    rename_pool,
+    set_pool_quota,
+    snapshot_pool,
+    validator,
+    ErasurePool,
+    Pool,
+    ReplicatedPool,
+)
+
+# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
+# This should do a decent job of preventing people from passing in bad values.
+# It will give a useful error message
+POOL_KEYS = {
+    # "Ceph Key Name": [Python type, [Valid Range]]
+    "size": [int],
+    "min_size": [int],
+    "crash_replay_interval": [int],
+    "pgp_num": [int],  # = or < pg_num
+    "crush_ruleset": [int],
+    "hashpspool": [bool],
+    "nodelete": [bool],
+    "nopgchange": [bool],
+    "nosizechange": [bool],
+    "write_fadvise_dontneed": [bool],
+    "noscrub": [bool],
+    "nodeep-scrub": [bool],
+    "hit_set_type": [basestring, ["bloom", "explicit_hash",
+                                  "explicit_object"]],
+    "hit_set_count": [int, [1, 1]],
+    "hit_set_period": [int],
+    "hit_set_fpp": [float, [0.0, 1.0]],
+    "cache_target_dirty_ratio": [float],
+    "cache_target_dirty_high_ratio": [float],
+    "cache_target_full_ratio": [float],
+    "target_max_bytes": [int],
+    "target_max_objects": [int],
+    "cache_min_flush_age": [int],
+    "cache_min_evict_age": [int],
+    "fast_read": [bool],
+}
+
+CEPH_BUCKET_TYPES = [
+    'osd',
+    'host',
+    'chassis',
+    'rack',
+    'row',
+    'pdu',
+    'pod',
+    'room',
+    'datacenter',
+    'region',
+    'root'
+]
+
+
+def decode_req_encode_rsp(f):
+    """Decorator to decode incoming requests and encode responses."""
+
+    def decode_inner(req):
+        return json.dumps(f(json.loads(req)))
+
+    return decode_inner
+
+
+@decode_req_encode_rsp
+def process_requests(reqs):
+    """Process Ceph broker request(s).
+
+    This is a versioned api. API version must be supplied by the client making
+    the request.
+ """ + request_id = reqs.get('request-id') + try: + version = reqs.get('api-version') + if version == 1: + log('Processing request {}'.format(request_id), level=DEBUG) + resp = process_requests_v1(reqs['ops']) + if request_id: + resp['request-id'] = request_id + + return resp + + except Exception as exc: + log(str(exc), level=ERROR) + msg = ("Unexpected error occurred while processing requests: %s" % + reqs) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + msg = ("Missing or invalid api version (%s)" % version) + resp = {'exit-code': 1, 'stderr': msg} + if request_id: + resp['request-id'] = request_id + + return resp + + +def handle_create_erasure_profile(request, service): + # "local" | "shec" or it defaults to "jerasure" + erasure_type = request.get('erasure-type') + # "host" | "rack" or it defaults to "host" # Any valid Ceph bucket + failure_domain = request.get('failure-domain') + name = request.get('name') + k = request.get('k') + m = request.get('m') + l = request.get('l') + + if failure_domain not in CEPH_BUCKET_TYPES: + msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + create_erasure_profile(service=service, erasure_plugin_name=erasure_type, + profile_name=name, failure_domain=failure_domain, + data_chunks=k, coding_chunks=m, locality=l) + + +def handle_erasure_pool(request, service): + pool_name = request.get('name') + erasure_profile = request.get('erasure-profile') + quota = request.get('max-bytes') + weight = request.get('weight') + + if erasure_profile is None: + erasure_profile = "default-canonical" + + # Check for missing params + if pool_name is None: + msg = "Missing parameter. name is required for the pool" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds + if not erasure_profile_exists(service=service, name=erasure_profile): + # TODO: Fail and tell them to create the profile or default + msg = ("erasure-profile {} does not exist. Please create it with: " + "create-erasure-profile".format(erasure_profile)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + pool = ErasurePool(service=service, name=pool_name, + erasure_code_profile=erasure_profile, + percent_data=weight) + # Ok make the erasure pool + if not pool_exists(service=service, name=pool_name): + log("Creating pool '%s' (erasure_profile=%s)" % (pool.name, + erasure_profile), + level=INFO) + pool.create() + + # Set a quota if requested + if quota is not None: + set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota) + + +def handle_replicated_pool(request, service): + pool_name = request.get('name') + replicas = request.get('replicas') + quota = request.get('max-bytes') + weight = request.get('weight') + + # Optional params + pg_num = request.get('pg_num') + if pg_num: + # Cap pg_num to max allowed just in case. + osds = get_osds(service) + if osds: + pg_num = min(pg_num, (len(osds) * 100 // replicas)) + + # Check for missing params + if pool_name is None or replicas is None: + msg = "Missing parameter. 
name and replicas are required" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + kwargs = {} + if pg_num: + kwargs['pg_num'] = pg_num + if weight: + kwargs['percent_data'] = weight + if replicas: + kwargs['replicas'] = replicas + + pool = ReplicatedPool(service=service, + name=pool_name, **kwargs) + if not pool_exists(service=service, name=pool_name): + log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas), + level=INFO) + pool.create() + else: + log("Pool '%s' already exists - skipping create" % pool.name, + level=DEBUG) + + # Set a quota if requested + if quota is not None: + set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota) + + +def handle_create_cache_tier(request, service): + # mode = "writeback" | "readonly" + storage_pool = request.get('cold-pool') + cache_pool = request.get('hot-pool') + cache_mode = request.get('mode') + + if cache_mode is None: + cache_mode = "writeback" + + # cache and storage pool must exist first + if not pool_exists(service=service, name=storage_pool) or not pool_exists( + service=service, name=cache_pool): + msg = ("cold-pool: {} and hot-pool: {} must exist. Please create " + "them first".format(storage_pool, cache_pool)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + p = Pool(service=service, name=storage_pool) + p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode) + + +def handle_remove_cache_tier(request, service): + storage_pool = request.get('cold-pool') + cache_pool = request.get('hot-pool') + # cache and storage pool must exist first + if not pool_exists(service=service, name=storage_pool) or not pool_exists( + service=service, name=cache_pool): + msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not " + "deleting cache tier".format(storage_pool, cache_pool)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + pool = Pool(name=storage_pool, service=service) + pool.remove_cache_tier(cache_pool=cache_pool) + + +def handle_set_pool_value(request, service): + # Set arbitrary pool values + params = {'pool': request.get('name'), + 'key': request.get('key'), + 'value': request.get('value')} + if params['key'] not in POOL_KEYS: + msg = "Invalid key '%s'" % params['key'] + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + # Get the validation method + validator_params = POOL_KEYS[params['key']] + if len(validator_params) is 1: + # Validate that what the user passed is actually legal per Ceph's rules + validator(params['value'], validator_params[0]) + else: + # Validate that what the user passed is actually legal per Ceph's rules + validator(params['value'], validator_params[0], validator_params[1]) + + # Set the value + pool_set(service=service, pool_name=params['pool'], key=params['key'], + value=params['value']) + + +def process_requests_v1(reqs): + """Process v1 requests. + + Takes a list of requests (dicts) and processes each one. If an error is + found, processing stops and the client is notified in the response. + + Returns a response dict containing the exit code (non-zero if any + operation failed along with an explanation). + """ + ret = None + log("Processing %s ceph broker requests" % (len(reqs)), level=INFO) + for req in reqs: + op = req.get('op') + log("Processing op='%s'" % op, level=DEBUG) + # Use admin client since we do not have other client key locations + # setup to use them for these operations. 
+        svc = 'admin'
+        if op == "create-pool":
+            pool_type = req.get('pool-type')  # "replicated" | "erasure"
+
+            # Default to replicated if pool_type isn't given
+            if pool_type == 'erasure':
+                ret = handle_erasure_pool(request=req, service=svc)
+            else:
+                ret = handle_replicated_pool(request=req, service=svc)
+
+        elif op == "create-cache-tier":
+            ret = handle_create_cache_tier(request=req, service=svc)
+        elif op == "remove-cache-tier":
+            ret = handle_remove_cache_tier(request=req, service=svc)
+        elif op == "create-erasure-profile":
+            ret = handle_create_erasure_profile(request=req, service=svc)
+        elif op == "delete-pool":
+            pool = req.get('name')
+            ret = delete_pool(service=svc, name=pool)
+        elif op == "rename-pool":
+            old_name = req.get('name')
+            new_name = req.get('new-name')
+            ret = rename_pool(service=svc, old_name=old_name,
+                              new_name=new_name)
+        elif op == "snapshot-pool":
+            pool = req.get('name')
+            snapshot_name = req.get('snapshot-name')
+            ret = snapshot_pool(service=svc, pool_name=pool,
+                                snapshot_name=snapshot_name)
+        elif op == "remove-pool-snapshot":
+            pool = req.get('name')
+            snapshot_name = req.get('snapshot-name')
+            ret = remove_pool_snapshot(service=svc, pool_name=pool,
+                                       snapshot_name=snapshot_name)
+        elif op == "set-pool-value":
+            ret = handle_set_pool_value(request=req, service=svc)
+        else:
+            msg = "Unknown operation '%s'" % op
+            log(msg, level=ERROR)
+            return {'exit-code': 1, 'stderr': msg}
+
+    if type(ret) == dict and 'exit-code' in ret:
+        return ret
+
+    return {'exit-code': 0}
diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py
index d7a4ace1..84f643d0 100644
--- a/unit_tests/__init__.py
+++ b/unit_tests/__init__.py
@@ -14,4 +14,5 @@
 import sys
 
 sys.path.append('hooks')
+sys.path.append('lib')
 sys.path.append('actions')
diff --git a/unit_tests/test_replace_osd.py b/unit_tests/test_replace_osd.py
index ce919382..53109403 100644
--- a/unit_tests/test_replace_osd.py
+++ b/unit_tests/test_replace_osd.py
@@ -18,7 +18,7 @@ import posix
 from mock import call, Mock, patch
 
 import test_utils
-import ceph
+from ceph.ceph import ceph
 import replace_osd
 
 TO_PATCH = [
@@ -73,13 +73,13 @@ class ReplaceOsdTestCase(test_utils.CharmTestCase):
         ])
         assert ret == 0
 
-    @patch('ceph.mounts')
-    @patch('ceph.subprocess')
-    @patch('ceph.umount')
-    @patch('ceph.osdize')
-    @patch('ceph.shutil')
-    @patch('ceph.systemd')
-    @patch('ceph.ceph_user')
+    @patch('ceph.ceph.ceph.mounts')
+    @patch('ceph.ceph.ceph.subprocess')
+    @patch('ceph.ceph.ceph.umount')
+    @patch('ceph.ceph.ceph.osdize')
+    @patch('ceph.ceph.ceph.shutil')
+    @patch('ceph.ceph.ceph.systemd')
+    @patch('ceph.ceph.ceph.ceph_user')
     def test_replace_osd(self,
                          ceph_user,
                          systemd,
diff --git a/unit_tests/test_tuning.py b/unit_tests/test_tuning.py
index 61a69443..84358e53 100644
--- a/unit_tests/test_tuning.py
+++ b/unit_tests/test_tuning.py
@@ -1,7 +1,7 @@
 __author__ = 'Chris Holcombe '
 from mock import patch, call
 import test_utils
-import ceph
+from ceph.ceph import ceph
 
 TO_PATCH = [
     'hookenv',
@@ -16,8 +16,8 @@ class PerformanceTestCase(test_utils.CharmTestCase):
         super(PerformanceTestCase, self).setUp(ceph, TO_PATCH)
 
     def test_tune_nic(self):
-        with patch('ceph.get_link_speed', return_value=10000):
-            with patch('ceph.save_sysctls') as save_sysctls:
+        with patch('ceph.ceph.ceph.get_link_speed', return_value=10000):
+            with patch('ceph.ceph.ceph.save_sysctls') as save_sysctls:
                 ceph.tune_nic('eth0')
                 save_sysctls.assert_has_calls(
                     [
@@ -49,12 +49,12 @@ class PerformanceTestCase(test_utils.CharmTestCase):
         uuid = ceph.get_block_uuid('/dev/sda1')
         self.assertEqual(uuid, '378f3c86-b21a-4172-832d-e2b3d4bc7511')
 
-    @patch('ceph.persist_settings')
-    @patch('ceph.set_hdd_read_ahead')
-    @patch('ceph.get_max_sectors_kb')
-    @patch('ceph.get_max_hw_sectors_kb')
-    @patch('ceph.set_max_sectors_kb')
-    @patch('ceph.get_block_uuid')
+    @patch('ceph.ceph.ceph.persist_settings')
+    @patch('ceph.ceph.ceph.set_hdd_read_ahead')
+    @patch('ceph.ceph.ceph.get_max_sectors_kb')
+    @patch('ceph.ceph.ceph.get_max_hw_sectors_kb')
+    @patch('ceph.ceph.ceph.set_max_sectors_kb')
+    @patch('ceph.ceph.ceph.get_block_uuid')
     def test_tune_dev(self,
                       block_uuid,
                       set_max_sectors_kb,
@@ -84,12 +84,12 @@ class PerformanceTestCase(test_utils.CharmTestCase):
             call('maintenance', 'Finished tuning device /dev/sda')
         ])
 
-    @patch('ceph.persist_settings')
-    @patch('ceph.set_hdd_read_ahead')
-    @patch('ceph.get_max_sectors_kb')
-    @patch('ceph.get_max_hw_sectors_kb')
-    @patch('ceph.set_max_sectors_kb')
-    @patch('ceph.get_block_uuid')
+    @patch('ceph.ceph.ceph.persist_settings')
+    @patch('ceph.ceph.ceph.set_hdd_read_ahead')
+    @patch('ceph.ceph.ceph.get_max_sectors_kb')
+    @patch('ceph.ceph.ceph.get_max_hw_sectors_kb')
+    @patch('ceph.ceph.ceph.set_max_sectors_kb')
+    @patch('ceph.ceph.ceph.get_block_uuid')
     def test_tune_dev_2(self,
                         block_uuid,
                         set_max_sectors_kb,
diff --git a/unit_tests/test_upgrade_roll.py b/unit_tests/test_upgrade_roll.py
index a3a6f260..7fca5918 100644
--- a/unit_tests/test_upgrade_roll.py
+++ b/unit_tests/test_upgrade_roll.py
@@ -16,7 +16,7 @@ import time
 
 from mock import patch, call, MagicMock
 
-from ceph import CrushLocation
+from ceph.ceph.ceph import CrushLocation
 
 import test_utils
 import ceph_hooks
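
The broker added in lib/ceph/ceph/ceph_broker.py consumes a JSON envelope rather than bare ops. A minimal client-side sketch of a valid v1 request, using only the field names that appear in the code above ('api-version', 'request-id', 'ops', 'op', 'pool-type', 'name', 'replicas'); the uuid-based request id is illustrative:

```python
import json
import uuid

# Build the versioned envelope that process_requests() expects.
req = json.dumps({
    'api-version': 1,                  # only version 1 is handled
    'request-id': str(uuid.uuid1()),   # echoed back in the response
    'ops': [{
        'op': 'create-pool',
        'pool-type': 'replicated',     # anything but 'erasure' -> replicated
        'name': 'glance',
        'replicas': 3,
    }],
})
# process_requests(req) json.loads this string (via decode_req_encode_rsp),
# dispatches each op, and returns a JSON string such as
# '{"exit-code": 0, "request-id": "..."}' on success.
```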
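generate_monitor_secret() leans on the textual output of ceph-authtool. A sketch of the parsing, under the assumption that `ceph-authtool /dev/stdout --name=mon. --gen-key` prints a keyring block whose key value is base64 ending in two '=' padding characters (the sample key below is made up):

```python
# Splitting on '=' keeps everything between 'key = ' and the padding;
# the function then re-appends the two '=' it stripped off.
sample = "[mon.]\n\tkey = AQCnjvtYAAAAABAAxxxxxxxxxxxxxxxxxxxxxx==\n"
secret = "{}==".format(sample.split('=')[1].strip())
print(secret)  # -> AQCnjvtYAAAAABAAxxxxxxxxxxxxxxxxxxxxxx==
```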
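Similarly, upgrade_key_caps() flattens a caps dict into a single `ceph auth caps` invocation. The entity name 'client.osd-upgrade' and user 'ceph' below are hypothetical stand-ins for the key name passed in and the ceph_user() result:

```python
# Mirror of the command-building loop in upgrade_key_caps(), trimmed to
# two caps for readability.
caps = {'mon': ['allow command "osd out"', 'allow command "osd in"']}
cmd = ['sudo', '-u', 'ceph', 'ceph', 'auth', 'caps', 'client.osd-upgrade']
for subsystem, subcaps in caps.items():
    cmd.extend([subsystem, '; '.join(subcaps)])
print(' '.join(cmd))
# sudo -u ceph ceph auth caps client.osd-upgrade mon allow command "osd out"; allow command "osd in"
```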