Merge "Bring ceph-osd to Python 3"

This commit is contained in:
Zuul 2017-11-21 14:59:03 +00:00 committed by Gerrit Code Review
commit 706d6470c3
44 changed files with 778 additions and 281 deletions

View File

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
#
# Copyright 2016 Canonical Ltd
#

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
#
# Copyright 2017 Canonical Ltd
#

View File

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
#
# Copyright 2016 Canonical Ltd
#

View File

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
#
# Copyright 2016 Canonical Ltd
#

View File

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
#
# Copyright 2016 Canonical Ltd
#
@ -33,7 +33,7 @@ point and then replace the OSD with a new one.
def get_disk_stats():
try:
# https://www.kernel.org/doc/Documentation/iostats.txt
with open('/proc/diskstats', 'r') as diskstats:
with open('/proc/diskstats', 'rt', encoding='UTF-8') as diskstats:
return diskstats.readlines()
except IOError as err:
hookenv.log('Could not open /proc/diskstats. Error: {}'

View File

@ -13,10 +13,13 @@ import nagios_plugin
def check_ceph_status(args):
if args.status_file:
nagios_plugin.check_file_freshness(args.status_file, 3600)
with open(args.status_file, "r") as f:
with open(args.status_file, "rt", encoding='UTF-8') as f:
lines = f.readlines()
else:
lines = subprocess.check_output(["ceph", "status"]).split('\n')
lines = (subprocess
.check_output(["ceph", "status"])
.decode('UTF-8')
.split('\n'))
status_data = dict(l.strip().split(' ', 1) for l in lines if len(l) > 1)
if ('health' not in status_data or
@ -42,7 +45,7 @@ def check_ceph_status(args):
msg = 'CRITICAL: Some OSDs are not up. Total: {}, up: {}'.format(
osds.group(1), osds.group(2))
raise nagios_plugin.CriticalError(msg)
print "All OK"
print("All OK")
if __name__ == '__main__':

View File

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
#
# Copyright 2016 Canonical Ltd
#
@ -312,7 +312,7 @@ JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped'
def read_zapped_journals():
if os.path.exists(JOURNAL_ZAPPED):
with open(JOURNAL_ZAPPED) as zapfile:
with open(JOURNAL_ZAPPED, 'rt', encoding='UTF-8') as zapfile:
zapped = set(
filter(None,
[l.strip() for l in zapfile.readlines()]))
@ -326,7 +326,7 @@ def write_zapped_journals(journal_devs):
with os.fdopen(tmpfh, 'wb') as zapfile:
log("write zapped: {}".format(journal_devs),
level=DEBUG)
zapfile.write('\n'.join(sorted(list(journal_devs))))
zapfile.write('\n'.join(sorted(list(journal_devs))).encode('UTF-8'))
shutil.move(tmpfile, JOURNAL_ZAPPED)
@ -407,8 +407,7 @@ def get_mon_hosts():
if addr:
hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr))
hosts.sort()
return hosts
return sorted(hosts)
def get_fsid():
@ -454,9 +453,8 @@ def get_devices():
devices.extend((storage_get('location', s) for s in storage_ids))
# Filter out any devices in the action managed unit-local device blacklist
return filter(
lambda device: device not in get_blacklist(), devices
)
_blacklist = get_blacklist()
return [device for device in devices if device not in _blacklist]
def get_journal_devices():
@ -468,12 +466,9 @@ def get_journal_devices():
devices.extend((storage_get('location', s) for s in storage_ids))
# Filter out any devices in the action managed unit-local device blacklist
devices = filter(
lambda device: device not in get_blacklist(), devices
)
devices = filter(os.path.exists, devices)
return set(devices)
_blacklist = get_blacklist()
return set(device for device in devices
if device not in _blacklist and os.path.exists(device))
@hooks.hook('mon-relation-changed',
@ -504,7 +499,7 @@ def upgrade_charm():
'nrpe-external-master-relation-changed')
def update_nrpe_config():
# python-dbus is used by check_upstart_job
apt_install('python-dbus')
apt_install('python3-dbus')
hostname = nrpe.get_nagios_hostname()
current_unit = nrpe.get_nagios_unit_name()
nrpe_setup = nrpe.NRPE(hostname=hostname)

View File

@ -30,6 +30,7 @@ import yaml
from charmhelpers.core.hookenv import (
config,
hook_name,
local_unit,
log,
relation_ids,
@ -285,7 +286,7 @@ class NRPE(object):
try:
nagios_uid = pwd.getpwnam('nagios').pw_uid
nagios_gid = grp.getgrnam('nagios').gr_gid
except:
except Exception:
log("Nagios user not set up, nrpe checks not updated")
return
@ -302,7 +303,12 @@ class NRPE(object):
"command": nrpecheck.command,
}
service('restart', 'nagios-nrpe-server')
# update-status hooks are configured to firing every 5 minutes by
# default. When nagios-nrpe-server is restarted, the nagios server
# reports checks failing causing unneccessary alerts. Let's not restart
# on update-status hooks.
if not hook_name() == 'update-status':
service('restart', 'nagios-nrpe-server')
monitor_ids = relation_ids("local-monitors") + \
relation_ids("nrpe-external-master")

View File

@ -27,6 +27,7 @@ clustering-related helpers.
import subprocess
import os
import time
from socket import gethostname as get_unit_hostname
@ -45,6 +46,9 @@ from charmhelpers.core.hookenv import (
is_leader as juju_is_leader,
status_set,
)
from charmhelpers.core.host import (
modulo_distribution,
)
from charmhelpers.core.decorators import (
retry_on_exception,
)
@ -361,3 +365,29 @@ def canonical_url(configs, vip_setting='vip'):
else:
addr = unit_get('private-address')
return '%s://%s' % (scheme, addr)
def distributed_wait(modulo=None, wait=None, operation_name='operation'):
''' Distribute operations by waiting based on modulo_distribution
If modulo and or wait are not set, check config_get for those values.
:param modulo: int The modulo number creates the group distribution
:param wait: int The constant time wait value
:param operation_name: string Operation name for status message
i.e. 'restart'
:side effect: Calls config_get()
:side effect: Calls log()
:side effect: Calls status_set()
:side effect: Calls time.sleep()
'''
if modulo is None:
modulo = config_get('modulo-nodes')
if wait is None:
wait = config_get('known-wait')
calculated_wait = modulo_distribution(modulo=modulo, wait=wait)
msg = "Waiting {} seconds for {} ...".format(calculated_wait,
operation_name)
log(msg, DEBUG)
status_set('maintenance', msg)
time.sleep(calculated_wait)

View File

@ -70,12 +70,12 @@ class DisabledModuleAudit(BaseAudit):
"""Returns the modules which are enabled in Apache."""
output = subprocess.check_output(['apache2ctl', '-M'])
modules = []
for line in output.strip().split():
for line in output.splitlines():
# Each line of the enabled module output looks like:
# module_name (static|shared)
# Plus a header line at the top of the output which is stripped
# out by the regex.
matcher = re.search(r'^ (\S*)', line)
matcher = re.search(r'^ (\S*)_module (\S*)', line)
if matcher:
modules.append(matcher.group(1))
return modules

View File

@ -490,7 +490,7 @@ def get_host_ip(hostname, fallback=None):
if not ip_addr:
try:
ip_addr = socket.gethostbyname(hostname)
except:
except Exception:
log("Failed to resolve hostname '%s'" % (hostname),
level=WARNING)
return fallback
@ -518,7 +518,7 @@ def get_hostname(address, fqdn=True):
if not result:
try:
result = socket.gethostbyaddr(address)[0]
except:
except Exception:
return None
else:
result = address

View File

@ -29,3 +29,16 @@ def install_alternative(name, target, source, priority=50):
target, name, source, str(priority)
]
subprocess.check_call(cmd)
def remove_alternative(name, source):
"""Remove an installed alternative configuration file
:param name: string name of the alternative to remove
:param source: string full path to alternative to remove
"""
cmd = [
'update-alternatives', '--remove',
name, source
]
subprocess.check_call(cmd)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import glob
import json
import math
@ -578,11 +579,14 @@ class HAProxyContext(OSContextGenerator):
laddr = get_address_in_network(config(cfg_opt))
if laddr:
netmask = get_netmask_for_address(laddr)
cluster_hosts[laddr] = {'network': "{}/{}".format(laddr,
netmask),
'backends': {l_unit: laddr}}
cluster_hosts[laddr] = {
'network': "{}/{}".format(laddr,
netmask),
'backends': collections.OrderedDict([(l_unit,
laddr)])
}
for rid in relation_ids('cluster'):
for unit in related_units(rid):
for unit in sorted(related_units(rid)):
_laddr = relation_get('{}-address'.format(addr_type),
rid=rid, unit=unit)
if _laddr:
@ -594,10 +598,13 @@ class HAProxyContext(OSContextGenerator):
# match in the frontend
cluster_hosts[addr] = {}
netmask = get_netmask_for_address(addr)
cluster_hosts[addr] = {'network': "{}/{}".format(addr, netmask),
'backends': {l_unit: addr}}
cluster_hosts[addr] = {
'network': "{}/{}".format(addr, netmask),
'backends': collections.OrderedDict([(l_unit,
addr)])
}
for rid in relation_ids('cluster'):
for unit in related_units(rid):
for unit in sorted(related_units(rid)):
_laddr = relation_get('private-address',
rid=rid, unit=unit)
if _laddr:
@ -628,6 +635,8 @@ class HAProxyContext(OSContextGenerator):
ctxt['local_host'] = '127.0.0.1'
ctxt['haproxy_host'] = '0.0.0.0'
ctxt['ipv6_enabled'] = not is_ipv6_disabled()
ctxt['stat_port'] = '8888'
db = kv()
@ -802,8 +811,9 @@ class ApacheSSLContext(OSContextGenerator):
else:
# Expect cert/key provided in config (currently assumed that ca
# uses ip for cn)
cn = resolve_address(endpoint_type=INTERNAL)
self.configure_cert(cn)
for net_type in (INTERNAL, ADMIN, PUBLIC):
cn = resolve_address(endpoint_type=net_type)
self.configure_cert(cn)
addresses = self.get_network_addresses()
for address, endpoint in addresses:
@ -843,15 +853,6 @@ class NeutronContext(OSContextGenerator):
for pkgs in self.packages:
ensure_packages(pkgs)
def _save_flag_file(self):
if self.network_manager == 'quantum':
_file = '/etc/nova/quantum_plugin.conf'
else:
_file = '/etc/nova/neutron_plugin.conf'
with open(_file, 'wb') as out:
out.write(self.plugin + '\n')
def ovs_ctxt(self):
driver = neutron_plugin_attribute(self.plugin, 'driver',
self.network_manager)
@ -996,7 +997,6 @@ class NeutronContext(OSContextGenerator):
flags = config_flags_parser(alchemy_flags)
ctxt['neutron_alchemy_flags'] = flags
self._save_flag_file()
return ctxt
@ -1176,7 +1176,7 @@ class SubordinateConfigContext(OSContextGenerator):
if sub_config and sub_config != '':
try:
sub_config = json.loads(sub_config)
except:
except Exception:
log('Could not parse JSON from '
'subordinate_configuration setting from %s'
% rid, level=ERROR)

View File

@ -59,18 +59,13 @@ def determine_dkms_package():
def quantum_plugins():
from charmhelpers.contrib.openstack import context
return {
'ovs': {
'config': '/etc/quantum/plugins/openvswitch/'
'ovs_quantum_plugin.ini',
'driver': 'quantum.plugins.openvswitch.ovs_quantum_plugin.'
'OVSQuantumPluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=QUANTUM_CONF_DIR)],
'contexts': [],
'services': ['quantum-plugin-openvswitch-agent'],
'packages': [determine_dkms_package(),
['quantum-plugin-openvswitch-agent']],
@ -82,11 +77,7 @@ def quantum_plugins():
'config': '/etc/quantum/plugins/nicira/nvp.ini',
'driver': 'quantum.plugins.nicira.nicira_nvp_plugin.'
'QuantumPlugin.NvpPluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=QUANTUM_CONF_DIR)],
'contexts': [],
'services': [],
'packages': [],
'server_packages': ['quantum-server',
@ -100,7 +91,6 @@ NEUTRON_CONF_DIR = '/etc/neutron'
def neutron_plugins():
from charmhelpers.contrib.openstack import context
release = os_release('nova-common')
plugins = {
'ovs': {
@ -108,11 +98,7 @@ def neutron_plugins():
'ovs_neutron_plugin.ini',
'driver': 'neutron.plugins.openvswitch.ovs_neutron_plugin.'
'OVSNeutronPluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': ['neutron-plugin-openvswitch-agent'],
'packages': [determine_dkms_package(),
['neutron-plugin-openvswitch-agent']],
@ -124,11 +110,7 @@ def neutron_plugins():
'config': '/etc/neutron/plugins/nicira/nvp.ini',
'driver': 'neutron.plugins.nicira.nicira_nvp_plugin.'
'NeutronPlugin.NvpPluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': [],
'packages': [],
'server_packages': ['neutron-server',
@ -138,11 +120,7 @@ def neutron_plugins():
'nsx': {
'config': '/etc/neutron/plugins/vmware/nsx.ini',
'driver': 'vmware',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': [],
'packages': [],
'server_packages': ['neutron-server',
@ -152,11 +130,7 @@ def neutron_plugins():
'n1kv': {
'config': '/etc/neutron/plugins/cisco/cisco_plugins.ini',
'driver': 'neutron.plugins.cisco.network_plugin.PluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': [],
'packages': [determine_dkms_package(),
['neutron-plugin-cisco']],
@ -167,11 +141,7 @@ def neutron_plugins():
'Calico': {
'config': '/etc/neutron/plugins/ml2/ml2_conf.ini',
'driver': 'neutron.plugins.ml2.plugin.Ml2Plugin',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': ['calico-felix',
'bird',
'neutron-dhcp-agent',
@ -189,11 +159,7 @@ def neutron_plugins():
'vsp': {
'config': '/etc/neutron/plugins/nuage/nuage_plugin.ini',
'driver': 'neutron.plugins.nuage.plugin.NuagePlugin',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': [],
'packages': [],
'server_packages': ['neutron-server', 'neutron-plugin-nuage'],
@ -203,10 +169,7 @@ def neutron_plugins():
'config': '/etc/neutron/plugins/plumgrid/plumgrid.ini',
'driver': ('neutron.plugins.plumgrid.plumgrid_plugin'
'.plumgrid_plugin.NeutronPluginPLUMgridV2'),
'contexts': [
context.SharedDBContext(user=config('database-user'),
database=config('database'),
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': [],
'packages': ['plumgrid-lxc',
'iovisor-dkms'],
@ -217,11 +180,7 @@ def neutron_plugins():
'midonet': {
'config': '/etc/neutron/plugins/midonet/midonet.ini',
'driver': 'midonet.neutron.plugin.MidonetPluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'contexts': [],
'services': [],
'packages': [determine_dkms_package()],
'server_packages': ['neutron-server',

View File

@ -95,7 +95,7 @@ from charmhelpers.fetch import (
from charmhelpers.fetch.snap import (
snap_install,
snap_refresh,
SNAP_CHANNELS,
valid_snap_channel,
)
from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
@ -426,7 +426,7 @@ def get_os_codename_package(package, fatal=True):
try:
pkg = cache[package]
except:
except Exception:
if not fatal:
return None
# the package is unknown to the current apt cache.
@ -579,6 +579,9 @@ def configure_installation_source(source_plus_key):
Note that the behaviour on error is to log the error to the juju log and
then call sys.exit(1).
"""
if source_plus_key.startswith('snap'):
# Do nothing for snap installs
return
# extract the key if there is one, denoted by a '|' in the rel
source, key = get_source_and_pgp_key(source_plus_key)
@ -615,7 +618,7 @@ def save_script_rc(script_path="scripts/scriptrc", **env_vars):
juju_rc_path = "%s/%s" % (charm_dir(), script_path)
if not os.path.exists(os.path.dirname(juju_rc_path)):
os.mkdir(os.path.dirname(juju_rc_path))
with open(juju_rc_path, 'wb') as rc_script:
with open(juju_rc_path, 'wt') as rc_script:
rc_script.write(
"#!/bin/bash\n")
[rc_script.write('export %s=%s\n' % (u, p))
@ -794,7 +797,7 @@ def git_default_repos(projects_yaml):
service = service_name()
core_project = service
for default, branch in GIT_DEFAULT_BRANCHES.iteritems():
for default, branch in six.iteritems(GIT_DEFAULT_BRANCHES):
if projects_yaml == default:
# add the requirements repo first
@ -1615,7 +1618,7 @@ def do_action_openstack_upgrade(package, upgrade_callback, configs):
upgrade_callback(configs=configs)
action_set({'outcome': 'success, upgrade completed.'})
ret = True
except:
except Exception:
action_set({'outcome': 'upgrade failed, see traceback.'})
action_set({'traceback': traceback.format_exc()})
action_fail('do_openstack_upgrade resulted in an '
@ -1720,7 +1723,7 @@ def is_unit_paused_set():
kv = t[0]
# transform something truth-y into a Boolean.
return not(not(kv.get('unit-paused')))
except:
except Exception:
return False
@ -2048,7 +2051,7 @@ def update_json_file(filename, items):
def snap_install_requested():
""" Determine if installing from snaps
If openstack-origin is of the form snap:channel-series-release
If openstack-origin is of the form snap:track/channel[/branch]
and channel is in SNAPS_CHANNELS return True.
"""
origin = config('openstack-origin') or ""
@ -2056,10 +2059,12 @@ def snap_install_requested():
return False
_src = origin[5:]
channel, series, release = _src.split('-')
if channel.lower() in SNAP_CHANNELS:
return True
return False
if '/' in _src:
channel = _src.split('/')[1]
else:
# Handle snap:track with no channel
channel = 'stable'
return valid_snap_channel(channel)
def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
@ -2067,7 +2072,7 @@ def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
@param snaps: List of snaps
@param src: String of openstack-origin or source of the form
snap:channel-series-track
snap:track/channel
@param mode: String classic, devmode or jailmode
@returns: Dictionary of snaps with channels and modes
"""
@ -2077,8 +2082,7 @@ def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
return {}
_src = src[5:]
_channel, _series, _release = _src.split('-')
channel = '--channel={}/{}'.format(_release, _channel)
channel = '--channel={}'.format(_src)
return {snap: {'channel': channel, 'mode': mode}
for snap in snaps}
@ -2090,8 +2094,8 @@ def install_os_snaps(snaps, refresh=False):
@param snaps: Dictionary of snaps with channels and modes of the form:
{'snap_name': {'channel': 'snap_channel',
'mode': 'snap_mode'}}
Where channel a snapstore channel and mode is --classic, --devmode or
--jailmode.
Where channel is a snapstore channel and mode is --classic, --devmode
or --jailmode.
@param post_snap_install: Callback function to run after snaps have been
installed
"""

View File

@ -370,9 +370,10 @@ def get_mon_map(service):
Also raises CalledProcessError if our ceph command fails
"""
try:
mon_status = check_output(
['ceph', '--id', service,
'mon_status', '--format=json'])
mon_status = check_output(['ceph', '--id', service,
'mon_status', '--format=json'])
if six.PY3:
mon_status = mon_status.decode('UTF-8')
try:
return json.loads(mon_status)
except ValueError as v:
@ -457,7 +458,7 @@ def monitor_key_get(service, key):
try:
output = check_output(
['ceph', '--id', service,
'config-key', 'get', str(key)])
'config-key', 'get', str(key)]).decode('UTF-8')
return output
except CalledProcessError as e:
log("Monitor config-key get failed with message: {}".format(
@ -500,6 +501,8 @@ def get_erasure_profile(service, name):
out = check_output(['ceph', '--id', service,
'osd', 'erasure-code-profile', 'get',
name, '--format=json'])
if six.PY3:
out = out.decode('UTF-8')
return json.loads(out)
except (CalledProcessError, OSError, ValueError):
return None
@ -686,7 +689,10 @@ def get_cache_mode(service, pool_name):
"""
validator(value=service, valid_type=six.string_types)
validator(value=pool_name, valid_type=six.string_types)
out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
out = check_output(['ceph', '--id', service,
'osd', 'dump', '--format=json'])
if six.PY3:
out = out.decode('UTF-8')
try:
osd_json = json.loads(out)
for pool in osd_json['pools']:
@ -700,8 +706,9 @@ def get_cache_mode(service, pool_name):
def pool_exists(service, name):
"""Check to see if a RADOS pool already exists."""
try:
out = check_output(['rados', '--id', service,
'lspools']).decode('UTF-8')
out = check_output(['rados', '--id', service, 'lspools'])
if six.PY3:
out = out.decode('UTF-8')
except CalledProcessError:
return False
@ -714,9 +721,12 @@ def get_osds(service):
"""
version = ceph_version()
if version and version >= '0.56':
return json.loads(check_output(['ceph', '--id', service,
'osd', 'ls',
'--format=json']).decode('UTF-8'))
out = check_output(['ceph', '--id', service,
'osd', 'ls',
'--format=json'])
if six.PY3:
out = out.decode('UTF-8')
return json.loads(out)
return None
@ -734,7 +744,9 @@ def rbd_exists(service, pool, rbd_img):
"""Check to see if a RADOS block device exists."""
try:
out = check_output(['rbd', 'list', '--id',
service, '--pool', pool]).decode('UTF-8')
service, '--pool', pool])
if six.PY3:
out = out.decode('UTF-8')
except CalledProcessError:
return False
@ -859,7 +871,9 @@ def configure(service, key, auth, use_syslog):
def image_mapped(name):
"""Determine whether a RADOS block device is mapped locally."""
try:
out = check_output(['rbd', 'showmapped']).decode('UTF-8')
out = check_output(['rbd', 'showmapped'])
if six.PY3:
out = out.decode('UTF-8')
except CalledProcessError:
return False
@ -1018,7 +1032,9 @@ def ceph_version():
"""Retrieve the local version of ceph."""
if os.path.exists('/usr/bin/ceph'):
cmd = ['ceph', '-v']
output = check_output(cmd).decode('US-ASCII')
output = check_output(cmd)
if six.PY3:
output = output.decode('UTF-8')
output = output.split()
if len(output) > 3:
return output[2]

View File

@ -74,10 +74,10 @@ def list_lvm_volume_group(block_device):
'''
vg = None
pvd = check_output(['pvdisplay', block_device]).splitlines()
for l in pvd:
l = l.decode('UTF-8')
if l.strip().startswith('VG Name'):
vg = ' '.join(l.strip().split()[2:])
for lvm in pvd:
lvm = lvm.decode('UTF-8')
if lvm.strip().startswith('VG Name'):
vg = ' '.join(lvm.strip().split()[2:])
return vg

View File

@ -64,6 +64,6 @@ def is_device_mounted(device):
'''
try:
out = check_output(['lsblk', '-P', device]).decode('UTF-8')
except:
except Exception:
return False
return bool(re.search(r'MOUNTPOINT=".+"', out))

View File

@ -22,6 +22,7 @@ from __future__ import print_function
import copy
from distutils.version import LooseVersion
from functools import wraps
from collections import namedtuple
import glob
import os
import json
@ -218,6 +219,8 @@ def principal_unit():
for rid in relation_ids(reltype):
for unit in related_units(rid):
md = _metadata_unit(unit)
if not md:
continue
subordinate = md.pop('subordinate', None)
if not subordinate:
return unit
@ -511,7 +514,10 @@ def _metadata_unit(unit):
"""
basedir = os.sep.join(charm_dir().split(os.sep)[:-2])
unitdir = 'unit-{}'.format(unit.replace(os.sep, '-'))
with open(os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')) as md:
joineddir = os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')
if not os.path.exists(joineddir):
return None
with open(joineddir) as md:
return yaml.safe_load(md)
@ -639,18 +645,31 @@ def is_relation_made(relation, keys='private-address'):
return False
def _port_op(op_name, port, protocol="TCP"):
"""Open or close a service network port"""
_args = [op_name]
icmp = protocol.upper() == "ICMP"
if icmp:
_args.append(protocol)
else:
_args.append('{}/{}'.format(port, protocol))
try:
subprocess.check_call(_args)
except subprocess.CalledProcessError:
# Older Juju pre 2.3 doesn't support ICMP
# so treat it as a no-op if it fails.
if not icmp:
raise
def open_port(port, protocol="TCP"):
"""Open a service network port"""
_args = ['open-port']
_args.append('{}/{}'.format(port, protocol))
subprocess.check_call(_args)
_port_op('open-port', port, protocol)
def close_port(port, protocol="TCP"):
"""Close a service network port"""
_args = ['close-port']
_args.append('{}/{}'.format(port, protocol))
subprocess.check_call(_args)
_port_op('close-port', port, protocol)
def open_ports(start, end, protocol="TCP"):
@ -667,6 +686,17 @@ def close_ports(start, end, protocol="TCP"):
subprocess.check_call(_args)
def opened_ports():
"""Get the opened ports
*Note that this will only show ports opened in a previous hook*
:returns: Opened ports as a list of strings: ``['8080/tcp', '8081-8083/tcp']``
"""
_args = ['opened-ports', '--format=json']
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
@cached
def unit_get(attribute):
"""Get the unit ID for the remote unit"""
@ -1077,6 +1107,35 @@ def network_get_primary_address(binding):
return subprocess.check_output(cmd).decode('UTF-8').strip()
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get(endpoint, relation_id=None):
"""
Retrieve the network details for a relation endpoint
:param endpoint: string. The name of a relation endpoint
:param relation_id: int. The ID of the relation for the current context.
:return: dict. The loaded YAML output of the network-get query.
:raise: NotImplementedError if run on Juju < 2.1
"""
cmd = ['network-get', endpoint, '--format', 'yaml']
if relation_id:
cmd.append('-r')
cmd.append(relation_id)
try:
response = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT).decode('UTF-8').strip()
except CalledProcessError as e:
# Early versions of Juju 2.0.x required the --primary-address argument.
# We catch that condition here and raise NotImplementedError since
# the requested semantics are not available - the caller can then
# use the network_get_primary_address() method instead.
if '--primary-address is currently required' in e.output.decode('UTF-8'):
raise NotImplementedError
raise
return yaml.safe_load(response)
def add_metric(*args, **kwargs):
"""Add metric values. Values may be expressed with keyword arguments. For
metric names containing dashes, these may be expressed as one or more
@ -1106,3 +1165,42 @@ def meter_info():
"""Get the meter status information, if running in the meter-status-changed
hook."""
return os.environ.get('JUJU_METER_INFO')
def iter_units_for_relation_name(relation_name):
"""Iterate through all units in a relation
Generator that iterates through all the units in a relation and yields
a named tuple with rid and unit field names.
Usage:
data = [(u.rid, u.unit)
for u in iter_units_for_relation_name(relation_name)]
:param relation_name: string relation name
:yield: Named Tuple with rid and unit field names
"""
RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
for rid in relation_ids(relation_name):
for unit in related_units(rid):
yield RelatedUnit(rid, unit)
def ingress_address(rid=None, unit=None):
"""
Retrieve the ingress-address from a relation when available. Otherwise,
return the private-address. This function is to be used on the consuming
side of the relation.
Usage:
addresses = [ingress_address(rid=u.rid, unit=u.unit)
for u in iter_units_for_relation_name(relation_name)]
:param rid: string relation id
:param unit: string unit name
:side effect: calls relation_get
:return: string IP address
"""
settings = relation_get(rid=rid, unit=unit)
return (settings.get('ingress-address') or
settings.get('private-address'))

View File

@ -34,7 +34,7 @@ import six
from contextlib import contextmanager
from collections import OrderedDict
from .hookenv import log, DEBUG
from .hookenv import log, DEBUG, local_unit
from .fstab import Fstab
from charmhelpers.osplatform import get_platform
@ -441,6 +441,49 @@ def add_user_to_group(username, group):
subprocess.check_call(cmd)
def chage(username, lastday=None, expiredate=None, inactive=None,
mindays=None, maxdays=None, root=None, warndays=None):
"""Change user password expiry information
:param str username: User to update
:param str lastday: Set when password was changed in YYYY-MM-DD format
:param str expiredate: Set when user's account will no longer be
accessible in YYYY-MM-DD format.
-1 will remove an account expiration date.
:param str inactive: Set the number of days of inactivity after a password
has expired before the account is locked.
-1 will remove an account's inactivity.
:param str mindays: Set the minimum number of days between password
changes to MIN_DAYS.
0 indicates the password can be changed anytime.
:param str maxdays: Set the maximum number of days during which a
password is valid.
-1 as MAX_DAYS will remove checking maxdays
:param str root: Apply changes in the CHROOT_DIR directory
:param str warndays: Set the number of days of warning before a password
change is required
:raises subprocess.CalledProcessError: if call to chage fails
"""
cmd = ['chage']
if root:
cmd.extend(['--root', root])
if lastday:
cmd.extend(['--lastday', lastday])
if expiredate:
cmd.extend(['--expiredate', expiredate])
if inactive:
cmd.extend(['--inactive', inactive])
if mindays:
cmd.extend(['--mindays', mindays])
if maxdays:
cmd.extend(['--maxdays', maxdays])
if warndays:
cmd.extend(['--warndays', warndays])
cmd.append(username)
subprocess.check_call(cmd)
remove_password_expiry = functools.partial(chage, expiredate='-1', inactive='-1', mindays='0', maxdays='-1')
def rsync(from_path, to_path, flags='-r', options=None, timeout=None):
"""Replicate the contents of a path"""
options = options or ['--delete', '--executability']
@ -946,3 +989,31 @@ def updatedb(updatedb_text, new_path):
lines[i] = 'PRUNEPATHS="{}"'.format(' '.join(paths))
output = "\n".join(lines)
return output
def modulo_distribution(modulo=3, wait=30):
""" Modulo distribution
This helper uses the unit number, a modulo value and a constant wait time
to produce a calculated wait time distribution. This is useful in large
scale deployments to distribute load during an expensive operation such as
service restarts.
If you have 1000 nodes that need to restart 100 at a time 1 minute at a
time:
time.wait(modulo_distribution(modulo=100, wait=60))
restart()
If you need restarts to happen serially set modulo to the exact number of
nodes and set a high constant wait time:
time.wait(modulo_distribution(modulo=10, wait=120))
restart()
@param modulo: int The modulo number creates the group distribution
@param wait: int The constant time wait value
@return: int Calculated time to wait for unit operation
"""
unit_number = int(local_unit().split('/')[1])
return (unit_number % modulo) * wait

View File

@ -61,13 +61,19 @@ def bytes_from_string(value):
if isinstance(value, six.string_types):
value = six.text_type(value)
else:
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
msg = "Unable to interpret non-string value '%s' as bytes" % (value)
raise ValueError(msg)
matches = re.match("([0-9]+)([a-zA-Z]+)", value)
if not matches:
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
if matches:
size = int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
else:
# Assume that value passed in is bytes
try:
size = int(value)
except ValueError:
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return size
class BasicStringComparator(object):

View File

@ -358,7 +358,7 @@ class Storage(object):
try:
yield self.revision
self.revision = None
except:
except Exception:
self.flush(False)
self.revision = None
raise

View File

@ -41,6 +41,10 @@ class CouldNotAcquireLockException(Exception):
pass
class InvalidSnapChannel(Exception):
pass
def _snap_exec(commands):
"""
Execute snap commands.
@ -132,3 +136,15 @@ def snap_refresh(packages, *flags):
log(message, level='INFO')
return _snap_exec(['refresh'] + flags + packages)
def valid_snap_channel(channel):
""" Validate snap channel exists
:raises InvalidSnapChannel: When channel does not exist
:return: Boolean
"""
if channel.lower() in SNAP_CHANNELS:
return True
else:
raise InvalidSnapChannel("Invalid Snap Channel: {}".format(channel))

View File

@ -572,7 +572,7 @@ def get_upstream_version(package):
cache = apt_cache()
try:
pkg = cache[package]
except:
except Exception:
# the package is unknown to the current apt cache.
return None

View File

@ -11,7 +11,7 @@ check_and_install() {
fi
}
PYTHON="python"
PYTHON="python3"
for dep in ${DEPS[@]}; do
check_and_install ${PYTHON} ${dep}

View File

@ -11,7 +11,7 @@ check_and_install() {
fi
}
PYTHON="python"
PYTHON="python3"
for dep in ${DEPS[@]}; do
check_and_install ${PYTHON} ${dep}

View File

@ -1,6 +1,7 @@
#!/bin/bash
# Wrapper to ensure that old python bytecode isn't hanging around
# after we upgrade the charm with newer libraries
rm -rf **/*.pyc
find . -iname '*.pyc' -delete
find . -name '__pycache__' -prune -exec rm -rf "{}" \;
./hooks/install_deps
exec ./hooks/upgrade-charm.real

View File

@ -44,14 +44,14 @@ TEMPLATES_DIR = 'templates'
try:
import jinja2
except ImportError:
apt_install(filter_installed_packages(['python-jinja2']),
apt_install(filter_installed_packages(['python3-jinja2']),
fatal=True)
import jinja2
try:
import dns.resolver
except ImportError:
apt_install(filter_installed_packages(['python-dnspython']),
apt_install(filter_installed_packages(['python3-dnspython']),
fatal=True)
import dns.resolver
@ -65,9 +65,9 @@ def render_template(template_name, context, template_dir=TEMPLATES_DIR):
def enable_pocket(pocket):
apt_sources = "/etc/apt/sources.list"
with open(apt_sources, "r") as sources:
with open(apt_sources, "rt", encoding='UTF-8') as sources:
lines = sources.readlines()
with open(apt_sources, "w") as sources:
with open(apt_sources, "wt", encoding='UTF-8') as sources:
for line in lines:
if pocket in line:
sources.write(re.sub('^# deb', 'deb', line))

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import json
import os
@ -134,7 +135,7 @@ def process_requests(reqs):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
msg = ("Missing or invalid api version (%s)" % version)
msg = ("Missing or invalid api version ({})".format(version))
resp = {'exit-code': 1, 'stderr': msg}
if request_id:
resp['request-id'] = request_id
@ -231,7 +232,7 @@ def add_pool_to_group(pool, group, namespace=None):
def pool_permission_list_for_service(service):
"""Build the permission string for Ceph for a given service"""
permissions = []
permission_types = {}
permission_types = collections.OrderedDict()
for permission, group in service["group_names"].items():
if permission not in permission_types:
permission_types[permission] = []
@ -267,9 +268,7 @@ def get_service_groups(service, namespace=None):
key="cephx.services.{}".format(service))
try:
service = json.loads(service_json)
except TypeError:
service = None
except ValueError:
except (TypeError, ValueError):
service = None
if service:
service['groups'] = _build_service_groups(service, namespace)
@ -296,7 +295,7 @@ def _build_service_groups(service, namespace=None):
}
"""
all_groups = {}
for _, groups in service['group_names'].items():
for groups in service['group_names'].values():
for group in groups:
name = group
if namespace:
@ -316,9 +315,7 @@ def get_group(group_name):
group_json = monitor_key_get(service='admin', key=group_key)
try:
group = json.loads(group_json)
except TypeError:
group = None
except ValueError:
except (TypeError, ValueError):
group = None
if not group:
group = {
@ -391,9 +388,8 @@ def handle_erasure_pool(request, service):
percent_data=weight)
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
erasure_profile),
level=INFO)
log("Creating pool '{}' (erasure_profile={})"
.format(pool.name, erasure_profile), level=INFO)
pool.create()
# Set a quota if requested
@ -446,11 +442,11 @@ def handle_replicated_pool(request, service):
pool = ReplicatedPool(service=service,
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
level=INFO)
pool.create()
else:
log("Pool '%s' already exists - skipping create" % pool.name,
log("Pool '{}' already exists - skipping create".format(pool.name),
level=DEBUG)
# Set a quota if requested
@ -519,7 +515,7 @@ def handle_set_pool_value(request, service):
'key': request.get('key'),
'value': request.get('value')}
if params['key'] not in POOL_KEYS:
msg = "Invalid key '%s'" % params['key']
msg = "Invalid key '{}'".format(params['key'])
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
@ -685,7 +681,7 @@ def handle_rgw_create_user(request, service):
]
)
try:
user_json = json.loads(create_output)
user_json = json.loads(str(create_output.decode('UTF-8')))
return {'exit-code': 0, 'user': user_json}
except ValueError as err:
log(err, level=ERROR)
@ -790,10 +786,10 @@ def process_requests_v1(reqs):
operation failed along with an explanation).
"""
ret = None
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
for req in reqs:
op = req.get('op')
log("Processing op='%s'" % op, level=DEBUG)
log("Processing op='{}'".format(op), level=DEBUG)
# Use admin client since we do not have other client key locations
# setup to use them for these operations.
svc = 'admin'
@ -848,7 +844,7 @@ def process_requests_v1(reqs):
elif op == "add-permissions-to-key":
ret = handle_add_permissions_to_key(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
msg = "Unknown operation '{}'".format(op)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}

View File

@ -60,7 +60,7 @@ class Crushmap(object):
ids = list(map(
lambda x: int(x),
re.findall(CRUSHMAP_ID_RE, self._crushmap)))
ids.sort()
ids = sorted(ids)
if roots != []:
for root in roots:
buckets.append(CRUSHBucket(root[0], root[1], True))
@ -73,8 +73,11 @@ class Crushmap(object):
def load_crushmap(self):
try:
crush = check_output(['ceph', 'osd', 'getcrushmap'])
return check_output(['crushtool', '-d', '-'], stdin=crush.stdout)
crush = str(check_output(['ceph', 'osd', 'getcrushmap'])
.decode('UTF-8'))
return str(check_output(['crushtool', '-d', '-'],
stdin=crush.stdout)
.decode('UTF-8'))
except CalledProcessError as e:
log("Error occured while loading and decompiling CRUSH map:"
"{}".format(e), ERROR)
@ -99,10 +102,12 @@ class Crushmap(object):
"""Persist Crushmap to Ceph"""
try:
crushmap = self.build_crushmap()
compiled = check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], stdin=crushmap)
ceph_output = check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], stdin=compiled)
compiled = str(check_output(['crushtool', '-c', '/dev/stdin', '-o',
'/dev/stdout'], stdin=crushmap)
.decode('UTF-8'))
ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i',
'/dev/stdin'], stdin=compiled)
.decode('UTF-8'))
return ceph_output
except CalledProcessError as e:
log("save error: {}".format(e))

View File

@ -30,6 +30,7 @@ from datetime import datetime
from charmhelpers.core import hookenv
from charmhelpers.core import templating
from charmhelpers.core.decorators import retry_on_exception
from charmhelpers.core.host import (
chownr,
cmp_pkgrevno,
@ -380,8 +381,9 @@ def get_block_uuid(block_dev):
:returns: The UUID of the device or None on Error.
"""
try:
block_info = subprocess.check_output(
['blkid', '-o', 'export', block_dev])
block_info = str(subprocess
.check_output(['blkid', '-o', 'export', block_dev])
.decode('UTF-8'))
for tag in block_info.split('\n'):
parts = tag.split('=')
if parts[0] == 'UUID':
@ -532,8 +534,9 @@ def get_osd_weight(osd_id):
:raises: CalledProcessError if our ceph command fails.
"""
try:
tree = subprocess.check_output(
['ceph', 'osd', 'tree', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'osd', 'tree', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
@ -560,9 +563,10 @@ def get_osd_tree(service):
Also raises CalledProcessError if our ceph command fails
"""
try:
tree = subprocess.check_output(
['ceph', '--id', service,
'osd', 'tree', '--format=json'])
tree = str(subprocess
.check_output(['ceph', '--id', service,
'osd', 'tree', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
crush_list = []
@ -627,7 +631,7 @@ def _get_osd_num_from_dirname(dirname):
"""
match = re.search('ceph-(?P<osd_id>\d+)', dirname)
if not match:
raise ValueError("dirname not in correct format: %s" % dirname)
raise ValueError("dirname not in correct format: {}".format(dirname))
return match.group('osd_id')
@ -717,7 +721,7 @@ def get_version():
def error_out(msg):
log("FATAL ERROR: %s" % msg,
log("FATAL ERROR: {}".format(msg),
level=ERROR)
sys.exit(1)
@ -735,7 +739,9 @@ def is_quorum():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(str(subprocess
.check_output(cmd)
.decode('UTF-8')))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -762,7 +768,9 @@ def is_leader():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(str(subprocess
.check_output(cmd)
.decode('UTF-8')))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -954,8 +962,9 @@ def is_osd_disk(dev):
partitions = get_partition_list(dev)
for partition in partitions:
try:
info = subprocess.check_output(['sgdisk', '-i', partition.number,
dev])
info = str(subprocess
.check_output(['sgdisk', '-i', partition.number, dev])
.decode('UTF-8'))
info = info.split("\n") # IGNORE:E1103
for line in info:
for ptype in CEPH_PARTITIONS:
@ -1038,7 +1047,7 @@ def generate_monitor_secret():
'--name=mon.',
'--gen-key'
]
res = subprocess.check_output(cmd)
res = str(subprocess.check_output(cmd).decode('UTF-8'))
return "{}==".format(res.split('=')[1].strip())
@ -1187,7 +1196,10 @@ def create_named_keyring(entity, name, caps=None):
for subsystem, subcaps in caps.items():
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
return (parse_key(str(subprocess
.check_output(cmd)
.decode('UTF-8'))
.strip())) # IGNORE:E1103
def get_upgrade_key():
@ -1204,7 +1216,7 @@ def get_named_key(name, caps=None, pool_list=None):
"""
try:
# Does the key already exist?
output = subprocess.check_output(
output = str(subprocess.check_output(
[
'sudo',
'-u', ceph_user(),
@ -1217,7 +1229,7 @@ def get_named_key(name, caps=None, pool_list=None):
'auth',
'get',
'client.{}'.format(name),
]).strip()
]).decode('UTF-8')).strip()
return parse_key(output)
except subprocess.CalledProcessError:
# Couldn't get the key, time to create it!
@ -1246,7 +1258,10 @@ def get_named_key(name, caps=None, pool_list=None):
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
return parse_key(str(subprocess
.check_output(cmd)
.decode('UTF-8'))
.strip()) # IGNORE:E1103
def upgrade_key_caps(key, caps):
@ -1287,40 +1302,53 @@ def bootstrap_monitor_cluster(secret):
mkdir(path, owner=ceph_user(), group=ceph_user())
# end changes for Ceph >= 0.61.3
try:
subprocess.check_call(['ceph-authtool', keyring,
'--create-keyring', '--name=mon.',
'--add-key={}'.format(secret),
'--cap', 'mon', 'allow *'])
add_keyring_to_ceph(keyring,
secret,
hostname,
path,
done,
init_marker)
subprocess.check_call(['ceph-mon', '--mkfs',
'-i', hostname,
'--keyring', keyring])
chownr(path, ceph_user(), ceph_user())
with open(done, 'w'):
pass
with open(init_marker, 'w'):
pass
if systemd():
subprocess.check_call(['systemctl', 'enable', 'ceph-mon'])
service_restart('ceph-mon')
else:
service_restart('ceph-mon-all')
if cmp_pkgrevno('ceph', '12.0.0') >= 0:
# NOTE(jamespage): Later ceph releases require explicit
# call to ceph-create-keys to setup the
# admin keys for the cluster; this command
# will wait for quorum in the cluster before
# returning.
cmd = ['ceph-create-keys', '--id', hostname]
subprocess.check_call(cmd)
except:
raise
finally:
os.unlink(keyring)
@retry_on_exception(3, base_delay=5)
def add_keyring_to_ceph(keyring, secret, hostname, path, done, init_marker):
subprocess.check_call(['ceph-authtool', keyring,
'--create-keyring', '--name=mon.',
'--add-key={}'.format(secret),
'--cap', 'mon', 'allow *'])
subprocess.check_call(['ceph-mon', '--mkfs',
'-i', hostname,
'--keyring', keyring])
chownr(path, ceph_user(), ceph_user())
with open(done, 'w'):
pass
with open(init_marker, 'w'):
pass
if systemd():
subprocess.check_call(['systemctl', 'enable', 'ceph-mon'])
service_restart('ceph-mon')
else:
service_restart('ceph-mon-all')
if cmp_pkgrevno('ceph', '12.0.0') >= 0:
# NOTE(jamespage): Later ceph releases require explicit
# call to ceph-create-keys to setup the
# admin keys for the cluster; this command
# will wait for quorum in the cluster before
# returning.
cmd = ['ceph-create-keys', '--id', hostname]
subprocess.check_call(cmd)
osstat = os.stat("/etc/ceph/ceph.client.admin.keyring")
if not osstat.st_size:
raise Exception
def update_monfs():
hostname = socket.gethostname()
monfs = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
@ -1347,7 +1375,7 @@ def maybe_zap_journal(journal_dev):
def get_partitions(dev):
cmd = ['partx', '--raw', '--noheadings', dev]
try:
out = subprocess.check_output(cmd).splitlines()
out = str(subprocess.check_output(cmd).decode('UTF-8')).splitlines()
log("get partitions: {}".format(out), level=DEBUG)
return out
except subprocess.CalledProcessError as e:
@ -1515,7 +1543,7 @@ def get_running_osds():
"""Returns a list of the pids of the current running OSD daemons"""
cmd = ['pgrep', 'ceph-osd']
try:
result = subprocess.check_output(cmd)
result = str(subprocess.check_output(cmd).decode('UTF-8'))
return result.split()
except subprocess.CalledProcessError:
return []
@ -1531,7 +1559,9 @@ def get_cephfs(service):
# This command wasn't introduced until 0.86 ceph
return []
try:
output = subprocess.check_output(["ceph", '--id', service, "fs", "ls"])
output = str(subprocess
.check_output(["ceph", '--id', service, "fs", "ls"])
.decode('UTF-8'))
if not output:
return []
"""
@ -2065,7 +2095,9 @@ def list_pools(service):
"""
try:
pool_list = []
pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
pools = str(subprocess
.check_output(['rados', '--id', service, 'lspools'])
.decode('UTF-8'))
for pool in pools.splitlines():
pool_list.append(pool)
return pool_list
@ -2126,10 +2158,8 @@ UCA_CODENAME_MAP = {
def pretty_print_upgrade_paths():
"""Pretty print supported upgrade paths for ceph"""
lines = []
for key, value in UPGRADE_PATHS.iteritems():
lines.append("{} -> {}".format(key, value))
return lines
return ["{} -> {}".format(key, value)
for key, value in UPGRADE_PATHS.iteritems()]
def resolve_ceph_version(source):
@ -2149,7 +2179,9 @@ def get_ceph_pg_stat():
:returns: dict
"""
try:
tree = subprocess.check_output(['ceph', 'pg', 'stat', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'pg', 'stat', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
if not json_tree['num_pg_by_state']:
@ -2173,8 +2205,9 @@ def get_ceph_health():
status, use get_ceph_health()['overall_status'].
"""
try:
tree = subprocess.check_output(
['ceph', 'status', '--format=json'])
tree = str(subprocess
.check_output(['ceph', 'status', '--format=json'])
.decode('UTF-8'))
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
@ -2201,9 +2234,12 @@ def reweight_osd(osd_num, new_weight):
:raises CalledProcessError: if an error occurs invoking the systemd cmd
"""
try:
cmd_result = subprocess.check_output(
['ceph', 'osd', 'crush', 'reweight', "osd.{}".format(osd_num),
new_weight], stderr=subprocess.STDOUT)
cmd_result = str(subprocess
.check_output(['ceph', 'osd', 'crush',
'reweight', "osd.{}".format(osd_num),
new_weight],
stderr=subprocess.STDOUT)
.decode('UTF-8'))
expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format(
ID=osd_num) + " to {}".format(new_weight)
log(cmd_result)
@ -2246,3 +2282,25 @@ def bootstrap_manager():
unit = 'ceph-mgr@{}'.format(hostname)
subprocess.check_call(['systemctl', 'enable', unit])
service_restart(unit)
def osd_noout(enable):
"""Sets or unsets 'noout'
:param enable: bool. True to set noout, False to unset.
:returns: bool. True if output looks right.
:raises CalledProcessError: if an error occurs invoking the systemd cmd
"""
operation = {
True: 'set',
False: 'unset',
}
try:
subprocess.check_call(['ceph', '--id', 'admin',
'osd', operation[enable],
'noout'])
log('running ceph osd {} noout'.format(operation[enable]))
return True
except subprocess.CalledProcessError as e:
log(e)
raise

View File

@ -5,12 +5,12 @@ coverage>=3.6
mock>=1.2
flake8>=2.2.4,<=2.4.1
os-testr>=0.4.1
charm-tools>=2.0.0
charm-tools>=2.0.0;python_version=='2.7' # cheetah templates aren't availble in Python 3+
requests==2.6.0
# BEGIN: Amulet OpenStack Charm Helper Requirements
# Liberty client lower constraints
amulet>=1.14.3,<2.0
bundletester>=0.6.1,<1.0
bundletester>=0.6.1,<1.0;python_version=='2.7' # cheetah templates aren't availble in Python 3+
python-ceilometerclient>=1.5.0
python-cinderclient>=1.4.0
python-glanceclient>=1.1.0

View File

@ -347,7 +347,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
},
}
for section, pairs in expected.iteritems():
for section, pairs in expected.items():
ret = u.validate_config_data(unit, conf, section, pairs)
if ret:
message = "ceph config error: {}".format(ret)
@ -364,7 +364,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
'volume_driver': 'cinder.volume.drivers.rbd.RBDDriver'
}
}
for section, pairs in expected.iteritems():
for section, pairs in expected.items():
ret = u.validate_config_data(unit, conf, section, pairs)
if ret:
message = "cinder (rbd) config error: {}".format(ret)
@ -394,7 +394,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
section = 'DEFAULT'
expected = {section: config}
for section, pairs in expected.iteritems():
for section, pairs in expected.items():
ret = u.validate_config_data(unit, conf, section, pairs)
if ret:
message = "glance (rbd) config error: {}".format(ret)
@ -411,7 +411,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
'rbd_secret_uuid': u.not_null
}
}
for section, pairs in expected.iteritems():
for section, pairs in expected.items():
ret = u.validate_config_data(unit, conf, section, pairs)
if ret:
message = "nova (rbd) config error: {}".format(ret)

View File

@ -250,7 +250,14 @@ class OpenStackAmuletDeployment(AmuletDeployment):
self.log.debug('Waiting up to {}s for extended status on services: '
'{}'.format(timeout, services))
service_messages = {service: message for service in services}
# Check for idleness
self.d.sentry.wait()
# Check for error states and bail early
self.d.sentry.wait_for_status(self.d.juju_env, services)
# Check for ready messages
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
self.log.info('OK')
def _get_openstack_release(self):
@ -303,19 +310,26 @@ class OpenStackAmuletDeployment(AmuletDeployment):
test scenario, based on OpenStack release and whether ceph radosgw
is flagged as present or not."""
if self._get_openstack_release() >= self.trusty_kilo:
# Kilo or later
if self._get_openstack_release() <= self.trusty_icehouse:
# Juno or earlier
pools = [
'data',
'metadata',
'rbd',
'cinder-ceph',
'glance'
]
elif (self.trusty_kilo <= self._get_openstack_release() <=
self.zesty_ocata):
# Kilo through Ocata
pools = [
'rbd',
'cinder-ceph',
'glance'
]
else:
# Juno or earlier
# Pike and later
pools = [
'data',
'metadata',
'rbd',
'cinder-ceph',
'glance'
]

View File

@ -23,6 +23,7 @@ import urllib
import urlparse
import cinderclient.v1.client as cinder_client
import cinderclient.v2.client as cinder_clientv2
import glanceclient.v1.client as glance_client
import heatclient.v1.client as heat_client
from keystoneclient.v2_0 import client as keystone_client
@ -42,7 +43,6 @@ import swiftclient
from charmhelpers.contrib.amulet.utils import (
AmuletUtils
)
from charmhelpers.core.decorators import retry_on_exception
from charmhelpers.core.host import CompareHostReleases
DEBUG = logging.DEBUG
@ -310,7 +310,6 @@ class OpenStackAmuletUtils(AmuletUtils):
self.log.debug('Checking if tenant exists ({})...'.format(tenant))
return tenant in [t.name for t in keystone.tenants.list()]
@retry_on_exception(5, base_delay=10)
def keystone_wait_for_propagation(self, sentry_relation_pairs,
api_version):
"""Iterate over list of sentry and relation tuples and verify that
@ -326,7 +325,7 @@ class OpenStackAmuletUtils(AmuletUtils):
rel = sentry.relation('identity-service',
relation_name)
self.log.debug('keystone relation data: {}'.format(rel))
if rel['api_version'] != str(api_version):
if rel.get('api_version') != str(api_version):
raise Exception("api_version not propagated through relation"
" data yet ('{}' != '{}')."
"".format(rel['api_version'], api_version))
@ -348,15 +347,19 @@ class OpenStackAmuletUtils(AmuletUtils):
config = {'preferred-api-version': api_version}
deployment.d.configure('keystone', config)
deployment._auto_wait_for_status()
self.keystone_wait_for_propagation(sentry_relation_pairs, api_version)
def authenticate_cinder_admin(self, keystone_sentry, username,
password, tenant):
password, tenant, api_version=2):
"""Authenticates admin user with cinder."""
# NOTE(beisner): cinder python client doesn't accept tokens.
keystone_ip = keystone_sentry.info['public-address']
ept = "http://{}:5000/v2.0".format(keystone_ip.strip().decode('utf-8'))
return cinder_client.Client(username, password, tenant, ept)
_clients = {
1: cinder_client.Client,
2: cinder_clientv2.Client}
return _clients[api_version](username, password, tenant, ept)
def authenticate_keystone(self, keystone_ip, username, password,
api_version=False, admin_port=False,
@ -617,13 +620,25 @@ class OpenStackAmuletUtils(AmuletUtils):
self.log.debug('Keypair ({}) already exists, '
'using it.'.format(keypair_name))
return _keypair
except:
except Exception:
self.log.debug('Keypair ({}) does not exist, '
'creating it.'.format(keypair_name))
_keypair = nova.keypairs.create(name=keypair_name)
return _keypair
def _get_cinder_obj_name(self, cinder_object):
"""Retrieve name of cinder object.
:param cinder_object: cinder snapshot or volume object
:returns: str cinder object name
"""
# v1 objects store name in 'display_name' attr but v2+ use 'name'
try:
return cinder_object.display_name
except AttributeError:
return cinder_object.name
def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
img_id=None, src_vol_id=None, snap_id=None):
"""Create cinder volume, optionally from a glance image, OR
@ -674,6 +689,13 @@ class OpenStackAmuletUtils(AmuletUtils):
source_volid=src_vol_id,
snapshot_id=snap_id)
vol_id = vol_new.id
except TypeError:
vol_new = cinder.volumes.create(name=vol_name,
imageRef=img_id,
size=vol_size,
source_volid=src_vol_id,
snapshot_id=snap_id)
vol_id = vol_new.id
except Exception as e:
msg = 'Failed to create volume: {}'.format(e)
amulet.raise_status(amulet.FAIL, msg=msg)
@ -688,7 +710,7 @@ class OpenStackAmuletUtils(AmuletUtils):
# Re-validate new volume
self.log.debug('Validating volume attributes...')
val_vol_name = cinder.volumes.get(vol_id).display_name
val_vol_name = self._get_cinder_obj_name(cinder.volumes.get(vol_id))
val_vol_boot = cinder.volumes.get(vol_id).bootable
val_vol_stat = cinder.volumes.get(vol_id).status
val_vol_size = cinder.volumes.get(vol_id).size

View File

@ -22,6 +22,7 @@ from __future__ import print_function
import copy
from distutils.version import LooseVersion
from functools import wraps
from collections import namedtuple
import glob
import os
import json
@ -218,6 +219,8 @@ def principal_unit():
for rid in relation_ids(reltype):
for unit in related_units(rid):
md = _metadata_unit(unit)
if not md:
continue
subordinate = md.pop('subordinate', None)
if not subordinate:
return unit
@ -511,7 +514,10 @@ def _metadata_unit(unit):
"""
basedir = os.sep.join(charm_dir().split(os.sep)[:-2])
unitdir = 'unit-{}'.format(unit.replace(os.sep, '-'))
with open(os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')) as md:
joineddir = os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')
if not os.path.exists(joineddir):
return None
with open(joineddir) as md:
return yaml.safe_load(md)
@ -639,18 +645,31 @@ def is_relation_made(relation, keys='private-address'):
return False
def _port_op(op_name, port, protocol="TCP"):
"""Open or close a service network port"""
_args = [op_name]
icmp = protocol.upper() == "ICMP"
if icmp:
_args.append(protocol)
else:
_args.append('{}/{}'.format(port, protocol))
try:
subprocess.check_call(_args)
except subprocess.CalledProcessError:
# Older Juju pre 2.3 doesn't support ICMP
# so treat it as a no-op if it fails.
if not icmp:
raise
def open_port(port, protocol="TCP"):
"""Open a service network port"""
_args = ['open-port']
_args.append('{}/{}'.format(port, protocol))
subprocess.check_call(_args)
_port_op('open-port', port, protocol)
def close_port(port, protocol="TCP"):
"""Close a service network port"""
_args = ['close-port']
_args.append('{}/{}'.format(port, protocol))
subprocess.check_call(_args)
_port_op('close-port', port, protocol)
def open_ports(start, end, protocol="TCP"):
@ -667,6 +686,17 @@ def close_ports(start, end, protocol="TCP"):
subprocess.check_call(_args)
def opened_ports():
"""Get the opened ports
*Note that this will only show ports opened in a previous hook*
:returns: Opened ports as a list of strings: ``['8080/tcp', '8081-8083/tcp']``
"""
_args = ['opened-ports', '--format=json']
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
@cached
def unit_get(attribute):
"""Get the unit ID for the remote unit"""
@ -1077,6 +1107,35 @@ def network_get_primary_address(binding):
return subprocess.check_output(cmd).decode('UTF-8').strip()
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get(endpoint, relation_id=None):
"""
Retrieve the network details for a relation endpoint
:param endpoint: string. The name of a relation endpoint
:param relation_id: int. The ID of the relation for the current context.
:return: dict. The loaded YAML output of the network-get query.
:raise: NotImplementedError if run on Juju < 2.1
"""
cmd = ['network-get', endpoint, '--format', 'yaml']
if relation_id:
cmd.append('-r')
cmd.append(relation_id)
try:
response = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT).decode('UTF-8').strip()
except CalledProcessError as e:
# Early versions of Juju 2.0.x required the --primary-address argument.
# We catch that condition here and raise NotImplementedError since
# the requested semantics are not available - the caller can then
# use the network_get_primary_address() method instead.
if '--primary-address is currently required' in e.output.decode('UTF-8'):
raise NotImplementedError
raise
return yaml.safe_load(response)
def add_metric(*args, **kwargs):
"""Add metric values. Values may be expressed with keyword arguments. For
metric names containing dashes, these may be expressed as one or more
@ -1106,3 +1165,42 @@ def meter_info():
"""Get the meter status information, if running in the meter-status-changed
hook."""
return os.environ.get('JUJU_METER_INFO')
def iter_units_for_relation_name(relation_name):
"""Iterate through all units in a relation
Generator that iterates through all the units in a relation and yields
a named tuple with rid and unit field names.
Usage:
data = [(u.rid, u.unit)
for u in iter_units_for_relation_name(relation_name)]
:param relation_name: string relation name
:yield: Named Tuple with rid and unit field names
"""
RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
for rid in relation_ids(relation_name):
for unit in related_units(rid):
yield RelatedUnit(rid, unit)
def ingress_address(rid=None, unit=None):
"""
Retrieve the ingress-address from a relation when available. Otherwise,
return the private-address. This function is to be used on the consuming
side of the relation.
Usage:
addresses = [ingress_address(rid=u.rid, unit=u.unit)
for u in iter_units_for_relation_name(relation_name)]
:param rid: string relation id
:param unit: string unit name
:side effect: calls relation_get
:return: string IP address
"""
settings = relation_get(rid=rid, unit=unit)
return (settings.get('ingress-address') or
settings.get('private-address'))

View File

@ -34,7 +34,7 @@ import six
from contextlib import contextmanager
from collections import OrderedDict
from .hookenv import log, DEBUG
from .hookenv import log, DEBUG, local_unit
from .fstab import Fstab
from charmhelpers.osplatform import get_platform
@ -441,6 +441,49 @@ def add_user_to_group(username, group):
subprocess.check_call(cmd)
def chage(username, lastday=None, expiredate=None, inactive=None,
mindays=None, maxdays=None, root=None, warndays=None):
"""Change user password expiry information
:param str username: User to update
:param str lastday: Set when password was changed in YYYY-MM-DD format
:param str expiredate: Set when user's account will no longer be
accessible in YYYY-MM-DD format.
-1 will remove an account expiration date.
:param str inactive: Set the number of days of inactivity after a password
has expired before the account is locked.
-1 will remove an account's inactivity.
:param str mindays: Set the minimum number of days between password
changes to MIN_DAYS.
0 indicates the password can be changed anytime.
:param str maxdays: Set the maximum number of days during which a
password is valid.
-1 as MAX_DAYS will remove checking maxdays
:param str root: Apply changes in the CHROOT_DIR directory
:param str warndays: Set the number of days of warning before a password
change is required
:raises subprocess.CalledProcessError: if call to chage fails
"""
cmd = ['chage']
if root:
cmd.extend(['--root', root])
if lastday:
cmd.extend(['--lastday', lastday])
if expiredate:
cmd.extend(['--expiredate', expiredate])
if inactive:
cmd.extend(['--inactive', inactive])
if mindays:
cmd.extend(['--mindays', mindays])
if maxdays:
cmd.extend(['--maxdays', maxdays])
if warndays:
cmd.extend(['--warndays', warndays])
cmd.append(username)
subprocess.check_call(cmd)
remove_password_expiry = functools.partial(chage, expiredate='-1', inactive='-1', mindays='0', maxdays='-1')
def rsync(from_path, to_path, flags='-r', options=None, timeout=None):
"""Replicate the contents of a path"""
options = options or ['--delete', '--executability']
@ -946,3 +989,31 @@ def updatedb(updatedb_text, new_path):
lines[i] = 'PRUNEPATHS="{}"'.format(' '.join(paths))
output = "\n".join(lines)
return output
def modulo_distribution(modulo=3, wait=30):
""" Modulo distribution
This helper uses the unit number, a modulo value and a constant wait time
to produce a calculated wait time distribution. This is useful in large
scale deployments to distribute load during an expensive operation such as
service restarts.
If you have 1000 nodes that need to restart 100 at a time 1 minute at a
time:
time.wait(modulo_distribution(modulo=100, wait=60))
restart()
If you need restarts to happen serially set modulo to the exact number of
nodes and set a high constant wait time:
time.wait(modulo_distribution(modulo=10, wait=120))
restart()
@param modulo: int The modulo number creates the group distribution
@param wait: int The constant time wait value
@return: int Calculated time to wait for unit operation
"""
unit_number = int(local_unit().split('/')[1])
return (unit_number % modulo) * wait

View File

@ -61,13 +61,19 @@ def bytes_from_string(value):
if isinstance(value, six.string_types):
value = six.text_type(value)
else:
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
msg = "Unable to interpret non-string value '%s' as bytes" % (value)
raise ValueError(msg)
matches = re.match("([0-9]+)([a-zA-Z]+)", value)
if not matches:
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
if matches:
size = int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
else:
# Assume that value passed in is bytes
try:
size = int(value)
except ValueError:
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return size
class BasicStringComparator(object):

View File

@ -358,7 +358,7 @@ class Storage(object):
try:
yield self.revision
self.revision = None
except:
except Exception:
self.flush(False)
self.revision = None
raise

10
tox.ini
View File

@ -2,8 +2,9 @@
# This file is managed centrally by release-tools and should not be modified
# within individual charm repos.
[tox]
envlist = pep8,py27
envlist = pep8,py27,py35,py36
skipsdist = True
skip_missing_interpreters = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
@ -20,12 +21,19 @@ passenv = HOME TERM AMULET_* CS_API_*
basepython = python2.7
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
# temporarily disable py27
commands = /bin/true
[testenv:py35]
basepython = python3.5
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:py36]
basepython = python3.6
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:pep8]
basepython = python2.7
deps = -r{toxinidir}/requirements.txt

View File

@ -16,3 +16,4 @@ import sys
sys.path.append('hooks')
sys.path.append('lib')
sys.path.append('actions')
sys.path.append('unit_tests')

View File

@ -121,7 +121,7 @@ class ReplaceOsdTestCase(test_utils.CharmTestCase):
@patch('replace_osd.os.lstat')
def test_get_device_number(self, lstat):
lstat.return_value = posix.stat_result([
16877, 16, 51729L, 3, 0, 0, 217, 0, 1458086872, 1458086872
16877, 16, 51729, 3, 0, 0, 217, 0, 1458086872, 1458086872
])
major, minor = replace_osd.get_device_number(1)
assert major == 202

View File

@ -45,7 +45,7 @@ class PerformanceTestCase(test_utils.CharmTestCase):
@patch.object(ceph.subprocess, 'check_output')
def test_get_block_uuid(self, check_output):
check_output.return_value = \
'UUID=378f3c86-b21a-4172-832d-e2b3d4bc7511\nTYPE=ext2\n'
b'UUID=378f3c86-b21a-4172-832d-e2b3d4bc7511\nTYPE=ext2\n'
uuid = ceph.get_block_uuid('/dev/sda1')
self.assertEqual(uuid, '378f3c86-b21a-4172-832d-e2b3d4bc7511')

View File

@ -44,7 +44,7 @@ def load_config():
if not config:
logging.error('Could not find config.yaml in any parent directory '
'of %s. ' % f)
'of {}. '.format(f))
raise Exception
return yaml.safe_load(open(config).read())['options']
@ -57,7 +57,7 @@ def get_default_config():
'''
default_config = {}
config = load_config()
for k, v in config.iteritems():
for k, v in config.items():
if 'default' in v:
default_config[k] = v['default']
else:
@ -138,5 +138,5 @@ def patch_open():
mock_open(*args, **kwargs)
yield mock_file
with patch('__builtin__.open', stub_open):
with patch('builtins.open', stub_open):
yield mock_open, mock_file