Migrate to shared lib

This change moves our ceph.py and ceph_broker.py into
a separate repository that we can share between the
various ceph-related Juju projects, along with a
Makefile change that uses a new git_sync script to
partially sync a git repository into a specified path.

Change-Id: I8942d2f3411acec197fd6b854c1d9e50457502a5
Chris MacNaughton 2016-08-01 17:00:13 -04:00
parent 31aae0b984
commit 1f140c73ed
11 changed files with 1243 additions and 773 deletions


@@ -17,9 +17,17 @@ bin/charm_helpers_sync.py:
@bzr cat lp:charm-helpers/tools/charm_helpers_sync/charm_helpers_sync.py \
> bin/charm_helpers_sync.py
sync: bin/charm_helpers_sync.py
bin/git_sync.py:
@mkdir -p bin
@wget -O bin/git_sync.py https://raw.githubusercontent.com/ChrisMacNaughton/git-sync/master/git_sync.py
ch-sync: bin/charm_helpers_sync.py
$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml
$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-tests.yaml
git-sync: bin/git_sync.py
$(PYTHON) bin/git_sync.py -d lib/ceph -s https://github.com/CanonicalLtd/charms_ceph.git
sync: git-sync ch-sync
publish: lint test
bzr push lp:charms/ceph
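
The bin/git_sync.py helper itself is fetched from GitHub by the
Makefile rather than carried in this commit, so its implementation is
not shown here. As a rough illustration of what the git-sync target
above relies on, a minimal partial-sync helper taking the same -s
(source repository) and -d (destination path) options might look like
the sketch below; the function names and behaviour are assumptions,
not the actual script.

# Sketch of a partial git sync: clone the shared repository into a
# temporary directory and copy its tracked files into the destination.
# The real bin/git_sync.py may differ, e.g. by syncing only a subtree
# or pruning files that were removed upstream.
import argparse
import os
import shutil
import subprocess
import tempfile


def sync(source, dest):
    tmp = tempfile.mkdtemp(prefix='git-sync-')
    try:
        subprocess.check_call(['git', 'clone', '--depth', '1', source, tmp])
        if not os.path.exists(dest):
            os.makedirs(dest)
        for name in os.listdir(tmp):
            if name == '.git':
                continue
            src_path = os.path.join(tmp, name)
            dst_path = os.path.join(dest, name)
            if os.path.isdir(src_path):
                # Replace any stale copy of the directory wholesale.
                if os.path.exists(dst_path):
                    shutil.rmtree(dst_path)
                shutil.copytree(src_path, dst_path)
            else:
                shutil.copy2(src_path, dst_path)
    finally:
        shutil.rmtree(tmp)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--source', required=True)
    parser.add_argument('-d', '--dest', required=True)
    args = parser.parse_args()
    sync(args.source, args.dest)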


@@ -21,12 +21,13 @@ import sys
from subprocess import check_call
sys.path.append('hooks')
sys.path.append('lib')
from charmhelpers.core.hookenv import (
action_fail,
)
from ceph import get_local_osd_ids
from ceph.ceph.ceph import get_local_osd_ids
from ceph_hooks import assess_status
from utils import (


@@ -1,600 +0,0 @@
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import subprocess
import time
import os
import re
import sys
from charmhelpers.contrib.storage.linux.utils import (
is_block_device,
zap_disk,
is_device_mounted)
from charmhelpers.core.host import (
mkdir,
chownr,
service_restart,
lsb_release,
cmp_pkgrevno)
from charmhelpers.core.hookenv import (
log,
ERROR,
cached,
status_set,
WARNING)
from charmhelpers.fetch import (
apt_cache
)
from utils import (
get_unit_hostname,
)
LEADER = 'leader'
PEON = 'peon'
QUORUM = [LEADER, PEON]
PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph', 'xfsprogs']
def ceph_user():
if get_version() > 1:
return 'ceph'
else:
return "root"
def get_local_mon_ids():
"""
This will list the /var/lib/ceph/mon/* directories and try
to split the ID off of the directory name and return it in
a list
:return: list. A list of monitor identifiers :raise: OSError if
something goes wrong with listing the directory.
"""
mon_ids = []
mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon')
if os.path.exists(mon_path):
try:
dirs = os.listdir(mon_path)
for mon_dir in dirs:
# Basically this takes everything after ceph- as the monitor ID
match = re.search('ceph-(?P<mon_id>.*)', mon_dir)
if match:
mon_ids.append(match.group('mon_id'))
except OSError:
raise
return mon_ids
def get_version():
"""Derive Ceph release from an installed package."""
import apt_pkg as apt
cache = apt_cache()
package = "ceph"
try:
pkg = cache[package]
except:
# the package is unknown to the current apt cache.
e = 'Could not determine version of package with no installation ' \
'candidate: %s' % package
error_out(e)
if not pkg.current_ver:
# package is known, but no version is currently installed.
e = 'Could not determine version of uninstalled package: %s' % package
error_out(e)
vers = apt.upstream_version(pkg.current_ver.ver_str)
# x.y match only for 20XX.X
# and ignore patch level for other packages
match = re.match('^(\d+)\.(\d+)', vers)
if match:
vers = match.group(0)
return float(vers)
def error_out(msg):
log("FATAL ERROR: %s" % msg,
level=ERROR)
sys.exit(1)
def is_quorum():
asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname())
cmd = [
"sudo",
"-u",
ceph_user(),
"ceph",
"--admin-daemon",
asok,
"mon_status"
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
except subprocess.CalledProcessError:
return False
except ValueError:
# Non JSON response from mon_status
return False
if result['state'] in QUORUM:
return True
else:
return False
else:
return False
def is_leader():
asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname())
cmd = [
"sudo",
"-u",
ceph_user(),
"ceph",
"--admin-daemon",
asok,
"mon_status"
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
except subprocess.CalledProcessError:
return False
except ValueError:
# Non JSON response from mon_status
return False
if result['state'] == LEADER:
return True
else:
return False
else:
return False
def wait_for_quorum():
while not is_quorum():
log("Waiting for quorum to be reached")
time.sleep(3)
def add_bootstrap_hint(peer):
asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname())
cmd = [
"sudo",
"-u",
ceph_user(),
"ceph",
"--admin-daemon",
asok,
"add_bootstrap_peer_hint",
peer
]
if os.path.exists(asok):
# Ignore any errors for this call
subprocess.call(cmd)
DISK_FORMATS = [
'xfs',
'ext4',
'btrfs'
]
def is_osd_disk(dev):
try:
info = subprocess.check_output(['sgdisk', '-i', '1', dev])
info = info.split("\n") # IGNORE:E1103
for line in info:
if line.startswith(
'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D'
):
return True
except subprocess.CalledProcessError:
pass
return False
def start_osds(devices):
# Scan for ceph block devices
rescan_osd_devices()
if cmp_pkgrevno('ceph', "0.56.6") >= 0:
# Use ceph-disk activate for directory based OSD's
for dev_or_path in devices:
if os.path.exists(dev_or_path) and os.path.isdir(dev_or_path):
subprocess.check_call(['ceph-disk', 'activate', dev_or_path])
def rescan_osd_devices():
cmd = [
'udevadm', 'trigger',
'--subsystem-match=block', '--action=add'
]
subprocess.call(cmd)
_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
def is_bootstrapped():
return os.path.exists(_bootstrap_keyring)
def wait_for_bootstrap():
while not is_bootstrapped():
time.sleep(3)
def import_osd_bootstrap_key(key):
if not os.path.exists(_bootstrap_keyring):
cmd = [
"sudo",
"-u",
ceph_user(),
'ceph-authtool',
_bootstrap_keyring,
'--create-keyring',
'--name=client.bootstrap-osd',
'--add-key={}'.format(key)
]
subprocess.check_call(cmd)
def generate_monitor_secret():
cmd = [
'ceph-authtool',
'/dev/stdout',
'--name=mon.',
'--gen-key'
]
res = subprocess.check_output(cmd)
return "{}==".format(res.split('=')[1].strip())
# OSD caps taken from ceph-create-keys
_osd_bootstrap_caps = {
'mon': [
'allow command osd create ...',
'allow command osd crush set ...',
r'allow command auth add * osd allow\ * mon allow\ rwx',
'allow command mon getmap'
]
}
_osd_bootstrap_caps_profile = {
'mon': [
'allow profile bootstrap-osd'
]
}
def parse_key(raw_key):
# get-or-create appears to have different output depending
# on whether its 'get' or 'create'
# 'create' just returns the key, 'get' is more verbose and
# needs parsing
key = None
if len(raw_key.splitlines()) == 1:
key = raw_key
else:
for element in raw_key.splitlines():
if 'key' in element:
key = element.split(' = ')[1].strip() # IGNORE:E1103
return key
def get_osd_bootstrap_key():
try:
# Attempt to get/create a key using the OSD bootstrap profile first
key = get_named_key('bootstrap-osd',
_osd_bootstrap_caps_profile)
except:
# If that fails try with the older style permissions
key = get_named_key('bootstrap-osd',
_osd_bootstrap_caps)
return key
_radosgw_keyring = "/etc/ceph/keyring.rados.gateway"
def import_radosgw_key(key):
if not os.path.exists(_radosgw_keyring):
cmd = [
"sudo",
"-u",
ceph_user(),
'ceph-authtool',
_radosgw_keyring,
'--create-keyring',
'--name=client.radosgw.gateway',
'--add-key={}'.format(key)
]
subprocess.check_call(cmd)
# OSD caps taken from ceph-create-keys
_radosgw_caps = {
'mon': ['allow rw'],
'osd': ['allow rwx']
}
_upgrade_caps = {
'mon': ['allow rwx']
}
osd_upgrade_caps = {
'mon': ['allow command "config-key"',
'allow command "osd tree"',
'allow command "config-key list"',
'allow command "config-key put"',
'allow command "config-key get"',
'allow command "config-key exists"',
'allow command "osd out"',
'allow command "osd in"',
'allow command "osd rm"',
'allow command "auth del"',
]
}
def get_radosgw_key():
return get_named_key('radosgw.gateway', _radosgw_caps)
_default_caps = {
'mon': ['allow rw'],
'osd': ['allow rwx']
}
admin_caps = {
'mds': ['allow'],
'mon': ['allow *'],
'osd': ['allow *']
}
osd_upgrade_caps = {
'mon': ['allow command "config-key"',
'allow command "osd tree"',
'allow command "config-key list"',
'allow command "config-key put"',
'allow command "config-key get"',
'allow command "config-key exists"',
]
}
def get_upgrade_key():
return get_named_key('upgrade-osd', _upgrade_caps)
def get_named_key(name, caps=None):
caps = caps or _default_caps
cmd = [
"sudo",
"-u",
ceph_user(),
'ceph',
'--name', 'mon.',
'--keyring',
'/var/lib/ceph/mon/ceph-{}/keyring'.format(
get_unit_hostname()
),
'auth', 'get-or-create', 'client.{}'.format(name),
]
# Add capabilities
for subsystem, subcaps in caps.iteritems():
cmd.extend([
subsystem,
'; '.join(subcaps),
])
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
def upgrade_key_caps(key, caps):
""" Upgrade key to have capabilities caps """
if not is_leader():
# Not the MON leader OR not clustered
return
cmd = [
"sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key
]
for subsystem, subcaps in caps.iteritems():
cmd.extend([subsystem, '; '.join(subcaps)])
subprocess.check_call(cmd)
@cached
def systemd():
return (lsb_release()['DISTRIB_CODENAME'] >= 'vivid')
def bootstrap_monitor_cluster(secret):
hostname = get_unit_hostname()
path = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
done = '{}/done'.format(path)
if systemd():
init_marker = '{}/systemd'.format(path)
else:
init_marker = '{}/upstart'.format(path)
keyring = '/var/lib/ceph/tmp/{}.mon.keyring'.format(hostname)
if os.path.exists(done):
log('bootstrap_monitor_cluster: mon already initialized.')
else:
# Ceph >= 0.61.3 needs this for ceph-mon fs creation
mkdir('/var/run/ceph', owner=ceph_user(),
group=ceph_user(), perms=0o755)
mkdir(path, owner=ceph_user(), group=ceph_user())
# end changes for Ceph >= 0.61.3
try:
subprocess.check_call(['ceph-authtool', keyring,
'--create-keyring', '--name=mon.',
'--add-key={}'.format(secret),
'--cap', 'mon', 'allow *'])
subprocess.check_call(['ceph-mon', '--mkfs',
'-i', hostname,
'--keyring', keyring])
chownr(path, ceph_user(), ceph_user())
with open(done, 'w'):
pass
with open(init_marker, 'w'):
pass
if systemd():
subprocess.check_call(['systemctl', 'enable', 'ceph-mon'])
service_restart('ceph-mon')
else:
service_restart('ceph-mon-all')
except:
raise
finally:
os.unlink(keyring)
def update_monfs():
hostname = get_unit_hostname()
monfs = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
if systemd():
init_marker = '{}/systemd'.format(monfs)
else:
init_marker = '{}/upstart'.format(monfs)
if os.path.exists(monfs) and not os.path.exists(init_marker):
# Mark mon as managed by upstart so that
# it gets start correctly on reboots
with open(init_marker, 'w'):
pass
def osdize(dev, osd_format, osd_journal, reformat_osd=False,
ignore_errors=False):
if dev.startswith('/dev'):
osdize_dev(dev, osd_format, osd_journal, reformat_osd, ignore_errors)
else:
osdize_dir(dev)
def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
ignore_errors=False):
if not os.path.exists(dev):
log('Path {} does not exist - bailing'.format(dev))
return
if not is_block_device(dev):
log('Path {} is not a block device - bailing'.format(dev))
return
if is_osd_disk(dev) and not reformat_osd:
log('Looks like {} is already an OSD, skipping.'.format(dev))
return
if is_device_mounted(dev):
log('Looks like {} is in use, skipping.'.format(dev))
return
status_set('maintenance', 'Initializing device {}'.format(dev))
cmd = ['ceph-disk', 'prepare']
# Later versions of ceph support more options
if cmp_pkgrevno('ceph', '0.48.3') >= 0:
if osd_format:
cmd.append('--fs-type')
cmd.append(osd_format)
if reformat_osd:
cmd.append('--zap-disk')
cmd.append(dev)
if osd_journal and os.path.exists(osd_journal):
cmd.append(osd_journal)
else:
# Just provide the device - no other options
# for older versions of ceph
cmd.append(dev)
if reformat_osd:
zap_disk(dev)
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
if ignore_errors:
log('Unable to initialize device: {}'.format(dev), WARNING)
else:
log('Unable to initialize device: {}'.format(dev), ERROR)
raise e
def osdize_dir(path):
if os.path.exists(os.path.join(path, 'upstart')):
log('Path {} is already configured as an OSD - bailing'.format(path))
return
if cmp_pkgrevno('ceph', "0.56.6") < 0:
log('Unable to use directories for OSDs with ceph < 0.56.6',
level=ERROR)
raise
mkdir(path, owner=ceph_user(), group=ceph_user(), perms=0o755)
chownr('/var/lib/ceph', ceph_user(), ceph_user())
cmd = [
'sudo', '-u', ceph_user(),
'ceph-disk',
'prepare',
'--data-dir',
path
]
subprocess.check_call(cmd)
def filesystem_mounted(fs):
return subprocess.call(['grep', '-wqs', fs, '/proc/mounts']) == 0
def get_local_osd_ids():
"""
This will list the /var/lib/ceph/osd/* directories and try
to split the ID off of the directory name and return it in
a list
:return: list. A list of osd identifiers :raise: OSError if
something goes wrong with listing the directory.
"""
osd_ids = []
osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd')
if os.path.exists(osd_path):
try:
dirs = os.listdir(osd_path)
for osd_dir in dirs:
osd_id = osd_dir.split('-')[1]
if _is_int(osd_id):
osd_ids.append(osd_id)
except OSError:
raise
return osd_ids
def _is_int(v):
"""Return True if the object v can be turned into an integer."""
try:
int(v)
return True
except ValueError:
return False


@@ -21,7 +21,12 @@ import subprocess
import sys
import time
import ceph
sys.path.append('lib')
from ceph.ceph import ceph
from ceph.ceph.ceph_broker import (
process_requests
)
from charmhelpers.core import host
from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
@@ -79,9 +84,7 @@ from utils import (
is_unit_paused_set,
get_cluster_addr,
)
from ceph_broker import (
process_requests
)
from charmhelpers.contrib.charmsupport import nrpe
from charmhelpers.contrib.hardening.harden import harden

lib/ceph/__init__.py (new file, 0 lines)

@@ -0,0 +1 @@
__author__ = 'chris'

lib/ceph/ceph/ceph.py (new file, 1183 lines)

File diff suppressed because it is too large.


@@ -22,7 +22,7 @@ from charmhelpers.core.hookenv import (
INFO,
ERROR,
)
from charmhelpers.contrib.storage.linux.ceph import (
from charmhelpers.contrib.storage.linux.ceph import (
create_erasure_profile,
delete_pool,
erasure_profile_exists,
@@ -152,6 +152,7 @@ def handle_erasure_pool(request, service):
pool_name = request.get('name')
erasure_profile = request.get('erasure-profile')
quota = request.get('max-bytes')
weight = request.get('weight')
if erasure_profile is None:
erasure_profile = "default-canonical"
@@ -171,7 +172,8 @@ def handle_erasure_pool(request, service):
return {'exit-code': 1, 'stderr': msg}
pool = ErasurePool(service=service, name=pool_name,
erasure_code_profile=erasure_profile)
erasure_code_profile=erasure_profile,
percent_data=weight)
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
@@ -188,7 +190,8 @@ def handle_replicated_pool(request, service):
pool_name = request.get('name')
replicas = request.get('replicas')
quota = request.get('max-bytes')
weight = request.get('weight')
# Optional params
pg_num = request.get('pg_num')
if pg_num:
@@ -203,10 +206,16 @@ def handle_replicated_pool(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
kwargs = {}
if pg_num:
kwargs['pg_num'] = pg_num
if weight:
kwargs['percent_data'] = weight
if replicas:
kwargs['replicas'] = replicas
pool = ReplicatedPool(service=service,
name=pool_name,
replicas=replicas,
pg_num=pg_num)
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
level=INFO)
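
To make the effect of the new 'weight' key concrete, a broker request
like the following (a hand-written example, not taken from this
commit; the pool name is illustrative) would now reach
handle_replicated_pool() and have its weight passed to ReplicatedPool
as percent_data, as exercised by the unit tests further down.

import json

request = json.dumps({
    'api-version': 1,
    'ops': [{
        'op': 'create-pool',
        'pool-type': 'replicated',
        'name': 'cinder-ceph',     # illustrative pool name
        'replicas': 3,
        'weight': 40.0,            # copied into kwargs['percent_data']
    }],
})

# On the mon unit this string is handed to the broker, e.g.:
#   from ceph.ceph.ceph_broker import process_requests
#   rc = process_requests(request)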


@@ -14,4 +14,5 @@
import sys
sys.path.append('hooks')
sys.path.append('lib')
sys.path.append('actions')


@@ -1,151 +0,0 @@
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
import mock
import ceph_broker
class CephBrokerTestCase(unittest.TestCase):
def setUp(self):
super(CephBrokerTestCase, self).setUp()
@mock.patch('ceph_broker.log')
def test_process_requests_noop(self, mock_log):
req = json.dumps({'api-version': 1, 'ops': []})
rc = ceph_broker.process_requests(req)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
def test_process_requests_missing_api_version(self, mock_log):
req = json.dumps({'ops': []})
rc = ceph_broker.process_requests(req)
self.assertEqual(json.loads(rc), {
'exit-code': 1,
'stderr': 'Missing or invalid api version (None)'})
@mock.patch('ceph_broker.log')
def test_process_requests_invalid_api_version(self, mock_log):
req = json.dumps({'api-version': 2, 'ops': []})
rc = ceph_broker.process_requests(req)
print "Return: %s" % rc
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': 'Missing or invalid api version (2)'})
@mock.patch('ceph_broker.log')
def test_process_requests_invalid(self, mock_log):
reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': "Unknown operation 'invalid_op'"})
@mock.patch('ceph_broker.get_osds')
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_w_pg_num(self, mock_log,
mock_pool_exists,
mock_replicated_pool,
mock_get_osds):
mock_get_osds.return_value = [0, 1, 2]
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'pg_num': 100}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin', name='foo',
replicas=3, pg_num=100)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.get_osds')
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_w_pg_num_capped(self, mock_log,
mock_pool_exists,
mock_replicated_pool,
mock_get_osds):
mock_get_osds.return_value = [0, 1, 2]
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'pg_num': 300}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin',
name='foo')
mock_replicated_pool.assert_called_with(service='admin', name='foo',
replicas=3, pg_num=100)
self.assertEqual(json.loads(rc), {'exit-code': 0})
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_exists(self, mock_log,
mock_pool_exists,
mock_replicated_pool):
mock_pool_exists.return_value = True
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool',
'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin',
name='foo')
self.assertFalse(mock_replicated_pool.create.called)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_rid(self, mock_log,
mock_pool_exists,
mock_replicated_pool):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'request-id': '1ef5aede',
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin',
name='foo',
pg_num=None,
replicas=3)
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
@mock.patch('ceph_broker.log')
def test_process_requests_invalid_api_rid(self, mock_log):
reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',
'ops': [{'op': 'create-pool'}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 1)
self.assertEqual(json.loads(rc)['stderr'],
"Missing or invalid api version (0)")
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')


@@ -20,7 +20,7 @@ from mock import (
patch,
)
from hooks import ceph_broker
from ceph.ceph import ceph_broker
class TestCephOps(unittest.TestCase):
@@ -47,15 +47,12 @@ class TestCephOps(unittest.TestCase):
erasure_plugin_name='jerasure')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@patch.object(ceph_broker, 'get_osds')
@patch.object(ceph_broker, 'pool_exists')
@patch.object(ceph_broker, 'ReplicatedPool')
@patch.object(ceph_broker, 'log', lambda *args, **kwargs: None)
def test_process_requests_create_replicated_pool(self,
mock_replicated_pool,
mock_pool_exists,
mock_get_osds):
mock_get_osds.return_value = 0
mock_pool_exists):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
@@ -66,7 +63,29 @@ class TestCephOps(unittest.TestCase):
}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
calls = [call(pg_num=None, name=u'foo', service='admin', replicas=3)]
calls = [call(name=u'foo', service='admin', replicas=3)]
mock_replicated_pool.assert_has_calls(calls)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@patch.object(ceph_broker, 'pool_exists')
@patch.object(ceph_broker, 'ReplicatedPool')
@patch.object(ceph_broker, 'log', lambda *args, **kwargs: None)
def test_process_requests_replicated_pool_weight(self,
mock_replicated_pool,
mock_pool_exists):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'pool-type': 'replicated',
'name': 'foo',
'weight': 40.0,
'replicas': 3
}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
calls = [call(name=u'foo', service='admin', replicas=3,
percent_data=40.0)]
mock_replicated_pool.assert_has_calls(calls)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@@ -220,7 +239,3 @@ class TestCephOps(unittest.TestCase):
}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 1)
if __name__ == '__main__':
unittest.main()