Merge "Refactor amqp_changed to make it less noisy"

This commit is contained in:
Jenkins 2017-06-30 08:59:10 +00:00 committed by Gerrit Code Review
commit 04f67e13d9
3 changed files with 257 additions and 88 deletions

View File

@ -58,11 +58,13 @@ from charmhelpers.core.hookenv import (
open_port,
close_port,
log,
DEBUG,
ERROR,
INFO,
relation_get,
relation_clear,
relation_set,
relation_id as get_relation_id,
relation_ids,
related_units,
service_name,
@ -94,6 +96,7 @@ from charmhelpers.contrib.peerstorage import (
leader_get,
)
from charmhelpers.core.unitdata import kv
hooks = Hooks()
@ -117,10 +120,71 @@ def install():
# NOTE(jamespage) install actually happens in config_changed hook
def configure_amqp(username, vhost, admin=False):
def validate_amqp_config_tracker(f):
"""Decorator to mark all existing tracked amqp configs as stale so that
they are refreshed the next time the current unit leader.
"""
def _validate_amqp_config_tracker(*args, **kwargs):
if not is_leader():
kvstore = kv()
tracker = kvstore.get('amqp_config_tracker')
if tracker:
for rid in tracker:
tracker[rid]['stale'] = True
kvstore.set(key='amqp_config_tracker', value=tracker)
kvstore.flush()
return f(*args, **kwargs)
return _validate_amqp_config_tracker
def configure_amqp(username, vhost, relation_id, admin=False):
"""Configure rabbitmq server.
This function creates user/password, vhost and sets user permissions. It
also enabales mirroring queues if requested.
Calls to rabbitmqctl are costly and as such we aim to limit them by only
doing them if we detect that a settings needs creating or updating. To
achieve this we track what we set by storing key/value pairs associated
with a particular relation id in a local database.
Since this function is only supposed to be called by the cluster leader,
the database is expected to be invalidated if it exists and we are no
longer leader so as to ensure that a leader switch results in a
rabbitmq configuraion consistent with the current leader's view.
:param username: client username.
:param vhost: vhost name.
:param relation_id: optional relation id used to identify the context of
this operation. This should always be provided
so that we can track what has been set.
:param admin: boolean value defining whether the new user is admin.
:returns: user password
"""
log("Configuring rabbitmq for user '{}' vhost '{}' (rid={})".
format(username, vhost, relation_id), DEBUG)
if not relation_id:
raise Exception("Invalid relation id '{}' provided to "
"{}()".format(relation_id, configure_amqp.__name__))
# get and update service password
password = rabbit.get_rabbit_password(username)
expected = {'username': username, 'vhost': vhost,
'mirroring-queues': config('mirroring-queues')}
kvstore = kv()
tracker = kvstore.get('amqp_config_tracker') or {}
val = tracker.get(relation_id)
if val == expected and not val.get('stale'):
log("Rabbit already configured for relation "
"'{}'".format(relation_id), DEBUG)
return password
else:
tracker[relation_id] = expected
# update vhost
rabbit.create_vhost(vhost)
rabbit.create_user(username, password, admin)
@ -132,6 +196,9 @@ def configure_amqp(username, vhost, admin=False):
if config('mirroring-queues'):
rabbit.set_ha_mode(vhost, 'all')
kvstore.set(key='amqp_config_tracker', value=tracker)
kvstore.flush()
return password
@ -147,91 +214,95 @@ def update_clients():
amqp_changed(relation_id=rid, remote_unit=unit)
@validate_amqp_config_tracker
@hooks.hook('amqp-relation-changed')
def amqp_changed(relation_id=None, remote_unit=None):
singleset = set(['username', 'vhost'])
host_addr = rabbit.get_unit_ip()
# TODO: Simplify what the non-leader needs to do
if not is_leader() and rabbit.client_node_is_ready():
# NOTE(jamespage) clear relation to deal with data being
# removed from peer storage
relation_clear(relation_id)
# Each unit needs to set the db information otherwise if the unit
# with the info dies the settings die with it Bug# 1355848
exc_list = ['hostname', 'private-address']
for rel_id in relation_ids('amqp'):
peerdb_settings = peer_retrieve_by_prefix(rel_id,
exc_list=exc_list)
peerdb_settings['hostname'] = host_addr
peerdb_settings['private-address'] = host_addr
if 'password' in peerdb_settings:
relation_set(relation_id=rel_id, **peerdb_settings)
log('amqp_changed(): Deferring amqp_changed'
' to the leader.')
if rabbit.leader_node_is_ready():
relation_settings = {'hostname': host_addr,
'private-address': host_addr}
# NOTE: active/active case
if config('prefer-ipv6'):
relation_settings = {'private-address': host_addr}
relation_set(relation_id=relation_id,
relation_settings=relation_settings)
relation_settings['private-address'] = host_addr
return
current = relation_get(rid=relation_id, unit=remote_unit)
if singleset.issubset(current):
if not all([current.get('username'), current.get('vhost')]):
log('Relation not ready.', DEBUG)
return
# Bail if not completely ready
if not rabbit.leader_node_is_ready():
return
# Provide credentials to relations. If password is already
# available on peer relation then use it instead of reconfiguring.
username = current['username']
vhost = current['vhost']
admin = current.get('admin', False)
amqp_rid = relation_id or get_relation_id()
password = configure_amqp(username, vhost, amqp_rid, admin=admin)
relation_settings['password'] = password
else:
# NOTE(hopem): we should look at removing this code since i don't
# think it's ever used anymore and stems from the days
# when we needed to ensure consistency between
# peerstorage (replaced by leader get/set) and amqp
# relations.
queues = {}
for k, v in current.iteritems():
amqp_rid = k.split('_')[0]
x = '_'.join(k.split('_')[1:])
if amqp_rid not in queues:
queues[amqp_rid] = {}
relation_settings = {}
settings = relation_get(rid=relation_id, unit=remote_unit)
queues[amqp_rid][x] = v
singleset = set(['username', 'vhost'])
for amqp_rid in queues:
if singleset.issubset(queues[amqp_rid]):
username = queues[amqp_rid]['username']
vhost = queues[amqp_rid]['vhost']
password = configure_amqp(username, vhost, amqp_rid,
admin=admin)
key = '_'.join([amqp_rid, 'password'])
relation_settings[key] = password
if singleset.issubset(settings):
if None in [settings['username'], settings['vhost']]:
log('amqp_changed(): Relation not ready.')
return
ssl_utils.configure_client_ssl(relation_settings)
relation_settings['password'] = configure_amqp(
username=settings['username'],
vhost=settings['vhost'],
admin=settings.get('admin', False))
else:
queues = {}
for k, v in settings.iteritems():
amqp = k.split('_')[0]
x = '_'.join(k.split('_')[1:])
if amqp not in queues:
queues[amqp] = {}
queues[amqp][x] = v
for amqp in queues:
if singleset.issubset(queues[amqp]):
relation_settings[
'_'.join([amqp, 'password'])] = configure_amqp(
queues[amqp]['username'],
queues[amqp]['vhost'])
if is_clustered():
relation_settings['clustered'] = 'true'
# NOTE(dosaboy): this stanza can be removed once we fully remove
# deprecated HA support.
if is_relation_made('ha'):
# active/passive settings
relation_settings['vip'] = config('vip')
# or ha-vip-only to support active/active, but
# accessed via a VIP for older clients.
if config('ha-vip-only') is True:
relation_settings['ha-vip-only'] = 'true'
relation_settings['hostname'] = \
relation_settings['private-address'] = \
rabbit.get_unit_ip()
# set if need HA queues or not
if cmp_pkgrevno('rabbitmq-server', '3.0.1') < 0:
relation_settings['ha_queues'] = True
ssl_utils.configure_client_ssl(relation_settings)
log("Updating relation {} keys {}"
.format(relation_id or get_relation_id(),
','.join(relation_settings.keys())), DEBUG)
peer_store_and_set(relation_id=relation_id,
relation_settings=relation_settings)
elif not is_leader() and rabbit.client_node_is_ready():
log("Propagating peer settings to all amqp relations", DEBUG)
if is_clustered():
relation_settings['clustered'] = 'true'
if is_relation_made('ha'):
# active/passive settings
relation_settings['vip'] = config('vip')
# or ha-vip-only to support active/active, but
# accessed via a VIP for older clients.
if config('ha-vip-only') is True:
relation_settings['ha-vip-only'] = 'true'
# NOTE(jamespage) clear relation to deal with data being
# removed from peer storage.
relation_clear(relation_id)
# set if need HA queues or not
if cmp_pkgrevno('rabbitmq-server', '3.0.1') < 0:
relation_settings['ha_queues'] = True
peer_store_and_set(relation_id=relation_id,
relation_settings=relation_settings)
# Each unit needs to set the db information otherwise if the unit
# with the info dies the settings die with it Bug# 1355848
for rel_id in relation_ids('amqp'):
peerdb_settings = peer_retrieve_by_prefix(rel_id)
if 'password' in peerdb_settings:
peerdb_settings['hostname'] = host_addr
peerdb_settings['private-address'] = host_addr
relation_set(relation_id=rel_id, **peerdb_settings)
@hooks.hook('cluster-relation-joined')
@ -387,7 +458,7 @@ def ha_joined():
log('ha_joined: No ceph relation yet, deferring.')
return
ctxt = {rabbit.ENV_CONF: rabbit.CONFIG_FILES[rabbit.ENV_CONF]}
ctxt = {rabbit.ENV_CONF: rabbit.CONFIG_FILES[rabbit.ENV_CONF]}
rabbit.ConfigRenderer(ctxt).write(rabbit.ENV_CONF)
relation_settings = {}

View File

@ -633,6 +633,27 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
u.log.info('OK\n')
def check_unit_rmq_cluster_nodes(self, sentry, unit_node_names):
unit_name = sentry.info['unit_name']
nodes = []
errors = []
str_stat = u.get_rmq_cluster_status(sentry)
# make the interesting part of rabbitmqctl cluster_status output
# json-parseable.
if 'nodes,[{disc,' in str_stat:
pos_start = str_stat.find('nodes,[{disc,') + 13
pos_end = str_stat.find(']}]},', pos_start) + 1
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
nodes = json.loads(str_nodes)
for node in nodes:
if node not in unit_node_names:
errors.append('Cluster registration check failed on {}: '
'{} should not be registered with RabbitMQ '
'after unit removal.\n'
''.format(unit_name, node))
return errors
def test_901_remove_unit(self):
"""Test if a unit correctly cleans up by removing itself from the
RabbitMQ cluster on removal"""
@ -655,22 +676,16 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
errors = []
for sentry in sentry_units:
unit_name = sentry.info['unit_name']
nodes = []
str_stat = u.get_rmq_cluster_status(sentry)
# make the interesting part of rabbitmqctl cluster_status output
# json-parseable.
if 'nodes,[{disc,' in str_stat:
pos_start = str_stat.find('nodes,[{disc,') + 13
pos_end = str_stat.find(']}]},', pos_start) + 1
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
nodes = json.loads(str_nodes)
for node in nodes:
if node not in unit_node_names:
errors.append('Cluster registration check failed on {}: '
'{} should not be registered with RabbitMQ '
'after unit removal.\n'
''.format(unit_name, node))
e = self.check_unit_rmq_cluster_nodes(sentry, unit_node_names)
if e:
# NOTE: cluster status may not have been updated yet so wait a
# little and try one more time. Need to find a better way to do
# this.
time.sleep(10)
e = self.check_unit_rmq_cluster_nodes(sentry, unit_node_names)
if e:
errors.append(e)
if errors:
amulet.raise_status(amulet.FAIL, msg=errors)
u.log.debug('OK')

View File

@ -13,10 +13,14 @@
# limitations under the License.
import os
import shutil
import sys
import tempfile
from test_utils import CharmTestCase
from mock import patch, MagicMock
from mock import patch, MagicMock, call
from charmhelpers.core.unitdata import Storage
os.environ['JUJU_UNIT_NAME'] = 'UNIT_TEST/0' # noqa - needed for import
@ -195,3 +199,82 @@ class RelationUtil(CharmTestCase):
rabbitmq_server_relations.update_clients()
mock_amqp_changed.assert_called_with(relation_id='amqp:0',
remote_unit='client/0')
@patch.object(rabbitmq_server_relations, 'is_leader')
@patch.object(rabbitmq_server_relations.rabbit, 'set_ha_mode')
@patch.object(rabbitmq_server_relations.rabbit, 'get_rabbit_password')
@patch.object(rabbitmq_server_relations.rabbit, 'create_vhost')
@patch.object(rabbitmq_server_relations.rabbit, 'create_user')
@patch.object(rabbitmq_server_relations.rabbit, 'grant_permissions')
@patch.object(rabbitmq_server_relations, 'config', lambda *args: True)
def test_configure_amqp(self, mock_grant_permissions, mock_create_vhost,
mock_create_user, mock_get_rabbit_password,
mock_set_ha_mode, mock_is_leader):
mock_is_leader.return_value = True
tmpdir = tempfile.mkdtemp()
try:
db_path = '{}/kv.db'.format(tmpdir)
rid = 'amqp:1'
store = Storage(db_path)
with patch('charmhelpers.core.unitdata._KV', store):
# Check .set
with patch.object(store, 'set') as mock_set:
rabbitmq_server_relations.configure_amqp('user_foo',
'vhost_blah', rid)
d = {rid: {"username": "user_foo", "vhost": "vhost_blah",
"mirroring-queues": True}}
mock_set.assert_has_calls([call(key='amqp_config_tracker',
value=d)])
for m in [mock_grant_permissions, mock_create_vhost,
mock_create_user, mock_set_ha_mode]:
self.assertTrue(m.called)
m.reset_mock()
# Check .get
with patch.object(store, 'get') as mock_get:
mock_get.return_value = d
rabbitmq_server_relations.configure_amqp('user_foo',
'vhost_blah', rid)
mock_set.assert_has_calls([call(key='amqp_config_tracker',
value=d)])
for m in [mock_grant_permissions, mock_create_vhost,
mock_create_user, mock_set_ha_mode]:
self.assertFalse(m.called)
# Check invalid relation id
self.assertRaises(Exception,
rabbitmq_server_relations.configure_amqp,
'user_foo', 'vhost_blah', None, admin=True)
# Test writing data
d = {}
for rid, user in [('amqp:1', 'userA'), ('amqp:2', 'userB')]:
rabbitmq_server_relations.configure_amqp(user,
'vhost_blah', rid)
d.update({rid: {"username": user, "vhost": "vhost_blah",
"mirroring-queues": True}})
self.assertEqual(store.get('amqp_config_tracker'), d)
@rabbitmq_server_relations.validate_amqp_config_tracker
def fake_configure_amqp(*args, **kwargs):
return rabbitmq_server_relations.configure_amqp(*args,
**kwargs)
# Test invalidating data
mock_is_leader.return_value = False
d['amqp:2']['stale'] = True
for rid, user in [('amqp:1', 'userA'), ('amqp:3', 'userC')]:
fake_configure_amqp(user, 'vhost_blah', rid)
d[rid] = {"username": user, "vhost": "vhost_blah",
"mirroring-queues": True, 'stale': True}
# Since this is a dummy case we need to toggle the stale
# values.
del d[rid]['stale']
self.assertEqual(store.get('amqp_config_tracker'), d)
d[rid]['stale'] = True
finally:
if os.path.exists(tmpdir):
shutil.rmtree(tmpdir)