From 6f52b8a5416900d6c9ba501769920c2b3fb4bbe5 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Fri, 7 Nov 2014 14:16:26 +0100 Subject: [PATCH 1/9] Added Ceph broker support to allow clients to request resources e.g. pools as opposed to creating them themselves. This hopefully simplifies the logic at the client side and reduces the risk of race conditions by shifting execution to the ceph charm itself. Backwards-compatibility with clients that don't want to/yet support this approach is maintained. --- Makefile | 6 +- charm-helpers-hooks.yaml | 1 + hooks/ceph_broker.py | 47 +++ hooks/charmhelpers/contrib/network/ip.py | 2 - .../contrib/storage/linux/ceph.py | 388 ++++++++++++++++++ hooks/charmhelpers/core/services/__init__.py | 4 +- hooks/charmhelpers/fetch/__init__.py | 2 +- hooks/client-relation-changed | 1 + hooks/hooks.py | 39 +- unit_tests/test_ceph_broker.py | 11 + 10 files changed, 487 insertions(+), 14 deletions(-) create mode 100644 hooks/ceph_broker.py create mode 100644 hooks/charmhelpers/contrib/storage/linux/ceph.py create mode 120000 hooks/client-relation-changed create mode 100644 unit_tests/test_ceph_broker.py diff --git a/Makefile b/Makefile index 70e95fb..e29ab2b 100644 --- a/Makefile +++ b/Makefile @@ -2,9 +2,13 @@ PYTHON := /usr/bin/env python lint: - @flake8 --exclude hooks/charmhelpers hooks tests + @flake8 --exclude hooks/charmhelpers hooks tests unit_tests @charm proof +unit_test: + @echo Starting unit tests... + @$(PYTHON) /usr/bin/nosetests --nologcapture --with-coverage unit_tests + test: @echo Starting Amulet tests... # coreycb note: The -v should only be temporary until Amulet sends diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml index afb9e42..c401e72 100644 --- a/charm-helpers-hooks.yaml +++ b/charm-helpers-hooks.yaml @@ -5,6 +5,7 @@ include: - fetch - contrib.storage.linux: - utils + - ceph - payload.execd - contrib.openstack.alternatives - contrib.network.ip diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py new file mode 100644 index 0000000..cf1df89 --- /dev/null +++ b/hooks/ceph_broker.py @@ -0,0 +1,47 @@ +#!/usr/bin/python +# +# Copyright 2014 Canonical Ltd. +# +from charmhelpers.core.hookenv import ( + log, + INFO, + ERROR +) +from charmhelpers.contrib.storage.linux.ceph import ( + create_pool, + pool_exists, + ensure_ceph_keyring +) + + +def process_requests(reqs): + """Process a Ceph broker request from a ceph client.""" + log("Processing %s ceph broker requests" % (len(reqs)), level=INFO) + for req in reqs: + op = req.get('op') + log("Processing op='%s'" % (op), level=INFO) + # Use admin client since we do not have other client key locations + # setup to use them for these operations. + svc = 'admin' + if op == "create_pool": + pool = req.get('pool') + replicas = req.get('replicas') + if not all([pool, replicas]): + log("Missing parameter(s)", level=ERROR) + return 1 + + if not pool_exists(service=svc, name=pool): + log("Creating pool '%s'" % (pool), level=INFO) + create_pool(service=svc, name=pool, replicas=replicas) + else: + log("Pool '%s' already exists" % (pool), level=INFO) + elif op == "create_keyring": + user = req.get('user') + group = req.get('group') + if not all([user, group]): + log("Missing parameter(s)", level=ERROR) + return 1 + + ensure_ceph_keyring(service=svc, user=user, group=group) + + return 0 diff --git a/hooks/charmhelpers/contrib/network/ip.py b/hooks/charmhelpers/contrib/network/ip.py index e62e565..c4bfead 100644 --- a/hooks/charmhelpers/contrib/network/ip.py +++ b/hooks/charmhelpers/contrib/network/ip.py @@ -8,7 +8,6 @@ from functools import partial from charmhelpers.core.hookenv import unit_get from charmhelpers.fetch import apt_install from charmhelpers.core.hookenv import ( - WARNING, ERROR, log ) @@ -175,7 +174,6 @@ def format_ipv6_addr(address): if is_ipv6(address): address = "[%s]" % address else: - log("Not a valid ipv6 address: %s" % address, level=WARNING) address = None return address diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py new file mode 100644 index 0000000..598ec26 --- /dev/null +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -0,0 +1,388 @@ +# +# Copyright 2012 Canonical Ltd. +# +# This file is sourced from lp:openstack-charm-helpers +# +# Authors: +# James Page +# Adam Gandelman +# + +import os +import shutil +import json +import time + +from subprocess import ( + check_call, + check_output, + CalledProcessError +) + +from charmhelpers.core.hookenv import ( + relation_get, + relation_ids, + related_units, + log, + INFO, + WARNING, + ERROR +) + +from charmhelpers.core.host import ( + mount, + mounts, + service_start, + service_stop, + service_running, + umount, +) + +from charmhelpers.fetch import ( + apt_install, +) + +KEYRING = '/etc/ceph/ceph.client.{}.keyring' +KEYFILE = '/etc/ceph/ceph.client.{}.key' + +CEPH_CONF = """[global] + auth supported = {auth} + keyring = {keyring} + mon host = {mon_hosts} + log to syslog = {use_syslog} + err to syslog = {use_syslog} + clog to syslog = {use_syslog} +""" + + +def install(): + ''' Basic Ceph client installation ''' + ceph_dir = "/etc/ceph" + if not os.path.exists(ceph_dir): + os.mkdir(ceph_dir) + apt_install('ceph-common', fatal=True) + + +def rbd_exists(service, pool, rbd_img): + ''' Check to see if a RADOS block device exists ''' + try: + out = check_output(['rbd', 'list', '--id', service, + '--pool', pool]) + except CalledProcessError: + return False + else: + return rbd_img in out + + +def create_rbd_image(service, pool, image, sizemb): + ''' Create a new RADOS block device ''' + cmd = [ + 'rbd', + 'create', + image, + '--size', + str(sizemb), + '--id', + service, + '--pool', + pool + ] + check_call(cmd) + + +def pool_exists(service, name): + ''' Check to see if a RADOS pool already exists ''' + try: + out = check_output(['rados', '--id', service, 'lspools']) + except CalledProcessError: + return False + else: + return name in out + + +def get_osds(service): + ''' + Return a list of all Ceph Object Storage Daemons + currently in the cluster + ''' + version = ceph_version() + if version and version >= '0.56': + return json.loads(check_output(['ceph', '--id', service, + 'osd', 'ls', '--format=json'])) + else: + return None + + +def create_pool(service, name, replicas=3): + ''' Create a new RADOS pool ''' + if pool_exists(service, name): + log("Ceph pool {} already exists, skipping creation".format(name), + level=WARNING) + return + # Calculate the number of placement groups based + # on upstream recommended best practices. + osds = get_osds(service) + if osds: + pgnum = (len(osds) * 100 / replicas) + else: + # NOTE(james-page): Default to 200 for older ceph versions + # which don't support OSD query from cli + pgnum = 200 + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'create', + name, str(pgnum) + ] + check_call(cmd) + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set', name, + 'size', str(replicas) + ] + check_call(cmd) + + +def delete_pool(service, name): + ''' Delete a RADOS pool from ceph ''' + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'delete', + name, '--yes-i-really-really-mean-it' + ] + check_call(cmd) + + +def _keyfile_path(service): + return KEYFILE.format(service) + + +def _keyring_path(service): + return KEYRING.format(service) + + +def create_keyring(service, key): + ''' Create a new Ceph keyring containing key''' + keyring = _keyring_path(service) + if os.path.exists(keyring): + log('ceph: Keyring exists at %s.' % keyring, level=WARNING) + return + cmd = [ + 'ceph-authtool', + keyring, + '--create-keyring', + '--name=client.{}'.format(service), + '--add-key={}'.format(key) + ] + check_call(cmd) + log('ceph: Created new ring at %s.' % keyring, level=INFO) + + +def create_key_file(service, key): + ''' Create a file containing key ''' + keyfile = _keyfile_path(service) + if os.path.exists(keyfile): + log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING) + return + with open(keyfile, 'w') as fd: + fd.write(key) + log('ceph: Created new keyfile at %s.' % keyfile, level=INFO) + + +def get_ceph_nodes(): + ''' Query named relation 'ceph' to detemine current nodes ''' + hosts = [] + for r_id in relation_ids('ceph'): + for unit in related_units(r_id): + hosts.append(relation_get('private-address', unit=unit, rid=r_id)) + return hosts + + +def configure(service, key, auth, use_syslog): + ''' Perform basic configuration of Ceph ''' + create_keyring(service, key) + create_key_file(service, key) + hosts = get_ceph_nodes() + with open('/etc/ceph/ceph.conf', 'w') as ceph_conf: + ceph_conf.write(CEPH_CONF.format(auth=auth, + keyring=_keyring_path(service), + mon_hosts=",".join(map(str, hosts)), + use_syslog=use_syslog)) + modprobe('rbd') + + +def image_mapped(name): + ''' Determine whether a RADOS block device is mapped locally ''' + try: + out = check_output(['rbd', 'showmapped']) + except CalledProcessError: + return False + else: + return name in out + + +def map_block_storage(service, pool, image): + ''' Map a RADOS block device for local use ''' + cmd = [ + 'rbd', + 'map', + '{}/{}'.format(pool, image), + '--user', + service, + '--secret', + _keyfile_path(service), + ] + check_call(cmd) + + +def filesystem_mounted(fs): + ''' Determine whether a filesytems is already mounted ''' + return fs in [f for f, m in mounts()] + + +def make_filesystem(blk_device, fstype='ext4', timeout=10): + ''' Make a new filesystem on the specified block device ''' + count = 0 + e_noent = os.errno.ENOENT + while not os.path.exists(blk_device): + if count >= timeout: + log('ceph: gave up waiting on block device %s' % blk_device, + level=ERROR) + raise IOError(e_noent, os.strerror(e_noent), blk_device) + log('ceph: waiting for block device %s to appear' % blk_device, + level=INFO) + count += 1 + time.sleep(1) + else: + log('ceph: Formatting block device %s as filesystem %s.' % + (blk_device, fstype), level=INFO) + check_call(['mkfs', '-t', fstype, blk_device]) + + +def place_data_on_block_device(blk_device, data_src_dst): + ''' Migrate data in data_src_dst to blk_device and then remount ''' + # mount block device into /mnt + mount(blk_device, '/mnt') + # copy data to /mnt + copy_files(data_src_dst, '/mnt') + # umount block device + umount('/mnt') + # Grab user/group ID's from original source + _dir = os.stat(data_src_dst) + uid = _dir.st_uid + gid = _dir.st_gid + # re-mount where the data should originally be + # TODO: persist is currently a NO-OP in core.host + mount(blk_device, data_src_dst, persist=True) + # ensure original ownership of new mount. + os.chown(data_src_dst, uid, gid) + + +# TODO: re-use +def modprobe(module): + ''' Load a kernel module and configure for auto-load on reboot ''' + log('ceph: Loading kernel module', level=INFO) + cmd = ['modprobe', module] + check_call(cmd) + with open('/etc/modules', 'r+') as modules: + if module not in modules.read(): + modules.write(module) + + +def copy_files(src, dst, symlinks=False, ignore=None): + ''' Copy files from src to dst ''' + for item in os.listdir(src): + s = os.path.join(src, item) + d = os.path.join(dst, item) + if os.path.isdir(s): + shutil.copytree(s, d, symlinks, ignore) + else: + shutil.copy2(s, d) + + +def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point, + blk_device, fstype, system_services=[], + replicas=3): + """ + NOTE: This function must only be called from a single service unit for + the same rbd_img otherwise data loss will occur. + + Ensures given pool and RBD image exists, is mapped to a block device, + and the device is formatted and mounted at the given mount_point. + + If formatting a device for the first time, data existing at mount_point + will be migrated to the RBD device before being re-mounted. + + All services listed in system_services will be stopped prior to data + migration and restarted when complete. + """ + # Ensure pool, RBD image, RBD mappings are in place. + if not pool_exists(service, pool): + log('ceph: Creating new pool {}.'.format(pool)) + create_pool(service, pool, replicas=replicas) + + if not rbd_exists(service, pool, rbd_img): + log('ceph: Creating RBD image ({}).'.format(rbd_img)) + create_rbd_image(service, pool, rbd_img, sizemb) + + if not image_mapped(rbd_img): + log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img)) + map_block_storage(service, pool, rbd_img) + + # make file system + # TODO: What happens if for whatever reason this is run again and + # the data is already in the rbd device and/or is mounted?? + # When it is mounted already, it will fail to make the fs + # XXX: This is really sketchy! Need to at least add an fstab entry + # otherwise this hook will blow away existing data if its executed + # after a reboot. + if not filesystem_mounted(mount_point): + make_filesystem(blk_device, fstype) + + for svc in system_services: + if service_running(svc): + log('ceph: Stopping services {} prior to migrating data.' + .format(svc)) + service_stop(svc) + + place_data_on_block_device(blk_device, mount_point) + + for svc in system_services: + log('ceph: Starting service {} after migrating data.' + .format(svc)) + service_start(svc) + + +def ensure_ceph_keyring(service, user=None, group=None): + ''' + Ensures a ceph keyring is created for a named service + and optionally ensures user and group ownership. + + Returns False if no ceph key is available in relation state. + ''' + key = None + for rid in relation_ids('ceph'): + for unit in related_units(rid): + key = relation_get('key', rid=rid, unit=unit) + if key: + break + if not key: + return False + create_keyring(service=service, key=key) + keyring = _keyring_path(service) + if user and group: + check_call(['chown', '%s.%s' % (user, group), keyring]) + return True + + +def ceph_version(): + ''' Retrieve the local version of ceph ''' + if os.path.exists('/usr/bin/ceph'): + cmd = ['ceph', '-v'] + output = check_output(cmd) + output = output.split() + if len(output) > 3: + return output[2] + else: + return None + else: + return None diff --git a/hooks/charmhelpers/core/services/__init__.py b/hooks/charmhelpers/core/services/__init__.py index e8039a8..69dde79 100644 --- a/hooks/charmhelpers/core/services/__init__.py +++ b/hooks/charmhelpers/core/services/__init__.py @@ -1,2 +1,2 @@ -from .base import * -from .helpers import * +from .base import * # NOQA +from .helpers import * # NOQA diff --git a/hooks/charmhelpers/fetch/__init__.py b/hooks/charmhelpers/fetch/__init__.py index 6724d29..2398e8e 100644 --- a/hooks/charmhelpers/fetch/__init__.py +++ b/hooks/charmhelpers/fetch/__init__.py @@ -256,7 +256,7 @@ def add_source(source, key=None): elif source == 'distro': pass else: - raise SourceConfigError("Unknown source: {!r}".format(source)) + log("Unknown source: {!r}".format(source)) if key: if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key: diff --git a/hooks/client-relation-changed b/hooks/client-relation-changed new file mode 120000 index 0000000..9416ca6 --- /dev/null +++ b/hooks/client-relation-changed @@ -0,0 +1 @@ +hooks.py \ No newline at end of file diff --git a/hooks/hooks.py b/hooks/hooks.py index 8a6c26c..301a87a 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -9,6 +9,7 @@ # import glob +import json import os import shutil import sys @@ -50,6 +51,9 @@ from utils import ( get_public_addr, assert_charm_supports_ipv6 ) +from ceph_broker import ( + process_requests +) hooks = Hooks() @@ -215,7 +219,7 @@ def notify_radosgws(): def notify_client(): for relid in relation_ids('client'): - client_relation(relid) + client_relation_joined(relid) def upgrade_keys(): @@ -266,28 +270,47 @@ def radosgw_relation(relid=None): @hooks.hook('client-relation-joined') -def client_relation(relid=None): +def client_relation_joined(relid=None): if ceph.is_quorum(): log('mon cluster in quorum - providing client with keys') service_name = None if relid is None: - service_name = remote_unit().split('/')[0] + units = [remote_unit()] + service_name = units[0].split('/')[0] else: units = related_units(relid) if len(units) > 0: service_name = units[0].split('/')[0] + if service_name is not None: - data = { - 'key': ceph.get_named_key(service_name), - 'auth': config('auth-supported'), - 'ceph-public-address': get_public_addr(), - } + data = {'key': ceph.get_named_key(service_name), + 'auth': config('auth-supported'), + 'ceph-public-address': get_public_addr()} relation_set(relation_id=relid, relation_settings=data) + + client_relation_changed(relid=relid) else: log('mon cluster not in quorum - deferring key provision') +@hooks.hook('client-relation-changed') +def client_relation_changed(relid=None): + if ceph.is_quorum(): + resp = None + settings = relation_get(rid=relid) + if 'broker_req' in settings: + req = settings['broker_req'] + log("Broker request received") + resp = process_requests(json.loads(req)) + + if resp is not None: + relation_set(relation_id=relid, + relation_settings={'broker_rsp': resp}) + else: + log('mon cluster not in quorum') + + @hooks.hook('upgrade-charm') def upgrade_charm(): emit_cephconf() diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py new file mode 100644 index 0000000..faca458 --- /dev/null +++ b/unit_tests/test_ceph_broker.py @@ -0,0 +1,11 @@ +#import mock +import unittest + + +class CephBrokerTestCase(unittest.TestCase): + + def setUp(self): + super(CephBrokerTestCase, self).setUp() + + def test_process_requests(self): + pass From a4988b051b0bdcf70760b203208a4a5193252abb Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Fri, 7 Nov 2014 15:26:58 +0100 Subject: [PATCH 2/9] client still has to do keyring create --- hooks/ceph_broker.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index cf1df89..280f05f 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -9,8 +9,7 @@ from charmhelpers.core.hookenv import ( ) from charmhelpers.contrib.storage.linux.ceph import ( create_pool, - pool_exists, - ensure_ceph_keyring + pool_exists ) @@ -35,13 +34,5 @@ def process_requests(reqs): create_pool(service=svc, name=pool, replicas=replicas) else: log("Pool '%s' already exists" % (pool), level=INFO) - elif op == "create_keyring": - user = req.get('user') - group = req.get('group') - if not all([user, group]): - log("Missing parameter(s)", level=ERROR) - return 1 - - ensure_ceph_keyring(service=svc, user=user, group=group) return 0 From c0c2512df6795a69e8c5b6e9b0df3bcf483393ba Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Sat, 8 Nov 2014 20:15:17 +0000 Subject: [PATCH 3/9] added unit tests --- hooks/__init__.py | 0 hooks/ceph_broker.py | 11 ++++++--- hooks/hooks.py | 17 +++++++------- unit_tests/__init__.py | 2 ++ unit_tests/test_ceph_broker.py | 41 +++++++++++++++++++++++++++++++--- 5 files changed, 57 insertions(+), 14 deletions(-) create mode 100644 hooks/__init__.py create mode 100644 unit_tests/__init__.py diff --git a/hooks/__init__.py b/hooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index 280f05f..ccbc2ad 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -23,16 +23,21 @@ def process_requests(reqs): # setup to use them for these operations. svc = 'admin' if op == "create_pool": - pool = req.get('pool') + pool = req.get('name') replicas = req.get('replicas') if not all([pool, replicas]): log("Missing parameter(s)", level=ERROR) return 1 if not pool_exists(service=svc, name=pool): - log("Creating pool '%s'" % (pool), level=INFO) + log("Creating pool '%s' (replicas=%s)" % (pool, replicas), + level=INFO) create_pool(service=svc, name=pool, replicas=replicas) else: - log("Pool '%s' already exists" % (pool), level=INFO) + log("Pool '%s' already exists - skipping create" % (pool), + level=INFO) + else: + log("Unknown operation '%s'" % (op)) + return 1 return 0 diff --git a/hooks/hooks.py b/hooks/hooks.py index 301a87a..11aa5ed 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -296,17 +296,18 @@ def client_relation_joined(relid=None): @hooks.hook('client-relation-changed') def client_relation_changed(relid=None): - if ceph.is_quorum(): - resp = None + """Process broker requests from ceph client relations.""" + if ceph.is_quorum() and ceph.is_leader(): settings = relation_get(rid=relid) if 'broker_req' in settings: req = settings['broker_req'] - log("Broker request received") - resp = process_requests(json.loads(req)) - - if resp is not None: - relation_set(relation_id=relid, - relation_settings={'broker_rsp': resp}) + log("Broker request received from ceph client") + exit_code = process_requests(json.loads(req)) + # Construct JSON response dict allowing other data to be added as + # and when we need it. + resp = json.dumps({'exit_code': exit_code}) + relation_set(relation_id=relid, + relation_settings={'broker_rsp': resp}) else: log('mon cluster not in quorum') diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py new file mode 100644 index 0000000..f80aab3 --- /dev/null +++ b/unit_tests/__init__.py @@ -0,0 +1,2 @@ +import sys +sys.path.append('hooks') diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py index faca458..0d4a337 100644 --- a/unit_tests/test_ceph_broker.py +++ b/unit_tests/test_ceph_broker.py @@ -1,11 +1,46 @@ -#import mock +import mock import unittest +import ceph_broker + class CephBrokerTestCase(unittest.TestCase): def setUp(self): super(CephBrokerTestCase, self).setUp() - def test_process_requests(self): - pass + @mock.patch('ceph_broker.log') + def test_process_requests_noop(self, mock_log): + rc = ceph_broker.process_requests([{}]) + self.assertEqual(rc, 1) + + @mock.patch('ceph_broker.log') + def test_process_requests_invalid(self, mock_log): + rc = ceph_broker.process_requests([{'op': 'invalid_op'}]) + self.assertEqual(rc, 1) + + @mock.patch('ceph_broker.create_pool') + @mock.patch('ceph_broker.pool_exists') + @mock.patch('ceph_broker.log') + def test_process_requests_create_pool(self, mock_log, mock_pool_exists, + mock_create_pool): + mock_pool_exists.return_value = False + rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo', + 'replicas': 3}]) + mock_pool_exists.assert_called_with(service='admin', name='foo') + mock_create_pool.assert_called_with(service='admin', name='foo', + replicas=3) + self.assertEqual(rc, 0) + + @mock.patch('ceph_broker.create_pool') + @mock.patch('ceph_broker.pool_exists') + @mock.patch('ceph_broker.log') + def test_process_requests_create_pool_exists(self, mock_log, + mock_pool_exists, + mock_create_pool): + mock_pool_exists.return_value = True + rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo', + 'replicas': 3}]) + mock_pool_exists.assert_called_with(service='admin', name='foo') + self.assertFalse(mock_create_pool.called) + self.assertEqual(rc, 0) From eb436cd35f621457706c7e3cf72a92be64772484 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Sat, 8 Nov 2014 21:13:40 +0000 Subject: [PATCH 4/9] more --- hooks/ceph_broker.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index ccbc2ad..dd91b05 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -23,12 +23,17 @@ def process_requests(reqs): # setup to use them for these operations. svc = 'admin' if op == "create_pool": - pool = req.get('name') - replicas = req.get('replicas') - if not all([pool, replicas]): - log("Missing parameter(s)", level=ERROR) + params = {'pool': req.get('name'), + 'replicas': req.get('replicas')} + if not all(params.iteritems()): + log("Missing parameter(s): %s" % + (' '.join([k for k in params.iterkeys() + if not params[k]])), + level=ERROR) return 1 + pool = params['pool'] + replicas = params['replicas'] if not pool_exists(service=svc, name=pool): log("Creating pool '%s' (replicas=%s)" % (pool, replicas), level=INFO) From e328ba40542c1fe5f866de42f9755afa4072c9e0 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Sun, 9 Nov 2014 11:54:40 +0000 Subject: [PATCH 5/9] Moved more broker code into ceph_broker Added support for versioning api Added unit tests --- hooks/ceph_broker.py | 50 ++++++++++++++++++++++++++++------ hooks/hooks.py | 25 ++++++++--------- unit_tests/test_ceph_broker.py | 46 ++++++++++++++++++++++++------- 3 files changed, 89 insertions(+), 32 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index dd91b05..8ab77c5 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -2,6 +2,8 @@ # # Copyright 2014 Canonical Ltd. # +import json + from charmhelpers.core.hookenv import ( log, INFO, @@ -13,8 +15,37 @@ from charmhelpers.contrib.storage.linux.ceph import ( ) +def decode(f): + def decode_inner(req): + return json.dumps(f(json.loads(req))) + + return decode_inner + + +@decode def process_requests(reqs): - """Process a Ceph broker request from a ceph client.""" + """Process a Ceph broker request from a ceph client. + + This is a versioned api. We choose the api version based on provided + version from client. + """ + version = reqs.get('version') + if version == 1: + return process_requests_v1(reqs['ops']) + + msg = ("Missing or invalid api version (%s)" % (version)) + return {'exit_code': 1, 'stderr': msg} + + +def process_requests_v1(reqs): + """Process a v1 requests from a ceph client. + + Takes a list of requests (dicts) and processes each one until it hits an + error. + + Upon completion of all ops or if an error is found, a response dict is + returned containing exit code and any extra info. + """ log("Processing %s ceph broker requests" % (len(reqs)), level=INFO) for req in reqs: op = req.get('op') @@ -26,11 +57,11 @@ def process_requests(reqs): params = {'pool': req.get('name'), 'replicas': req.get('replicas')} if not all(params.iteritems()): - log("Missing parameter(s): %s" % - (' '.join([k for k in params.iterkeys() - if not params[k]])), - level=ERROR) - return 1 + msg = ("Missing parameter(s): %s" % + (' '.join([k for k in params.iterkeys() + if not params[k]]))) + log(msg, level=ERROR) + return {'exit_code': 1, 'stderr': msg} pool = params['pool'] replicas = params['replicas'] @@ -42,7 +73,8 @@ def process_requests(reqs): log("Pool '%s' already exists - skipping create" % (pool), level=INFO) else: - log("Unknown operation '%s'" % (op)) - return 1 + msg = "Unknown operation '%s'" % (op) + log(msg, level=ERROR) + return {'exit_code': 1, 'stderr': msg} - return 0 + return {'exit_code': 0} diff --git a/hooks/hooks.py b/hooks/hooks.py index 11aa5ed..ed3da21 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -9,14 +9,15 @@ # import glob -import json import os import shutil import sys import ceph from charmhelpers.core.hookenv import ( - log, ERROR, + log, + INFO, + ERROR, config, relation_ids, related_units, @@ -26,7 +27,6 @@ from charmhelpers.core.hookenv import ( Hooks, UnregisteredHookError, service_name ) - from charmhelpers.core.host import ( service_restart, umount, @@ -45,7 +45,6 @@ from charmhelpers.contrib.network.ip import ( get_ipv6_addr, format_ipv6_addr ) - from utils import ( render_template, get_public_addr, @@ -297,17 +296,17 @@ def client_relation_joined(relid=None): @hooks.hook('client-relation-changed') def client_relation_changed(relid=None): """Process broker requests from ceph client relations.""" - if ceph.is_quorum() and ceph.is_leader(): + if ceph.is_quorum(): settings = relation_get(rid=relid) if 'broker_req' in settings: - req = settings['broker_req'] - log("Broker request received from ceph client") - exit_code = process_requests(json.loads(req)) - # Construct JSON response dict allowing other data to be added as - # and when we need it. - resp = json.dumps({'exit_code': exit_code}) - relation_set(relation_id=relid, - relation_settings={'broker_rsp': resp}) + if not ceph.is_leader(): + log("Not leader - ignoring broker request", level=INFO) + else: + req = settings['broker_req'] + log("Broker request received from ceph client") + rsp = process_requests(req) + relation_set(relation_id=relid, + relation_settings={'broker_rsp': rsp}) else: log('mon cluster not in quorum') diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py index 0d4a337..2d50e2a 100644 --- a/unit_tests/test_ceph_broker.py +++ b/unit_tests/test_ceph_broker.py @@ -1,3 +1,4 @@ +import json import mock import unittest @@ -11,13 +12,34 @@ class CephBrokerTestCase(unittest.TestCase): @mock.patch('ceph_broker.log') def test_process_requests_noop(self, mock_log): - rc = ceph_broker.process_requests([{}]) - self.assertEqual(rc, 1) + req = json.dumps({'version': 1, 'ops': []}) + rc = ceph_broker.process_requests(req) + self.assertEqual(json.loads(rc), {'exit_code': 0}) + + @mock.patch('ceph_broker.log') + def test_process_requests_missing_api_version(self, mock_log): + req = json.dumps({'ops': []}) + rc = ceph_broker.process_requests(req) + self.assertEqual(json.loads(rc), {'exit_code': 1, + 'stderr': + ('Missing or invalid api version ' + '(None)')}) + + @mock.patch('ceph_broker.log') + def test_process_requests_invalid_api_version(self, mock_log): + req = json.dumps({'version': 2, 'ops': []}) + rc = ceph_broker.process_requests(req) + self.assertEqual(json.loads(rc), + {'exit_code': 1, + 'stderr': 'Missing or invalid api version (2)'}) @mock.patch('ceph_broker.log') def test_process_requests_invalid(self, mock_log): - rc = ceph_broker.process_requests([{'op': 'invalid_op'}]) - self.assertEqual(rc, 1) + reqs = json.dumps({'version': 1, 'ops': [{'op': 'invalid_op'}]}) + rc = ceph_broker.process_requests(reqs) + self.assertEqual(json.loads(rc), + {'exit_code': 1, + 'stderr': "Unknown operation 'invalid_op'"}) @mock.patch('ceph_broker.create_pool') @mock.patch('ceph_broker.pool_exists') @@ -25,12 +47,14 @@ class CephBrokerTestCase(unittest.TestCase): def test_process_requests_create_pool(self, mock_log, mock_pool_exists, mock_create_pool): mock_pool_exists.return_value = False - rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo', - 'replicas': 3}]) + reqs = json.dumps({'version': 1, + 'ops': [{'op': 'create_pool', 'name': + 'foo', 'replicas': 3}]}) + rc = ceph_broker.process_requests(reqs) mock_pool_exists.assert_called_with(service='admin', name='foo') mock_create_pool.assert_called_with(service='admin', name='foo', replicas=3) - self.assertEqual(rc, 0) + self.assertEqual(json.loads(rc), {'exit_code': 0}) @mock.patch('ceph_broker.create_pool') @mock.patch('ceph_broker.pool_exists') @@ -39,8 +63,10 @@ class CephBrokerTestCase(unittest.TestCase): mock_pool_exists, mock_create_pool): mock_pool_exists.return_value = True - rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo', - 'replicas': 3}]) + reqs = json.dumps({'version': 1, + 'ops': [{'op': 'create_pool', 'name': 'foo', + 'replicas': 3}]}) + rc = ceph_broker.process_requests(reqs) mock_pool_exists.assert_called_with(service='admin', name='foo') self.assertFalse(mock_create_pool.called) - self.assertEqual(rc, 0) + self.assertEqual(json.loads(rc), {'exit_code': 0}) From 1e730ad6ce2a690cb4c36ce9bc888f1e28fd14f8 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Sun, 9 Nov 2014 12:58:04 +0000 Subject: [PATCH 6/9] cleanup --- hooks/ceph_broker.py | 12 ++++++------ unit_tests/test_ceph_broker.py | 26 +++++++++++++------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index 8ab77c5..adba2ce 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -29,12 +29,12 @@ def process_requests(reqs): This is a versioned api. We choose the api version based on provided version from client. """ - version = reqs.get('version') + version = reqs.get('api-version') if version == 1: return process_requests_v1(reqs['ops']) msg = ("Missing or invalid api version (%s)" % (version)) - return {'exit_code': 1, 'stderr': msg} + return {'exit-code': 1, 'stderr': msg} def process_requests_v1(reqs): @@ -53,7 +53,7 @@ def process_requests_v1(reqs): # Use admin client since we do not have other client key locations # setup to use them for these operations. svc = 'admin' - if op == "create_pool": + if op == "create-pool": params = {'pool': req.get('name'), 'replicas': req.get('replicas')} if not all(params.iteritems()): @@ -61,7 +61,7 @@ def process_requests_v1(reqs): (' '.join([k for k in params.iterkeys() if not params[k]]))) log(msg, level=ERROR) - return {'exit_code': 1, 'stderr': msg} + return {'exit-code': 1, 'stderr': msg} pool = params['pool'] replicas = params['replicas'] @@ -75,6 +75,6 @@ def process_requests_v1(reqs): else: msg = "Unknown operation '%s'" % (op) log(msg, level=ERROR) - return {'exit_code': 1, 'stderr': msg} + return {'exit-code': 1, 'stderr': msg} - return {'exit_code': 0} + return {'exit-code': 0} diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py index 2d50e2a..0176d11 100644 --- a/unit_tests/test_ceph_broker.py +++ b/unit_tests/test_ceph_broker.py @@ -12,33 +12,33 @@ class CephBrokerTestCase(unittest.TestCase): @mock.patch('ceph_broker.log') def test_process_requests_noop(self, mock_log): - req = json.dumps({'version': 1, 'ops': []}) + req = json.dumps({'api-version': 1, 'ops': []}) rc = ceph_broker.process_requests(req) - self.assertEqual(json.loads(rc), {'exit_code': 0}) + self.assertEqual(json.loads(rc), {'exit-code': 0}) @mock.patch('ceph_broker.log') def test_process_requests_missing_api_version(self, mock_log): req = json.dumps({'ops': []}) rc = ceph_broker.process_requests(req) - self.assertEqual(json.loads(rc), {'exit_code': 1, + self.assertEqual(json.loads(rc), {'exit-code': 1, 'stderr': ('Missing or invalid api version ' '(None)')}) @mock.patch('ceph_broker.log') def test_process_requests_invalid_api_version(self, mock_log): - req = json.dumps({'version': 2, 'ops': []}) + req = json.dumps({'api-version': 2, 'ops': []}) rc = ceph_broker.process_requests(req) self.assertEqual(json.loads(rc), - {'exit_code': 1, + {'exit-code': 1, 'stderr': 'Missing or invalid api version (2)'}) @mock.patch('ceph_broker.log') def test_process_requests_invalid(self, mock_log): - reqs = json.dumps({'version': 1, 'ops': [{'op': 'invalid_op'}]}) + reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]}) rc = ceph_broker.process_requests(reqs) self.assertEqual(json.loads(rc), - {'exit_code': 1, + {'exit-code': 1, 'stderr': "Unknown operation 'invalid_op'"}) @mock.patch('ceph_broker.create_pool') @@ -47,14 +47,14 @@ class CephBrokerTestCase(unittest.TestCase): def test_process_requests_create_pool(self, mock_log, mock_pool_exists, mock_create_pool): mock_pool_exists.return_value = False - reqs = json.dumps({'version': 1, - 'ops': [{'op': 'create_pool', 'name': + reqs = json.dumps({'api-version': 1, + 'ops': [{'op': 'create-pool', 'name': 'foo', 'replicas': 3}]}) rc = ceph_broker.process_requests(reqs) mock_pool_exists.assert_called_with(service='admin', name='foo') mock_create_pool.assert_called_with(service='admin', name='foo', replicas=3) - self.assertEqual(json.loads(rc), {'exit_code': 0}) + self.assertEqual(json.loads(rc), {'exit-code': 0}) @mock.patch('ceph_broker.create_pool') @mock.patch('ceph_broker.pool_exists') @@ -63,10 +63,10 @@ class CephBrokerTestCase(unittest.TestCase): mock_pool_exists, mock_create_pool): mock_pool_exists.return_value = True - reqs = json.dumps({'version': 1, - 'ops': [{'op': 'create_pool', 'name': 'foo', + reqs = json.dumps({'api-version': 1, + 'ops': [{'op': 'create-pool', 'name': 'foo', 'replicas': 3}]}) rc = ceph_broker.process_requests(reqs) mock_pool_exists.assert_called_with(service='admin', name='foo') self.assertFalse(mock_create_pool.called) - self.assertEqual(json.loads(rc), {'exit_code': 0}) + self.assertEqual(json.loads(rc), {'exit-code': 0}) From 4519ee42f8a6b9fff147d9803534344bd13fe72d Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Sun, 9 Nov 2014 16:56:19 +0000 Subject: [PATCH 7/9] catch unexpected error and inform caller --- hooks/ceph_broker.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index adba2ce..72887ac 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -29,9 +29,16 @@ def process_requests(reqs): This is a versioned api. We choose the api version based on provided version from client. """ - version = reqs.get('api-version') - if version == 1: - return process_requests_v1(reqs['ops']) + try: + version = reqs.get('api-version') + if version == 1: + return process_requests_v1(reqs['ops']) + except Exception as exc: + log(str(exc), level=ERROR) + msg = ("Unexpected error occurred while processing requests: %s" % + (reqs)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} msg = ("Missing or invalid api version (%s)" % (version)) return {'exit-code': 1, 'stderr': msg} From 8ad43717b71b88dc5f9cd98dc219fbf71af1db49 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Mon, 10 Nov 2014 10:46:09 +0000 Subject: [PATCH 8/9] fixed docstring typo --- hooks/ceph_broker.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index 72887ac..616c0f9 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -24,10 +24,10 @@ def decode(f): @decode def process_requests(reqs): - """Process a Ceph broker request from a ceph client. + """Process Ceph broker request(s). - This is a versioned api. We choose the api version based on provided - version from client. + This is a versioned api. API version must be supplied by the client making + the request. """ try: version = reqs.get('api-version') @@ -45,13 +45,13 @@ def process_requests(reqs): def process_requests_v1(reqs): - """Process a v1 requests from a ceph client. + """Process v1 requests. - Takes a list of requests (dicts) and processes each one until it hits an - error. + Takes a list of requests (dicts) and processes each one. If an error is + found, processing stops and the client is notified in the response. - Upon completion of all ops or if an error is found, a response dict is - returned containing exit code and any extra info. + Returns a response dict containing the exit code (non-zero if any + operation failed along with an explanation). """ log("Processing %s ceph broker requests" % (len(reqs)), level=INFO) for req in reqs: From b0f4eae9b36d818fcc3c35efb8ceea8785410d79 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Wed, 19 Nov 2014 15:33:12 -0600 Subject: [PATCH 9/9] fixed up log levels --- hooks/ceph_broker.py | 15 +++++++++------ hooks/hooks.py | 10 ++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py index 616c0f9..9fced94 100644 --- a/hooks/ceph_broker.py +++ b/hooks/ceph_broker.py @@ -6,23 +6,25 @@ import json from charmhelpers.core.hookenv import ( log, + DEBUG, INFO, - ERROR + ERROR, ) from charmhelpers.contrib.storage.linux.ceph import ( create_pool, - pool_exists + pool_exists, ) -def decode(f): +def decode_req_encode_rsp(f): + """Decorator to decode incoming requests and encode responses.""" def decode_inner(req): return json.dumps(f(json.loads(req))) return decode_inner -@decode +@decode_req_encode_rsp def process_requests(reqs): """Process Ceph broker request(s). @@ -33,6 +35,7 @@ def process_requests(reqs): version = reqs.get('api-version') if version == 1: return process_requests_v1(reqs['ops']) + except Exception as exc: log(str(exc), level=ERROR) msg = ("Unexpected error occurred while processing requests: %s" % @@ -56,7 +59,7 @@ def process_requests_v1(reqs): log("Processing %s ceph broker requests" % (len(reqs)), level=INFO) for req in reqs: op = req.get('op') - log("Processing op='%s'" % (op), level=INFO) + log("Processing op='%s'" % (op), level=DEBUG) # Use admin client since we do not have other client key locations # setup to use them for these operations. svc = 'admin' @@ -78,7 +81,7 @@ def process_requests_v1(reqs): create_pool(service=svc, name=pool, replicas=replicas) else: log("Pool '%s' already exists - skipping create" % (pool), - level=INFO) + level=DEBUG) else: msg = "Unknown operation '%s'" % (op) log(msg, level=ERROR) diff --git a/hooks/hooks.py b/hooks/hooks.py index ed3da21..9014401 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -16,7 +16,7 @@ import sys import ceph from charmhelpers.core.hookenv import ( log, - INFO, + DEBUG, ERROR, config, relation_ids, @@ -300,15 +300,13 @@ def client_relation_changed(relid=None): settings = relation_get(rid=relid) if 'broker_req' in settings: if not ceph.is_leader(): - log("Not leader - ignoring broker request", level=INFO) + log("Not leader - ignoring broker request", level=DEBUG) else: - req = settings['broker_req'] - log("Broker request received from ceph client") - rsp = process_requests(req) + rsp = process_requests(settings['broker_req']) relation_set(relation_id=relid, relation_settings={'broker_rsp': rsp}) else: - log('mon cluster not in quorum') + log('mon cluster not in quorum', level=DEBUG) @hooks.hook('upgrade-charm')