Pause/resume for ceph-osd charm
This changeset provides pause and resume actions to the ceph-osd charm. The pause action issues a 'ceph osd out <local_id>' for each of the ceph osd ids that are on the unit. The action does not stop the ceph osd processes. Note that if the pause-health action is NOT used on the ceph-mon charm then the cluster will start trying to rebalance the PGs across the remaining OSDs. If the cluster might reach its 'full ratio' then this will be a breaking action. The charm does NOT check for this eventuality. The resume action issues a 'ceph osd in <local_id>' for each of the local ceph osd processes on the unit. The charm 'remembers' that a pause action was issued, and if successful, it shows a 'maintenance' workload status as a reminder. Change-Id: I9f53c9c6c4bb737670ffcd542acec0b320cc7f6a
This commit is contained in:
parent
72b8ecade3
commit
bbfdeb84f0
26
actions.yaml
26
actions.yaml
|
@ -1,3 +1,29 @@
|
|||
pause:
|
||||
description: |
|
||||
CAUTION - Sets the local osd units in the charm to 'out' but does not stop
|
||||
the osds. Unless the osd cluster is set to noout (see below), this removes
|
||||
them from the ceph cluster and forces ceph to migrate the PGs to other OSDs
|
||||
in the cluster. See the following.
|
||||
|
||||
http://docs.ceph.com/docs/master/rados/operations/add-or-rm-osds/#removing-the-osd
|
||||
"Do not let your cluster reach its full ratio when removing an OSD.
|
||||
Removing OSDs could cause the cluster to reach or exceed its full ratio."
|
||||
Also note that for small clusters you may encounter the corner case where
|
||||
some PGs remain stuck in the active+remapped state. Refer to the above link
|
||||
on how to resolve this.
|
||||
|
||||
pause-health (on a ceph-mon) unit can be used before pausing a ceph-osd
|
||||
unit to stop the cluster rebalancing the data off this ceph-osd unit.
|
||||
pause-health sets 'noout' on the cluster such that it will not try to
|
||||
rebalance the data across the remaining units.
|
||||
|
||||
It is up to the user of the charm to determine whether pause-health should
|
||||
be used as it depends on whether the osd is being paused for maintenance or
|
||||
to remove it from the cluster completely.
|
||||
resume:
|
||||
description: |
|
||||
Set the local osd units in the charm to 'in'. Note that the pause action
|
||||
does NOT stop the osd processes.
|
||||
replace-osd:
|
||||
description: Replace a failed osd with a fresh disk
|
||||
params:
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
pause_resume.py
|
|
@ -0,0 +1,74 @@
|
|||
#!/usr/bin/python
|
||||
# pause/resume actions file.
|
||||
|
||||
import os
|
||||
import sys
|
||||
from subprocess import check_call
|
||||
|
||||
sys.path.append('hooks')
|
||||
|
||||
from charmhelpers.core.hookenv import (
|
||||
action_fail,
|
||||
)
|
||||
|
||||
from ceph import get_local_osd_ids
|
||||
from ceph_hooks import assess_status
|
||||
|
||||
from utils import (
|
||||
set_unit_paused,
|
||||
clear_unit_paused,
|
||||
)
|
||||
|
||||
|
||||
def pause(args):
    """Pause the ceph-osd units on the local machine only.

    Issues 'ceph osd out <id>' for every id returned by
    get_local_osd_ids(); the osd processes themselves are left running.
    All local osds are affected: no action parameter is read to select an
    individual osd.

    A failing ceph command raises before the paused flag is written, so
    the unit is only recorded as paused once every local osd has been
    marked out.

    @param args: argument list handed over by main(); not used here.
    @raises CalledProcessError if the ceph commands fails.
    @raises OSError if it can't get the local osd ids.
    """
    for local_id in get_local_osd_ids():
        cmd = ['ceph', 'osd', 'out', str(local_id)]
        check_call(cmd)
    # Only reached when every 'ceph osd out' call succeeded.
    set_unit_paused()
    assess_status()
|
||||
|
||||
|
||||
def resume(args):
    """Bring the ceph-osd units on this local machine back into service.

    Issues 'ceph osd in <id>' for every id returned by get_local_osd_ids(),
    then clears the stored paused flag and re-assesses the unit status.

    @param args: argument list handed over by main(); not used here.
    @raises subprocess.CalledProcessError should the osd units fails to resume.
    @raises OSError if the unit can't get the local osd ids
    """
    for osd_id in get_local_osd_ids():
        check_call(['ceph', 'osd', 'in', str(osd_id)])
    # Only reached when every 'ceph osd in' call succeeded.
    clear_unit_paused()
    assess_status()
|
||||
|
||||
|
||||
# A dictionary of all the defined actions to callables (which take
|
||||
# parsed arguments).
|
||||
ACTIONS = {"pause": pause, "resume": resume}
|
||||
|
||||
|
||||
def main(args):
    """Run the action selected by the basename of args[0].

    @param args: argument list, normally sys.argv.
    @returns the error message when the action name is not in ACTIONS,
             otherwise None (also when the action itself failed).
    """
    action_name = os.path.basename(args[0])

    # Guard clause: an unknown action is reported, not raised.
    if action_name not in ACTIONS:
        message = "Action {} undefined".format(action_name)
        action_fail(message)
        return message

    handler = ACTIONS[action_name]
    try:
        handler(args)
    except Exception as exc:
        # Report the failure through action_fail() instead of a traceback.
        action_fail("Action {} failed: {}".format(action_name, str(exc)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main(sys.argv))
|
|
@ -0,0 +1 @@
|
|||
pause_resume.py
|
|
@ -157,12 +157,22 @@ def get_local_osd_ids():
|
|||
dirs = os.listdir(osd_path)
|
||||
for osd_dir in dirs:
|
||||
osd_id = osd_dir.split('-')[1]
|
||||
osd_ids.append(osd_id)
|
||||
if _is_int(osd_id):
|
||||
osd_ids.append(osd_id)
|
||||
except OSError:
|
||||
raise
|
||||
return osd_ids
|
||||
|
||||
|
||||
def _is_int(v):
    """Return True if the object v can be turned into an integer.

    int() raises ValueError for badly formed strings and TypeError for
    objects it cannot convert at all (None, lists, ...); both cases are
    reported as False instead of propagating.
    """
    try:
        int(v)
    except (TypeError, ValueError):
        return False
    return True
|
||||
|
||||
|
||||
def get_version():
|
||||
'''Derive Ceph release from an installed package.'''
|
||||
import apt_pkg as apt
|
||||
|
|
|
@ -36,7 +36,9 @@ from charmhelpers.core.host import (
|
|||
umount,
|
||||
mkdir,
|
||||
cmp_pkgrevno,
|
||||
service_stop, service_start)
|
||||
service_stop,
|
||||
service_start
|
||||
)
|
||||
from charmhelpers.fetch import (
|
||||
add_source,
|
||||
apt_install,
|
||||
|
@ -50,7 +52,9 @@ from utils import (
|
|||
get_host_ip,
|
||||
get_networks,
|
||||
assert_charm_supports_ipv6,
|
||||
render_template)
|
||||
render_template,
|
||||
is_unit_paused_set,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.openstack.alternatives import install_alternative
|
||||
from charmhelpers.contrib.network.ip import (
|
||||
|
@ -506,7 +510,12 @@ def update_nrpe_config():
|
|||
|
||||
|
||||
def assess_status():
|
||||
'''Assess status of current unit'''
|
||||
"""Assess status of current unit"""
|
||||
# check to see if the unit is paused.
|
||||
if is_unit_paused_set():
|
||||
status_set('maintenance',
|
||||
"Paused. Use 'resume' action to resume normal service.")
|
||||
return
|
||||
# Check for mon relation
|
||||
if len(relation_ids('mon')) < 1:
|
||||
status_set('blocked', 'Missing relation: monitor')
|
||||
|
|
|
@ -12,8 +12,9 @@ import re
|
|||
from charmhelpers.core.hookenv import (
|
||||
unit_get,
|
||||
cached,
|
||||
config
|
||||
config,
|
||||
)
|
||||
from charmhelpers.core import unitdata
|
||||
from charmhelpers.fetch import (
|
||||
apt_install,
|
||||
filter_installed_packages
|
||||
|
@ -106,3 +107,41 @@ def assert_charm_supports_ipv6():
|
|||
if lsb_release()['DISTRIB_CODENAME'].lower() < "trusty":
|
||||
raise Exception("IPv6 is not supported in the charms for Ubuntu "
|
||||
"versions less than Trusty 14.04")
|
||||
|
||||
|
||||
# copied from charmhelpers.contrib.openstack.utils so that the charm does not need the
|
||||
# entire set of dependencies that that module actually also has to bring in
|
||||
# from charmhelpers.
|
||||
def set_unit_paused():
    """Record the unit as paused in the local kv() store.

    Only the 'unit-paused' flag is written; this does NOT actually pause
    the unit.
    """
    with unitdata.HookData()() as hook_data:
        # The kv store is the first element handed back by HookData.
        hook_data[0].set('unit-paused', True)
|
||||
|
||||
|
||||
def clear_unit_paused():
    """Record the unit as no longer paused in the local kv() store.

    Only the 'unit-paused' flag is reset; this does NOT actually restart
    any services.
    """
    with unitdata.HookData()() as hook_data:
        # The kv store is the first element handed back by HookData.
        hook_data[0].set('unit-paused', False)
|
||||
|
||||
|
||||
def is_unit_paused_set():
    """Return the state of the kv().get('unit-paused').

    This does NOT verify that the unit really is paused.

    To help with units that don't have HookData() (testing), any ordinary
    exception raised while reading the store results in False.
    SystemExit and KeyboardInterrupt are deliberately not swallowed.

    @returns True if a truth-y 'unit-paused' value is stored, else False.
    """
    try:
        with unitdata.HookData()() as t:
            kv = t[0]
            # transform something truth-y into a Boolean.
            return bool(kv.get('unit-paused'))
    except Exception:
        return False
|
||||
|
|
|
@ -634,3 +634,19 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
|||
'(%s < on %s) on %s' % (file_mtime,
|
||||
mtime, unit_name))
|
||||
amulet.raise_status('Folder mtime is older than provided mtime')
|
||||
|
||||
def test_910_pause_and_resume(self):
    """The services can be paused and resumed.

    Expects workload status 'active' -> 'maintenance' after the pause
    action and back to 'active' after the resume action.
    """
    u.log.debug('Checking pause and resume actions...')
    unit = self.ceph_osd_sentry

    def workload_state():
        # First element of status_get() is compared against the state name.
        return u.status_get(unit)[0]

    assert workload_state() == "active"

    pause_id = u.run_action(unit, "pause")
    assert u.wait_on_action(pause_id), "Pause action failed."
    assert workload_state() == "maintenance"

    resume_id = u.run_action(unit, "resume")
    assert u.wait_on_action(resume_id), "Resume action failed."
    assert workload_state() == "active"
    u.log.debug('OK')
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
import mock
|
||||
|
||||
import sys
|
||||
|
||||
from test_utils import CharmTestCase
|
||||
|
||||
sys.path.append('hooks')
|
||||
|
||||
import pause_resume as actions
|
||||
|
||||
|
||||
class PauseTestCase(CharmTestCase):
    """Unit tests for the pause action."""

    def setUp(self):
        # Names handed to CharmTestCase.setUp; the tests below use them as
        # mock attributes on self (presumably patched on the actions module).
        names = [
            "check_call",
            "get_local_osd_ids",
            "set_unit_paused",
            "assess_status",
        ]
        super(PauseTestCase, self).setUp(actions, names)

    def test_pauses_services(self):
        """pause() marks the local osd out and records the paused state."""
        self.get_local_osd_ids.return_value = [5]
        actions.pause([])
        self.check_call.assert_called_once_with(['ceph', 'osd', 'out', '5'])
        self.set_unit_paused.assert_called_once_with()
        self.assess_status.assert_called_once_with()
|
||||
|
||||
|
||||
class ResumeTestCase(CharmTestCase):
    """Unit tests for the resume action."""

    def setUp(self):
        super(ResumeTestCase, self).setUp(
            actions, ["check_call",
                      "get_local_osd_ids",
                      "clear_unit_paused",
                      "assess_status"])

    def test_resumes_services(self):
        """resume() marks the local osd in and clears the paused state."""
        self.get_local_osd_ids.return_value = [5]
        actions.resume([])
        cmd = ['ceph', 'osd', 'in', '5']
        self.check_call.assert_called_once_with(cmd)
        self.clear_unit_paused.assert_called_once_with()
        self.assess_status.assert_called_once_with()
|
||||
|
||||
|
||||
class MainTestCase(CharmTestCase):
    """Unit tests for the main() action dispatcher."""

    def setUp(self):
        super(MainTestCase, self).setUp(actions, ["action_fail"])

    def test_invokes_action(self):
        """A registered action is called exactly once by main()."""
        invocations = []

        def fake_action(args):
            invocations.append(True)

        with mock.patch.dict(actions.ACTIONS, {"foo": fake_action}):
            actions.main(["foo"])

        self.assertEqual(invocations, [True])

    def test_unknown_action(self):
        """Unknown actions aren't a traceback."""
        self.assertEqual("Action foo undefined", actions.main(["foo"]))

    def test_failing_action(self):
        """Actions which traceback trigger action_fail() calls."""
        failures = []
        # Collect every message passed to action_fail().
        self.action_fail.side_effect = failures.append

        def exploding_action(args):
            raise ValueError("uh oh")

        with mock.patch.dict(actions.ACTIONS, {"foo": exploding_action}):
            actions.main(["foo"])

        self.assertEqual(failures, ["Action foo failed: uh oh"])
|
Loading…
Reference in New Issue