From 8000bd0d4a9329ee04ff0bfc322243b957dd259b Mon Sep 17 00:00:00 2001
From: Edward Hope-Morley
Date: Fri, 7 Nov 2014 14:16:26 +0100
Subject: [PATCH] Add Ceph broker support so that clients can request
 resources (e.g. pools) rather than creating them directly

This simplifies client-side logic and reduces the risk of race
conditions by shifting execution into the ceph charm itself.
Backwards compatibility is maintained for clients that do not yet,
or do not wish to, support this approach.
---
 Makefile                                      |   6 +-
 charm-helpers-hooks.yaml                      |   1 +
 hooks/ceph_broker.py                          |  47 +++
 hooks/charmhelpers/contrib/network/ip.py      |   2 -
 .../contrib/storage/linux/ceph.py             | 388 ++++++++++++++++++
 hooks/charmhelpers/core/services/__init__.py  |   4 +-
 hooks/charmhelpers/fetch/__init__.py          |   2 +-
 hooks/client-relation-changed                 |   1 +
 hooks/hooks.py                                |  39 +-
 unit_tests/test_ceph_broker.py                |  11 +
 10 files changed, 487 insertions(+), 14 deletions(-)
 create mode 100644 hooks/ceph_broker.py
 create mode 100644 hooks/charmhelpers/contrib/storage/linux/ceph.py
 create mode 120000 hooks/client-relation-changed
 create mode 100644 unit_tests/test_ceph_broker.py

diff --git a/Makefile b/Makefile
index 70e95fbb..e29ab2b7 100644
--- a/Makefile
+++ b/Makefile
@@ -2,9 +2,13 @@ PYTHON := /usr/bin/env python
 
 lint:
-	@flake8 --exclude hooks/charmhelpers hooks tests
+	@flake8 --exclude hooks/charmhelpers hooks tests unit_tests
 	@charm proof
 
+unit_test:
+	@echo Starting unit tests...
+	@$(PYTHON) /usr/bin/nosetests --nologcapture --with-coverage unit_tests
+
 test:
 	@echo Starting Amulet tests...
 	# coreycb note: The -v should only be temporary until Amulet sends
diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml
index afb9e42b..c401e72e 100644
--- a/charm-helpers-hooks.yaml
+++ b/charm-helpers-hooks.yaml
@@ -5,6 +5,7 @@ include:
     - fetch
     - contrib.storage.linux:
         - utils
+        - ceph
     - payload.execd
     - contrib.openstack.alternatives
     - contrib.network.ip
diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py
new file mode 100644
index 00000000..cf1df89b
--- /dev/null
+++ b/hooks/ceph_broker.py
@@ -0,0 +1,47 @@
+#!/usr/bin/python
+#
+# Copyright 2014 Canonical Ltd.
+#
+from charmhelpers.core.hookenv import (
+    log,
+    INFO,
+    ERROR
+)
+from charmhelpers.contrib.storage.linux.ceph import (
+    create_pool,
+    pool_exists,
+    ensure_ceph_keyring
+)
+
+
+def process_requests(reqs):
+    """Process a Ceph broker request from a ceph client."""
+    log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
+    for req in reqs:
+        op = req.get('op')
+        log("Processing op='%s'" % (op), level=INFO)
+        # Use the admin client since we do not have other client key
+        # locations set up to use for these operations.
+        svc = 'admin'
+        if op == "create_pool":
+            pool = req.get('pool')
+            replicas = req.get('replicas')
+            if not all([pool, replicas]):
+                log("Missing parameter(s)", level=ERROR)
+                return 1
+
+            if not pool_exists(service=svc, name=pool):
+                log("Creating pool '%s'" % (pool), level=INFO)
+                create_pool(service=svc, name=pool, replicas=replicas)
+            else:
+                log("Pool '%s' already exists" % (pool), level=INFO)
+        elif op == "create_keyring":
+            user = req.get('user')
+            group = req.get('group')
+            if not all([user, group]):
+                log("Missing parameter(s)", level=ERROR)
+                return 1
+
+            ensure_ceph_keyring(service=svc, user=user, group=group)
+
+    return 0
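Note: as the handlers above show, a client's 'broker_req' relation setting
is expected to be a JSON-encoded list of operation dicts, each carrying an
'op' plus its parameters. A minimal sketch of such a payload (the pool,
user and group names here are purely illustrative):

    import json

    broker_req = json.dumps([
        {'op': 'create_pool', 'pool': 'glance', 'replicas': 3},
        {'op': 'create_keyring', 'user': 'glance', 'group': 'glance'},
    ])
    # A client sets this on the relation and reads back 'broker_rsp'
    # (0 on success, 1 on missing parameters), per the hooks.py changes
    # later in this patch.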
diff --git a/hooks/charmhelpers/contrib/network/ip.py b/hooks/charmhelpers/contrib/network/ip.py
index e62e5655..c4bfeadb 100644
--- a/hooks/charmhelpers/contrib/network/ip.py
+++ b/hooks/charmhelpers/contrib/network/ip.py
@@ -8,7 +8,6 @@ from functools import partial
 from charmhelpers.core.hookenv import unit_get
 from charmhelpers.fetch import apt_install
 from charmhelpers.core.hookenv import (
-    WARNING,
     ERROR,
     log
 )
@@ -175,7 +174,6 @@ def format_ipv6_addr(address):
     if is_ipv6(address):
         address = "[%s]" % address
     else:
-        log("Not a valid ipv6 address: %s" % address, level=WARNING)
         address = None
 
     return address
diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py
new file mode 100644
index 00000000..598ec263
--- /dev/null
+++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py
@@ -0,0 +1,388 @@
+#
+# Copyright 2012 Canonical Ltd.
+#
+# This file is sourced from lp:openstack-charm-helpers
+#
+# Authors:
+#  James Page
+#  Adam Gandelman
+#
+
+import os
+import shutil
+import json
+import time
+
+from subprocess import (
+    check_call,
+    check_output,
+    CalledProcessError
+)
+
+from charmhelpers.core.hookenv import (
+    relation_get,
+    relation_ids,
+    related_units,
+    log,
+    INFO,
+    WARNING,
+    ERROR
+)
+
+from charmhelpers.core.host import (
+    mount,
+    mounts,
+    service_start,
+    service_stop,
+    service_running,
+    umount,
+)
+
+from charmhelpers.fetch import (
+    apt_install,
+)
+
+KEYRING = '/etc/ceph/ceph.client.{}.keyring'
+KEYFILE = '/etc/ceph/ceph.client.{}.key'
+
+CEPH_CONF = """[global]
+ auth supported = {auth}
+ keyring = {keyring}
+ mon host = {mon_hosts}
+ log to syslog = {use_syslog}
+ err to syslog = {use_syslog}
+ clog to syslog = {use_syslog}
+"""
+
+
+def install():
+    ''' Basic Ceph client installation '''
+    ceph_dir = "/etc/ceph"
+    if not os.path.exists(ceph_dir):
+        os.mkdir(ceph_dir)
+    apt_install('ceph-common', fatal=True)
+
+
+def rbd_exists(service, pool, rbd_img):
+    ''' Check to see if a RADOS block device exists '''
+    try:
+        out = check_output(['rbd', 'list', '--id', service,
+                            '--pool', pool])
+    except CalledProcessError:
+        return False
+    else:
+        return rbd_img in out
+
+
+def create_rbd_image(service, pool, image, sizemb):
+    ''' Create a new RADOS block device '''
+    cmd = [
+        'rbd',
+        'create',
+        image,
+        '--size',
+        str(sizemb),
+        '--id',
+        service,
+        '--pool',
+        pool
+    ]
+    check_call(cmd)
+
+
+def pool_exists(service, name):
+    ''' Check to see if a RADOS pool already exists '''
+    try:
+        out = check_output(['rados', '--id', service, 'lspools'])
+    except CalledProcessError:
+        return False
+    else:
+        return name in out
+
+
+def get_osds(service):
+    '''
+    Return a list of all Ceph Object Storage Daemons
+    currently in the cluster
+    '''
+    version = ceph_version()
+    if version and version >= '0.56':
+        return json.loads(check_output(['ceph', '--id', service,
+                                        'osd', 'ls', '--format=json']))
+    else:
+        return None
+
+
+def create_pool(service, name, replicas=3):
+    ''' Create a new RADOS pool '''
+    if pool_exists(service, name):
+        log("Ceph pool {} already exists, skipping creation".format(name),
+            level=WARNING)
+        return
+
+    # Calculate the number of placement groups based
+    # on upstream recommended best practices.
+    osds = get_osds(service)
+    if osds:
+        pgnum = (len(osds) * 100 / replicas)
+    else:
+        # NOTE(james-page): Default to 200 for older ceph versions
+        # which don't support OSD query from cli
+        pgnum = 200
+
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'create',
+        name, str(pgnum)
+    ]
+    check_call(cmd)
+
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'set', name,
+        'size', str(replicas)
+    ]
+    check_call(cmd)
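Note: to make the placement-group heuristic in create_pool() above
concrete, a worked example (the OSD count is hypothetical):

    # pgnum = number of OSDs * 100 / replicas (integer division, Python 2)
    pgnum = 6 * 100 / 3    # 6 OSDs, 3 replicas -> 200 placement groups
    # If the OSD list cannot be queried (ceph < 0.56), pgnum defaults to 200.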
+
+
+def delete_pool(service, name):
+    ''' Delete a RADOS pool from ceph '''
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'delete',
+        name, '--yes-i-really-really-mean-it'
+    ]
+    check_call(cmd)
+
+
+def _keyfile_path(service):
+    return KEYFILE.format(service)
+
+
+def _keyring_path(service):
+    return KEYRING.format(service)
+
+
+def create_keyring(service, key):
+    ''' Create a new Ceph keyring containing key '''
+    keyring = _keyring_path(service)
+    if os.path.exists(keyring):
+        log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
+        return
+
+    cmd = [
+        'ceph-authtool',
+        keyring,
+        '--create-keyring',
+        '--name=client.{}'.format(service),
+        '--add-key={}'.format(key)
+    ]
+    check_call(cmd)
+    log('ceph: Created new ring at %s.' % keyring, level=INFO)
+
+
+def create_key_file(service, key):
+    ''' Create a file containing key '''
+    keyfile = _keyfile_path(service)
+    if os.path.exists(keyfile):
+        log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
+        return
+
+    with open(keyfile, 'w') as fd:
+        fd.write(key)
+
+    log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
+
+
+def get_ceph_nodes():
+    ''' Query named relation 'ceph' to determine current nodes '''
+    hosts = []
+    for r_id in relation_ids('ceph'):
+        for unit in related_units(r_id):
+            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
+
+    return hosts
+
+
+def configure(service, key, auth, use_syslog):
+    ''' Perform basic configuration of Ceph '''
+    create_keyring(service, key)
+    create_key_file(service, key)
+    hosts = get_ceph_nodes()
+    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
+        ceph_conf.write(CEPH_CONF.format(auth=auth,
+                                         keyring=_keyring_path(service),
+                                         mon_hosts=",".join(map(str, hosts)),
+                                         use_syslog=use_syslog))
+    modprobe('rbd')
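Note: for reference, given illustrative inputs (service='admin',
auth='cephx', two monitor addresses, use_syslog=False), the CEPH_CONF
template above renders roughly as:

    [global]
     auth supported = cephx
     keyring = /etc/ceph/ceph.client.admin.keyring
     mon host = 10.0.0.1,10.0.0.2
     log to syslog = False
     err to syslog = False
     clog to syslog = False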
+
+
+def image_mapped(name):
+    ''' Determine whether a RADOS block device is mapped locally '''
+    try:
+        out = check_output(['rbd', 'showmapped'])
+    except CalledProcessError:
+        return False
+    else:
+        return name in out
+
+
+def map_block_storage(service, pool, image):
+    ''' Map a RADOS block device for local use '''
+    cmd = [
+        'rbd',
+        'map',
+        '{}/{}'.format(pool, image),
+        '--user',
+        service,
+        '--secret',
+        _keyfile_path(service),
+    ]
+    check_call(cmd)
+
+
+def filesystem_mounted(fs):
+    ''' Determine whether a filesystem is already mounted '''
+    return fs in [f for f, m in mounts()]
+
+
+def make_filesystem(blk_device, fstype='ext4', timeout=10):
+    ''' Make a new filesystem on the specified block device '''
+    count = 0
+    e_noent = os.errno.ENOENT
+    while not os.path.exists(blk_device):
+        if count >= timeout:
+            log('ceph: gave up waiting on block device %s' % blk_device,
+                level=ERROR)
+            raise IOError(e_noent, os.strerror(e_noent), blk_device)
+
+        log('ceph: waiting for block device %s to appear' % blk_device,
+            level=INFO)
+        count += 1
+        time.sleep(1)
+    else:
+        log('ceph: Formatting block device %s as filesystem %s.' %
+            (blk_device, fstype), level=INFO)
+        check_call(['mkfs', '-t', fstype, blk_device])
+
+
+def place_data_on_block_device(blk_device, data_src_dst):
+    ''' Migrate data in data_src_dst to blk_device and then remount '''
+    # mount block device into /mnt
+    mount(blk_device, '/mnt')
+    # copy data to /mnt
+    copy_files(data_src_dst, '/mnt')
+    # umount block device
+    umount('/mnt')
+    # Grab user/group ID's from original source
+    _dir = os.stat(data_src_dst)
+    uid = _dir.st_uid
+    gid = _dir.st_gid
+    # re-mount where the data should originally be
+    # TODO: persist is currently a NO-OP in core.host
+    mount(blk_device, data_src_dst, persist=True)
+    # ensure original ownership of new mount.
+    os.chown(data_src_dst, uid, gid)
+
+
+# TODO: re-use
+def modprobe(module):
+    ''' Load a kernel module and configure for auto-load on reboot '''
+    log('ceph: Loading kernel module', level=INFO)
+    cmd = ['modprobe', module]
+    check_call(cmd)
+    with open('/etc/modules', 'r+') as modules:
+        if module not in modules.read():
+            modules.write(module)
+
+
+def copy_files(src, dst, symlinks=False, ignore=None):
+    ''' Copy files from src to dst '''
+    for item in os.listdir(src):
+        s = os.path.join(src, item)
+        d = os.path.join(dst, item)
+        if os.path.isdir(s):
+            shutil.copytree(s, d, symlinks, ignore)
+        else:
+            shutil.copy2(s, d)
+
+
+def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
+                        blk_device, fstype, system_services=[],
+                        replicas=3):
+    """
+    NOTE: This function must only be called from a single service unit for
+          the same rbd_img otherwise data loss will occur.
+
+    Ensures the given pool and RBD image exist, are mapped to a block
+    device, and that the device is formatted and mounted at the given
+    mount_point.
+
+    If formatting a device for the first time, data existing at mount_point
+    will be migrated to the RBD device before being re-mounted.
+
+    All services listed in system_services will be stopped prior to data
+    migration and restarted when complete.
+    """
+    # Ensure pool, RBD image, RBD mappings are in place.
+    if not pool_exists(service, pool):
+        log('ceph: Creating new pool {}.'.format(pool))
+        create_pool(service, pool, replicas=replicas)
+
+    if not rbd_exists(service, pool, rbd_img):
+        log('ceph: Creating RBD image ({}).'.format(rbd_img))
+        create_rbd_image(service, pool, rbd_img, sizemb)
+
+    if not image_mapped(rbd_img):
+        log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
+        map_block_storage(service, pool, rbd_img)
+
+    # make file system
+    # TODO: What happens if for whatever reason this is run again and
+    # the data is already in the rbd device and/or is mounted??
+    # When it is mounted already, it will fail to make the fs
+    # XXX: This is really sketchy!  Need to at least add an fstab entry
+    # otherwise this hook will blow away existing data if it's executed
+    # after a reboot.
+    if not filesystem_mounted(mount_point):
+        make_filesystem(blk_device, fstype)
+
+        for svc in system_services:
+            if service_running(svc):
+                log('ceph: Stopping services {} prior to migrating data.'
+                    .format(svc))
+                service_stop(svc)
+
+        place_data_on_block_device(blk_device, mount_point)
+
+        for svc in system_services:
+            log('ceph: Starting service {} after migrating data.'
+                .format(svc))
+            service_start(svc)
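Note: a sketch of how a client charm might call ensure_ceph_storage()
(all values here are hypothetical, not part of this patch):

    ensure_ceph_storage(service='mysql', pool='mysql', rbd_img='mysql',
                        sizemb=10240, mount_point='/var/lib/mysql',
                        blk_device='/dev/rbd/mysql/mysql', fstype='ext4',
                        system_services=['mysql'])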
+
+
+def ensure_ceph_keyring(service, user=None, group=None):
+    '''
+    Ensures a ceph keyring is created for a named service
+    and optionally ensures user and group ownership.
+
+    Returns False if no ceph key is available in relation state.
+    '''
+    key = None
+    for rid in relation_ids('ceph'):
+        for unit in related_units(rid):
+            key = relation_get('key', rid=rid, unit=unit)
+            if key:
+                break
+
+    if not key:
+        return False
+
+    create_keyring(service=service, key=key)
+    keyring = _keyring_path(service)
+    if user and group:
+        check_call(['chown', '%s.%s' % (user, group), keyring])
+
+    return True
+
+
+def ceph_version():
+    ''' Retrieve the local version of ceph '''
+    if os.path.exists('/usr/bin/ceph'):
+        cmd = ['ceph', '-v']
+        output = check_output(cmd)
+        output = output.split()
+        if len(output) > 3:
+            return output[2]
+        else:
+            return None
+    else:
+        return None
diff --git a/hooks/charmhelpers/core/services/__init__.py b/hooks/charmhelpers/core/services/__init__.py
index e8039a84..69dde79a 100644
--- a/hooks/charmhelpers/core/services/__init__.py
+++ b/hooks/charmhelpers/core/services/__init__.py
@@ -1,2 +1,2 @@
-from .base import *
-from .helpers import *
+from .base import *  # NOQA
+from .helpers import *  # NOQA
diff --git a/hooks/charmhelpers/fetch/__init__.py b/hooks/charmhelpers/fetch/__init__.py
index 6724d293..2398e8ed 100644
--- a/hooks/charmhelpers/fetch/__init__.py
+++ b/hooks/charmhelpers/fetch/__init__.py
@@ -256,7 +256,7 @@ def add_source(source, key=None):
     elif source == 'distro':
         pass
     else:
-        raise SourceConfigError("Unknown source: {!r}".format(source))
+        log("Unknown source: {!r}".format(source))
 
     if key:
         if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key:
diff --git a/hooks/client-relation-changed b/hooks/client-relation-changed
new file mode 120000
index 00000000..9416ca6a
--- /dev/null
+++ b/hooks/client-relation-changed
@@ -0,0 +1 @@
+hooks.py
\ No newline at end of file
diff --git a/hooks/hooks.py b/hooks/hooks.py
index 8a6c26c8..301a87ad 100755
--- a/hooks/hooks.py
+++ b/hooks/hooks.py
@@ -9,6 +9,7 @@
 #
 
 import glob
+import json
 import os
 import shutil
 import sys
@@ -50,6 +51,9 @@ from utils import (
     get_public_addr,
     assert_charm_supports_ipv6
 )
+from ceph_broker import (
+    process_requests
+)
 
 hooks = Hooks()
 
@@ -215,7 +219,7 @@ def notify_radosgws():
 
 def notify_client():
     for relid in relation_ids('client'):
-        client_relation(relid)
+        client_relation_joined(relid)
 
 
 def upgrade_keys():
@@ -266,28 +270,47 @@ def radosgw_relation(relid=None):
 
 
 @hooks.hook('client-relation-joined')
-def client_relation(relid=None):
+def client_relation_joined(relid=None):
     if ceph.is_quorum():
         log('mon cluster in quorum - providing client with keys')
         service_name = None
         if relid is None:
-            service_name = remote_unit().split('/')[0]
+            units = [remote_unit()]
+            service_name = units[0].split('/')[0]
         else:
             units = related_units(relid)
             if len(units) > 0:
                 service_name = units[0].split('/')[0]
+
         if service_name is not None:
-            data = {
-                'key': ceph.get_named_key(service_name),
-                'auth': config('auth-supported'),
-                'ceph-public-address': get_public_addr(),
-            }
+            data = {'key': ceph.get_named_key(service_name),
+                    'auth': config('auth-supported'),
+                    'ceph-public-address': get_public_addr()}
             relation_set(relation_id=relid,
                         relation_settings=data)
+
+        client_relation_changed(relid=relid)
     else:
         log('mon cluster not in quorum - deferring key provision')
 
 
+@hooks.hook('client-relation-changed')
+def client_relation_changed(relid=None):
+    if ceph.is_quorum():
+        resp = None
+        settings = relation_get(rid=relid)
+        if 'broker_req' in settings:
+            req = settings['broker_req']
+            log("Broker request received")
+            resp = process_requests(json.loads(req))
+
+        if resp is not None:
+            relation_set(relation_id=relid,
+                         relation_settings={'broker_rsp': resp})
+    else:
+        log('mon cluster not in quorum')
+
+
 @hooks.hook('upgrade-charm')
 def upgrade_charm():
     emit_cephconf()
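Note: the client side of the broker protocol introduced above would look
roughly like this (a sketch only; the hook names and pool values are
illustrative, not part of this patch):

    import json

    from charmhelpers.core.hookenv import (
        log,
        relation_get,
        relation_set,
    )

    def ceph_relation_joined():
        # Ask the ceph charm to create a pool on our behalf.
        req = json.dumps([{'op': 'create_pool', 'pool': 'cinder',
                           'replicas': 3}])
        relation_set(relation_settings={'broker_req': req})

    def ceph_relation_changed():
        # process_requests() returns 0 on success, 1 on error.
        rsp = relation_get('broker_rsp')
        if rsp is not None and int(rsp) == 0:
            log('ceph broker request completed')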
diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py
new file mode 100644
index 00000000..faca4587
--- /dev/null
+++ b/unit_tests/test_ceph_broker.py
@@ -0,0 +1,11 @@
+#import mock
+import unittest
+
+
+class CephBrokerTestCase(unittest.TestCase):
+
+    def setUp(self):
+        super(CephBrokerTestCase, self).setUp()
+
+    def test_process_requests(self):
+        pass
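Note: the test case above is a placeholder; a fleshed-out version might
mock out the charmhelpers calls that ceph_broker makes, along these lines
(a sketch only, not part of this patch):

    import json
    import unittest

    import mock

    import ceph_broker

    class CephBrokerTestCase(unittest.TestCase):

        @mock.patch('ceph_broker.log')
        @mock.patch('ceph_broker.create_pool')
        @mock.patch('ceph_broker.pool_exists')
        def test_process_requests_create_pool(self, mock_pool_exists,
                                              mock_create_pool, mock_log):
            # Pool is absent, so the broker should create it and return 0.
            mock_pool_exists.return_value = False
            reqs = json.loads('[{"op": "create_pool", "pool": "foo", '
                              '"replicas": 3}]')
            self.assertEqual(ceph_broker.process_requests(reqs), 0)
            mock_create_pool.assert_called_with(service='admin', name='foo',
                                                replicas=3)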