charms.ceph sync for py3 bug fix

Change-Id: I995b98ece695a3de7c7010c5850b5dbe5f7a6b6e
Partial-Bug: #1735720
This commit is contained in:
Liam Young 2017-12-01 13:39:01 +00:00
parent 9d095b31d9
commit a0060f4163
3 changed files with 147 additions and 92 deletions

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import json
import os
@ -134,7 +135,7 @@ def process_requests(reqs):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
msg = ("Missing or invalid api version (%s)" % version)
msg = ("Missing or invalid api version ({})".format(version))
resp = {'exit-code': 1, 'stderr': msg}
if request_id:
resp['request-id'] = request_id
@ -231,7 +232,7 @@ def add_pool_to_group(pool, group, namespace=None):
def pool_permission_list_for_service(service):
"""Build the permission string for Ceph for a given service"""
permissions = []
permission_types = {}
permission_types = collections.OrderedDict()
for permission, group in service["group_names"].items():
if permission not in permission_types:
permission_types[permission] = []
@ -267,9 +268,7 @@ def get_service_groups(service, namespace=None):
key="cephx.services.{}".format(service))
try:
service = json.loads(service_json)
except TypeError:
service = None
except ValueError:
except (TypeError, ValueError):
service = None
if service:
service['groups'] = _build_service_groups(service, namespace)
@ -296,7 +295,7 @@ def _build_service_groups(service, namespace=None):
}
"""
all_groups = {}
for _, groups in service['group_names'].items():
for groups in service['group_names'].values():
for group in groups:
name = group
if namespace:
@ -316,9 +315,7 @@ def get_group(group_name):
group_json = monitor_key_get(service='admin', key=group_key)
try:
group = json.loads(group_json)
except TypeError:
group = None
except ValueError:
except (TypeError, ValueError):
group = None
if not group:
group = {
@ -391,9 +388,8 @@ def handle_erasure_pool(request, service):
percent_data=weight)
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
erasure_profile),
level=INFO)
log("Creating pool '{}' (erasure_profile={})"
.format(pool.name, erasure_profile), level=INFO)
pool.create()
# Set a quota if requested
@ -446,11 +442,11 @@ def handle_replicated_pool(request, service):
pool = ReplicatedPool(service=service,
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
level=INFO)
pool.create()
else:
log("Pool '%s' already exists - skipping create" % pool.name,
log("Pool '{}' already exists - skipping create".format(pool.name),
level=DEBUG)
# Set a quota if requested
@ -519,7 +515,7 @@ def handle_set_pool_value(request, service):
'key': request.get('key'),
'value': request.get('value')}
if params['key'] not in POOL_KEYS:
msg = "Invalid key '%s'" % params['key']
msg = "Invalid key '{}'".format(params['key'])
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
@ -685,7 +681,7 @@ def handle_rgw_create_user(request, service):
]
)
try:
user_json = json.loads(create_output)
user_json = json.loads(str(create_output.decode('UTF-8')))
return {'exit-code': 0, 'user': user_json}
except ValueError as err:
log(err, level=ERROR)
@ -790,10 +786,10 @@ def process_requests_v1(reqs):
operation failed along with an explanation).
"""
ret = None
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
for req in reqs:
op = req.get('op')
log("Processing op='%s'" % op, level=DEBUG)
log("Processing op='{}'".format(op), level=DEBUG)
# Use admin client since we do not have other client key locations
# setup to use them for these operations.
svc = 'admin'
@ -848,7 +844,7 @@ def process_requests_v1(reqs):
elif op == "add-permissions-to-key":
ret = handle_add_permissions_to_key(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
msg = "Unknown operation '{}'".format(op)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}

View File

@ -60,7 +60,7 @@ class Crushmap(object):
ids = list(map(
lambda x: int(x),
re.findall(CRUSHMAP_ID_RE, self._crushmap)))
ids.sort()
ids = sorted(ids)
if roots != []:
for root in roots:
buckets.append(CRUSHBucket(root[0], root[1], True))
@ -73,8 +73,11 @@ class Crushmap(object):
def load_crushmap(self):
try:
crush = check_output(['ceph', 'osd', 'getcrushmap'])
return check_output(['crushtool', '-d', '-'], stdin=crush.stdout)
crush = str(check_output(['ceph', 'osd', 'getcrushmap'])
.decode('UTF-8'))
return str(check_output(['crushtool', '-d', '-'],
stdin=crush.stdout)
.decode('UTF-8'))
except CalledProcessError as e:
log("Error occured while loading and decompiling CRUSH map:"
"{}".format(e), ERROR)
@ -99,10 +102,12 @@ class Crushmap(object):
"""Persist Crushmap to Ceph"""
try:
crushmap = self.build_crushmap()
compiled = check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], stdin=crushmap)
ceph_output = check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], stdin=compiled)
compiled = str(check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], stdin=crushmap)
.decode('UTF-8'))
ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], stdin=compiled)
.decode('UTF-8'))
return ceph_output
except CalledProcessError as e:
log("save error: {}".format(e))

View File

@ -195,7 +195,7 @@ def save_sysctls(sysctl_dict, save_location):
except IOError as e:
log("Unable to persist sysctl settings to {}. Error {}".format(
save_location, e.message), level=ERROR)
save_location, e), level=ERROR)
raise
@ -221,7 +221,7 @@ def tune_nic(network_interface):
save_location=sysctl_file)
except IOError as e:
log("Write to /etc/sysctl.d/51-ceph-osd-charm-{} "
"failed. {}".format(network_interface, e.message),
"failed. {}".format(network_interface, e),
level=ERROR)
try:
@ -266,7 +266,7 @@ def get_link_speed(network_interface):
except IOError as e:
log("Unable to open {path} because of error: {error}".format(
path=speed_path,
error=e.message), level='error')
error=e), level='error')
return LinkSpeed["UNKNOWN"]
@ -286,13 +286,13 @@ def persist_settings(settings_dict):
context=settings_dict)
except IOError as err:
log("Unable to open {path} because of error: {error}".format(
path=HDPARM_FILE, error=err.message), level=ERROR)
path=HDPARM_FILE, error=err), level=ERROR)
except Exception as e:
# The templating.render can raise a jinja2 exception if the
# template is not found. Rather than polluting the import
# space of this charm, simply catch Exception
log('Unable to render {path} due to error: {error}'.format(
path=HDPARM_FILE, error=e.message), level=ERROR)
path=HDPARM_FILE, error=e), level=ERROR)
def set_max_sectors_kb(dev_name, max_sectors_size):
@ -308,7 +308,7 @@ def set_max_sectors_kb(dev_name, max_sectors_size):
f.write(max_sectors_size)
except IOError as e:
log('Failed to write max_sectors_kb to {}. Error: {}'.format(
max_sectors_kb_path, e.message), level=ERROR)
max_sectors_kb_path, e), level=ERROR)
def get_max_sectors_kb(dev_name):
@ -328,7 +328,7 @@ def get_max_sectors_kb(dev_name):
return int(max_sectors_kb)
except IOError as e:
log('Failed to read max_sectors_kb to {}. Error: {}'.format(
max_sectors_kb_path, e.message), level=ERROR)
max_sectors_kb_path, e), level=ERROR)
# Bail.
return 0
return 0
@ -350,7 +350,7 @@ def get_max_hw_sectors_kb(dev_name):
return int(max_hw_sectors_kb)
except IOError as e:
log('Failed to read max_hw_sectors_kb to {}. Error: {}'.format(
max_hw_sectors_kb_path, e.message), level=ERROR)
max_hw_sectors_kb_path, e), level=ERROR)
return 0
return 0
@ -381,8 +381,9 @@ def get_block_uuid(block_dev):
:returns: The UUID of the device or None on Error.
"""
try:
block_info = subprocess.check_output(
['blkid', '-o', 'export', block_dev])
block_info = str(subprocess
.check_output(['blkid', '-o', 'export', block_dev])
.decode('UTF-8'))
for tag in block_info.split('\n'):
parts = tag.split('=')
if parts[0] == 'UUID':
@ -533,8 +534,9 @@ def get_osd_weight(osd_id):
:raises: CalledProcessError if our ceph command fails.
"""
try:
tree = subprocess.check_output(
['ceph', 'osd', 'tree', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'osd', 'tree', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
@ -545,11 +547,11 @@ def get_osd_weight(osd_id):
return device['crush_weight']
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v.message))
tree, v))
raise
except subprocess.CalledProcessError as e:
log("ceph osd tree command failed with message: {}".format(
e.message))
e))
raise
@ -561,9 +563,10 @@ def get_osd_tree(service):
Also raises CalledProcessError if our ceph command fails
"""
try:
tree = subprocess.check_output(
['ceph', '--id', service,
'osd', 'tree', '--format=json'])
tree = str(subprocess
.check_output(['ceph', '--id', service,
'osd', 'tree', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
crush_list = []
@ -588,11 +591,11 @@ def get_osd_tree(service):
return crush_list
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v.message))
tree, v))
raise
except subprocess.CalledProcessError as e:
log("ceph osd tree command failed with message: {}".format(
e.message))
e))
raise
@ -628,7 +631,7 @@ def _get_osd_num_from_dirname(dirname):
"""
match = re.search('ceph-(?P<osd_id>\d+)', dirname)
if not match:
raise ValueError("dirname not in correct format: %s" % dirname)
raise ValueError("dirname not in correct format: {}".format(dirname))
return match.group('osd_id')
@ -718,7 +721,7 @@ def get_version():
def error_out(msg):
log("FATAL ERROR: %s" % msg,
log("FATAL ERROR: {}".format(msg),
level=ERROR)
sys.exit(1)
@ -736,7 +739,9 @@ def is_quorum():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(str(subprocess
.check_output(cmd)
.decode('UTF-8')))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -763,7 +768,9 @@ def is_leader():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(str(subprocess
.check_output(cmd)
.decode('UTF-8')))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -937,15 +944,27 @@ def get_partition_list(dev):
# For each line of output
for partition in partitions:
parts = partition.split()
partitions_list.append(
Partition(number=parts[0],
start=parts[1],
end=parts[2],
sectors=parts[3],
size=parts[4],
name=parts[5],
uuid=parts[6])
)
try:
partitions_list.append(
Partition(number=parts[0],
start=parts[1],
end=parts[2],
sectors=parts[3],
size=parts[4],
name=parts[5],
uuid=parts[6])
)
except IndexError:
partitions_list.append(
Partition(number=parts[0],
start=parts[1],
end=parts[2],
sectors=parts[3],
size=parts[4],
name="",
uuid=parts[5])
)
return partitions_list
except subprocess.CalledProcessError:
raise
@ -955,8 +974,9 @@ def is_osd_disk(dev):
partitions = get_partition_list(dev)
for partition in partitions:
try:
info = subprocess.check_output(['sgdisk', '-i', partition.number,
dev])
info = str(subprocess
.check_output(['sgdisk', '-i', partition.number, dev])
.decode('UTF-8'))
info = info.split("\n") # IGNORE:E1103
for line in info:
for ptype in CEPH_PARTITIONS:
@ -965,7 +985,7 @@ def is_osd_disk(dev):
return True
except subprocess.CalledProcessError as e:
log("sgdisk inspection of partition {} on {} failed with "
"error: {}. Skipping".format(partition.minor, dev, e.message),
"error: {}. Skipping".format(partition.minor, dev, e),
level=ERROR)
return False
@ -1039,7 +1059,7 @@ def generate_monitor_secret():
'--name=mon.',
'--gen-key'
]
res = subprocess.check_output(cmd)
res = str(subprocess.check_output(cmd).decode('UTF-8'))
return "{}==".format(res.split('=')[1].strip())
@ -1188,7 +1208,10 @@ def create_named_keyring(entity, name, caps=None):
for subsystem, subcaps in caps.items():
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
return (parse_key(str(subprocess
.check_output(cmd)
.decode('UTF-8'))
.strip())) # IGNORE:E1103
def get_upgrade_key():
@ -1205,7 +1228,7 @@ def get_named_key(name, caps=None, pool_list=None):
"""
try:
# Does the key already exist?
output = subprocess.check_output(
output = str(subprocess.check_output(
[
'sudo',
'-u', ceph_user(),
@ -1218,7 +1241,7 @@ def get_named_key(name, caps=None, pool_list=None):
'auth',
'get',
'client.{}'.format(name),
]).strip()
]).decode('UTF-8')).strip()
return parse_key(output)
except subprocess.CalledProcessError:
# Couldn't get the key, time to create it!
@ -1247,7 +1270,10 @@ def get_named_key(name, caps=None, pool_list=None):
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
return parse_key(str(subprocess
.check_output(cmd)
.decode('UTF-8'))
.strip()) # IGNORE:E1103
def upgrade_key_caps(key, caps):
@ -1361,7 +1387,7 @@ def maybe_zap_journal(journal_dev):
def get_partitions(dev):
cmd = ['partx', '--raw', '--noheadings', dev]
try:
out = subprocess.check_output(cmd).splitlines()
out = str(subprocess.check_output(cmd).decode('UTF-8')).splitlines()
log("get partitions: {}".format(out), level=DEBUG)
return out
except subprocess.CalledProcessError as e:
@ -1529,7 +1555,7 @@ def get_running_osds():
"""Returns a list of the pids of the current running OSD daemons"""
cmd = ['pgrep', 'ceph-osd']
try:
result = subprocess.check_output(cmd)
result = str(subprocess.check_output(cmd).decode('UTF-8'))
return result.split()
except subprocess.CalledProcessError:
return []
@ -1545,7 +1571,9 @@ def get_cephfs(service):
# This command wasn't introduced until 0.86 ceph
return []
try:
output = subprocess.check_output(["ceph", '--id', service, "fs", "ls"])
output = str(subprocess
.check_output(["ceph", '--id', service, "fs", "ls"])
.decode('UTF-8'))
if not output:
return []
"""
@ -1666,7 +1694,7 @@ def upgrade_monitor(new_version):
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
err.message))
err))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
@ -1695,7 +1723,7 @@ def upgrade_monitor(new_version):
service_start('ceph-mon-all')
except subprocess.CalledProcessError as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
"with message: {}".format(err))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
@ -1879,7 +1907,7 @@ def upgrade_osd(new_version):
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph sources failed with message: {}".format(
err.message))
err))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
@ -1924,7 +1952,7 @@ def upgrade_osd(new_version):
except (subprocess.CalledProcessError, IOError) as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
"with message: {}".format(err))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
@ -2079,7 +2107,9 @@ def list_pools(service):
"""
try:
pool_list = []
pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
pools = str(subprocess
.check_output(['rados', '--id', service, 'lspools'])
.decode('UTF-8'))
for pool in pools.splitlines():
pool_list.append(pool)
return pool_list
@ -2140,10 +2170,8 @@ UCA_CODENAME_MAP = {
def pretty_print_upgrade_paths():
"""Pretty print supported upgrade paths for ceph"""
lines = []
for key, value in UPGRADE_PATHS.iteritems():
lines.append("{} -> {}".format(key, value))
return lines
return ["{} -> {}".format(key, value)
for key, value in UPGRADE_PATHS.items()]
def resolve_ceph_version(source):
@ -2163,7 +2191,9 @@ def get_ceph_pg_stat():
:returns: dict
"""
try:
tree = subprocess.check_output(['ceph', 'pg', 'stat', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'pg', 'stat', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
if not json_tree['num_pg_by_state']:
@ -2171,11 +2201,10 @@ def get_ceph_pg_stat():
return json_tree
except ValueError as v:
log("Unable to parse ceph pg stat json: {}. Error: {}".format(
tree, v.message))
tree, v))
raise
except subprocess.CalledProcessError as e:
log("ceph pg stat command failed with message: {}".format(
e.message))
log("ceph pg stat command failed with message: {}".format(e))
raise
@ -2187,8 +2216,9 @@ def get_ceph_health():
status, use get_ceph_health()['overall_status'].
"""
try:
tree = subprocess.check_output(
['ceph', 'status', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'status', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
@ -2198,11 +2228,10 @@ def get_ceph_health():
return json_tree
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v.message))
tree, v))
raise
except subprocess.CalledProcessError as e:
log("ceph status command failed with message: {}".format(
e.message))
log("ceph status command failed with message: {}".format(e))
raise
@ -2215,9 +2244,12 @@ def reweight_osd(osd_num, new_weight):
:raises CalledProcessError: if an error occurs invoking the systemd cmd
"""
try:
cmd_result = subprocess.check_output(
['ceph', 'osd', 'crush', 'reweight', "osd.{}".format(osd_num),
new_weight], stderr=subprocess.STDOUT)
cmd_result = str(subprocess
.check_output(['ceph', 'osd', 'crush',
'reweight', "osd.{}".format(osd_num),
new_weight],
stderr=subprocess.STDOUT)
.decode('UTF-8'))
expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format(
ID=osd_num) + " to {}".format(new_weight)
log(cmd_result)
@ -2225,8 +2257,8 @@ def reweight_osd(osd_num, new_weight):
return True
return False
except subprocess.CalledProcessError as e:
log("ceph osd crush reweight command failed with message: {}".format(
e.message))
log("ceph osd crush reweight command failed"
" with message: {}".format(e))
raise
@ -2260,3 +2292,25 @@ def bootstrap_manager():
unit = 'ceph-mgr@{}'.format(hostname)
subprocess.check_call(['systemctl', 'enable', unit])
service_restart(unit)
def osd_noout(enable):
"""Sets or unsets 'noout'
:param enable: bool. True to set noout, False to unset.
:returns: bool. True if output looks right.
:raises CalledProcessError: if an error occurs invoking the systemd cmd
"""
operation = {
True: 'set',
False: 'unset',
}
try:
subprocess.check_call(['ceph', '--id', 'admin',
'osd', operation[enable],
'noout'])
log('running ceph osd {} noout'.format(operation[enable]))
return True
except subprocess.CalledProcessError as e:
log(e)
raise