Make charms.ceph fully py2/py3 compatible
These changes make the charms.ceph library suitable for both the old-stable Python 2 charms and the (to be) updated Python 3 only charms. Avoided use of six by using str() with the decode('UTF-8') function to allow the library to be used with both Py2 and Py3. The str(...) coercions can be removed at a later date when the library no longer needs to be synced to Py2 versions of the ceph-* charms. Change-Id: I416053439444bf4cf8945d1fe96643f9ed0f05f4
This commit is contained in:
parent
4b8255a411
commit
407d98b96a
|
@ -12,6 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import json
|
||||
import os
|
||||
|
||||
|
@ -134,7 +135,7 @@ def process_requests(reqs):
|
|||
log(msg, level=ERROR)
|
||||
return {'exit-code': 1, 'stderr': msg}
|
||||
|
||||
msg = ("Missing or invalid api version (%s)" % version)
|
||||
msg = ("Missing or invalid api version ({})".format(version))
|
||||
resp = {'exit-code': 1, 'stderr': msg}
|
||||
if request_id:
|
||||
resp['request-id'] = request_id
|
||||
|
@ -231,7 +232,7 @@ def add_pool_to_group(pool, group, namespace=None):
|
|||
def pool_permission_list_for_service(service):
|
||||
"""Build the permission string for Ceph for a given service"""
|
||||
permissions = []
|
||||
permission_types = {}
|
||||
permission_types = collections.OrderedDict()
|
||||
for permission, group in service["group_names"].items():
|
||||
if permission not in permission_types:
|
||||
permission_types[permission] = []
|
||||
|
@ -267,9 +268,7 @@ def get_service_groups(service, namespace=None):
|
|||
key="cephx.services.{}".format(service))
|
||||
try:
|
||||
service = json.loads(service_json)
|
||||
except TypeError:
|
||||
service = None
|
||||
except ValueError:
|
||||
except (TypeError, ValueError):
|
||||
service = None
|
||||
if service:
|
||||
service['groups'] = _build_service_groups(service, namespace)
|
||||
|
@ -296,7 +295,7 @@ def _build_service_groups(service, namespace=None):
|
|||
}
|
||||
"""
|
||||
all_groups = {}
|
||||
for _, groups in service['group_names'].items():
|
||||
for groups in service['group_names'].values():
|
||||
for group in groups:
|
||||
name = group
|
||||
if namespace:
|
||||
|
@ -316,9 +315,7 @@ def get_group(group_name):
|
|||
group_json = monitor_key_get(service='admin', key=group_key)
|
||||
try:
|
||||
group = json.loads(group_json)
|
||||
except TypeError:
|
||||
group = None
|
||||
except ValueError:
|
||||
except (TypeError, ValueError):
|
||||
group = None
|
||||
if not group:
|
||||
group = {
|
||||
|
@ -391,9 +388,8 @@ def handle_erasure_pool(request, service):
|
|||
percent_data=weight)
|
||||
# Ok make the erasure pool
|
||||
if not pool_exists(service=service, name=pool_name):
|
||||
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
|
||||
erasure_profile),
|
||||
level=INFO)
|
||||
log("Creating pool '{}' (erasure_profile={})"
|
||||
.format(pool.name, erasure_profile), level=INFO)
|
||||
pool.create()
|
||||
|
||||
# Set a quota if requested
|
||||
|
@ -446,11 +442,11 @@ def handle_replicated_pool(request, service):
|
|||
pool = ReplicatedPool(service=service,
|
||||
name=pool_name, **kwargs)
|
||||
if not pool_exists(service=service, name=pool_name):
|
||||
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
|
||||
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
|
||||
level=INFO)
|
||||
pool.create()
|
||||
else:
|
||||
log("Pool '%s' already exists - skipping create" % pool.name,
|
||||
log("Pool '{}' already exists - skipping create".format(pool.name),
|
||||
level=DEBUG)
|
||||
|
||||
# Set a quota if requested
|
||||
|
@ -519,7 +515,7 @@ def handle_set_pool_value(request, service):
|
|||
'key': request.get('key'),
|
||||
'value': request.get('value')}
|
||||
if params['key'] not in POOL_KEYS:
|
||||
msg = "Invalid key '%s'" % params['key']
|
||||
msg = "Invalid key '{}'".format(params['key'])
|
||||
log(msg, level=ERROR)
|
||||
return {'exit-code': 1, 'stderr': msg}
|
||||
|
||||
|
@ -685,7 +681,7 @@ def handle_rgw_create_user(request, service):
|
|||
]
|
||||
)
|
||||
try:
|
||||
user_json = json.loads(create_output)
|
||||
user_json = json.loads(str(create_output.decode('UTF-8')))
|
||||
return {'exit-code': 0, 'user': user_json}
|
||||
except ValueError as err:
|
||||
log(err, level=ERROR)
|
||||
|
@ -790,10 +786,10 @@ def process_requests_v1(reqs):
|
|||
operation failed along with an explanation).
|
||||
"""
|
||||
ret = None
|
||||
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
|
||||
log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
|
||||
for req in reqs:
|
||||
op = req.get('op')
|
||||
log("Processing op='%s'" % op, level=DEBUG)
|
||||
log("Processing op='{}'".format(op), level=DEBUG)
|
||||
# Use admin client since we do not have other client key locations
|
||||
# setup to use them for these operations.
|
||||
svc = 'admin'
|
||||
|
@ -848,7 +844,7 @@ def process_requests_v1(reqs):
|
|||
elif op == "add-permissions-to-key":
|
||||
ret = handle_add_permissions_to_key(request=req, service=svc)
|
||||
else:
|
||||
msg = "Unknown operation '%s'" % op
|
||||
msg = "Unknown operation '{}'".format(op)
|
||||
log(msg, level=ERROR)
|
||||
return {'exit-code': 1, 'stderr': msg}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ class Crushmap(object):
|
|||
ids = list(map(
|
||||
lambda x: int(x),
|
||||
re.findall(CRUSHMAP_ID_RE, self._crushmap)))
|
||||
ids.sort()
|
||||
ids = sorted(ids)
|
||||
if roots != []:
|
||||
for root in roots:
|
||||
buckets.append(CRUSHBucket(root[0], root[1], True))
|
||||
|
@ -73,8 +73,11 @@ class Crushmap(object):
|
|||
|
||||
def load_crushmap(self):
|
||||
try:
|
||||
crush = check_output(['ceph', 'osd', 'getcrushmap'])
|
||||
return check_output(['crushtool', '-d', '-'], stdin=crush.stdout)
|
||||
crush = str(check_output(['ceph', 'osd', 'getcrushmap'])
|
||||
.decode('UTF-8'))
|
||||
return str(check_output(['crushtool', '-d', '-'],
|
||||
stdin=crush.stdout)
|
||||
.decode('UTF-8'))
|
||||
except CalledProcessError as e:
|
||||
log("Error occured while loading and decompiling CRUSH map:"
|
||||
"{}".format(e), ERROR)
|
||||
|
@ -99,10 +102,12 @@ class Crushmap(object):
|
|||
"""Persist Crushmap to Ceph"""
|
||||
try:
|
||||
crushmap = self.build_crushmap()
|
||||
compiled = check_output(['crushtool', '-c', '/dev/stdin', '-o',
|
||||
compiled = str(check_output(['crushtool', '-c', '/dev/stdin', '-o',
|
||||
'/dev/stdout'], stdin=crushmap)
|
||||
ceph_output = check_output(['ceph', 'osd', 'setcrushmap', '-i',
|
||||
.decode('UTF-8'))
|
||||
ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i',
|
||||
'/dev/stdin'], stdin=compiled)
|
||||
.decode('UTF-8'))
|
||||
return ceph_output
|
||||
except CalledProcessError as e:
|
||||
log("save error: {}".format(e))
|
||||
|
|
|
@ -381,8 +381,9 @@ def get_block_uuid(block_dev):
|
|||
:returns: The UUID of the device or None on Error.
|
||||
"""
|
||||
try:
|
||||
block_info = subprocess.check_output(
|
||||
['blkid', '-o', 'export', block_dev])
|
||||
block_info = str(subprocess
|
||||
.check_output(['blkid', '-o', 'export', block_dev])
|
||||
.decode('UTF-8'))
|
||||
for tag in block_info.split('\n'):
|
||||
parts = tag.split('=')
|
||||
if parts[0] == 'UUID':
|
||||
|
@ -533,8 +534,9 @@ def get_osd_weight(osd_id):
|
|||
:raises: CalledProcessError if our ceph command fails.
|
||||
"""
|
||||
try:
|
||||
tree = subprocess.check_output(
|
||||
['ceph', 'osd', 'tree', '--format=json'])
|
||||
tree = str(subprocess
|
||||
.check_output(['ceph', 'osd', 'tree', '--format=json'])
|
||||
.decode('UTF-8'))
|
||||
try:
|
||||
json_tree = json.loads(tree)
|
||||
# Make sure children are present in the json
|
||||
|
@ -561,9 +563,10 @@ def get_osd_tree(service):
|
|||
Also raises CalledProcessError if our ceph command fails
|
||||
"""
|
||||
try:
|
||||
tree = subprocess.check_output(
|
||||
['ceph', '--id', service,
|
||||
tree = str(subprocess
|
||||
.check_output(['ceph', '--id', service,
|
||||
'osd', 'tree', '--format=json'])
|
||||
.decode('UTF-8'))
|
||||
try:
|
||||
json_tree = json.loads(tree)
|
||||
crush_list = []
|
||||
|
@ -628,7 +631,7 @@ def _get_osd_num_from_dirname(dirname):
|
|||
"""
|
||||
match = re.search('ceph-(?P<osd_id>\d+)', dirname)
|
||||
if not match:
|
||||
raise ValueError("dirname not in correct format: %s" % dirname)
|
||||
raise ValueError("dirname not in correct format: {}".format(dirname))
|
||||
|
||||
return match.group('osd_id')
|
||||
|
||||
|
@ -718,7 +721,7 @@ def get_version():
|
|||
|
||||
|
||||
def error_out(msg):
|
||||
log("FATAL ERROR: %s" % msg,
|
||||
log("FATAL ERROR: {}".format(msg),
|
||||
level=ERROR)
|
||||
sys.exit(1)
|
||||
|
||||
|
@ -736,7 +739,9 @@ def is_quorum():
|
|||
]
|
||||
if os.path.exists(asok):
|
||||
try:
|
||||
result = json.loads(subprocess.check_output(cmd))
|
||||
result = json.loads(str(subprocess
|
||||
.check_output(cmd)
|
||||
.decode('UTF-8')))
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
except ValueError:
|
||||
|
@ -763,7 +768,9 @@ def is_leader():
|
|||
]
|
||||
if os.path.exists(asok):
|
||||
try:
|
||||
result = json.loads(subprocess.check_output(cmd))
|
||||
result = json.loads(str(subprocess
|
||||
.check_output(cmd)
|
||||
.decode('UTF-8')))
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
except ValueError:
|
||||
|
@ -955,8 +962,9 @@ def is_osd_disk(dev):
|
|||
partitions = get_partition_list(dev)
|
||||
for partition in partitions:
|
||||
try:
|
||||
info = subprocess.check_output(['sgdisk', '-i', partition.number,
|
||||
dev])
|
||||
info = str(subprocess
|
||||
.check_output(['sgdisk', '-i', partition.number, dev])
|
||||
.decode('UTF-8'))
|
||||
info = info.split("\n") # IGNORE:E1103
|
||||
for line in info:
|
||||
for ptype in CEPH_PARTITIONS:
|
||||
|
@ -1039,7 +1047,7 @@ def generate_monitor_secret():
|
|||
'--name=mon.',
|
||||
'--gen-key'
|
||||
]
|
||||
res = subprocess.check_output(cmd)
|
||||
res = str(subprocess.check_output(cmd).decode('UTF-8'))
|
||||
|
||||
return "{}==".format(res.split('=')[1].strip())
|
||||
|
||||
|
@ -1188,7 +1196,10 @@ def create_named_keyring(entity, name, caps=None):
|
|||
for subsystem, subcaps in caps.items():
|
||||
cmd.extend([subsystem, '; '.join(subcaps)])
|
||||
log("Calling check_output: {}".format(cmd), level=DEBUG)
|
||||
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
|
||||
return (parse_key(str(subprocess
|
||||
.check_output(cmd)
|
||||
.decode('UTF-8'))
|
||||
.strip())) # IGNORE:E1103
|
||||
|
||||
|
||||
def get_upgrade_key():
|
||||
|
@ -1205,7 +1216,7 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
"""
|
||||
try:
|
||||
# Does the key already exist?
|
||||
output = subprocess.check_output(
|
||||
output = str(subprocess.check_output(
|
||||
[
|
||||
'sudo',
|
||||
'-u', ceph_user(),
|
||||
|
@ -1218,7 +1229,7 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
'auth',
|
||||
'get',
|
||||
'client.{}'.format(name),
|
||||
]).strip()
|
||||
]).decode('UTF-8')).strip()
|
||||
return parse_key(output)
|
||||
except subprocess.CalledProcessError:
|
||||
# Couldn't get the key, time to create it!
|
||||
|
@ -1247,7 +1258,10 @@ def get_named_key(name, caps=None, pool_list=None):
|
|||
cmd.extend([subsystem, '; '.join(subcaps)])
|
||||
|
||||
log("Calling check_output: {}".format(cmd), level=DEBUG)
|
||||
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
|
||||
return parse_key(str(subprocess
|
||||
.check_output(cmd)
|
||||
.decode('UTF-8'))
|
||||
.strip()) # IGNORE:E1103
|
||||
|
||||
|
||||
def upgrade_key_caps(key, caps):
|
||||
|
@ -1361,7 +1375,7 @@ def maybe_zap_journal(journal_dev):
|
|||
def get_partitions(dev):
|
||||
cmd = ['partx', '--raw', '--noheadings', dev]
|
||||
try:
|
||||
out = subprocess.check_output(cmd).splitlines()
|
||||
out = str(subprocess.check_output(cmd).decode('UTF-8')).splitlines()
|
||||
log("get partitions: {}".format(out), level=DEBUG)
|
||||
return out
|
||||
except subprocess.CalledProcessError as e:
|
||||
|
@ -1529,7 +1543,7 @@ def get_running_osds():
|
|||
"""Returns a list of the pids of the current running OSD daemons"""
|
||||
cmd = ['pgrep', 'ceph-osd']
|
||||
try:
|
||||
result = subprocess.check_output(cmd)
|
||||
result = str(subprocess.check_output(cmd).decode('UTF-8'))
|
||||
return result.split()
|
||||
except subprocess.CalledProcessError:
|
||||
return []
|
||||
|
@ -1545,7 +1559,9 @@ def get_cephfs(service):
|
|||
# This command wasn't introduced until 0.86 ceph
|
||||
return []
|
||||
try:
|
||||
output = subprocess.check_output(["ceph", '--id', service, "fs", "ls"])
|
||||
output = str(subprocess
|
||||
.check_output(["ceph", '--id', service, "fs", "ls"])
|
||||
.decode('UTF-8'))
|
||||
if not output:
|
||||
return []
|
||||
"""
|
||||
|
@ -2079,7 +2095,9 @@ def list_pools(service):
|
|||
"""
|
||||
try:
|
||||
pool_list = []
|
||||
pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
|
||||
pools = str(subprocess
|
||||
.check_output(['rados', '--id', service, 'lspools'])
|
||||
.decode('UTF-8'))
|
||||
for pool in pools.splitlines():
|
||||
pool_list.append(pool)
|
||||
return pool_list
|
||||
|
@ -2140,10 +2158,8 @@ UCA_CODENAME_MAP = {
|
|||
|
||||
def pretty_print_upgrade_paths():
|
||||
"""Pretty print supported upgrade paths for ceph"""
|
||||
lines = []
|
||||
for key, value in UPGRADE_PATHS.iteritems():
|
||||
lines.append("{} -> {}".format(key, value))
|
||||
return lines
|
||||
return ["{} -> {}".format(key, value)
|
||||
for key, value in UPGRADE_PATHS.iteritems()]
|
||||
|
||||
|
||||
def resolve_ceph_version(source):
|
||||
|
@ -2163,7 +2179,9 @@ def get_ceph_pg_stat():
|
|||
:returns: dict
|
||||
"""
|
||||
try:
|
||||
tree = subprocess.check_output(['ceph', 'pg', 'stat', '--format=json'])
|
||||
tree = str(subprocess
|
||||
.check_output(['ceph', 'pg', 'stat', '--format=json'])
|
||||
.decode('UTF-8'))
|
||||
try:
|
||||
json_tree = json.loads(tree)
|
||||
if not json_tree['num_pg_by_state']:
|
||||
|
@ -2187,8 +2205,9 @@ def get_ceph_health():
|
|||
status, use get_ceph_health()['overall_status'].
|
||||
"""
|
||||
try:
|
||||
tree = subprocess.check_output(
|
||||
['ceph', 'status', '--format=json'])
|
||||
tree = str(subprocess
|
||||
.check_output(['ceph', 'status', '--format=json'])
|
||||
.decode('UTF-8'))
|
||||
try:
|
||||
json_tree = json.loads(tree)
|
||||
# Make sure children are present in the json
|
||||
|
@ -2215,9 +2234,12 @@ def reweight_osd(osd_num, new_weight):
|
|||
:raises CalledProcessError: if an error occurs invoking the systemd cmd
|
||||
"""
|
||||
try:
|
||||
cmd_result = subprocess.check_output(
|
||||
['ceph', 'osd', 'crush', 'reweight', "osd.{}".format(osd_num),
|
||||
new_weight], stderr=subprocess.STDOUT)
|
||||
cmd_result = str(subprocess
|
||||
.check_output(['ceph', 'osd', 'crush',
|
||||
'reweight', "osd.{}".format(osd_num),
|
||||
new_weight],
|
||||
stderr=subprocess.STDOUT)
|
||||
.decode('UTF-8'))
|
||||
expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format(
|
||||
ID=osd_num) + " to {}".format(new_weight)
|
||||
log(cmd_result)
|
||||
|
|
2
setup.py
2
setup.py
|
@ -49,7 +49,7 @@ if sys.argv[-1] == 'publish':
|
|||
|
||||
|
||||
if sys.argv[-1] == 'tag':
|
||||
os.system("git tag -a %s -m 'version %s'" % (version, version))
|
||||
os.system("git tag -a {0} -m 'version {0}'".format(version))
|
||||
os.system("git push --tags")
|
||||
sys.exit()
|
||||
|
||||
|
|
7
tox.ini
7
tox.ini
|
@ -1,5 +1,5 @@
|
|||
[tox]
|
||||
envlist = pep8,py27,py34,py35
|
||||
envlist = pep8,py27,py34,py35,py36
|
||||
skipsdist = True
|
||||
skip_missing_interpreters = True
|
||||
|
||||
|
@ -32,6 +32,11 @@ basepython = python3.5
|
|||
deps = -r{toxinidir}/requirements.txt
|
||||
-r{toxinidir}/test-requirements.txt
|
||||
|
||||
[testenv:py36]
|
||||
basepython = python3.6
|
||||
deps = -r{toxinidir}/requirements.txt
|
||||
-r{toxinidir}/test-requirements.txt
|
||||
|
||||
[testenv:pep8]
|
||||
basepython = python2.7
|
||||
deps = -r{toxinidir}/requirements.txt
|
||||
|
|
|
@ -84,7 +84,7 @@ class CephTestCase(unittest.TestCase):
|
|||
@patch.object(utils.subprocess, 'check_output')
|
||||
def test_get_osd_weight(self, output):
|
||||
"""It gives an OSD's weight"""
|
||||
output.return_value = """{
|
||||
output.return_value = b"""{
|
||||
"nodes": [{
|
||||
"id": -1,
|
||||
"name": "default",
|
||||
|
@ -152,7 +152,7 @@ class CephTestCase(unittest.TestCase):
|
|||
@patch.object(utils, "ceph_user", lambda: "ceph")
|
||||
@patch.object(utils.socket, "gethostname", lambda: "osd001")
|
||||
def test_get_named_key_with_pool(self, mock_check_output):
|
||||
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), ""]
|
||||
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), b""]
|
||||
utils.get_named_key(name="rgw001", pool_list=["rbd", "block"])
|
||||
mock_check_output.assert_has_calls([
|
||||
call(['sudo', '-u', 'ceph', 'ceph', '--name',
|
||||
|
@ -170,7 +170,7 @@ class CephTestCase(unittest.TestCase):
|
|||
@patch.object(utils, 'ceph_user', lambda: "ceph")
|
||||
@patch.object(utils.socket, "gethostname", lambda: "osd001")
|
||||
def test_get_named_key(self, mock_check_output):
|
||||
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), ""]
|
||||
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), b""]
|
||||
utils.get_named_key(name="rgw001")
|
||||
mock_check_output.assert_has_calls([
|
||||
call(['sudo', '-u', 'ceph', 'ceph', '--name',
|
||||
|
@ -225,14 +225,14 @@ class CephTestCase(unittest.TestCase):
|
|||
@patch.object(utils.subprocess, 'check_output')
|
||||
def test_get_partition_list(self, output):
|
||||
with open('unit_tests/partx_output', 'r') as partx_out:
|
||||
output.return_value = partx_out.read()
|
||||
output.return_value = partx_out.read().encode('UTF-8')
|
||||
partition_list = utils.get_partition_list('/dev/xvdb')
|
||||
self.assertEqual(len(partition_list), 2)
|
||||
|
||||
@patch.object(utils.subprocess, 'check_output')
|
||||
def test_get_ceph_pg_stat(self, output):
|
||||
"""It returns the current PG stat"""
|
||||
output.return_value = """{
|
||||
output.return_value = b"""{
|
||||
"num_pg_by_state": [
|
||||
{
|
||||
"name": "active+clean",
|
||||
|
@ -252,7 +252,7 @@ class CephTestCase(unittest.TestCase):
|
|||
@patch.object(utils.subprocess, 'check_output')
|
||||
def test_get_ceph_health(self, output):
|
||||
"""It gives the current Ceph health"""
|
||||
output.return_value = """{
|
||||
output.return_value = b"""{
|
||||
"health": {
|
||||
"health_services": [
|
||||
{
|
||||
|
@ -344,7 +344,7 @@ class CephTestCase(unittest.TestCase):
|
|||
@patch.object(utils.subprocess, 'check_output')
|
||||
def test_reweight_osd(self, mock_reweight):
|
||||
"""It changes the weight of an OSD"""
|
||||
mock_reweight.return_value = "reweighted item id 0 name 'osd.0' to 1"
|
||||
mock_reweight.return_value = b"reweighted item id 0 name 'osd.0' to 1"
|
||||
reweight_result = utils.reweight_osd('0', '1')
|
||||
self.assertEqual(reweight_result, True)
|
||||
mock_reweight.assert_called_once_with(
|
||||
|
|
Loading…
Reference in New Issue