diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..7f7b5be
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,7 @@
+[report]
+# Regexes for lines to exclude from consideration
+exclude_lines =
+    if __name__ == .__main__.:
+include =
+    hooks/hooks.py
+    hooks/ceph*.py
diff --git a/Makefile b/Makefile
index 70e95fb..e29ab2b 100644
--- a/Makefile
+++ b/Makefile
@@ -2,9 +2,13 @@ PYTHON := /usr/bin/env python
 
 lint:
-	@flake8 --exclude hooks/charmhelpers hooks tests
+	@flake8 --exclude hooks/charmhelpers hooks tests unit_tests
 	@charm proof
 
+unit_test:
+	@echo Starting unit tests...
+	@$(PYTHON) /usr/bin/nosetests --nologcapture --with-coverage unit_tests
+
 test:
 	@echo Starting Amulet tests...
 	# coreycb note: The -v should only be temporary until Amulet sends
diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml
index afb9e42..c401e72 100644
--- a/charm-helpers-hooks.yaml
+++ b/charm-helpers-hooks.yaml
@@ -5,6 +5,7 @@ include:
     - fetch
     - contrib.storage.linux:
         - utils
+        - ceph
     - payload.execd
     - contrib.openstack.alternatives
     - contrib.network.ip
diff --git a/hooks/__init__.py b/hooks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py
new file mode 100644
index 0000000..9fced94
--- /dev/null
+++ b/hooks/ceph_broker.py
@@ -0,0 +1,90 @@
+#!/usr/bin/python
+#
+# Copyright 2014 Canonical Ltd.
+#
+import json
+
+from charmhelpers.core.hookenv import (
+    log,
+    DEBUG,
+    INFO,
+    ERROR,
+)
+from charmhelpers.contrib.storage.linux.ceph import (
+    create_pool,
+    pool_exists,
+)
+
+
+def decode_req_encode_rsp(f):
+    """Decorator to decode incoming requests and encode responses."""
+    def decode_inner(req):
+        return json.dumps(f(json.loads(req)))
+
+    return decode_inner
+
+
+@decode_req_encode_rsp
+def process_requests(reqs):
+    """Process Ceph broker request(s).
+
+    This is a versioned api. API version must be supplied by the client
+    making the request.
+    """
+    try:
+        version = reqs.get('api-version')
+        if version == 1:
+            return process_requests_v1(reqs['ops'])
+
+    except Exception as exc:
+        log(str(exc), level=ERROR)
+        msg = ("Unexpected error occurred while processing requests: %s" %
+               (reqs))
+        log(msg, level=ERROR)
+        return {'exit-code': 1, 'stderr': msg}
+
+    msg = ("Missing or invalid api version (%s)" % (version))
+    return {'exit-code': 1, 'stderr': msg}
+
+
+def process_requests_v1(reqs):
+    """Process v1 requests.
+
+    Takes a list of requests (dicts) and processes each one. If an error is
+    found, processing stops and the client is notified in the response.
+
+    Returns a response dict containing the exit code (non-zero if any
+    operation failed), along with an explanation.
+    """
+    log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
+    for req in reqs:
+        op = req.get('op')
+        log("Processing op='%s'" % (op), level=DEBUG)
+        # Use the admin client since we do not have other client key
+        # locations set up to use for these operations.
+        svc = 'admin'
+        if op == "create-pool":
+            params = {'pool': req.get('name'),
+                      'replicas': req.get('replicas')}
+            if not all(params.itervalues()):
+                msg = ("Missing parameter(s): %s" %
+                       (' '.join([k for k in params.iterkeys()
+                                  if not params[k]])))
+                log(msg, level=ERROR)
+                return {'exit-code': 1, 'stderr': msg}
+
+            pool = params['pool']
+            replicas = params['replicas']
+            if not pool_exists(service=svc, name=pool):
+                log("Creating pool '%s' (replicas=%s)" % (pool, replicas),
+                    level=INFO)
+                create_pool(service=svc, name=pool, replicas=replicas)
+            else:
+                log("Pool '%s' already exists - skipping create" % (pool),
+                    level=DEBUG)
+        else:
+            msg = "Unknown operation '%s'" % (op)
+            log(msg, level=ERROR)
+            return {'exit-code': 1, 'stderr': msg}
+
+    return {'exit-code': 0}
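Note on the wire format: the broker consumes and produces JSON strings (via the decode_req_encode_rsp decorator). A minimal sketch of an exchange follows — the pool name is hypothetical, but the shape matches the unit tests at the end of this patch:

    import json
    from ceph_broker import process_requests

    # A client encodes a versioned list of ops...
    req = json.dumps({'api-version': 1,
                      'ops': [{'op': 'create-pool', 'name': 'volumes',
                               'replicas': 3}]})
    # ...and gets back a JSON-encoded response: '{"exit-code": 0}' on
    # success, or exit-code 1 plus a 'stderr' message on failure.
    rsp = json.loads(process_requests(req))
    assert rsp['exit-code'] == 0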
diff --git a/hooks/charmhelpers/contrib/network/ip.py b/hooks/charmhelpers/contrib/network/ip.py
index e62e565..c4bfead 100644
--- a/hooks/charmhelpers/contrib/network/ip.py
+++ b/hooks/charmhelpers/contrib/network/ip.py
@@ -8,7 +8,6 @@ from functools import partial
 from charmhelpers.core.hookenv import unit_get
 from charmhelpers.fetch import apt_install
 from charmhelpers.core.hookenv import (
-    WARNING,
     ERROR,
     log
 )
@@ -175,7 +174,6 @@ def format_ipv6_addr(address):
     if is_ipv6(address):
         address = "[%s]" % address
     else:
-        log("Not a valid ipv6 address: %s" % address, level=WARNING)
         address = None
 
     return address
diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py
new file mode 100644
index 0000000..598ec26
--- /dev/null
+++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py
@@ -0,0 +1,388 @@
+#
+# Copyright 2012 Canonical Ltd.
+#
+# This file is sourced from lp:openstack-charm-helpers
+#
+# Authors:
+#  James Page
+#  Adam Gandelman
+#
+
+import os
+import shutil
+import json
+import time
+
+from subprocess import (
+    check_call,
+    check_output,
+    CalledProcessError
+)
+
+from charmhelpers.core.hookenv import (
+    relation_get,
+    relation_ids,
+    related_units,
+    log,
+    INFO,
+    WARNING,
+    ERROR
+)
+
+from charmhelpers.core.host import (
+    mount,
+    mounts,
+    service_start,
+    service_stop,
+    service_running,
+    umount,
+)
+
+from charmhelpers.fetch import (
+    apt_install,
+)
+
+KEYRING = '/etc/ceph/ceph.client.{}.keyring'
+KEYFILE = '/etc/ceph/ceph.client.{}.key'
+
+CEPH_CONF = """[global]
+ auth supported = {auth}
+ keyring = {keyring}
+ mon host = {mon_hosts}
+ log to syslog = {use_syslog}
+ err to syslog = {use_syslog}
+ clog to syslog = {use_syslog}
+"""
+
+
+def install():
+    ''' Basic Ceph client installation '''
+    ceph_dir = "/etc/ceph"
+    if not os.path.exists(ceph_dir):
+        os.mkdir(ceph_dir)
+    apt_install('ceph-common', fatal=True)
+
+
+def rbd_exists(service, pool, rbd_img):
+    ''' Check to see if a RADOS block device exists '''
+    try:
+        out = check_output(['rbd', 'list', '--id', service,
+                            '--pool', pool])
+    except CalledProcessError:
+        return False
+    else:
+        return rbd_img in out
+
+
+def create_rbd_image(service, pool, image, sizemb):
+    ''' Create a new RADOS block device '''
+    cmd = [
+        'rbd',
+        'create',
+        image,
+        '--size',
+        str(sizemb),
+        '--id',
+        service,
+        '--pool',
+        pool
+    ]
+    check_call(cmd)
+
+
+def pool_exists(service, name):
+    ''' Check to see if a RADOS pool already exists '''
+    try:
+        out = check_output(['rados', '--id', service, 'lspools'])
+    except CalledProcessError:
+        return False
+    else:
+        return name in out
+
+
+def get_osds(service):
+    '''
+    Return a list of all Ceph Object Storage Daemons
+    currently in the cluster
+    '''
+    version = ceph_version()
+    if version and version >= '0.56':
+        return json.loads(check_output(['ceph', '--id', service,
+                                        'osd', 'ls', '--format=json']))
+    else:
+        return None
+
+
+def create_pool(service, name, replicas=3):
+    ''' Create a new RADOS pool '''
+    if pool_exists(service, name):
+        log("Ceph pool {} already exists, skipping creation".format(name),
+            level=WARNING)
+        return
+    # Calculate the number of placement groups based
+    # on upstream recommended best practices.
+    osds = get_osds(service)
+    if osds:
+        pgnum = (len(osds) * 100 / replicas)
+    else:
+        # NOTE(james-page): Default to 200 for older ceph versions
+        # which don't support OSD query from cli
+        pgnum = 200
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'create',
+        name, str(pgnum)
+    ]
+    check_call(cmd)
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'set', name,
+        'size', str(replicas)
+    ]
+    check_call(cmd)
+
+
+def delete_pool(service, name):
+    ''' Delete a RADOS pool from ceph '''
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'delete',
+        name, '--yes-i-really-really-mean-it'
+    ]
+    check_call(cmd)
+
+
+def _keyfile_path(service):
+    return KEYFILE.format(service)
+
+
+def _keyring_path(service):
+    return KEYRING.format(service)
+
+
+def create_keyring(service, key):
+    ''' Create a new Ceph keyring containing key '''
+    keyring = _keyring_path(service)
+    if os.path.exists(keyring):
+        log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
+        return
+    cmd = [
+        'ceph-authtool',
+        keyring,
+        '--create-keyring',
+        '--name=client.{}'.format(service),
+        '--add-key={}'.format(key)
+    ]
+    check_call(cmd)
+    log('ceph: Created new ring at %s.' % keyring, level=INFO)
+
+
+def create_key_file(service, key):
+    ''' Create a file containing key '''
+    keyfile = _keyfile_path(service)
+    if os.path.exists(keyfile):
+        log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
+        return
+    with open(keyfile, 'w') as fd:
+        fd.write(key)
+    log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
+
+
+def get_ceph_nodes():
+    ''' Query named relation 'ceph' to determine current nodes '''
+    hosts = []
+    for r_id in relation_ids('ceph'):
+        for unit in related_units(r_id):
+            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
+    return hosts
+
+
+def configure(service, key, auth, use_syslog):
+    ''' Perform basic configuration of Ceph '''
+    create_keyring(service, key)
+    create_key_file(service, key)
+    hosts = get_ceph_nodes()
+    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
+        ceph_conf.write(CEPH_CONF.format(auth=auth,
+                                         keyring=_keyring_path(service),
+                                         mon_hosts=",".join(map(str, hosts)),
+                                         use_syslog=use_syslog))
+    modprobe('rbd')
+
+
+def image_mapped(name):
+    ''' Determine whether a RADOS block device is mapped locally '''
+    try:
+        out = check_output(['rbd', 'showmapped'])
+    except CalledProcessError:
+        return False
+    else:
+        return name in out
+
+
+def map_block_storage(service, pool, image):
+    ''' Map a RADOS block device for local use '''
+    cmd = [
+        'rbd',
+        'map',
+        '{}/{}'.format(pool, image),
+        '--user',
+        service,
+        '--secret',
+        _keyfile_path(service),
+    ]
+    check_call(cmd)
+
+
+def filesystem_mounted(fs):
+    ''' Determine whether a filesystem is already mounted '''
+    return fs in [f for f, m in mounts()]
+
+
+def make_filesystem(blk_device, fstype='ext4', timeout=10):
+    ''' Make a new filesystem on the specified block device '''
+    count = 0
+    e_noent = os.errno.ENOENT
+    while not os.path.exists(blk_device):
+        if count >= timeout:
+            log('ceph: gave up waiting on block device %s' % blk_device,
+                level=ERROR)
+            raise IOError(e_noent, os.strerror(e_noent), blk_device)
+        log('ceph: waiting for block device %s to appear' % blk_device,
+            level=INFO)
+        count += 1
+        time.sleep(1)
+    else:
+        log('ceph: Formatting block device %s as filesystem %s.' %
+            (blk_device, fstype), level=INFO)
+        check_call(['mkfs', '-t', fstype, blk_device])
+
+
+def place_data_on_block_device(blk_device, data_src_dst):
+    ''' Migrate data in data_src_dst to blk_device and then remount '''
+    # mount block device into /mnt
+    mount(blk_device, '/mnt')
+    # copy data to /mnt
+    copy_files(data_src_dst, '/mnt')
+    # umount block device
+    umount('/mnt')
+    # Grab user/group IDs from original source
+    _dir = os.stat(data_src_dst)
+    uid = _dir.st_uid
+    gid = _dir.st_gid
+    # re-mount where the data should originally be
+    # TODO: persist is currently a NO-OP in core.host
+    mount(blk_device, data_src_dst, persist=True)
+    # ensure original ownership of new mount.
+    os.chown(data_src_dst, uid, gid)
+
+
+# TODO: re-use
+def modprobe(module):
+    ''' Load a kernel module and configure for auto-load on reboot '''
+    log('ceph: Loading kernel module', level=INFO)
+    cmd = ['modprobe', module]
+    check_call(cmd)
+    with open('/etc/modules', 'r+') as modules:
+        if module not in modules.read():
+            modules.write(module)
+
+
+def copy_files(src, dst, symlinks=False, ignore=None):
+    ''' Copy files from src to dst '''
+    for item in os.listdir(src):
+        s = os.path.join(src, item)
+        d = os.path.join(dst, item)
+        if os.path.isdir(s):
+            shutil.copytree(s, d, symlinks, ignore)
+        else:
+            shutil.copy2(s, d)
+
+
+def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
+                        blk_device, fstype, system_services=[],
+                        replicas=3):
+    """
+    NOTE: This function must only be called from a single service unit for
+    the same rbd_img otherwise data loss will occur.
+
+    Ensures given pool and RBD image exists, is mapped to a block device,
+    and the device is formatted and mounted at the given mount_point.
+
+    If formatting a device for the first time, data existing at mount_point
+    will be migrated to the RBD device before being re-mounted.
+
+    All services listed in system_services will be stopped prior to data
+    migration and restarted when complete.
+    """
+    # Ensure pool, RBD image, RBD mappings are in place.
+    if not pool_exists(service, pool):
+        log('ceph: Creating new pool {}.'.format(pool))
+        create_pool(service, pool, replicas=replicas)
+
+    if not rbd_exists(service, pool, rbd_img):
+        log('ceph: Creating RBD image ({}).'.format(rbd_img))
+        create_rbd_image(service, pool, rbd_img, sizemb)
+
+    if not image_mapped(rbd_img):
+        log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
+        map_block_storage(service, pool, rbd_img)
+
+    # make file system
+    # TODO: What happens if for whatever reason this is run again and
+    # the data is already in the rbd device and/or is mounted??
+    # When it is mounted already, it will fail to make the fs
+    # XXX: This is really sketchy! Need to at least add an fstab entry
+    # otherwise this hook will blow away existing data if it's executed
+    # after a reboot.
+    if not filesystem_mounted(mount_point):
+        make_filesystem(blk_device, fstype)
+
+        for svc in system_services:
+            if service_running(svc):
+                log('ceph: Stopping services {} prior to migrating data.'
+                    .format(svc))
+                service_stop(svc)
+
+        place_data_on_block_device(blk_device, mount_point)
+
+        for svc in system_services:
+            log('ceph: Starting service {} after migrating data.'
+                .format(svc))
+            service_start(svc)
+
+
+def ensure_ceph_keyring(service, user=None, group=None):
+    '''
+    Ensures a ceph keyring is created for a named service
+    and optionally ensures user and group ownership.
+
+    Returns False if no ceph key is available in relation state.
+    '''
+    key = None
+    for rid in relation_ids('ceph'):
+        for unit in related_units(rid):
+            key = relation_get('key', rid=rid, unit=unit)
+            if key:
+                break
+    if not key:
+        return False
+    create_keyring(service=service, key=key)
+    keyring = _keyring_path(service)
+    if user and group:
+        check_call(['chown', '%s.%s' % (user, group), keyring])
+    return True
+
+
+def ceph_version():
+    ''' Retrieve the local version of ceph '''
+    if os.path.exists('/usr/bin/ceph'):
+        cmd = ['ceph', '-v']
+        output = check_output(cmd)
+        output = output.split()
+        if len(output) > 3:
+            return output[2]
+        else:
+            return None
+    else:
+        return None
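A quick check of the placement-group sizing in create_pool() above, since the integer arithmetic is easy to misread (the OSD count here is hypothetical):

    # Python 2 integer division, as used by create_pool():
    osds = range(12)                      # a cluster reporting 12 OSDs
    replicas = 3
    pgnum = len(osds) * 100 / replicas    # -> 400 placement groups
    # With no OSD listing available (ceph < 0.56), pgnum falls back to 200.

The KEYRING/KEYFILE templates expand per client name, e.g. KEYRING.format('admin') yields '/etc/ceph/ceph.client.admin.keyring' — which is how the broker's 'admin' service maps onto on-disk key material.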
diff --git a/hooks/charmhelpers/core/services/__init__.py b/hooks/charmhelpers/core/services/__init__.py
index e8039a8..69dde79 100644
--- a/hooks/charmhelpers/core/services/__init__.py
+++ b/hooks/charmhelpers/core/services/__init__.py
@@ -1,2 +1,2 @@
-from .base import *
-from .helpers import *
+from .base import *  # NOQA
+from .helpers import *  # NOQA
diff --git a/hooks/charmhelpers/fetch/__init__.py b/hooks/charmhelpers/fetch/__init__.py
index 6724d29..2398e8e 100644
--- a/hooks/charmhelpers/fetch/__init__.py
+++ b/hooks/charmhelpers/fetch/__init__.py
@@ -256,7 +256,7 @@ def add_source(source, key=None):
     elif source == 'distro':
         pass
     else:
-        raise SourceConfigError("Unknown source: {!r}".format(source))
+        log("Unknown source: {!r}".format(source))
 
     if key:
         if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key:
diff --git a/hooks/client-relation-changed b/hooks/client-relation-changed
new file mode 120000
index 0000000..9416ca6
--- /dev/null
+++ b/hooks/client-relation-changed
@@ -0,0 +1 @@
+hooks.py
\ No newline at end of file
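The new client-relation-changed symlink works through the standard charmhelpers dispatch pattern: Juju executes the hook by file name, and the Hooks registry routes it to the function decorated with the matching name. The tail of hooks.py is not shown in this diff, but conventionally it looks like the following (treat the exact wording as an assumption):

    # Assumed, conventional charm entry point (not part of this patch):
    if __name__ == '__main__':
        try:
            hooks.execute(sys.argv)  # dispatch on the invoking symlink's name
        except UnregisteredHookError as e:
            log('Unknown hook {} - skipping.'.format(e))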
diff --git a/hooks/hooks.py b/hooks/hooks.py
index 8a6c26c..9014401 100755
--- a/hooks/hooks.py
+++ b/hooks/hooks.py
@@ -15,7 +15,9 @@ import sys
 import ceph
 from charmhelpers.core.hookenv import (
-    log, ERROR,
+    log,
+    DEBUG,
+    ERROR,
     config,
     relation_ids,
     related_units,
@@ -25,7 +27,6 @@ from charmhelpers.core.hookenv import (
     Hooks, UnregisteredHookError,
     service_name
 )
-
 from charmhelpers.core.host import (
     service_restart,
     umount,
@@ -44,12 +45,14 @@ from charmhelpers.contrib.network.ip import (
     get_ipv6_addr,
     format_ipv6_addr
 )
-
 from utils import (
     render_template,
     get_public_addr,
     assert_charm_supports_ipv6
 )
+from ceph_broker import (
+    process_requests
+)
 
 hooks = Hooks()
 
@@ -215,7 +218,7 @@ def notify_radosgws():
 
 def notify_client():
     for relid in relation_ids('client'):
-        client_relation(relid)
+        client_relation_joined(relid)
 
 
 def upgrade_keys():
@@ -266,28 +269,46 @@ def radosgw_relation(relid=None):
 
 
 @hooks.hook('client-relation-joined')
-def client_relation(relid=None):
+def client_relation_joined(relid=None):
     if ceph.is_quorum():
         log('mon cluster in quorum - providing client with keys')
         service_name = None
         if relid is None:
-            service_name = remote_unit().split('/')[0]
+            units = [remote_unit()]
+            service_name = units[0].split('/')[0]
         else:
             units = related_units(relid)
            if len(units) > 0:
                 service_name = units[0].split('/')[0]
+
         if service_name is not None:
-            data = {
-                'key': ceph.get_named_key(service_name),
-                'auth': config('auth-supported'),
-                'ceph-public-address': get_public_addr(),
-            }
+            data = {'key': ceph.get_named_key(service_name),
+                    'auth': config('auth-supported'),
+                    'ceph-public-address': get_public_addr()}
             relation_set(relation_id=relid,
                          relation_settings=data)
+
+        client_relation_changed(relid=relid)
     else:
         log('mon cluster not in quorum - deferring key provision')
 
 
+@hooks.hook('client-relation-changed')
+def client_relation_changed(relid=None):
+    """Process broker requests from ceph client relations."""
+    if ceph.is_quorum():
+        settings = relation_get(rid=relid)
+        if 'broker_req' in settings:
+            if not ceph.is_leader():
+                log("Not leader - ignoring broker request", level=DEBUG)
+            else:
+                rsp = process_requests(settings['broker_req'])
+                relation_set(relation_id=relid,
+                             relation_settings={'broker_rsp': rsp})
+    else:
+        log('mon cluster not in quorum', level=DEBUG)
+
+
 @hooks.hook('upgrade-charm')
 def upgrade_charm():
     emit_cephconf()
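For orientation, the client side of this protocol (not part of this diff — a hedged sketch only) sets broker_req on the relation and reads broker_rsp back on a later -changed hook:

    # Hypothetical client-charm counterpart:
    relation_set(relation_settings={
        'broker_req': json.dumps({'api-version': 1,
                                  'ops': [{'op': 'create-pool',
                                           'name': 'mypool',
                                           'replicas': 3}]})})
    # ...later, once the mon leader has answered:
    rsp = json.loads(relation_get('broker_rsp') or '{}')
    if rsp.get('exit-code') != 0:
        log(rsp.get('stderr'), level=ERROR)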
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..37083b6
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,5 @@
+[nosetests]
+verbosity=2
+with-coverage=1
+cover-erase=1
+cover-package=hooks
diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py
new file mode 100644
index 0000000..f80aab3
--- /dev/null
+++ b/unit_tests/__init__.py
@@ -0,0 +1,2 @@
+import sys
+sys.path.append('hooks')
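unit_tests/__init__.py appends hooks/ to sys.path so the tests below can do a bare `import ceph_broker`. With setup.cfg in place, the suite runs via the new Makefile target, or directly with the same invocation the Makefile uses:

    make unit_test
    # equivalently:
    /usr/bin/nosetests --nologcapture --with-coverage unit_tests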
diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py
new file mode 100644
index 0000000..0176d11
--- /dev/null
+++ b/unit_tests/test_ceph_broker.py
@@ -0,0 +1,72 @@
+import json
+import mock
+import unittest
+
+import ceph_broker
+
+
+class CephBrokerTestCase(unittest.TestCase):
+
+    def setUp(self):
+        super(CephBrokerTestCase, self).setUp()
+
+    @mock.patch('ceph_broker.log')
+    def test_process_requests_noop(self, mock_log):
+        req = json.dumps({'api-version': 1, 'ops': []})
+        rc = ceph_broker.process_requests(req)
+        self.assertEqual(json.loads(rc), {'exit-code': 0})
+
+    @mock.patch('ceph_broker.log')
+    def test_process_requests_missing_api_version(self, mock_log):
+        req = json.dumps({'ops': []})
+        rc = ceph_broker.process_requests(req)
+        self.assertEqual(json.loads(rc),
+                         {'exit-code': 1,
+                          'stderr': ('Missing or invalid api version '
+                                     '(None)')})
+
+    @mock.patch('ceph_broker.log')
+    def test_process_requests_invalid_api_version(self, mock_log):
+        req = json.dumps({'api-version': 2, 'ops': []})
+        rc = ceph_broker.process_requests(req)
+        self.assertEqual(json.loads(rc),
+                         {'exit-code': 1,
+                          'stderr': 'Missing or invalid api version (2)'})
+
+    @mock.patch('ceph_broker.log')
+    def test_process_requests_invalid(self, mock_log):
+        reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]})
+        rc = ceph_broker.process_requests(reqs)
+        self.assertEqual(json.loads(rc),
+                         {'exit-code': 1,
+                          'stderr': "Unknown operation 'invalid_op'"})
+
+    @mock.patch('ceph_broker.create_pool')
+    @mock.patch('ceph_broker.pool_exists')
+    @mock.patch('ceph_broker.log')
+    def test_process_requests_create_pool(self, mock_log, mock_pool_exists,
+                                          mock_create_pool):
+        mock_pool_exists.return_value = False
+        reqs = json.dumps({'api-version': 1,
+                           'ops': [{'op': 'create-pool', 'name': 'foo',
+                                    'replicas': 3}]})
+        rc = ceph_broker.process_requests(reqs)
+        mock_pool_exists.assert_called_with(service='admin', name='foo')
+        mock_create_pool.assert_called_with(service='admin', name='foo',
+                                            replicas=3)
+        self.assertEqual(json.loads(rc), {'exit-code': 0})
+
+    @mock.patch('ceph_broker.create_pool')
+    @mock.patch('ceph_broker.pool_exists')
+    @mock.patch('ceph_broker.log')
+    def test_process_requests_create_pool_exists(self, mock_log,
+                                                 mock_pool_exists,
+                                                 mock_create_pool):
+        mock_pool_exists.return_value = True
+        reqs = json.dumps({'api-version': 1,
+                           'ops': [{'op': 'create-pool', 'name': 'foo',
+                                    'replicas': 3}]})
+        rc = ceph_broker.process_requests(reqs)
+        mock_pool_exists.assert_called_with(service='admin', name='foo')
+        self.assertFalse(mock_create_pool.called)
+        self.assertEqual(json.loads(rc), {'exit-code': 0})