Merge "Roll osd ownership changes through node"

This commit is contained in:
Jenkins 2017-03-07 16:42:47 +00:00 committed by Gerrit Code Review
commit 0142d5045a
2 changed files with 366 additions and 50 deletions

View File

@ -26,19 +26,27 @@ import errno
import shutil
import pyudev
from datetime import datetime
from charmhelpers.core import hookenv
from charmhelpers.core.host import (
mkdir,
chownr,
service_restart,
cmp_pkgrevno,
lsb_release,
cmp_pkgrevno, service_stop, mounts, service_start)
mkdir,
mounts,
owner,
service_restart,
service_start,
service_stop)
from charmhelpers.core.hookenv import (
log,
ERROR,
cached,
config,
log,
status_set,
WARNING, DEBUG, config)
DEBUG,
ERROR,
WARNING)
from charmhelpers.core.services import render_template
from charmhelpers.fetch import (
apt_cache,
@ -55,6 +63,9 @@ from charmhelpers.contrib.storage.linux.utils import (
from charmhelpers.contrib.openstack.utils import (
get_os_codename_install_source)
CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
LEADER = 'leader'
PEON = 'peon'
QUORUM = [LEADER, PEON]
@ -556,6 +567,42 @@ def get_osd_tree(service):
raise
def _get_child_dirs(path):
"""Returns a list of directory names in the specified path.
:param path: a full path listing of the parent directory to return child
directory names
:return: list. A list of child directories under the parent directory
:raises: ValueError if the specified path does not exist or is not a
directory,
OSError if an error occurs reading the directory listing
"""
if not os.path.exists(path):
raise ValueError('Specfied path "%s" does not exist' % path)
if not os.path.isdir(path):
raise ValueError('Specified path "%s" is not a directory' % path)
return filter(os.path.isdir, os.path.listdir(path))
def _get_osd_num_from_dirname(dirname):
"""Parses the dirname and returns the OSD id.
Parses a string in the form of 'ceph-{osd#}' and returns the osd number
from the directory name.
:param dirname: the directory name to return the OSD number from
:return int: the osd number the directory name corresponds to
:raises ValueError: if the osd number cannot be parsed from the provided
directory name.
"""
match = re.search('ceph-(?P<osd_id>\d+)', dirname)
if not match:
raise ValueError("dirname not in correct format: %s" % dirname)
return match.group('osd_id')
def get_local_osd_ids():
"""
This will list the /var/lib/ceph/osd/* directories and try
@ -1655,42 +1702,198 @@ def upgrade_osd(new_version):
add_source(config('source'), config('key'))
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
log("Adding the ceph sources failed with message: {}".format(
err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
if systemd():
for osd_id in get_local_osd_ids():
service_stop('ceph-osd@{}'.format(osd_id))
else:
service_stop('ceph-osd-all')
# Upgrade the packages before restarting the daemons.
status_set('maintenance', 'Upgrading packages to %s' % new_version)
apt_install(packages=PACKAGES, fatal=True)
# Ensure the files and directories under /var/lib/ceph is chowned
# properly as part of the move to the Jewel release, which moved the
# ceph daemons to running as ceph:ceph instead of root:root. Only do
# it when necessary as this is an expensive operation to run.
if new_version == 'jewel':
owner = ceph_user()
status_set('maintenance', 'Updating file ownership for OSDs')
chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
owner=owner,
group=owner,
follow_links=True)
# If the upgrade does not need an ownership update of any of the
# directories in the osd service directory, then simply restart
# all of the OSDs at the same time as this will be the fastest
# way to update the code on the node.
if not dirs_need_ownership_update('osd'):
log('Restarting all OSDs to load new binaries', DEBUG)
service_restart('ceph-osd-all')
return
if systemd():
for osd_id in get_local_osd_ids():
service_start('ceph-osd@{}'.format(osd_id))
else:
service_start('ceph-osd-all')
except subprocess.CalledProcessError as err:
# Need to change the ownership of all directories which are not OSD
# directories as well.
# TODO - this should probably be moved to the general upgrade function
# and done before mon/osd.
update_owner(CEPH_BASE_DIR, recurse_dirs=False)
non_osd_dirs = filter(lambda x: not x == 'osd',
os.listdir(CEPH_BASE_DIR))
non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
non_osd_dirs)
for path in non_osd_dirs:
update_owner(path)
# Fast service restart wasn't an option because each of the OSD
# directories need the ownership updated for all the files on
# the OSD. Walk through the OSDs one-by-one upgrading the OSD.
for osd_dir in _get_child_dirs(OSD_BASE_DIR):
try:
osd_num = _get_osd_num_from_dirname(osd_dir)
_upgrade_single_osd(osd_num, osd_dir)
except ValueError as ex:
# Directory could not be parsed - junk directory?
log('Could not parse osd directory %s: %s' % (osd_dir, ex),
WARNING)
continue
except (subprocess.CalledProcessError, IOError) as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
def _upgrade_single_osd(osd_num, osd_dir):
"""Upgrades the single OSD directory.
:param osd_num: the num of the OSD
:param osd_dir: the directory of the OSD to upgrade
:raises CalledProcessError: if an error occurs in a command issued as part
of the upgrade process
:raises IOError: if an error occurs reading/writing to a file as part
of the upgrade process
"""
stop_osd(osd_num)
disable_osd(osd_num)
update_owner(os.path.join(OSD_BASE_DIR, osd_dir))
enable_osd(osd_num)
start_osd(osd_num)
def stop_osd(osd_num):
"""Stops the specified OSD number.
:param osd_num: the osd number to stop
"""
if systemd():
service_stop('ceph-osd@{}'.format(osd_num))
else:
service_stop('ceph-osd', id=osd_num)
def start_osd(osd_num):
"""Starts the specified OSD number.
:param osd_num: the osd number to start.
"""
if systemd():
service_start('ceph-osd@{}'.format(osd_num))
else:
service_start('ceph-osd', id=osd_num)
def disable_osd(osd_num):
"""Disables the specified OSD number.
Ensures that the specified osd will not be automatically started at the
next reboot of the system. Due to differences between init systems,
this method cannot make any guarantees that the specified osd cannot be
started manually.
:param osd_num: the osd id which should be disabled.
:raises CalledProcessError: if an error occurs invoking the systemd cmd
to disable the OSD
:raises IOError, OSError: if the attempt to read/remove the ready file in
an upstart enabled system fails
"""
if systemd():
# When running under systemd, the individual ceph-osd daemons run as
# templated units and can be directly addressed by referring to the
# templated service name ceph-osd@<osd_num>. Additionally, systemd
# allows one to disable a specific templated unit by running the
# 'systemctl disable ceph-osd@<osd_num>' command. When disabled, the
# OSD should remain disabled until re-enabled via systemd.
# Note: disabling an already disabled service in systemd returns 0, so
# no need to check whether it is enabled or not.
cmd = ['systemctl', 'disable', 'ceph-osd@{}'.format(osd_num)]
subprocess.check_call(cmd)
else:
# Neither upstart nor the ceph-osd upstart script provides for
# disabling the starting of an OSD automatically. The specific OSD
# cannot be prevented from running manually, however it can be
# prevented from running automatically on reboot by removing the
# 'ready' file in the OSD's root directory. This is due to the
# ceph-osd-all upstart script checking for the presence of this file
# before starting the OSD.
ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
'ready')
if os.path.exists(ready_file):
os.unlink(ready_file)
def enable_osd(osd_num):
"""Enables the specified OSD number.
Ensures that the specified osd_num will be enabled and ready to start
automatically in the event of a reboot.
:param osd_num: the osd id which should be enabled.
:raises CalledProcessError: if the call to the systemd command issued
fails when enabling the service
:raises IOError: if the attempt to write the ready file in an usptart
enabled system fails
"""
if systemd():
cmd = ['systemctl', 'enable', 'ceph-osd@{}'.format(osd_num)]
subprocess.check_call(cmd)
else:
# When running on upstart, the OSDs are started via the ceph-osd-all
# upstart script which will only start the osd if it has a 'ready'
# file. Make sure that file exists.
ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
'ready')
with open(ready_file, 'w') as f:
f.write('ready')
# Make sure the correct user owns the file. It shouldn't be necessary
# as the upstart script should run with root privileges, but its better
# to have all the files matching ownership.
update_owner(ready_file)
def update_owner(path, recurse_dirs=True):
"""Changes the ownership of the specified path.
Changes the ownership of the specified path to the new ceph daemon user
using the system's native chown functionality. This may take awhile,
so this method will issue a set_status for any changes of ownership which
recurses into directory structures.
:param path: the path to recursively change ownership for
:param recurse_dirs: boolean indicating whether to recursively change the
ownership of all the files in a path's subtree or to
simply change the ownership of the path.
:raises CalledProcessError: if an error occurs issuing the chown system
command
"""
user = ceph_user()
user_group = '{ceph_user}:{ceph_user}'.format(ceph_user=user)
cmd = ['chown', user_group, path]
if os.path.isdir(path) and recurse_dirs:
status_set('maintenance', ('Updating ownership of %s to %s' %
(path.split('/')[-1], user)))
cmd.insert('-R', 1)
log('Changing ownership of {path} to {user}'.format(
path=path, user=user_group), DEBUG)
start = datetime.now()
subprocess.check_call(cmd)
elapsed_time = (datetime.now() - start)
log('Took {secs} seconds to change the ownership of path: {path}'.format(
secs=elapsed_time.total_seconds(), path=path), DEBUG)
def list_pools(service):
"""
This will list the current pools that Ceph has
@ -1709,6 +1912,39 @@ def list_pools(service):
log("rados lspools failed with error: {}".format(err.output))
raise
def dirs_need_ownership_update(service):
"""Determines if directories still need change of ownership.
Examines the set of directories under the /var/lib/ceph/{service} directory
and determines if they have the correct ownership or not. This is
necessary due to the upgrade from Hammer to Jewel where the daemon user
changes from root: to ceph:.
:param service: the name of the service folder to check (e.g. osd, mon)
:return: boolean. True if the directories need a change of ownership,
False otherwise.
:raises IOError: if an error occurs reading the file stats from one of
the child directories.
:raises OSError: if the specified path does not exist or some other error
"""
expected_owner = expected_group = ceph_user()
path = os.path.join(CEPH_BASE_DIR, service)
for child in _get_child_dirs(path):
child_path = os.path.join(path, child)
curr_owner, curr_group = owner(child_path)
if (curr_owner == expected_owner) and (curr_group == expected_group):
continue
log('Directory "%s" needs its ownership updated' % child_path,
DEBUG)
return True
# All child directories had the expected ownership
return False
# A dict of valid ceph upgrade paths. Mapping is old -> new
UPGRADE_PATHS = {
'firefly': 'hammer',

View File

@ -13,9 +13,11 @@
# limitations under the License.
import time
import os
import unittest
import sys
from mock import patch, call
from mock import patch, call, mock_open
import ceph
from ceph import CrushLocation
@ -61,10 +63,10 @@ def monitor_key_side_effect(*args):
class UpgradeRollingTestCase(unittest.TestCase):
@patch('ceph.dirs_need_ownership_update')
@patch('ceph.apt_install')
@patch('ceph.chownr')
@patch('ceph.service_stop')
@patch('ceph.service_start')
@patch('ceph.service_restart')
@patch('ceph.log')
@patch('ceph.status_set')
@patch('ceph.apt_update')
@ -75,16 +77,16 @@ class UpgradeRollingTestCase(unittest.TestCase):
@patch('ceph.config')
def test_upgrade_osd_hammer(self, config, get_version, systemd, local_osds,
add_source, apt_update, status_set, log,
service_start, service_stop, chownr,
apt_install):
service_restart, chownr, apt_install,
dirs_need_ownership_update):
config.side_effect = config_side_effect
get_version.side_effect = [0.80, 0.94]
systemd.return_value = False
local_osds.return_value = [0, 1, 2]
dirs_need_ownership_update.return_value = False
ceph.upgrade_osd('hammer')
service_stop.assert_called_with('ceph-osd-all')
service_start.assert_called_with('ceph-osd-all')
service_restart.assert_called_with('ceph-osd-all')
status_set.assert_has_calls([
call('maintenance', 'Upgrading osd'),
])
@ -97,10 +99,12 @@ class UpgradeRollingTestCase(unittest.TestCase):
# Make sure on an Upgrade to Hammer that chownr was NOT called.
assert not chownr.called
@patch('ceph._upgrade_single_osd')
@patch('ceph.update_owner')
@patch('os.listdir')
@patch('ceph._get_child_dirs')
@patch('ceph.dirs_need_ownership_update')
@patch('ceph.apt_install')
@patch('ceph.chownr')
@patch('ceph.service_stop')
@patch('ceph.service_start')
@patch('ceph.log')
@patch('ceph.status_set')
@patch('ceph.apt_update')
@ -111,19 +115,31 @@ class UpgradeRollingTestCase(unittest.TestCase):
@patch('ceph.config')
def test_upgrade_osd_jewel(self, config, get_version, systemd,
local_osds, add_source, apt_update, status_set,
log, service_start, service_stop, chownr,
apt_install):
log, apt_install, dirs_need_ownership_update,
_get_child_dirs, listdir, update_owner,
_upgrade_single_osd):
config.side_effect = config_side_effect
get_version.side_effect = [0.94, 10.1]
systemd.return_value = False
local_osds.return_value = [0, 1, 2]
listdir.return_value = ['osd', 'mon', 'fs']
_get_child_dirs.return_value = ['ceph-0', 'ceph-1', 'ceph-2']
dirs_need_ownership_update.return_value = True
ceph.upgrade_osd('jewel')
service_stop.assert_called_with('ceph-osd-all')
service_start.assert_called_with('ceph-osd-all')
update_owner.assert_has_calls([
call(ceph.CEPH_BASE_DIR, recurse_dirs=False),
call(os.path.join(ceph.CEPH_BASE_DIR, 'mon')),
call(os.path.join(ceph.CEPH_BASE_DIR, 'fs')),
])
_upgrade_single_osd.assert_has_calls([
call('0', 'ceph-0'),
call('1', 'ceph-1'),
call('2', 'ceph-2'),
])
status_set.assert_has_calls([
call('maintenance', 'Upgrading osd'),
call('maintenance', 'Updating file ownership for OSDs')
call('maintenance', 'Upgrading packages to jewel')
])
log.assert_has_calls(
[
@ -131,12 +147,76 @@ class UpgradeRollingTestCase(unittest.TestCase):
call('Upgrading to: jewel')
]
)
chownr.assert_has_calls(
[
call(group='ceph', owner='ceph', path='/var/lib/ceph',
follow_links=True)
]
)
@patch.object(ceph, 'stop_osd')
@patch.object(ceph, 'disable_osd')
@patch.object(ceph, 'update_owner')
@patch.object(ceph, 'enable_osd')
@patch.object(ceph, 'start_osd')
def test_upgrade_single_osd(self, start_osd, enable_osd, update_owner,
disable_osd, stop_osd):
ceph._upgrade_single_osd(1, 'ceph-1')
stop_osd.assert_called_with(1)
disable_osd.assert_called_with(1)
update_owner.assert_called_with('/var/lib/ceph/osd/ceph-1')
enable_osd.assert_called_with(1)
start_osd.assert_called_with(1)
@patch.object(ceph, 'systemd')
@patch.object(ceph, 'service_stop')
def test_stop_osd(self, service_stop, systemd):
systemd.return_value = False
ceph.stop_osd(1)
service_stop.assert_called_with('ceph-osd', id=1)
systemd.return_value = True
ceph.stop_osd(2)
service_stop.assert_called_with('ceph-osd@2')
@patch.object(ceph, 'systemd')
@patch.object(ceph, 'service_start')
def test_start_osd(self, service_start, systemd):
systemd.return_value = False
ceph.start_osd(1)
service_start.assert_called_with('ceph-osd', id=1)
systemd.return_value = True
ceph.start_osd(2)
service_start.assert_called_with('ceph-osd@2')
@patch('subprocess.check_call')
@patch('os.path.exists')
@patch('os.unlink')
@patch('ceph.systemd')
def test_disable_osd(self, systemd, unlink, exists, check_call):
systemd.return_value = True
ceph.disable_osd(4)
check_call.assert_called_with(['systemctl', 'disable', 'ceph-osd@4'])
exists.return_value = True
systemd.return_value = False
ceph.disable_osd(3)
unlink.assert_called_with('/var/lib/ceph/osd/ceph-3/ready')
@patch('subprocess.check_call')
@patch('ceph.update_owner')
@patch('ceph.systemd')
def test_enable_osd(self, systemd, update_owner, check_call):
systemd.return_value = True
ceph.enable_osd(5)
check_call.assert_called_with(['systemctl', 'enable', 'ceph-osd@5'])
systemd.return_value = False
mo = mock_open()
# Detect which builtin open version we need to mock based on
# the python version.
bs = 'builtins' if sys.version_info > (3, 0) else '__builtin__'
with patch('%s.open' % bs, mo):
ceph.enable_osd(6)
mo.assert_called_once_with('/var/lib/ceph/osd/ceph-6/ready', 'w')
handle = mo()
handle.write.assert_called_with('ready')
update_owner.assert_called_with('/var/lib/ceph/osd/ceph-6/ready')
@patch('ceph.socket')
@patch('ceph.get_osd_tree')