charmhelper sync to get fix for bug 1517846

This commit is contained in:
Edward Hope-Morley 2015-11-19 17:16:51 +00:00
parent 0a8c917402
commit 72eca1f0e2
16 changed files with 527 additions and 116 deletions

View File

@ -20,7 +20,7 @@ import sys
from six.moves import zip
from charmhelpers.core import unitdata
import charmhelpers.core.unitdata
class OutputFormatter(object):
@ -163,8 +163,8 @@ class CommandLine(object):
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if unitdata._KV:
unitdata._KV.flush()
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
cmdline = CommandLine()

View File

@ -148,6 +148,13 @@ define service {{
self.description = description
self.check_cmd = self._locate_cmd(check_cmd)
def _get_check_filename(self):
return os.path.join(NRPE.nrpe_confdir, '{}.cfg'.format(self.command))
def _get_service_filename(self, hostname):
return os.path.join(NRPE.nagios_exportdir,
'service__{}_{}.cfg'.format(hostname, self.command))
def _locate_cmd(self, check_cmd):
search_path = (
'/usr/lib/nagios/plugins',
@ -163,9 +170,21 @@ define service {{
log('Check command not found: {}'.format(parts[0]))
return ''
def _remove_service_files(self):
if not os.path.exists(NRPE.nagios_exportdir):
return
for f in os.listdir(NRPE.nagios_exportdir):
if f.endswith('_{}.cfg'.format(self.command)):
os.remove(os.path.join(NRPE.nagios_exportdir, f))
def remove(self, hostname):
nrpe_check_file = self._get_check_filename()
if os.path.exists(nrpe_check_file):
os.remove(nrpe_check_file)
self._remove_service_files()
def write(self, nagios_context, hostname, nagios_servicegroups):
nrpe_check_file = '/etc/nagios/nrpe.d/{}.cfg'.format(
self.command)
nrpe_check_file = self._get_check_filename()
with open(nrpe_check_file, 'w') as nrpe_check_config:
nrpe_check_config.write("# check {}\n".format(self.shortname))
nrpe_check_config.write("command[{}]={}\n".format(
@ -180,9 +199,7 @@ define service {{
def write_service_config(self, nagios_context, hostname,
nagios_servicegroups):
for f in os.listdir(NRPE.nagios_exportdir):
if re.search('.*{}.cfg'.format(self.command), f):
os.remove(os.path.join(NRPE.nagios_exportdir, f))
self._remove_service_files()
templ_vars = {
'nagios_hostname': hostname,
@ -192,8 +209,7 @@ define service {{
'command': self.command,
}
nrpe_service_text = Check.service_template.format(**templ_vars)
nrpe_service_file = '{}/service__{}_{}.cfg'.format(
NRPE.nagios_exportdir, hostname, self.command)
nrpe_service_file = self._get_service_filename(hostname)
with open(nrpe_service_file, 'w') as nrpe_service_config:
nrpe_service_config.write(str(nrpe_service_text))
@ -218,12 +234,32 @@ class NRPE(object):
if hostname:
self.hostname = hostname
else:
self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
nagios_hostname = get_nagios_hostname()
if nagios_hostname:
self.hostname = nagios_hostname
else:
self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
self.checks = []
def add_check(self, *args, **kwargs):
self.checks.append(Check(*args, **kwargs))
def remove_check(self, *args, **kwargs):
if kwargs.get('shortname') is None:
raise ValueError('shortname of check must be specified')
# Use sensible defaults if they're not specified - these are not
# actually used during removal, but they're required for constructing
# the Check object; check_disk is chosen because it's part of the
# nagios-plugins-basic package.
if kwargs.get('check_cmd') is None:
kwargs['check_cmd'] = 'check_disk'
if kwargs.get('description') is None:
kwargs['description'] = ''
check = Check(*args, **kwargs)
check.remove(self.hostname)
def write(self):
try:
nagios_uid = pwd.getpwnam('nagios').pw_uid

View File

@ -23,7 +23,7 @@ import socket
from functools import partial
from charmhelpers.core.hookenv import unit_get
from charmhelpers.fetch import apt_install
from charmhelpers.fetch import apt_install, apt_update
from charmhelpers.core.hookenv import (
log,
WARNING,
@ -32,13 +32,15 @@ from charmhelpers.core.hookenv import (
try:
import netifaces
except ImportError:
apt_install('python-netifaces')
apt_update(fatal=True)
apt_install('python-netifaces', fatal=True)
import netifaces
try:
import netaddr
except ImportError:
apt_install('python-netaddr')
apt_update(fatal=True)
apt_install('python-netaddr', fatal=True)
import netaddr

View File

@ -26,6 +26,7 @@
import os
import shutil
import six
import json
import time
import uuid
@ -59,6 +60,8 @@ from charmhelpers.fetch import (
apt_install,
)
from charmhelpers.core.kernel import modprobe
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
KEYFILE = '/etc/ceph/ceph.client.{}.key'
@ -123,29 +126,37 @@ def get_osds(service):
return None
def create_pool(service, name, replicas=3):
def update_pool(client, pool, settings):
cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
for k, v in six.iteritems(settings):
cmd.append(k)
cmd.append(v)
check_call(cmd)
def create_pool(service, name, replicas=3, pg_num=None):
"""Create a new RADOS pool."""
if pool_exists(service, name):
log("Ceph pool {} already exists, skipping creation".format(name),
level=WARNING)
return
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pgnum = (len(osds) * 100 // replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pgnum = 200
if not pg_num:
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pg_num = (len(osds) * 100 // replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pg_num = 200
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
check_call(cmd)
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
str(replicas)]
check_call(cmd)
update_pool(service, name, settings={'size': str(replicas)})
def delete_pool(service, name):
@ -200,10 +211,10 @@ def create_key_file(service, key):
log('Created new keyfile at %s.' % keyfile, level=INFO)
def get_ceph_nodes():
"""Query named relation 'ceph' to determine current nodes."""
def get_ceph_nodes(relation='ceph'):
"""Query named relation to determine current nodes."""
hosts = []
for r_id in relation_ids('ceph'):
for r_id in relation_ids(relation):
for unit in related_units(r_id):
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
@ -291,17 +302,6 @@ def place_data_on_block_device(blk_device, data_src_dst):
os.chown(data_src_dst, uid, gid)
# TODO: re-use
def modprobe(module):
"""Load a kernel module and configure for auto-load on reboot."""
log('Loading kernel module', level=INFO)
cmd = ['modprobe', module]
check_call(cmd)
with open('/etc/modules', 'r+') as modules:
if module not in modules.read():
modules.write(module)
def copy_files(src, dst, symlinks=False, ignore=None):
"""Copy files from src to dst."""
for item in os.listdir(src):
@ -366,14 +366,14 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
service_start(svc)
def ensure_ceph_keyring(service, user=None, group=None):
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
Returns False if no ceph key is available in relation state.
"""
key = None
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
@ -549,7 +549,7 @@ def get_previous_request(rid):
return request
def get_request_states(request):
def get_request_states(request, relation='ceph'):
"""Return a dict of requests per relation id with their corresponding
completion state.
@ -561,7 +561,7 @@ def get_request_states(request):
"""
complete = []
requests = {}
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
complete = False
previous_request = get_previous_request(rid)
if request == previous_request:
@ -579,14 +579,14 @@ def get_request_states(request):
return requests
def is_request_sent(request):
def is_request_sent(request, relation='ceph'):
"""Check to see if a functionally equivalent request has already been sent
Returns True if a similair request has been sent
@param request: A CephBrokerRq object
"""
states = get_request_states(request)
states = get_request_states(request, relation=relation)
for rid in states.keys():
if not states[rid]['sent']:
return False
@ -594,7 +594,7 @@ def is_request_sent(request):
return True
def is_request_complete(request):
def is_request_complete(request, relation='ceph'):
"""Check to see if a functionally equivalent request has already been
completed
@ -602,7 +602,7 @@ def is_request_complete(request):
@param request: A CephBrokerRq object
"""
states = get_request_states(request)
states = get_request_states(request, relation=relation)
for rid in states.keys():
if not states[rid]['complete']:
return False
@ -652,15 +652,15 @@ def get_broker_rsp_key():
return 'broker-rsp-' + local_unit().replace('/', '-')
def send_request_if_needed(request):
def send_request_if_needed(request, relation='ceph'):
"""Send broker request if an equivalent request has not already been sent
@param request: A CephBrokerRq object
"""
if is_request_sent(request):
if is_request_sent(request, relation=relation):
log('Request already sent but not complete, not sending new request',
level=DEBUG)
else:
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
log('Sending request {}'.format(request.request_id), level=DEBUG)
relation_set(relation_id=rid, broker_req=request.request)

View File

@ -490,6 +490,19 @@ def relation_types():
return rel_types
@cached
def peer_relation_id():
'''Get a peer relation id if a peer relation has been joined, else None.'''
md = metadata()
section = md.get('peers')
if section:
for key in section:
relids = relation_ids(key)
if relids:
return relids[0]
return None
@cached
def relation_to_interface(relation_name):
"""
@ -623,6 +636,38 @@ def unit_private_ip():
return unit_get('private-address')
@cached
def storage_get(attribute="", storage_id=""):
"""Get storage attributes"""
_args = ['storage-get', '--format=json']
if storage_id:
_args.extend(('-s', storage_id))
if attribute:
_args.append(attribute)
try:
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
except ValueError:
return None
@cached
def storage_list(storage_name=""):
"""List the storage IDs for the unit"""
_args = ['storage-list', '--format=json']
if storage_name:
_args.append(storage_name)
try:
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
except ValueError:
return None
except OSError as e:
import errno
if e.errno == errno.ENOENT:
# storage-list does not exist
return []
raise
class UnregisteredHookError(Exception):
"""Raised when an undefined hook is called"""
pass
@ -788,6 +833,7 @@ def status_get():
def translate_exc(from_exc, to_exc):
def inner_translate_exc1(f):
@wraps(f)
def inner_translate_exc2(*args, **kwargs):
try:
return f(*args, **kwargs)

View File

@ -63,33 +63,53 @@ def service_reload(service_name, restart_on_failure=False):
return service_result
def service_pause(service_name, init_dir=None):
def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
"""Pause a system service.
Stop it, and prevent it from starting again at boot."""
if init_dir is None:
init_dir = "/etc/init"
stopped = service_stop(service_name)
# XXX: Support systemd too
override_path = os.path.join(
init_dir, '{}.override'.format(service_name))
with open(override_path, 'w') as fh:
fh.write("manual\n")
stopped = True
if service_running(service_name):
stopped = service_stop(service_name)
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
sysv_file = os.path.join(initd_dir, service_name)
if os.path.exists(upstart_file):
override_path = os.path.join(
init_dir, '{}.override'.format(service_name))
with open(override_path, 'w') as fh:
fh.write("manual\n")
elif os.path.exists(sysv_file):
subprocess.check_call(["update-rc.d", service_name, "disable"])
else:
# XXX: Support SystemD too
raise ValueError(
"Unable to detect {0} as either Upstart {1} or SysV {2}".format(
service_name, upstart_file, sysv_file))
return stopped
def service_resume(service_name, init_dir=None):
def service_resume(service_name, init_dir="/etc/init",
initd_dir="/etc/init.d"):
"""Resume a system service.
Reenable starting again at boot. Start the service"""
# XXX: Support systemd too
if init_dir is None:
init_dir = "/etc/init"
override_path = os.path.join(
init_dir, '{}.override'.format(service_name))
if os.path.exists(override_path):
os.unlink(override_path)
started = service_start(service_name)
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
sysv_file = os.path.join(initd_dir, service_name)
if os.path.exists(upstart_file):
override_path = os.path.join(
init_dir, '{}.override'.format(service_name))
if os.path.exists(override_path):
os.unlink(override_path)
elif os.path.exists(sysv_file):
subprocess.check_call(["update-rc.d", service_name, "enable"])
else:
# XXX: Support SystemD too
raise ValueError(
"Unable to detect {0} as either Upstart {1} or SysV {2}".format(
service_name, upstart_file, sysv_file))
started = service_running(service_name)
if not started:
started = service_start(service_name)
return started
@ -550,7 +570,14 @@ def chdir(d):
os.chdir(cur)
def chownr(path, owner, group, follow_links=True):
def chownr(path, owner, group, follow_links=True, chowntopdir=False):
"""
Recursively change user and group ownership of files and directories
in given path. Doesn't chown path itself by default, only its children.
:param bool follow_links: Also Chown links if True
:param bool chowntopdir: Also chown path itself if True
"""
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
if follow_links:
@ -558,6 +585,10 @@ def chownr(path, owner, group, follow_links=True):
else:
chown = os.lchown
if chowntopdir:
broken_symlink = os.path.lexists(path) and not os.path.exists(path)
if not broken_symlink:
chown(path, uid, gid)
for root, dirs, files in os.walk(path):
for name in dirs + files:
full = os.path.join(root, name)
@ -568,3 +599,19 @@ def chownr(path, owner, group, follow_links=True):
def lchownr(path, owner, group):
chownr(path, owner, group, follow_links=False)
def get_total_ram():
'''The total amount of system RAM in bytes.
This is what is reported by the OS, and may be overcommitted when
there are multiple containers hosted on the same machine.
'''
with open('/proc/meminfo', 'r') as f:
for line in f.readlines():
if line:
key, value, unit = line.split()
if key == 'MemTotal:':
assert unit == 'kB', 'Unknown unit'
return int(value) * 1024 # Classic, not KiB.
raise NotImplementedError()

View File

@ -25,11 +25,13 @@ from charmhelpers.core.host import (
fstab_mount,
mkdir,
)
from charmhelpers.core.strutils import bytes_from_string
from subprocess import check_output
def hugepage_support(user, group='hugetlb', nr_hugepages=256,
max_map_count=65536, mnt_point='/run/hugepages/kvm',
pagesize='2MB', mount=True):
pagesize='2MB', mount=True, set_shmmax=False):
"""Enable hugepages on system.
Args:
@ -44,11 +46,18 @@ def hugepage_support(user, group='hugetlb', nr_hugepages=256,
group_info = add_group(group)
gid = group_info.gr_gid
add_user_to_group(user, group)
if max_map_count < 2 * nr_hugepages:
max_map_count = 2 * nr_hugepages
sysctl_settings = {
'vm.nr_hugepages': nr_hugepages,
'vm.max_map_count': max_map_count,
'vm.hugetlb_shm_group': gid,
}
if set_shmmax:
shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
if shmmax_minsize > shmmax_current:
sysctl_settings['kernel.shmmax'] = shmmax_minsize
sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
lfstab = fstab.Fstab()

View File

@ -0,0 +1,68 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014-2015 Canonical Limited.
#
# This file is part of charm-helpers.
#
# charm-helpers is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3 as
# published by the Free Software Foundation.
#
# charm-helpers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
from charmhelpers.core.hookenv import (
log,
INFO
)
from subprocess import check_call, check_output
import re
def modprobe(module, persist=True):
"""Load a kernel module and configure for auto-load on reboot."""
cmd = ['modprobe', module]
log('Loading kernel module %s' % module, level=INFO)
check_call(cmd)
if persist:
with open('/etc/modules', 'r+') as modules:
if module not in modules.read():
modules.write(module)
def rmmod(module, force=False):
"""Remove a module from the linux kernel"""
cmd = ['rmmod']
if force:
cmd.append('-f')
cmd.append(module)
log('Removing kernel module %s' % module, level=INFO)
return check_call(cmd)
def lsmod():
"""Shows what kernel modules are currently loaded"""
return check_output(['lsmod'],
universal_newlines=True)
def is_module_loaded(module):
"""Checks if a kernel module is already loaded"""
matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
return len(matches) > 0
def update_initramfs(version='all'):
"""Updates an initramfs image"""
return check_call(["update-initramfs", "-k", version, "-u"])

View File

@ -249,16 +249,18 @@ class TemplateCallback(ManagerCallback):
:param int perms: The permissions of the rendered file
:param partial on_change_action: functools partial to be executed when
rendered file changes
:param jinja2 loader template_loader: A jinja2 template loader
"""
def __init__(self, source, target,
owner='root', group='root', perms=0o444,
on_change_action=None):
on_change_action=None, template_loader=None):
self.source = source
self.target = target
self.owner = owner
self.group = group
self.perms = perms
self.on_change_action = on_change_action
self.template_loader = template_loader
def __call__(self, manager, service_name, event_name):
pre_checksum = ''
@ -269,7 +271,8 @@ class TemplateCallback(ManagerCallback):
for ctx in service.get('required_data', []):
context.update(ctx)
templating.render(self.source, self.target, context,
self.owner, self.group, self.perms)
self.owner, self.group, self.perms,
template_loader=self.template_loader)
if self.on_change_action:
if pre_checksum == host.file_hash(self.target):
hookenv.log(

View File

@ -18,6 +18,7 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import six
import re
def bool_from_string(value):
@ -40,3 +41,32 @@ def bool_from_string(value):
msg = "Unable to interpret string value '%s' as boolean" % (value)
raise ValueError(msg)
def bytes_from_string(value):
"""Interpret human readable string value as bytes.
Returns int
"""
BYTE_POWER = {
'K': 1,
'KB': 1,
'M': 2,
'MB': 2,
'G': 3,
'GB': 3,
'T': 4,
'TB': 4,
'P': 5,
'PB': 5,
}
if isinstance(value, six.string_types):
value = six.text_type(value)
else:
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
raise ValueError(msg)
matches = re.match("([0-9]+)([a-zA-Z]+)", value)
if not matches:
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])

View File

@ -21,7 +21,7 @@ from charmhelpers.core import hookenv
def render(source, target, context, owner='root', group='root',
perms=0o444, templates_dir=None, encoding='UTF-8'):
perms=0o444, templates_dir=None, encoding='UTF-8', template_loader=None):
"""
Render a template.
@ -52,17 +52,24 @@ def render(source, target, context, owner='root', group='root',
apt_install('python-jinja2', fatal=True)
from jinja2 import FileSystemLoader, Environment, exceptions
if templates_dir is None:
templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
loader = Environment(loader=FileSystemLoader(templates_dir))
if template_loader:
template_env = Environment(loader=template_loader)
else:
if templates_dir is None:
templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
template_env = Environment(loader=FileSystemLoader(templates_dir))
try:
source = source
template = loader.get_template(source)
template = template_env.get_template(source)
except exceptions.TemplateNotFound as e:
hookenv.log('Could not load template %s from %s.' %
(source, templates_dir),
level=hookenv.ERROR)
raise e
content = template.render(context)
host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
target_dir = os.path.dirname(target)
if not os.path.exists(target_dir):
# This is a terrible default directory permission, as the file
# or its siblings will often contain secrets.
host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
host.write_file(target, content.encode(encoding), owner, group, perms)

View File

@ -225,12 +225,12 @@ def apt_purge(packages, fatal=False):
def apt_mark(packages, mark, fatal=False):
"""Flag one or more packages using apt-mark"""
log("Marking {} as {}".format(packages, mark))
cmd = ['apt-mark', mark]
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
log("Holding {}".format(packages))
if fatal:
subprocess.check_call(cmd, universal_newlines=True)

View File

@ -51,7 +51,8 @@ class AmuletDeployment(object):
if 'units' not in this_service:
this_service['units'] = 1
self.d.add(this_service['name'], units=this_service['units'])
self.d.add(this_service['name'], units=this_service['units'],
constraints=this_service.get('constraints'))
for svc in other_services:
if 'location' in svc:
@ -64,7 +65,8 @@ class AmuletDeployment(object):
if 'units' not in svc:
svc['units'] = 1
self.d.add(svc['name'], charm=branch_location, units=svc['units'])
self.d.add(svc['name'], charm=branch_location, units=svc['units'],
constraints=svc.get('constraints'))
def _add_relations(self, relations):
"""Add all of the relations for the services."""

View File

@ -326,7 +326,7 @@ class AmuletUtils(object):
def service_restarted_since(self, sentry_unit, mtime, service,
pgrep_full=None, sleep_time=20,
retry_count=2, retry_sleep_time=30):
retry_count=30, retry_sleep_time=10):
"""Check if service was been started after a given time.
Args:
@ -334,8 +334,9 @@ class AmuletUtils(object):
mtime (float): The epoch time to check against
service (string): service name to look for in process table
pgrep_full: [Deprecated] Use full command line search mode with pgrep
sleep_time (int): Seconds to sleep before looking for process
retry_count (int): If service is not found, how many times to retry
sleep_time (int): Initial sleep time (s) before looking for file
retry_sleep_time (int): Time (s) to sleep between retries
retry_count (int): If file is not found, how many times to retry
Returns:
bool: True if service found and its start time it newer than mtime,
@ -359,11 +360,12 @@ class AmuletUtils(object):
pgrep_full)
self.log.debug('Attempt {} to get {} proc start time on {} '
'OK'.format(tries, service, unit_name))
except IOError:
except IOError as e:
# NOTE(beisner) - race avoidance, proc may not exist yet.
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
self.log.debug('Attempt {} to get {} proc start time on {} '
'failed'.format(tries, service, unit_name))
'failed\n{}'.format(tries, service,
unit_name, e))
time.sleep(retry_sleep_time)
tries += 1
@ -383,35 +385,62 @@ class AmuletUtils(object):
return False
def config_updated_since(self, sentry_unit, filename, mtime,
sleep_time=20):
sleep_time=20, retry_count=30,
retry_sleep_time=10):
"""Check if file was modified after a given time.
Args:
sentry_unit (sentry): The sentry unit to check the file mtime on
filename (string): The file to check mtime of
mtime (float): The epoch time to check against
sleep_time (int): Seconds to sleep before looking for process
sleep_time (int): Initial sleep time (s) before looking for file
retry_sleep_time (int): Time (s) to sleep between retries
retry_count (int): If file is not found, how many times to retry
Returns:
bool: True if file was modified more recently than mtime, False if
file was modified before mtime,
file was modified before mtime, or if file not found.
"""
self.log.debug('Checking %s updated since %s' % (filename, mtime))
unit_name = sentry_unit.info['unit_name']
self.log.debug('Checking that %s updated since %s on '
'%s' % (filename, mtime, unit_name))
time.sleep(sleep_time)
file_mtime = self._get_file_mtime(sentry_unit, filename)
file_mtime = None
tries = 0
while tries <= retry_count and not file_mtime:
try:
file_mtime = self._get_file_mtime(sentry_unit, filename)
self.log.debug('Attempt {} to get {} file mtime on {} '
'OK'.format(tries, filename, unit_name))
except IOError as e:
# NOTE(beisner) - race avoidance, file may not exist yet.
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
self.log.debug('Attempt {} to get {} file mtime on {} '
'failed\n{}'.format(tries, filename,
unit_name, e))
time.sleep(retry_sleep_time)
tries += 1
if not file_mtime:
self.log.warn('Could not determine file mtime, assuming '
'file does not exist')
return False
if file_mtime >= mtime:
self.log.debug('File mtime is newer than provided mtime '
'(%s >= %s)' % (file_mtime, mtime))
'(%s >= %s) on %s (OK)' % (file_mtime,
mtime, unit_name))
return True
else:
self.log.warn('File mtime %s is older than provided mtime %s'
% (file_mtime, mtime))
self.log.warn('File mtime is older than provided mtime'
'(%s < on %s) on %s' % (file_mtime,
mtime, unit_name))
return False
def validate_service_config_changed(self, sentry_unit, mtime, service,
filename, pgrep_full=None,
sleep_time=20, retry_count=2,
retry_sleep_time=30):
sleep_time=20, retry_count=30,
retry_sleep_time=10):
"""Check service and file were updated after mtime
Args:
@ -456,7 +485,9 @@ class AmuletUtils(object):
sentry_unit,
filename,
mtime,
sleep_time=0)
sleep_time=sleep_time,
retry_count=retry_count,
retry_sleep_time=retry_sleep_time)
return service_restart and config_update
@ -776,3 +807,12 @@ class AmuletUtils(object):
output = _check_output(command, universal_newlines=True)
data = json.loads(output)
return data.get(u"status") == "completed"
def status_get(self, unit):
"""Return the current service status of this unit."""
raw_status, return_code = unit.run(
"status-get --format=json --include-data")
if return_code != 0:
return ("unknown", "")
status = json.loads(raw_status)
return (status["status"], status["message"])

View File

@ -14,12 +14,18 @@
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import sys
import six
from collections import OrderedDict
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
DEBUG = logging.DEBUG
ERROR = logging.ERROR
class OpenStackAmuletDeployment(AmuletDeployment):
"""OpenStack amulet deployment.
@ -28,9 +34,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
that is specifically for use by OpenStack charms.
"""
def __init__(self, series=None, openstack=None, source=None, stable=True):
def __init__(self, series=None, openstack=None, source=None,
stable=True, log_level=DEBUG):
"""Initialize the deployment environment."""
super(OpenStackAmuletDeployment, self).__init__(series)
self.log = self.get_logger(level=log_level)
self.log.info('OpenStackAmuletDeployment: init')
self.openstack = openstack
self.source = source
self.stable = stable
@ -38,6 +47,22 @@ class OpenStackAmuletDeployment(AmuletDeployment):
# out.
self.current_next = "trusty"
def get_logger(self, name="deployment-logger", level=logging.DEBUG):
"""Get a logger object that will log to stdout."""
log = logging
logger = log.getLogger(name)
fmt = log.Formatter("%(asctime)s %(funcName)s "
"%(levelname)s: %(message)s")
handler = log.StreamHandler(stream=sys.stdout)
handler.setLevel(level)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(level)
return logger
def _determine_branch_locations(self, other_services):
"""Determine the branch locations for the other services.
@ -45,6 +70,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
stable or next (dev) branch, and based on this, use the corresonding
stable or next branches for the other_services."""
self.log.info('OpenStackAmuletDeployment: determine branch locations')
# Charms outside the lp:~openstack-charmers namespace
base_charms = ['mysql', 'mongodb', 'nrpe']
@ -58,19 +85,17 @@ class OpenStackAmuletDeployment(AmuletDeployment):
else:
base_series = self.current_next
if self.stable:
for svc in other_services:
if svc['name'] in force_series_current:
base_series = self.current_next
for svc in other_services:
if svc['name'] in force_series_current:
base_series = self.current_next
# If a location has been explicitly set, use it
if svc.get('location'):
continue
if self.stable:
temp = 'lp:charms/{}/{}'
svc['location'] = temp.format(base_series,
svc['name'])
else:
for svc in other_services:
if svc['name'] in force_series_current:
base_series = self.current_next
else:
if svc['name'] in base_charms:
temp = 'lp:charms/{}/{}'
svc['location'] = temp.format(base_series,
@ -79,10 +104,13 @@ class OpenStackAmuletDeployment(AmuletDeployment):
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
svc['location'] = temp.format(self.current_next,
svc['name'])
return other_services
def _add_services(self, this_service, other_services):
"""Add services to the deployment and set openstack-origin/source."""
self.log.info('OpenStackAmuletDeployment: adding services')
other_services = self._determine_branch_locations(other_services)
super(OpenStackAmuletDeployment, self)._add_services(this_service,
@ -96,7 +124,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
'ceph-osd', 'ceph-radosgw']
# Charms which can not use openstack-origin, ie. many subordinates
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
if self.openstack:
for svc in services:
@ -112,9 +141,79 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
self.log.info('OpenStackAmuletDeployment: configure services')
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _auto_wait_for_status(self, message=None, exclude_services=None,
include_only=None, timeout=1800):
"""Wait for all units to have a specific extended status, except
for any defined as excluded. Unless specified via message, any
status containing any case of 'ready' will be considered a match.
Examples of message usage:
Wait for all unit status to CONTAIN any case of 'ready' or 'ok':
message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
Wait for all units to reach this status (exact match):
message = re.compile('^Unit is ready and clustered$')
Wait for all units to reach any one of these (exact match):
message = re.compile('Unit is ready|OK|Ready')
Wait for at least one unit to reach this status (exact match):
message = {'ready'}
See Amulet's sentry.wait_for_messages() for message usage detail.
https://github.com/juju/amulet/blob/master/amulet/sentry.py
:param message: Expected status match
:param exclude_services: List of juju service names to ignore,
not to be used in conjuction with include_only.
:param include_only: List of juju service names to exclusively check,
not to be used in conjuction with exclude_services.
:param timeout: Maximum time in seconds to wait for status match
:returns: None. Raises if timeout is hit.
"""
self.log.info('Waiting for extended status on units...')
all_services = self.d.services.keys()
if exclude_services and include_only:
raise ValueError('exclude_services can not be used '
'with include_only')
if message:
if isinstance(message, re._pattern_type):
match = message.pattern
else:
match = message
self.log.debug('Custom extended status wait match: '
'{}'.format(match))
else:
self.log.debug('Default extended status wait match: contains '
'READY (case-insensitive)')
message = re.compile('.*ready.*', re.IGNORECASE)
if exclude_services:
self.log.debug('Excluding services from extended status match: '
'{}'.format(exclude_services))
else:
exclude_services = []
if include_only:
services = include_only
else:
services = list(set(all_services) - set(exclude_services))
self.log.debug('Waiting up to {}s for extended status on services: '
'{}'.format(timeout, services))
service_messages = {service: message for service in services}
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
self.log.info('OK')
def _get_openstack_release(self):
"""Get openstack release.

View File

@ -18,6 +18,7 @@ import amulet
import json
import logging
import os
import re
import six
import time
import urllib
@ -604,7 +605,22 @@ class OpenStackAmuletUtils(AmuletUtils):
'{}'.format(sample_type, samples))
return None
# rabbitmq/amqp specific helpers:
# rabbitmq/amqp specific helpers:
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
"""Wait for rmq units extended status to show cluster readiness,
after an optional initial sleep period. Initial sleep is likely
necessary to be effective following a config change, as status
message may not instantly update to non-ready."""
if init_sleep:
time.sleep(init_sleep)
message = re.compile('^Unit is ready and clustered$')
deployment._auto_wait_for_status(message=message,
timeout=timeout,
include_only=['rabbitmq-server'])
def add_rmq_test_user(self, sentry_units,
username="testuser1", password="changeme"):
"""Add a test user via the first rmq juju unit, check connection as
@ -752,7 +768,7 @@ class OpenStackAmuletUtils(AmuletUtils):
self.log.debug('SSL is enabled @{}:{} '
'({})'.format(host, port, unit_name))
return True
elif not port and not conf_ssl:
elif not conf_ssl:
self.log.debug('SSL not enabled @{}:{} '
'({})'.format(host, port, unit_name))
return False
@ -805,7 +821,10 @@ class OpenStackAmuletUtils(AmuletUtils):
if port:
config['ssl_port'] = port
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0
@ -832,7 +851,10 @@ class OpenStackAmuletUtils(AmuletUtils):
# Disable RMQ SSL
config = {'ssl': 'off'}
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0