Add actions to lint. Change actions.yaml to use enum and also change underscores to dashes. Log action_fail in addition to exiting -1. Merge v2 requests with v1 requests since this does not break backwards compatibility. Add unit tests. Modify tox.ini to include actions. .

This commit is contained in:
Chris Holcombe 2016-01-18 08:07:35 -08:00
parent bdd4e69e80
commit 1977cdbde1
12 changed files with 169 additions and 180 deletions

View File

@ -3,7 +3,7 @@ PYTHON := /usr/bin/env python
lint:
@flake8 --exclude hooks/charmhelpers,tests/charmhelpers \
hooks tests unit_tests
hooks tests unit_tests actions
@charm proof
test:

View File

@ -26,7 +26,6 @@ create-pool:
before calling create-pool
required: [name, pool-type]
additionalProperties: false
create-erasure-profile:
description: Create a new erasure code profile to use on a pool.
params:
@ -36,41 +35,39 @@ create-erasure-profile:
failure-domain:
type: string
default: host
enum: [chassis, datacenter, host, osd, pdu, pod, rack, region, room, root, row]
description: |
The failure-domain=host will create a CRUSH ruleset that ensures no two chunks are stored in the same host.
Other valid options include
['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row']
plugin:
type: string
default: jerasure
description: |
The erasure plugin to use for this profile.
See http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ for more details
data_chunks:
data-chunks:
type: integer
default: 3
description: |
The number of data chunks, i.e. the number of chunks in which the original object is divided. For instance
if K = 2 a 10KB object will be divided into K objects of 5KB each.
coding_chunks:
coding-chunks:
type: integer
default: 2
description: |
The number of coding chunks, i.e. the number of additional chunks computed by the encoding functions.
If there are 2 coding chunks, it means 2 OSDs can be out without losing data.
locality_chunks:
locality-chunks:
type: integer
description: |
Group the coding and data chunks into sets of size locality. For instance, for k=4 and m=2, when locality=3
two groups of three are created. Each set can be recovered without reading chunks from another set.
durability_estimator:
durability-estimator:
type: integer
description: |
The number of parity chunks each of which includes each data chunk in its calculation range. The number is used
as a durability estimator. For instance, if c=2, 2 OSDs can be down without losing data.
required: [name, data_chunks, coding_chunks]
required: [name, data-chunks, coding-chunks]
additionalProperties: false
get-erasure-profile:
description: Display an erasure code profile.
params:
@ -79,7 +76,6 @@ get-erasure-profile:
description: The name of the profile
required: [name]
additionalProperties: false
delete-erasure-profile:
description: Deletes an erasure code profile.
params:
@ -88,15 +84,12 @@ delete-erasure-profile:
description: The name of the profile
required: [name]
additionalProperties: false
list-erasure-profiles:
description: List the names of all erasure code profiles
additionalProperties: false
list-pools:
description: List your clusters pools
additionalProperties: false
set-pool-max-bytes:
description: Set pool quotas for the maximum number of bytes.
params:
@ -108,7 +101,6 @@ set-pool-max-bytes:
description: The name of the pool
required: [pool-name, max]
additionalProperties: false
delete-pool:
description: Deletes the named pool
params:
@ -117,7 +109,6 @@ delete-pool:
description: The name of the pool
required: [pool-name]
additionalProperties: false
rename-pool:
description: Rename a pool
params:
@ -129,11 +120,9 @@ rename-pool:
description: The new name of the pool
required: [pool-name, new-name]
additionalProperties: false
pool-statistics:
description: Show a pools utilization statistics
additionalProperties: false
snapshot-pool:
description: Snapshot a pool
params:
@ -145,7 +134,6 @@ snapshot-pool:
description: The name of the snapshot
required: [snapshot-name, pool-name]
additionalProperties: false
remove-pool-snapshot:
description: Remove a pool snapshot
params:
@ -157,7 +145,6 @@ remove-pool-snapshot:
description: The name of the snapshot
required: [snapshot-name, pool-name]
additionalProperties: false
pool-set:
description: Set a value for the pool
params:
@ -172,7 +159,6 @@ pool-set:
description: The value to set
required: [key, value, pool-name]
additionalProperties: false
pool-get:
description: Get a value for the pool
params:

View File

@ -1,10 +1,11 @@
__author__ = 'chris'
from subprocess import CalledProcessError, check_output
import sys
sys.path.append('hooks')
import rados
from charmhelpers.core.hookenv import log, Hooks, action_get
from charmhelpers.core.hookenv import log, action_get, action_fail
from charmhelpers.contrib.storage.linux.ceph import pool_set, set_pool_quota, snapshot_pool, remove_pool_snapshot
# Connect to Ceph via Librados and return a connection
@ -36,8 +37,8 @@ def list_pools():
rados.ObjectNotFound,
rados.NoData,
rados.NoSpace,
rados.PermissionError):
sys.exit(-1)
rados.PermissionError) as e:
action_fail(e.message)
def pool_get():
@ -47,7 +48,7 @@ def pool_get():
value = check_output(['ceph', 'osd', 'pool', 'get', pool_name, key])
return value
except CalledProcessError as e:
sys.exit(e.returncode)
action_fail(e.message)
def set_pool():
@ -71,8 +72,8 @@ def pool_stats():
rados.ObjectNotFound,
rados.NoData,
rados.NoSpace,
rados.PermissionError):
sys.exit(-1)
rados.PermissionError) as e:
action_fail(e.message)
def delete_pool_snapshot():
@ -81,12 +82,6 @@ def delete_pool_snapshot():
remove_pool_snapshot(service='ceph', pool_name=pool_name, snapshot_name=snapshot_name)
def rename_pool():
old_name = action_get("name")
new_name = action_get("new-name")
rename_pool(service='ceph', old_name=old_name, new_name=new_name)
# Note only one or the other can be set
def set_pool_max_bytes():
pool_name = action_get("pool-name")
@ -94,7 +89,7 @@ def set_pool_max_bytes():
set_pool_quota(service='ceph', pool_name=pool_name, max_bytes=max_bytes)
def snapshot_pool():
def snapshot_ceph_pool():
pool_name = action_get("pool-name")
snapshot_name = action_get("snapshot-name")
snapshot_pool(service='ceph', pool_name=pool_name, snapshot_name=snapshot_name)

View File

@ -19,34 +19,34 @@ def make_erasure_profile():
# shec requires k+m+c
if plugin == "jerasure":
k = action_get("data_chunks")
m = action_get("coding_chunks")
k = action_get("data-chunks")
m = action_get("coding-chunks")
try:
create_erasure_profile(service='admin', erasure_plugin_name=plugin, profile_name=name, data_chunks=k,
coding_chunks=m, failure_domain=failure_domain)
except CalledProcessError as e:
log(e)
elif plugin == "isa":
k = action_get("data_chunks")
m = action_get("coding_chunks")
k = action_get("data-chunks")
m = action_get("coding-chunks")
try:
create_erasure_profile(service='admin', erasure_plugin_name=plugin, profile_name=name, data_chunks=k,
coding_chunks=m, failure_domain=failure_domain)
except CalledProcessError as e:
log(e)
elif plugin == "local":
k = action_get("data_chunks")
m = action_get("coding_chunks")
l = action_get("locality_chunks")
k = action_get("data-chunks")
m = action_get("coding-chunks")
l = action_get("locality-chunks")
try:
create_erasure_profile(service='admin', erasure_plugin_name=plugin, profile_name=name, data_chunks=k,
coding_chunks=m, locality=l, failure_domain=failure_domain)
except CalledProcessError as e:
log(e)
elif plugin == "shec":
k = action_get("data_chunks")
m = action_get("coding_chunks")
c = action_get("durability_estimator")
k = action_get("data-chunks")
m = action_get("coding-chunks")
c = action_get("durability-estimator")
try:
create_erasure_profile(service='admin', erasure_plugin_name=plugin, profile_name=name, data_chunks=k,
coding_chunks=m, durability_estimator=c, failure_domain=failure_domain)

View File

@ -3,7 +3,7 @@ import sys
sys.path.append('hooks')
from subprocess import CalledProcessError
from charmhelpers.core.hookenv import action_get, log
from charmhelpers.core.hookenv import action_get, log, action_fail
from charmhelpers.contrib.storage.linux.ceph import ErasurePool, ReplicatedPool
@ -23,6 +23,7 @@ def create_pool():
erasure_pool.create()
else:
log("Unknown pool type of {}. Only erasure or replicated is allowed".format(pool_type))
action_fail("Unknown pool type of {}. Only erasure or replicated is allowed".format(pool_type))
sys.exit(-1)
except CalledProcessError as e:
sys.exit(e.returncode)

View File

@ -4,15 +4,14 @@ import sys
sys.path.append('hooks')
from charmhelpers.contrib.storage.linux.ceph import create_erasure_profile
from charmhelpers.contrib.storage.linux.ceph import remove_erasure_profile
from charmhelpers.core.hookenv import action_get, log
def delete_erasure_profile():
name = action_get("name")
delete_erasure_profile(service='admin', erasure_plugin_name=plugin, profile_name=name, data_chunks=k,
coding_chunks=m, durability_estimator=c, failure_domain=failure_domain)
remove_erasure_profile(service='admin', profile_name=name)
if __name__ == '__main__':

View File

@ -4,9 +4,8 @@ import sys
sys.path.append('hooks')
import rados
import sys
from ceph_ops import connect
from charmhelpers.core.hookenv import action_get, log
from charmhelpers.core.hookenv import action_get, log, action_fail
def remove_pool():
@ -22,7 +21,7 @@ def remove_pool():
rados.NoSpace,
rados.PermissionError) as e:
log(e)
sys.exit(-1)
action_fail(e)
if __name__ == '__main__':

View File

@ -3,6 +3,7 @@
# Copyright 2015 Canonical Ltd.
#
import json
import six
from charmhelpers.contrib.storage.linux.ceph import validator, \
erasure_profile_exists, ErasurePool, set_pool_quota, \
@ -94,15 +95,6 @@ def process_requests(reqs):
resp['request-id'] = request_id
return resp
elif version == 2:
# Version with advanced pool support
log('Processing request {}'.format(request_id), level=DEBUG)
resp = process_requests_v2(reqs['ops'])
if request_id:
resp['request-id'] = request_id
return resp
except Exception as exc:
log(str(exc), level=ERROR)
msg = ("Unexpected error occurred while processing requests: %s" %
@ -118,69 +110,6 @@ def process_requests(reqs):
return resp
def process_requests_v2(requests):
"""Process v2 requests.
Takes a list of requests (dicts) and processes each one. If an error is
found, processing stops and the client is notified in the response.
Returns a response dict containing the exit code (non-zero if any
operation failed along with an explanation).
"""
log("Processing %s ceph broker requests" % len(requests), level=INFO)
for req in requests:
op = req.get('op')
log("Processing op='%s'" % op, level=DEBUG)
# Use admin client since we do not have other client key locations
# setup to use them for these operations.
svc = 'admin'
if op == "create-pool":
# TODO: Default to replicated
pool_type = req.get('pool-type') # "replicated" | "erasure"
if pool_type == 'erasure':
handle_erasure_pool(request=req, service=svc)
elif pool_type == 'replicated':
handle_replicated_pool(request=req, service=svc)
elif op == "create-cache-tier":
handle_create_cache_tier(request=req, service=svc)
elif op == "remove-cache-tier":
handle_remove_cache_tier(request=req, service=svc)
elif op == "create-erasure-profile":
handle_create_erasure_profile(request=req, service=svc)
elif op == "delete-pool":
pool = req.get('name')
delete_pool(service=svc, name=pool)
elif op == "rename-pool":
old_name = req.get('name')
new_name = req.get('new-name')
rename_pool(service=svc, old_name=old_name, new_name=new_name)
elif op == "snapshot-pool":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
snapshot_pool(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "remove-pool-snapshot":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
remove_pool_snapshot(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "set-pool-value":
handle_set_pool_value(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
return {'exit-code': 0}
def handle_create_erasure_profile(request, service):
# "local" | "shec" or it defaults to "jerasure"
erasure_type = request.get('erasure-type')
@ -332,24 +261,39 @@ def process_requests_v1(reqs):
# setup to use them for these operations.
svc = 'admin'
if op == "create-pool":
params = {'pool': req.get('name'),
'replicas': req.get('replicas')}
if not all(params.iteritems()):
msg = ("Missing parameter(s): %s" %
(' '.join([k for k in params.iterkeys()
if not params[k]])))
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool_name = req.get('name')
pool_type = req.get('pool-type') # "replicated" | "erasure"
pool = params['pool']
replicas = params['replicas']
if not pool_exists(service=svc, name=pool):
log("Creating pool '%s' (replicas=%s)" % (pool, replicas),
level=INFO)
create_pool(service=svc, name=pool, replicas=replicas)
# Default to replicated if pool_type isn't given
if pool_type == 'erasure':
handle_erasure_pool(request=req, service=svc)
else:
log("Pool '%s' already exists - skipping create" % pool,
level=DEBUG)
handle_replicated_pool(request=req, service=svc)
elif op == "create-cache-tier":
handle_create_cache_tier(request=req, service=svc)
elif op == "remove-cache-tier":
handle_remove_cache_tier(request=req, service=svc)
elif op == "create-erasure-profile":
handle_create_erasure_profile(request=req, service=svc)
elif op == "delete-pool":
pool = req.get('name')
delete_pool(service=svc, name=pool)
elif op == "rename-pool":
old_name = req.get('name')
new_name = req.get('new-name')
rename_pool(service=svc, old_name=old_name, new_name=new_name)
elif op == "snapshot-pool":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
snapshot_pool(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "remove-pool-snapshot":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
remove_pool_snapshot(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "set-pool-value":
handle_set_pool_value(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
log(msg, level=ERROR)

View File

@ -61,7 +61,7 @@ from charmhelpers.fetch import (
apt_install,
)
from charmhelpers.core.kernel import modprobe
# from charmhelpers.core.kernel import modprobe
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
KEYFILE = '/etc/ceph/ceph.client.{}.key'
@ -120,6 +120,7 @@ class PoolCreationError(Exception):
"""
A custom error to inform the caller that a pool creation failed. Provides an error message
"""
def __init__(self, message):
super(PoolCreationError, self).__init__(message)
@ -129,6 +130,7 @@ class Pool(object):
An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
"""
def __init__(self, service, name):
self.service = service
self.name = name
@ -343,6 +345,22 @@ def remove_pool_quota(service, pool_name):
raise
def remove_erasure_profile(service, profile_name):
"""
Create a new erasure code profile if one does not already exist for it. Updates
the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
for more details
:param service: six.string_types. The Ceph user name to run the command under
:param profile_name: six.string_types
:return: None. Can raise CalledProcessError
"""
cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm', profile_name]
try:
check_call(cmd)
except CalledProcessError:
raise
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', failure_domain='host',
data_chunks=2, coding_chunks=1,
locality=None, durability_estimator=None):
@ -596,7 +614,7 @@ def configure(service, key, auth, use_syslog):
keyring=_keyring_path(service),
mon_hosts=",".join(map(str, hosts)),
use_syslog=use_syslog))
modprobe('rbd')
# modprobe('rbd')
def image_mapped(name):

View File

@ -18,7 +18,7 @@ deps = -r{toxinidir}/requirements.txt
basepython = python2.7
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = flake8 {posargs} hooks unit_tests tests
commands = flake8 {posargs} hooks unit_tests tests actions
charm proof
[testenv:venv]

View File

@ -1,12 +1,12 @@
import json
import mock
import unittest
import mock
import ceph_broker
class CephBrokerTestCase(unittest.TestCase):
def setUp(self):
super(CephBrokerTestCase, self).setUp()
@ -22,13 +22,14 @@ class CephBrokerTestCase(unittest.TestCase):
rc = ceph_broker.process_requests(req)
self.assertEqual(json.loads(rc), {'exit-code': 1,
'stderr':
('Missing or invalid api version '
'(None)')})
('Missing or invalid api version '
'(None)')})
@mock.patch('ceph_broker.log')
def test_process_requests_invalid_api_version(self, mock_log):
req = json.dumps({'api-version': 2, 'ops': []})
rc = ceph_broker.process_requests(req)
print "Return: %s" % rc
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': 'Missing or invalid api version (2)'})
@ -49,7 +50,7 @@ class CephBrokerTestCase(unittest.TestCase):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool', 'name':
'foo', 'replicas': 3}]})
'foo', 'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_create_pool.assert_called_with(service='admin', name='foo',
@ -59,41 +60,88 @@ class CephBrokerTestCase(unittest.TestCase):
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_exists(self, mock_log,
mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = True
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool', 'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
self.assertFalse(mock_create_pool.called)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_rid(self, mock_log, mock_pool_exists,
mock_create_pool):
def test_process_requests_create_erasure_pool(self, mock_log, mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'request-id': '1ef5aede',
'ops': [{'op': 'create-pool', 'name':
'foo', 'replicas': 3}]})
'foo', 'erasure-type': 'jerasure',
'failure-domain': 'host', 'k': 3, 'm': 2}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_create_pool.assert_called_with(service='admin', name='foo',
replicas=3)
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
mock_create_pool.assert_called_with(service='admin', name='foo')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
def test_process_requests_invalid_api_rid(self, mock_log):
reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',
'ops': [{'op': 'create-pool'}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 1)
self.assertEqual(json.loads(rc)['stderr'],
"Missing or invalid api version (0)")
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
'''
elif op == "create-cache-tier":
handle_create_cache_tier(request=req, service=svc)
elif op == "remove-cache-tier":
handle_remove_cache_tier(request=req, service=svc)
elif op == "create-erasure-profile":
handle_create_erasure_profile(request=req, service=svc)
elif op == "delete-pool":
pool = req.get('name')
delete_pool(service=svc, name=pool)
elif op == "rename-pool":
old_name = req.get('name')
new_name = req.get('new-name')
rename_pool(service=svc, old_name=old_name, new_name=new_name)
elif op == "snapshot-pool":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
snapshot_pool(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "remove-pool-snapshot":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
remove_pool_snapshot(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "set-pool-value":
handle_set_pool_value(request=req, service=svc)
'''
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_exists(self, mock_log,
mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = True
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool', 'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
self.assertFalse(mock_create_pool.called)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_rid(self, mock_log, mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'request-id': '1ef5aede',
'ops': [{'op': 'create-pool', 'name':
'foo', 'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_create_pool.assert_called_with(service='admin', name='foo',
replicas=3)
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
@mock.patch('ceph_broker.log')
def test_process_requests_invalid_api_rid(self, mock_log):
reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',
'ops': [{'op': 'create-pool'}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 1)
self.assertEqual(json.loads(rc)['stderr'],
"Missing or invalid api version (0)")
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')

View File

@ -31,7 +31,6 @@ ENOUGH_PEERS_COMPLETE = {
class ServiceStatusTestCase(test_utils.CharmTestCase):
def setUp(self):
super(ServiceStatusTestCase, self).setUp(hooks, TO_PATCH)
self.config.side_effect = self.test_config.get