charm-rabbitmq-server/hooks/rabbit_utils.py


# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
import os
import re
import sys
import subprocess
import glob
import tempfile
import time
import shutil
import socket
import yaml
import functools
from collections import OrderedDict, defaultdict
try:
from croniter import CroniterBadCronError
except ImportError:
# NOTE(lourot): CroniterBadCronError doesn't exist in croniter
# 0.3.12 and older, i.e. it doesn't exist in Bionic and older.
# croniter used to raise a ValueError on these older versions:
CroniterBadCronError = ValueError
from croniter import croniter
from datetime import datetime
from rabbitmq_context import (
RabbitMQSSLContext,
RabbitMQClusterContext,
RabbitMQEnvContext,
SSL_CA_FILE,
)
from charmhelpers.contrib.charmsupport import nrpe
import charmhelpers.contrib.openstack.deferred_events as deferred_events
from charmhelpers.core.templating import render
from charmhelpers.contrib.openstack.utils import (
_determine_os_workload_status,
get_hostname,
pause_unit,
resume_unit,
is_unit_paused_set,
)
from charmhelpers.contrib.hahelpers.cluster import (
distributed_wait,
)
from charmhelpers.core.hookenv import (
relation_id,
relation_ids,
related_units,
relations_for_id,
log, ERROR,
WARNING,
INFO, DEBUG,
service_name,
status_set,
cached,
relation_set,
relation_get,
application_version_set,
config,
is_leader,
leader_get,
local_unit,
action_name,
charm_dir
)
from charmhelpers.core.host import (
pwgen,
mkdir,
write_file,
cmp_pkgrevno,
rsync,
lsb_release,
CompareHostReleases,
path_hash,
)
from charmhelpers.contrib.peerstorage import (
peer_store,
peer_retrieve
)
from charmhelpers.fetch import (
apt_pkg,
apt_update,
apt_install,
get_upstream_version,
)
import charmhelpers.coordinator as coordinator

CLUSTER_MODE_KEY = 'cluster-partition-handling'
CLUSTER_MODE_FOR_INSTALL = 'ignore'

PACKAGES = ['rabbitmq-server', 'python3-amqplib', 'lockfile-progs',
'python3-croniter']
VERSION_PACKAGE = 'rabbitmq-server'
RABBITMQ_CTL = '/usr/sbin/rabbitmqctl'
COOKIE_PATH = '/var/lib/rabbitmq/.erlang.cookie'
ENV_CONF = '/etc/rabbitmq/rabbitmq-env.conf'
RABBITMQ_CONFIG = '/etc/rabbitmq/rabbitmq.config'
RABBITMQ_CONF = '/etc/rabbitmq/rabbitmq.conf'
ENABLED_PLUGINS = '/etc/rabbitmq/enabled_plugins'
RABBIT_USER = 'rabbitmq'
LIB_PATH = '/var/lib/rabbitmq/'
HOSTS_FILE = '/etc/hosts'
NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
SCRIPTS_DIR = '/usr/local/bin'
STATS_CRONFILE = '/etc/cron.d/rabbitmq-stats'
CRONJOB_CMD = ("{schedule} root timeout -k 10s -s SIGINT {timeout} "
"{command} 2>&1 | logger -p local0.notice\n")
COORD_KEY_RESTART = "restart"
COORD_KEYS = [COORD_KEY_RESTART]
_named_passwd = '/var/lib/charm/{}/{}.passwd'
_local_named_passwd = '/var/lib/charm/{}/{}.local_passwd'
# hook_contexts are used as a convenient mechanism to render templates
# logically, consider building a hook_context for template rendering so
# the charm doesn't concern itself with template specifics etc.
_CONFIG_FILES = OrderedDict([
(RABBITMQ_CONF, {
'hook_contexts': [
RabbitMQSSLContext(),
RabbitMQClusterContext(),
],
'services': ['rabbitmq-server']
}),
(RABBITMQ_CONFIG, {
'hook_contexts': [
RabbitMQSSLContext(),
RabbitMQClusterContext(),
],
'services': ['rabbitmq-server']
}),
(ENV_CONF, {
'hook_contexts': [
RabbitMQEnvContext(),
],
'services': ['rabbitmq-server']
}),
(ENABLED_PLUGINS, {
'hook_contexts': None,
'services': ['rabbitmq-server']
}),
])
def CONFIG_FILES():
_cfiles = copy.deepcopy(_CONFIG_FILES)
if cmp_pkgrevno('rabbitmq-server', '3.7') >= 0:
del _cfiles[RABBITMQ_CONFIG]
else:
del _cfiles[RABBITMQ_CONF]
return _cfiles
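# NOTE: rabbitmq-server >= 3.7 reads the new-style ini-like
# /etc/rabbitmq/rabbitmq.conf, while older releases read the Erlang-terms
# /etc/rabbitmq/rabbitmq.config, so only the file relevant to the installed
# version is kept in the map returned above.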
class ConfigRenderer(object):
"""
This class is a generic configuration renderer for
a given dict mapping configuration files and hook_contexts.
"""
def __init__(self, config):
"""
:param config: see CONFIG_FILES
:type config: dict
"""
self.config_data = {}
for config_path, data in config.items():
hook_contexts = data.get('hook_contexts', None)
if hook_contexts:
ctxt = {}
for svc_context in hook_contexts:
ctxt.update(svc_context())
self.config_data[config_path] = ctxt
def write(self, config_path):
data = self.config_data.get(config_path, None)
if data:
log("writing config file: %s , data: %s" % (config_path,
str(data)),
level='DEBUG')
render(os.path.basename(config_path), config_path,
data, perms=0o644)
def write_all(self):
"""Write all the defined configuration files"""
for service in self.config_data.keys():
self.write(service)
def complete_contexts(self):
return []
class RabbitmqError(Exception):
pass
def run_cmd(cmd):
"""Run provided command and decode the output.
:param cmd: Command to run
:type cmd: List[str]
:returns: output from command
:rtype: str
"""
output = subprocess.check_output(cmd)
output = output.decode('utf-8')
return output
def rabbit_supports_json():
"""Check if version of rabbit supports json formatted output.
:returns: If json output is supported.
:rtype: bool
"""
return caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0
@cached
def caching_cmp_pkgrevno(package, revno, pkgcache=None):
"""Compare supplied revno with the revno of the installed package.
* 1 => Installed revno is greater than supplied arg
* 0 => Installed revno is the same as supplied arg
* -1 => Installed revno is less than supplied arg
:param package: Package to check revno of
:type package: str
:param revno: Revision number to compare against
:type revno: str
:param pkgcache: Version obj from pkgcache
:type pkgcache: ubuntu_apt_pkg.Version
    :returns: Result of the comparison: 1, 0 or -1 (see above)
:rtype: int
"""
return cmp_pkgrevno(package, revno, pkgcache)
def query_rabbit(cmd, raw_processor=None, json_processor=None,
binary=RABBITMQ_CTL):
"""Run query against rabbit.
Run query against rabbit and then run post-query processor on the
output. If the version of rabbit that is installed supports formatting
the output in json format then the '--formatter=json' flag is added.
:param cmd: Query to run
:type cmd: List[str]
:param raw_processor: Function to call with command output as the only
argument.
:type raw_processor: Callable
:param json_processor: Function to call with json loaded output as the only
argument.
:type json_processor: Callable
:returns: Return processed output from query
:rtype: ANY
"""
cmd.insert(0, binary)
if rabbit_supports_json():
cmd.append('--formatter=json')
output = json.loads(run_cmd(cmd))
if json_processor:
return json_processor(output)
else:
# A processor may not be needed for loaded json.
return output
else:
if raw_processor:
return raw_processor(run_cmd(cmd))
else:
raise NotImplementedError
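# For illustration of the two code paths above: with a JSON-capable rabbit,
# `rabbitmqctl list_vhosts --formatter=json` returns something like
# [{"name": "/"}, {"name": "openstack"}], whereas older releases print a
# plain-text table, hence callers pass both a json_processor and a
# raw_processor. (Example output is assumed, not captured from a broker.)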
def list_vhosts():
"""Returns a list of all the available vhosts
:returns: List of vhosts
:rtype: [str]
"""
def _json_processor(output):
return [ll['name'] for ll in output]
def _raw_processor(output):
if '...done' in output:
return output.split('\n')[1:-2]
else:
return output.split('\n')[1:-1]
try:
return query_rabbit(
['list_vhosts'],
raw_processor=_raw_processor,
json_processor=_json_processor)
except Exception as ex:
# if no vhosts, just raises an exception
log(str(ex), level='DEBUG')
return []
def list_vhost_queue_info(vhost):
"""Provide a list of queue info objects for the given vhost.
:returns: List of dictionaries of queue information
eg [{'name': 'queue name', 'messages': 0, 'consumers': 1}, ...]
:rtype: List[Dict[str, Union[str, int]]]
:raises: CalledProcessError
"""
def _raw_processor(output):
queue_info = []
if '...done' in output:
queues = output.split('\n')[1:-2]
else:
queues = output.split('\n')[1:-1]
for queue in queues:
[qname, qmsgs, qconsumers] = queue.split()
queue_info.append({
'name': qname,
'messages': int(qmsgs),
'consumers': int(qconsumers)
})
return queue_info
cmd = ['-p', vhost, 'list_queues', 'name', 'messages', 'consumers']
return query_rabbit(
cmd,
raw_processor=_raw_processor)
def list_users():
"""Returns a list of users.
:returns: List of users
:rtype: [str]
"""
def _json_processor(output):
return [ll['user'] for ll in output]
def _raw_processor(output):
lines = output.split('\n')[1:]
return [line.split('\t')[0] for line in lines]
return query_rabbit(
['list_users'],
raw_processor=_raw_processor,
json_processor=_json_processor)
def list_user_tags(user):
"""Get list of tags for user.
:param user: Name of user to get tags for
:type user: str
:returns: List of tags associated with user.
:rtype: [str]
:raises: NotImplementedError
"""
all_tags = query_rabbit(['list_users'])
users = [
tag_dict['tags']
for tag_dict in all_tags
if tag_dict['user'] == user]
    # The data structure returned by rabbitmqctl is a list of dicts in the
# form [{'user': 'testuser1', 'tags': []}, ...] so it is possible
# for there to be multiple dicts which apply to the same user. Handle
# this unlikely event by merging the lists.
result = sum(users, [])
result = sorted(list(set(result)))
return result
def list_user_permissions(user):
"""Get list of user permissions.
:param user: Name of user to get permissions for
:type user: str
:returns: List of dictionaries e.g. [{'vhost': 'vhost-name',
'configure': '.*',
'write': '.*',
'read': '.*'},...]
:rtype: List[Dict[str,str]]
:raises: NotImplementedError
"""
return query_rabbit(['list_user_permissions', user])
def list_user_vhost_permissions(user, vhost):
"""Get list of user permissions for a vhost.
:param user: Name of user to get permissions for
:type user: str
:param vhost: Name of vhost to get permissions for
:type vhost: str
:returns: List of dictionaries e.g. [{'configure': '.*',
'write': '.*',
'read': '.*'},...]
:rtype: List[Dict[str,str]]
:raises: NotImplementedError
"""
perms = list_user_permissions(user)
vhost_perms = [
{
'configure': tag_dict['configure'],
'write': tag_dict['write'],
'read': tag_dict['read']}
for tag_dict in perms
if tag_dict['vhost'] == vhost]
return vhost_perms
def list_plugins():
"""List plugins.
Return a list of dictionaries relating to each plugin.
:returns: List of dictionaries e.g.
[
{
'enabled': str,
'name': str,
'running': bool,
'running_version': int,
'version': [int, int, ...]},
...
]
:rtype: List[Dict[str, Union[str, bool, int, List[int]]]]
"""
def _json_processor(output):
return output['plugins']
return query_rabbit(
['list'],
json_processor=_json_processor,
binary=get_plugin_manager())
def list_policies(vhost):
"""List policies for a vhost.
Return a list of dictionaries relating to each policy.
[
{
'apply-to': str,
'definition': str,
'name': str,
'pattern': str,
'priority': int,
'vhost': str}
...]
:param vhost: Name of vhost to get policies for
:type vhost: str
:returns: List of dictionaries e.g.
:rtype: List[Dict[str, Union[str, int]]]
"""
return query_rabbit(['list_policies', '-p', vhost])
def get_vhost_policy(vhost, policy_name):
"""Get a policy with a given name on a given vhost.
Return policy:
{
'apply-to': str,
'definition': str,
'name': str,
'pattern': str,
'priority': int,
'vhost': str}
:param vhost: Name of vhost to get policies for
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
:returns: Policy dict.
:rtype: Union[Dict[str, Union[str, int]], None]
"""
policies = [
p for p in list_policies(vhost)
if p['name'] == policy_name]
if len(policies) > 0:
return policies[0]
else:
return None
def list_enabled_plugins():
"""Get list of enabled plugins.
:returns: List of enabled plugins
:rtype: [str]
:raises: NotImplementedError
"""
enabled_plugins = [
pdict['name']
for pdict in list_plugins()
if pdict['enabled'] in ['enabled', 'implicit']]
log("Enabled plugins: {}".format(enabled_plugins))
return sorted(list(set(enabled_plugins)))
def vhost_queue_info(vhost):
return list_vhost_queue_info(vhost)
def vhost_exists(vhost):
return vhost in list_vhosts()
def create_vhost(vhost):
if vhost_exists(vhost):
return
rabbitmqctl('add_vhost', vhost)
log('Created new vhost (%s).' % vhost)
def user_exists(user):
return user in list_users()
def apply_tags(user, tags):
"""Apply tags to user if needed.
Apply tags to user. Any existing tags will be removed.
:param user: Name of user to apply tags to.
:type user: str
:param tags: Tags to apply to user.
:type tags: List[str]
:raises: NotImplementedError
"""
log('Adding tags [{}] to user {}'.format(
', '.join(tags),
user
))
try:
existing_user_tags = list_user_tags(user)
if existing_user_tags == sorted(list(set(tags))):
log("User {} already has tags {}".format(user, tags))
else:
log("Existing user tags for {} are {} which do not match {}. "
"Updating tags.".format(user, existing_user_tags, tags))
rabbitmqctl('set_user_tags', user, ' '.join(tags))
except NotImplementedError:
        # Cannot check existing tags so apply them to be on the
        # safe side.
log("Cannot retrieve existing tags for {}".format(user))
rabbitmqctl('set_user_tags', user, ' '.join(tags))
def create_user(user, password, tags=[]):
exists = user_exists(user)
if not exists:
log('Creating new user (%s).' % user)
rabbitmqctl('add_user', user, password)
if 'administrator' in tags:
log('Granting admin access to {}'.format(user))
apply_tags(user, tags)
def grant_permissions(user, vhost):
"""Grant all permissions on a vhost to a user.
:param user: Name of user to give permissions to.
:type user: str
:param vhost: Name of vhost to give permissions on
:type vhost: str
"""
log(
"Granting permissions for user {} on vhost {}".format(user, vhost),
level='DEBUG')
try:
vhost_perms = list_user_vhost_permissions(user, vhost)
if len(vhost_perms) > 1:
# This will almost certainly never happen but handle it just in
# case.
log('Multiple permissions found for the same user and vhost. '
'Resetting permission.')
apply_perms = True
elif len(vhost_perms) == 1:
perms = vhost_perms[0]
apply_perms = False
for attr in ['configure', 'write', 'read']:
if perms[attr] != '.*':
log(
'Permissions for user {} on vhost {} to {} are {} '
'not {}'.format(user, vhost, attr, perms[attr], '.*'))
apply_perms = True
else:
apply_perms = True
except NotImplementedError:
apply_perms = True
if apply_perms:
rabbitmqctl('set_permissions', '-p',
vhost, user, '.*', '.*', '.*')
else:
log('No permissions update needed for user {} on vhost {}'.format(
user, vhost))
def compare_policy(vhost, policy_name, pattern, definition, priority=None,
apply_to=None):
"""Check a policy matches the supplied attributes.
:param vhost: Name of vhost to find policy in
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
    :param pattern: The regular expression, which when it matches a given
                    resource causes the policy to apply.
:type pattern: str
:param definition: The definition of the policy, as a JSON term.
:type definition: str
:param priority: The priority of the policy as an integer.
:type priority: int
:param apply_to: Which types of object this policy should apply to.
Possible values are: queues exchanges all
:type apply_to: str
:returns: Whether they match
:rtype: bool
"""
policy = get_vhost_policy(vhost, policy_name)
# Convert unset options to defaults
# https://www.rabbitmq.com/rabbitmqctl.8.html#set_policy
if priority is None:
priority = 0
if apply_to is None:
apply_to = 'all'
if not policy:
log("Old policy not found.")
return False
if policy.get('pattern') != pattern:
log("Policy pattern does not match, {} != {}".format(
policy.get('pattern'),
pattern))
return False
if policy.get('priority') != int(priority):
log("Policy priority does not match, {} != {}".format(
policy.get('priority'),
priority))
return False
if policy.get('apply-to') != apply_to:
log("Policy apply_to does not match, {} != {}".format(
policy.get('apply-to'),
apply_to))
return False
existing_definition = json.loads(policy['definition'])
new_definition = json.loads(definition)
if existing_definition != new_definition:
log("Policy definition does not match, {} != {}".format(
            existing_definition,
new_definition))
return False
log("Policies match")
return True
def set_policy(vhost, policy_name, pattern, definition, priority=None,
apply_to=None):
"""Apply a policy
:param vhost: Name of vhost to apply policy to
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
    :param pattern: The regular expression, which when it matches a given
                    resource causes the policy to apply.
:type pattern: str
:param definition: The definition of the policy, as a JSON term.
:type definition: str
:param priority: The priority of the policy as an integer.
:type priority: int
:param apply_to: Which types of object this policy should apply to.
Possible values are: queues exchanges all
:type apply_to: str
"""
try:
if compare_policy(vhost, policy_name, pattern, definition, priority,
apply_to):
log("{} on vhost {} matched proposed policy, no update "
"needed.".format(policy_name, vhost))
policy_update_needed = False
else:
log("{} on vhost {} did not match proposed policy, update "
"needed.".format(policy_name, vhost))
policy_update_needed = True
except NotImplementedError:
log("Could not query exiting policies. Assuming update of {} on vhost "
"{} is needed.".format(policy_name, vhost))
policy_update_needed = True
if policy_update_needed:
log("{} on vhost {} required update.".format(policy_name, vhost))
log("setting policy", level='DEBUG')
cmd = ['set_policy', '-p', vhost]
if priority:
cmd.extend(['--priority', priority])
if apply_to:
cmd.extend(['--apply-to', apply_to])
cmd.extend([policy_name, pattern, definition])
rabbitmqctl(*cmd)
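# For reference, the rabbitmqctl invocation built above ends up looking
# roughly like (vhost, policy name and values are illustrative only):
#   rabbitmqctl set_policy -p openstack --priority 1 --apply-to queues \
#       TTL '^(versioned_)?notifications.*' '{"message-ttl":3600000}'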
def clear_policy(vhost, policy_name):
"""Remove a policy
:param vhost: Name of vhost to remove policy from
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
"""
try:
policy = get_vhost_policy(vhost, policy_name)
if policy:
policy_update_needed = True
else:
log("{} on vhost {} not found, do not need to clear it.".format(
policy_name,
vhost))
policy_update_needed = False
except NotImplementedError:
log("Policy lookup failed for {} on vhost {} not.".format(
policy_name,
vhost))
policy_update_needed = True
if policy_update_needed:
rabbitmqctl('clear_policy', '-p', vhost, policy_name)
def set_ha_mode(vhost, mode, params=None, sync_mode='automatic'):
"""Valid mode values:
* 'all': Queue is mirrored across all nodes in the cluster. When a new
node is added to the cluster, the queue will be mirrored to that node.
* 'exactly': Queue is mirrored to count nodes in the cluster.
* 'nodes': Queue is mirrored to the nodes listed in node names
    More details at http://www.rabbitmq.com/ha.html
:param vhost: virtual host name
:param mode: ha mode
:param params: values to pass to the policy, possible values depend on the
mode chosen.
    :param sync_mode: when `mode` is 'exactly' this is used to indicate how
        the sync has to be done
        http://www.rabbitmq.com/ha.html#eager-synchronisation
"""
if caching_cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log(("Mirroring queues cannot be enabled, only supported "
"in rabbitmq-server >= 3.0"), level=WARNING)
log(("More information at http://www.rabbitmq.com/blog/"
"2012/11/19/breaking-things-with-rabbitmq-3-0"), level='INFO')
return
if mode == 'all':
definition = {
"ha-mode": "all",
"ha-sync-mode": sync_mode}
elif mode == 'exactly':
definition = {
"ha-mode": "exactly",
"ha-params": params,
"ha-sync-mode": sync_mode}
elif mode == 'nodes':
definition = {
"ha-mode": "nodes",
"ha-params": params,
"ha-sync-mode": sync_mode}
else:
        raise RabbitmqError(("Unknown mode '%s', known modes: "
                             "all, exactly, nodes") % mode)
log("Setting HA policy to vhost '%s'" % vhost, level='INFO')
set_policy(vhost, 'HA', r'^(?!amq\.).*', json.dumps(definition))
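# An example of the policy definition rendered by set_ha_mode() for
# mode='exactly' with params=2 (illustrative):
#   {"ha-mode": "exactly", "ha-params": 2, "ha-sync-mode": "automatic"}
# The '^(?!amq\.).*' pattern applies the policy to everything except names
# beginning with 'amq.'.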
def clear_ha_mode(vhost, name='HA', force=False):
"""
Clear policy from the `vhost` by `name`
"""
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log(("Mirroring queues not supported "
"in rabbitmq-server >= 3.0"), level=WARNING)
log(("More information at http://www.rabbitmq.com/blog/"
"2012/11/19/breaking-things-with-rabbitmq-3-0"), level='INFO')
return
log("Clearing '%s' policy from vhost '%s'" % (name, vhost), level='INFO')
try:
clear_policy(vhost, name)
except subprocess.CalledProcessError as ex:
if not force:
raise ex
def set_all_mirroring_queues(enable):
"""
:param enable: if True then enable mirroring queue for all the vhosts,
otherwise the HA policy is removed
"""
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log(("Mirroring queues not supported "
"in rabbitmq-server >= 3.0"), level=WARNING)
log(("More information at http://www.rabbitmq.com/blog/"
"2012/11/19/breaking-things-with-rabbitmq-3-0"), level='INFO')
return
if enable:
status_set('active', 'Checking queue mirroring is enabled')
else:
status_set('active', 'Checking queue mirroring is disabled')
for vhost in list_vhosts():
if enable:
set_ha_mode(vhost, 'all')
else:
clear_ha_mode(vhost, force=True)
def rabbitmqctl(action, *args):
''' Run rabbitmqctl with action and args. This function uses
subprocess.check_call. For uses that need check_output
use a direct subprocess call or rabbitmqctl_normalized_output
function.
'''
# NOTE(lourot): before rabbitmq-server 3.8 (focal),
# `rabbitmqctl wait <pidfile>` doesn't have a `--timeout` option and thus
# may hang forever and needs to be wrapped in
# `timeout 180 rabbitmqctl wait <pidfile>`.
# Since 3.8 there is a `--timeout` option, whose default is 10 seconds. [1]
#
# [1]: https://github.com/rabbitmq/rabbitmq-server/commit/3dd58ae1
WAIT_TIMEOUT_SECONDS = 180
focal_or_newer = rabbitmq_version_newer_or_equal('3.8')
cmd = []
if 'wait' in action and not focal_or_newer:
cmd.extend(['timeout', str(WAIT_TIMEOUT_SECONDS)])
cmd.extend([RABBITMQ_CTL, action])
for arg in args:
cmd.append(arg)
if 'wait' in action and focal_or_newer:
cmd.extend(['--timeout', str(WAIT_TIMEOUT_SECONDS)])
log("Running {}".format(cmd), 'DEBUG')
subprocess.check_call(cmd)
def configure_notification_ttl(vhost, ttl=3600000):
    ''' Configure a TTL (1h by default) for notification topics in the
    provided vhost.
    This is a workaround for notification queues filling up in OpenStack
until a more general service discovery mechanism exists so that
notifications can be enabled/disabled on each individual service.
'''
set_policy(
vhost,
'TTL',
'^(versioned_)?notifications.*',
'{{"message-ttl":{ttl}}}'.format(ttl=ttl),
priority='1',
apply_to='queues')
def configure_ttl(vhost, ttlname, ttlreg, ttl):
    ''' Some topic queues, such as heat's, also need a TTL set; see lp:1925436.
    Configure a TTL for the matching topics in the provided vhost. This is a
    workaround for heat queues filling up and may apply to other queues later.
'''
log('configure_ttl: ttlname={} ttlreg={} ttl={}'.format(
ttlname, ttlreg, ttl), INFO)
if not all([ttlname, ttlreg, ttl]):
return
set_policy(
vhost,
'{ttlname}'.format(ttlname=ttlname),
'"{ttlreg}"'.format(ttlreg=ttlreg),
'{{"expires":{ttl}}}'.format(ttl=ttl),
priority='1',
apply_to='queues')
def rabbitmqctl_normalized_output(*args):
''' Run rabbitmqctl with args. Normalize output by removing
whitespace and return it to caller for further processing.
'''
cmd = [RABBITMQ_CTL]
cmd.extend(args)
out = (subprocess
.check_output(cmd, stderr=subprocess.STDOUT)
.decode('utf-8'))
# Output is in Erlang External Term Format (ETF). The amount of whitespace
# (including newlines in the middle of data structures) in the output
# depends on the data presented. ETF resembles JSON, but it is not.
# Writing our own parser is a bit out of scope, enabling management-plugin
# to use REST interface might be overkill at this stage.
#
# Removing whitespace will let our simple pattern matching work and is a
# compromise.
return out.translate(str.maketrans(dict.fromkeys(' \t\n')))
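# After whitespace stripping, the legacy `rabbitmqctl cluster_status` output
# becomes one long ETF-ish string resembling (illustrative, abridged):
#   ...{nodes,[{disc,['rabbit@juju-1','rabbit@juju-2']}]},{running_nodes,...
# which is what the simple regex in nodes() below is matched against.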
def wait_app():
''' Wait until rabbitmq has fully started '''
run_dir = '/var/run/rabbitmq/'
if os.path.isdir(run_dir):
pid_file = run_dir + 'pid'
else:
pid_file = '/var/lib/rabbitmq/mnesia/rabbit@' \
+ socket.gethostname() + '.pid'
log('Waiting for rabbitmq app to start: {}'.format(pid_file), DEBUG)
try:
rabbitmqctl('wait', pid_file)
log('Confirmed rabbitmq app is running', DEBUG)
return True
except subprocess.CalledProcessError as ex:
status_set('blocked', 'RabbitMQ failed to start')
try:
status_cmd = ['rabbitmqctl', 'status']
log(subprocess.check_output(status_cmd).decode('utf-8'), DEBUG)
except Exception:
pass
raise ex
def cluster_wait():
''' Wait for operations based on modulo distribution
Use the distributed_wait function to determine how long to wait before
running an operation like restart or cluster join. By setting modulo to
the exact number of nodes in the cluster we get serial operations.
Check for explicit configuration parameters for modulo distribution.
The config setting modulo-nodes has first priority. If modulo-nodes is not
set, check min-cluster-size. Finally, if neither value is set, determine
how many peers there are from the cluster relation.
@side_effect: distributed_wait is called which calls time.sleep()
@return: None
'''
wait = config('known-wait')
if config('modulo-nodes') is not None:
# modulo-nodes has first priority
num_nodes = config('modulo-nodes')
elif config('min-cluster-size'):
# min-cluster-size is consulted next
num_nodes = config('min-cluster-size')
else:
# If nothing explicit is configured, determine cluster size based on
# peer relations
num_nodes = 1
for rid in relation_ids('cluster'):
num_nodes += len(related_units(rid))
distributed_wait(modulo=num_nodes, wait=wait)
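# Worked example of the modulo distribution used above (behaviour of
# charmhelpers' distributed_wait, values assumed): with modulo=3 and
# known-wait=30, units 0, 1 and 2 sleep roughly 0s, 30s and 60s respectively,
# so joins and restarts happen one unit at a time.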
def start_app():
''' Start the rabbitmq app and wait until it is fully started '''
status_set('maintenance', 'Starting rabbitmq application')
rabbitmqctl('start_app')
wait_app()
def join_cluster(node):
''' Join cluster with node '''
if cmp_pkgrevno('rabbitmq-server', '3.0.1') >= 0:
cluster_cmd = 'join_cluster'
else:
cluster_cmd = 'cluster'
status_set('maintenance',
'Clustering with remote rabbit host (%s).' % node)
rabbitmqctl('stop_app')
# Intentionally using check_output so we can see rabbitmqctl error
# message if it fails
cmd = [RABBITMQ_CTL, cluster_cmd, node]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
start_app()
log('Host clustered with %s.' % node, 'INFO')
def cluster_with():
if is_unit_paused_set():
log("Do not run cluster_with while unit is paused", "WARNING")
return
log('Clustering with new node')
# check the leader and try to cluster with it
node = leader_node()
if node:
if node in running_nodes():
log('Host already clustered with %s.' % node)
cluster_rid = relation_id('cluster', local_unit())
is_clustered = relation_get(attribute='clustered',
rid=cluster_rid,
unit=local_unit())
log('am I clustered?: %s' % bool(is_clustered), level=DEBUG)
if not is_clustered:
# NOTE(freyes): this node needs to be marked as clustered, it's
# part of the cluster according to 'rabbitmqctl cluster_status'
# (LP: #1691510)
relation_set(relation_id=cluster_rid,
clustered=get_unit_hostname(),
timestamp=time.time())
return False
        # NOTE: The primary problem rabbitmq has with clustering is when
        # more than one node attempts to cluster at the same time.
        # The asynchronous nature of hook firing nearly guarantees
        # this, so cluster_wait() below serialises joins via modulo
        # distribution.
cluster_wait()
try:
join_cluster(node)
# NOTE: toggle the cluster relation to ensure that any peers
# already clustered re-assess status correctly
relation_set(clustered=get_unit_hostname(), timestamp=time.time())
return True
except subprocess.CalledProcessError as e:
status_set('blocked', 'Failed to cluster with %s. Exception: %s'
% (node, e))
start_app()
else:
status_set('waiting', 'Leader not available for clustering')
return False
return False
def check_cluster_memberships():
"""Check for departed nodes.
Iterate over RabbitMQ node list, compare it to charm cluster relationships,
and notify about any nodes previously abruptly removed from the cluster.
:returns: String node name or None
:rtype: Union[str, None]
"""
for rid in relation_ids('cluster'):
for node in nodes():
if not any(rel.get('clustered', None) == node.split('@')[1]
for rel in relations_for_id(relid=rid)) and \
node not in running_nodes():
log("check_cluster_memberships(): '{}' in nodes but not in "
"charm relations or running_nodes."
.format(node), level=DEBUG)
return node
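# For illustration: if nodes() still lists 'rabbit@juju-3' but no peer
# relation carries clustered='juju-3' and the node is not in running_nodes(),
# 'rabbit@juju-3' is returned so assess_cluster_status() can suggest the
# forget-cluster-node action. (Node name is made up.)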
def leave_cluster():
''' Leave cluster gracefully '''
try:
rabbitmqctl('stop_app')
rabbitmqctl('reset')
start_app()
log('Successfully left cluster gracefully.')
except Exception:
# error, no nodes available for clustering
log('Cannot leave cluster, we might be the last disc-node in the '
'cluster.', level=ERROR)
raise
def get_plugin_manager():
"""Find the path to the executable for managing plugins.
:returns: Path to rabbitmq-plugins executable
:rtype: str
"""
    # At version 3.8.2, only /sbin/rabbitmq-plugins enables plugins correctly
if os.path.exists("/sbin/rabbitmq-plugins"):
return '/sbin/rabbitmq-plugins'
else:
return glob.glob(
'/usr/lib/rabbitmq/lib/rabbitmq_server-*/sbin/rabbitmq-plugins')[0]
def _manage_plugin(plugin, action):
os.environ['HOME'] = '/root'
plugin_manager = get_plugin_manager()
subprocess.check_call([plugin_manager, action, plugin])
def enable_plugin(plugin):
try:
enabled_plugins = list_enabled_plugins()
if plugin in enabled_plugins:
log("Plugin {} already in list of enabled plugins {}. Do not "
"need to enable it.".format(plugin, enabled_plugins))
else:
log("Enabling plugin {}".format(plugin))
_manage_plugin(plugin, 'enable')
except (NotImplementedError, subprocess.CalledProcessError):
log("Cannot get list of plugins, running enable just in "
"case.")
_manage_plugin(plugin, 'enable')
def disable_plugin(plugin):
try:
enabled_plugins = list_enabled_plugins()
if plugin in enabled_plugins:
log("Disabling plugin {}".format(plugin))
_manage_plugin(plugin, 'disable')
else:
log("Plugin {} not in list of enabled plugins {}. Do not need to "
"disable it.".format(plugin, enabled_plugins))
except (NotImplementedError, subprocess.CalledProcessError):
log("Cannot get list of plugins, running disable just in "
"case.")
_manage_plugin(plugin, 'disable')
def get_managment_port():
if rabbitmq_version_newer_or_equal('3'):
return 15672
else:
return 55672
def execute(cmd, die=False, echo=False):
""" Executes a command
if die=True, script will exit(1) if command does not return 0
if echo=True, output of command will be printed to stdout
returns a tuple: (stdout, stderr, return code)
"""
    # Use text mode so the readline loops below terminate at EOF ('' sentinel)
    # and string concatenation works under Python 3.
    p = subprocess.Popen(cmd.split(" "),
                         stdout=subprocess.PIPE,
                         stdin=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         universal_newlines=True)
stdout = ""
stderr = ""
def print_line(ll):
if echo:
print(ll.strip('\n'))
sys.stdout.flush()
for ll in iter(p.stdout.readline, ''):
print_line(ll)
stdout += ll
for ll in iter(p.stderr.readline, ''):
print_line(ll)
stderr += ll
p.communicate()
rc = p.returncode
if die and rc != 0:
log("command %s return non-zero." % cmd, level=ERROR)
return (stdout, stderr, rc)
def get_rabbit_password_on_disk(username, password=None, local=False):
''' Retrieve, generate or store a rabbit password for
the provided username on disk'''
if local:
_passwd_file = _local_named_passwd.format(service_name(), username)
else:
_passwd_file = _named_passwd.format(service_name(), username)
_password = None
if os.path.exists(_passwd_file):
with open(_passwd_file, 'r') as passwd:
_password = passwd.read().strip()
else:
mkdir(os.path.dirname(_passwd_file), owner=RABBIT_USER,
group=RABBIT_USER, perms=0o775)
os.chmod(os.path.dirname(_passwd_file), 0o775)
_password = password or pwgen(length=64)
write_file(_passwd_file, _password, owner=RABBIT_USER,
group=RABBIT_USER, perms=0o660)
return _password
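# For illustration, for an application named 'rabbitmq-server' the password
# for a user 'glance' would be cached at
# /var/lib/charm/rabbitmq-server/glance.passwd (or glance.local_passwd when
# local=True); names here are examples only.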
def migrate_passwords_to_peer_relation():
'''Migrate any passwords storage on disk to cluster peer relation'''
for f in glob.glob('/var/lib/charm/{}/*.passwd'.format(service_name())):
_key = os.path.basename(f)
with open(f, 'r') as passwd:
_value = passwd.read().strip()
try:
peer_store(_key, _value)
os.unlink(f)
except ValueError:
# NOTE cluster relation not yet ready - skip for now
pass
def get_rabbit_password(username, password=None, local=False):
''' Retrieve, generate or store a rabbit password for
the provided username using peer relation cluster'''
if local:
return get_rabbit_password_on_disk(username, password, local)
else:
migrate_passwords_to_peer_relation()
_key = '{}.passwd'.format(username)
try:
_password = peer_retrieve(_key)
if _password is None:
_password = password or pwgen(length=64)
peer_store(_key, _password)
except ValueError:
# cluster relation is not yet started, use on-disk
_password = get_rabbit_password_on_disk(username, password)
return _password
def update_hosts_file(map):
"""Rabbitmq does not currently like ipv6 addresses so we need to use dns
names instead. In order to make them resolvable we ensure they are in
/etc/hosts.
"""
with open(HOSTS_FILE, 'r') as hosts:
lines = hosts.readlines()
log("Updating hosts file with: %s (current: %s)" % (map, lines),
level=INFO)
newlines = []
for ip, hostname in map.items():
if not ip or not hostname:
continue
keepers = []
for line in lines:
_line = line.split()
            if len(_line) < 2 or not (_line[0] == ip or hostname in _line[1:]):
keepers.append(line)
else:
log("Removing line '%s' from hosts file" % (line))
lines = keepers
newlines.append("%s %s\n" % (ip, hostname))
lines += newlines
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
with open(tmpfile.name, 'w') as hosts:
for line in lines:
hosts.write(line)
shutil.move(tmpfile.name, HOSTS_FILE)
os.chmod(HOSTS_FILE, 0o644)
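# For illustration, calling update_hosts_file({'fd00::1': 'juju-machine-0'})
# removes any stale lines for that address or hostname and appends
# "fd00::1 juju-machine-0" to /etc/hosts (address and hostname are made up).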
def restart_map():
'''Determine the correct resource map to be passed to
charmhelpers.core.restart_on_change() based on the services configured.
:returns: dict: A dictionary mapping config file to lists of services
that should be restarted when file changes.
'''
_map = []
for f, ctxt in _CONFIG_FILES.items():
svcs = []
for svc in ctxt['services']:
svcs.append(svc)
if svcs:
_map.append((f, svcs))
return OrderedDict(_map)
def services():
    ''' Returns a list of services associated with this charm '''
_services = []
for v in restart_map().values():
_services = _services + v
return list(set(_services))
def get_cluster_status(cmd_timeout=None):
"""Raturn rabbit cluster status
:param cmd_timeout: How long to give the command to complete.
:type cmd_timeout: int
:returns: Rabbitmq cluster status
:rtype: dict
:raises: NotImplementedError, subprocess.TimeoutExpired,
"""
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
cmd = [RABBITMQ_CTL, 'cluster_status', '--formatter=json']
output = subprocess.check_output(
cmd,
timeout=cmd_timeout).decode('utf-8')
return json.loads(output)
else:
# rabbitmqctl has not implemented the formatter option.
raise NotImplementedError
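# An abridged, illustrative example of the JSON structure returned above,
# showing the keys consumed by nodes() and is_partitioned():
#   {"disk_nodes": ["rabbit@juju-1"], "ram_nodes": [],
#    "running_nodes": ["rabbit@juju-1"], "partitions": {}}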
@cached
def nodes(get_running=False):
''' Get list of nodes registered in the RabbitMQ cluster '''
# NOTE(ajkavanagh): In focal and above, rabbitmq-server now has a
# --formatter option.
try:
status = get_cluster_status()
if get_running:
return status['running_nodes']
return status['disk_nodes'] + status['ram_nodes']
except NotImplementedError:
out = rabbitmqctl_normalized_output('cluster_status')
cluster_status = {}
for m in re.finditer(r"{([^,]+),(?!\[{)\[([^\]]*)", out):
state = m.group(1)
items = m.group(2).split(',')
items = [x.replace("'", '').strip() for x in items]
cluster_status.update({state: items})
if get_running:
return cluster_status.get('running_nodes', [])
return cluster_status.get('disc', []) + cluster_status.get('ram', [])
@cached
def is_partitioned():
"""Check whether rabbitmq cluster is partitioned.
:returns: Whether cluster is partitioned
:rtype: bool
:raises: NotImplementedError, subprocess.TimeoutExpired,
"""
status = get_cluster_status(cmd_timeout=60)
return status.get('partitions') != {}
@cached
def running_nodes():
''' Determine the current set of running nodes in the RabbitMQ cluster '''
return nodes(get_running=True)
@cached
def leader_node():
''' Provide the leader node for clustering
@returns leader node's hostname or None
'''
# Each rabbitmq node should join_cluster with the leader
# to avoid split-brain clusters.
try:
leader_node_hostname = peer_retrieve('leader_node_hostname')
except ValueError:
# This is a single unit
return None
if leader_node_hostname:
return "rabbit@" + leader_node_hostname
else:
return None
def get_node_hostname(ip_addr):
''' Resolve IP address to hostname '''
try:
nodename = get_hostname(ip_addr, fqdn=False)
except Exception:
log('Cannot resolve hostname for %s using DNS servers' % ip_addr,
level=WARNING)
log('Falling back to use socket.gethostname()',
level=WARNING)
# If the private-address is not resolvable using DNS
# then use the current hostname
nodename = socket.gethostname()
log('local nodename: %s' % nodename, level=INFO)
return nodename
@cached
def clustered():
''' Determine whether local rabbitmq-server is clustered '''
# NOTE: A rabbitmq node can only join a cluster once.
# Simply checking for more than one running node tells us
# if this unit is in a cluster.
if len(running_nodes()) > 1:
return True
else:
return False
def assess_cluster_status(*args):
''' Assess the status for the current running unit '''
if is_unit_paused_set():
return "maintenance", "Paused"
# NOTE: ensure rabbitmq is actually installed before doing
# any checks
if not rabbitmq_is_installed():
return 'waiting', 'RabbitMQ is not yet installed'
# Sufficient peers
if not is_sufficient_peers():
return 'waiting', ("Waiting for all {} peers to complete the "
"cluster.".format(config('min-cluster-size')))
# Clustering Check
peer_ids = relation_ids('cluster')
if peer_ids and len(related_units(peer_ids[0])):
if not clustered():
return 'waiting', 'Unit has peers, but RabbitMQ not clustered'
# Departed nodes
departed_node = check_cluster_memberships()
if departed_node:
return (
'blocked',
'Node {} in the cluster but not running. If it is a departed '
'node, remove with `forget-cluster-node` action'
.format(departed_node))
# Check if cluster is partitioned
try:
if peer_ids and len(related_units(peer_ids[0])) and is_partitioned():
return ('blocked', 'RabbitMQ is partitioned')
except (subprocess.TimeoutExpired, NotImplementedError):
pass
# General status check
if not wait_app():
return (
'blocked', 'Unable to determine if the rabbitmq service is up')
if leader_get(CLUSTER_MODE_KEY) != config(CLUSTER_MODE_KEY):
return (
'waiting',
'Not reached target {} mode'.format(CLUSTER_MODE_KEY))
# we're active - so just return the 'active' state, but if 'active'
# is returned, then it is ignored by the assess_status system.
return 'active', "message is ignored"
def in_run_deferred_hooks_action():
"""Check if current execution context is the run-deferred-hooks action
    :returns: Whether currently in the run-deferred-hooks action
:rtype: bool
"""
return action_name() == 'run-deferred-hooks'
def coordinated_restart_on_change(restart_map, restart_function):
"""Decorator to check for file changes.
Check for file changes after decorated function runs. If a change
is detected then a restart request is made using the coordinator
module.
:param restart_map: {file: [service, ...]}
:type restart_map: Dict[str, List[str,]]
    :param restart_function: Function to be used to perform restart if
                             the lock is immediately granted.
    :type restart_function: Callable
"""
def wrap(f):
@functools.wraps(f)
def wrapped_f(*args, **kwargs):
if is_unit_paused_set():
return f(*args, **kwargs)
pre = {path: path_hash(path) for path in restart_map}
f(*args, **kwargs)
post = {path: path_hash(path) for path in restart_map}
if pre == post:
log("No restart needed")
else:
changed = [
path
for path in restart_map.keys()
if pre[path] != post[path]]
log("Requesting restart. File(s) {} have changed".format(
','.join(changed)))
if in_run_deferred_hooks_action():
log("In action context, requesting immediate restart")
restart_function(coordinate_restart=False)
else:
serial = coordinator.Serial()
serial.acquire(COORD_KEY_RESTART)
                    # Run the restart function which will check if the lock
                    # is immediately available.
restart_function(coordinate_restart=True)
return wrapped_f
return wrap
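# A sketch of how the decorator above is typically applied (hook and restart
# function names are assumptions, not necessarily the ones used in this
# charm's hooks):
#   @coordinated_restart_on_change(restart_map(), smart_restart)
#   def config_changed():
#       ...
# where smart_restart is expected to accept a coordinate_restart kwarg.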
def assess_status(configs):
"""Assess status of current unit
Decides what the state of the unit should be based on the current
configuration.
SIDE EFFECT: calls set_os_workload_status(...) which sets the workload
status of the unit.
Also calls status_set(...) directly if paused state isn't complete.
@param configs: a templating.OSConfigRenderer() object
@returns None - this function is executed for its side-effect
"""
deferred_events.check_restart_timestamps()
assess_status_func(configs)()
rmq_version = get_upstream_version(VERSION_PACKAGE)
if rmq_version:
application_version_set(rmq_version)
def assess_status_func(configs):
"""Helper function to create the function that will assess_status() for
the unit.
Uses charmhelpers.contrib.openstack.utils.make_assess_status_func() to
create the appropriate status function and then returns it.
Used directly by assess_status() and also for pausing and resuming
the unit.
NOTE(ajkavanagh) ports are not checked due to race hazards with services
    that don't behave synchronously w.r.t. their service scripts. e.g.
apache2.
@param configs: a templating.OSConfigRenderer() object
@return f() -> None : a function that assesses the unit's workload status
"""
def _assess_status_func():
state, message = _determine_os_workload_status(
configs, {},
charm_func=assess_cluster_status,
services=services(), ports=None)
if state == 'active' and clustered():
message = 'Unit is ready and clustered'
# Remind the administrator cluster_series_upgrading is set.
# If the cluster has completed the series upgrade, run the
# complete-cluster-series-upgrade action to clear this setting.
if leader_get('cluster_series_upgrading'):
message += (", run complete-cluster-series-upgrade when the "
"cluster has completed its upgrade")
# Edge case when the first rabbitmq unit is upgraded it will show
# waiting for peers. Force "active" workload state for various
# testing suites like zaza to recognize a successful series upgrade
# of the first unit.
if state == "waiting":
state = "active"
# Validate that the cron schedule for nrpe status checks is correct. An
# invalid cron schedule will not prevent the rabbitmq service from
# running but may cause problems with nrpe checks.
schedule = config('stats_cron_schedule')
if schedule and not is_cron_schedule_valid(schedule):
message += ". stats_cron_schedule is invalid"
# Deferred restarts should be managed by _determine_os_workload_status
        # but rabbit's wlm code needs refactoring to make it consistent with
# other charms as any message returned by _determine_os_workload_status
# is currently dropped on the floor if: state == 'active'
events = defaultdict(set)
for e in deferred_events.get_deferred_events():
events[e.action].add(e.service)
for action, svcs in events.items():
svc_msg = "Services queued for {}: {}".format(
action, ', '.join(sorted(svcs)))
message = "{}. {}".format(message, svc_msg)
deferred_hooks = deferred_events.get_deferred_hooks()
if deferred_hooks:
svc_msg = "Hooks skipped due to disabled auto restarts: {}".format(
', '.join(sorted(deferred_hooks)))
message = "{}. {}".format(message, svc_msg)
serial = coordinator.Serial()
requested_locks = [k for k in COORD_KEYS if serial.requested(k)]
if requested_locks:
message = "{}. {}".format(
message,
'Waiting for {} lock(s)'.format(','.join(requested_locks)))
state = "waiting"
status_set(state, message)
return _assess_status_func
def pause_unit_helper(configs):
"""Helper function to pause a unit, and then call assess_status(...) in
effect, so that the status is correctly updated.
Uses charmhelpers.contrib.openstack.utils.pause_unit() to do the work.
@param configs: a templating.OSConfigRenderer() object
@returns None - this function is executed for its side-effect
"""
_pause_resume_helper(pause_unit, configs)
def resume_unit_helper(configs):
"""Helper function to resume a unit, and then call assess_status(...) in
effect, so that the status is correctly updated.
Uses charmhelpers.contrib.openstack.utils.resume_unit() to do the work.
@param configs: a templating.OSConfigRenderer() object
@returns None - this function is executed for its side-effect
"""
_pause_resume_helper(resume_unit, configs)
def _pause_resume_helper(f, configs):
"""Helper function that uses the make_assess_status_func(...) from
charmhelpers.contrib.openstack.utils to create an assess_status(...)
function that can be used with the pause/resume of the unit
@param f: the function to be used with the assess_status(...) function
@returns None - this function is executed for its side-effect
"""
# TODO(ajkavanagh) - ports= has been left off because of the race hazard
# that exists due to service_start()
f(assess_status_func(configs),
services=services(),
ports=None)
def get_unit_hostname():
"""Return this unit's hostname.
@returns hostname
"""
return socket.gethostname()
def is_sufficient_peers():
"""Sufficient number of expected peers to build a complete cluster
If min-cluster-size has been provided, check that we have sufficient
number of peers who have presented a hostname for a complete cluster.
If not defined assume a single unit.
@returns boolean
"""
min_size = config('min-cluster-size')
if min_size:
log("Checking for minimum of {} peer units".format(min_size),
level=DEBUG)
# Include this unit
units = 1
for rid in relation_ids('cluster'):
for unit in related_units(rid):
if relation_get(attribute='hostname',
rid=rid, unit=unit):
units += 1
if units < min_size:
log("Insufficient number of peer units to form cluster "
"(expected=%s, got=%s)" % (min_size, units), level=INFO)
return False
else:
log("Sufficient number of peer units to form cluster {}"
"".format(min_size), level=DEBUG)
return True
else:
log("min-cluster-size is not defined, race conditions may occur if "
"this is not a single unit deployment.", level=WARNING)
return True
def rabbitmq_is_installed():
"""Determine if rabbitmq is installed
@returns boolean
"""
return os.path.exists(RABBITMQ_CTL)
def cluster_ready():
"""Determine if each node in the cluster is ready and the cluster is
complete with the expected number of peers.
Once cluster_ready returns True it is safe to execute client relation
hooks. Having min-cluster-size set will guarantee cluster_ready will not
return True until the expected number of peers are clustered and ready.
If min-cluster-size is not set it must assume the cluster is ready in order
to allow for single unit deployments.
@returns boolean
"""
min_size = config('min-cluster-size')
units = 1
for rid in relation_ids('cluster'):
units += len(related_units(rid))
if not min_size:
min_size = units
if not is_sufficient_peers():
return False
elif min_size > 1:
if not clustered():
return False
clustered_units = 1
for rid in relation_ids('cluster'):
for remote_unit in related_units(rid):
if not relation_get(attribute='clustered',
rid=rid,
unit=remote_unit):
log("{} is not yet clustered".format(remote_unit),
DEBUG)
return False
else:
clustered_units += 1
if clustered_units < min_size:
log("Fewer than minimum cluster size:{} rabbit units reporting "
"clustered".format(min_size),
DEBUG)
return False
else:
log("All {} rabbit units reporting clustered"
"".format(min_size),
DEBUG)
return True
log("Must assume this is a single unit returning 'cluster' ready", DEBUG)
return True
def client_node_is_ready():
"""Determine if the leader node has set amqp client data
@returns boolean
"""
# Bail if this unit is paused
if is_unit_paused_set():
return False
for rid in relation_ids('amqp'):
if leader_get(attribute='{}_password'.format(rid)):
return True
return False
def leader_node_is_ready():
"""Determine if the leader node is ready to handle client relationship
hooks.
IFF rabbit is not paused, is installed, this is the leader node and the
cluster is complete.
@returns boolean
"""
# Paused check must run before other checks
# Bail if this unit is paused
if is_unit_paused_set():
return False
return (rabbitmq_is_installed() and
is_leader() and
cluster_ready())
def archive_upgrade_available():
"""Check if the change in sources.list would warrant running
apt-get update/upgrade
@returns boolean:
True: the "source" had changed, so upgrade is available
False: the "source" had not changed, no upgrade needed
"""
log('checking if upgrade is available', DEBUG)
c = config()
old_source = c.previous('source')
log('Previous "source" config options was: {}'.format(old_source), DEBUG)
new_source = c['source']
log('Current "source" config options is: {}'.format(new_source), DEBUG)
if old_source != new_source:
log('The "source" config option change warrants the upgrade.', INFO)
return old_source != new_source
def install_or_upgrade_packages():
"""Run apt-get update/upgrade mantra.
This is called from either install hook, or from config-changed,
if upgrade is warranted
"""
status_set('maintenance', 'Installing/upgrading RabbitMQ packages')
apt_update(fatal=True)
apt_install(PACKAGES, fatal=True)
def remove_file(path):
"""Delete the file or skip it if not exist.
:param path: the file to delete
:type path: str
"""
if os.path.isfile(path):
os.remove(path)
elif os.path.exists(path):
        log('{} path is not a file'.format(path), level='ERROR')
else:
log('{} file does not exist'.format(path), level='DEBUG')
def management_plugin_enabled():
"""Check if management plugin should be enabled.
    :returns: Whether management plugin should be enabled
:rtype: bool
"""
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) < "bionic":
return False
else:
return config('management_plugin') is True
def sync_nrpe_files():
"""Sync all NRPE-related files.
Copy all the custom NRPE scripts and create the cron file to run
rabbitmq stats collection
"""
if not os.path.exists(NAGIOS_PLUGINS):
os.makedirs(NAGIOS_PLUGINS, exist_ok=True)
if config('ssl'):
rsync(os.path.join(charm_dir(), 'files', 'check_rabbitmq.py'),
os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq.py'))
if config('queue_thresholds') and config('stats_cron_schedule'):
rsync(os.path.join(charm_dir(), 'files', 'check_rabbitmq_queues.py'),
os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq_queues.py'))
if management_plugin_enabled():
rsync(os.path.join(charm_dir(), 'files', 'check_rabbitmq_cluster.py'),
os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq_cluster.py'))
if config('stats_cron_schedule'):
rsync(os.path.join(charm_dir(), 'files', 'collect_rabbitmq_stats.sh'),
os.path.join(SCRIPTS_DIR, 'collect_rabbitmq_stats.sh'))
cronjob = CRONJOB_CMD.format(
schedule=config('stats_cron_schedule'),
timeout=config('cron-timeout'),
command=os.path.join(SCRIPTS_DIR, 'collect_rabbitmq_stats.sh'))
write_file(STATS_CRONFILE, cronjob)
def remove_nrpe_files():
"""Remove the cron file and all the custom NRPE scripts."""
if not config('stats_cron_schedule'):
# These scripts are redundant if the value `stats_cron_schedule`
# isn't in the config
remove_file(STATS_CRONFILE)
remove_file(os.path.join(SCRIPTS_DIR, 'collect_rabbitmq_stats.sh'))
if not config('ssl'):
# This script is redundant if the value `ssl` isn't in the config
remove_file(os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq.py'))
if not config('queue_thresholds') or not config('stats_cron_schedule'):
# This script is redundant if the value `queue_thresholds` or
# `stats_cron_schedule` isn't in the config
remove_file(os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq_queues.py'))
if not management_plugin_enabled():
# This script is redundant if the value `management_plugin` isn't
# in the config
remove_file(os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq_cluster.py'))
def get_nrpe_credentials():
"""Get the NRPE hostname, unit and user details.
:returns: (hostname, unit, vhosts, user, password)
:rtype: Tuple[str, str, List[Dict[str, str]], str, str]
"""
# Find out if nrpe set nagios_hostname
hostname = nrpe.get_nagios_hostname()
unit = nrpe.get_nagios_unit_name()
# create unique user and vhost for each unit
current_unit = local_unit().replace('/', '-')
user = 'nagios-{}'.format(current_unit)
vhosts = [{'vhost': user, 'shortname': RABBIT_USER}]
password = get_rabbit_password(user, local=True)
create_user(user, password, ['monitoring'])
if config('check-vhosts'):
for other_vhost in config('check-vhosts').split(' '):
if other_vhost:
item = {'vhost': other_vhost,
'shortname': 'rabbit_{}'.format(other_vhost)}
vhosts.append(item)
return hostname, unit, vhosts, user, password
def nrpe_update_vhost_check(nrpe_compat, unit, user, password, vhost):
"""Add/Remove the RabbitMQ non-SSL check
If the SSL is set to `off` or `on`, it will add the non-SSL RabbitMQ check,
otherwise it will remove it.
:param nrpe_compat: the NRPE class object
:type: nrpe.NRPE
:param unit: NRPE unit
:type: str
:param user: username of NRPE user
:type: str
:param password: password of NRPE user
:type: str
:param vhost: dictionary with vhost and shortname
:type: Dict[str, str]
"""
ssl_config = config('ssl') or ''
if ssl_config.lower() in ['off', 'on']:
log('Adding rabbitmq non-SSL check for {}'.format(vhost['vhost']),
level=DEBUG)
nrpe_compat.add_check(
shortname=vhost['shortname'],
description='Check RabbitMQ {} {}'.format(unit, vhost['vhost']),
check_cmd='{}/check_rabbitmq.py --user {} --password {} '
'--vhost {}'.format(
NAGIOS_PLUGINS, user, password, vhost['vhost']))
else:
log('Removing rabbitmq non-SSL check for {}'.format(vhost['vhost']),
level=DEBUG)
nrpe_compat.remove_check(
shortname=vhost['shortname'],
description='Remove check RabbitMQ {} {}'.format(
unit, vhost['vhost']),
check_cmd='{}/check_rabbitmq.py'.format(NAGIOS_PLUGINS))
def nrpe_update_vhost_ssl_check(nrpe_compat, unit, user, password, vhost):
"""Add/Remove the RabbitMQ SSL check
If the SSL is set to `only` or `on`, it will add the SSL RabbitMQ check,
otherwise it will remove it.
:param nrpe_compat: the NRPE class object
:type: nrpe.NRPE
:param unit: NRPE unit
:type: str
:param user: username of NRPE user
:type: str
:param password: password of NRPE user
:type: str
:param vhost: dictionary with vhost and shortname
:type: Dict[str, str]
"""
ssl_config = config('ssl') or ''
if ssl_config.lower() in ['only', 'on']:
log('Adding rabbitmq SSL check for {}'.format(vhost['vhost']),
level=DEBUG)
nrpe_compat.add_check(
shortname=vhost['shortname'] + "_ssl",
description='Check RabbitMQ (SSL) {} {}'.format(
unit, vhost['vhost']),
check_cmd='{}/check_rabbitmq.py --user {} --password {} '
'--vhost {} --ssl --ssl-ca {} --port {}'.format(
NAGIOS_PLUGINS, user, password, vhost['vhost'],
SSL_CA_FILE, int(config('ssl_port'))))
else:
log('Removing rabbitmq SSL check for {}'.format(vhost['vhost']),
level=DEBUG)
nrpe_compat.remove_check(
shortname=vhost['shortname'] + "_ssl",
description='Remove check RabbitMQ (SSL) {} {}'.format(
unit, vhost['vhost']),
check_cmd='{}/check_rabbitmq.py'.format(NAGIOS_PLUGINS))
def is_cron_schedule_valid(cron_schedule):
"""Returns whether or not the stats_cron_schedule can be properly parsed.
:param cron_schedule: the cron schedule to validate
:return: True if the cron schedule defined can be parsed by the croniter
library, False otherwise
"""
try:
croniter(cron_schedule).get_next(datetime)
return True
except CroniterBadCronError:
return False
def get_max_stats_file_age():
"""Returns the max stats file age for NRPE checks.
Max stats file age is determined by a heuristic of 2x the configured
interval in the stats_cron_schedule config value.
:return: the maximum age (in seconds) the queues check should consider
a stats file as aged. If a cron schedule is not defined,
then return 0.
:rtype: int
"""
cron_schedule = config('stats_cron_schedule')
if not cron_schedule:
return 0
try:
it = croniter(cron_schedule)
interval = it.get_next(datetime) - it.get_prev(datetime)
return int(interval.total_seconds() * 2)
except CroniterBadCronError as err:
# The config value is being passed straight into croniter and it may
# not be valid which will cause croniter to raise an error. Catch any
# of the errors raised.
log('Specified cron schedule is invalid: %s' % err,
level=ERROR)
return 0
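# Worked example of the heuristic above: with stats_cron_schedule set to
# '*/5 * * * *' the interval between runs is 300 seconds, so
# get_max_stats_file_age() returns 600, which nrpe_update_queues_check()
# passes to check_rabbitmq_queues.py via -m.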
def nrpe_update_queues_check(nrpe_compat, rabbit_dir):
"""Add/Remove the RabbitMQ queues check
The RabbitMQ Queues check should be added if the `queue_thresholds` and
the `stats_cron_schedule` variables are in the configuration. Otherwise,
this check should be removed.
The cron job configured with the `stats_cron_schedule`
variable is responsible for creating the data files read by this check.
:param nrpe_compat: the NRPE class object
:type: nrpe.NRPE
:param rabbit_dir: path to the RabbitMQ directory
:type: str
"""
stats_datafile = os.path.join(
rabbit_dir, 'data', '{}_queue_stats.dat'.format(get_unit_hostname()))
if config('queue_thresholds') and config('stats_cron_schedule'):
cmd = ""
# If value of queue_thresholds is incorrect we want the hook to fail
for item in yaml.safe_load(config('queue_thresholds')):
cmd += ' -c "{}" "{}" {} {}'.format(*item)
for item in yaml.safe_load(config('exclude_queues')):
cmd += ' -e "{}" "{}"'.format(*item)
busiest_queues = config('busiest_queues')
if busiest_queues is not None and int(busiest_queues) > 0:
cmd += ' -d "{}"'.format(busiest_queues)
max_age = get_max_stats_file_age()
if max_age > 0:
cmd += ' -m {}'.format(max_age)
nrpe_compat.add_check(
shortname=RABBIT_USER + '_queue',
description='Check RabbitMQ Queues',
check_cmd='{}/check_rabbitmq_queues.py{} {}'.format(
NAGIOS_PLUGINS, cmd, stats_datafile))
else:
log('Removing rabbitmq Queues check', level=DEBUG)
nrpe_compat.remove_check(
shortname=RABBIT_USER + '_queue',
description='Remove check RabbitMQ Queues',
check_cmd='{}/check_rabbitmq_queues.py'.format(NAGIOS_PLUGINS))
def nrpe_update_cluster_check(nrpe_compat, user, password):
"""Add/Remove the RabbitMQ cluster check
If the management_plugin is set to `True`, it will add the cluster RabbitMQ
check, otherwise it will remove it.
:param nrpe_compat: the NRPE class object
:type: nrpe.NRPE
:param user: username of NRPE user
:type: str
:param password: password of NRPE user
:type: str
"""
if management_plugin_enabled():
cmd = '{}/check_rabbitmq_cluster.py --port {} ' \
'--user {} --password {}'.format(
NAGIOS_PLUGINS, get_managment_port(), user, password)
nrpe_compat.add_check(
shortname=RABBIT_USER + '_cluster',
description='Check RabbitMQ Cluster',
check_cmd=cmd)
else:
log('Removing rabbitmq Cluster check', level=DEBUG)
nrpe_compat.remove_check(
shortname=RABBIT_USER + '_cluster',
description='Remove check RabbitMQ Cluster',
check_cmd='{}/check_rabbitmq_cluster.py'.format(NAGIOS_PLUGINS))
def rabbitmq_version_newer_or_equal(version):
"""Compare the installed RabbitMQ version
:param version: Version to compare with
:type: str
:returns: True if the installed RabbitMQ version is newer or equal.
:rtype: bool
"""
rmq_version = get_upstream_version(VERSION_PACKAGE)
return apt_pkg.version_compare(rmq_version, version) >= 0