charm-rabbitmq-server/hooks/rabbitmq_server_relations.py

549 lines
17 KiB
Python
Executable File

#!/usr/bin/python
import base64
import os
import shutil
import sys
import subprocess
import glob
import rabbit_utils as rabbit
from lib.utils import (
chown, chmod,
is_newer,
)
from charmhelpers.contrib.hahelpers.cluster import (
is_clustered,
eligible_leader
)
import charmhelpers.contrib.storage.linux.ceph as ceph
from charmhelpers.contrib.openstack.utils import save_script_rc
from charmhelpers.fetch import (
add_source,
apt_update,
apt_install)
from charmhelpers.core.hookenv import (
open_port, close_port,
log, ERROR,
relation_get,
relation_set,
relation_ids,
related_units,
service_name,
local_unit,
config,
unit_get,
is_relation_made,
Hooks,
UnregisteredHookError
)
from charmhelpers.core.host import (
rsync, service_stop, service_restart
)
from charmhelpers.contrib.charmsupport.nrpe import NRPE
from charmhelpers.contrib.ssl.service import ServiceCA
from charmhelpers.contrib.peerstorage import (
peer_echo,
peer_store,
peer_retrieve
)
hooks = Hooks()
SERVICE_NAME = os.getenv('JUJU_UNIT_NAME').split('/')[0]
POOL_NAME = SERVICE_NAME
RABBIT_DIR = '/var/lib/rabbitmq'
RABBIT_USER = 'rabbitmq'
RABBIT_GROUP = 'rabbitmq'
NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
@hooks.hook('install')
def install():
pre_install_hooks()
# NOTE(jamespage) install actually happens in config_changed hook
def configure_amqp(username, vhost):
# get and update service password
password = rabbit.get_rabbit_password(username)
# update vhost
rabbit.create_vhost(vhost)
rabbit.create_user(username, password)
rabbit.grant_permissions(username, vhost)
return password
@hooks.hook('amqp-relation-changed')
def amqp_changed(relation_id=None, remote_unit=None):
if not eligible_leader('res_rabbitmq_vip'):
log('amqp_changed(): Deferring amqp_changed'
' to eligible_leader.')
return
relation_settings = {}
settings = relation_get(rid=relation_id, unit=remote_unit)
singleset = set(['username', 'vhost'])
if singleset.issubset(settings):
if None in [settings['username'], settings['vhost']]:
log('amqp_changed(): Relation not ready.')
return
relation_settings['password'] = configure_amqp(
username=settings['username'],
vhost=settings['vhost'])
else:
queues = {}
for k, v in settings.iteritems():
amqp = k.split('_')[0]
x = '_'.join(k.split('_')[1:])
if amqp not in queues:
queues[amqp] = {}
queues[amqp][x] = v
for amqp in queues:
if singleset.issubset(queues[amqp]):
relation_settings[
'_'.join([amqp, 'password'])] = configure_amqp(
queues[amqp]['username'],
queues[amqp]['vhost'])
relation_settings['hostname'] = unit_get('private-address')
configure_client_ssl(relation_settings)
if is_clustered():
relation_settings['clustered'] = 'true'
if is_relation_made('ha'):
# active/passive settings
relation_settings['vip'] = config('vip')
# or ha-vip-only to support active/active, but
# accessed via a VIP for older clients.
if config('ha-vip-only') is True:
relation_settings['ha-vip-only'] = 'true'
if relation_id:
relation_settings['rid'] = relation_id
# set if need HA queues or not
if rabbit.compare_version('3.0.1') < 0:
relation_settings['ha_queues'] = True
relation_set(relation_settings=relation_settings)
@hooks.hook('cluster-relation-joined')
def cluster_joined():
if is_relation_made('ha') and \
config('ha-vip-only') is False:
log('hacluster relation is present, skipping native '
'rabbitmq cluster config.')
return
if is_newer():
log('cluster_joined: Relation greater.')
return
rabbit.COOKIE_PATH = '/var/lib/rabbitmq/.erlang.cookie'
if not os.path.isfile(rabbit.COOKIE_PATH):
log('erlang cookie missing from %s' % rabbit.COOKIE_PATH,
level=ERROR)
return
cookie = open(rabbit.COOKIE_PATH, 'r').read().strip()
peer_store('cookie', cookie)
@hooks.hook('cluster-relation-changed')
def cluster_changed():
rdata = relation_get()
if 'cookie' not in rdata:
log('cluster_joined: cookie not yet set.')
return
# sync passwords
peer_echo()
# sync cookie
cookie = peer_retrieve('cookie')
if open(rabbit.COOKIE_PATH, 'r').read().strip() == cookie:
log('Cookie already synchronized with peer.')
else:
log('Synchronizing erlang cookie from peer.')
rabbit.service('stop')
with open(rabbit.COOKIE_PATH, 'wb') as out:
out.write(cookie)
rabbit.service('start')
if is_relation_made('ha') and \
config('ha-vip-only') is False:
log('hacluster relation is present, skipping native '
'rabbitmq cluster config.')
return
# cluster with node
if is_newer():
if rabbit.cluster_with():
# resync nrpe user after clustering
update_nrpe_checks()
@hooks.hook('cluster-relation-departed')
def cluster_departed():
if is_relation_made('ha') and \
config('ha-vip-only') is False:
log('hacluster relation is present, skipping native '
'rabbitmq cluster config.')
return
if not is_newer():
log('cluster_joined: Relation lesser.')
return
rabbit.break_cluster()
@hooks.hook('ha-relation-joined')
def ha_joined():
corosync_bindiface = config('ha-bindiface')
corosync_mcastport = config('ha-mcastport')
vip = config('vip')
vip_iface = config('vip_iface')
vip_cidr = config('vip_cidr')
rbd_name = config('rbd-name')
vip_only = config('ha-vip-only')
if None in [corosync_bindiface, corosync_mcastport, vip, vip_iface,
vip_cidr, rbd_name] and vip_only is False:
log('Insufficient configuration data to configure hacluster.',
level=ERROR)
sys.exit(1)
elif None in [corosync_bindiface, corosync_mcastport, vip, vip_iface,
vip_cidr] and vip_only is True:
log('Insufficient configuration data to configure VIP-only hacluster.',
level=ERROR)
sys.exit(1)
if not is_relation_made('ceph', 'auth') and vip_only is False:
log('ha_joined: No ceph relation yet, deferring.')
return
name = '%s@localhost' % SERVICE_NAME
if rabbit.get_node_name() != name and vip_only is False:
log('Stopping rabbitmq-server.')
service_stop('rabbitmq-server')
rabbit.set_node_name('%s@localhost' % SERVICE_NAME)
else:
log('Node name already set to %s.' % name)
relation_settings = {}
relation_settings['corosync_bindiface'] = corosync_bindiface
relation_settings['corosync_mcastport'] = corosync_mcastport
if vip_only is True:
relation_settings['resources'] = {
'res_rabbitmq_vip': 'ocf:heartbeat:IPaddr2',
}
relation_settings['resource_params'] = {
'res_rabbitmq_vip': 'params ip="%s" cidr_netmask="%s" nic="%s"' %
(vip, vip_cidr, vip_iface),
}
else:
relation_settings['resources'] = {
'res_rabbitmq_rbd': 'ocf:ceph:rbd',
'res_rabbitmq_fs': 'ocf:heartbeat:Filesystem',
'res_rabbitmq_vip': 'ocf:heartbeat:IPaddr2',
'res_rabbitmq-server': 'lsb:rabbitmq-server',
}
relation_settings['resource_params'] = {
'res_rabbitmq_rbd': 'params name="%s" pool="%s" user="%s" '
'secret="%s"' %
(rbd_name, POOL_NAME,
SERVICE_NAME, ceph._keyfile_path(
SERVICE_NAME)),
'res_rabbitmq_fs': 'params device="/dev/rbd/%s/%s" directory="%s" '
'fstype="ext4" op start start-delay="10s"' %
(POOL_NAME, rbd_name, RABBIT_DIR),
'res_rabbitmq_vip': 'params ip="%s" cidr_netmask="%s" nic="%s"' %
(vip, vip_cidr, vip_iface),
'res_rabbitmq-server': 'op start start-delay="5s" '
'op monitor interval="5s"',
}
relation_settings['groups'] = {
'grp_rabbitmq':
'res_rabbitmq_rbd res_rabbitmq_fs res_rabbitmq_vip '
'res_rabbitmq-server',
}
for rel_id in relation_ids('ha'):
relation_set(relation_id=rel_id, relation_settings=relation_settings)
env_vars = {
'OPENSTACK_PORT_EPMD': 4369,
'OPENSTACK_PORT_MCASTPORT': config('ha-mcastport'),
}
save_script_rc(**env_vars)
@hooks.hook('ha-relation-changed')
def ha_changed():
if not is_clustered():
return
vip = config('vip')
log('ha_changed(): We are now HA clustered. '
'Advertising our VIP (%s) to all AMQP clients.' %
vip)
# need to re-authenticate all clients since node-name changed.
for rid in relation_ids('amqp'):
for unit in related_units(rid):
amqp_changed(relation_id=rid, remote_unit=unit)
@hooks.hook('ceph-relation-joined')
def ceph_joined():
log('Start Ceph Relation Joined')
#NOTE fixup
#utils.configure_source()
ceph.install()
log('Finish Ceph Relation Joined')
@hooks.hook('ceph-relation-changed')
def ceph_changed():
log('Start Ceph Relation Changed')
auth = relation_get('auth')
key = relation_get('key')
use_syslog = str(config('use-syslog')).lower()
if None in [auth, key]:
log('Missing key or auth in relation')
sys.exit(0)
ceph.configure(service=SERVICE_NAME, key=key, auth=auth,
use_syslog=use_syslog)
if eligible_leader('res_rabbitmq_vip'):
rbd_img = config('rbd-name')
rbd_size = config('rbd-size')
sizemb = int(rbd_size.split('G')[0]) * 1024
blk_device = '/dev/rbd/%s/%s' % (POOL_NAME, rbd_img)
rbd_pool_rep_count = config('ceph-osd-replication-count')
ceph.ensure_ceph_storage(service=SERVICE_NAME, pool=POOL_NAME,
rbd_img=rbd_img, sizemb=sizemb,
fstype='ext4', mount_point=RABBIT_DIR,
blk_device=blk_device,
system_services=['rabbitmq-server'])#,
#rbd_pool_replicas=rbd_pool_rep_count)
subprocess.check_call(['chown', '-R', '%s:%s' %
(RABBIT_USER,RABBIT_GROUP), RABBIT_DIR])
else:
log('This is not the peer leader. Not configuring RBD.')
log('Stopping rabbitmq-server.')
service_stop('rabbitmq-server')
# If 'ha' relation has been made before the 'ceph' relation
# it is important to make sure the ha-relation data is being
# sent.
if is_relation_made('ha'):
log('*ha* relation exists. Triggering ha_joined()')
ha_joined()
else:
log('*ha* relation does not exist.')
log('Finish Ceph Relation Changed')
@hooks.hook('nrpe-external-master-relation-changed')
def update_nrpe_checks():
if os.path.isdir(NAGIOS_PLUGINS):
rsync(os.path.join(os.getenv('CHARM_DIR'), 'scripts',
'check_rabbitmq.py'),
os.path.join(NAGIOS_PLUGINS, 'check_rabbitmq.py'))
# create unique user and vhost for each unit
current_unit = local_unit().replace('/', '-')
user = 'nagios-%s' % current_unit
vhost = 'nagios-%s' % current_unit
password = rabbit.get_rabbit_password(user)
rabbit.create_vhost(vhost)
rabbit.create_user(user, password)
rabbit.grant_permissions(user, vhost)
nrpe_compat = NRPE()
nrpe_compat.add_check(
shortname=rabbit.RABBIT_USER,
description='Check RabbitMQ',
check_cmd='{}/check_rabbitmq.py --user {} --password {} --vhost {}'
''.format(NAGIOS_PLUGINS, user, password, vhost)
)
nrpe_compat.write()
@hooks.hook('upgrade-charm')
def upgrade_charm():
pre_install_hooks()
add_source(config('source'), config('key'))
apt_update(fatal=True)
# Ensure older passwd files in /var/lib/juju are moved to
# /var/lib/rabbitmq which will end up replicated if clustered
for f in [f for f in os.listdir('/var/lib/juju')
if os.path.isfile(os.path.join('/var/lib/juju', f))]:
if f.endswith('.passwd'):
s = os.path.join('/var/lib/juju', f)
d = os.path.join('/var/lib/charm/{}'.format(service_name()), f)
log('upgrade_charm: Migrating stored passwd'
' from %s to %s.' % (s, d))
shutil.move(s, d)
rabbit.migrate_passwords_to_peer_relation()
# explicitly update buggy file name naigos.passwd
old = os.path.join('var/lib/rabbitmq', 'naigos.passwd')
if os.path.isfile(old):
new = os.path.join('var/lib/rabbitmq', 'nagios.passwd')
shutil.move(old, new)
MAN_PLUGIN = 'rabbitmq_management'
def configure_client_ssl(relation_data):
"""Configure client with ssl
"""
ssl_mode, external_ca = _get_ssl_mode()
if ssl_mode == 'off':
return
relation_data['ssl_port'] = config('ssl_port')
if external_ca:
if config('ssl_ca'):
relation_data['ssl_ca'] = base64.b64encode(
config('ssl_ca'))
return
ca = ServiceCA.get_ca()
relation_data['ssl_ca'] = base64.b64encode(ca.get_ca_bundle())
def _get_ssl_mode():
ssl_mode = config('ssl')
external_ca = False
# Legacy config boolean option
ssl_on = config('ssl_enabled')
if ssl_mode == 'off' and ssl_on is False:
ssl_mode = 'off'
elif ssl_mode == 'off' and ssl_on:
ssl_mode = 'on'
ssl_key = config('ssl_key')
ssl_cert = config('ssl_cert')
if all((ssl_key, ssl_cert)):
external_ca = True
return ssl_mode, external_ca
def _convert_from_base64(v):
# Rabbit originally supported pem encoded key/cert in config, play
# nice on upgrades as we now expect base64 encoded key/cert/ca.
if not v:
return v
if v.startswith('-----BEGIN'):
return v
try:
return base64.b64decode(v)
except TypeError:
return v
def reconfigure_client_ssl(ssl_enabled=False):
ssl_config_keys = set(('ssl_key', 'ssl_cert', 'ssl_ca'))
for rid in relation_ids('amqp'):
rdata = relation_get(rid=rid, unit=os.environ['JUJU_UNIT_NAME'])
if not ssl_enabled and ssl_config_keys.intersection(rdata):
# No clean way to remove entirely, but blank them.
relation_set(relation_id=rid, ssl_key='', ssl_cert='', ssl_ca='')
elif ssl_enabled and not ssl_config_keys.intersection(rdata):
configure_client_ssl(rdata)
relation_set(relation_id=rid, **rdata)
def configure_rabbit_ssl():
"""
The legacy config support adds some additional complications.
ssl_enabled = True, ssl = off -> ssl enabled
ssl_enabled = False, ssl = on -> ssl enabled
"""
ssl_mode, external_ca = _get_ssl_mode()
if ssl_mode == 'off':
if os.path.exists(rabbit.RABBITMQ_CONF):
os.remove(rabbit.RABBITMQ_CONF)
close_port(config('ssl_port'))
reconfigure_client_ssl()
return
ssl_key = _convert_from_base64(config('ssl_key'))
ssl_cert = _convert_from_base64(config('ssl_cert'))
ssl_ca = _convert_from_base64(config('ssl_ca'))
ssl_port = config('ssl_port')
# If external managed certs then we need all the fields.
if (ssl_mode in ('on', 'only') and any((ssl_key, ssl_cert)) and
not all((ssl_key, ssl_cert))):
log('If ssl_key or ssl_cert are specified both are required.',
level=ERROR)
sys.exit(1)
if not external_ca:
ssl_cert, ssl_key, ssl_ca = ServiceCA.get_service_cert()
rabbit.enable_ssl(
ssl_key, ssl_cert, ssl_port, ssl_ca,
ssl_only=(ssl_mode == "only"), ssl_client=False)
reconfigure_client_ssl(True)
open_port(ssl_port)
@hooks.hook('config-changed')
def config_changed():
# Add archive source if provided
add_source(config('source'), config('key'))
apt_update(fatal=True)
# Copy in defaults file for updated ulimits
shutil.copyfile('templates/rabbitmq-server', '/etc/default/rabbitmq-server')
# Install packages to ensure any changes to source
# result in an upgrade if applicable.
apt_install(rabbit.PACKAGES, fatal=True)
open_port(5672)
chown(RABBIT_DIR, rabbit.RABBIT_USER, rabbit.RABBIT_USER)
chmod(RABBIT_DIR, 0o775)
if config('management_plugin') is True:
rabbit.enable_plugin(MAN_PLUGIN)
open_port(55672)
else:
rabbit.disable_plugin(MAN_PLUGIN)
close_port(55672)
configure_rabbit_ssl()
if eligible_leader('res_rabbitmq_vip') or \
config('ha-vip-only') is True:
service_restart('rabbitmq-server')
update_nrpe_checks()
def pre_install_hooks():
for f in glob.glob('exec.d/*/charm-pre-install'):
if os.path.isfile(f) and os.access(f, os.X_OK):
subprocess.check_call(['sh', '-c', f])
if __name__ == '__main__':
try:
hooks.execute(sys.argv)
except UnregisteredHookError as e:
log('Unknown hook {} - skipping.'.format(e))