diff --git a/.gitignore b/.gitignore index f529536..9ed8f1c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ bin .testrepository .tox *.sw[nop] +.idea *.pyc diff --git a/hooks/ceph.py b/hooks/ceph.py index 5523b08..01a2a56 100644 --- a/hooks/ceph.py +++ b/hooks/ceph.py @@ -1,4 +1,3 @@ - # # Copyright 2012 Canonical Ltd. # @@ -6,35 +5,32 @@ # James Page # Paul Collins # - import json import subprocess import time import os import re import sys + +from charmhelpers.contrib.storage.linux.utils import ( + is_block_device, + zap_disk, + is_device_mounted) from charmhelpers.core.host import ( mkdir, chownr, service_restart, - cmp_pkgrevno, - lsb_release -) + lsb_release, + cmp_pkgrevno) from charmhelpers.core.hookenv import ( log, ERROR, - WARNING, cached, status_set, -) + WARNING) from charmhelpers.fetch import ( apt_cache ) -from charmhelpers.contrib.storage.linux.utils import ( - zap_disk, - is_block_device, - is_device_mounted, -) from utils import ( get_unit_hostname, ) @@ -53,8 +49,32 @@ def ceph_user(): return "root" +def get_local_mon_ids(): + """ + List the /var/lib/ceph/mon/* directories and extract the + monitor ID from the name of each directory, returning the + IDs as a list. + + :return: list. A list of monitor identifiers. + :raise: OSError if listing the directory fails. + """ + mon_ids = [] + mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon') + if os.path.exists(mon_path): + try: + dirs = os.listdir(mon_path) + for mon_dir in dirs: + # Everything after 'ceph-' is treated as the monitor ID + match = re.search('ceph-(?P<mon_id>.*)', mon_dir) + if match: + mon_ids.append(match.group('mon_id')) + except OSError: + raise + return mon_ids + + def get_version(): - '''Derive Ceph release from an installed package.''' + """Derive Ceph release from an installed package.""" import apt_pkg as apt cache = apt_cache() @@ -63,7 +83,7 @@ def get_version(): pkg = cache[package] except: # the package is unknown to the current apt cache.
- e = 'Could not determine version of package with no installation '\ + e = 'Could not determine version of package with no installation ' \ 'candidate: %s' % package error_out(e) @@ -165,6 +185,7 @@ def add_bootstrap_hint(peer): # Ignore any errors for this call subprocess.call(cmd) + DISK_FORMATS = [ 'xfs', 'ext4', @@ -178,7 +199,7 @@ def is_osd_disk(dev): info = info.split("\n") # IGNORE:E1103 for line in info: if line.startswith( - 'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D' + 'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D' ): return True except subprocess.CalledProcessError: @@ -213,7 +234,7 @@ def is_bootstrapped(): def wait_for_bootstrap(): - while (not is_bootstrapped()): + while not is_bootstrapped(): time.sleep(3) @@ -243,7 +264,6 @@ def generate_monitor_secret(): return "{}==".format(res.split('=')[1].strip()) - # OSD caps taken from ceph-create-keys _osd_bootstrap_caps = { 'mon': [ @@ -310,6 +330,9 @@ _radosgw_caps = { 'mon': ['allow rw'], 'osd': ['allow rwx'] } +_upgrade_caps = { + 'mon': ['allow rwx'] +} def get_radosgw_key(): @@ -321,6 +344,26 @@ _default_caps = { 'osd': ['allow rwx'] } +admin_caps = { + 'mds': ['allow'], + 'mon': ['allow *'], + 'osd': ['allow *'] +} + +osd_upgrade_caps = { + 'mon': ['allow command "config-key"', + 'allow command "osd tree"', + 'allow command "config-key list"', + 'allow command "config-key put"', + 'allow command "config-key get"', + 'allow command "config-key exists"', + ] +} + + +def get_upgrade_key(): + return get_named_key('upgrade-osd', _upgrade_caps) + def get_named_key(name, caps=None): caps = caps or _default_caps @@ -346,7 +389,7 @@ def get_named_key(name, caps=None): def upgrade_key_caps(key, caps): - ''' Upgrade key to have capabilities caps ''' + """ Upgrade key to have capabilities caps """ if not is_leader(): # Not the MON leader OR not clustered return @@ -440,7 +483,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False, log('Path {} is not a block device - bailing'.format(dev)) return - if (is_osd_disk(dev) and not reformat_osd): + if is_osd_disk(dev) and not reformat_osd: log('Looks like {} is already an OSD, skipping.'.format(dev)) return diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py index cdf4c5b..354c155 100755 --- a/hooks/ceph_hooks.py +++ b/hooks/ceph_hooks.py @@ -10,10 +10,17 @@ import glob import os +import random import shutil +import socket +import subprocess import sys import uuid +import time + import ceph +from charmhelpers.core import host +from charmhelpers.core import hookenv from charmhelpers.core.hookenv import ( log, DEBUG, @@ -29,15 +36,14 @@ from charmhelpers.core.hookenv import ( service_name, relations_of_type, status_set, - local_unit -) + local_unit) from charmhelpers.core.host import ( service_restart, mkdir, write_file, rsync, - cmp_pkgrevno -) + cmp_pkgrevno, + service_stop, service_start) from charmhelpers.fetch import ( apt_install, apt_update, @@ -52,7 +58,11 @@ from charmhelpers.contrib.network.ip import ( ) from charmhelpers.core.sysctl import create as create_sysctl from charmhelpers.core.templating import render - +from charmhelpers.contrib.storage.linux.ceph import ( + monitor_key_set, + monitor_key_exists, + monitor_key_get, + get_mon_map) from utils import ( get_networks, get_public_addr, @@ -61,7 +71,6 @@ from utils import ( from ceph_broker import ( process_requests ) - from charmhelpers.contrib.charmsupport import nrpe hooks = Hooks() @@ -71,6 +80,186 @@ SCRIPTS_DIR = '/usr/local/bin' STATUS_FILE = 
'/var/lib/nagios/cat-ceph-status.txt' STATUS_CRONFILE = '/etc/cron.d/cat-ceph-health' +# A dict of valid ceph upgrade paths. Mapping is old -> new +upgrade_paths = { + 'cloud:trusty-juno': 'cloud:trusty-kilo', + 'cloud:trusty-kilo': 'cloud:trusty-liberty', + 'cloud:trusty-liberty': 'cloud:trusty-mitaka', +} + + +def pretty_print_upgrade_paths(): + lines = [] + for key, value in upgrade_paths.iteritems(): + lines.append("{} -> {}".format(key, value)) + return lines + + +def check_for_upgrade(): + release_info = host.lsb_release() + if not release_info['DISTRIB_CODENAME'] == 'trusty': + log("Invalid upgrade path from {}. Only trusty is currently " + "supported".format(release_info['DISTRIB_CODENAME'])) + return + + c = hookenv.config() + old_version = c.previous('source') + log('old_version: {}'.format(old_version)) + # Read the newly requested source + new_version = hookenv.config('source') + if new_version: + # Strip all whitespace + new_version = new_version.replace(' ', '') + log('new_version: {}'.format(new_version)) + + if old_version in upgrade_paths: + if new_version == upgrade_paths[old_version]: + log("{} to {} is a valid upgrade path. Proceeding.".format( + old_version, new_version)) + roll_monitor_cluster(new_version) + else: + # Log a helpful error message + log("Invalid upgrade path from {} to {}. " + "Valid paths are: {}".format(old_version, + new_version, + pretty_print_upgrade_paths())) + + +def lock_and_roll(my_name): + start_timestamp = time.time() + + log('monitor_key_set {}_start {}'.format(my_name, start_timestamp)) + monitor_key_set('admin', "{}_start".format(my_name), start_timestamp) + log("Rolling") + # This should be quick + upgrade_monitor() + log("Done") + + stop_timestamp = time.time() + # Set a key to inform others I am finished + log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp)) + monitor_key_set('admin', "{}_done".format(my_name), stop_timestamp) + + +def wait_on_previous_node(previous_node): + log("Previous node is: {}".format(previous_node)) + + previous_node_finished = monitor_key_exists( + 'admin', + "{}_done".format(previous_node)) + + while previous_node_finished is False: + log("{} is not finished. Waiting".format(previous_node)) + # Has the previous node been trying to upgrade for longer than + # 10 minutes? + # If so then move on and consider that node dead. + + # NOTE: This assumes the cluster's clocks are somewhat accurate + # If the host's clock is really far off it may cause it to skip + # the previous node even though it shouldn't. + current_timestamp = time.time() + previous_node_start_time = monitor_key_get( + 'admin', + "{}_start".format(previous_node)) + if (current_timestamp - (10 * 60)) > previous_node_start_time: + # Previous node is probably dead. Let's move on + if previous_node_start_time is not None: + log( + "Waited 10 mins on node {}. current time: {} > " + "previous node start time: {}. Moving on".format( + previous_node, + (current_timestamp - (10 * 60)), + previous_node_start_time)) + return + else: + # I have to wait. Sleep a random amount of time and then + # check if I can lock, upgrade and roll. + wait_time = random.randrange(5, 30) + log('waiting for {} seconds'.format(wait_time)) + time.sleep(wait_time) + previous_node_finished = monitor_key_exists( + 'admin', + "{}_done".format(previous_node)) + + +# Edge cases: +# 1. Previous node dies on upgrade, can we retry? +def roll_monitor_cluster(new_version): + """ + This is tricky to get right so here's what we're going to do.
+ There are two possible cases: either I'm first in line or I'm not. + If I'm not first in line I'll wait a random time between 5-30 seconds + and test to see if the previous monitor is upgraded yet. + """ + log('roll_monitor_cluster called with {}'.format(new_version)) + my_name = socket.gethostname() + monitor_list = [] + mon_map = get_mon_map('admin') + if mon_map['monmap']['mons']: + for mon in mon_map['monmap']['mons']: + monitor_list.append(mon['name']) + else: + status_set('blocked', 'Unable to get monitor cluster information') + sys.exit(1) + log('monitor_list: {}'.format(monitor_list)) + + # A sorted list of monitor hostnames + mon_sorted_list = sorted(monitor_list) + + try: + position = mon_sorted_list.index(my_name) + log("upgrade position: {}".format(position)) + if position == 0: + # I'm first! Roll + # First set a key to inform others I'm about to roll + lock_and_roll(my_name=my_name) + else: + # Check if the previous node has finished + status_set('blocked', + 'Waiting on {} to finish upgrading'.format( + mon_sorted_list[position - 1])) + wait_on_previous_node(previous_node=mon_sorted_list[position - 1]) + lock_and_roll(my_name=my_name) + except ValueError: + log("Failed to find {} in list {}.".format( + my_name, mon_sorted_list)) + status_set('blocked', 'failed to upgrade monitor') + + +def upgrade_monitor(): + current_version = ceph.get_version() + status_set("maintenance", "Upgrading monitor") + log("Current ceph version is {}".format(current_version)) + new_version = config('release-version') + log("Upgrading to: {}".format(new_version)) + + try: + add_source(config('source'), config('key')) + apt_update(fatal=True) + except subprocess.CalledProcessError as err: + log("Adding the ceph source failed with message: {}".format( + err.message)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + try: + if ceph.systemd(): + for mon_id in ceph.get_local_mon_ids(): + service_stop('ceph-mon@{}'.format(mon_id)) + else: + service_stop('ceph-mon-all') + apt_install(packages=ceph.PACKAGES, fatal=True) + if ceph.systemd(): + for mon_id in ceph.get_local_mon_ids(): + service_start('ceph-mon@{}'.format(mon_id)) + else: + service_start('ceph-mon-all') + status_set("active", "") + except subprocess.CalledProcessError as err: + log("Stopping ceph and upgrading packages failed " + "with message: {}".format(err.message)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + def install_upstart_scripts(): # Only install upstart configurations for older versions @@ -123,6 +312,7 @@ def emit_cephconf(): install_alternative('ceph.conf', '/etc/ceph/ceph.conf', charm_ceph_conf, 100) + JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped' @@ -131,6 +321,9 @@ def config_changed(): if config('prefer-ipv6'): assert_charm_supports_ipv6() + # Check if an upgrade was requested + check_for_upgrade() + log('Monitor hosts are ' + repr(get_mon_hosts())) sysctl_dict = config('sysctl') @@ -165,7 +358,7 @@ def config_changed(): emit_cephconf() # Support use of single node ceph - if (not ceph.is_bootstrapped() and int(config('monitor-count')) == 1): + if not ceph.is_bootstrapped() and int(config('monitor-count')) == 1: status_set('maintenance', 'Bootstrapping single Ceph MON') ceph.bootstrap_monitor_cluster(config('monitor-secret')) ceph.wait_for_bootstrap() @@ -188,10 +381,10 @@ def get_mon_hosts(): def get_peer_units(): - ''' + """ Returns a dictionary of unit names from the mon peer relation with a flag indicating whether the unit has presented its address - ''' +
""" units = {} units[local_unit()] = True for relid in relation_ids('mon'): @@ -206,8 +399,7 @@ def mon_relation_joined(): public_addr = get_public_addr() for relid in relation_ids('mon'): relation_set(relation_id=relid, - relation_settings={'ceph-public-address': - public_addr}) + relation_settings={'ceph-public-address': public_addr}) @hooks.hook('mon-relation-departed', @@ -250,7 +442,7 @@ def notify_client(): def upgrade_keys(): - ''' Ceph now required mon allow rw for pool creation ''' + """ Ceph now required mon allow rw for pool creation """ if len(relation_ids('radosgw')) > 0: ceph.upgrade_key_caps('client.radosgw.gateway', ceph._radosgw_caps) @@ -272,6 +464,8 @@ def osd_relation(relid=None): 'osd_bootstrap_key': ceph.get_osd_bootstrap_key(), 'auth': config('auth-supported'), 'ceph-public-address': public_addr, + 'osd_upgrade_key': ceph.get_named_key('osd-upgrade', + caps=ceph.osd_upgrade_caps), } relation_set(relation_id=relid, relation_settings=data) @@ -430,6 +624,9 @@ def assess_status(): # Unit should be running and clustered, but no quorum # TODO: should this be blocked or waiting? status_set('blocked', 'Unit not clustered (no quorum)') + # If there's a pending lock for this unit, + # can i get the lock? + # reboot the ceph-mon process if __name__ == '__main__': diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py index fb1bee3..2903670 100644 --- a/hooks/charmhelpers/contrib/storage/linux/ceph.py +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -24,6 +24,8 @@ # Adam Gandelman # import bisect +import errno +import hashlib import six import os @@ -163,7 +165,7 @@ class Pool(object): :return: None """ # read-only is easy, writeback is much harder - mode = get_cache_mode(cache_pool) + mode = get_cache_mode(self.service, cache_pool) if mode == 'readonly': check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none']) check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) @@ -259,6 +261,133 @@ class ErasurePool(Pool): Returns json formatted output""" +def get_mon_map(service): + """ + Returns the current monitor map. + :param service: six.string_types. The Ceph user name to run the command under + :return: json string. :raise: ValueError if the monmap fails to parse. + Also raises CalledProcessError if our ceph command fails + """ + try: + mon_status = check_output( + ['ceph', '--id', service, + 'mon_status', '--format=json']) + try: + return json.loads(mon_status) + except ValueError as v: + log("Unable to parse mon_status json: {}. Error: {}".format( + mon_status, v.message)) + raise + except CalledProcessError as e: + log("mon_status command failed with message: {}".format( + e.message)) + raise + +def hash_monitor_names(service): + """ + Uses the get_mon_map() function to get information about the monitor + cluster. + Hash the name of each monitor. Return a sorted list of monitor hashes + in an ascending order. + :param service: six.string_types. The Ceph user name to run the command under + :rtype : dict. 
json dict of monitor name, ip address and rank + example: { + 'name': 'ip-172-31-13-165', + 'rank': 0, + 'addr': '172.31.13.165:6789/0'} + """ + try: + hash_list = [] + monitor_list = get_mon_map(service=service) + if monitor_list['monmap']['mons']: + for mon in monitor_list['monmap']['mons']: + hash_list.append( + hashlib.sha224(mon['name'].encode('utf-8')).hexdigest()) + return sorted(hash_list) + else: + return None + except (ValueError, CalledProcessError): + raise + + +def monitor_key_delete(service, key): + """ + Deletes a key and value pair from the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command under + :param key: six.string_types. The key to delete. + """ + try: + check_output( + ['ceph', '--id', service, + 'config-key', 'del', str(key)]) + except CalledProcessError as e: + log("Monitor config-key del failed with message: {}".format( + e.output)) + raise + + +def monitor_key_set(service, key, value): + """ + Sets a key value pair on the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command under + :param key: six.string_types. The key to set. + :param value: The value to set. This will be converted to a string + before setting + """ + try: + check_output( + ['ceph', '--id', service, + 'config-key', 'put', str(key), str(value)]) + except CalledProcessError as e: + log("Monitor config-key put failed with message: {}".format( + e.output)) + raise + + +def monitor_key_get(service, key): + """ + Gets the value of an existing key in the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command under + :param key: six.string_types. The key to search for. + :return: Returns the value of that key or None if not found. + """ + try: + output = check_output( + ['ceph', '--id', service, + 'config-key', 'get', str(key)]) + return output + except CalledProcessError as e: + log("Monitor config-key get failed with message: {}".format( + e.output)) + return None + + +def monitor_key_exists(service, key): + """ + Searches for the existence of a key in the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command under + :param key: six.string_types. The key to search for + :return: Returns True if the key exists, False if not and raises an + exception if an unknown error occurs. :raise: CalledProcessError if + an unknown error occurs + """ + try: + check_call( + ['ceph', '--id', service, + 'config-key', 'exists', str(key)]) + # 'config-key exists' returns 0 when the key is present; a missing + # key raises CalledProcessError with ENOENT, handled below + return True + except CalledProcessError as e: + if e.returncode == errno.ENOENT: + return False + else: + log("Unknown error from ceph config-key exists: {} {}".format( + e.returncode, e.output)) + raise + + +def get_erasure_profile(service, name): + """ + :param service: six.string_types.
The Ceph user name to run the command under diff --git a/unit_tests/test_upgrade_roll.py b/unit_tests/test_upgrade_roll.py new file mode 100644 index 0000000..8af24ac --- /dev/null +++ b/unit_tests/test_upgrade_roll.py @@ -0,0 +1,136 @@ +__author__ = 'chris' +import os +import sys +import time + +from mock import patch, call, MagicMock + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'hooks')) + +import test_utils +import ceph_hooks + +TO_PATCH = [ + 'hookenv', + 'status_set', + 'config', + 'ceph', + 'log', + 'add_source', + 'apt_update', + 'apt_install', + 'service_stop', + 'service_start', + 'host', +] + + +def config_side_effect(*args): + if args[0] == 'source': + return 'cloud:trusty-kilo' + elif args[0] == 'key': + return 'key' + elif args[0] == 'release-version': + return 'cloud:trusty-kilo' + + +previous_node_start_time = time.time() - (9 * 60) + + +def monitor_key_side_effect(*args): + if args[1] == \ + 'ip-192-168-1-2_done': + return False + elif args[1] == \ + 'ip-192-168-1-2_start': + # Return that the previous node started 9 minutes ago + return previous_node_start_time + + +class UpgradeRollingTestCase(test_utils.CharmTestCase): + def setUp(self): + super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH) + + @patch('ceph_hooks.roll_monitor_cluster') + def test_check_for_upgrade(self, roll_monitor_cluster): + self.host.lsb_release.return_value = { + 'DISTRIB_CODENAME': 'trusty', + } + previous_mock = MagicMock().return_value + previous_mock.previous.return_value = "cloud:trusty-juno" + self.hookenv.config.side_effect = [previous_mock, + config_side_effect('source')] + ceph_hooks.check_for_upgrade() + + roll_monitor_cluster.assert_called_with('cloud:trusty-kilo') + + @patch('ceph_hooks.upgrade_monitor') + @patch('ceph_hooks.monitor_key_set') + def test_lock_and_roll(self, monitor_key_set, upgrade_monitor): + monitor_key_set.return_value = None + ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2') + upgrade_monitor.assert_called_once_with() + + def test_upgrade_monitor(self): + self.config.side_effect = config_side_effect + self.ceph.get_version.return_value = "0.80" + self.ceph.systemd.return_value = False + ceph_hooks.upgrade_monitor() + self.service_stop.assert_called_with('ceph-mon-all') + self.service_start.assert_called_with('ceph-mon-all') + self.status_set.assert_has_calls([ + call('maintenance', 'Upgrading monitor'), + call('active', '') + ]) + + @patch('ceph_hooks.lock_and_roll') + @patch('ceph_hooks.wait_on_previous_node') + @patch('ceph_hooks.get_mon_map') + @patch('ceph_hooks.socket') + def test_roll_monitor_cluster_second(self, + socket, + get_mon_map, + wait_on_previous_node, + lock_and_roll): + wait_on_previous_node.return_value = None + socket.gethostname.return_value = "ip-192-168-1-3" + get_mon_map.return_value = { + 'monmap': { + 'mons': [ + { + 'name': 'ip-192-168-1-2', + }, + { + 'name': 'ip-192-168-1-3', + }, + ] + } + } + ceph_hooks.roll_monitor_cluster('0.94.1') + self.status_set.assert_called_with( + 'blocked', + 'Waiting on ip-192-168-1-2 to finish upgrading') + lock_and_roll.assert_called_with(my_name="ip-192-168-1-3") + + @patch('ceph_hooks.monitor_key_get') + @patch('ceph_hooks.monitor_key_exists') + def test_wait_on_previous_node(self, + monitor_key_exists, + monitor_key_get): + monitor_key_get.side_effect = monitor_key_side_effect + monitor_key_exists.return_value = False + + ceph_hooks.wait_on_previous_node("ip-192-168-1-2") + + # Make sure we checked to see if the previous node started + monitor_key_get.assert_has_calls( + [call('admin',
'ip-192-168-1-2_start')] ) + # Make sure we checked to see if the previous node was finished + monitor_key_exists.assert_has_calls( + [call('admin', 'ip-192-168-1-2_done')] + ) + # Make sure we waited at least once before proceeding + self.log.assert_has_calls([ + call('Previous node is: ip-192-168-1-2'), + call('ip-192-168-1-2 is not finished. Waiting'), + ])
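For reference, a minimal, self-contained sketch of the rolling-upgrade coordination that lock_and_roll() and wait_on_previous_node() implement in this patch. It is illustrative only: a plain dict stands in for the Ceph config-key store reached via monitor_key_set/get/exists, the hostnames are hypothetical, and the actual package upgrade is elided.

import time

KEY_STORE = {}       # stand-in for 'ceph config-key' storage
TIMEOUT = 10 * 60    # give up on the previous monitor after ten minutes


def upgrade_in_order(my_name, all_monitors):
    """Upgrade monitors one at a time, in sorted hostname order."""
    order = sorted(all_monitors)
    position = order.index(my_name)
    if position > 0:
        wait_for(order[position - 1])
    # Publish a start timestamp so peers can detect a dead upgrader,
    # perform the upgrade, then publish completion.
    KEY_STORE['{}_start'.format(my_name)] = time.time()
    # ... stop ceph-mon, upgrade packages, start ceph-mon ...
    KEY_STORE['{}_done'.format(my_name)] = time.time()


def wait_for(previous):
    """Block until the previous monitor finishes or times out."""
    while '{}_done'.format(previous) not in KEY_STORE:
        started = KEY_STORE.get('{}_start'.format(previous))
        if started and time.time() - started > TIMEOUT:
            return  # previous monitor looks dead; move on anyway
        time.sleep(5)


if __name__ == '__main__':
    # The first monitor in sorted order rolls immediately.
    upgrade_in_order('mon-a', ['mon-a', 'mon-b', 'mon-c'])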