Add `osds` argument to the osd-in/osd-out action

The `osds` parameter has been copied from function start/stop to preserve
the same functionality. By default, osd-in/osd-out needs list of IDs as
an argument or it will not do anything (previously, it applied the change
to all the osds). It's possible to take in/out *all* with provided `osds`
parameter as `all`.

Closes-Bug: #1910150
Change-Id: I0275f015e2d0bbbb661d2b7dea59c320ba6c021c
This commit is contained in:
Robert Gildein 2021-01-08 15:29:39 +01:00
parent e22f602544
commit e928f1376c
7 changed files with 246 additions and 122 deletions

View File

@ -9,11 +9,21 @@ osd-out:
\ \
USE WITH CAUTION - Mark unit OSDs as 'out'. USE WITH CAUTION - Mark unit OSDs as 'out'.
Documentation: https://jaas.ai/ceph-osd/ Documentation: https://jaas.ai/ceph-osd/
params:
osds:
description: A comma-separated list of OSD IDs to start (or keyword 'all')
required:
- osds
osd-in: osd-in:
description: | description: |
\ \
Set the local osd units in the charm to 'in'. Set the local osd units in the charm to 'in'.
Documentation: https://jaas.ai/ceph-osd/ Documentation: https://jaas.ai/ceph-osd/
params:
osds:
description: A comma-separated list of OSD IDs to start (or keyword 'all')
required:
- osds
list-disks: list-disks:
description: | description: |
\ \

View File

@ -18,50 +18,112 @@
import os import os
import sys import sys
from subprocess import check_call
from subprocess import check_output, STDOUT
sys.path.append('lib') sys.path.append('lib')
sys.path.append('hooks') sys.path.append('hooks')
from charmhelpers.core.hookenv import ( from charmhelpers.core.hookenv import (
action_fail, function_fail,
function_set,
log,
ERROR,
) )
from charms_ceph.utils import get_local_osd_ids from charms_ceph.utils import get_local_osd_ids
from ceph_hooks import assess_status from ceph_hooks import assess_status
from utils import parse_osds_arguments, ALL
IN = "in"
OUT = "out"
def osd_out(args): def check_osd_id(osds):
"""Pause the ceph-osd units on the local machine only. """Check ceph OSDs existence.
Optionally uses the 'osd-number' from juju action param to only osd_out a :param osds: list of osds IDs
specific osd. :type osds: set
:returns: list of osds IDs present on the local machine and
@raises CalledProcessError if the ceph commands fails. list of failed osds IDs
@raises OSError if it can't get the local osd ids. :rtype: Tuple[set, set]
:raises OSError: if the unit can't get the local osd ids
""" """
for local_id in get_local_osd_ids(): all_local_osd = get_local_osd_ids()
cmd = [ if ALL in osds:
'ceph', return set(all_local_osd), set()
'--id', 'osd-upgrade',
'osd', 'out', str(local_id)] failed_osds = osds.difference(all_local_osd)
check_call(cmd) if failed_osds:
log("Ceph OSDs not present: {}".format(", ".join(failed_osds)),
level=ERROR)
return osds, failed_osds
def ceph_osd_upgrade(action, osd_id):
"""Execute ceph osd-upgrade command.
:param action: action type IN/OUT
:type action: str
:param osd_id: osd ID
:type osd_id: str
:returns: output message
:rtype: str
:raises subprocess.CalledProcessError: if the ceph commands fails
"""
cmd = ["ceph", "--id", "osd-upgrade", "osd", action, osd_id]
output = check_output(cmd, stderr=STDOUT).decode("utf-8")
log("ceph-osd {osd_id} was updated by the action osd-{action} with "
"output: {output}".format(osd_id=osd_id, action=action, output=output))
return output
def osd_in_out(action):
"""Pause/Resume the ceph OSDs unit ont the local machine only.
:param action: Either IN or OUT (see global constants)
:type action: string
:raises RuntimeError: if a supported action is not used
:raises subprocess.CalledProcessError: if the ceph commands fails
:raises OSError: if the unit can't get the local osd ids
"""
if action not in (IN, OUT):
raise RuntimeError("Unknown action \"{}\"".format(action))
osds = parse_osds_arguments()
osds, failed_osds = check_osd_id(osds)
if failed_osds:
function_fail("invalid ceph OSD device id: "
"{}".format(",".join(failed_osds)))
return
outputs = []
for osd_id in osds:
output = ceph_osd_upgrade(action, str(osd_id))
outputs.append(output)
function_set({
"message": "osd-{action} action was successfully executed for ceph "
"OSD devices [{osds}]".format(action=action,
osds=",".join(osds)),
"outputs": os.linesep.join(outputs)
})
assess_status() assess_status()
def osd_in(args): def osd_in():
"""Resume the ceph-osd units on this local machine only """Shortcut to execute 'osd_in' action"""
osd_in_out(IN)
@raises subprocess.CalledProcessError should the osd units fails to osd_in.
@raises OSError if the unit can't get the local osd ids def osd_out():
""" """Shortcut to execute 'osd_out' action"""
for local_id in get_local_osd_ids(): osd_in_out(OUT)
cmd = [
'ceph',
'--id', 'osd-upgrade',
'osd', 'in', str(local_id)]
check_call(cmd)
assess_status()
# A dictionary of all the defined actions to callables (which take # A dictionary of all the defined actions to callables (which take
@ -75,13 +137,13 @@ def main(args):
action = ACTIONS[action_name] action = ACTIONS[action_name]
except KeyError: except KeyError:
s = "Action {} undefined".format(action_name) s = "Action {} undefined".format(action_name)
action_fail(s) function_fail(s)
return s return s
else: else:
try: try:
action(args) action()
except Exception as e: except Exception as e:
action_fail("Action {} failed: {}".format(action_name, str(e))) function_fail("Action {} failed: {}".format(action_name, str(e)))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -26,17 +26,14 @@ sys.path.append('hooks')
from charmhelpers.core.hookenv import ( from charmhelpers.core.hookenv import (
function_fail, function_fail,
function_get,
log, log,
WARNING,
) )
from ceph_hooks import assess_status from ceph_hooks import assess_status
from utils import parse_osds_arguments, ALL
START = 'start' START = 'start'
STOP = 'stop' STOP = 'stop'
ALL = 'all'
def systemctl_execute(action, services): def systemctl_execute(action, services):
""" """
@ -110,32 +107,6 @@ def check_service_is_present(service_list):
'unit: {}'.format(missing_services)) 'unit: {}'.format(missing_services))
def parse_arguments():
"""
Fetch action arguments and parse them from comma separated list to
the set of OSD IDs
:return: Set of OSD IDs
:rtype: set(str)
"""
raw_arg = function_get('osds')
if raw_arg is None:
raise RuntimeError('Action argument "osds" is missing')
args = set()
# convert OSD IDs from user's input into the set
for osd_id in str(raw_arg).split(','):
args.add(osd_id.strip())
if ALL in args and len(args) != 1:
args = {ALL}
log('keyword "all" was found in "osds" argument. Dropping other '
'explicitly defined OSD IDs', WARNING)
return args
def execute_action(action): def execute_action(action):
"""Core implementation of the 'start'/'stop' actions """Core implementation of the 'start'/'stop' actions
@ -145,7 +116,7 @@ def execute_action(action):
if action not in (START, STOP): if action not in (START, STOP):
raise RuntimeError('Unknown action "{}"'.format(action)) raise RuntimeError('Unknown action "{}"'.format(action))
osds = parse_arguments() osds = parse_osds_arguments()
services = osd_ids_to_service_names(osds) services = osd_ids_to_service_names(osds)
check_service_is_present(services) check_service_is_present(services)

View File

@ -32,6 +32,7 @@ from charmhelpers.core.hookenv import (
status_set, status_set,
storage_get, storage_get,
storage_list, storage_list,
function_get,
) )
from charmhelpers.core import unitdata from charmhelpers.core import unitdata
from charmhelpers.fetch import ( from charmhelpers.fetch import (
@ -49,7 +50,7 @@ from charmhelpers.contrib.network.ip import (
get_ipv6_addr get_ipv6_addr
) )
ALL = "all" # string value representing all "OSD devices"
TEMPLATES_DIR = 'templates' TEMPLATES_DIR = 'templates'
try: try:
@ -310,3 +311,28 @@ def is_sata30orless(device):
if re.match(r"SATA Version is: *SATA (1\.|2\.|3\.0)", str(line)): if re.match(r"SATA Version is: *SATA (1\.|2\.|3\.0)", str(line)):
return True return True
return False return False
def parse_osds_arguments():
"""Parse OSD IDs from action `osds` argument.
Fetch action arguments and parse them from comma separated list to
the set of OSD IDs.
:return: Set of OSD IDs
:rtype: set(str)
"""
raw_arg = function_get("osds")
if raw_arg is None:
raise RuntimeError("Action argument \"osds\" is missing")
# convert OSD IDs from user's input into the set
args = {osd_id.strip() for osd_id in str(raw_arg).split(',')}
if ALL in args and len(args) != 1:
args = {ALL}
log("keyword \"all\" was found in \"osds\" argument. Dropping other "
"explicitly defined OSD IDs", WARNING)
return args

View File

@ -11,8 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock import mock
import subprocess
import sys import sys
@ -23,46 +23,106 @@ sys.path.append('hooks')
import osd_in_out as actions import osd_in_out as actions
def mock_check_output(cmd, **kwargs):
action, osd_id = cmd[-2:] # get the last two arguments from cmd
return "marked {} osd.{}. \n".format(action, osd_id).encode("utf-8")
class OSDOutTestCase(CharmTestCase): class OSDOutTestCase(CharmTestCase):
def setUp(self): def setUp(self):
super(OSDOutTestCase, self).setUp( super(OSDOutTestCase, self).setUp(
actions, ["check_call", actions, ["check_output",
"get_local_osd_ids", "get_local_osd_ids",
"assess_status"]) "assess_status",
"parse_osds_arguments",
"function_fail",
"function_set"])
self.check_output.side_effect = mock_check_output
def test_osd_out(self): def test_osd_out(self):
self.get_local_osd_ids.return_value = [5] self.get_local_osd_ids.return_value = ["5", "6", "7"]
actions.osd_out([]) self.parse_osds_arguments.return_value = {"5"}
cmd = ['ceph', '--id', actions.osd_out()
'osd-upgrade', 'osd', 'out', '5'] self.check_output.assert_called_once_with(
self.check_call.assert_called_once_with(cmd) ["ceph", "--id", "osd-upgrade", "osd", "out", "5"],
stderr=subprocess.STDOUT
)
self.assess_status.assert_called_once_with() self.assess_status.assert_called_once_with()
def test_osd_out_all(self):
self.get_local_osd_ids.return_value = ["5", "6", "7"]
self.parse_osds_arguments.return_value = {"all"}
actions.osd_out()
self.check_output.assert_has_calls(
[mock.call(
["ceph", "--id", "osd-upgrade", "osd", "out", i],
stderr=subprocess.STDOUT
) for i in set(["5", "6", "7"])])
self.assess_status.assert_called_once_with()
def test_osd_out_not_local(self):
self.get_local_osd_ids.return_value = ["5"]
self.parse_osds_arguments.return_value = {"6", "7", "8"}
actions.osd_out()
self.check_output.assert_not_called()
self.function_fail.assert_called_once_with(
"invalid ceph OSD device id: "
"{}".format(",".join(set(["6", "7", "8"]))))
self.assess_status.assert_not_called()
class OSDInTestCase(CharmTestCase): class OSDInTestCase(CharmTestCase):
def setUp(self): def setUp(self):
super(OSDInTestCase, self).setUp( super(OSDInTestCase, self).setUp(
actions, ["check_call", actions, ["check_output",
"get_local_osd_ids", "get_local_osd_ids",
"assess_status"]) "assess_status",
"parse_osds_arguments",
"function_fail",
"function_set"])
self.check_output.side_effect = mock_check_output
def test_osd_in(self): def test_osd_in(self):
self.get_local_osd_ids.return_value = [5] self.get_local_osd_ids.return_value = ["5", "6", "7"]
actions.osd_in([]) self.parse_osds_arguments.return_value = {"5"}
cmd = ['ceph', '--id', actions.osd_in()
'osd-upgrade', 'osd', 'in', '5'] self.check_output.assert_called_once_with(
self.check_call.assert_called_once_with(cmd) ["ceph", "--id", "osd-upgrade", "osd", "in", "5"],
stderr=subprocess.STDOUT
)
self.assess_status.assert_called_once_with() self.assess_status.assert_called_once_with()
def test_osd_in_all(self):
self.get_local_osd_ids.return_value = ["5", "6", "7"]
self.parse_osds_arguments.return_value = {"all"}
actions.osd_in()
self.check_output.assert_has_calls(
[mock.call(
["ceph", "--id", "osd-upgrade", "osd", "in", i],
stderr=subprocess.STDOUT
) for i in set(["5", "6", "7"])])
self.assess_status.assert_called_once_with()
def test_osd_in_not_local(self):
self.get_local_osd_ids.return_value = ["5"]
self.parse_osds_arguments.return_value = {"6"}
actions.osd_in()
self.check_output.assert_not_called()
self.function_fail.assert_called_once_with(
"invalid ceph OSD device id: 6")
self.assess_status.assert_not_called()
class MainTestCase(CharmTestCase): class MainTestCase(CharmTestCase):
def setUp(self): def setUp(self):
super(MainTestCase, self).setUp(actions, ["action_fail"]) super(MainTestCase, self).setUp(actions, ["function_fail"])
def test_invokes_action(self): def test_invokes_action(self):
dummy_calls = [] dummy_calls = []
def dummy_action(args): def dummy_action():
dummy_calls.append(True) dummy_calls.append(True)
with mock.patch.dict(actions.ACTIONS, {"foo": dummy_action}): with mock.patch.dict(actions.ACTIONS, {"foo": dummy_action}):
@ -75,12 +135,12 @@ class MainTestCase(CharmTestCase):
self.assertEqual("Action foo undefined", exit_string) self.assertEqual("Action foo undefined", exit_string)
def test_failing_action(self): def test_failing_action(self):
"""Actions which traceback trigger action_fail() calls.""" """Actions which traceback trigger function_fail() calls."""
dummy_calls = [] dummy_calls = []
self.action_fail.side_effect = dummy_calls.append self.function_fail.side_effect = dummy_calls.append
def dummy_action(args): def dummy_action():
raise ValueError("uh oh") raise ValueError("uh oh")
with mock.patch.dict(actions.ACTIONS, {"foo": dummy_action}): with mock.patch.dict(actions.ACTIONS, {"foo": dummy_action}):

View File

@ -14,9 +14,9 @@
import mock import mock
from contextlib import contextmanager from contextlib import contextmanager
from copy import copy
from actions import service from actions import service
from hooks import utils
from test_utils import CharmTestCase from test_utils import CharmTestCase
@ -40,33 +40,27 @@ class ServiceActionTests(CharmTestCase):
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(ServiceActionTests, self).__init__(methodName) super(ServiceActionTests, self).__init__(methodName)
self._func_args = {'osds': None}
def setUp(self, obj=None, patches=None): def setUp(self, obj=None, patches=None):
super(ServiceActionTests, self).setUp( super(ServiceActionTests, self).setUp(
service, service,
['subprocess', 'function_fail', 'function_get', ['subprocess', 'function_fail',
'log', 'assess_status', 'shutil'] 'log', 'assess_status', 'shutil']
) )
present_services = '\n'.join(self._PRESENT_SERVICES).encode('utf-8') present_services = '\n'.join(self._PRESENT_SERVICES).encode('utf-8')
self.shutil.which.return_value = '/bin/systemctl' self.shutil.which.return_value = '/bin/systemctl'
self.subprocess.check_call.return_value = None self.subprocess.check_call.return_value = None
self.function_get.side_effect = self.function_get_side_effect
self.subprocess.run.return_value = CompletedProcessMock( self.subprocess.run.return_value = CompletedProcessMock(
stdout=present_services) stdout=present_services)
def function_get_side_effect(self, arg):
return self._func_args.get(arg)
@contextmanager @contextmanager
def func_call_arguments(self, osds=None): def func_call_arguments(self, osds=None):
default = copy(self._func_args) with mock.patch("utils.function_get") as mock_function_get:
try:
self._func_args = {'osds': osds} self._func_args = {'osds': osds}
mock_function_get.side_effect = \
lambda arg: self._func_args.get(arg)
yield yield
finally:
self._func_args = copy(default)
def assert_action_start_fail(self, msg): def assert_action_start_fail(self, msg):
self.assert_function_fail(service.START, msg) self.assert_function_fail(service.START, msg)
@ -88,7 +82,7 @@ class ServiceActionTests(CharmTestCase):
def test_systemctl_execute_all(self): def test_systemctl_execute_all(self):
action = 'start' action = 'start'
services = service.ALL services = utils.ALL
expected_call = mock.call(['systemctl', action, self._TARGET_ALL], expected_call = mock.call(['systemctl', action, self._TARGET_ALL],
timeout=self._CHECK_CALL_TIMEOUT) timeout=self._CHECK_CALL_TIMEOUT)
@ -110,17 +104,17 @@ class ServiceActionTests(CharmTestCase):
self.subprocess.check_call.assert_has_calls([expected_call]) self.subprocess.check_call.assert_has_calls([expected_call])
def test_id_translation(self): def test_id_translation(self):
service_ids = {1, service.ALL, 2} service_ids = {1, utils.ALL, 2}
expected_names = [ expected_names = [
'ceph-osd@1.service', 'ceph-osd@1.service',
service.ALL, utils.ALL,
'ceph-osd@2.service', 'ceph-osd@2.service',
] ]
service_names = service.osd_ids_to_service_names(service_ids) service_names = service.osd_ids_to_service_names(service_ids)
self.assertEqual(sorted(service_names), sorted(expected_names)) self.assertEqual(sorted(service_names), sorted(expected_names))
def test_skip_service_presence_check(self): def test_skip_service_presence_check(self):
service_list = [service.ALL] service_list = [utils.ALL]
service.check_service_is_present(service_list) service.check_service_is_present(service_list)
@ -145,28 +139,6 @@ class ServiceActionTests(CharmTestCase):
stdout=self.subprocess.PIPE, stdout=self.subprocess.PIPE,
timeout=30) timeout=30)
def test_raise_on_missing_arguments(self):
err_msg = 'Action argument "osds" is missing'
with self.func_call_arguments(osds=None):
with self.assertRaises(RuntimeError, msg=err_msg):
service.parse_arguments()
def test_parse_service_ids(self):
raw = '1,2,3'
expected_ids = {'1', '2', '3'}
with self.func_call_arguments(osds=raw):
parsed = service.parse_arguments()
self.assertEqual(parsed, expected_ids)
def test_parse_service_ids_with_all(self):
raw = '1,2,all'
expected_id = {service.ALL}
with self.func_call_arguments(osds=raw):
parsed = service.parse_arguments()
self.assertEqual(parsed, expected_id)
def test_fail_execute_unknown_action(self): def test_fail_execute_unknown_action(self):
action = 'foo' action = 'foo'
err_msg = 'Unknown action "{}"'.format(action) err_msg = 'Unknown action "{}"'.format(action)
@ -175,14 +147,14 @@ class ServiceActionTests(CharmTestCase):
@mock.patch.object(service, 'systemctl_execute') @mock.patch.object(service, 'systemctl_execute')
def test_execute_action(self, _): def test_execute_action(self, _):
with self.func_call_arguments(osds=service.ALL): with self.func_call_arguments(osds=utils.ALL):
service.execute_action(service.START) service.execute_action(service.START)
service.systemctl_execute.assert_called_with(service.START, service.systemctl_execute.assert_called_with(service.START,
[service.ALL]) [utils.ALL])
service.execute_action(service.STOP) service.execute_action(service.STOP)
service.systemctl_execute.assert_called_with(service.STOP, service.systemctl_execute.assert_called_with(service.STOP,
[service.ALL]) [utils.ALL])
@mock.patch.object(service, 'execute_action') @mock.patch.object(service, 'execute_action')
def test_action_stop(self, execute_action): def test_action_stop(self, execute_action):

View File

@ -115,3 +115,26 @@ class CephUtilsTestCase(unittest.TestCase):
ret = utils.is_sata30orless('/dev/sda') ret = utils.is_sata30orless('/dev/sda')
mock_subprocess_check_output.assert_called() mock_subprocess_check_output.assert_called()
self.assertEqual(ret, True) self.assertEqual(ret, True)
@patch.object(utils, "function_get")
def test_raise_on_missing_arguments(self, mock_function_get):
mock_function_get.return_value = None
err_msg = "Action argument \"osds\" is missing"
with self.assertRaises(RuntimeError, msg=err_msg):
utils.parse_osds_arguments()
@patch.object(utils, "function_get")
def test_parse_service_ids(self, mock_function_get):
mock_function_get.return_value = "1,2,3"
expected_ids = {"1", "2", "3"}
parsed = utils.parse_osds_arguments()
self.assertEqual(parsed, expected_ids)
@patch.object(utils, "function_get")
def test_parse_service_ids_with_all(self, mock_function_get):
mock_function_get.return_value = "1,2,all"
expected_id = {utils.ALL}
parsed = utils.parse_osds_arguments()
self.assertEqual(parsed, expected_id)