Merge "Refactor amqp_changed to make it less noisy"
This commit is contained in:
commit
04f67e13d9
|
@ -58,11 +58,13 @@ from charmhelpers.core.hookenv import (
|
|||
open_port,
|
||||
close_port,
|
||||
log,
|
||||
DEBUG,
|
||||
ERROR,
|
||||
INFO,
|
||||
relation_get,
|
||||
relation_clear,
|
||||
relation_set,
|
||||
relation_id as get_relation_id,
|
||||
relation_ids,
|
||||
related_units,
|
||||
service_name,
|
||||
|
@ -94,6 +96,7 @@ from charmhelpers.contrib.peerstorage import (
|
|||
leader_get,
|
||||
)
|
||||
|
||||
from charmhelpers.core.unitdata import kv
|
||||
|
||||
hooks = Hooks()
|
||||
|
||||
|
@ -117,10 +120,71 @@ def install():
|
|||
# NOTE(jamespage) install actually happens in config_changed hook
|
||||
|
||||
|
||||
def configure_amqp(username, vhost, admin=False):
|
||||
def validate_amqp_config_tracker(f):
|
||||
"""Decorator to mark all existing tracked amqp configs as stale so that
|
||||
they are refreshed the next time the current unit leader.
|
||||
"""
|
||||
def _validate_amqp_config_tracker(*args, **kwargs):
|
||||
if not is_leader():
|
||||
kvstore = kv()
|
||||
tracker = kvstore.get('amqp_config_tracker')
|
||||
if tracker:
|
||||
for rid in tracker:
|
||||
tracker[rid]['stale'] = True
|
||||
|
||||
kvstore.set(key='amqp_config_tracker', value=tracker)
|
||||
kvstore.flush()
|
||||
|
||||
return f(*args, **kwargs)
|
||||
return _validate_amqp_config_tracker
|
||||
|
||||
|
||||
def configure_amqp(username, vhost, relation_id, admin=False):
|
||||
"""Configure rabbitmq server.
|
||||
|
||||
This function creates user/password, vhost and sets user permissions. It
|
||||
also enabales mirroring queues if requested.
|
||||
|
||||
Calls to rabbitmqctl are costly and as such we aim to limit them by only
|
||||
doing them if we detect that a settings needs creating or updating. To
|
||||
achieve this we track what we set by storing key/value pairs associated
|
||||
with a particular relation id in a local database.
|
||||
|
||||
Since this function is only supposed to be called by the cluster leader,
|
||||
the database is expected to be invalidated if it exists and we are no
|
||||
longer leader so as to ensure that a leader switch results in a
|
||||
rabbitmq configuraion consistent with the current leader's view.
|
||||
|
||||
:param username: client username.
|
||||
:param vhost: vhost name.
|
||||
:param relation_id: optional relation id used to identify the context of
|
||||
this operation. This should always be provided
|
||||
so that we can track what has been set.
|
||||
:param admin: boolean value defining whether the new user is admin.
|
||||
:returns: user password
|
||||
"""
|
||||
log("Configuring rabbitmq for user '{}' vhost '{}' (rid={})".
|
||||
format(username, vhost, relation_id), DEBUG)
|
||||
|
||||
if not relation_id:
|
||||
raise Exception("Invalid relation id '{}' provided to "
|
||||
"{}()".format(relation_id, configure_amqp.__name__))
|
||||
|
||||
# get and update service password
|
||||
password = rabbit.get_rabbit_password(username)
|
||||
|
||||
expected = {'username': username, 'vhost': vhost,
|
||||
'mirroring-queues': config('mirroring-queues')}
|
||||
kvstore = kv()
|
||||
tracker = kvstore.get('amqp_config_tracker') or {}
|
||||
val = tracker.get(relation_id)
|
||||
if val == expected and not val.get('stale'):
|
||||
log("Rabbit already configured for relation "
|
||||
"'{}'".format(relation_id), DEBUG)
|
||||
return password
|
||||
else:
|
||||
tracker[relation_id] = expected
|
||||
|
||||
# update vhost
|
||||
rabbit.create_vhost(vhost)
|
||||
rabbit.create_user(username, password, admin)
|
||||
|
@ -132,6 +196,9 @@ def configure_amqp(username, vhost, admin=False):
|
|||
if config('mirroring-queues'):
|
||||
rabbit.set_ha_mode(vhost, 'all')
|
||||
|
||||
kvstore.set(key='amqp_config_tracker', value=tracker)
|
||||
kvstore.flush()
|
||||
|
||||
return password
|
||||
|
||||
|
||||
|
@ -147,78 +214,63 @@ def update_clients():
|
|||
amqp_changed(relation_id=rid, remote_unit=unit)
|
||||
|
||||
|
||||
@validate_amqp_config_tracker
|
||||
@hooks.hook('amqp-relation-changed')
|
||||
def amqp_changed(relation_id=None, remote_unit=None):
|
||||
singleset = set(['username', 'vhost'])
|
||||
host_addr = rabbit.get_unit_ip()
|
||||
|
||||
# TODO: Simplify what the non-leader needs to do
|
||||
if not is_leader() and rabbit.client_node_is_ready():
|
||||
# NOTE(jamespage) clear relation to deal with data being
|
||||
# removed from peer storage
|
||||
relation_clear(relation_id)
|
||||
# Each unit needs to set the db information otherwise if the unit
|
||||
# with the info dies the settings die with it Bug# 1355848
|
||||
exc_list = ['hostname', 'private-address']
|
||||
for rel_id in relation_ids('amqp'):
|
||||
peerdb_settings = peer_retrieve_by_prefix(rel_id,
|
||||
exc_list=exc_list)
|
||||
peerdb_settings['hostname'] = host_addr
|
||||
peerdb_settings['private-address'] = host_addr
|
||||
if 'password' in peerdb_settings:
|
||||
relation_set(relation_id=rel_id, **peerdb_settings)
|
||||
|
||||
log('amqp_changed(): Deferring amqp_changed'
|
||||
' to the leader.')
|
||||
|
||||
if rabbit.leader_node_is_ready():
|
||||
relation_settings = {'hostname': host_addr,
|
||||
'private-address': host_addr}
|
||||
# NOTE: active/active case
|
||||
if config('prefer-ipv6'):
|
||||
relation_settings = {'private-address': host_addr}
|
||||
relation_set(relation_id=relation_id,
|
||||
relation_settings=relation_settings)
|
||||
relation_settings['private-address'] = host_addr
|
||||
|
||||
current = relation_get(rid=relation_id, unit=remote_unit)
|
||||
if singleset.issubset(current):
|
||||
if not all([current.get('username'), current.get('vhost')]):
|
||||
log('Relation not ready.', DEBUG)
|
||||
return
|
||||
|
||||
# Bail if not completely ready
|
||||
if not rabbit.leader_node_is_ready():
|
||||
return
|
||||
|
||||
relation_settings = {}
|
||||
settings = relation_get(rid=relation_id, unit=remote_unit)
|
||||
|
||||
singleset = set(['username', 'vhost'])
|
||||
|
||||
if singleset.issubset(settings):
|
||||
if None in [settings['username'], settings['vhost']]:
|
||||
log('amqp_changed(): Relation not ready.')
|
||||
return
|
||||
|
||||
relation_settings['password'] = configure_amqp(
|
||||
username=settings['username'],
|
||||
vhost=settings['vhost'],
|
||||
admin=settings.get('admin', False))
|
||||
# Provide credentials to relations. If password is already
|
||||
# available on peer relation then use it instead of reconfiguring.
|
||||
username = current['username']
|
||||
vhost = current['vhost']
|
||||
admin = current.get('admin', False)
|
||||
amqp_rid = relation_id or get_relation_id()
|
||||
password = configure_amqp(username, vhost, amqp_rid, admin=admin)
|
||||
relation_settings['password'] = password
|
||||
else:
|
||||
# NOTE(hopem): we should look at removing this code since i don't
|
||||
# think it's ever used anymore and stems from the days
|
||||
# when we needed to ensure consistency between
|
||||
# peerstorage (replaced by leader get/set) and amqp
|
||||
# relations.
|
||||
queues = {}
|
||||
for k, v in settings.iteritems():
|
||||
amqp = k.split('_')[0]
|
||||
for k, v in current.iteritems():
|
||||
amqp_rid = k.split('_')[0]
|
||||
x = '_'.join(k.split('_')[1:])
|
||||
if amqp not in queues:
|
||||
queues[amqp] = {}
|
||||
queues[amqp][x] = v
|
||||
for amqp in queues:
|
||||
if singleset.issubset(queues[amqp]):
|
||||
relation_settings[
|
||||
'_'.join([amqp, 'password'])] = configure_amqp(
|
||||
queues[amqp]['username'],
|
||||
queues[amqp]['vhost'])
|
||||
if amqp_rid not in queues:
|
||||
queues[amqp_rid] = {}
|
||||
|
||||
relation_settings['hostname'] = \
|
||||
relation_settings['private-address'] = \
|
||||
rabbit.get_unit_ip()
|
||||
queues[amqp_rid][x] = v
|
||||
|
||||
for amqp_rid in queues:
|
||||
if singleset.issubset(queues[amqp_rid]):
|
||||
username = queues[amqp_rid]['username']
|
||||
vhost = queues[amqp_rid]['vhost']
|
||||
password = configure_amqp(username, vhost, amqp_rid,
|
||||
admin=admin)
|
||||
key = '_'.join([amqp_rid, 'password'])
|
||||
relation_settings[key] = password
|
||||
|
||||
ssl_utils.configure_client_ssl(relation_settings)
|
||||
|
||||
if is_clustered():
|
||||
relation_settings['clustered'] = 'true'
|
||||
# NOTE(dosaboy): this stanza can be removed once we fully remove
|
||||
# deprecated HA support.
|
||||
if is_relation_made('ha'):
|
||||
# active/passive settings
|
||||
relation_settings['vip'] = config('vip')
|
||||
|
@ -230,8 +282,27 @@ def amqp_changed(relation_id=None, remote_unit=None):
|
|||
# set if need HA queues or not
|
||||
if cmp_pkgrevno('rabbitmq-server', '3.0.1') < 0:
|
||||
relation_settings['ha_queues'] = True
|
||||
|
||||
log("Updating relation {} keys {}"
|
||||
.format(relation_id or get_relation_id(),
|
||||
','.join(relation_settings.keys())), DEBUG)
|
||||
peer_store_and_set(relation_id=relation_id,
|
||||
relation_settings=relation_settings)
|
||||
elif not is_leader() and rabbit.client_node_is_ready():
|
||||
log("Propagating peer settings to all amqp relations", DEBUG)
|
||||
|
||||
# NOTE(jamespage) clear relation to deal with data being
|
||||
# removed from peer storage.
|
||||
relation_clear(relation_id)
|
||||
|
||||
# Each unit needs to set the db information otherwise if the unit
|
||||
# with the info dies the settings die with it Bug# 1355848
|
||||
for rel_id in relation_ids('amqp'):
|
||||
peerdb_settings = peer_retrieve_by_prefix(rel_id)
|
||||
if 'password' in peerdb_settings:
|
||||
peerdb_settings['hostname'] = host_addr
|
||||
peerdb_settings['private-address'] = host_addr
|
||||
relation_set(relation_id=rel_id, **peerdb_settings)
|
||||
|
||||
|
||||
@hooks.hook('cluster-relation-joined')
|
||||
|
|
|
@ -633,6 +633,27 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
|
|||
|
||||
u.log.info('OK\n')
|
||||
|
||||
def check_unit_rmq_cluster_nodes(self, sentry, unit_node_names):
|
||||
unit_name = sentry.info['unit_name']
|
||||
nodes = []
|
||||
errors = []
|
||||
str_stat = u.get_rmq_cluster_status(sentry)
|
||||
# make the interesting part of rabbitmqctl cluster_status output
|
||||
# json-parseable.
|
||||
if 'nodes,[{disc,' in str_stat:
|
||||
pos_start = str_stat.find('nodes,[{disc,') + 13
|
||||
pos_end = str_stat.find(']}]},', pos_start) + 1
|
||||
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||
nodes = json.loads(str_nodes)
|
||||
for node in nodes:
|
||||
if node not in unit_node_names:
|
||||
errors.append('Cluster registration check failed on {}: '
|
||||
'{} should not be registered with RabbitMQ '
|
||||
'after unit removal.\n'
|
||||
''.format(unit_name, node))
|
||||
|
||||
return errors
|
||||
|
||||
def test_901_remove_unit(self):
|
||||
"""Test if a unit correctly cleans up by removing itself from the
|
||||
RabbitMQ cluster on removal"""
|
||||
|
@ -655,22 +676,16 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
|
|||
errors = []
|
||||
|
||||
for sentry in sentry_units:
|
||||
unit_name = sentry.info['unit_name']
|
||||
nodes = []
|
||||
str_stat = u.get_rmq_cluster_status(sentry)
|
||||
# make the interesting part of rabbitmqctl cluster_status output
|
||||
# json-parseable.
|
||||
if 'nodes,[{disc,' in str_stat:
|
||||
pos_start = str_stat.find('nodes,[{disc,') + 13
|
||||
pos_end = str_stat.find(']}]},', pos_start) + 1
|
||||
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||
nodes = json.loads(str_nodes)
|
||||
for node in nodes:
|
||||
if node not in unit_node_names:
|
||||
errors.append('Cluster registration check failed on {}: '
|
||||
'{} should not be registered with RabbitMQ '
|
||||
'after unit removal.\n'
|
||||
''.format(unit_name, node))
|
||||
e = self.check_unit_rmq_cluster_nodes(sentry, unit_node_names)
|
||||
if e:
|
||||
# NOTE: cluster status may not have been updated yet so wait a
|
||||
# little and try one more time. Need to find a better way to do
|
||||
# this.
|
||||
time.sleep(10)
|
||||
e = self.check_unit_rmq_cluster_nodes(sentry, unit_node_names)
|
||||
if e:
|
||||
errors.append(e)
|
||||
|
||||
if errors:
|
||||
amulet.raise_status(amulet.FAIL, msg=errors)
|
||||
u.log.debug('OK')
|
||||
|
|
|
@ -13,10 +13,14 @@
|
|||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from test_utils import CharmTestCase
|
||||
from mock import patch, MagicMock
|
||||
from mock import patch, MagicMock, call
|
||||
|
||||
from charmhelpers.core.unitdata import Storage
|
||||
|
||||
os.environ['JUJU_UNIT_NAME'] = 'UNIT_TEST/0' # noqa - needed for import
|
||||
|
||||
|
@ -195,3 +199,82 @@ class RelationUtil(CharmTestCase):
|
|||
rabbitmq_server_relations.update_clients()
|
||||
mock_amqp_changed.assert_called_with(relation_id='amqp:0',
|
||||
remote_unit='client/0')
|
||||
|
||||
@patch.object(rabbitmq_server_relations, 'is_leader')
|
||||
@patch.object(rabbitmq_server_relations.rabbit, 'set_ha_mode')
|
||||
@patch.object(rabbitmq_server_relations.rabbit, 'get_rabbit_password')
|
||||
@patch.object(rabbitmq_server_relations.rabbit, 'create_vhost')
|
||||
@patch.object(rabbitmq_server_relations.rabbit, 'create_user')
|
||||
@patch.object(rabbitmq_server_relations.rabbit, 'grant_permissions')
|
||||
@patch.object(rabbitmq_server_relations, 'config', lambda *args: True)
|
||||
def test_configure_amqp(self, mock_grant_permissions, mock_create_vhost,
|
||||
mock_create_user, mock_get_rabbit_password,
|
||||
mock_set_ha_mode, mock_is_leader):
|
||||
mock_is_leader.return_value = True
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
try:
|
||||
db_path = '{}/kv.db'.format(tmpdir)
|
||||
rid = 'amqp:1'
|
||||
store = Storage(db_path)
|
||||
with patch('charmhelpers.core.unitdata._KV', store):
|
||||
# Check .set
|
||||
with patch.object(store, 'set') as mock_set:
|
||||
rabbitmq_server_relations.configure_amqp('user_foo',
|
||||
'vhost_blah', rid)
|
||||
|
||||
d = {rid: {"username": "user_foo", "vhost": "vhost_blah",
|
||||
"mirroring-queues": True}}
|
||||
mock_set.assert_has_calls([call(key='amqp_config_tracker',
|
||||
value=d)])
|
||||
|
||||
for m in [mock_grant_permissions, mock_create_vhost,
|
||||
mock_create_user, mock_set_ha_mode]:
|
||||
self.assertTrue(m.called)
|
||||
m.reset_mock()
|
||||
|
||||
# Check .get
|
||||
with patch.object(store, 'get') as mock_get:
|
||||
mock_get.return_value = d
|
||||
rabbitmq_server_relations.configure_amqp('user_foo',
|
||||
'vhost_blah', rid)
|
||||
mock_set.assert_has_calls([call(key='amqp_config_tracker',
|
||||
value=d)])
|
||||
for m in [mock_grant_permissions, mock_create_vhost,
|
||||
mock_create_user, mock_set_ha_mode]:
|
||||
self.assertFalse(m.called)
|
||||
|
||||
# Check invalid relation id
|
||||
self.assertRaises(Exception,
|
||||
rabbitmq_server_relations.configure_amqp,
|
||||
'user_foo', 'vhost_blah', None, admin=True)
|
||||
|
||||
# Test writing data
|
||||
d = {}
|
||||
for rid, user in [('amqp:1', 'userA'), ('amqp:2', 'userB')]:
|
||||
rabbitmq_server_relations.configure_amqp(user,
|
||||
'vhost_blah', rid)
|
||||
|
||||
d.update({rid: {"username": user, "vhost": "vhost_blah",
|
||||
"mirroring-queues": True}})
|
||||
self.assertEqual(store.get('amqp_config_tracker'), d)
|
||||
|
||||
@rabbitmq_server_relations.validate_amqp_config_tracker
|
||||
def fake_configure_amqp(*args, **kwargs):
|
||||
return rabbitmq_server_relations.configure_amqp(*args,
|
||||
**kwargs)
|
||||
|
||||
# Test invalidating data
|
||||
mock_is_leader.return_value = False
|
||||
d['amqp:2']['stale'] = True
|
||||
for rid, user in [('amqp:1', 'userA'), ('amqp:3', 'userC')]:
|
||||
fake_configure_amqp(user, 'vhost_blah', rid)
|
||||
d[rid] = {"username": user, "vhost": "vhost_blah",
|
||||
"mirroring-queues": True, 'stale': True}
|
||||
# Since this is a dummy case we need to toggle the stale
|
||||
# values.
|
||||
del d[rid]['stale']
|
||||
self.assertEqual(store.get('amqp_config_tracker'), d)
|
||||
d[rid]['stale'] = True
|
||||
finally:
|
||||
if os.path.exists(tmpdir):
|
||||
shutil.rmtree(tmpdir)
|
||||
|
|
Loading…
Reference in New Issue