Fix parsing information from ceph tree output

Add helper functions to parse information from the ceph tree and
fix creating CrushLocation object in get_osd_tree function.

Closes-Bug: #1918721
Change-Id: I59c742e594042ad527e71c88404999459f0373c2
This commit is contained in:
Robert Gildein 2021-03-12 16:20:43 +01:00
parent e8240a764e
commit 5a6fbb88fd
2 changed files with 118 additions and 47 deletions

View File

@ -14,6 +14,7 @@
import collections
import glob
import itertools
import json
import os
import pyudev
@ -502,30 +503,33 @@ def ceph_user():
class CrushLocation(object):
def __init__(self,
name,
identifier,
host,
rack,
row,
datacenter,
chassis,
root):
self.name = name
def __init__(self, identifier, name, osd="", host="", chassis="",
rack="", row="", pdu="", pod="", room="",
datacenter="", zone="", region="", root=""):
self.identifier = identifier
self.name = name
self.osd = osd
self.host = host
self.chassis = chassis
self.rack = rack
self.row = row
self.pdu = pdu
self.pod = pod
self.room = room
self.datacenter = datacenter
self.chassis = chassis
self.zone = zone
self.region = region
self.root = root
def __str__(self):
return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \
"chassis :{} root: {}".format(self.name, self.identifier,
self.host, self.rack, self.row,
self.datacenter, self.chassis,
self.root)
return "name: {} id: {} osd: {} host: {} chassis: {} rack: {} " \
"row: {} pdu: {} pod: {} room: {} datacenter: {} zone: {} " \
"region: {} root: {}".format(self.name, self.identifier,
self.osd, self.host, self.chassis,
self.rack, self.row, self.pdu,
self.pod, self.room,
self.datacenter, self.zone,
self.region, self.root)
def __eq__(self, other):
return not self.name < other.name and not other.name < self.name
@ -572,10 +576,53 @@ def get_osd_weight(osd_id):
raise
def _filter_nodes_and_set_attributes(node, node_lookup_map, lookup_type):
"""Get all nodes of the desired type, with all their attributes.
These attributes can be direct or inherited from ancestors.
"""
attribute_dict = {node['type']: node['name']}
if node['type'] == lookup_type:
attribute_dict['name'] = node['name']
attribute_dict['identifier'] = node['id']
return [attribute_dict]
elif not node.get('children'):
return [attribute_dict]
else:
descendant_attribute_dicts = [
_filter_nodes_and_set_attributes(node_lookup_map[node_id],
node_lookup_map, lookup_type)
for node_id in node.get('children', [])
]
return [dict(attribute_dict, **descendant_attribute_dict)
for descendant_attribute_dict
in itertools.chain.from_iterable(descendant_attribute_dicts)]
def _flatten_roots(nodes, lookup_type='host'):
"""Get a flattened list of nodes of the desired type.
:param nodes: list of nodes defined as a dictionary of attributes and
children
:type nodes: List[Dict[int, Any]]
:param lookup_type: type of searched node
:type lookup_type: str
:returns: flattened list of nodes
:rtype: List[Dict[str, Any]]
"""
lookup_map = {node['id']: node for node in nodes}
root_attributes_dicts = [_filter_nodes_and_set_attributes(node, lookup_map,
lookup_type)
for node in nodes if node['type'] == 'root']
# get a flattened list of roots.
return list(itertools.chain.from_iterable(root_attributes_dicts))
def get_osd_tree(service):
"""Returns the current osd map in JSON.
:returns: List.
:rtype: List[CrushLocation]
:raises: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
@ -586,35 +633,14 @@ def get_osd_tree(service):
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
crush_list = []
# Make sure children are present in the json
if not json_tree['nodes']:
return None
host_nodes = [
node for node in json_tree['nodes']
if node['type'] == 'host'
]
for host in host_nodes:
crush_list.append(
CrushLocation(
name=host.get('name'),
identifier=host['id'],
host=host.get('host'),
rack=host.get('rack'),
row=host.get('row'),
datacenter=host.get('datacenter'),
chassis=host.get('chassis'),
root=host.get('root')
)
)
return crush_list
roots = _flatten_roots(json_tree["nodes"])
return [CrushLocation(**host) for host in roots]
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v))
raise
except subprocess.CalledProcessError as e:
log("ceph osd tree command failed with message: {}".format(
e))
log("ceph osd tree command failed with message: {}".format(e))
raise

View File

@ -315,6 +315,38 @@ class CephTestCase(unittest.TestCase):
weight = utils.get_osd_weight('osd.0')
self.assertEqual(weight, 0.002899)
def test_flatten_roots(self):
nodes = [
{"id": -6, "name": "default", "type": "root", "children": [-5]},
{"id": -5, "name": "custom", "type": "row", "children": [-3, -4]},
{"id": -4, "name": "az.0", "type": "rack", "children": [-2]},
{"id": -2, "name": "test-host.0", "type": "host", "children": [0]},
{"id": 0, "name": "osd.0", "type": "osd"},
{"id": -3, "name": "az.1", "type": "rack", "children": [-1]},
{"id": -1, "name": "test-host.1", "type": "host", "children": [1]},
{"id": 1, "name": "osd.1", "type": "osd"},
]
host_nodes = utils._flatten_roots(nodes)
self.assertEqual(len(host_nodes), 2)
self.assertEqual(host_nodes[0]["identifier"], -1)
self.assertEqual(host_nodes[0]["rack"], "az.1")
self.assertEqual(host_nodes[0]["row"], "custom")
self.assertEqual(host_nodes[0]["root"], "default")
self.assertEqual(host_nodes[1]["identifier"], -2)
self.assertEqual(host_nodes[1]["rack"], "az.0")
self.assertEqual(host_nodes[1]["row"], "custom")
self.assertEqual(host_nodes[1]["root"], "default")
rack_nodes = utils._flatten_roots(nodes, "rack")
self.assertEqual(len(rack_nodes), 2)
self.assertEqual(rack_nodes[0]["identifier"], -3)
self.assertEqual(rack_nodes[0]["row"], "custom")
self.assertEqual(rack_nodes[0]["root"], "default")
self.assertEqual(rack_nodes[1]["identifier"], -4)
self.assertEqual(rack_nodes[1]["row"], "custom")
self.assertEqual(rack_nodes[1]["root"], "default")
@patch.object(utils.subprocess, 'check_output')
def test_get_osd_tree_multi_root(self, mock_check_output):
mock_check_output.return_value = b"""{
@ -394,8 +426,13 @@ class CephTestCase(unittest.TestCase):
],"stray":[]}
"""
osd_tree = utils.get_osd_tree('test')
self.assertEqual(osd_tree[0].name, "OS-CS-10")
self.assertEqual(osd_tree[-1].name, "OS-CS-06")
self.assertEqual(len(osd_tree), 10)
self.assertEqual(osd_tree[0].identifier, -11)
self.assertEqual(osd_tree[0].name, "OS-CS-08")
self.assertEqual(osd_tree[-1].identifier, -2)
self.assertEqual(osd_tree[-1].name, "OS-CS-05")
self.assertEqual(osd_tree[0].root, "ssds")
self.assertEqual(osd_tree[-1].root, "default")
@patch.object(utils.subprocess, 'check_output')
def test_get_osd_tree_multi_root_with_hierarchy(self, mock_check_output):
@ -434,9 +471,15 @@ class CephTestCase(unittest.TestCase):
{"id":23,"name":"osd.23","type":"osd","type_id":0,"crush_weight":1.429993,"depth":3,"pool_weights":{},"exists":1,"status":"up","reweight":1.000000,"primary_affinity":1.000000}],
"stray":[]}'''
osd_tree = utils.get_osd_tree('test')
self.assertTrue(len(osd_tree) == 4)
self.assertEqual(osd_tree[0].name, "uat-l-stor-11")
self.assertEqual(osd_tree[-1].name, "uat-l-stor-14")
self.assertEqual(len(osd_tree), 4)
self.assertEqual(osd_tree[0].identifier, -7)
self.assertEqual(osd_tree[0].name, "uat-l-stor-14")
self.assertEqual(osd_tree[-1].identifier, -4)
self.assertEqual(osd_tree[-1].name, "uat-l-stor-11")
self.assertEqual(osd_tree[0].root, "default")
self.assertEqual(osd_tree[-1].root, "default")
self.assertEqual(osd_tree[0].rack, "ssd")
self.assertEqual(osd_tree[-1].rack, "sata")
@patch.object(utils.subprocess, 'check_output')
def test_get_osd_tree_single_root(self, mock_check_output):
@ -451,8 +494,10 @@ class CephTestCase(unittest.TestCase):
{"id":2,"name":"osd.2","type":"osd","type_id":0,"crush_weight":0.000092,"depth":2,"exists":1,"status":"up","reweight":1.000000,"primary_affinity":1.000000}],
"stray":[]}"""
osd_tree = utils.get_osd_tree('test')
self.assertEqual(osd_tree[0].name, "juju-9d5cf0-icey-4")
self.assertEqual(osd_tree[-1].name, "juju-9d5cf0-icey-5")
self.assertEqual(osd_tree[0].name, "juju-9d5cf0-icey-5")
self.assertEqual(osd_tree[-1].name, "juju-9d5cf0-icey-4")
self.assertEqual(osd_tree[0].root, "default")
self.assertEqual(osd_tree[-1].root, "default")
@patch.object(utils.subprocess, 'check_output')
@patch.object(utils, "ceph_user", lambda: "ceph")