Code cleanup

* import ordering
* docstrings
* unit tests cleanup
* move code out of __init__.py
* rename files

Change-Id: Id56d4ea1a0cca33df03f40222bbf51a6bef58b20
Author: Edward Hope-Morley, 2017-07-17 18:32:34 +02:00
Parent: 55a89963d6
Commit: dd978b7bd0
12 changed files with 3111 additions and 5046 deletions
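
The split, reconstructed from the import hunks below (a rough before/after sketch; only paths actually visible in this diff are shown):

    # before: a flat ceph_broker module plus code in the ceph package root
    import ceph_broker
    from ceph import get_cephfs, get_osd_weight
    from ceph.ceph_helpers import Crushmap

    # after: focused modules inside the ceph package
    import ceph.broker
    from ceph.utils import get_cephfs, get_osd_weight
    from ceph.crush_utils import Crushmap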

File diff suppressed because it is too large.


@ -1,5 +1,3 @@
#!/usr/bin/python
#
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -16,19 +14,21 @@
import json
import os
from tempfile import NamedTemporaryFile
from ceph.utils import (
get_cephfs,
get_osd_weight
)
from ceph.crush_utils import Crushmap
from charmhelpers.core.hookenv import (
log,
DEBUG,
INFO,
ERROR,
)
from ceph import (
get_cephfs,
get_osd_weight
)
from ceph.ceph_helpers import Crushmap
from charmhelpers.contrib.storage.linux.ceph import (
create_erasure_profile,
delete_pool,
@ -112,6 +112,9 @@ def process_requests(reqs):
This is a versioned api. API version must be supplied by the client making
the request.
:param reqs: dict of request parameters.
:returns: dict. exit-code and reason if not 0
"""
request_id = reqs.get('request-id')
try:
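
For reference, the shape of a broker request as the unit tests later in this diff drive it: a JSON document with an api-version, an optional request-id, and a list of ops; the reply is JSON carrying exit-code (plus stderr on failure):

    import json
    import ceph.broker

    reqs = json.dumps({'api-version': 1,
                       'request-id': '1ef5aede',
                       'ops': [{'op': 'create-pool',
                                'name': 'foo',
                                'replicas': 3}]})
    rc = ceph.broker.process_requests(reqs)
    assert json.loads(rc)['exit-code'] == 0
    assert json.loads(rc)['request-id'] == '1ef5aede'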
@ -140,6 +143,12 @@ def process_requests(reqs):
def handle_create_erasure_profile(request, service):
"""Create an erasure profile.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
# "local" | "shec" or it defaults to "jerasure"
erasure_type = request.get('erasure-type')
# "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
@ -160,10 +169,9 @@ def handle_create_erasure_profile(request, service):
def handle_add_permissions_to_key(request, service):
"""
Groups are defined by the key cephx.groups.(namespace-)?-(name). This key
will contain a dict serialized to JSON with data about the group, including
pools and members.
"""Groups are defined by the key cephx.groups.(namespace-)?-(name). This
key will contain a dict serialized to JSON with data about the group,
including pools and members.
A group can optionally have a namespace defined that will be used to
further restrict pool access.
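
A concrete example of the serialized group document, matching the fixtures in the tests below (read and written via monitor_key_get/monitor_key_set under, e.g., cephx.groups.images for the 'admin' service):

    import json

    group = {'pools': ['glance'], 'services': []}
    json.dumps(group)  # '{"pools": ["glance"], "services": []}'
    # this string is what get_group()/save_group() store at the
    # monitor key 'cephx.groups.images'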
@ -238,8 +246,7 @@ def pool_permission_list_for_service(service):
def get_service_groups(service, namespace=None):
"""
Services are objects stored with some metadata, they look like (for a
"""Services are objects stored with some metadata, they look like (for a
service named "nova"):
{
group_names: {'rwx': ['images']},
@ -272,7 +279,7 @@ def get_service_groups(service, namespace=None):
def _build_service_groups(service, namespace=None):
'''Rebuild the 'groups' dict for a service group
"""Rebuild the 'groups' dict for a service group
:returns: dict: dictionary keyed by group name of the following
format:
@ -287,7 +294,7 @@ def _build_service_groups(service, namespace=None):
services: ['nova']
}
}
'''
"""
all_groups = {}
for _, groups in service['group_names'].items():
for group in groups:
@ -299,8 +306,7 @@ def _build_service_groups(service, namespace=None):
def get_group(group_name):
"""
A group is a structure to hold data about a named group, structured as:
"""A group is a structure to hold data about a named group, structured as:
{
pools: ['glance'],
services: ['nova']
@ -344,6 +350,12 @@ def get_group_key(group_name):
def handle_erasure_pool(request, service):
"""Create a new erasure coded pool.
:param request: dict of request operations and params.
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0.
"""
pool_name = request.get('name')
erasure_profile = request.get('erasure-profile')
quota = request.get('max-bytes')
@ -390,6 +402,12 @@ def handle_erasure_pool(request, service):
def handle_replicated_pool(request, service):
"""Create a new replicated pool.
:param request: dict of request operations and params.
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0.
"""
pool_name = request.get('name')
replicas = request.get('replicas')
quota = request.get('max-bytes')
@ -441,6 +459,13 @@ def handle_replicated_pool(request, service):
def handle_create_cache_tier(request, service):
"""Create a cache tier on a cold pool. Modes supported are
"writeback" and "readonly".
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
# mode = "writeback" | "readonly"
storage_pool = request.get('cold-pool')
cache_pool = request.get('hot-pool')
@ -462,6 +487,12 @@ def handle_create_cache_tier(request, service):
def handle_remove_cache_tier(request, service):
"""Remove a cache tier from the cold pool.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
storage_pool = request.get('cold-pool')
cache_pool = request.get('hot-pool')
# cache and storage pool must exist first
@ -477,6 +508,12 @@ def handle_remove_cache_tier(request, service):
def handle_set_pool_value(request, service):
"""Sets an arbitrary pool value.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
# Set arbitrary pool values
params = {'pool': request.get('name'),
'key': request.get('key'),
@ -501,6 +538,12 @@ def handle_set_pool_value(request, service):
def handle_rgw_regionmap_update(request, service):
"""Change the radosgw region map.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
name = request.get('client-name')
if not name:
msg = "Missing rgw-region or client-name params"
@ -516,6 +559,12 @@ def handle_rgw_regionmap_update(request, service):
def handle_rgw_regionmap_default(request, service):
"""Create a radosgw region map.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
region = request.get('rgw-region')
name = request.get('client-name')
if not region or not name:
@ -537,6 +586,12 @@ def handle_rgw_regionmap_default(request, service):
def handle_rgw_zone_set(request, service):
"""Create a radosgw zone.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
json_file = request.get('zone-json')
name = request.get('client-name')
region_name = request.get('region-name')
@ -567,6 +622,12 @@ def handle_rgw_zone_set(request, service):
def handle_put_osd_in_bucket(request, service):
"""Move an osd into a specified crush bucket.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
osd_id = request.get('osd')
target_bucket = request.get('bucket')
if not osd_id or not target_bucket:
@ -597,6 +658,12 @@ def handle_put_osd_in_bucket(request, service):
def handle_rgw_create_user(request, service):
"""Create a new rados gateway user.
:param request: dict of request operations and params
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
user_id = request.get('rgw-uid')
display_name = request.get('display-name')
name = request.get('client-name')
@ -630,11 +697,11 @@ def handle_rgw_create_user(request, service):
def handle_create_cephfs(request, service):
"""
Create a new cephfs.
"""Create a new cephfs.
:param request: The broker request
:param service: The cephx user to run this command under
:return:
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
cephfs_name = request.get('mds_name')
data_pool = request.get('data_pool')
@ -678,6 +745,12 @@ def handle_create_cephfs(request, service):
def handle_rgw_region_set(request, service):
# radosgw-admin region set --infile us.json --name client.radosgw.us-east-1
"""Set the rados gateway region.
:param request: dict. The broker request.
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
json_file = request.get('region-json')
name = request.get('client-name')
region_name = request.get('region-name')
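
As a second worked example, the move-osd-to-bucket op (handled by handle_put_osd_in_bucket above) ends up shelling out to ceph osd crush set; the payload and the resulting command below are taken from test_process_requests_move_osd further down in this diff:

    import json
    import ceph.broker

    reqs = json.dumps({'api-version': 1,
                       'ops': [{'op': 'move-osd-to-bucket',
                                'osd': 'osd.0',
                                'bucket': 'test'}]})
    ceph.broker.process_requests(reqs)
    # with an OSD weight of 1, this runs:
    #   ceph --id admin osd crush set osd.0 1 root=test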

File diff suppressed because it is too large.

ceph/crush_utils.py (new file, 149 lines)

@ -0,0 +1,149 @@
# Copyright 2014 Canonical Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from subprocess import check_output, CalledProcessError
from charmhelpers.core.hookenv import (
log,
ERROR,
)
CRUSH_BUCKET = """root {name} {{
id {id} # do not change unnecessarily
# weight 0.000
alg straw
hash 0 # rjenkins1
}}
rule {name} {{
ruleset 0
type replicated
min_size 1
max_size 10
step take {name}
step chooseleaf firstn 0 type host
step emit
}}"""
# This regular expression looks for a string like:
# root NAME {
# id NUMBER
# so that we can extract NAME and ID from the crushmap
CRUSHMAP_BUCKETS_RE = re.compile(r"root\s+(.+)\s+\{\s*id\s+(-?\d+)")
# This regular expression looks for ID strings in the crushmap like:
# id NUMBER
# so that we can extract the IDs from a crushmap
CRUSHMAP_ID_RE = re.compile(r"id\s+(-?\d+)")
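A quick sketch of what the two patterns yield on a fragment of a decompiled map (plain re.findall, exactly as the constructor uses them):

    import re

    CRUSHMAP_BUCKETS_RE = re.compile(r"root\s+(.+)\s+\{\s*id\s+(-?\d+)")
    CRUSHMAP_ID_RE = re.compile(r"id\s+(-?\d+)")

    fragment = """host ip-172-31-33-152 {
    id -2 # do not change unnecessarily
    }
    root default {
    id -1 # do not change unnecessarily
    }"""

    CRUSHMAP_BUCKETS_RE.findall(fragment)  # [('default', '-1')]
    CRUSHMAP_ID_RE.findall(fragment)       # ['-2', '-1']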
class Crushmap(object):
"""An object oriented approach to Ceph crushmap management."""
def __init__(self):
self._crushmap = self.load_crushmap()
roots = re.findall(CRUSHMAP_BUCKETS_RE, self._crushmap)
buckets = []
ids = list(map(
lambda x: int(x),
re.findall(CRUSHMAP_ID_RE, self._crushmap)))
ids.sort()
if roots != []:
for root in roots:
buckets.append(CRUSHBucket(root[0], root[1], True))
self._buckets = buckets
if ids != []:
self._ids = ids
else:
self._ids = [0]
def load_crushmap(self):
try:
crush = check_output(['ceph', 'osd', 'getcrushmap'])
# decompile via crushtool; 'input' (subprocess, Python >= 3.4)
# feeds the compiled map to the child process on stdin
return check_output(['crushtool', '-d', '-'], input=crush)
except CalledProcessError as e:
log("Error occurred while loading and decompiling CRUSH map: "
"{}".format(e), ERROR)
raise RuntimeError("Failed to read CRUSH map")
def ensure_bucket_is_present(self, bucket_name):
if bucket_name not in [bucket.name for bucket in self.buckets()]:
self.add_bucket(bucket_name)
self.save()
def buckets(self):
"""Return a list of buckets that are in the Crushmap."""
return self._buckets
def add_bucket(self, bucket_name):
"""Add a named bucket to Ceph"""
new_id = min(self._ids) - 1
self._ids.append(new_id)
self._buckets.append(CRUSHBucket(bucket_name, new_id))
def save(self):
"""Persist Crushmap to Ceph"""
try:
crushmap = self.build_crushmap()
# compile the text map, then inject it; 'input' feeds each
# subprocess on stdin (Python >= 3.4)
compiled = check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], input=crushmap.encode('utf-8'))
ceph_output = check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], input=compiled)
return ceph_output
except CalledProcessError as e:
log("save error: {}".format(e), ERROR)
raise RuntimeError("Failed to save CRUSH map.")
def build_crushmap(self):
"""Modifies the current CRUSH map to include the new buckets"""
tmp_crushmap = self._crushmap
for bucket in self._buckets:
if not bucket.default:
tmp_crushmap = "{}\n\n{}".format(
tmp_crushmap,
Crushmap.bucket_string(bucket.name, bucket.id))
return tmp_crushmap
@staticmethod
def bucket_string(name, id):
return CRUSH_BUCKET.format(name=name, id=id)
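A minimal usage sketch (hypothetical bucket name; assumes a live cluster with the ceph and crushtool binaries available, since the constructor shells out immediately):

    from ceph.crush_utils import Crushmap

    crushmap = Crushmap()                      # ceph osd getcrushmap | crushtool -d
    crushmap.ensure_bucket_is_present('fast')  # no-op if 'fast' already exists;
                                               # otherwise appends the bucket/rule
                                               # stanza and saves the compiled map
                                               # back via ceph osd setcrushmap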
class CRUSHBucket(object):
"""CRUSH bucket description object."""
def __init__(self, name, id, default=False):
self.name = name
self.id = int(id)
self.default = default
def __repr__(self):
return "Bucket {{Name: {name}, ID: {id}}}".format(
name=self.name, id=self.id)
def __eq__(self, other):
"""Override the default Equals behavior"""
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
return NotImplemented
def __ne__(self, other):
"""Define a non-equality test"""
if isinstance(other, self.__class__):
return not self.__eq__(other)
return NotImplemented
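Because __init__ casts id to int and __eq__ compares the instance dicts, buckets compare by value:

    from ceph.crush_utils import CRUSHBucket

    CRUSHBucket("fast", -21) == CRUSHBucket("fast", "-21")  # True: id normalised
    CRUSHBucket("fast", -21) == CRUSHBucket("test", -21)    # False: names differ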

ceph/utils.py (new file, 2199 lines)

File diff suppressed because it is too large.


@ -13,10 +13,11 @@
# limitations under the License.
import json
import mock
import unittest
import ceph_broker
from mock import patch
import ceph.broker
from mock import call
@ -25,22 +26,22 @@ class CephBrokerTestCase(unittest.TestCase):
def setUp(self):
super(CephBrokerTestCase, self).setUp()
@mock.patch('ceph_broker.check_call')
@patch.object(ceph.broker, 'check_call')
def test_update_service_permission(self, _check_call):
service_obj = {
'group_names': {'rwx': ['images']},
'groups': {'images': {'pools': ['cinder'], 'services': ['nova']}}
}
ceph_broker.update_service_permissions(service='nova',
ceph.broker.update_service_permissions(service='nova',
service_obj=service_obj)
_check_call.assert_called_with(['ceph', 'auth', 'caps',
'client.nova', 'mon', 'allow r', 'osd',
'allow rwx pool=cinder'])
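
The mechanical change through the rest of this file: dotted-string targets like 'ceph_broker.check_call' become patch.object against the imported module, so the patch references the module object itself and a stale module path fails at import time rather than patching the wrong thing. A minimal sketch of the two spellings (the test name here is hypothetical, for illustration):

    from mock import patch
    import ceph.broker

    # old: @mock.patch('ceph_broker.check_call') -- target named by string
    # new: attribute patched on the module object the test already imports
    @patch.object(ceph.broker, 'check_call')
    def test_something(_check_call):
        ...  # _check_call is injected exactly as with the string form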
@mock.patch('ceph_broker.check_call')
@mock.patch('ceph_broker.get_service_groups')
@mock.patch('ceph_broker.monitor_key_set')
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'check_call')
@patch.object(ceph.broker, 'get_service_groups')
@patch.object(ceph.broker, 'monitor_key_set')
@patch.object(ceph.broker, 'monitor_key_get')
def test_add_pool_to_existing_group_with_services(self,
_monitor_key_get,
_monitor_key_set,
@ -55,7 +56,7 @@ class CephBrokerTestCase(unittest.TestCase):
], 'services': ['nova']}}
}
_get_service_groups.return_value = service
ceph_broker.add_pool_to_group(
ceph.broker.add_pool_to_group(
pool="cinder",
group="images"
)
@ -69,13 +70,13 @@ class CephBrokerTestCase(unittest.TestCase):
'client.nova', 'mon', 'allow r', 'osd',
'allow rwx pool=glance, allow rwx pool=cinder'])
@mock.patch('ceph_broker.monitor_key_set')
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_set')
@patch.object(ceph.broker, 'monitor_key_get')
def test_add_pool_to_existing_group(self,
_monitor_key_get,
_monitor_key_set):
_monitor_key_get.return_value = '{"pools": ["glance"], "services": []}'
ceph_broker.add_pool_to_group(
ceph.broker.add_pool_to_group(
pool="cinder",
group="images"
)
@ -84,13 +85,13 @@ class CephBrokerTestCase(unittest.TestCase):
service='admin',
value=json.dumps({"pools": ["glance", "cinder"], "services": []}))
@mock.patch('ceph_broker.monitor_key_set')
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_set')
@patch.object(ceph.broker, 'monitor_key_get')
def test_add_pool_to_new_group(self,
_monitor_key_get,
_monitor_key_set):
_monitor_key_get.return_value = '{"pools": [], "services": []}'
ceph_broker.add_pool_to_group(
ceph.broker.add_pool_to_group(
pool="glance",
group="images"
)
@ -104,46 +105,46 @@ class CephBrokerTestCase(unittest.TestCase):
'group_names': {'rwx': ['images']},
'groups': {'images': {'pools': ['glance'], 'services': ['nova']}}
}
result = ceph_broker.pool_permission_list_for_service(service)
result = ceph.broker.pool_permission_list_for_service(service)
self.assertEqual(result, ['mon',
'allow r',
'osd',
'allow rwx pool=glance'])
@mock.patch('ceph_broker.monitor_key_set')
@patch.object(ceph.broker, 'monitor_key_set')
def test_save_service(self, _monitor_key_set):
service = {
'group_names': {'rwx': 'images'},
'groups': {'images': {'pools': ['glance'], 'services': ['nova']}}
}
ceph_broker.save_service(service=service, service_name='nova')
ceph.broker.save_service(service=service, service_name='nova')
_monitor_key_set.assert_called_with(
value='{"groups": {}, "group_names": {"rwx": "images"}}',
key='cephx.services.nova',
service='admin')
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_get')
def test_get_service_groups_empty(self, _monitor_key_get):
_monitor_key_get.return_value = None
service = ceph_broker.get_service_groups('nova')
service = ceph.broker.get_service_groups('nova')
_monitor_key_get.assert_called_with(
key='cephx.services.nova',
service='admin'
)
self.assertEqual(service, {'group_names': {}, 'groups': {}})
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_get')
def test_get_service_groups_empty_str(self, _monitor_key_get):
_monitor_key_get.return_value = ''
service = ceph_broker.get_service_groups('nova')
service = ceph.broker.get_service_groups('nova')
_monitor_key_get.assert_called_with(
key='cephx.services.nova',
service='admin'
)
self.assertEqual(service, {'group_names': {}, 'groups': {}})
@mock.patch('ceph_broker.get_group')
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'get_group')
@patch.object(ceph.broker, 'monitor_key_get')
def test_get_service_groups(self, _monitor_key_get, _get_group):
_monitor_key_get.return_value = '{"group_names": {"rwx": ["images"]}' \
',"groups": {}}'
@ -151,7 +152,7 @@ class CephBrokerTestCase(unittest.TestCase):
'pools': ["glance"],
'services': ['nova']
}
service = ceph_broker.get_service_groups('nova')
service = ceph.broker.get_service_groups('nova')
_monitor_key_get.assert_called_with(
key='cephx.services.nova',
service='admin'
@ -161,78 +162,78 @@ class CephBrokerTestCase(unittest.TestCase):
'groups': {'images': {'pools': ['glance'], 'services': ['nova']}}
})
@mock.patch('ceph_broker.monitor_key_set')
@patch.object(ceph.broker, 'monitor_key_set')
def test_save_group(self, _monitor_key_set):
group = {
'pools': ["glance"],
'services': []
}
ceph_broker.save_group(group=group, group_name='images')
ceph.broker.save_group(group=group, group_name='images')
_monitor_key_set.assert_called_with(
key='cephx.groups.images',
service='admin',
value=json.dumps(group))
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_get')
def test_get_group_empty_str(self, _monitor_key_get):
_monitor_key_get.return_value = ''
group = ceph_broker.get_group('images')
group = ceph.broker.get_group('images')
self.assertEqual(group, {
'pools': [],
'services': []
})
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_get')
def test_get_group_empty(self, _monitor_key_get):
_monitor_key_get.return_value = None
group = ceph_broker.get_group('images')
group = ceph.broker.get_group('images')
self.assertEqual(group, {
'pools': [],
'services': []
})
@mock.patch('ceph_broker.monitor_key_get')
@patch.object(ceph.broker, 'monitor_key_get')
def test_get_group(self, _monitor_key_get):
_monitor_key_get.return_value = '{"pools": ["glance"], "services": []}'
group = ceph_broker.get_group('images')
group = ceph.broker.get_group('images')
self.assertEqual(group, {
'pools': ["glance"],
'services': []
})
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'log')
def test_process_requests_noop(self, mock_log):
req = json.dumps({'api-version': 1, 'ops': []})
rc = ceph_broker.process_requests(req)
rc = ceph.broker.process_requests(req)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'log')
def test_process_requests_missing_api_version(self, mock_log):
req = json.dumps({'ops': []})
rc = ceph_broker.process_requests(req)
rc = ceph.broker.process_requests(req)
self.assertEqual(json.loads(rc), {
'exit-code': 1,
'stderr': 'Missing or invalid api version (None)'})
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'log')
def test_process_requests_invalid_api_version(self, mock_log):
req = json.dumps({'api-version': 2, 'ops': []})
rc = ceph_broker.process_requests(req)
rc = ceph.broker.process_requests(req)
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': 'Missing or invalid api version (2)'})
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'log')
def test_process_requests_invalid(self, mock_log):
reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': "Unknown operation 'invalid_op'"})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'ReplicatedPool')
@patch.object(ceph.broker, 'pool_exists')
@patch.object(ceph.broker, 'log')
def test_process_requests_create_pool_w_pg_num(self, mock_log,
mock_pool_exists,
mock_replicated_pool):
@ -243,16 +244,16 @@ class CephBrokerTestCase(unittest.TestCase):
'name': 'foo',
'replicas': 3,
'pg_num': 100}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin', name='foo',
replicas=3, pg_num=100)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.add_pool_to_group')
@patch.object(ceph.broker, 'ReplicatedPool')
@patch.object(ceph.broker, 'pool_exists')
@patch.object(ceph.broker, 'log')
@patch.object(ceph.broker, 'add_pool_to_group')
def test_process_requests_create_pool_w_group(self, add_pool_to_group,
mock_log, mock_pool_exists,
mock_replicated_pool):
@ -263,7 +264,7 @@ class CephBrokerTestCase(unittest.TestCase):
'name': 'foo',
'replicas': 3,
'group': 'image'}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
add_pool_to_group.assert_called_with(group='image',
pool='foo',
namespace=None)
@ -272,9 +273,9 @@ class CephBrokerTestCase(unittest.TestCase):
replicas=3)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'ReplicatedPool')
@patch.object(ceph.broker, 'pool_exists')
@patch.object(ceph.broker, 'log')
def test_process_requests_create_pool_exists(self, mock_log,
mock_pool_exists,
mock_replicated_pool):
@ -283,15 +284,15 @@ class CephBrokerTestCase(unittest.TestCase):
'ops': [{'op': 'create-pool',
'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin',
name='foo')
self.assertFalse(mock_replicated_pool.create.called)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'ReplicatedPool')
@patch.object(ceph.broker, 'pool_exists')
@patch.object(ceph.broker, 'log')
def test_process_requests_create_pool_rid(self, mock_log,
mock_pool_exists,
mock_replicated_pool):
@ -302,7 +303,7 @@ class CephBrokerTestCase(unittest.TestCase):
'op': 'create-pool',
'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin',
name='foo',
@ -310,10 +311,10 @@ class CephBrokerTestCase(unittest.TestCase):
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
@mock.patch('ceph_broker.get_cephfs')
@mock.patch('ceph_broker.check_output')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'get_cephfs')
@patch.object(ceph.broker, 'check_output')
@patch.object(ceph.broker, 'pool_exists')
@patch.object(ceph.broker, 'log')
def test_process_requests_create_cephfs(self,
mock_log,
mock_pool_exists,
@ -329,7 +330,7 @@ class CephBrokerTestCase(unittest.TestCase):
'data_pool': 'data',
'metadata_pool': 'metadata',
}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
mock_pool_exists.assert_has_calls(
[
call(service='admin', name='data'),
@ -344,20 +345,20 @@ class CephBrokerTestCase(unittest.TestCase):
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
@mock.patch('ceph_broker.check_output')
@mock.patch('ceph.ceph_helpers.Crushmap.load_crushmap')
@mock.patch('ceph.ceph_helpers.Crushmap.ensure_bucket_is_present')
@mock.patch('ceph_broker.get_osd_weight')
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'check_output')
@patch.object(ceph.broker, 'get_osd_weight')
@patch.object(ceph.broker, 'log')
@patch('ceph.crush_utils.Crushmap.load_crushmap')
@patch('ceph.crush_utils.Crushmap.ensure_bucket_is_present')
def test_process_requests_move_osd(self,
mock_ensure_bucket_is_present,
mock_load_crushmap,
mock_log,
get_osd_weight,
ensure_bucket_is_present,
load_crushmap,
check_output):
load_crushmap.return_value = ""
ensure_bucket_is_present.return_value = None
get_osd_weight.return_value = 1
mock_get_osd_weight,
mock_check_output):
mock_load_crushmap.return_value = ""
mock_ensure_bucket_is_present.return_value = None
mock_get_osd_weight.return_value = 1
reqs = json.dumps({'api-version': 1,
'request-id': '1ef5aede',
'ops': [{
@ -365,20 +366,19 @@ class CephBrokerTestCase(unittest.TestCase):
'osd': 'osd.0',
'bucket': 'test'
}]})
rc = ceph_broker.process_requests(reqs)
check_output.assert_called_with(["ceph",
'--id', 'admin',
"osd", "crush", "set",
u"osd.0", "1", "root=test"])
rc = ceph.broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
mock_check_output.assert_called_with(["ceph",
'--id', 'admin',
"osd", "crush", "set",
"osd.0", "1", "root=test"])
@mock.patch('ceph_broker.log')
@patch.object(ceph.broker, 'log')
def test_process_requests_invalid_api_rid(self, mock_log):
reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',
'ops': [{'op': 'create-pool'}]})
rc = ceph_broker.process_requests(reqs)
rc = ceph.broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 1)
self.assertEqual(json.loads(rc)['stderr'],
"Missing or invalid api version (0)")


@ -1,946 +0,0 @@
import io
import unittest
from shutil import rmtree
from tempfile import mkdtemp
from threading import Timer
import json
from subprocess import CalledProcessError
import nose.plugins.attrib
import os
import time
from contextlib import contextmanager
from mock import patch, call, MagicMock
import ceph_helpers as ceph_utils
import six
if not six.PY3:
builtin_open = '__builtin__.open'
else:
builtin_open = 'builtins.open'
LS_POOLS = """
images
volumes
rbd
"""
LS_RBDS = """
rbd1
rbd2
rbd3
"""
IMG_MAP = """
bar
baz
"""
@contextmanager
def patch_open():
"""Patch open() to allow mocking both open() itself and the file that is
yielded.
Yields the mock for "open" and "file", respectively."""
mock_open = MagicMock(spec=open)
mock_file = MagicMock(spec=io.FileIO)
@contextmanager
def stub_open(*args, **kwargs):
mock_open(*args, **kwargs)
yield mock_file
with patch(builtin_open, stub_open):
yield mock_open, mock_file
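Typical use, as seen in test_configure further down in this (removed) file; both the open() call and the yielded file handle can be asserted on:

    with patch_open() as (_open, _file):
        ceph_utils.configure('cinder', 'key', 'cephx', 'true')
        _file.write.assert_called_with(_conf)  # _conf built from ceph_utils.CEPH_CONF
        _open.assert_called_with('/etc/ceph/ceph.conf', 'w')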
class CephCrushmapTests(unittest.TestCase):
def setUp(self):
super(CephCrushmapTests, self).setUp()
@patch.object(ceph_utils.Crushmap, 'load_crushmap')
def test_crushmap_buckets(self, load_crushmap):
load_crushmap.return_value = ""
crushmap = ceph_utils.Crushmap()
crushmap.add_bucket("test")
self.assertEqual(
crushmap.buckets(), [ceph_utils.Crushmap.Bucket("test", -1)])
@patch.object(ceph_utils.Crushmap, 'load_crushmap')
def test_parsed_crushmap(self, load_crushmap):
load_crushmap.return_value = """# begin crush map
tunable choose_local_tries 0
tunable choose_local_fallback_tries 0
tunable choose_total_tries 50
tunable chooseleaf_descend_once 1
tunable chooseleaf_vary_r 1
tunable straw_calc_version 1
# devices
device 0 osd.0
device 1 osd.1
device 2 osd.2
# types
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 region
type 10 root
# buckets
host ip-172-31-33-152 {
id -2 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.0 weight 0.003
}
host ip-172-31-54-117 {
id -3 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.1 weight 0.003
}
host ip-172-31-30-0 {
id -4 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.2 weight 0.003
}
root default {
id -1 # do not change unnecessarily
# weight 0.009
alg straw
hash 0 # rjenkins1
item ip-172-31-33-152 weight 0.003
item ip-172-31-54-117 weight 0.003
item ip-172-31-30-0 weight 0.003
}
# rules
rule replicated_ruleset {
ruleset 0
type replicated
min_size 1
max_size 10
step take default
step chooseleaf firstn 0 type host
step emit
}
# end crush map"""
crushmap = ceph_utils.Crushmap()
self.assertEqual(
[ceph_utils.Crushmap.Bucket("default", -1, True)],
crushmap.buckets())
self.assertEqual([-4, -3, -2, -1], crushmap._ids)
@patch.object(ceph_utils.Crushmap, 'load_crushmap')
def test_build_crushmap(self, load_crushmap):
load_crushmap.return_value = """# begin crush map
tunable choose_local_tries 0
tunable choose_local_fallback_tries 0
tunable choose_total_tries 50
tunable chooseleaf_descend_once 1
tunable chooseleaf_vary_r 1
tunable straw_calc_version 1
# devices
device 0 osd.0
device 1 osd.1
device 2 osd.2
# types
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 region
type 10 root
# buckets
host ip-172-31-33-152 {
id -2 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.0 weight 0.003
}
host ip-172-31-54-117 {
id -3 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.1 weight 0.003
}
host ip-172-31-30-0 {
id -4 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.2 weight 0.003
}
root default {
id -1 # do not change unnecessarily
# weight 0.009
alg straw
hash 0 # rjenkins1
item ip-172-31-33-152 weight 0.003
item ip-172-31-54-117 weight 0.003
item ip-172-31-30-0 weight 0.003
}
# rules
rule replicated_ruleset {
ruleset 0
type replicated
min_size 1
max_size 10
step take default
step chooseleaf firstn 0 type host
step emit
}
# end crush map"""
expected = """# begin crush map
tunable choose_local_tries 0
tunable choose_local_fallback_tries 0
tunable choose_total_tries 50
tunable chooseleaf_descend_once 1
tunable chooseleaf_vary_r 1
tunable straw_calc_version 1
# devices
device 0 osd.0
device 1 osd.1
device 2 osd.2
# types
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 region
type 10 root
# buckets
host ip-172-31-33-152 {
id -2 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.0 weight 0.003
}
host ip-172-31-54-117 {
id -3 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.1 weight 0.003
}
host ip-172-31-30-0 {
id -4 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.2 weight 0.003
}
root default {
id -1 # do not change unnecessarily
# weight 0.009
alg straw
hash 0 # rjenkins1
item ip-172-31-33-152 weight 0.003
item ip-172-31-54-117 weight 0.003
item ip-172-31-30-0 weight 0.003
}
# rules
rule replicated_ruleset {
ruleset 0
type replicated
min_size 1
max_size 10
step take default
step chooseleaf firstn 0 type host
step emit
}
# end crush map
root test {
id -5 # do not change unnecessarily
# weight 0.000
alg straw
hash 0 # rjenkins1
}
rule test {
ruleset 0
type replicated
min_size 1
max_size 10
step take test
step chooseleaf firstn 0 type host
step emit
}"""
crushmap = ceph_utils.Crushmap()
crushmap.add_bucket("test")
self.assertEqual(expected, crushmap.build_crushmap())
def test_crushmap_string(self):
result = ceph_utils.Crushmap.bucket_string("fast", -21)
expected = """root fast {
id -21 # do not change unnecessarily
# weight 0.000
alg straw
hash 0 # rjenkins1
}
rule fast {
ruleset 0
type replicated
min_size 1
max_size 10
step take fast
step chooseleaf firstn 0 type host
step emit
}"""
self.assertEqual(expected, result)
class CephUtilsTests(unittest.TestCase):
def setUp(self):
super(CephUtilsTests, self).setUp()
[self._patch(m) for m in [
'check_call',
'check_output',
'log',
]]
def _patch(self, method):
_m = patch.object(ceph_utils, method)
mock = _m.start()
self.addCleanup(_m.stop)
setattr(self, method, mock)
@patch('os.path.exists')
def test_create_keyring(self, _exists):
"""It creates a new ceph keyring"""
_exists.return_value = False
ceph_utils.create_keyring('cinder', 'cephkey')
_cmd = ['ceph-authtool', '/etc/ceph/ceph.client.cinder.keyring',
'--create-keyring', '--name=client.cinder',
'--add-key=cephkey']
self.check_call.assert_called_with(_cmd)
@patch('os.path.exists')
def test_create_keyring_already_exists(self, _exists):
"""It creates a new ceph keyring"""
_exists.return_value = True
ceph_utils.create_keyring('cinder', 'cephkey')
self.assertTrue(self.log.called)
self.check_call.assert_not_called()
@patch('os.remove')
@patch('os.path.exists')
def test_delete_keyring(self, _exists, _remove):
"""It deletes a ceph keyring."""
_exists.return_value = True
ceph_utils.delete_keyring('cinder')
_remove.assert_called_with('/etc/ceph/ceph.client.cinder.keyring')
self.assertTrue(self.log.called)
@patch('os.remove')
@patch('os.path.exists')
def test_delete_keyring_not_exists(self, _exists, _remove):
"""It creates a new ceph keyring."""
_exists.return_value = False
ceph_utils.delete_keyring('cinder')
self.assertTrue(self.log.called)
_remove.assert_not_called()
@patch('os.path.exists')
def test_create_keyfile(self, _exists):
"""It creates a new ceph keyfile"""
_exists.return_value = False
with patch_open() as (_open, _file):
ceph_utils.create_key_file('cinder', 'cephkey')
_file.write.assert_called_with('cephkey')
self.assertTrue(self.log.called)
@patch('os.path.exists')
def test_create_key_file_already_exists(self, _exists):
"""It creates a new ceph keyring"""
_exists.return_value = True
ceph_utils.create_key_file('cinder', 'cephkey')
self.assertTrue(self.log.called)
@patch('os.mkdir')
@patch.object(ceph_utils, 'apt_install')
@patch('os.path.exists')
def test_install(self, _exists, _install, _mkdir):
_exists.return_value = False
ceph_utils.install()
_mkdir.assert_called_with('/etc/ceph')
_install.assert_called_with('ceph-common', fatal=True)
@patch.object(ceph_utils, 'ceph_version')
def test_get_osds(self, version):
version.return_value = '0.56.2'
self.check_output.return_value = json.dumps([1, 2, 3])
self.assertEquals(ceph_utils.get_osds('test'), [1, 2, 3])
@patch.object(ceph_utils, 'ceph_version')
def test_get_osds_argonaut(self, version):
version.return_value = '0.48.3'
self.assertEquals(ceph_utils.get_osds('test'), None)
@patch.object(ceph_utils, 'ceph_version')
def test_get_osds_none(self, version):
version.return_value = '0.56.2'
self.check_output.return_value = json.dumps(None)
self.assertEquals(ceph_utils.get_osds('test'), None)
@patch.object(ceph_utils, 'get_osds')
@patch.object(ceph_utils, 'pool_exists')
@patch.object(ceph_utils.Pool, 'get_pgs')
@patch.object(ceph_utils, 'get_erasure_profile')
def test_create_erasure_pool(self, _erasure_profile, _pgs, _exists,
_get_osds):
"""It creates rados pool correctly with a default erasure profile """
_exists.return_value = False
_get_osds.return_value = [1, 2, 3]
_pgs.return_value = 100
_erasure_profile.return_value = {"k": 2, "m": 1}
erasure_pool = ceph_utils.ErasurePool(service='cinder',
name='foo',
erasure_code_profile="default")
erasure_pool.create()
self.check_call.assert_has_calls([
call(['ceph', '--id', 'cinder', 'osd', 'pool',
'create', 'foo', '100', '100', 'erasure', 'default']),
])
@patch.object(ceph_utils, 'get_osds')
@patch.object(ceph_utils, 'pool_exists')
@patch.object(ceph_utils.Pool, 'get_pgs')
@patch.object(ceph_utils, 'get_erasure_profile')
def test_create_erasure_local_pool(self, _erasure_profile, _pgs, _exists,
_get_osds):
"""It creates rados pool correctly with a default erasure profile """
_exists.return_value = False
_get_osds.return_value = [1, 2, 3]
_pgs.return_value = 100
_erasure_profile.return_value = {"k": 2, "m": 1, "l": 1}
local_erasure_pool = ceph_utils.ErasurePool(
service='cinder',
name='foo',
erasure_code_profile="default")
local_erasure_pool.create()
self.check_call.assert_has_calls([
call(['ceph', '--id', 'cinder', 'osd', 'pool',
'create', 'foo', '100', '100', 'erasure', 'default']),
])
@patch.object(ceph_utils, 'get_osds')
@patch.object(ceph_utils, 'pool_exists')
@patch.object(ceph_utils.Pool, 'get_pgs')
def test_create_pool(self, _pgs, _exists, _get_osds):
"""It creates rados pool correctly with default replicas """
_exists.return_value = False
_get_osds.return_value = [1, 2, 3]
_pgs.return_value = 100
replicated_pool = ceph_utils.ReplicatedPool(service='cinder',
name='foo', replicas=3)
replicated_pool.create()
self.check_call.assert_has_calls([
call(['ceph', '--id', 'cinder', 'osd', 'pool',
'create', 'foo', '100']),
call(['ceph', '--id', 'cinder', 'osd', 'pool', 'set',
'foo', 'size', '3'])
])
@patch.object(ceph_utils, 'get_osds')
@patch.object(ceph_utils, 'pool_exists')
@patch.object(ceph_utils.Pool, 'get_pgs')
def test_create_pool_2_replicas(self, _pgs, _exists, _get_osds):
"""It creates rados pool correctly with 3 replicas"""
_exists.return_value = False
_get_osds.return_value = [1, 2, 3]
_pgs.return_value = 150
replicated_pool = ceph_utils.ReplicatedPool(service='cinder',
name='foo',
replicas=2)
replicated_pool.create()
self.check_call.assert_has_calls([
call(['ceph', '--id', 'cinder', 'osd', 'pool',
'create', 'foo', '150']),
call(['ceph', '--id', 'cinder', 'osd', 'pool', 'set',
'foo', 'size', '2'])
])
@patch.object(ceph_utils, 'get_osds')
@patch.object(ceph_utils, 'pool_exists')
@patch.object(ceph_utils.Pool, 'get_pgs')
def test_create_pool_argonaut(self, _pgs, _exists, _get_osds):
"""It creates rados pool correctly with 3 replicas"""
_exists.return_value = False
_get_osds.return_value = None
_pgs.return_value = 200
replicated_pool = ceph_utils.ReplicatedPool(service='cinder',
name='foo',
replicas=3)
replicated_pool.create()
# ceph_utils.create_pool(service='cinder', pool_class=replicated_pool)
self.check_call.assert_has_calls([
call(['ceph', '--id', 'cinder', 'osd', 'pool',
'create', 'foo', '200']),
call(['ceph', '--id', 'cinder', 'osd', 'pool', 'set',
'foo', 'size', '3'])
])
@patch.object(ceph_utils, 'pool_exists')
@patch.object(ceph_utils.Pool, 'get_pgs')
def test_create_pool_already_exists(self, _pgs, _exists):
_exists.return_value = True
_pgs.return_value = 200
replicated_pool = ceph_utils.ReplicatedPool(service='cinder',
name='foo')
replicated_pool.create()
self.assertFalse(self.log.called)
self.check_call.assert_not_called()
def test_keyring_path(self):
"""It correctly dervies keyring path from service name"""
result = ceph_utils._keyring_path('cinder')
self.assertEquals('/etc/ceph/ceph.client.cinder.keyring', result)
def test_keyfile_path(self):
"""It correctly dervies keyring path from service name"""
result = ceph_utils._keyfile_path('cinder')
self.assertEquals('/etc/ceph/ceph.client.cinder.key', result)
def test_pool_exists(self):
"""It detects an rbd pool exists"""
self.check_output.return_value = LS_POOLS
self.assertTrue(ceph_utils.pool_exists('cinder', 'volumes'))
def test_pool_does_not_exist(self):
"""It detects an rbd pool exists"""
self.check_output.return_value = LS_POOLS
self.assertFalse(ceph_utils.pool_exists('cinder', 'foo'))
def test_pool_exists_error(self):
""" Ensure subprocess errors and sandboxed with False """
self.check_output.side_effect = CalledProcessError(1, 'rados')
self.assertFalse(ceph_utils.pool_exists('cinder', 'foo'))
def test_rbd_exists(self):
self.check_output.return_value = LS_RBDS
self.assertTrue(ceph_utils.rbd_exists('service', 'pool', 'rbd1'))
self.check_output.assert_called_with(
['rbd', 'list', '--id', 'service', '--pool', 'pool']
)
def test_rbd_does_not_exist(self):
self.check_output.return_value = LS_RBDS
self.assertFalse(ceph_utils.rbd_exists('service', 'pool', 'rbd4'))
self.check_output.assert_called_with(
['rbd', 'list', '--id', 'service', '--pool', 'pool']
)
def test_rbd_exists_error(self):
""" Ensure subprocess errors and sandboxed with False """
self.check_output.side_effect = CalledProcessError(1, 'rbd')
self.assertFalse(ceph_utils.rbd_exists('cinder', 'foo', 'rbd'))
def test_create_rbd_image(self):
ceph_utils.create_rbd_image('service', 'pool', 'image', 128)
_cmd = ['rbd', 'create', 'image',
'--size', '128',
'--id', 'service',
'--pool', 'pool']
self.check_call.assert_called_with(_cmd)
def test_delete_pool(self):
ceph_utils.delete_pool('cinder', 'pool')
_cmd = [
'ceph', '--id', 'cinder',
'osd', 'pool', 'delete',
'pool', '--yes-i-really-really-mean-it'
]
self.check_call.assert_called_with(_cmd)
def test_get_ceph_nodes(self):
self._patch('relation_ids')
self._patch('related_units')
self._patch('relation_get')
units = ['ceph/1', 'ceph2', 'ceph/3']
self.relation_ids.return_value = ['ceph:0']
self.related_units.return_value = units
self.relation_get.return_value = '192.168.1.1'
self.assertEquals(len(ceph_utils.get_ceph_nodes()), 3)
def test_get_ceph_nodes_not_related(self):
self._patch('relation_ids')
self.relation_ids.return_value = []
self.assertEquals(ceph_utils.get_ceph_nodes(), [])
def test_configure(self):
self._patch('create_keyring')
self._patch('create_key_file')
self._patch('get_ceph_nodes')
self._patch('modprobe')
_hosts = ['192.168.1.1', '192.168.1.2']
self.get_ceph_nodes.return_value = _hosts
_conf = ceph_utils.CEPH_CONF.format(
auth='cephx',
keyring=ceph_utils._keyring_path('cinder'),
mon_hosts=",".join(map(str, _hosts)),
use_syslog='true'
)
with patch_open() as (_open, _file):
ceph_utils.configure('cinder', 'key', 'cephx', 'true')
_file.write.assert_called_with(_conf)
_open.assert_called_with('/etc/ceph/ceph.conf', 'w')
self.modprobe.assert_called_with('rbd')
self.create_keyring.assert_called_with('cinder', 'key')
self.create_key_file.assert_called_with('cinder', 'key')
def test_image_mapped(self):
self.check_output.return_value = IMG_MAP
self.assertTrue(ceph_utils.image_mapped('bar'))
def test_image_not_mapped(self):
self.check_output.return_value = IMG_MAP
self.assertFalse(ceph_utils.image_mapped('foo'))
def test_image_not_mapped_error(self):
self.check_output.side_effect = CalledProcessError(1, 'rbd')
self.assertFalse(ceph_utils.image_mapped('bar'))
def test_map_block_storage(self):
_service = 'cinder'
_pool = 'bar'
_img = 'foo'
_cmd = [
'rbd',
'map',
'{}/{}'.format(_pool, _img),
'--user',
_service,
'--secret',
ceph_utils._keyfile_path(_service),
]
ceph_utils.map_block_storage(_service, _pool, _img)
self.check_call.assert_called_with(_cmd)
def test_filesystem_mounted(self):
self._patch('mounts')
self.mounts.return_value = [['/afs', '/dev/sdb'], ['/bfs', '/dev/sdd']]
self.assertTrue(ceph_utils.filesystem_mounted('/afs'))
self.assertFalse(ceph_utils.filesystem_mounted('/zfs'))
@patch('os.path.exists')
def test_make_filesystem(self, _exists):
_exists.return_value = True
ceph_utils.make_filesystem('/dev/sdd')
self.assertTrue(self.log.called)
self.check_call.assert_called_with(['mkfs', '-t', 'ext4', '/dev/sdd'])
@patch('os.path.exists')
def test_make_filesystem_xfs(self, _exists):
_exists.return_value = True
ceph_utils.make_filesystem('/dev/sdd', 'xfs')
self.assertTrue(self.log.called)
self.check_call.assert_called_with(['mkfs', '-t', 'xfs', '/dev/sdd'])
@patch('os.chown')
@patch('os.stat')
def test_place_data_on_block_device(self, _stat, _chown):
self._patch('mount')
self._patch('copy_files')
self._patch('umount')
_stat.return_value.st_uid = 100
_stat.return_value.st_gid = 100
ceph_utils.place_data_on_block_device('/dev/sdd', '/var/lib/mysql')
self.mount.assert_has_calls([
call('/dev/sdd', '/mnt'),
call('/dev/sdd', '/var/lib/mysql', persist=True)
])
self.copy_files.assert_called_with('/var/lib/mysql', '/mnt')
self.umount.assert_called_with('/mnt')
_chown.assert_called_with('/var/lib/mysql', 100, 100)
@patch('shutil.copytree')
@patch('os.listdir')
@patch('os.path.isdir')
def test_copy_files_is_dir(self, _isdir, _listdir, _copytree):
_isdir.return_value = True
subdirs = ['a', 'b', 'c']
_listdir.return_value = subdirs
ceph_utils.copy_files('/source', '/dest')
for d in subdirs:
_copytree.assert_has_calls([
call('/source/{}'.format(d), '/dest/{}'.format(d),
False, None)
])
@patch('shutil.copytree')
@patch('os.listdir')
@patch('os.path.isdir')
def test_copy_files_include_symlinks(self, _isdir, _listdir, _copytree):
_isdir.return_value = True
subdirs = ['a', 'b', 'c']
_listdir.return_value = subdirs
ceph_utils.copy_files('/source', '/dest', True)
for d in subdirs:
_copytree.assert_has_calls([
call('/source/{}'.format(d), '/dest/{}'.format(d),
True, None)
])
@patch('shutil.copytree')
@patch('os.listdir')
@patch('os.path.isdir')
def test_copy_files_ignore(self, _isdir, _listdir, _copytree):
_isdir.return_value = True
subdirs = ['a', 'b', 'c']
_listdir.return_value = subdirs
ceph_utils.copy_files('/source', '/dest', True, False)
for d in subdirs:
_copytree.assert_has_calls([
call('/source/{}'.format(d), '/dest/{}'.format(d),
True, False)
])
@patch('shutil.copy2')
@patch('os.listdir')
@patch('os.path.isdir')
def test_copy_files_files(self, _isdir, _listdir, _copy2):
_isdir.return_value = False
files = ['a', 'b', 'c']
_listdir.return_value = files
ceph_utils.copy_files('/source', '/dest')
for f in files:
_copy2.assert_has_calls([
call('/source/{}'.format(f), '/dest/{}'.format(f))
])
@patch.object(ceph_utils.Pool, 'get_pgs')
def test_ensure_ceph_storage(self, _pgs):
self._patch('pool_exists')
self.pool_exists.return_value = False
self._patch('create_pool')
self._patch('rbd_exists')
self.rbd_exists.return_value = False
self._patch('create_rbd_image')
self._patch('image_mapped')
self.image_mapped.return_value = False
self._patch('map_block_storage')
self._patch('filesystem_mounted')
self.filesystem_mounted.return_value = False
self._patch('make_filesystem')
self._patch('service_stop')
self._patch('service_start')
self._patch('service_running')
self.service_running.return_value = True
self._patch('place_data_on_block_device')
_pgs.return_value = 200
_service = 'mysql'
_pool = 'foo'
_rbd_img = 'foo'
_mount = '/var/lib/mysql'
_services = ['mysql']
_blk_dev = '/dev/rbd1'
ceph_utils.ensure_ceph_storage(_service, _pool,
_rbd_img, 1024, _mount,
_blk_dev, 'ext4', _services)
self.create_pool.assert_called_with(_service, _pool, replicas=3)
self.create_rbd_image.assert_called_with(_service, _pool,
_rbd_img, 1024)
self.map_block_storage.assert_called_with(_service, _pool, _rbd_img)
self.make_filesystem.assert_called_with(_blk_dev, 'ext4')
self.service_stop.assert_called_with(_services[0])
self.place_data_on_block_device.assert_called_with(_blk_dev, _mount)
self.service_start.assert_called_with(_services[0])
def test_make_filesystem_default_filesystem(self):
"""make_filesystem() uses ext4 as the default filesystem."""
device = '/dev/zero'
ceph_utils.make_filesystem(device)
self.check_call.assert_called_with(['mkfs', '-t', 'ext4', device])
def test_make_filesystem_no_device(self):
"""make_filesystem() raises an IOError if the device does not exist."""
device = '/no/such/device'
with self.assertRaises(IOError) as cm:
ceph_utils.make_filesystem(device, timeout=0)
e = cm.exception
self.assertEquals(device, e.filename)
self.assertEquals(os.errno.ENOENT, e.errno)
self.assertEquals(os.strerror(os.errno.ENOENT), e.strerror)
self.log.assert_called_with(
'Gave up waiting on block device %s' % device, level='ERROR')
@nose.plugins.attrib.attr('slow')
def test_make_filesystem_timeout(self):
"""
make_filesystem() lets the caller specify how long to wait for the
device to appear before it fails.
"""
device = '/no/such/device'
timeout = 2
before = time.time()
self.assertRaises(IOError, ceph_utils.make_filesystem, device,
timeout=timeout)
after = time.time()
duration = after - before
self.assertTrue(timeout - duration < 0.1)
self.log.assert_called_with(
'Gave up waiting on block device %s' % device, level='ERROR')
@nose.plugins.attrib.attr('slow')
def test_device_is_formatted_if_it_appears(self):
"""
The specified device is formatted if it appears before the timeout
is reached.
"""
def create_my_device(filename):
with open(filename, "w") as device:
device.write("hello\n")
temp_dir = mkdtemp()
self.addCleanup(rmtree, temp_dir)
device = "%s/mydevice" % temp_dir
fstype = 'xfs'
timeout = 4
t = Timer(2, create_my_device, [device])
t.start()
ceph_utils.make_filesystem(device, fstype, timeout)
self.check_call.assert_called_with(['mkfs', '-t', fstype, device])
def test_existing_device_is_formatted(self):
"""
make_filesystem() formats the given device if it exists with the
specified filesystem.
"""
device = '/dev/zero'
fstype = 'xfs'
ceph_utils.make_filesystem(device, fstype)
self.check_call.assert_called_with(['mkfs', '-t', fstype, device])
self.log.assert_called_with(
'Formatting block device %s as '
'filesystem %s.' % (device, fstype), level='INFO'
)
@patch.object(ceph_utils, 'relation_ids')
@patch.object(ceph_utils, 'related_units')
@patch.object(ceph_utils, 'relation_get')
def test_ensure_ceph_keyring_no_relation_no_data(self, rget, runits, rids):
rids.return_value = []
self.assertEquals(False, ceph_utils.ensure_ceph_keyring(service='foo'))
rids.return_value = ['ceph:0']
runits.return_value = ['ceph/0']
rget.return_value = ''
self.assertEquals(False, ceph_utils.ensure_ceph_keyring(service='foo'))
@patch.object(ceph_utils, '_keyring_path')
@patch.object(ceph_utils, 'create_keyring')
@patch.object(ceph_utils, 'relation_ids')
@patch.object(ceph_utils, 'related_units')
@patch.object(ceph_utils, 'relation_get')
def test_ensure_ceph_keyring_with_data(self, rget, runits,
rids, create, _path):
rids.return_value = ['ceph:0']
runits.return_value = ['ceph/0']
rget.return_value = 'fookey'
self.assertEquals(True,
ceph_utils.ensure_ceph_keyring(service='foo'))
create.assert_called_with(service='foo', key='fookey')
_path.assert_called_with('foo')
self.assertFalse(self.check_call.called)
_path.return_value = '/etc/ceph/client.foo.keyring'
self.assertEquals(
True,
ceph_utils.ensure_ceph_keyring(
service='foo', user='adam', group='users'))
create.assert_called_with(service='foo', key='fookey')
_path.assert_called_with('foo')
self.check_call.assert_called_with([
'chown',
'adam.users',
'/etc/ceph/client.foo.keyring'
])
@patch('os.path.exists')
def test_ceph_version_not_installed(self, path):
path.return_value = False
self.assertEquals(ceph_utils.ceph_version(), None)
@patch.object(ceph_utils, 'check_output')
@patch('os.path.exists')
def test_ceph_version_error(self, path, output):
path.return_value = True
output.return_value = b''
self.assertEquals(ceph_utils.ceph_version(), None)
@patch.object(ceph_utils, 'check_output')
@patch('os.path.exists')
def test_ceph_version_ok(self, path, output):
path.return_value = True
output.return_value = \
'ceph version 0.67.4 (ad85b8bfafea6232d64cb7ba76a8b6e8252fa0c7)'
self.assertEquals(ceph_utils.ceph_version(), '0.67.4')
@patch.object(ceph_utils.Pool, 'get_pgs')
def test_ceph_broker_rq_class(self, _get_pgs):
_get_pgs.return_value = 200
rq = ceph_utils.CephBrokerRq(
request_id="3f8941ec-9707-11e6-a0e3-305a3a7cf348")
rq.add_op_create_pool(name='pool1', replica_count=1, pg_num=200)
rq.add_op_create_pool(name='pool2', pg_num=200)
expected = json.dumps({"api-version": 1,
"request-id":
"3f8941ec-9707-11e6-a0e3-305a3a7cf348",
"ops": [{"op": "create-pool",
"replicas": 1, "pg_num": 200,
"name": "pool1", "weight": None},
{"op": "create-pool",
"replicas": 3, "pg_num": 200,
"name": "pool2", "weight": None}]
})
self.assertEqual(rq.request, expected)
def test_ceph_broker_rsp_class(self):
rsp = ceph_utils.CephBrokerRsp(json.dumps({'exit-code': 0,
'stderr': "Success"}))
self.assertEqual(rsp.exit_code, 0)
self.assertEqual(rsp.exit_msg, "Success")


@ -0,0 +1,303 @@
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import ceph.crush_utils
from mock import patch
CRUSHMAP1 = """# begin crush map
tunable choose_local_tries 0
tunable choose_local_fallback_tries 0
tunable choose_total_tries 50
tunable chooseleaf_descend_once 1
tunable chooseleaf_vary_r 1
tunable straw_calc_version 1
# devices
device 0 osd.0
device 1 osd.1
device 2 osd.2
# types
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 region
type 10 root
# buckets
host ip-172-31-33-152 {
id -2 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.0 weight 0.003
}
host ip-172-31-54-117 {
id -3 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.1 weight 0.003
}
host ip-172-31-30-0 {
id -4 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.2 weight 0.003
}
root default {
id -1 # do not change unnecessarily
# weight 0.009
alg straw
hash 0 # rjenkins1
item ip-172-31-33-152 weight 0.003
item ip-172-31-54-117 weight 0.003
item ip-172-31-30-0 weight 0.003
}
# rules
rule replicated_ruleset {
ruleset 0
type replicated
min_size 1
max_size 10
step take default
step chooseleaf firstn 0 type host
step emit
}
# end crush map"""
CRUSHMAP2 = """# begin crush map
tunable choose_local_tries 0
tunable choose_local_fallback_tries 0
tunable choose_total_tries 50
tunable chooseleaf_descend_once 1
tunable chooseleaf_vary_r 1
tunable straw_calc_version 1
# devices
device 0 osd.0
device 1 osd.1
device 2 osd.2
# types
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 region
type 10 root
# buckets
host ip-172-31-33-152 {
id -2 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.0 weight 0.003
}
host ip-172-31-54-117 {
id -3 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.1 weight 0.003
}
host ip-172-31-30-0 {
id -4 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.2 weight 0.003
}
root default {
id -1 # do not change unnecessarily
# weight 0.009
alg straw
hash 0 # rjenkins1
item ip-172-31-33-152 weight 0.003
item ip-172-31-54-117 weight 0.003
item ip-172-31-30-0 weight 0.003
}
# rules
rule replicated_ruleset {
ruleset 0
type replicated
min_size 1
max_size 10
step take default
step chooseleaf firstn 0 type host
step emit
}
# end crush map"""
CRUSHMAP3 = """# begin crush map
tunable choose_local_tries 0
tunable choose_local_fallback_tries 0
tunable choose_total_tries 50
tunable chooseleaf_descend_once 1
tunable chooseleaf_vary_r 1
tunable straw_calc_version 1
# devices
device 0 osd.0
device 1 osd.1
device 2 osd.2
# types
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 region
type 10 root
# buckets
host ip-172-31-33-152 {
id -2 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.0 weight 0.003
}
host ip-172-31-54-117 {
id -3 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.1 weight 0.003
}
host ip-172-31-30-0 {
id -4 # do not change unnecessarily
# weight 0.003
alg straw
hash 0 # rjenkins1
item osd.2 weight 0.003
}
root default {
id -1 # do not change unnecessarily
# weight 0.009
alg straw
hash 0 # rjenkins1
item ip-172-31-33-152 weight 0.003
item ip-172-31-54-117 weight 0.003
item ip-172-31-30-0 weight 0.003
}
# rules
rule replicated_ruleset {
ruleset 0
type replicated
min_size 1
max_size 10
step take default
step chooseleaf firstn 0 type host
step emit
}
# end crush map
root test {
id -5 # do not change unnecessarily
# weight 0.000
alg straw
hash 0 # rjenkins1
}
rule test {
ruleset 0
type replicated
min_size 1
max_size 10
step take test
step chooseleaf firstn 0 type host
step emit
}"""
CRUSHMAP4 = """root fast {
id -21 # do not change unnecessarily
# weight 0.000
alg straw
hash 0 # rjenkins1
}
rule fast {
ruleset 0
type replicated
min_size 1
max_size 10
step take fast
step chooseleaf firstn 0 type host
step emit
}"""
class CephCrushmapTests(unittest.TestCase):
def setUp(self):
super(CephCrushmapTests, self).setUp()
@patch.object(ceph.crush_utils.Crushmap, 'load_crushmap')
def test_crushmap_buckets(self, load_crushmap):
load_crushmap.return_value = ""
crushmap = ceph.crush_utils.Crushmap()
crushmap.add_bucket("test")
self.assertEqual(
crushmap.buckets(), [ceph.crush_utils.CRUSHBucket("test", -1)])
@patch.object(ceph.crush_utils.Crushmap, 'load_crushmap')
def test_parsed_crushmap(self, load_crushmap):
load_crushmap.return_value = CRUSHMAP1
crushmap = ceph.crush_utils.Crushmap()
self.assertEqual(
[ceph.crush_utils.CRUSHBucket("default", -1, True)],
crushmap.buckets())
self.assertEqual([-4, -3, -2, -1], crushmap._ids)
@patch.object(ceph.crush_utils.Crushmap, 'load_crushmap')
def test_build_crushmap(self, load_crushmap):
load_crushmap.return_value = CRUSHMAP2
expected = CRUSHMAP3
crushmap = ceph.crush_utils.Crushmap()
crushmap.add_bucket("test")
self.assertEqual(expected, crushmap.build_crushmap())
def test_crushmap_string(self):
result = ceph.crush_utils.Crushmap.bucket_string("fast", -21)
expected = CRUSHMAP4
self.assertEqual(expected, result)
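# Illustrative usage sketch, not part of this change: bucket_string() is a
# pure text helper, so it can be checked directly against the fixture.
import ceph.crush_utils

assert ceph.crush_utils.Crushmap.bucket_string("fast", -21) == CRUSHMAP4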
View File
@@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ceph
import mock
import unittest
from mock import patch
import ceph.utils
class DiskTuningTestCase(unittest.TestCase):
def setUp(self):
super(DiskTuningTestCase, self).setUp()
@mock.patch.object(ceph, 'templating')
@patch.object(ceph.utils, 'templating')
def test_persist_settings(self, _templating):
renderer = _templating.render
settings = {
@@ -31,13 +33,13 @@ class DiskTuningTestCase(unittest.TestCase):
}
}
}
ceph.persist_settings(settings)
ceph.utils.persist_settings(settings)
renderer.assert_called_once_with(source='hdparm.conf',
target=ceph.HDPARM_FILE,
target=ceph.utils.HDPARM_FILE,
context=settings)
@mock.patch.object(ceph, 'templating')
@patch.object(ceph.utils, 'templating')
def test_persist_settings_empty_dict(self, _templating):
renderer = _templating.render
ceph.persist_settings({})
ceph.utils.persist_settings({})
assert not renderer.called, 'renderer should not have been called'
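# Illustrative sketch, not part of this change: patch a name where it is
# looked up. After the move out of __init__.py, persist_settings() resolves
# `templating` via ceph.utils, so that module is the correct patch target.
from mock import patch

import ceph.utils

with patch.object(ceph.utils, 'templating') as _templating:
    ceph.utils.persist_settings({})  # empty settings short-circuit
    assert not _templating.render.called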
View File
@@ -11,13 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import sys
import time
import unittest
from mock import patch, call, MagicMock
import ceph
import ceph.utils
# python-apt is not installed as part of test-requirements but is imported by
# some charmhelpers modules so create a fake import.
@@ -51,16 +52,16 @@ def monitor_key_side_effect(*args):
class UpgradeRollingTestCase(unittest.TestCase):
@patch('time.time')
@patch('ceph.log')
@patch('ceph.upgrade_monitor')
@patch('ceph.monitor_key_set')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'upgrade_monitor')
@patch.object(ceph.utils, 'monitor_key_set')
def test_lock_and_roll(self, monitor_key_set, upgrade_monitor, log, time):
time.return_value = 1473279502.69
monitor_key_set.monitor_key_set.return_value = None
ceph.lock_and_roll(my_name='ip-192-168-1-2',
version='hammer',
service='mon',
upgrade_key='admin')
ceph.utils.lock_and_roll(my_name='ip-192-168-1-2',
version='hammer',
service='mon',
upgrade_key='admin')
upgrade_monitor.assert_called_once_with('hammer')
log.assert_has_calls(
[
@@ -72,18 +73,18 @@ class UpgradeRollingTestCase(unittest.TestCase):
'mon_ip-192-168-1-2_hammer_done 1473279502.69'),
])
@patch('ceph.apt_install')
@patch('ceph.chownr')
@patch('ceph.service_stop')
@patch('ceph.service_start')
@patch('ceph.log')
@patch('ceph.status_set')
@patch('ceph.apt_update')
@patch('ceph.add_source')
@patch('ceph.get_local_mon_ids')
@patch('ceph.systemd')
@patch('ceph.get_version')
@patch('ceph.config')
@patch.object(ceph.utils, 'apt_install')
@patch.object(ceph.utils, 'chownr')
@patch.object(ceph.utils, 'service_stop')
@patch.object(ceph.utils, 'service_start')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'status_set')
@patch.object(ceph.utils, 'apt_update')
@patch.object(ceph.utils, 'add_source')
@patch.object(ceph.utils, 'get_local_mon_ids')
@patch.object(ceph.utils, 'systemd')
@patch.object(ceph.utils, 'get_version')
@patch.object(ceph.utils, 'config')
def test_upgrade_monitor_hammer(self, config, get_version,
systemd, local_mons, add_source,
apt_update, status_set, log,
@@ -94,7 +95,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
systemd.return_value = False
local_mons.return_value = ['a']
ceph.upgrade_monitor('hammer')
ceph.utils.upgrade_monitor('hammer')
service_stop.assert_called_with('ceph-mon-all')
service_start.assert_called_with('ceph-mon-all')
add_source.assert_called_with('cloud:trusty-kilo', 'key')
@@ -110,18 +111,18 @@ class UpgradeRollingTestCase(unittest.TestCase):
])
assert not chownr.called
@patch('ceph.apt_install')
@patch('ceph.chownr')
@patch('ceph.service_stop')
@patch('ceph.service_start')
@patch('ceph.log')
@patch('ceph.status_set')
@patch('ceph.apt_update')
@patch('ceph.add_source')
@patch('ceph.get_local_mon_ids')
@patch('ceph.systemd')
@patch('ceph.get_version')
@patch('ceph.config')
@patch.object(ceph.utils, 'apt_install')
@patch.object(ceph.utils, 'chownr')
@patch.object(ceph.utils, 'service_stop')
@patch.object(ceph.utils, 'service_start')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'status_set')
@patch.object(ceph.utils, 'apt_update')
@patch.object(ceph.utils, 'add_source')
@patch.object(ceph.utils, 'get_local_mon_ids')
@patch.object(ceph.utils, 'systemd')
@patch.object(ceph.utils, 'get_version')
@patch.object(ceph.utils, 'config')
def test_upgrade_monitor_jewel(self, config, get_version,
systemd, local_mons, add_source,
apt_update, status_set, log,
@@ -132,7 +133,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
systemd.return_value = False
local_mons.return_value = ['a']
ceph.upgrade_monitor('jewel')
ceph.utils.upgrade_monitor('jewel')
service_stop.assert_called_with('ceph-mon-all')
service_start.assert_called_with('ceph-mon-all')
add_source.assert_called_with('cloud:trusty-kilo', 'key')
@@ -153,12 +154,12 @@ class UpgradeRollingTestCase(unittest.TestCase):
]
)
@patch('ceph.get_version')
@patch('ceph.status_set')
@patch('ceph.lock_and_roll')
@patch('ceph.wait_on_previous_node')
@patch('ceph.get_mon_map')
@patch('ceph.socket')
@patch.object(ceph.utils, 'get_version')
@patch.object(ceph.utils, 'status_set')
@patch.object(ceph.utils, 'lock_and_roll')
@patch.object(ceph.utils, 'wait_on_previous_node')
@patch.object(ceph.utils, 'get_mon_map')
@patch.object(ceph.utils, 'socket')
def test_roll_monitor_cluster_second(self,
socket,
get_mon_map,
@@ -181,8 +182,8 @@ class UpgradeRollingTestCase(unittest.TestCase):
]
}
}
ceph.roll_monitor_cluster(new_version='0.94.1',
upgrade_key='admin')
ceph.utils.roll_monitor_cluster(new_version='0.94.1',
upgrade_key='admin')
status_set.assert_called_with(
'waiting',
'Waiting on ip-192-168-1-2 to finish upgrading')
@@ -191,10 +192,10 @@ class UpgradeRollingTestCase(unittest.TestCase):
upgrade_key='admin',
version='0.94.1')
@patch('ceph.log')
@patch.object(ceph, 'time')
@patch('ceph.monitor_key_get')
@patch('ceph.monitor_key_exists')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'time')
@patch.object(ceph.utils, 'monitor_key_get')
@patch.object(ceph.utils, 'monitor_key_exists')
def test_wait_on_previous_node(self, monitor_key_exists, monitor_key_get,
mock_time, log):
tval = [previous_node_start_time]
@@ -207,10 +208,10 @@ class UpgradeRollingTestCase(unittest.TestCase):
monitor_key_get.side_effect = monitor_key_side_effect
monitor_key_exists.return_value = False
ceph.wait_on_previous_node(previous_node="ip-192-168-1-2",
version='0.94.1',
service='mon',
upgrade_key='admin')
ceph.utils.wait_on_previous_node(previous_node="ip-192-168-1-2",
version='0.94.1',
service='mon',
upgrade_key='admin')
# Make sure we checked to see if the previous node started
monitor_key_get.assert_has_calls(
View File
@@ -12,15 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import os
import unittest
import sys
import time
import unittest
from mock import patch, call, mock_open
import ceph
from ceph import CrushLocation
import ceph.utils
TO_PATCH = [
'apt_install',
@@ -63,18 +62,18 @@ def monitor_key_side_effect(*args):
class UpgradeRollingTestCase(unittest.TestCase):
@patch('ceph.dirs_need_ownership_update')
@patch('ceph.apt_install')
@patch('ceph.chownr')
@patch('ceph.service_restart')
@patch('ceph.log')
@patch('ceph.status_set')
@patch('ceph.apt_update')
@patch('ceph.add_source')
@patch('ceph.get_local_osd_ids')
@patch('ceph.systemd')
@patch('ceph.get_version')
@patch('ceph.config')
@patch.object(ceph.utils, 'dirs_need_ownership_update')
@patch.object(ceph.utils, 'apt_install')
@patch.object(ceph.utils, 'chownr')
@patch.object(ceph.utils, 'service_restart')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'status_set')
@patch.object(ceph.utils, 'apt_update')
@patch.object(ceph.utils, 'add_source')
@patch.object(ceph.utils, 'get_local_osd_ids')
@patch.object(ceph.utils, 'systemd')
@patch.object(ceph.utils, 'get_version')
@patch.object(ceph.utils, 'config')
def test_upgrade_osd_hammer(self, config, get_version, systemd, local_osds,
add_source, apt_update, status_set, log,
service_restart, chownr, apt_install,
@@ -85,7 +84,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
local_osds.return_value = [0, 1, 2]
dirs_need_ownership_update.return_value = False
ceph.upgrade_osd('hammer')
ceph.utils.upgrade_osd('hammer')
service_restart.assert_called_with('ceph-osd-all')
status_set.assert_has_calls([
call('maintenance', 'Upgrading osd'),
@@ -99,20 +98,20 @@ class UpgradeRollingTestCase(unittest.TestCase):
# Make sure on an Upgrade to Hammer that chownr was NOT called.
assert not chownr.called
@patch('ceph._upgrade_single_osd')
@patch('ceph.update_owner')
@patch.object(ceph.utils, '_upgrade_single_osd')
@patch.object(ceph.utils, 'update_owner')
@patch('os.listdir')
@patch('ceph._get_child_dirs')
@patch('ceph.dirs_need_ownership_update')
@patch('ceph.apt_install')
@patch('ceph.log')
@patch('ceph.status_set')
@patch('ceph.apt_update')
@patch('ceph.add_source')
@patch('ceph.get_local_osd_ids')
@patch('ceph.systemd')
@patch('ceph.get_version')
@patch('ceph.config')
@patch.object(ceph.utils, '_get_child_dirs')
@patch.object(ceph.utils, 'dirs_need_ownership_update')
@patch.object(ceph.utils, 'apt_install')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'status_set')
@patch.object(ceph.utils, 'apt_update')
@patch.object(ceph.utils, 'add_source')
@patch.object(ceph.utils, 'get_local_osd_ids')
@patch.object(ceph.utils, 'systemd')
@patch.object(ceph.utils, 'get_version')
@patch.object(ceph.utils, 'config')
def test_upgrade_osd_jewel(self, config, get_version, systemd,
local_osds, add_source, apt_update, status_set,
log, apt_install, dirs_need_ownership_update,
@@ -126,11 +125,11 @@ class UpgradeRollingTestCase(unittest.TestCase):
_get_child_dirs.return_value = ['ceph-0', 'ceph-1', 'ceph-2']
dirs_need_ownership_update.return_value = True
ceph.upgrade_osd('jewel')
ceph.utils.upgrade_osd('jewel')
update_owner.assert_has_calls([
call(ceph.CEPH_BASE_DIR, recurse_dirs=False),
call(os.path.join(ceph.CEPH_BASE_DIR, 'mon')),
call(os.path.join(ceph.CEPH_BASE_DIR, 'fs')),
call(ceph.utils.CEPH_BASE_DIR, recurse_dirs=False),
call(os.path.join(ceph.utils.CEPH_BASE_DIR, 'mon')),
call(os.path.join(ceph.utils.CEPH_BASE_DIR, 'fs')),
])
_upgrade_single_osd.assert_has_calls([
call('0', 'ceph-0'),
@@ -148,62 +147,62 @@ class UpgradeRollingTestCase(unittest.TestCase):
]
)
@patch.object(ceph, 'stop_osd')
@patch.object(ceph, 'disable_osd')
@patch.object(ceph, 'update_owner')
@patch.object(ceph, 'enable_osd')
@patch.object(ceph, 'start_osd')
@patch.object(ceph.utils, 'stop_osd')
@patch.object(ceph.utils, 'disable_osd')
@patch.object(ceph.utils, 'update_owner')
@patch.object(ceph.utils, 'enable_osd')
@patch.object(ceph.utils, 'start_osd')
def test_upgrade_single_osd(self, start_osd, enable_osd, update_owner,
disable_osd, stop_osd):
ceph._upgrade_single_osd(1, '/var/lib/ceph/osd/ceph-1')
ceph.utils._upgrade_single_osd(1, '/var/lib/ceph/osd/ceph-1')
stop_osd.assert_called_with(1)
disable_osd.assert_called_with(1)
update_owner.assert_called_with('/var/lib/ceph/osd/ceph-1')
enable_osd.assert_called_with(1)
start_osd.assert_called_with(1)
@patch.object(ceph, 'systemd')
@patch.object(ceph, 'service_stop')
@patch.object(ceph.utils, 'systemd')
@patch.object(ceph.utils, 'service_stop')
def test_stop_osd(self, service_stop, systemd):
systemd.return_value = False
ceph.stop_osd(1)
ceph.utils.stop_osd(1)
service_stop.assert_called_with('ceph-osd', id=1)
systemd.return_value = True
ceph.stop_osd(2)
ceph.utils.stop_osd(2)
service_stop.assert_called_with('ceph-osd@2')
@patch.object(ceph, 'systemd')
@patch.object(ceph, 'service_start')
@patch.object(ceph.utils, 'systemd')
@patch.object(ceph.utils, 'service_start')
def test_start_osd(self, service_start, systemd):
systemd.return_value = False
ceph.start_osd(1)
ceph.utils.start_osd(1)
service_start.assert_called_with('ceph-osd', id=1)
systemd.return_value = True
ceph.start_osd(2)
ceph.utils.start_osd(2)
service_start.assert_called_with('ceph-osd@2')
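# Note, not part of this change: under upstart the init script takes the
# OSD id as a parameter (service('ceph-osd', id=N)), while systemd uses a
# templated unit per OSD ('ceph-osd@N'), which is what the paired
# assertions in test_stop_osd and test_start_osd above encode.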
@patch('subprocess.check_call')
@patch('os.path.exists')
@patch('os.unlink')
@patch('ceph.systemd')
@patch.object(ceph.utils, 'systemd')
def test_disable_osd(self, systemd, unlink, exists, check_call):
systemd.return_value = True
ceph.disable_osd(4)
ceph.utils.disable_osd(4)
check_call.assert_called_with(['systemctl', 'disable', 'ceph-osd@4'])
exists.return_value = True
systemd.return_value = False
ceph.disable_osd(3)
ceph.utils.disable_osd(3)
unlink.assert_called_with('/var/lib/ceph/osd/ceph-3/ready')
@patch('subprocess.check_call')
@patch('ceph.update_owner')
@patch('ceph.systemd')
@patch.object(ceph.utils, 'update_owner')
@patch.object(ceph.utils, 'systemd')
def test_enable_osd(self, systemd, update_owner, check_call):
systemd.return_value = True
ceph.enable_osd(5)
ceph.utils.enable_osd(5)
check_call.assert_called_with(['systemctl', 'enable', 'ceph-osd@5'])
systemd.return_value = False
@@ -212,17 +211,17 @@ class UpgradeRollingTestCase(unittest.TestCase):
# the python version.
bs = 'builtins' if sys.version_info > (3, 0) else '__builtin__'
with patch('%s.open' % bs, mo):
ceph.enable_osd(6)
ceph.utils.enable_osd(6)
mo.assert_called_once_with('/var/lib/ceph/osd/ceph-6/ready', 'w')
handle = mo()
handle.write.assert_called_with('ready')
update_owner.assert_called_with('/var/lib/ceph/osd/ceph-6/ready')
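# Illustrative sketch, not part of this change: the builtins module is
# '__builtin__' on Python 2 and 'builtins' on Python 3, so the open() patch
# target is chosen at runtime, as in test_enable_osd above.
import sys

from mock import mock_open, patch

bs = 'builtins' if sys.version_info > (3, 0) else '__builtin__'
with patch('%s.open' % bs, mock_open()) as mo:
    with open('/var/lib/ceph/osd/ceph-6/ready', 'w') as f:
        f.write('ready')
mo.assert_called_once_with('/var/lib/ceph/osd/ceph-6/ready', 'w')
mo().write.assert_called_with('ready')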
@patch('ceph.socket')
@patch('ceph.get_osd_tree')
@patch('ceph.log')
@patch('ceph.lock_and_roll')
@patch('ceph.get_upgrade_position')
@patch.object(ceph.utils, 'socket')
@patch.object(ceph.utils, 'get_osd_tree')
@patch.object(ceph.utils, 'log')
@patch.object(ceph.utils, 'lock_and_roll')
@patch.object(ceph.utils, 'get_upgrade_position')
def test_roll_osd_cluster_first(self,
get_upgrade_position,
lock_and_roll,
@@ -233,8 +232,8 @@ class UpgradeRollingTestCase(unittest.TestCase):
get_osd_tree.return_value = ""
get_upgrade_position.return_value = 0
ceph.roll_osd_cluster(new_version='0.94.1',
upgrade_key='osd-upgrade')
ceph.utils.roll_osd_cluster(new_version='0.94.1',
upgrade_key='osd-upgrade')
log.assert_has_calls(
[
call('roll_osd_cluster called with 0.94.1'),
@@ -247,12 +246,12 @@ class UpgradeRollingTestCase(unittest.TestCase):
upgrade_key='osd-upgrade',
service='osd')
@patch('ceph.get_osd_tree')
@patch('ceph.socket')
@patch('ceph.status_set')
@patch('ceph.lock_and_roll')
@patch('ceph.get_upgrade_position')
@patch('ceph.wait_on_previous_node')
@patch.object(ceph.utils, 'get_osd_tree')
@patch.object(ceph.utils, 'socket')
@patch.object(ceph.utils, 'status_set')
@patch.object(ceph.utils, 'lock_and_roll')
@patch.object(ceph.utils, 'get_upgrade_position')
@patch.object(ceph.utils, 'wait_on_previous_node')
def test_roll_osd_cluster_second(self,
wait_on_previous_node,
get_upgrade_position,
@@ -263,7 +262,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
wait_on_previous_node.return_value = None
socket.gethostname.return_value = "ip-192-168-1-3"
get_osd_tree.return_value = [
CrushLocation(
ceph.utils.CrushLocation(
name="ip-192-168-1-2",
identifier='a',
host='host-a',
@@ -272,7 +271,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
datacenter='dc-1',
chassis='chassis-a',
root='ceph'),
CrushLocation(
ceph.utils.CrushLocation(
name="ip-192-168-1-3",
identifier='a',
host='host-b',
@@ -284,8 +283,8 @@ class UpgradeRollingTestCase(unittest.TestCase):
]
get_upgrade_position.return_value = 1
ceph.roll_osd_cluster(new_version='0.94.1',
upgrade_key='osd-upgrade')
ceph.utils.roll_osd_cluster(new_version='0.94.1',
upgrade_key='osd-upgrade')
status_set.assert_called_with(
'blocked',
'Waiting on ip-192-168-1-2 to finish upgrading')
@@ -302,7 +301,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
listdir.return_value = ['mon', 'bootstrap-osd', 'foo', 'bootstrap-mon']
exists.return_value = True
child_dirs = ceph._get_child_dirs('/var/lib/ceph')
child_dirs = ceph.utils._get_child_dirs('/var/lib/ceph')
isdir.assert_has_calls([call('/var/lib/ceph'),
call('/var/lib/ceph/mon'),
call('/var/lib/ceph/bootstrap-osd'),
@@ -319,54 +318,54 @@ class UpgradeRollingTestCase(unittest.TestCase):
exists.return_value = False
with self.assertRaises(ValueError):
ceph._get_child_dirs('/var/lib/ceph')
ceph.utils._get_child_dirs('/var/lib/ceph')
@patch('os.path.exists')
def test__get_child_dirs_no_exist(self, exists):
exists.return_value = False
with self.assertRaises(ValueError):
ceph._get_child_dirs('/var/lib/ceph')
ceph.utils._get_child_dirs('/var/lib/ceph')
@patch('ceph.ceph_user')
@patch.object(ceph.utils, 'ceph_user')
@patch('os.path.isdir')
@patch('subprocess.check_call')
@patch('ceph.status_set')
@patch.object(ceph.utils, 'status_set')
def test_update_owner_no_recurse(self, status_set, check_call,
isdir, ceph_user):
ceph_user.return_value = 'ceph'
isdir.return_value = True
ceph.update_owner('/var/lib/ceph', False)
ceph.utils.update_owner('/var/lib/ceph', False)
check_call.assert_called_with(['chown', 'ceph:ceph', '/var/lib/ceph'])
@patch('ceph.ceph_user')
@patch.object(ceph.utils, 'ceph_user')
@patch('os.path.isdir')
@patch('subprocess.check_call')
@patch('ceph.status_set')
@patch.object(ceph.utils, 'status_set')
def test_update_owner_recurse_file(self, status_set, check_call,
isdir, ceph_user):
ceph_user.return_value = 'ceph'
isdir.return_value = False
ceph.update_owner('/var/lib/ceph', True)
ceph.utils.update_owner('/var/lib/ceph', True)
check_call.assert_called_with(['chown', 'ceph:ceph', '/var/lib/ceph'])
@patch('ceph.ceph_user')
@patch.object(ceph.utils, 'ceph_user')
@patch('os.path.isdir')
@patch('subprocess.check_call')
@patch('ceph.status_set')
@patch.object(ceph.utils, 'status_set')
def test_update_owner_recurse(self, status_set, check_call,
isdir, ceph_user):
ceph_user.return_value = 'ceph'
isdir.return_value = True
ceph.update_owner('/var/lib/ceph', True)
ceph.utils.update_owner('/var/lib/ceph', True)
check_call.assert_called_with(['chown', '-R', 'ceph:ceph',
'/var/lib/ceph'])
"""
@patch('ceph.log')
@patch.object(ceph.utils, 'log')
@patch('time.time', lambda *args: previous_node_start_time + 10 * 60 + 1)
@patch('ceph.monitor_key_get')
@patch('ceph.monitor_key_exists')
@patch.object(ceph.utils, 'monitor_key_get')
@patch.object(ceph.utils, 'monitor_key_exists')
def test_wait_on_previous_node(self,
monitor_key_exists,
monitor_key_get,
@@ -374,10 +373,10 @@ class UpgradeRollingTestCase(unittest.TestCase):
monitor_key_get.side_effect = monitor_key_side_effect
monitor_key_exists.return_value = False
ceph.wait_on_previous_node(previous_node="ip-192-168-1-2",
version='0.94.1',
service='osd',
upgrade_key='osd-upgrade')
ceph.utils.wait_on_previous_node(previous_node="ip-192-168-1-2",
version='0.94.1',
service='osd',
upgrade_key='osd-upgrade')
# Make sure we checked to see if the previous node started
monitor_key_get.assert_has_calls(
View File
@@ -12,11 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest
import ceph
from mock import (
call,
mock_open,
MagicMock,
patch,
)
import ceph.utils as utils
from subprocess import CalledProcessError
import subprocess
class TestDevice():
@@ -39,7 +46,7 @@ class CephTestCase(unittest.TestCase):
def setUp(self):
super(CephTestCase, self).setUp()
@mock.patch.object(ceph, 'check_output')
@patch.object(utils.subprocess, 'check_output')
def test_get_osd_weight(self, output):
"""It gives an OSD's weight"""
output.return_value = """{
@@ -103,105 +110,91 @@ class CephTestCase(unittest.TestCase):
}],
"stray": []
}"""
weight = ceph.get_osd_weight('osd.0')
weight = utils.get_osd_weight('osd.0')
self.assertEqual(weight, 0.002899)
def test_get_named_key_with_pool(self):
with mock.patch.object(ceph, "ceph_user", return_value="ceph"):
with mock.patch.object(ceph, "check_output") \
as subprocess:
with mock.patch.object(ceph.socket, "gethostname",
return_value="osd001"):
subprocess.side_effect = [
CalledProcessError(0, 0, 0), ""]
ceph.get_named_key(name="rgw001",
pool_list=["rbd", "block"])
subprocess.assert_has_calls([
mock.call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get', 'client.rgw001']),
mock.call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get-or-create', 'client.rgw001',
'mon', 'allow r', 'osd',
'allow rwx pool=rbd pool=block'])])
@patch.object(utils.subprocess, 'check_output')
@patch.object(utils, "ceph_user", lambda: "ceph")
@patch.object(utils.socket, "gethostname", lambda: "osd001")
def test_get_named_key_with_pool(self, mock_check_output):
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), ""]
utils.get_named_key(name="rgw001", pool_list=["rbd", "block"])
mock_check_output.assert_has_calls([
call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get', 'client.rgw001']),
call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get-or-create', 'client.rgw001',
'mon', 'allow r', 'osd',
'allow rwx pool=rbd pool=block'])])
def test_get_named_key(self):
with mock.patch.object(ceph, "ceph_user", return_value="ceph"):
with mock.patch.object(ceph, "check_output") \
as subprocess:
subprocess.side_effect = [
CalledProcessError(0, 0, 0),
""]
with mock.patch.object(ceph.socket, "gethostname",
return_value="osd001"):
ceph.get_named_key(name="rgw001")
for call in subprocess.mock_calls:
print("Subprocess: {}".format(call))
subprocess.assert_has_calls([
mock.call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get', 'client.rgw001']),
mock.call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get-or-create', 'client.rgw001',
'mon', 'allow r', 'osd',
'allow rwx'])])
@patch.object(utils.subprocess, 'check_output')
@patch.object(utils, 'ceph_user', lambda: "ceph")
@patch.object(utils.socket, "gethostname", lambda: "osd001")
def test_get_named_key(self, mock_check_output):
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), ""]
utils.get_named_key(name="rgw001")
mock_check_output.assert_has_calls([
call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get', 'client.rgw001']),
call(['sudo', '-u', 'ceph', 'ceph', '--name',
'mon.', '--keyring',
'/var/lib/ceph/mon/ceph-osd001/keyring',
'auth', 'get-or-create', 'client.rgw001',
'mon', 'allow r', 'osd',
'allow rwx'])])
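# Illustrative sketch, not part of this change: a list side_effect feeds
# successive check_output() calls, so the first ('auth get') can fail while
# the second ('auth get-or-create') succeeds, which is the fallback path the
# two tests above walk.
from subprocess import CalledProcessError

from mock import MagicMock

fake_check_output = MagicMock(side_effect=[CalledProcessError(0, 0, 0), ""])
try:
    fake_check_output()  # simulates 'auth get' on a missing key
except CalledProcessError:
    pass
assert fake_check_output() == ""  # 'auth get-or-create' then creates it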
def test_parse_key_with_caps_existing_key(self):
expected = "AQCm7aVYQFXXFhAAj0WIeqcag88DKOvY4UKR/g=="
with_caps = "[client.osd-upgrade]\n" \
" key = AQCm7aVYQFXXFhAAj0WIeqcag88DKOvY4UKR/g==\n" \
" caps mon = \"allow command \"config-key\";"
key = ceph.parse_key(with_caps)
print("key: {}".format(key))
key = utils.parse_key(with_caps)
self.assertEqual(key, expected)
def test_parse_key_without_caps(self):
expected = "AQCm7aVYQFXXFhAAj0WIeqcag88DKOvY4UKR/g=="
without_caps = "[client.osd-upgrade]\n" \
" key = AQCm7aVYQFXXFhAAj0WIeqcag88DKOvY4UKR/g=="
key = ceph.parse_key(without_caps)
print("key: {}".format(key))
key = utils.parse_key(without_caps)
self.assertEqual(key, expected)
def test_list_unmounted_devices(self):
dev1 = mock.MagicMock(spec=TestDevice)
dev1 = MagicMock(spec=TestDevice)
dev1.__getitem__.return_value = "block"
dev1.device_node = '/dev/sda'
dev2 = mock.MagicMock(spec=TestDevice)
dev2 = MagicMock(spec=TestDevice)
dev2.__getitem__.return_value = "block"
dev2.device_node = '/dev/sdb'
dev3 = mock.MagicMock(spec=TestDevice)
dev3 = MagicMock(spec=TestDevice)
dev3.__getitem__.return_value = "block"
dev3.device_node = '/dev/loop1'
devices = [dev1, dev2, dev3]
with mock.patch(
with patch(
'pyudev.Context.list_devices',
return_value=devices):
with mock.patch.object(ceph,
'is_device_mounted',
return_value=False):
devices = ceph.unmounted_disks()
with patch.object(utils, 'is_device_mounted',
return_value=False):
devices = utils.unmounted_disks()
self.assertEqual(devices, ['/dev/sda', '/dev/sdb'])
with mock.patch.object(ceph,
'is_device_mounted',
return_value=True):
devices = ceph.unmounted_disks()
with patch.object(utils, 'is_device_mounted',
return_value=True):
devices = utils.unmounted_disks()
self.assertEqual(devices, [])
@mock.patch.object(ceph, 'check_output')
@patch.object(utils.subprocess, 'check_output')
def test_get_partition_list(self, output):
with open('unit_tests/partx_output', 'r') as partx_out:
output.return_value = partx_out.read()
partition_list = ceph.get_partition_list('/dev/xvdb')
partition_list = utils.get_partition_list('/dev/xvdb')
self.assertEqual(len(partition_list), 2)
@mock.patch.object(ceph, 'check_output')
@patch.object(utils.subprocess, 'check_output')
def test_get_ceph_pg_stat(self, output):
"""It returns the current PG stat"""
output.return_value = """{
@@ -218,10 +211,10 @@ class CephTestCase(unittest.TestCase):
"raw_bytes_avail": 26627104956416,
"raw_bytes": 26982147686400
}"""
pg_stat = ceph.get_ceph_pg_stat()
pg_stat = utils.get_ceph_pg_stat()
self.assertEqual(pg_stat['num_pgs'], 320)
@mock.patch.object(ceph, 'check_output')
@patch.object(utils.subprocess, 'check_output')
def test_get_ceph_health(self, output):
"""It gives the current Ceph health"""
output.return_value = """{
@@ -310,39 +303,43 @@ class CephTestCase(unittest.TestCase):
"overall_status": "HEALTH_OK",
"detail": []
}"""
health = ceph.get_ceph_health()
health = utils.get_ceph_health()
self.assertEqual(health['overall_status'], "HEALTH_OK")
@mock.patch.object(subprocess, 'check_output')
@patch.object(utils.subprocess, 'check_output')
def test_reweight_osd(self, mock_reweight):
"""It changes the weight of an OSD"""
mock_reweight.return_value = "reweighted item id 0 name 'osd.0' to 1"
reweight_result = ceph.reweight_osd('0', '1')
reweight_result = utils.reweight_osd('0', '1')
self.assertEqual(reweight_result, True)
mock_reweight.assert_called_once_with(
['ceph', 'osd', 'crush', 'reweight', 'osd.0', '1'], stderr=-2)
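# Note, not part of this change: the stderr=-2 in the expected call is just
# the integer value of subprocess.STDOUT, i.e. stderr merged into stdout.
import subprocess

assert subprocess.STDOUT == -2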
@mock.patch.object(ceph, 'is_container')
@patch.object(utils, 'is_container')
def test_determine_packages(self, mock_is_container):
mock_is_container.return_value = False
self.assertTrue('ntp' in ceph.determine_packages())
self.assertEqual(ceph.PACKAGES, ceph.determine_packages())
self.assertTrue('ntp' in utils.determine_packages())
self.assertEqual(utils.PACKAGES,
utils.determine_packages())
mock_is_container.return_value = True
self.assertFalse('ntp' in ceph.determine_packages())
self.assertFalse('ntp' in utils.determine_packages())
@mock.patch.object(ceph, 'chownr')
@mock.patch.object(ceph, 'cmp_pkgrevno')
@mock.patch.object(ceph, 'ceph_user')
@mock.patch.object(ceph, 'os')
@mock.patch.object(ceph, 'systemd')
@mock.patch.object(ceph, 'log')
@mock.patch.object(ceph, 'mkdir')
@mock.patch.object(ceph, 'subprocess')
@mock.patch.object(ceph, 'service_restart')
@patch.object(utils, 'chownr')
@patch.object(utils, 'cmp_pkgrevno')
@patch.object(utils, 'ceph_user')
@patch.object(utils, 'os')
@patch.object(utils, 'systemd')
@patch.object(utils, 'log')
@patch.object(utils, 'mkdir')
@patch.object(utils.subprocess, 'check_output')
@patch.object(utils.subprocess, 'check_call')
@patch.object(utils, 'service_restart')
@patch.object(utils.socket, 'gethostname', lambda: 'TestHost')
def _test_bootstrap_monitor_cluster(self,
mock_service_restart,
mock_subprocess,
mock_check_call,
mock_check_output,
mock_mkdir,
mock_log,
mock_systemd,
@@ -350,8 +347,8 @@ class CephTestCase(unittest.TestCase):
mock_ceph_user,
mock_cmp_pkgrevno,
mock_chownr,
luminos=False):
test_hostname = ceph.socket.gethostname()
luminous=False):
test_hostname = utils.socket.gethostname()
test_secret = 'mysecret'
test_keyring = '/var/lib/ceph/tmp/{}.mon.keyring'.format(test_hostname)
test_path = '/var/lib/ceph/mon/ceph-{}'.format(test_hostname)
@@ -360,62 +357,58 @@ class CephTestCase(unittest.TestCase):
mock_os.path.exists.return_value = False
mock_systemd.return_value = True
mock_cmp_pkgrevno.return_value = 1 if luminos else -1
mock_cmp_pkgrevno.return_value = 1 if luminous else -1
mock_ceph_user.return_value = 'ceph'
test_calls = [
mock.call(
call(
['ceph-authtool', test_keyring,
'--create-keyring', '--name=mon.',
'--add-key={}'.format(test_secret),
'--cap', 'mon', 'allow *']
),
mock.call(
call(
['ceph-mon', '--mkfs',
'-i', test_hostname,
'--keyring', test_keyring]
),
mock.call(['systemctl', 'enable', 'ceph-mon']),
call(['systemctl', 'enable', 'ceph-mon']),
]
if luminos:
if luminous:
test_calls.append(
mock.call(['ceph-create-keys', '--id', test_hostname])
call(['ceph-create-keys', '--id', test_hostname])
)
mock_open = mock.mock_open()
with mock.patch('ceph.open', mock_open, create=True):
ceph.bootstrap_monitor_cluster(test_secret)
fake_open = mock_open()
with patch('ceph.utils.open', fake_open, create=True):
utils.bootstrap_monitor_cluster(test_secret)
self.assertEqual(
mock_subprocess.check_call.mock_calls,
test_calls
)
mock_check_call.assert_has_calls(test_calls)
mock_service_restart.assert_called_with('ceph-mon')
mock_mkdir.assert_has_calls([
mock.call('/var/run/ceph', owner='ceph',
group='ceph', perms=0o755),
mock.call(test_path, owner='ceph', group='ceph'),
call('/var/run/ceph', owner='ceph',
group='ceph', perms=0o755),
call(test_path, owner='ceph', group='ceph'),
])
mock_open.assert_has_calls([
mock.call(test_done, 'w'),
mock.call(test_init_marker, 'w'),
], any_order=True)
fake_open.assert_has_calls([call(test_done, 'w'),
call(test_init_marker, 'w')],
any_order=True)
mock_os.unlink.assert_called_with(test_keyring)
def test_bootstrap_monitor_cluster(self):
self._test_bootstrap_monitor_cluster(luminos=False)
self._test_bootstrap_monitor_cluster(luminous=False)
def test_bootstrap_monitor_cluster_luminous(self):
self._test_bootstrap_monitor_cluster(luminos=True)
self._test_bootstrap_monitor_cluster(luminous=True)
@mock.patch.object(ceph, 'chownr')
@mock.patch.object(ceph, 'cmp_pkgrevno')
@mock.patch.object(ceph, 'ceph_user')
@mock.patch.object(ceph, 'os')
@mock.patch.object(ceph, 'log')
@mock.patch.object(ceph, 'mkdir')
@mock.patch.object(ceph, 'subprocess')
@mock.patch.object(ceph, 'service_restart')
@patch.object(utils, 'chownr')
@patch.object(utils, 'cmp_pkgrevno')
@patch.object(utils, 'ceph_user')
@patch.object(utils, 'os')
@patch.object(utils, 'log')
@patch.object(utils, 'mkdir')
@patch.object(utils, 'subprocess')
@patch.object(utils, 'service_restart')
def test_bootstrap_manager(self,
mock_service_restart,
mock_subprocess,
@@ -425,7 +418,7 @@ class CephTestCase(unittest.TestCase):
mock_ceph_user,
mock_cmp_pkgrevno,
mock_chownr):
test_hostname = ceph.socket.gethostname()
test_hostname = utils.socket.gethostname()
test_path = '/var/lib/ceph/mgr/ceph-{}'.format(test_hostname)
test_keyring = '/var/lib/ceph/mgr/ceph-{}/keyring'.format(
test_hostname)
@@ -436,19 +429,19 @@ class CephTestCase(unittest.TestCase):
mock_ceph_user.return_value = 'ceph'
test_calls = [
mock.call(
call(
['ceph', 'auth', 'get-or-create',
'mgr.{}'.format(test_hostname), 'mon',
'allow profile mgr', 'osd', 'allow *',
'mds', 'allow *', '--out-file',
test_keyring]
),
mock.call(['systemctl', 'enable', test_unit]),
call(['systemctl', 'enable', test_unit]),
]
mock_open = mock.mock_open()
with mock.patch('ceph.open', mock_open, create=True):
ceph.bootstrap_manager()
fake_open = mock_open()
with patch('ceph.open', fake_open, create=True):
utils.bootstrap_manager()
self.assertEqual(
mock_subprocess.check_call.mock_calls,
@@ -456,22 +449,28 @@ class CephTestCase(unittest.TestCase):
)
mock_service_restart.assert_called_with(test_unit)
mock_mkdir.assert_has_calls([
mock.call(test_path, owner='ceph', group='ceph'),
call(test_path, owner='ceph', group='ceph'),
])
class CephVersionTestCase(unittest.TestCase):
@mock.patch.object(ceph, 'get_os_codename_install_source')
@patch.object(utils, 'get_os_codename_install_source')
def test_resolve_ceph_version_trusty(self, get_os_codename_install_source):
get_os_codename_install_source.return_value = 'juno'
self.assertEqual(ceph.resolve_ceph_version('cloud:trusty-juno'),
self.assertEqual(utils.resolve_ceph_version('cloud:trusty-juno'),
'firefly')
get_os_codename_install_source.return_value = 'kilo'
self.assertEqual(ceph.resolve_ceph_version('cloud:trusty-kilo'),
self.assertEqual(utils.resolve_ceph_version('cloud:trusty-kilo'),
'hammer')
get_os_codename_install_source.return_value = 'liberty'
self.assertEqual(ceph.resolve_ceph_version('cloud:trusty-liberty'),
'hammer')
self.assertEqual(utils.resolve_ceph_version(
'cloud:trusty-liberty'), 'hammer')
get_os_codename_install_source.return_value = 'mitaka'
self.assertEqual(ceph.resolve_ceph_version('cloud:trusty-mitaka'),
'jewel')
self.assertEqual(utils.resolve_ceph_version(
'cloud:trusty-mitaka'), 'jewel')
get_os_codename_install_source.return_value = 'newton'
self.assertEqual(utils.resolve_ceph_version(
'cloud:xenial-newton'), 'jewel')
get_os_codename_install_source.return_value = 'ocata'
self.assertEqual(utils.resolve_ceph_version(
'cloud:xenial-ocata'), 'jewel')
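# Illustrative summary, not part of this change, taken from the assertions
# above: resolve_ceph_version() maps an Ubuntu Cloud Archive OpenStack
# codename to the Ceph release that pocket ships.
UCA_TO_CEPH = {
    'juno': 'firefly',
    'kilo': 'hammer',
    'liberty': 'hammer',
    'mitaka': 'jewel',
    'newton': 'jewel',
    'ocata': 'jewel',
}
assert UCA_TO_CEPH['ocata'] == 'jewel'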