diff --git a/actions.yaml b/actions.yaml index be586451..65ff76ab 100644 --- a/actions.yaml +++ b/actions.yaml @@ -1,3 +1,29 @@ +pause: + description: | + CAUTION - Set the local osd units in the charm to 'out' but does not stop + the osds. Unless the osd cluster is set to noout (see below), this removes + them from the ceph cluster and forces ceph to migrate the PGs to other OSDs + in the cluster. See the following. + + http://docs.ceph.com/docs/master/rados/operations/add-or-rm-osds/#removing-the-osd + "Do not let your cluster reach its full ratio when removing an OSD. + Removing OSDs could cause the cluster to reach or exceed its full ratio." + Also note that for small clusters you may encounter the corner case where + some PGs remain stuck in the active+remapped state. Refer to the above link + on how to resolve this. + + pause-health (on a ceph-mon unit) can be used before pausing a ceph-osd + unit to stop the cluster rebalancing the data off this ceph-osd unit. + pause-health sets 'noout' on the cluster such that it will not try to + rebalance the data across the remaining units. + + It is up to the user of the charm to determine whether pause-health should + be used as it depends on whether the osd is being paused for maintenance or + to remove it from the cluster completely. +resume: + description: | + Set the local osd units in the charm to 'in'. Note that the pause action + does NOT stop the osd processes. replace-osd: description: Replace a failed osd with a fresh disk params: diff --git a/actions/pause b/actions/pause new file mode 120000 index 00000000..bd4c0e00 --- /dev/null +++ b/actions/pause @@ -0,0 +1 @@ +pause_resume.py \ No newline at end of file diff --git a/actions/pause_resume.py b/actions/pause_resume.py new file mode 100755 index 00000000..68149f34 --- /dev/null +++ b/actions/pause_resume.py @@ -0,0 +1,74 @@ +#!/usr/bin/python +# pause/resume actions file. 
+
+import os
+import sys
+from subprocess import check_call
+
+sys.path.append('hooks')  # hooks/ holds ceph.py, ceph_hooks.py, utils.py
+
+from charmhelpers.core.hookenv import (
+    action_fail,
+)
+
+from ceph import get_local_osd_ids
+from ceph_hooks import assess_status
+
+from utils import (
+    set_unit_paused,
+    clear_unit_paused,
+)
+
+
+def pause(args):
+    """Pause the ceph-osd units on the local machine only.
+
+    Marks every local osd 'out' with 'ceph osd out <id>'; 'args' is unused.
+    If any of those commands fails the exception propagates and the paused
+    state is not set.
+
+    @raises CalledProcessError if the ceph commands fails.
+    @raises OSError if it can't get the local osd ids.
+    """
+    for local_id in get_local_osd_ids():
+        cmd = ['ceph', 'osd', 'out', str(local_id)]
+        check_call(cmd)
+    set_unit_paused()
+    assess_status()
+
+
+def resume(args):
+    """Mark every local osd 'in' again and clear the unit's paused state.
+
+    @raises subprocess.CalledProcessError if a 'ceph osd in' command fails.
+    @raises OSError if the unit can't get the local osd ids
+    """
+    for local_id in get_local_osd_ids():
+        cmd = ['ceph', 'osd', 'in', str(local_id)]
+        check_call(cmd)
+    clear_unit_paused()
+    assess_status()
+
+
+# Maps the name each action is invoked as to the callable that runs it; each
+# callable is passed the argument list given to main().
+ACTIONS = {"pause": pause, "resume": resume}
+
+
+def main(args):
+    action_name = os.path.basename(args[0])
+    try:
+        action = ACTIONS[action_name]
+    except KeyError:
+        s = "Action {} undefined".format(action_name)
+        action_fail(s)
+        return s  # a string passed to sys.exit() signals failure
+    else:
+        try:
+            action(args)
+        except Exception as e:
+            action_fail("Action {} failed: {}".format(action_name, str(e)))
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv))
diff --git a/actions/resume b/actions/resume
new file mode 120000
index 00000000..bd4c0e00
--- /dev/null
+++ b/actions/resume
@@ -0,0 +1 @@
+pause_resume.py
\ No newline at end of file
diff --git a/hooks/ceph.py b/hooks/ceph.py
index 0b23979b..d51ea400 100644
--- a/hooks/ceph.py
+++ b/hooks/ceph.py
@@ -157,12 +157,22 @@ def get_local_osd_ids():
             dirs = os.listdir(osd_path)
             for osd_dir in dirs:
                 osd_id = osd_dir.split('-')[1]
-                osd_ids.append(osd_id)
+                if _is_int(osd_id):
+                    osd_ids.append(osd_id)
         except OSError:
             raise
     return osd_ids
 
 
+def _is_int(v):
+    """Return True if the object v can be turned into an integer."""
+    try:
+        int(v)
+        return True
+    except (TypeError, ValueError):
+        return False
+
+
 def get_version():
     '''Derive Ceph release from an installed package.'''
     import apt_pkg as apt
diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py
index e508e0e6..5ba176cf 100755
--- a/hooks/ceph_hooks.py
+++ b/hooks/ceph_hooks.py
@@ -36,7 +36,9 @@ from charmhelpers.core.host import (
     umount,
     mkdir,
     cmp_pkgrevno,
-    service_stop, service_start)
+    service_stop,
+    service_start
+)
 from charmhelpers.fetch import (
     add_source,
     apt_install,
@@ -50,7 +52,9 @@ from utils import (
     get_host_ip,
     get_networks,
     assert_charm_supports_ipv6,
-    render_template)
+    render_template,
+    is_unit_paused_set,
+)
 
 from charmhelpers.contrib.openstack.alternatives import install_alternative
 from charmhelpers.contrib.network.ip import (
@@ -506,7 +510,12 @@ def update_nrpe_config():
 
 
 def assess_status():
-    '''Assess status of current unit'''
+    """Assess status of current unit"""
+    # check to see if the unit is paused.
+    if is_unit_paused_set():
+        status_set('maintenance',
+                   "Paused. Use 'resume' action to resume normal service.")
+        return
     # Check for mon relation
     if len(relation_ids('mon')) < 1:
         status_set('blocked', 'Missing relation: monitor')
diff --git a/hooks/utils.py b/hooks/utils.py
index 0071ecbd..b6fa3744 100644
--- a/hooks/utils.py
+++ b/hooks/utils.py
@@ -12,8 +12,9 @@ import re
 from charmhelpers.core.hookenv import (
     unit_get,
     cached,
-    config
+    config,
 )
+from charmhelpers.core import unitdata
 from charmhelpers.fetch import (
     apt_install,
     filter_installed_packages
@@ -106,3 +107,41 @@ def assert_charm_supports_ipv6():
     if lsb_release()['DISTRIB_CODENAME'].lower() < "trusty":
         raise Exception("IPv6 is not supported in the charms for Ubuntu "
                         "versions less than Trusty 14.04")
+
+
+# Copied from charmhelpers.contrib.openstack.utils so that the charm does not
+# need the entire set of dependencies that that module would also have to
+# bring in from charmhelpers.
+def set_unit_paused():
+    """Set the unit to a paused state in the local kv() store.
+    This does NOT actually pause the unit
+    """
+    with unitdata.HookData()() as t:
+        kv = t[0]
+        kv.set('unit-paused', True)
+
+
+def clear_unit_paused():
+    """Clear the unit from a paused state in the local kv() store
+    This does NOT actually restart any services - it only clears the
+    local state.
+    """
+    with unitdata.HookData()() as t:
+        kv = t[0]
+        kv.set('unit-paused', False)
+
+
+def is_unit_paused_set():
+    """Return the state of the kv().get('unit-paused').
+    This does NOT verify that the unit really is paused.
+
+    To help with units that don't have HookData() (testing)
+    if it excepts, return False
+    """
+    try:
+        with unitdata.HookData()() as t:
+            kv = t[0]
+            # transform something truth-y into a Boolean.
+            return bool(kv.get('unit-paused'))
+    except Exception:
+        return False
diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py
index 87e236cb..9e522391 100644
--- a/tests/basic_deployment.py
+++ b/tests/basic_deployment.py
@@ -634,3 +634,19 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
                        '(%s < on %s) on %s' % (file_mtime,
                                                mtime, unit_name))
             amulet.raise_status('Folder mtime is older than provided mtime')
+
+    def test_910_pause_and_resume(self):
+        """The services can be paused and resumed. """
+        u.log.debug('Checking pause and resume actions...')
+        sentry_unit = self.ceph_osd_sentry
+
+        assert u.status_get(sentry_unit)[0] == "active"
+
+        action_id = u.run_action(sentry_unit, "pause")
+        assert u.wait_on_action(action_id), "Pause action failed."
+        assert u.status_get(sentry_unit)[0] == "maintenance"
+
+        action_id = u.run_action(sentry_unit, "resume")
+        assert u.wait_on_action(action_id), "Resume action failed."
+        assert u.status_get(sentry_unit)[0] == "active"
+        u.log.debug('OK')
diff --git a/unit_tests/test_actions_pause_resume.py b/unit_tests/test_actions_pause_resume.py
new file mode 100644
index 00000000..43c3aafc
--- /dev/null
+++ b/unit_tests/test_actions_pause_resume.py
@@ -0,0 +1,79 @@
+import mock
+
+import sys
+
+from test_utils import CharmTestCase
+
+sys.path.append('hooks')
+
+import pause_resume as actions
+
+
+class PauseTestCase(CharmTestCase):
+
+    def setUp(self):
+        super(PauseTestCase, self).setUp(
+            actions, ["check_call",
+                      "get_local_osd_ids",
+                      "set_unit_paused",
+                      "assess_status"])
+
+    def test_pauses_services(self):
+        self.get_local_osd_ids.return_value = [5]
+        actions.pause([])
+        cmd = ['ceph', 'osd', 'out', '5']
+        self.check_call.assert_called_once_with(cmd)
+        self.set_unit_paused.assert_called_once_with()
+        self.assess_status.assert_called_once_with()
+
+
+class ResumeTestCase(CharmTestCase):
+
+    def setUp(self):
+        super(ResumeTestCase, self).setUp(
+            actions, ["check_call",
+                      "get_local_osd_ids",
+                      "clear_unit_paused",
+                      "assess_status"])
+
+    def test_pauses_services(self):
+        self.get_local_osd_ids.return_value = [5]
+        actions.resume([])
+        cmd = ['ceph', 'osd', 'in', '5']
+        self.check_call.assert_called_once_with(cmd)
+        self.clear_unit_paused.assert_called_once_with()
+        self.assess_status.assert_called_once_with()
+
+
+class MainTestCase(CharmTestCase):
+
+    def setUp(self):
+        super(MainTestCase, self).setUp(actions, ["action_fail"])
+
+    def test_invokes_action(self):
+        dummy_calls = []
+
+        def dummy_action(args):
+            dummy_calls.append(True)
+
+        with mock.patch.dict(actions.ACTIONS, {"foo": dummy_action}):
+            actions.main(["foo"])
+        self.assertEqual(dummy_calls, [True])
+
+    def test_unknown_action(self):
+        """Unknown actions aren't a traceback."""
+        exit_string = actions.main(["foo"])
+        self.assertEqual("Action foo undefined", exit_string)
+
+    def test_failing_action(self):
+        """Actions which traceback trigger action_fail() calls."""
+        dummy_calls = []
+
+        self.action_fail.side_effect = dummy_calls.append
+
+        def dummy_action(args):
+            raise ValueError("uh oh")
+
+        with mock.patch.dict(actions.ACTIONS, {"foo": dummy_action}):
+            actions.main(["foo"])
+        self.assertEqual(dummy_calls, ["Action foo failed: uh oh"])