Make charms.ceph fully py2/py3 compatible

These changes make the charms.ceph library suitable for both the
old-stable Python 2 charms and the (to be) updates Python 3 only charms.
Avoided use of six by using str() with the decode('UTF-8') function to
allow the library to be used with both Py2 and Py3.  The str(...)
coercions can be removed at a later date when the library no longer
needs to be synced to Py2 versions of the ceph-* charms.

Change-Id: I416053439444bf4cf8945d1fe96643f9ed0f05f4
This commit is contained in:
Alex Kavanagh 2017-10-30 11:59:57 +00:00
parent 4b8255a411
commit 407d98b96a
6 changed files with 95 additions and 67 deletions

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import json
import os
@ -134,7 +135,7 @@ def process_requests(reqs):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
msg = ("Missing or invalid api version (%s)" % version)
msg = ("Missing or invalid api version ({})".format(version))
resp = {'exit-code': 1, 'stderr': msg}
if request_id:
resp['request-id'] = request_id
@ -231,7 +232,7 @@ def add_pool_to_group(pool, group, namespace=None):
def pool_permission_list_for_service(service):
"""Build the permission string for Ceph for a given service"""
permissions = []
permission_types = {}
permission_types = collections.OrderedDict()
for permission, group in service["group_names"].items():
if permission not in permission_types:
permission_types[permission] = []
@ -267,9 +268,7 @@ def get_service_groups(service, namespace=None):
key="cephx.services.{}".format(service))
try:
service = json.loads(service_json)
except TypeError:
service = None
except ValueError:
except (TypeError, ValueError):
service = None
if service:
service['groups'] = _build_service_groups(service, namespace)
@ -296,7 +295,7 @@ def _build_service_groups(service, namespace=None):
}
"""
all_groups = {}
for _, groups in service['group_names'].items():
for groups in service['group_names'].values():
for group in groups:
name = group
if namespace:
@ -316,9 +315,7 @@ def get_group(group_name):
group_json = monitor_key_get(service='admin', key=group_key)
try:
group = json.loads(group_json)
except TypeError:
group = None
except ValueError:
except (TypeError, ValueError):
group = None
if not group:
group = {
@ -391,9 +388,8 @@ def handle_erasure_pool(request, service):
percent_data=weight)
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
erasure_profile),
level=INFO)
log("Creating pool '{}' (erasure_profile={})"
.format(pool.name, erasure_profile), level=INFO)
pool.create()
# Set a quota if requested
@ -446,11 +442,11 @@ def handle_replicated_pool(request, service):
pool = ReplicatedPool(service=service,
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
level=INFO)
pool.create()
else:
log("Pool '%s' already exists - skipping create" % pool.name,
log("Pool '{}' already exists - skipping create".format(pool.name),
level=DEBUG)
# Set a quota if requested
@ -519,7 +515,7 @@ def handle_set_pool_value(request, service):
'key': request.get('key'),
'value': request.get('value')}
if params['key'] not in POOL_KEYS:
msg = "Invalid key '%s'" % params['key']
msg = "Invalid key '{}'".format(params['key'])
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
@ -685,7 +681,7 @@ def handle_rgw_create_user(request, service):
]
)
try:
user_json = json.loads(create_output)
user_json = json.loads(str(create_output.decode('UTF-8')))
return {'exit-code': 0, 'user': user_json}
except ValueError as err:
log(err, level=ERROR)
@ -790,10 +786,10 @@ def process_requests_v1(reqs):
operation failed along with an explanation).
"""
ret = None
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
for req in reqs:
op = req.get('op')
log("Processing op='%s'" % op, level=DEBUG)
log("Processing op='{}'".format(op), level=DEBUG)
# Use admin client since we do not have other client key locations
# setup to use them for these operations.
svc = 'admin'
@ -848,7 +844,7 @@ def process_requests_v1(reqs):
elif op == "add-permissions-to-key":
ret = handle_add_permissions_to_key(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
msg = "Unknown operation '{}'".format(op)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}

View File

@ -60,7 +60,7 @@ class Crushmap(object):
ids = list(map(
lambda x: int(x),
re.findall(CRUSHMAP_ID_RE, self._crushmap)))
ids.sort()
ids = sorted(ids)
if roots != []:
for root in roots:
buckets.append(CRUSHBucket(root[0], root[1], True))
@ -73,8 +73,11 @@ class Crushmap(object):
def load_crushmap(self):
try:
crush = check_output(['ceph', 'osd', 'getcrushmap'])
return check_output(['crushtool', '-d', '-'], stdin=crush.stdout)
crush = str(check_output(['ceph', 'osd', 'getcrushmap'])
.decode('UTF-8'))
return str(check_output(['crushtool', '-d', '-'],
stdin=crush.stdout)
.decode('UTF-8'))
except CalledProcessError as e:
log("Error occured while loading and decompiling CRUSH map:"
"{}".format(e), ERROR)
@ -99,10 +102,12 @@ class Crushmap(object):
"""Persist Crushmap to Ceph"""
try:
crushmap = self.build_crushmap()
compiled = check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], stdin=crushmap)
ceph_output = check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], stdin=compiled)
compiled = str(check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], stdin=crushmap)
.decode('UTF-8'))
ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], stdin=compiled)
.decode('UTF-8'))
return ceph_output
except CalledProcessError as e:
log("save error: {}".format(e))

View File

@ -381,8 +381,9 @@ def get_block_uuid(block_dev):
:returns: The UUID of the device or None on Error.
"""
try:
block_info = subprocess.check_output(
['blkid', '-o', 'export', block_dev])
block_info = str(subprocess
.check_output(['blkid', '-o', 'export', block_dev])
.decode('UTF-8'))
for tag in block_info.split('\n'):
parts = tag.split('=')
if parts[0] == 'UUID':
@ -533,8 +534,9 @@ def get_osd_weight(osd_id):
:raises: CalledProcessError if our ceph command fails.
"""
try:
tree = subprocess.check_output(
['ceph', 'osd', 'tree', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'osd', 'tree', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
@ -561,9 +563,10 @@ def get_osd_tree(service):
Also raises CalledProcessError if our ceph command fails
"""
try:
tree = subprocess.check_output(
['ceph', '--id', service,
'osd', 'tree', '--format=json'])
tree = str(subprocess
.check_output(['ceph', '--id', service,
'osd', 'tree', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
crush_list = []
@ -628,7 +631,7 @@ def _get_osd_num_from_dirname(dirname):
"""
match = re.search('ceph-(?P<osd_id>\d+)', dirname)
if not match:
raise ValueError("dirname not in correct format: %s" % dirname)
raise ValueError("dirname not in correct format: {}".format(dirname))
return match.group('osd_id')
@ -718,7 +721,7 @@ def get_version():
def error_out(msg):
log("FATAL ERROR: %s" % msg,
log("FATAL ERROR: {}".format(msg),
level=ERROR)
sys.exit(1)
@ -736,7 +739,9 @@ def is_quorum():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(str(subprocess
.check_output(cmd)
.decode('UTF-8')))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -763,7 +768,9 @@ def is_leader():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(str(subprocess
.check_output(cmd)
.decode('UTF-8')))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -955,8 +962,9 @@ def is_osd_disk(dev):
partitions = get_partition_list(dev)
for partition in partitions:
try:
info = subprocess.check_output(['sgdisk', '-i', partition.number,
dev])
info = str(subprocess
.check_output(['sgdisk', '-i', partition.number, dev])
.decode('UTF-8'))
info = info.split("\n") # IGNORE:E1103
for line in info:
for ptype in CEPH_PARTITIONS:
@ -1039,7 +1047,7 @@ def generate_monitor_secret():
'--name=mon.',
'--gen-key'
]
res = subprocess.check_output(cmd)
res = str(subprocess.check_output(cmd).decode('UTF-8'))
return "{}==".format(res.split('=')[1].strip())
@ -1188,7 +1196,10 @@ def create_named_keyring(entity, name, caps=None):
for subsystem, subcaps in caps.items():
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
return (parse_key(str(subprocess
.check_output(cmd)
.decode('UTF-8'))
.strip())) # IGNORE:E1103
def get_upgrade_key():
@ -1205,7 +1216,7 @@ def get_named_key(name, caps=None, pool_list=None):
"""
try:
# Does the key already exist?
output = subprocess.check_output(
output = str(subprocess.check_output(
[
'sudo',
'-u', ceph_user(),
@ -1218,7 +1229,7 @@ def get_named_key(name, caps=None, pool_list=None):
'auth',
'get',
'client.{}'.format(name),
]).strip()
]).decode('UTF-8')).strip()
return parse_key(output)
except subprocess.CalledProcessError:
# Couldn't get the key, time to create it!
@ -1247,7 +1258,10 @@ def get_named_key(name, caps=None, pool_list=None):
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
return parse_key(str(subprocess
.check_output(cmd)
.decode('UTF-8'))
.strip()) # IGNORE:E1103
def upgrade_key_caps(key, caps):
@ -1361,7 +1375,7 @@ def maybe_zap_journal(journal_dev):
def get_partitions(dev):
cmd = ['partx', '--raw', '--noheadings', dev]
try:
out = subprocess.check_output(cmd).splitlines()
out = str(subprocess.check_output(cmd).decode('UTF-8')).splitlines()
log("get partitions: {}".format(out), level=DEBUG)
return out
except subprocess.CalledProcessError as e:
@ -1529,7 +1543,7 @@ def get_running_osds():
"""Returns a list of the pids of the current running OSD daemons"""
cmd = ['pgrep', 'ceph-osd']
try:
result = subprocess.check_output(cmd)
result = str(subprocess.check_output(cmd).decode('UTF-8'))
return result.split()
except subprocess.CalledProcessError:
return []
@ -1545,7 +1559,9 @@ def get_cephfs(service):
# This command wasn't introduced until 0.86 ceph
return []
try:
output = subprocess.check_output(["ceph", '--id', service, "fs", "ls"])
output = str(subprocess
.check_output(["ceph", '--id', service, "fs", "ls"])
.decode('UTF-8'))
if not output:
return []
"""
@ -2079,7 +2095,9 @@ def list_pools(service):
"""
try:
pool_list = []
pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
pools = str(subprocess
.check_output(['rados', '--id', service, 'lspools'])
.decode('UTF-8'))
for pool in pools.splitlines():
pool_list.append(pool)
return pool_list
@ -2140,10 +2158,8 @@ UCA_CODENAME_MAP = {
def pretty_print_upgrade_paths():
"""Pretty print supported upgrade paths for ceph"""
lines = []
for key, value in UPGRADE_PATHS.iteritems():
lines.append("{} -> {}".format(key, value))
return lines
return ["{} -> {}".format(key, value)
for key, value in UPGRADE_PATHS.iteritems()]
def resolve_ceph_version(source):
@ -2163,7 +2179,9 @@ def get_ceph_pg_stat():
:returns: dict
"""
try:
tree = subprocess.check_output(['ceph', 'pg', 'stat', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'pg', 'stat', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
if not json_tree['num_pg_by_state']:
@ -2187,8 +2205,9 @@ def get_ceph_health():
status, use get_ceph_health()['overall_status'].
"""
try:
tree = subprocess.check_output(
['ceph', 'status', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'status', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
@ -2215,9 +2234,12 @@ def reweight_osd(osd_num, new_weight):
:raises CalledProcessError: if an error occurs invoking the systemd cmd
"""
try:
cmd_result = subprocess.check_output(
['ceph', 'osd', 'crush', 'reweight', "osd.{}".format(osd_num),
new_weight], stderr=subprocess.STDOUT)
cmd_result = str(subprocess
.check_output(['ceph', 'osd', 'crush',
'reweight', "osd.{}".format(osd_num),
new_weight],
stderr=subprocess.STDOUT)
.decode('UTF-8'))
expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format(
ID=osd_num) + " to {}".format(new_weight)
log(cmd_result)

View File

@ -49,7 +49,7 @@ if sys.argv[-1] == 'publish':
if sys.argv[-1] == 'tag':
os.system("git tag -a %s -m 'version %s'" % (version, version))
os.system("git tag -a {0} -m 'version {0}'".format(version))
os.system("git push --tags")
sys.exit()

View File

@ -1,5 +1,5 @@
[tox]
envlist = pep8,py27,py34,py35
envlist = pep8,py27,py34,py35,py36
skipsdist = True
skip_missing_interpreters = True
@ -32,6 +32,11 @@ basepython = python3.5
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:py36]
basepython = python3.6
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:pep8]
basepython = python2.7
deps = -r{toxinidir}/requirements.txt

View File

@ -84,7 +84,7 @@ class CephTestCase(unittest.TestCase):
@patch.object(utils.subprocess, 'check_output')
def test_get_osd_weight(self, output):
"""It gives an OSD's weight"""
output.return_value = """{
output.return_value = b"""{
"nodes": [{
"id": -1,
"name": "default",
@ -152,7 +152,7 @@ class CephTestCase(unittest.TestCase):
@patch.object(utils, "ceph_user", lambda: "ceph")
@patch.object(utils.socket, "gethostname", lambda: "osd001")
def test_get_named_key_with_pool(self, mock_check_output):
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), ""]
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), b""]
utils.get_named_key(name="rgw001", pool_list=["rbd", "block"])
mock_check_output.assert_has_calls([
call(['sudo', '-u', 'ceph', 'ceph', '--name',
@ -170,7 +170,7 @@ class CephTestCase(unittest.TestCase):
@patch.object(utils, 'ceph_user', lambda: "ceph")
@patch.object(utils.socket, "gethostname", lambda: "osd001")
def test_get_named_key(self, mock_check_output):
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), ""]
mock_check_output.side_effect = [CalledProcessError(0, 0, 0), b""]
utils.get_named_key(name="rgw001")
mock_check_output.assert_has_calls([
call(['sudo', '-u', 'ceph', 'ceph', '--name',
@ -225,14 +225,14 @@ class CephTestCase(unittest.TestCase):
@patch.object(utils.subprocess, 'check_output')
def test_get_partition_list(self, output):
with open('unit_tests/partx_output', 'r') as partx_out:
output.return_value = partx_out.read()
output.return_value = partx_out.read().encode('UTF-8')
partition_list = utils.get_partition_list('/dev/xvdb')
self.assertEqual(len(partition_list), 2)
@patch.object(utils.subprocess, 'check_output')
def test_get_ceph_pg_stat(self, output):
"""It returns the current PG stat"""
output.return_value = """{
output.return_value = b"""{
"num_pg_by_state": [
{
"name": "active+clean",
@ -252,7 +252,7 @@ class CephTestCase(unittest.TestCase):
@patch.object(utils.subprocess, 'check_output')
def test_get_ceph_health(self, output):
"""It gives the current Ceph health"""
output.return_value = """{
output.return_value = b"""{
"health": {
"health_services": [
{
@ -344,7 +344,7 @@ class CephTestCase(unittest.TestCase):
@patch.object(utils.subprocess, 'check_output')
def test_reweight_osd(self, mock_reweight):
"""It changes the weight of an OSD"""
mock_reweight.return_value = "reweighted item id 0 name 'osd.0' to 1"
mock_reweight.return_value = b"reweighted item id 0 name 'osd.0' to 1"
reweight_result = utils.reweight_osd('0', '1')
self.assertEqual(reweight_result, True)
mock_reweight.assert_called_once_with(