Do not install NTP when running in a container

Use determine_packages() to avoid installing NTP when in a container.
Sync charms.ceph to get ceph.determine_packages().

Change-Id: Ia00af86964d8f77e615367cbcde35a4d7d10774c
Partial-Bug: #1690513
David Ames 2017-05-16 11:22:55 -07:00
parent de03f77b89
commit 04b8f84927
5 changed files with 604 additions and 132 deletions
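In short, the hooks below stop installing the static ceph.PACKAGES list directly and instead ask ceph.determine_packages() for the list, which drops 'ntp' when the unit runs in a container (where the host manages time). A minimal sketch of the pattern, assuming an illustrative package list (the real list lives in the charm's ceph library):

# Sketch only: PACKAGES here is illustrative; determine_packages() and
# is_container() follow the shapes used in the diff below.
from charmhelpers.core.host import is_container

PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'xfsprogs']


def determine_packages():
    """Return the charm's package list, omitting NTP inside a container."""
    if is_container():
        PACKAGES.remove('ntp')

    return PACKAGES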

View File

@@ -154,7 +154,7 @@ def install():
     execd_preinstall()
     add_source(config('source'), config('key'))
     apt_update(fatal=True)
-    apt_install(packages=ceph.PACKAGES, fatal=True)
+    apt_install(packages=ceph.determine_packages(), fatal=True)


 def az_info():
@@ -544,7 +544,8 @@ def client_relation_changed():
 @harden()
 def upgrade_charm():
     emit_cephconf()
-    apt_install(packages=filter_installed_packages(ceph.PACKAGES), fatal=True)
+    apt_install(packages=filter_installed_packages(ceph.determine_packages()),
+                fatal=True)
     ceph.update_monfs()
     mon_relation_joined()
     if is_relation_made("nrpe-external-master"):
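Note that upgrade_charm() additionally wraps the list in filter_installed_packages(), so only packages not already present get installed on upgrade. A small sketch of that helper's role (the charmhelpers names are real; the wrapper function here is illustrative):

from charmhelpers.fetch import apt_install, filter_installed_packages


def install_missing(candidates):
    # filter_installed_packages() drops anything already installed,
    # so apt_install() only runs for genuinely missing packages.
    missing = filter_installed_packages(candidates)
    if missing:
        apt_install(packages=missing, fatal=True)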

View File

@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from _ctypes import POINTER, byref
 import ctypes
 import collections
 import json
@@ -26,25 +25,32 @@ import errno
 import shutil
 import pyudev

+from datetime import datetime
+
 from charmhelpers.core import hookenv
+from charmhelpers.core import templating
 from charmhelpers.core.host import (
-    mkdir,
     chownr,
-    service_restart,
-    lsb_release,
     cmp_pkgrevno,
-    service_stop,
+    lsb_release,
+    mkdir,
     mounts,
+    owner,
+    service_restart,
     service_start,
+    service_stop,
     CompareHostReleases,
+    is_container,
 )
 from charmhelpers.core.hookenv import (
-    log,
-    ERROR,
     cached,
+    config,
+    log,
     status_set,
-    WARNING, DEBUG, config)
-from charmhelpers.core.services import render_template
+    DEBUG,
+    ERROR,
+    WARNING,
+)
 from charmhelpers.fetch import (
     apt_cache,
     add_source, apt_install, apt_update)
@@ -52,13 +58,22 @@ from charmhelpers.contrib.storage.linux.ceph import (
     monitor_key_set,
     monitor_key_exists,
     monitor_key_get,
-    get_mon_map)
+    get_mon_map,
+)
 from charmhelpers.contrib.storage.linux.utils import (
     is_block_device,
     zap_disk,
-    is_device_mounted)
+    is_device_mounted,
+)
 from charmhelpers.contrib.openstack.utils import (
-    get_os_codename_install_source)
+    get_os_codename_install_source,
+)
+
+from ceph.ceph_helpers import check_output
+
+CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
+OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
+HDPARM_FILE = os.path.join(os.sep, 'etc', 'hdparm.conf')

 LEADER = 'leader'
 PEON = 'peon'
@@ -109,6 +124,42 @@ NETWORK_ADAPTER_SYSCTLS = {
 }


+class Partition(object):
+    def __init__(self, name, number, size, start, end, sectors, uuid):
+        """
+        A block device partition
+
+        :param name: Name of block device
+        :param number: Partition number
+        :param size: Capacity of the device
+        :param start: Starting block
+        :param end: Ending block
+        :param sectors: Number of blocks
+        :param uuid: UUID of the partition
+        """
+        self.name = name
+        self.number = number
+        self.size = size
+        self.start = start
+        self.end = end
+        self.sectors = sectors
+        self.uuid = uuid
+
+    def __str__(self):
+        return "number: {} start: {} end: {} sectors: {} size: {} " \
+               "name: {} uuid: {}".format(self.number, self.start,
+                                          self.end, self.sectors,
+                                          self.size, self.name,
+                                          self.uuid)
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.__dict__ == other.__dict__
+        return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
 def unmounted_disks():
     """List of unmounted block devices on the current host."""
     disks = []
@@ -173,7 +224,7 @@ def tune_nic(network_interface):
     try:
         # Apply the settings
         log("Applying sysctl settings", level=DEBUG)
-        subprocess.check_output(["sysctl", "-p", sysctl_file])
+        check_output(["sysctl", "-p", sysctl_file])
     except subprocess.CalledProcessError as err:
         log('sysctl -p {} failed with error {}'.format(sysctl_file,
                                                        err.output),
@@ -224,14 +275,21 @@ def persist_settings(settings_dict):
     The settings_dict should be in the form of {"uuid": {"key":"value"}}
     :param settings_dict: dict of settings to save
     """
-    hdparm_path = os.path.join(os.sep, 'etc', 'hdparm.conf')
+    if not settings_dict:
+        return
+
     try:
-        with open(hdparm_path, 'w') as hdparm:
-            hdparm.write(render_template('hdparm.conf', settings_dict))
+        templating.render(source='hdparm.conf', target=HDPARM_FILE,
+                          context=settings_dict)
     except IOError as err:
         log("Unable to open {path} because of error: {error}".format(
-            path=hdparm_path,
-            error=err.message), level=ERROR)
+            path=HDPARM_FILE, error=err.message), level=ERROR)
+    except Exception as e:
+        # The templating.render can raise a jinja2 exception if the
+        # template is not found. Rather than polluting the import
+        # space of this charm, simply catch Exception
+        log('Unable to render {path} due to error: {error}'.format(
+            path=HDPARM_FILE, error=e.message), level=ERROR)


 def set_max_sectors_kb(dev_name, max_sectors_size):
@@ -305,7 +363,7 @@ def set_hdd_read_ahead(dev_name, read_ahead_sectors=256):
         log('Setting read ahead to {} for device {}'.format(
             read_ahead_sectors,
             dev_name))
-        subprocess.check_output(['hdparm',
-                                 '-a{}'.format(read_ahead_sectors),
-                                 dev_name])
+        check_output(['hdparm',
+                      '-a{}'.format(read_ahead_sectors),
+                      dev_name])
     except subprocess.CalledProcessError as e:
@@ -315,52 +373,22 @@ def set_hdd_read_ahead(dev_name, read_ahead_sectors=256):

 def get_block_uuid(block_dev):
     """
-    This queries blkid to get the uuid for a block device. Note: This function
-    needs to be called with root priv. It will raise an error otherwise.
+    This queries blkid to get the uuid for a block device.
     :param block_dev: Name of the block device to query.
-    :return: The UUID of the device or None on Error. Raises OSError
+    :return: The UUID of the device or None on Error.
     """
     try:
-        blkid = ctypes.cdll.LoadLibrary("libblkid.so")
-        # Header signature
-        # extern int blkid_probe_lookup_value(blkid_probe pr, const char *name,
-        #                                     const char **data, size_t *len);
-        blkid.blkid_new_probe_from_filename.argtypes = [ctypes.c_char_p]
-        blkid.blkid_probe_lookup_value.argtypes = [ctypes.c_void_p,
-                                                   ctypes.c_char_p,
-                                                   POINTER(ctypes.c_char_p),
-                                                   POINTER(ctypes.c_ulong)]
-    except OSError as err:
-        log('get_block_uuid loading libblkid.so failed with error: {}'.format(
-            os.strerror(err.errno)),
-            level=ERROR)
-        raise err
-    if not os.path.exists(block_dev):
-        return None
-    probe = blkid.blkid_new_probe_from_filename(ctypes.c_char_p(block_dev))
-    if probe < 0:
-        log('get_block_uuid new_probe_from_filename failed: {}'.format(
-            os.strerror(probe)),
-            level=ERROR)
-        raise OSError(probe, os.strerror(probe))
-    result = blkid.blkid_do_probe(probe)
-    if result != 0:
-        log('get_block_uuid do_probe failed with error: {}'.format(
-            os.strerror(result)),
-            level=ERROR)
-        raise OSError(result, os.strerror(result))
-    uuid = ctypes.c_char_p()
-    result = blkid.blkid_probe_lookup_value(probe,
-                                            ctypes.c_char_p(
-                                                'UUID'.encode('ascii')),
-                                            byref(uuid), None)
-    if result < 0:
-        log('get_block_uuid lookup_value failed with error: {}'.format(
-            os.strerror(result)),
-            level=ERROR)
-        raise OSError(result, os.strerror(result))
-    blkid.blkid_free_probe(probe)
-    return ctypes.string_at(uuid).decode('ascii')
+        block_info = check_output(
+            ['blkid', '-o', 'export', block_dev])
+        for tag in block_info.split('\n'):
+            parts = tag.split('=')
+            if parts[0] == 'UUID':
+                return parts[1]
+        return None
+    except subprocess.CalledProcessError as err:
+        log('get_block_uuid failed with error: {}'.format(err.output),
+            level=ERROR)
+        return None


 def check_max_sectors(save_settings_dict,
@@ -499,7 +527,7 @@ def get_osd_weight(osd_id):
     Also raises CalledProcessError if our ceph command fails
     """
     try:
-        tree = subprocess.check_output(
+        tree = check_output(
             ['ceph', 'osd', 'tree', '--format=json'])
         try:
             json_tree = json.loads(tree)
@@ -526,7 +554,7 @@ def get_osd_tree(service):
     Also raises CalledProcessError if our ceph command fails
     """
     try:
-        tree = subprocess.check_output(
+        tree = check_output(
             ['ceph', '--id', service,
              'osd', 'tree', '--format=json'])
         try:
@@ -561,6 +589,43 @@ def get_osd_tree(service):
         raise


+def _get_child_dirs(path):
+    """Returns a list of directory names in the specified path.
+
+    :param path: a full path listing of the parent directory to return child
+                 directory names
+    :return: list. A list of child directories under the parent directory
+    :raises: ValueError if the specified path does not exist or is not a
+             directory,
+             OSError if an error occurs reading the directory listing
+    """
+    if not os.path.exists(path):
+        raise ValueError('Specified path "%s" does not exist' % path)
+    if not os.path.isdir(path):
+        raise ValueError('Specified path "%s" is not a directory' % path)
+
+    files_in_dir = [os.path.join(path, f) for f in os.listdir(path)]
+    return list(filter(os.path.isdir, files_in_dir))
+
+
+def _get_osd_num_from_dirname(dirname):
+    """Parses the dirname and returns the OSD id.
+
+    Parses a string in the form of 'ceph-{osd#}' and returns the osd number
+    from the directory name.
+
+    :param dirname: the directory name to return the OSD number from
+    :return int: the osd number the directory name corresponds to
+    :raises ValueError: if the osd number cannot be parsed from the provided
+                        directory name.
+    """
+    match = re.search('ceph-(?P<osd_id>\d+)', dirname)
+    if not match:
+        raise ValueError("dirname not in correct format: %s" % dirname)
+
+    return match.group('osd_id')
+
+
 def get_local_osd_ids():
     """
     This will list the /var/lib/ceph/osd/* directories and try
@@ -666,7 +731,7 @@ def is_quorum():
     ]
     if os.path.exists(asok):
         try:
-            result = json.loads(subprocess.check_output(cmd))
+            result = json.loads(check_output(cmd))
         except subprocess.CalledProcessError:
             return False
         except ValueError:
@@ -693,7 +758,7 @@ def is_leader():
     ]
     if os.path.exists(asok):
         try:
-            result = json.loads(subprocess.check_output(cmd))
+            result = json.loads(check_output(cmd))
         except subprocess.CalledProcessError:
             return False
         except ValueError:
@@ -737,9 +802,12 @@ DISK_FORMATS = [
 ]

 CEPH_PARTITIONS = [
+    '89C57F98-2FE5-4DC0-89C1-5EC00CEFF2BE',  # ceph encrypted disk in creation
+    '45B0969E-9B03-4F30-B4C6-5EC00CEFF106',  # ceph encrypted journal
     '4FBD7E29-9D25-41B8-AFD0-5EC00CEFF05D',  # ceph encrypted osd data
     '4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D',  # ceph osd data
     '45B0969E-9B03-4F30-B4C6-B4B80CEFF106',  # ceph osd journal
+    '89C57F98-2FE5-4DC0-89C1-F3AD0CEFF2BE',  # ceph disk in creation
 ]
@@ -800,7 +868,7 @@ def replace_osd(dead_osd_number,
         # Drop this osd out of the cluster. This will begin a
         # rebalance operation
         status_set('maintenance', 'Removing osd {}'.format(dead_osd_number))
-        subprocess.check_output([
+        check_output([
             'ceph',
             '--id',
             'osd-upgrade',
@@ -811,8 +879,8 @@ def replace_osd(dead_osd_number,
         if systemd():
             service_stop('ceph-osd@{}'.format(dead_osd_number))
         else:
-            subprocess.check_output(['stop', 'ceph-osd', 'id={}'.format(
-                dead_osd_number)]),
+            check_output(['stop', 'ceph-osd', 'id={}'.format(
+                dead_osd_number)])
         # umount if still mounted
         ret = umount(mount_point)
         if ret < 0:
@@ -820,20 +888,20 @@ def replace_osd(dead_osd_number,
                 mount_point, os.strerror(ret)))
         # Clean up the old mount point
         shutil.rmtree(mount_point)
-        subprocess.check_output([
+        check_output([
             'ceph',
             '--id',
             'osd-upgrade',
             'osd', 'crush', 'remove',
             'osd.{}'.format(dead_osd_number)])
         # Revoke the OSDs access keys
-        subprocess.check_output([
+        check_output([
             'ceph',
             '--id',
             'osd-upgrade',
             'auth', 'del',
             'osd.{}'.format(dead_osd_number)])
-        subprocess.check_output([
+        check_output([
             'ceph',
             '--id',
             'osd-upgrade',
@@ -850,17 +918,48 @@ def replace_osd(dead_osd_number,
         log('replace_osd failed with error: ' + e.output)


-def is_osd_disk(dev):
-    try:
-        info = subprocess.check_output(['sgdisk', '-i', '1', dev])
-        info = info.split("\n")  # IGNORE:E1103
-        for line in info:
-            for ptype in CEPH_PARTITIONS:
-                sig = 'Partition GUID code: {}'.format(ptype)
-                if line.startswith(sig):
-                    return True
-    except subprocess.CalledProcessError:
-        pass
+def get_partition_list(dev):
+    """
+    Lists the partitions of a block device
+
+    :param dev: Path to a block device. ex: /dev/sda
+    :return: :raise: Returns a list of Partition objects.
+             Raises CalledProcessException if lsblk fails
+    """
+    partitions_list = []
+    try:
+        partitions = get_partitions(dev)
+        # For each line of output
+        for partition in partitions:
+            parts = partition.split()
+            partitions_list.append(
+                Partition(number=parts[0],
+                          start=parts[1],
+                          end=parts[2],
+                          sectors=parts[3],
+                          size=parts[4],
+                          name=parts[5],
+                          uuid=parts[6])
+            )
+        return partitions_list
+    except subprocess.CalledProcessError:
+        raise
+
+
+def is_osd_disk(dev):
+    partitions = get_partition_list(dev)
+    for partition in partitions:
+        try:
+            info = check_output(['sgdisk', '-i', partition.number, dev])
+            info = info.split("\n")  # IGNORE:E1103
+            for line in info:
+                for ptype in CEPH_PARTITIONS:
+                    sig = 'Partition GUID code: {}'.format(ptype)
+                    if line.startswith(sig):
+                        return True
+        except subprocess.CalledProcessError as e:
+            log("sgdisk inspection of partition {} on {} failed with "
+                "error: {}. Skipping".format(partition.number, dev, e.message),
+                level=ERROR)
     return False
@@ -933,7 +1032,7 @@ def generate_monitor_secret():
         '--name=mon.',
         '--gen-key'
     ]
-    res = subprocess.check_output(cmd)
+    res = check_output(cmd)
     return "{}==".format(res.split('=')[1].strip())
@@ -1081,8 +1180,8 @@ def create_named_keyring(entity, name, caps=None):
     ]
     for subsystem, subcaps in caps.items():
         cmd.extend([subsystem, '; '.join(subcaps)])
-    log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
-    return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
+    log("Calling check_output: {}".format(cmd), level=DEBUG)
+    return parse_key(check_output(cmd).strip())  # IGNORE:E1103


 def get_upgrade_key():
@@ -1099,7 +1198,7 @@ def get_named_key(name, caps=None, pool_list=None):
     """
     try:
         # Does the key already exist?
-        output = subprocess.check_output(
+        output = check_output(
             [
                 'sudo',
                 '-u', ceph_user(),
@@ -1139,8 +1238,8 @@ def get_named_key(name, caps=None, pool_list=None):
         pools = " ".join(['pool={0}'.format(i) for i in pool_list])
         subcaps[0] = subcaps[0] + " " + pools
     cmd.extend([subsystem, '; '.join(subcaps)])
-    log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
-    return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
+    log("Calling check_output: {}".format(cmd), level=DEBUG)
+    return parse_key(check_output(cmd).strip())  # IGNORE:E1103


 def upgrade_key_caps(key, caps):
@@ -1158,7 +1257,7 @@ def upgrade_key_caps(key, caps):

 @cached
 def systemd():
-    return (CompareHostReleases(lsb_release()['DISTRIB_CODENAME']) >= 'vivid')
+    return CompareHostReleases(lsb_release()['DISTRIB_CODENAME']) >= 'vivid'


 def bootstrap_monitor_cluster(secret):
@@ -1232,7 +1331,7 @@ def maybe_zap_journal(journal_dev):
 def get_partitions(dev):
     cmd = ['partx', '--raw', '--noheadings', dev]
     try:
-        out = subprocess.check_output(cmd).splitlines()
+        out = check_output(cmd).splitlines()
         log("get partitions: {}".format(out), level=DEBUG)
         return out
     except subprocess.CalledProcessError as e:
@@ -1316,7 +1415,7 @@ def osdize_dir(path, encrypt=False):
     if cmp_pkgrevno('ceph', "0.56.6") < 0:
         log('Unable to use directories for OSDs with ceph < 0.56.6',
             level=ERROR)
-        raise
+        return

     mkdir(path, owner=ceph_user(), group=ceph_user(), perms=0o755)
     chownr('/var/lib/ceph', ceph_user(), ceph_user())
@@ -1342,7 +1441,7 @@ def get_running_osds():
     """Returns a list of the pids of the current running OSD daemons"""
     cmd = ['pgrep', 'ceph-osd']
     try:
-        result = subprocess.check_output(cmd)
+        result = check_output(cmd)
         return result.split()
     except subprocess.CalledProcessError:
         return []
@@ -1358,7 +1457,7 @@ def get_cephfs(service):
         # This command wasn't introduced until 0.86 ceph
         return []
     try:
-        output = subprocess.check_output(["ceph",
-                                          '--id', service,
-                                          "fs", "ls"])
+        output = check_output(["ceph",
+                               '--id', service,
+                               "fs", "ls"])
         if not output:
@@ -1485,7 +1584,7 @@ def upgrade_monitor(new_version):
             service_stop('ceph-mon@{}'.format(mon_id))
     else:
         service_stop('ceph-mon-all')
-    apt_install(packages=PACKAGES, fatal=True)
+    apt_install(packages=determine_packages(), fatal=True)

     # Ensure the files and directories under /var/lib/ceph is chowned
     # properly as part of the move to the Jewel release, which moved the
@@ -1538,6 +1637,7 @@ def lock_and_roll(upgrade_key, service, my_name, version):
                                             my_name,
                                             version,
                                             stop_timestamp))
+    status_set('maintenance', 'Finishing upgrade')
     monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service,
                                                         my_name,
                                                         version),
@@ -1660,42 +1760,198 @@ def upgrade_osd(new_version):
         add_source(config('source'), config('key'))
         apt_update(fatal=True)
     except subprocess.CalledProcessError as err:
-        log("Adding the ceph source failed with message: {}".format(
+        log("Adding the ceph sources failed with message: {}".format(
             err.message))
         status_set("blocked", "Upgrade to {} failed".format(new_version))
         sys.exit(1)

     try:
-        if systemd():
-            for osd_id in get_local_osd_ids():
-                service_stop('ceph-osd@{}'.format(osd_id))
-        else:
-            service_stop('ceph-osd-all')
-        apt_install(packages=PACKAGES, fatal=True)
-
-        # Ensure the files and directories under /var/lib/ceph is chowned
-        # properly as part of the move to the Jewel release, which moved the
-        # ceph daemons to running as ceph:ceph instead of root:root. Only do
-        # it when necessary as this is an expensive operation to run.
-        if new_version == 'jewel':
-            owner = ceph_user()
-            status_set('maintenance', 'Updating file ownership for OSDs')
-            chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
-                   owner=owner,
-                   group=owner,
-                   follow_links=True)
-
-        if systemd():
-            for osd_id in get_local_osd_ids():
-                service_start('ceph-osd@{}'.format(osd_id))
-        else:
-            service_start('ceph-osd-all')
-    except subprocess.CalledProcessError as err:
+        # Upgrade the packages before restarting the daemons.
+        status_set('maintenance', 'Upgrading packages to %s' % new_version)
+        apt_install(packages=determine_packages(), fatal=True)
+
+        # If the upgrade does not need an ownership update of any of the
+        # directories in the osd service directory, then simply restart
+        # all of the OSDs at the same time as this will be the fastest
+        # way to update the code on the node.
+        if not dirs_need_ownership_update('osd'):
+            log('Restarting all OSDs to load new binaries', DEBUG)
+            service_restart('ceph-osd-all')
+            return
+
+        # Need to change the ownership of all directories which are not OSD
+        # directories as well.
+        # TODO - this should probably be moved to the general upgrade function
+        #        and done before mon/osd.
+        update_owner(CEPH_BASE_DIR, recurse_dirs=False)
+        non_osd_dirs = filter(lambda x: not x == 'osd',
+                              os.listdir(CEPH_BASE_DIR))
+        non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
+                           non_osd_dirs)
+        for path in non_osd_dirs:
+            update_owner(path)
+
+        # Fast service restart wasn't an option because each of the OSD
+        # directories need the ownership updated for all the files on
+        # the OSD. Walk through the OSDs one-by-one upgrading the OSD.
+        for osd_dir in _get_child_dirs(OSD_BASE_DIR):
+            try:
+                osd_num = _get_osd_num_from_dirname(osd_dir)
+                _upgrade_single_osd(osd_num, osd_dir)
+            except ValueError as ex:
+                # Directory could not be parsed - junk directory?
+                log('Could not parse osd directory %s: %s' % (osd_dir, ex),
+                    WARNING)
+                continue
+    except (subprocess.CalledProcessError, IOError) as err:
         log("Stopping ceph and upgrading packages failed "
             "with message: {}".format(err.message))
         status_set("blocked", "Upgrade to {} failed".format(new_version))
         sys.exit(1)
+
+
+def _upgrade_single_osd(osd_num, osd_dir):
+    """Upgrades the single OSD directory.
+
+    :param osd_num: the num of the OSD
+    :param osd_dir: the directory of the OSD to upgrade
+    :raises CalledProcessError: if an error occurs in a command issued as part
+                                of the upgrade process
+    :raises IOError: if an error occurs reading/writing to a file as part
+                     of the upgrade process
+    """
+    stop_osd(osd_num)
+    disable_osd(osd_num)
+    update_owner(osd_dir)
+    enable_osd(osd_num)
+    start_osd(osd_num)
+
+
+def stop_osd(osd_num):
+    """Stops the specified OSD number.
+
+    :param osd_num: the osd number to stop
+    """
+    if systemd():
+        service_stop('ceph-osd@{}'.format(osd_num))
+    else:
+        service_stop('ceph-osd', id=osd_num)
+
+
+def start_osd(osd_num):
+    """Starts the specified OSD number.
+
+    :param osd_num: the osd number to start.
+    """
+    if systemd():
+        service_start('ceph-osd@{}'.format(osd_num))
+    else:
+        service_start('ceph-osd', id=osd_num)
+
+
+def disable_osd(osd_num):
+    """Disables the specified OSD number.
+
+    Ensures that the specified osd will not be automatically started at the
+    next reboot of the system. Due to differences between init systems,
+    this method cannot make any guarantees that the specified osd cannot be
+    started manually.
+
+    :param osd_num: the osd id which should be disabled.
+    :raises CalledProcessError: if an error occurs invoking the systemd cmd
+                                to disable the OSD
+    :raises IOError, OSError: if the attempt to read/remove the ready file in
+                              an upstart enabled system fails
+    """
+    if systemd():
+        # When running under systemd, the individual ceph-osd daemons run as
+        # templated units and can be directly addressed by referring to the
+        # templated service name ceph-osd@<osd_num>. Additionally, systemd
+        # allows one to disable a specific templated unit by running the
+        # 'systemctl disable ceph-osd@<osd_num>' command. When disabled, the
+        # OSD should remain disabled until re-enabled via systemd.
+        # Note: disabling an already disabled service in systemd returns 0, so
+        # no need to check whether it is enabled or not.
+        cmd = ['systemctl', 'disable', 'ceph-osd@{}'.format(osd_num)]
+        subprocess.check_call(cmd)
+    else:
+        # Neither upstart nor the ceph-osd upstart script provides for
+        # disabling the starting of an OSD automatically. The specific OSD
+        # cannot be prevented from running manually, however it can be
+        # prevented from running automatically on reboot by removing the
+        # 'ready' file in the OSD's root directory. This is due to the
+        # ceph-osd-all upstart script checking for the presence of this file
+        # before starting the OSD.
+        ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
+                                  'ready')
+        if os.path.exists(ready_file):
+            os.unlink(ready_file)
+
+
+def enable_osd(osd_num):
+    """Enables the specified OSD number.
+
+    Ensures that the specified osd_num will be enabled and ready to start
+    automatically in the event of a reboot.
+
+    :param osd_num: the osd id which should be enabled.
+    :raises CalledProcessError: if the call to the systemd command issued
+                                fails when enabling the service
+    :raises IOError: if the attempt to write the ready file in an upstart
+                     enabled system fails
+    """
+    if systemd():
+        cmd = ['systemctl', 'enable', 'ceph-osd@{}'.format(osd_num)]
+        subprocess.check_call(cmd)
+    else:
+        # When running on upstart, the OSDs are started via the ceph-osd-all
+        # upstart script which will only start the osd if it has a 'ready'
+        # file. Make sure that file exists.
+        ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
+                                  'ready')
+        with open(ready_file, 'w') as f:
+            f.write('ready')
+
+        # Make sure the correct user owns the file. It shouldn't be necessary
+        # as the upstart script should run with root privileges, but it's
+        # better to have all the files matching ownership.
+        update_owner(ready_file)
+
+
+def update_owner(path, recurse_dirs=True):
+    """Changes the ownership of the specified path.
+
+    Changes the ownership of the specified path to the new ceph daemon user
+    using the system's native chown functionality. This may take awhile,
+    so this method will issue a set_status for any changes of ownership which
+    recurses into directory structures.
+
+    :param path: the path to recursively change ownership for
+    :param recurse_dirs: boolean indicating whether to recursively change the
+                         ownership of all the files in a path's subtree or to
+                         simply change the ownership of the path.
+    :raises CalledProcessError: if an error occurs issuing the chown system
+                                command
+    """
+    user = ceph_user()
+    user_group = '{ceph_user}:{ceph_user}'.format(ceph_user=user)
+    cmd = ['chown', user_group, path]
+    if os.path.isdir(path) and recurse_dirs:
+        status_set('maintenance', ('Updating ownership of %s to %s' %
+                                   (path, user)))
+        cmd.insert(1, '-R')
+
+    log('Changing ownership of {path} to {user}'.format(
+        path=path, user=user_group), DEBUG)
+    start = datetime.now()
+    subprocess.check_call(cmd)
+    elapsed_time = (datetime.now() - start)
+
+    log('Took {secs} seconds to change the ownership of path: {path}'.format(
+        secs=elapsed_time.total_seconds(), path=path), DEBUG)


 def list_pools(service):
     """
     This will list the current pools that Ceph has
@@ -1706,7 +1962,7 @@ def list_pools(service):
     """
     try:
         pool_list = []
-        pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
+        pools = check_output(['rados', '--id', service, 'lspools'])
         for pool in pools.splitlines():
             pool_list.append(pool)
         return pool_list
@@ -1714,6 +1970,36 @@ def list_pools(service):
         log("rados lspools failed with error: {}".format(err.output))
         raise


+def dirs_need_ownership_update(service):
+    """Determines if directories still need change of ownership.
+
+    Examines the set of directories under the /var/lib/ceph/{service} directory
+    and determines if they have the correct ownership or not. This is
+    necessary due to the upgrade from Hammer to Jewel where the daemon user
+    changes from root: to ceph:.
+
+    :param service: the name of the service folder to check (e.g. osd, mon)
+    :return: boolean. True if the directories need a change of ownership,
+             False otherwise.
+    :raises IOError: if an error occurs reading the file stats from one of
+                     the child directories.
+    :raises OSError: if the specified path does not exist or some other error
+    """
+    expected_owner = expected_group = ceph_user()
+    path = os.path.join(CEPH_BASE_DIR, service)
+    for child in _get_child_dirs(path):
+        curr_owner, curr_group = owner(child)
+
+        if (curr_owner == expected_owner) and (curr_group == expected_group):
+            continue
+
+        log('Directory "%s" needs its ownership updated' % child, DEBUG)
+        return True
+
+    # All child directories had the expected ownership
+    return False
+
 # A dict of valid ceph upgrade paths. Mapping is old -> new
 UPGRADE_PATHS = {
     'firefly': 'hammer',
@@ -1748,3 +2034,86 @@ def resolve_ceph_version(source):
     '''
     os_release = get_os_codename_install_source(source)
     return UCA_CODENAME_MAP.get(os_release)
+
+
+def get_ceph_pg_stat():
+    """
+    Returns the result of ceph pg stat
+    :return: dict
+    """
+    try:
+        tree = check_output(['ceph', 'pg', 'stat', '--format=json'])
+        try:
+            json_tree = json.loads(tree)
+            if not json_tree['num_pg_by_state']:
+                return None
+            return json_tree
+        except ValueError as v:
+            log("Unable to parse ceph pg stat json: {}. Error: {}".format(
+                tree, v.message))
+            raise
+    except subprocess.CalledProcessError as e:
+        log("ceph pg stat command failed with message: {}".format(
+            e.message))
+        raise
+
+
+def get_ceph_health():
+    """
+    Returns the health of the cluster from a 'ceph health'
+    :return: dict
+    Also raises CalledProcessError if our ceph command fails
+    To get the overall status, use get_ceph_health()['overall_status']
+    """
+    try:
+        tree = check_output(
+            ['ceph', 'health', '--format=json'])
+        try:
+            json_tree = json.loads(tree)
+            # Make sure children are present in the json
+            if not json_tree['overall_status']:
+                return None
+            return json_tree
+        except ValueError as v:
+            log("Unable to parse ceph health json: {}. Error: {}".format(
+                tree, v.message))
+            raise
+    except subprocess.CalledProcessError as e:
+        log("ceph health command failed with message: {}".format(
+            e.message))
+        raise
+
+
+def reweight_osd(osd_num, new_weight):
+    """
+    Changes the crush weight of an OSD to the value specified.
+
+    :param osd_num: the osd id which should be changed
+    :param new_weight: the new weight for the OSD
+    :returns: bool. True if output looks right, else false.
+    :raises CalledProcessError: if an error occurs invoking the systemd cmd
+    """
+    try:
+        cmd_result = subprocess.check_output(
+            ['ceph', 'osd', 'crush', 'reweight', "osd.{}".format(osd_num),
+             new_weight], stderr=subprocess.STDOUT)
+        expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format(
+            ID=osd_num) + " to {}".format(new_weight)
+        log(cmd_result)
+        if expected_result in cmd_result:
+            return True
+        return False
+    except subprocess.CalledProcessError as e:
+        log("ceph osd crush reweight command failed with message: {}".format(
+            e.message))
+        raise
+
+
+def determine_packages():
+    '''
+    Determines packages for installation.
+
+    @returns: list of ceph packages
+    '''
+    if is_container():
+        PACKAGES.remove('ntp')
+
+    return PACKAGES
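For reference, the rewritten get_block_uuid() above relies on the KEY=VALUE output of blkid -o export instead of binding libblkid through ctypes. A standalone sketch of that parsing (the device path is just an example; a real block device is needed for any output):

import subprocess


def block_uuid(block_dev):
    """Return the UUID tag from `blkid -o export`, or None.

    Same parsing approach as get_block_uuid() above; the decode is done
    here directly since this sketch does not use the charm's wrapper.
    """
    out = subprocess.check_output(['blkid', '-o', 'export', block_dev])
    for tag in out.decode('UTF-8').split('\n'):
        parts = tag.split('=')
        if parts[0] == 'UUID':
            return parts[1]
    return None


# blkid -o export prints one tag per line, e.g. (values made up):
#   DEVNAME=/dev/sda1
#   UUID=f3c3749e-81f8-4a22-9d58-92a7a3c8ad59
#   TYPE=ext4
print(block_uuid('/dev/sda1'))  # example device path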

View File

@@ -28,7 +28,7 @@ from ceph import (
     get_cephfs,
     get_osd_weight
 )
-from ceph_helpers import Crushmap
+from ceph.ceph_helpers import Crushmap
 from charmhelpers.contrib.storage.linux.ceph import (
     create_erasure_profile,
     delete_pool,
@@ -168,6 +168,8 @@ def handle_add_permissions_to_key(request, service):
     A group can optionally have a namespace defined that will be used to
     further restrict pool access.
     """
+    resp = {'exit-code': 0}
+
     service_name = request.get('name')
     group_name = request.get('group')
     group_namespace = request.get('group-namespace')
@@ -190,6 +192,8 @@ def handle_add_permissions_to_key(request, service):
                               group_namespace)
     update_service_permissions(service_name, service_obj, group_namespace)

+    return resp
+

 def update_service_permissions(service, service_obj=None, namespace=None):
     """Update the key permissions for the named client in Ceph"""

View File

@@ -36,7 +36,11 @@ import uuid
 import re

 import subprocess
-from subprocess import (check_call, check_output, CalledProcessError, )
+from subprocess import (
+    check_call,
+    check_output as s_check_output,
+    CalledProcessError,
+)
 from charmhelpers.core.hookenv import (config,
                                        local_unit,
                                        relation_get,
@@ -111,6 +115,15 @@ DEFAULT_POOL_WEIGHT = 10.0
 LEGACY_PG_COUNT = 200


+def check_output(*args, **kwargs):
+    '''
+    Helper wrapper for py2/3 compat with subprocess.check_output
+
+    @returns str: UTF-8 decoded representation of output
+    '''
+    return s_check_output(*args, **kwargs).decode('UTF-8')
+
+
 def validator(value, valid_type, valid_range=None):
     """
     Used to validate these: http://docs.ceph.com/docs/master/rados/operations/
@@ -188,7 +201,7 @@ class Crushmap(object):
                                      stdout=subprocess.PIPE)
             return subprocess.check_output(
                 ('crushtool', '-d', '-'),
-                stdin=crush.stdout).decode('utf-8')
+                stdin=crush.stdout)
         except Exception as e:
             log("load_crushmap error: {}".format(e))
             raise "Failed to read Crushmap"
@@ -565,7 +578,8 @@ def monitor_key_delete(service, key):
     :param key: six.string_types. The key to delete.
     """
     try:
-        check_output(['ceph', '--id', service, 'config-key', 'del', str(key)])
+        check_output(['ceph', '--id', service,
+                      'config-key', 'del', str(key)])
     except CalledProcessError as e:
         log("Monitor config-key put failed with message: {}".format(e.output))
         raise
@@ -867,8 +881,7 @@ def get_cache_mode(service, pool_name):
 def pool_exists(service, name):
     """Check to see if a RADOS pool already exists."""
     try:
-        out = check_output(['rados', '--id', service, 'lspools']).decode(
-            'UTF-8')
+        out = check_output(['rados', '--id', service, 'lspools'])
     except CalledProcessError:
         return False
@@ -882,7 +895,7 @@ def get_osds(service):
     version = ceph_version()
     if version and version >= '0.56':
         return json.loads(check_output(['ceph', '--id', service, 'osd', 'ls',
-                                        '--format=json']).decode('UTF-8'))
+                                        '--format=json']))

     return None
@@ -900,7 +913,7 @@ def rbd_exists(service, pool, rbd_img):
     """Check to see if a RADOS block device exists."""
     try:
         out = check_output(['rbd', 'list', '--id', service, '--pool', pool
-                            ]).decode('UTF-8')
+                            ])
     except CalledProcessError:
         return False
@@ -1025,7 +1038,7 @@ def configure(service, key, auth, use_syslog):
 def image_mapped(name):
     """Determine whether a RADOS block device is mapped locally."""
     try:
-        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
+        out = check_output(['rbd', 'showmapped'])
     except CalledProcessError:
         return False
@@ -1212,7 +1225,7 @@ def ceph_version():
     """Retrieve the local version of ceph."""
     if os.path.exists('/usr/bin/ceph'):
         cmd = ['ceph', '-v']
-        output = check_output(cmd).decode('US-ASCII')
+        output = check_output(cmd)
         output = output.split()
         if len(output) > 3:
             return output[2]
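Most of the churn in this file follows from the check_output() wrapper introduced above: with the decode to text centralized, the per-call-site .decode('UTF-8') and .decode('US-ASCII') suffixes can be dropped and callers behave the same on Python 2 and 3. A self-contained sketch of the wrapper's effect:

from subprocess import check_output as s_check_output


def check_output(*args, **kwargs):
    # Mirrors the wrapper above: decode bytes to text in one place.
    return s_check_output(*args, **kwargs).decode('UTF-8')


# On Python 3, subprocess.check_output() returns bytes; the wrapper
# returns text, so callers can split() and compare strings unchanged.
out = check_output(['echo', 'hello'])
assert out.strip() == 'hello'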

lib/setup.py (new file, 85 lines)
View File

@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
from __future__ import print_function

import os
import sys

from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand

version = "0.0.1.dev1"

install_require = [
]

tests_require = [
    'tox >= 2.3.1',
]


class Tox(TestCommand):

    user_options = [('tox-args=', 'a', "Arguments to pass to tox")]

    def initialize_options(self):
        TestCommand.initialize_options(self)
        self.tox_args = None

    def finalize_options(self):
        TestCommand.finalize_options(self)
        self.test_args = []
        self.test_suite = True

    def run_tests(self):
        # import here, because outside the eggs aren't loaded
        import tox
        import shlex
        args = self.tox_args
        # remove the 'test' arg from argv as tox passes it to ostestr which
        # breaks it.
        sys.argv.pop()
        if args:
            args = shlex.split(self.tox_args)
        errno = tox.cmdline(args=args)
        sys.exit(errno)


if sys.argv[-1] == 'publish':
    os.system("python setup.py sdist upload")
    os.system("python setup.py bdist_wheel upload")
    sys.exit()


if sys.argv[-1] == 'tag':
    os.system("git tag -a %s -m 'version %s'" % (version, version))
    os.system("git push --tags")
    sys.exit()


setup(
    name='charms.ceph',
    version=version,
    description='Provide base module for ceph charms.',
    classifiers=[
        "Development Status :: 2 - Pre-Alpha",
        "Intended Audience :: Developers",
        "Topic :: System",
        "Topic :: System :: Installation/Setup",
        "Topic :: System :: Software Distribution",
        "Programming Language :: Python :: 2",
        "Programming Language :: Python :: 2.7",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.5",
        "License :: OSI Approved :: Apache Software License",
    ],
    url='https://github.com/openstack/charms.ceph',
    author='OpenStack Charmers',
    author_email='openstack-dev@lists.openstack.org',
    license='Apache-2.0: http://www.apache.org/licenses/LICENSE-2.0',
    packages=find_packages(exclude=["unit_tests"]),
    zip_safe=False,
    cmdclass={'test': Tox},
    install_requires=install_require,
    extras_require={
        'testing': tests_require,
    },
    tests_require=tests_require,
)