Use common pool create/update handling code

Use the Pool classes' ability to initialize themselves from op.
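
For illustration, a rough sketch of what that looks like on the broker
side (names are taken from the diff below; the exact charm-helpers
signatures live in the PR referenced further down, and the op values
here are only examples):

    from charmhelpers.contrib.storage.linux.ceph import ReplicatedPool

    op = {'op': 'create-pool', 'name': 'foo', 'replicas': 3}
    try:
        # The pool object pulls its configuration straight from the op dict.
        pool = ReplicatedPool(service='admin', op=op)
    except KeyError:
        # Raised when required keys (e.g. 'name') are missing from the request.
        pool = None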

Some pool properties should be allowed to be adjusted after a pool is
created.

The Pool quota property is already handled ad hoc in the
`charms.ceph` broker handling code; let's bring it over to the
pool objects along with the handling of the compression properties.
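
With the quota (and compression) handling moved into the pool objects,
the broker's create/update flow reduces to roughly the following sketch
(the 'max-bytes'/'max-objects' quota keys already exist in the broker
protocol; the compression keys are assumed to be applied the same way
by update(), per the charm-helpers PR below):

    from charmhelpers.contrib.storage.linux.ceph import (
        ReplicatedPool,
        pool_exists,
    )

    op = {'op': 'create-pool', 'name': 'foo', 'replicas': 3,
          'max-bytes': 10 * 1024 ** 3}  # quota now travels with the op
    pool = ReplicatedPool(service='admin', op=op)
    if not pool_exists(service='admin', name=op['name']):
        pool.create()
    # Set/update properties that are allowed to change after pool
    # creation, e.g. quota and compression settings.
    pool.update()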

Move some missing unit tests here from the ceph-mon charm.

Merge after https://github.com/juju/charm-helpers/pull/497

Change-Id: Ibec4e3221387199adbc1a920e130975d7b25343c
Frode Nordahl 2020-07-31 12:46:22 +02:00
parent 89b3b8d1eb
commit af0eac506d
6 changed files with 129 additions and 88 deletions


@ -16,6 +16,7 @@ import collections
import json
import os
from subprocess import check_call, check_output, CalledProcessError
from tempfile import NamedTemporaryFile
from charms_ceph.utils import (
@ -41,18 +42,16 @@ from charmhelpers.contrib.storage.linux.ceph import (
pool_set,
remove_pool_snapshot,
rename_pool,
set_pool_quota,
snapshot_pool,
validator,
ErasurePool,
Pool,
BasePool,
ReplicatedPool,
)
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
# This should do a decent job of preventing people from passing in bad values.
# It will give a useful error message
from subprocess import check_call, check_output, CalledProcessError
POOL_KEYS = {
# "Ceph Key Name": [Python type, [Valid Range]]
@ -405,23 +404,11 @@ def handle_erasure_pool(request, service):
"""
pool_name = request.get('name')
erasure_profile = request.get('erasure-profile')
max_bytes = request.get('max-bytes')
max_objects = request.get('max-objects')
weight = request.get('weight')
group_name = request.get('group')
allow_ec_overwrites = request.get('allow-ec-overwrites')
if erasure_profile is None:
erasure_profile = "default-canonical"
app_name = request.get('app-name')
# Check for missing params
if pool_name is None:
msg = "Missing parameter. name is required for the pool"
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
if group_name:
group_namespace = request.get('group-namespace')
# Add the pool to the group named "group_name"
@ -437,21 +424,22 @@ def handle_erasure_pool(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool = ErasurePool(service=service, name=pool_name,
erasure_code_profile=erasure_profile,
percent_data=weight,
app_name=app_name,
allow_ec_overwrites=allow_ec_overwrites)
try:
pool = ErasurePool(service=service,
op=request)
except KeyError:
msg = "Missing parameter."
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '{}' (erasure_profile={})"
.format(pool.name, erasure_profile), level=INFO)
pool.create()
# Set a quota if requested
if max_bytes or max_objects:
set_pool_quota(service=service, pool_name=pool_name,
max_bytes=max_bytes, max_objects=max_objects)
# Set/update properties that are allowed to change after pool creation.
pool.update()
def handle_replicated_pool(request, service):
@ -462,26 +450,19 @@ def handle_replicated_pool(request, service):
:returns: dict. exit-code and reason if not 0.
"""
pool_name = request.get('name')
replicas = request.get('replicas')
max_bytes = request.get('max-bytes')
max_objects = request.get('max-objects')
weight = request.get('weight')
group_name = request.get('group')
# Optional params
# NOTE: Check this against the handling in the Pool classes, reconcile and
# remove.
pg_num = request.get('pg_num')
replicas = request.get('replicas')
if pg_num:
# Cap pg_num to max allowed just in case.
osds = get_osds(service)
if osds:
pg_num = min(pg_num, (len(osds) * 100 // replicas))
app_name = request.get('app-name')
# Check for missing params
if pool_name is None or replicas is None:
msg = "Missing parameter. name and replicas are required"
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
request.update({'pg_num': pg_num})
if group_name:
group_namespace = request.get('group-namespace')
@ -490,18 +471,14 @@ def handle_replicated_pool(request, service):
group=group_name,
namespace=group_namespace)
kwargs = {}
if pg_num:
kwargs['pg_num'] = pg_num
if weight:
kwargs['percent_data'] = weight
if replicas:
kwargs['replicas'] = replicas
if app_name:
kwargs['app_name'] = app_name
try:
pool = ReplicatedPool(service=service,
op=request)
except KeyError:
msg = "Missing parameter."
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool = ReplicatedPool(service=service,
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
level=INFO)
@ -510,10 +487,8 @@ def handle_replicated_pool(request, service):
log("Pool '{}' already exists - skipping create".format(pool.name),
level=DEBUG)
# Set a quota if requested
if max_bytes or max_objects:
set_pool_quota(service=service, pool_name=pool_name,
max_bytes=max_bytes, max_objects=max_objects)
# Set/update properties that are allowed to change after pool creation.
pool.update()
def handle_create_cache_tier(request, service):
@ -540,7 +515,7 @@ def handle_create_cache_tier(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
p = Pool(service=service, name=storage_pool)
p = BasePool(service=service, name=storage_pool)
p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
@ -561,7 +536,7 @@ def handle_remove_cache_tier(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool = Pool(name=storage_pool, service=service)
pool = BasePool(name=storage_pool, service=service)
pool.remove_cache_tier(cache_pool=cache_pool)


@ -7,3 +7,5 @@ flake8>=2.2.4
stestr
requests==2.6.0
netifaces
mock
git+https://github.com/openstack/charms.openstack.git#egg=charms.openstack


@ -15,3 +15,12 @@
import sys
sys.path.append('charms_ceph')
# Mock out charmhelpers so that we can test without it.
import charms_openstack.test_mocks # noqa
charms_openstack.test_mocks.mock_charmhelpers()
charmhelpers = charms_openstack.test_mocks.charmhelpers
sys.modules['charmhelpers.contrib.storage.linux.utils'] = (
charmhelpers.contrib.storage.linux.utils)


@ -335,16 +335,17 @@ class CephBrokerTestCase(unittest.TestCase):
mock_get_osds):
mock_pool_exists.return_value = False
mock_get_osds.return_value = [0, 1, 2]
op = {
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'pg_num': 100,
}
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'pg_num': 100}]})
'ops': [op]})
rc = charms_ceph.broker.process_requests(reqs)
mock_replicated_pool.assert_called_with(service='admin', op=op)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin', name='foo',
replicas=3, pg_num=100)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@patch.object(charms_ceph.broker, 'ReplicatedPool')
@ -355,19 +356,20 @@ class CephBrokerTestCase(unittest.TestCase):
mock_log, mock_pool_exists,
mock_replicated_pool):
mock_pool_exists.return_value = False
op = {
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'group': 'image',
}
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'group': 'image'}]})
'ops': [op]})
rc = charms_ceph.broker.process_requests(reqs)
add_pool_to_group.assert_called_with(group='image',
pool='foo',
namespace=None)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin', name='foo',
replicas=3)
mock_replicated_pool.assert_called_with(service='admin', op=op)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@patch.object(charms_ceph.broker, 'ReplicatedPool')
@ -377,10 +379,14 @@ class CephBrokerTestCase(unittest.TestCase):
mock_pool_exists,
mock_replicated_pool):
mock_pool_exists.return_value = True
op = {
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
}
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool',
'name': 'foo',
'replicas': 3}]})
'ops': [op]})
rc = charms_ceph.broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin',
name='foo')
@ -394,20 +400,66 @@ class CephBrokerTestCase(unittest.TestCase):
mock_pool_exists,
mock_replicated_pool):
mock_pool_exists.return_value = False
op = {
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
}
reqs = json.dumps({'api-version': 1,
'request-id': '1ef5aede',
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3}]})
'ops': [op]})
rc = charms_ceph.broker.process_requests(reqs)
mock_replicated_pool.assert_called_with(service='admin', op=op)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin',
name='foo',
replicas=3)
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
@patch.object(charms_ceph.broker, 'erasure_profile_exists')
@patch.object(charms_ceph.broker, 'ErasurePool')
@patch.object(charms_ceph.broker, 'pool_exists')
@patch.object(charms_ceph.broker, 'log')
def test_process_requests_create_erasure_pool(self, mock_log,
mock_pool_exists,
mock_erasure_pool,
mock_profile_exists):
mock_pool_exists.return_value = False
op = {
'op': 'create-pool',
'pool-type': 'erasure',
'name': 'foo',
'erasure-profile': 'default'
}
reqs = json.dumps({'api-version': 1,
'ops': [op]})
rc = charms_ceph.broker.process_requests(reqs)
mock_profile_exists.assert_called_with(service='admin', name='default')
mock_erasure_pool.assert_called_with(service='admin', op=op)
mock_pool_exists.assert_called_with(service='admin', name='foo')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@patch.object(charms_ceph.broker, 'pool_exists')
@patch.object(charms_ceph.broker, 'BasePool')
@patch.object(charms_ceph.broker, 'log', lambda *args, **kwargs: None)
def test_process_requests_create_cache_tier(self, mock_pool,
mock_pool_exists):
mock_pool_exists.return_value = True
op = {
'op': 'create-cache-tier',
'cold-pool': 'foo',
'hot-pool': 'foo-ssd',
'mode': 'writeback',
'erasure-profile': 'default'
}
reqs = json.dumps({'api-version': 1,
'ops': [op]})
rc = charms_ceph.broker.process_requests(reqs)
mock_pool_exists.assert_any_call(service='admin', name='foo')
mock_pool_exists.assert_any_call(service='admin', name='foo-ssd')
mock_pool().add_cache_tier.assert_called_with(
cache_pool='foo-ssd', mode='writeback')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@patch.object(charms_ceph.broker, 'get_cephfs')
@patch.object(charms_ceph.broker, 'check_output')
@patch.object(charms_ceph.broker, 'pool_exists')


@ -293,9 +293,10 @@ class UpgradeRollingTestCase(unittest.TestCase):
handle.write.assert_called_with('ready')
update_owner.assert_called_with('/var/lib/ceph/osd/ceph-6/ready')
@patch.object(charms_ceph.utils, 'DEBUG')
@patch('subprocess.check_output')
@patch.object(charms_ceph.utils, 'log')
def test_get_osd_state(self, log, check_output):
def test_get_osd_state(self, log, check_output, level_DBG):
check_output.side_effect = [
subprocess.CalledProcessError(returncode=2, cmd=["bad"]),
ValueError("bad value"),
@ -306,9 +307,9 @@ class UpgradeRollingTestCase(unittest.TestCase):
['ceph', 'daemon', '/var/run/ceph/ceph-osd.2.asok', 'status'])
log.assert_has_calls([
call("Command '['bad']' returned non-zero exit status 2.",
level='DEBUG'),
call('bad value', level='DEBUG'),
call('OSD 2 state: active, goal state: None', level='DEBUG')])
level=level_DBG),
call('bad value', level=level_DBG),
call('OSD 2 state: active, goal state: None', level=level_DBG)])
self.assertEqual(osd_state, 'active')
osd_state = charms_ceph.utils.get_osd_state(2, osd_goal_state='active')
@ -316,9 +317,9 @@ class UpgradeRollingTestCase(unittest.TestCase):
['ceph', 'daemon', '/var/run/ceph/ceph-osd.2.asok', 'status'])
log.assert_has_calls([
call("Command '['bad']' returned non-zero exit status 2.",
level='DEBUG'),
call('bad value', level='DEBUG'),
call('OSD 2 state: active, goal state: None', level='DEBUG')])
level=level_DBG),
call('bad value', level=level_DBG),
call('OSD 2 state: active, goal state: None', level=level_DBG)])
self.assertEqual(osd_state, 'active')
@patch.object(charms_ceph.utils, 'socket')


@ -682,9 +682,10 @@ class CephTestCase(unittest.TestCase):
mock_reweight.assert_called_once_with(
['ceph', 'osd', 'crush', 'reweight', 'osd.0', '1'], stderr=-2)
@patch.object(utils, 'lsb_release')
def test_determine_packages(self, _lsb_release):
_lsb_release.return_value = {'DISTRIB_CODENAME': 'bionic'}
@patch.object(utils, 'CompareHostReleases')
def test_determine_packages(self, _cmp):
_cmp().__str__.return_value = 'bionic'
_cmp().__ge__.return_value = False
self.assertEqual(utils.PACKAGES + ['btrfs-tools'],
utils.determine_packages())
@ -2010,8 +2011,9 @@ class CephGetLVSTestCase(unittest.TestCase):
_log.assert_called_with(oserror_exception)
self.assertEqual(result, False)
@patch.object(utils, 'WARNING')
@patch.object(utils, 'log')
def test_is_pristine_disk_short_read(self, _log):
def test_is_pristine_disk_short_read(self, _log, _level_WRN):
data = b'\0' * 2047
fake_open = mock_open(read_data=data)
with patch('charms_ceph.utils.open', fake_open):
@ -2019,7 +2021,7 @@ class CephGetLVSTestCase(unittest.TestCase):
fake_open.assert_called_with('/dev/vdz', 'rb')
_log.assert_called_with(
'/dev/vdz: short read, got 2047 bytes expected 2048.',
level='WARNING')
level=_level_WRN)
self.assertEqual(result, False)
def test_is_pristine_disk_dirty_disk(self):