Only consider mounted OSD directories

When gathering the list of local OSD ids, the charm would consider
the entries under '/var/lib/ceph/osd/ceph-XXX', where 'XXX' is the
OSD id. However, if an entry under that directory isn't mounted,
the OSD that the entry represents should be discarded, as it's no
longer active. This patchset therefore filters out such entries by
checking whether they appear among the mount points.

Closes-Bug: #1934938
Change-Id: I69c6356e450cc0c96de4afe571b438d4a2ea5177
This commit is contained in:
Luciano Lo Giudice 2021-08-30 18:13:43 -03:00
parent e5ac333a97
commit 93e9885aa7
3 changed files with 223 additions and 47 deletions

View File

@ -79,9 +79,9 @@ class Crushmap(object):
stdin=crush.stdout)
.decode('UTF-8'))
except CalledProcessError as e:
log("Error occured while loading and decompiling CRUSH map:"
log("Error occurred while loading and decompiling CRUSH map:"
"{}".format(e), ERROR)
raise "Failed to read CRUSH map"
raise
def ensure_bucket_is_present(self, bucket_name):
if bucket_name not in [bucket.name for bucket in self.buckets()]:
@ -111,7 +111,7 @@ class Crushmap(object):
return ceph_output
except CalledProcessError as e:
log("save error: {}".format(e))
raise "Failed to save CRUSH map."
raise
def build_crushmap(self):
"""Modifies the current CRUSH map to include the new buckets"""

View File

@ -14,6 +14,7 @@
import collections
import glob
import itertools
import json
import os
import pyudev
@ -24,6 +25,7 @@ import subprocess
import sys
import time
import uuid
import functools
from contextlib import contextmanager
from datetime import datetime
@ -501,30 +503,33 @@ def ceph_user():
class CrushLocation(object):
def __init__(self,
name,
identifier,
host,
rack,
row,
datacenter,
chassis,
root):
self.name = name
def __init__(self, identifier, name, osd="", host="", chassis="",
rack="", row="", pdu="", pod="", room="",
datacenter="", zone="", region="", root=""):
self.identifier = identifier
self.name = name
self.osd = osd
self.host = host
self.chassis = chassis
self.rack = rack
self.row = row
self.pdu = pdu
self.pod = pod
self.room = room
self.datacenter = datacenter
self.chassis = chassis
self.zone = zone
self.region = region
self.root = root
def __str__(self):
return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \
"chassis :{} root: {}".format(self.name, self.identifier,
self.host, self.rack, self.row,
self.datacenter, self.chassis,
self.root)
return "name: {} id: {} osd: {} host: {} chassis: {} rack: {} " \
"row: {} pdu: {} pod: {} room: {} datacenter: {} zone: {} " \
"region: {} root: {}".format(self.name, self.identifier,
self.osd, self.host, self.chassis,
self.rack, self.row, self.pdu,
self.pod, self.room,
self.datacenter, self.zone,
self.region, self.root)
def __eq__(self, other):
    """Compare two locations by their ``name`` attribute only.

    Equality is derived from ``<``: the two objects are considered equal
    when neither name sorts before the other.

    :param other: object exposing a ``name`` attribute
    :returns: True when neither name is less than the other
    :rtype: bool
    """
    mine, theirs = self.name, other.name
    return not (mine < theirs or theirs < mine)
@ -571,10 +576,53 @@ def get_osd_weight(osd_id):
raise
def _filter_nodes_and_set_attributes(node, node_lookup_map, lookup_type):
    """Collect the attributes of every node of ``lookup_type`` below ``node``.

    Each returned dictionary maps the type of every node on the path (for
    example 'root', 'rack', 'host') to that node's name. A node of the
    requested type additionally contributes 'name' and 'identifier'.
    Descent stops at the first node of the requested type, or at a node
    without children.

    :param node: node to start from; must provide 'type', 'name' and 'id',
                 and may provide 'children' (a list of node ids)
    :type node: Dict[str, Any]
    :param node_lookup_map: mapping of node id to node
    :type node_lookup_map: Dict[int, Dict[str, Any]]
    :param lookup_type: type of the nodes being searched for
    :type lookup_type: str
    :returns: one attribute dictionary per branch explored
    :rtype: List[Dict[str, Any]]
    """
    own_attrs = {node['type']: node['name']}

    # A node of the requested type ends the descent on this branch.
    if node['type'] == lookup_type:
        own_attrs['name'] = node['name']
        own_attrs['identifier'] = node['id']
        return [own_attrs]

    child_ids = node.get('children')
    if not child_ids:
        # Leaf of another type: only its own type/name pair is reported.
        return [own_attrs]

    merged = []
    for child_id in child_ids:
        child_results = _filter_nodes_and_set_attributes(
            node_lookup_map[child_id], node_lookup_map, lookup_type)
        for child_attrs in child_results:
            # Values coming from descendants win over the ancestor's.
            merged.append(dict(own_attrs, **child_attrs))
    return merged
def _flatten_roots(nodes, lookup_type='host'):
    """Flatten every 'root' tree into a list of nodes of the wanted type.

    :param nodes: all nodes of the tree; each is a dictionary providing at
                  least 'id' and 'type'
    :type nodes: List[Dict[str, Any]]
    :param lookup_type: type of the nodes being searched for
    :type lookup_type: str
    :returns: attribute dictionaries collected from all roots, in the
              order the roots appear in ``nodes``
    :rtype: List[Dict[str, Any]]
    """
    nodes_by_id = {entry['id']: entry for entry in nodes}

    flattened = []
    for entry in nodes:
        # Only roots are used as starting points for the traversal.
        if entry['type'] != 'root':
            continue
        flattened.extend(
            _filter_nodes_and_set_attributes(entry, nodes_by_id,
                                             lookup_type))
    return flattened
def get_osd_tree(service):
"""Returns the current osd map in JSON.
:returns: List.
:rtype: List[CrushLocation]
:raises: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
@ -585,35 +633,14 @@ def get_osd_tree(service):
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
crush_list = []
# Make sure children are present in the json
if not json_tree['nodes']:
return None
host_nodes = [
node for node in json_tree['nodes']
if node['type'] == 'host'
]
for host in host_nodes:
crush_list.append(
CrushLocation(
name=host.get('name'),
identifier=host['id'],
host=host.get('host'),
rack=host.get('rack'),
row=host.get('row'),
datacenter=host.get('datacenter'),
chassis=host.get('chassis'),
root=host.get('root')
)
)
return crush_list
roots = _flatten_roots(json_tree["nodes"])
return [CrushLocation(**host) for host in roots]
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v))
raise
except subprocess.CalledProcessError as e:
log("ceph osd tree command failed with message: {}".format(
e))
log("ceph osd tree command failed with message: {}".format(e))
raise
@ -669,7 +696,9 @@ def get_local_osd_ids():
dirs = os.listdir(osd_path)
for osd_dir in dirs:
osd_id = osd_dir.split('-')[1]
if _is_int(osd_id):
if (_is_int(osd_id) and
filesystem_mounted(os.path.join(
os.sep, osd_path, osd_dir))):
osd_ids.append(osd_id)
except OSError:
raise
@ -3271,13 +3300,14 @@ def determine_packages():
def determine_packages_to_remove():
"""Determines packages for removal
Note: if in a container, then the CHRONY_PACKAGE is removed.
:returns: list of packages to be removed
:rtype: List[str]
"""
rm_packages = REMOVE_PACKAGES.copy()
if is_container():
install_list = filter_missing_packages(CHRONY_PACKAGE)
if not install_list:
rm_packages.append(CHRONY_PACKAGE)
rm_packages.extend(filter_missing_packages([CHRONY_PACKAGE]))
return rm_packages
@ -3376,3 +3406,132 @@ def apply_osd_settings(settings):
level=ERROR)
raise OSDConfigSetError
return True
def enabled_manager_modules():
    """List the manager modules reported as enabled by ``ceph mgr module ls``.

    A failure of the ceph command is logged as a warning and reported as
    an empty list rather than raised.

    :returns: value of the 'enabled_modules' key of the command's JSON
              output, or an empty list if the command fails
    :rtype: List[str]
    """
    try:
        raw_listing = subprocess.check_output(['ceph', 'mgr', 'module', 'ls'])
        listing = raw_listing.decode('UTF-8')
    except subprocess.CalledProcessError as e:
        log("Failed to list ceph modules: {}".format(e), WARNING)
        return []
    return json.loads(listing)['enabled_modules']
def is_mgr_module_enabled(module):
    """Tell whether the named manager module is currently enabled.

    :param module: name of the manager module to look for
    :type module: str
    :returns: True if the module is in the list of enabled modules
    :rtype: bool
    """
    enabled = enabled_manager_modules()
    return module in enabled
# is_mgr_module_enabled() with the module name pre-bound to 'dashboard';
# call with no arguments.
is_dashboard_enabled = functools.partial(is_mgr_module_enabled, 'dashboard')
def mgr_enable_module(module):
    """Enable a Ceph Manager module unless it is already enabled.

    :param module: name of the module to enable
    :type module: str
    :returns: True if the enable command was run, False if the module was
              already enabled
    :rtype: bool
    :raises: subprocess.CalledProcessError
    """
    if is_mgr_module_enabled(module):
        # Nothing to do; report that no change was made.
        return False
    subprocess.check_call(['ceph', 'mgr', 'module', 'enable', module])
    return True
# mgr_enable_module() with the module name pre-bound to 'dashboard';
# call with no arguments.
mgr_enable_dashboard = functools.partial(mgr_enable_module, 'dashboard')
def mgr_disable_module(module):
    """Disable a Ceph Manager module if it is currently enabled.

    :param module: name of the module to disable
    :type module: str
    :returns: True if the disable command was run, False if the module was
              not enabled in the first place
    :rtype: bool
    :raises: subprocess.CalledProcessError
    """
    if not is_mgr_module_enabled(module):
        # Already disabled; report that no change was made.
        return False
    subprocess.check_call(['ceph', 'mgr', 'module', 'disable', module])
    return True
# mgr_disable_module() with the module name pre-bound to 'dashboard';
# call with no arguments.
mgr_disable_dashboard = functools.partial(mgr_disable_module, 'dashboard')
def ceph_config_set(name, value, who):
    """Set a ceph config option by running ``ceph config set``.

    :param name: key to set
    :type name: str
    :param value: value to store under the key
    :type value: str
    :param who: config area the key belongs to (e.g. 'mgr')
    :type who: str
    :raises: subprocess.CalledProcessError
    """
    command = ['ceph', 'config', 'set', who, name, value]
    subprocess.check_call(command)
# ceph_config_set() with ``who`` preset to 'mgr'; call as
# mgr_config_set(name, value).
mgr_config_set = functools.partial(ceph_config_set, who='mgr')
def ceph_config_get(name, who):
    """Read a ceph config option by running ``ceph config get``.

    :param name: key to look up
    :type name: str
    :param who: config area the key belongs to (e.g. 'mgr')
    :type who: str
    :returns: output of the command, decoded as UTF-8
    :rtype: str
    :raises: subprocess.CalledProcessError
    """
    raw_value = subprocess.check_output(['ceph', 'config', 'get', who, name])
    return raw_value.decode('UTF-8')
# ceph_config_get() with ``who`` preset to 'mgr'; call as
# mgr_config_get(name).
mgr_config_get = functools.partial(ceph_config_get, who='mgr')
def _dashboard_set_ssl_artifact(path, artifact_name, hostname=None):
    """Run a ``ceph dashboard`` sub-command that loads an SSL artifact.

    The command line is ``ceph dashboard <artifact_name> [hostname] -i
    <path>`` and is logged at DEBUG level before being executed.

    :param path: path to the file passed to the command with ``-i``
    :type path: str
    :param artifact_name: dashboard sub-command used to set the artifact
    :type artifact_name: str
    :param hostname: if set, it is added to the command so the artifact is
                     only associated with the dashboard on that host
    :type hostname: str
    :raises: subprocess.CalledProcessError
    """
    # The hostname argument is only present when one was supplied.
    host_args = [hostname] if hostname else []
    command = ['ceph', 'dashboard', artifact_name] + host_args + ['-i', path]
    log(command, level=DEBUG)
    subprocess.check_call(command)
# _dashboard_set_ssl_artifact() preset to the 'set-ssl-certificate'
# sub-command; call as dashboard_set_ssl_certificate(path, hostname=...).
dashboard_set_ssl_certificate = functools.partial(
_dashboard_set_ssl_artifact,
artifact_name='set-ssl-certificate')
# _dashboard_set_ssl_artifact() preset to the 'set-ssl-certificate-key'
# sub-command; call as dashboard_set_ssl_certificate_key(path, hostname=...).
dashboard_set_ssl_certificate_key = functools.partial(
_dashboard_set_ssl_artifact,
artifact_name='set-ssl-certificate-key')

View File

@ -115,6 +115,23 @@ class OSDInTestCase(CharmTestCase):
self.assess_status.assert_not_called()
class OSDMountTestCase(CharmTestCase):
    """Checks that get_local_osd_ids() only reports mounted OSD directories."""

    def setUp(self):
        super(OSDMountTestCase, self).setUp(actions, [])

    @mock.patch('os.path.exists')
    @mock.patch('os.listdir')
    @mock.patch('charms_ceph.utils.filesystem_mounted')
    def test_mounted_osds(self, fs_mounted, listdir, exists):
        mounted_dir = '/var/lib/ceph/osd/ceph-1'
        unmounted_dir = '/var/lib/ceph/osd/ceph-2'

        exists.return_value = True
        listdir.return_value = [mounted_dir, unmounted_dir]
        # Only the first directory is reported as a mount point.
        fs_mounted.side_effect = lambda x: x == mounted_dir

        osds = actions.get_local_osd_ids()

        # The OSD id is the last character of each directory name.
        self.assertIn(mounted_dir[-1], osds)
        self.assertNotIn(unmounted_dir[-1], osds)
class MainTestCase(CharmTestCase):
def setUp(self):
super(MainTestCase, self).setUp(actions, ["function_fail"])