Rolling upgrades for Ceph Monitor Cluster

This change adds functionality to allow the ceph monitor cluster to
upgrade in a serial, rolling fashion.  It uses the ceph monitor
cluster itself as a locking mechanism and only allows one ceph monitor
at a time to upgrade.  If a monitor has been waiting on the previous
server for more than 10 minutes without seeing it finish, it assumes
that monitor died during the upgrade and proceeds with its own upgrade.

Limitations of this patch: this should work as long as the monitor
cluster does not split-brain.  It also assumes that the clocks across
the ceph cluster are kept reasonably in sync via NTP.
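To illustrate the locking scheme described above, here is a minimal
standalone sketch.  It assumes the 'admin' Ceph user and a fixed 30
second poll (the charm itself sleeps a random 5-30 seconds and layers
Juju status reporting on top); the real implementation is in the
ceph_hooks.py changes below.

import subprocess
import time

CONFIG_KEY = ['ceph', '--id', 'admin', 'config-key']

def _key_get(key):
    try:
        return subprocess.check_output(CONFIG_KEY + ['get', key])
    except subprocess.CalledProcessError:
        return None

def _key_exists(key):
    # 'ceph config-key exists' exits non-zero when the key is absent
    return subprocess.call(CONFIG_KEY + ['exists', key]) == 0

def roll(my_name, previous_name, do_upgrade):
    # Wait for the predecessor unless it appears to have died mid-upgrade,
    # i.e. it published a *_start marker more than 10 minutes ago but has
    # still not published a *_done marker.
    while previous_name and not _key_exists('{}_done'.format(previous_name)):
        started = _key_get('{}_start'.format(previous_name))
        if started and time.time() - float(started) > 10 * 60:
            break
        time.sleep(30)
    # Take our turn: publish start, upgrade, then publish done.
    subprocess.check_call(CONFIG_KEY + ['put', '{}_start'.format(my_name),
                                        str(time.time())])
    do_upgrade()
    subprocess.check_call(CONFIG_KEY + ['put', '{}_done'.format(my_name),
                                        str(time.time())])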

Change-Id: I7254261b6206f0ec34a8aa1e94e7b06ae308d8f8
Chris Holcombe 2016-03-01 11:03:18 -08:00
parent 42233da15e
commit 67feefd6f4
5 changed files with 538 additions and 32 deletions

.gitignore

@ -3,4 +3,5 @@ bin
.testrepository
.tox
*.sw[nop]
.idea
*.pyc


@ -1,4 +1,3 @@
#
# Copyright 2012 Canonical Ltd.
#
@ -6,35 +5,32 @@
# James Page <james.page@canonical.com>
# Paul Collins <paul.collins@canonical.com>
#
import json
import subprocess
import time
import os
import re
import sys
from charmhelpers.contrib.storage.linux.utils import (
is_block_device,
zap_disk,
is_device_mounted)
from charmhelpers.core.host import (
mkdir,
chownr,
service_restart,
cmp_pkgrevno,
lsb_release
)
lsb_release,
cmp_pkgrevno)
from charmhelpers.core.hookenv import (
log,
ERROR,
WARNING,
cached,
status_set,
)
WARNING)
from charmhelpers.fetch import (
apt_cache
)
from charmhelpers.contrib.storage.linux.utils import (
zap_disk,
is_block_device,
is_device_mounted,
)
from utils import (
get_unit_hostname,
)
@ -53,8 +49,32 @@ def ceph_user():
return "root"
def get_local_mon_ids():
"""
This will list the /var/lib/ceph/mon/* directories and try
to split the ID off of the directory name and return it in
a list
:return: list. A list of monitor identifiers :raise: OSError if
something goes wrong with listing the directory.
"""
mon_ids = []
mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon')
if os.path.exists(mon_path):
try:
dirs = os.listdir(mon_path)
for mon_dir in dirs:
# Basically this takes everything after ceph- as the monitor ID
match = re.search('ceph-(?P<mon_id>.*)', mon_dir)
if match:
mon_ids.append(match.group('mon_id'))
except OSError:
raise
return mon_ids
def get_version():
'''Derive Ceph release from an installed package.'''
"""Derive Ceph release from an installed package."""
import apt_pkg as apt
cache = apt_cache()
@ -63,7 +83,7 @@ def get_version():
pkg = cache[package]
except:
# the package is unknown to the current apt cache.
e = 'Could not determine version of package with no installation '\
e = 'Could not determine version of package with no installation ' \
'candidate: %s' % package
error_out(e)
@ -165,6 +185,7 @@ def add_bootstrap_hint(peer):
# Ignore any errors for this call
subprocess.call(cmd)
DISK_FORMATS = [
'xfs',
'ext4',
@ -178,7 +199,7 @@ def is_osd_disk(dev):
info = info.split("\n") # IGNORE:E1103
for line in info:
if line.startswith(
'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D'
'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D'
):
return True
except subprocess.CalledProcessError:
@ -213,7 +234,7 @@ def is_bootstrapped():
def wait_for_bootstrap():
while (not is_bootstrapped()):
while not is_bootstrapped():
time.sleep(3)
@ -243,7 +264,6 @@ def generate_monitor_secret():
return "{}==".format(res.split('=')[1].strip())
# OSD caps taken from ceph-create-keys
_osd_bootstrap_caps = {
'mon': [
@ -310,6 +330,9 @@ _radosgw_caps = {
'mon': ['allow rw'],
'osd': ['allow rwx']
}
_upgrade_caps = {
'mon': ['allow rwx']
}
def get_radosgw_key():
@ -321,6 +344,26 @@ _default_caps = {
'osd': ['allow rwx']
}
admin_caps = {
'mds': ['allow'],
'mon': ['allow *'],
'osd': ['allow *']
}
osd_upgrade_caps = {
'mon': ['allow command "config-key"',
'allow command "osd tree"',
'allow command "config-key list"',
'allow command "config-key put"',
'allow command "config-key get"',
'allow command "config-key exists"',
]
}
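# The 'osd-upgrade' key created from osd_upgrade_caps is handed to ceph-osd
# units over the osd relation (see osd_relation below), presumably so those
# units can do their own config-key based upgrade coordination.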
def get_upgrade_key():
return get_named_key('upgrade-osd', _upgrade_caps)
def get_named_key(name, caps=None):
caps = caps or _default_caps
@ -346,7 +389,7 @@ def get_named_key(name, caps=None):
def upgrade_key_caps(key, caps):
''' Upgrade key to have capabilities caps '''
""" Upgrade key to have capabilities caps """
if not is_leader():
# Not the MON leader OR not clustered
return
@ -440,7 +483,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
log('Path {} is not a block device - bailing'.format(dev))
return
if (is_osd_disk(dev) and not reformat_osd):
if is_osd_disk(dev) and not reformat_osd:
log('Looks like {} is already an OSD, skipping.'.format(dev))
return


@ -10,10 +10,17 @@
import glob
import os
import random
import shutil
import socket
import subprocess
import sys
import uuid
import time
import ceph
from charmhelpers.core import host
from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
log,
DEBUG,
@ -29,15 +36,14 @@ from charmhelpers.core.hookenv import (
service_name,
relations_of_type,
status_set,
local_unit
)
local_unit)
from charmhelpers.core.host import (
service_restart,
mkdir,
write_file,
rsync,
cmp_pkgrevno
)
cmp_pkgrevno,
service_stop, service_start)
from charmhelpers.fetch import (
apt_install,
apt_update,
@ -52,7 +58,11 @@ from charmhelpers.contrib.network.ip import (
)
from charmhelpers.core.sysctl import create as create_sysctl
from charmhelpers.core.templating import render
from charmhelpers.contrib.storage.linux.ceph import (
monitor_key_set,
monitor_key_exists,
monitor_key_get,
get_mon_map)
from utils import (
get_networks,
get_public_addr,
@ -61,7 +71,6 @@ from utils import (
from ceph_broker import (
process_requests
)
from charmhelpers.contrib.charmsupport import nrpe
hooks = Hooks()
@ -71,6 +80,186 @@ SCRIPTS_DIR = '/usr/local/bin'
STATUS_FILE = '/var/lib/nagios/cat-ceph-status.txt'
STATUS_CRONFILE = '/etc/cron.d/cat-ceph-health'
# A dict of valid ceph upgrade paths. Mapping is old -> new
upgrade_paths = {
'cloud:trusty-juno': 'cloud:trusty-kilo',
'cloud:trusty-kilo': 'cloud:trusty-liberty',
'cloud:trusty-liberty': 'cloud:trusty-mitaka',
}
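# The keys and values above are values of the charm's 'source' config
# option: a unit whose 'source' moves from cloud:trusty-kilo to
# cloud:trusty-liberty, for example, follows a valid path and will roll.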
def pretty_print_upgrade_paths():
lines = []
for key, value in upgrade_paths.iteritems():
lines.append("{} -> {}".format(key, value))
return lines
def check_for_upgrade():
release_info = host.lsb_release()
if not release_info['DISTRIB_CODENAME'] == 'trusty':
log("Invalid upgrade path from {}. Only trusty is currently "
"supported".format(release_info['DISTRIB_CODENAME']))
return
c = hookenv.config()
old_version = c.previous('source')
log('old_version: {}'.format(old_version))
new_version = hookenv.config('source')
if new_version:
# Strip any spaces from the configured source string
new_version = new_version.replace(' ', '')
log('new_version: {}'.format(new_version))
if old_version in upgrade_paths:
if new_version == upgrade_paths[old_version]:
log("{} to {} is a valid upgrade path. Proceeding.".format(
old_version, new_version))
roll_monitor_cluster(new_version)
else:
# Log a helpful error message
log("Invalid upgrade path from {} to {}. "
"Valid paths are: {}".format(old_version,
new_version,
pretty_print_upgrade_paths()))
def lock_and_roll(my_name):
start_timestamp = time.time()
log('monitor_key_set {}_start {}'.format(my_name, start_timestamp))
monitor_key_set('admin', "{}_start".format(my_name), start_timestamp)
log("Rolling")
# This should be quick
upgrade_monitor()
log("Done")
stop_timestamp = time.time()
# Set a key to inform others I am finished
log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp))
monitor_key_set('admin', "{}_done".format(my_name), stop_timestamp)
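# At this point the monitor cluster's config-key store holds
# '<hostname>_start' and '<hostname>_done' timestamps for this unit, which
# the other monitors poll via monitor_key_exists/monitor_key_get in
# wait_on_previous_node() below.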
def wait_on_previous_node(previous_node):
log("Previous node is: {}".format(previous_node))
previous_node_finished = monitor_key_exists(
'admin',
"{}_done".format(previous_node))
while previous_node_finished is False:
log("{} is not finished. Waiting".format(previous_node))
# Has this node been trying to upgrade for longer than
# 10 minutes?
# If so then move on and consider that node dead.
# NOTE: This assumes the cluster's clocks are somewhat accurate.
# If the host's clock is really far off it may cause it to skip
# the previous node even though it shouldn't.
current_timestamp = time.time()
previous_node_start_time = monitor_key_get(
'admin',
"{}_start".format(previous_node))
if (current_timestamp - (10 * 60)) > previous_node_start_time:
# Previous node is probably dead. Let's move on
if previous_node_start_time is not None:
log(
"Waited 10 mins on node {}. current time: {} > "
"previous node start time: {} Moving on".format(
previous_node,
(current_timestamp - (10 * 60)),
previous_node_start_time))
return
else:
# I have to wait. Sleep a random amount of time and then
# check if I can lock, upgrade and roll.
wait_time = random.randrange(5, 30)
log('waiting for {} seconds'.format(wait_time))
time.sleep(wait_time)
previous_node_finished = monitor_key_exists(
'admin',
"{}_done".format(previous_node))
# Edge cases:
# 1. Previous node dies on upgrade, can we retry?
def roll_monitor_cluster(new_version):
"""
This is tricky to get right so here's what we're going to do.
There are two possible cases: either I'm first in line or I'm not.
If I'm not first in line I'll wait a random amount of time between
5 and 30 seconds and then check whether the previous monitor has
finished upgrading yet.
"""
log('roll_monitor_cluster called with {}'.format(new_version))
my_name = socket.gethostname()
monitor_list = []
mon_map = get_mon_map('admin')
if mon_map['monmap']['mons']:
for mon in mon_map['monmap']['mons']:
monitor_list.append(mon['name'])
else:
status_set('blocked', 'Unable to get monitor cluster information')
sys.exit(1)
log('monitor_list: {}'.format(monitor_list))
# A sorted list of monitor names
mon_sorted_list = sorted(monitor_list)
try:
position = mon_sorted_list.index(my_name)
log("upgrade position: {}".format(position))
if position == 0:
# I'm first! Roll
# First set a key to inform others I'm about to roll
lock_and_roll(my_name=my_name)
else:
# Check if the previous node has finished
status_set('blocked',
'Waiting on {} to finish upgrading'.format(
mon_sorted_list[position - 1]))
wait_on_previous_node(previous_node=mon_sorted_list[position - 1])
lock_and_roll(my_name=my_name)
except ValueError:
log("Failed to find {} in list {}.".format(
my_name, mon_sorted_list))
status_set('blocked', 'failed to upgrade monitor')
def upgrade_monitor():
current_version = ceph.get_version()
status_set("maintenance", "Upgrading monitor")
log("Current ceph version is {}".format(current_version))
new_version = config('release-version')
log("Upgrading to: {}".format(new_version))
try:
add_source(config('source'), config('key'))
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
if ceph.systemd():
for mon_id in ceph.get_local_mon_ids():
service_stop('ceph-mon@{}'.format(mon_id))
else:
service_stop('ceph-mon-all')
apt_install(packages=ceph.PACKAGES, fatal=True)
if ceph.systemd():
for mon_id in ceph.get_local_mon_ids():
service_start('ceph-mon@{}'.format(mon_id))
else:
service_start('ceph-mon-all')
status_set("active", "")
except subprocess.CalledProcessError as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
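# Note: upgrade_monitor() only upgrades and restarts the daemons on the
# local unit; the one-at-a-time serialization across units comes from
# roll_monitor_cluster() above.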
def install_upstart_scripts():
# Only install upstart configurations for older versions
@ -123,6 +312,7 @@ def emit_cephconf():
install_alternative('ceph.conf', '/etc/ceph/ceph.conf',
charm_ceph_conf, 100)
JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped'
@ -131,6 +321,9 @@ def config_changed():
if config('prefer-ipv6'):
assert_charm_supports_ipv6()
# Check if an upgrade was requested
check_for_upgrade()
log('Monitor hosts are ' + repr(get_mon_hosts()))
sysctl_dict = config('sysctl')
@ -165,7 +358,7 @@ def config_changed():
emit_cephconf()
# Support use of single node ceph
if (not ceph.is_bootstrapped() and int(config('monitor-count')) == 1):
if not ceph.is_bootstrapped() and int(config('monitor-count')) == 1:
status_set('maintenance', 'Bootstrapping single Ceph MON')
ceph.bootstrap_monitor_cluster(config('monitor-secret'))
ceph.wait_for_bootstrap()
@ -188,10 +381,10 @@ def get_mon_hosts():
def get_peer_units():
'''
"""
Returns a dictionary of unit names from the mon peer relation with
a flag indicating whether the unit has presented its address
'''
"""
units = {}
units[local_unit()] = True
for relid in relation_ids('mon'):
@ -206,8 +399,7 @@ def mon_relation_joined():
public_addr = get_public_addr()
for relid in relation_ids('mon'):
relation_set(relation_id=relid,
relation_settings={'ceph-public-address':
public_addr})
relation_settings={'ceph-public-address': public_addr})
@hooks.hook('mon-relation-departed',
@ -250,7 +442,7 @@ def notify_client():
def upgrade_keys():
''' Ceph now required mon allow rw for pool creation '''
""" Ceph now required mon allow rw for pool creation """
if len(relation_ids('radosgw')) > 0:
ceph.upgrade_key_caps('client.radosgw.gateway',
ceph._radosgw_caps)
@ -272,6 +464,8 @@ def osd_relation(relid=None):
'osd_bootstrap_key': ceph.get_osd_bootstrap_key(),
'auth': config('auth-supported'),
'ceph-public-address': public_addr,
'osd_upgrade_key': ceph.get_named_key('osd-upgrade',
caps=ceph.osd_upgrade_caps),
}
relation_set(relation_id=relid,
relation_settings=data)
@ -430,6 +624,9 @@ def assess_status():
# Unit should be running and clustered, but no quorum
# TODO: should this be blocked or waiting?
status_set('blocked', 'Unit not clustered (no quorum)')
# TODO: if there's a pending lock for this unit,
# can it acquire the lock and then
# restart the ceph-mon process?
if __name__ == '__main__':


@ -24,6 +24,8 @@
# Adam Gandelman <adamg@ubuntu.com>
#
import bisect
import errno
import hashlib
import six
import os
@ -163,7 +165,7 @@ class Pool(object):
:return: None
"""
# read-only is easy, writeback is much harder
mode = get_cache_mode(cache_pool)
mode = get_cache_mode(self.service, cache_pool)
if mode == 'readonly':
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
@ -259,6 +261,133 @@ class ErasurePool(Pool):
Returns json formatted output"""
def get_mon_map(service):
"""
Returns the current monitor map.
:param service: six.string_types. The Ceph user name to run the command under
:return: dict. The parsed monitor map. :raise: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
try:
mon_status = check_output(
['ceph', '--id', service,
'mon_status', '--format=json'])
try:
return json.loads(mon_status)
except ValueError as v:
log("Unable to parse mon_status json: {}. Error: {}".format(
mon_status, v.message))
raise
except CalledProcessError as e:
log("mon_status command failed with message: {}".format(
e.message))
raise
def hash_monitor_names(service):
"""
Uses the get_mon_map() function to get information about the monitor
cluster.
Hash the name of each monitor. Return a sorted list of monitor hashes
in an ascending order.
:param service: six.string_types. The Ceph user name to run the command under
:rtype: list. A sorted list of sha224 hex digests of the monitor names.
Each monmap entry being hashed looks like: {
'name': 'ip-172-31-13-165',
'rank': 0,
'addr': '172.31.13.165:6789/0'}
"""
try:
hash_list = []
monitor_list = get_mon_map(service=service)
if monitor_list['monmap']['mons']:
for mon in monitor_list['monmap']['mons']:
hash_list.append(
hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
return sorted(hash_list)
else:
return None
except (ValueError, CalledProcessError):
raise
def monitor_key_delete(service, key):
"""
Delete a key and value pair from the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to delete.
"""
try:
check_output(
['ceph', '--id', service,
'config-key', 'del', str(key)])
except CalledProcessError as e:
log("Monitor config-key put failed with message: {}".format(
e.output))
raise
def monitor_key_set(service, key, value):
"""
Sets a key value pair on the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to set.
:param value: The value to set. This will be converted to a string
before setting
"""
try:
check_output(
['ceph', '--id', service,
'config-key', 'put', str(key), str(value)])
except CalledProcessError as e:
log("Monitor config-key put failed with message: {}".format(
e.output))
raise
def monitor_key_get(service, key):
"""
Gets the value of an existing key in the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to search for.
:return: Returns the value of that key or None if not found.
"""
try:
output = check_output(
['ceph', '--id', service,
'config-key', 'get', str(key)])
return output
except CalledProcessError as e:
log("Monitor config-key get failed with message: {}".format(
e.output))
return None
def monitor_key_exists(service, key):
"""
Searches for the existence of a key in the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to search for
:return: Returns True if the key exists, False if not and raises an
exception if an unknown error occurs. :raise: CalledProcessError if
an unknown error occurs
"""
try:
check_call(
['ceph', '--id', service,
'config-key', 'exists', str(key)])
# I can return true here regardless because Ceph returns
# ENOENT if the key wasn't found
return True
except CalledProcessError as e:
if e.returncode == errno.ENOENT:
return False
else:
log("Unknown error from ceph config-get exists: {} {}".format(
e.returncode, e.output))
raise
def get_erasure_profile(service, name):
"""
:param service: six.string_types. The Ceph user name to run the command under


@ -0,0 +1,136 @@
__author__ = 'chris'
import time
from mock import patch, call, MagicMock
import sys
sys.path.append('/home/chris/repos/ceph-mon/hooks')
import test_utils
import ceph_hooks
TO_PATCH = [
'hookenv',
'status_set',
'config',
'ceph',
'log',
'add_source',
'apt_update',
'apt_install',
'service_stop',
'service_start',
'host',
]
def config_side_effect(*args):
if args[0] == 'source':
return 'cloud:trusty-kilo'
elif args[0] == 'key':
return 'key'
elif args[0] == 'release-version':
return 'cloud:trusty-kilo'
previous_node_start_time = time.time() - (9 * 60)
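# Nine minutes is just inside the ten minute cutoff used by
# wait_on_previous_node(), so the test below keeps polling (and really
# sleeping) until roughly a minute of real time has passed before moving on.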
def monitor_key_side_effect(*args):
if args[1] == \
'ip-192-168-1-2_done':
return False
elif args[1] == \
'ip-192-168-1-2_start':
# Return that the previous node started 9 minutes ago
return previous_node_start_time
class UpgradeRollingTestCase(test_utils.CharmTestCase):
def setUp(self):
super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH)
@patch('ceph_hooks.roll_monitor_cluster')
def test_check_for_upgrade(self, roll_monitor_cluster):
self.host.lsb_release.return_value = {
'DISTRIB_CODENAME': 'trusty',
}
previous_mock = MagicMock().return_value
previous_mock.previous.return_value = "cloud:trusty-juno"
self.hookenv.config.side_effect = [previous_mock,
config_side_effect('source')]
ceph_hooks.check_for_upgrade()
roll_monitor_cluster.assert_called_with('cloud:trusty-kilo')
@patch('ceph_hooks.upgrade_monitor')
@patch('ceph_hooks.monitor_key_set')
def test_lock_and_roll(self, monitor_key_set, upgrade_monitor):
monitor_key_set.monitor_key_set.return_value = None
ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2')
upgrade_monitor.assert_called_once_with()
def test_upgrade_monitor(self):
self.config.side_effect = config_side_effect
self.ceph.get_version.return_value = "0.80"
self.ceph.systemd.return_value = False
ceph_hooks.upgrade_monitor()
self.service_stop.assert_called_with('ceph-mon-all')
self.service_start.assert_called_with('ceph-mon-all')
self.status_set.assert_has_calls([
call('maintenance', 'Upgrading monitor'),
call('active', '')
])
@patch('ceph_hooks.lock_and_roll')
@patch('ceph_hooks.wait_on_previous_node')
@patch('ceph_hooks.get_mon_map')
@patch('ceph_hooks.socket')
def test_roll_monitor_cluster_second(self,
socket,
get_mon_map,
wait_on_previous_node,
lock_and_roll):
wait_on_previous_node.return_value = None
socket.gethostname.return_value = "ip-192-168-1-3"
get_mon_map.return_value = {
'monmap': {
'mons': [
{
'name': 'ip-192-168-1-2',
},
{
'name': 'ip-192-168-1-3',
},
]
}
}
ceph_hooks.roll_monitor_cluster('0.94.1')
self.status_set.assert_called_with(
'blocked',
'Waiting on ip-192-168-1-2 to finish upgrading')
lock_and_roll.assert_called_with(my_name="ip-192-168-1-3")
@patch('ceph_hooks.monitor_key_get')
@patch('ceph_hooks.monitor_key_exists')
def test_wait_on_previous_node(self,
monitor_key_exists,
monitor_key_get):
monitor_key_get.side_effect = monitor_key_side_effect
monitor_key_exists.return_value = False
ceph_hooks.wait_on_previous_node("ip-192-168-1-2")
# Make sure we checked to see if the previous node started
monitor_key_get.assert_has_calls(
[call('admin', 'ip-192-168-1-2_start')]
)
# Make sure we checked to see if the previous node was finished
monitor_key_exists.assert_has_calls(
[call('admin', 'ip-192-168-1-2_done')]
)
# Make sure we waited at least once before proceeding
self.log.assert_has_calls(
[call('Previous node is: ip-192-168-1-2'),
call('ip-192-168-1-2 is not finished. Waiting')])