From da3a7063bbcf0c262957149d5c6c2fa3d0525599 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Fri, 16 Jun 2017 13:37:20 +0100 Subject: [PATCH] Refactor amqp_changed to make it less noisy Currently each and every hook executed results in a sweep of all amqp relations and their related units to ensure config consistency both in rabbit and for remote units. This scales very poorly since rabbtimqctl commands take a while to complete and the majority of the time they are not needed. This patch aims to reduce the impact of performing this set of operations and the amount of time it takes to do them. Change-Id: Ia060ce34052cd543a63d045944b35e4188279f05 Closes-Bug: 1698340 --- hooks/rabbitmq_server_relations.py | 213 ++++++++++++------- tests/basic_deployment.py | 47 ++-- unit_tests/test_rabbitmq_server_relations.py | 85 +++++++- 3 files changed, 257 insertions(+), 88 deletions(-) diff --git a/hooks/rabbitmq_server_relations.py b/hooks/rabbitmq_server_relations.py index 22e6a923..85645658 100755 --- a/hooks/rabbitmq_server_relations.py +++ b/hooks/rabbitmq_server_relations.py @@ -58,11 +58,13 @@ from charmhelpers.core.hookenv import ( open_port, close_port, log, + DEBUG, ERROR, INFO, relation_get, relation_clear, relation_set, + relation_id as get_relation_id, relation_ids, related_units, service_name, @@ -94,6 +96,7 @@ from charmhelpers.contrib.peerstorage import ( leader_get, ) +from charmhelpers.core.unitdata import kv hooks = Hooks() @@ -117,10 +120,71 @@ def install(): # NOTE(jamespage) install actually happens in config_changed hook -def configure_amqp(username, vhost, admin=False): +def validate_amqp_config_tracker(f): + """Decorator to mark all existing tracked amqp configs as stale so that + they are refreshed the next time the current unit leader. + """ + def _validate_amqp_config_tracker(*args, **kwargs): + if not is_leader(): + kvstore = kv() + tracker = kvstore.get('amqp_config_tracker') + if tracker: + for rid in tracker: + tracker[rid]['stale'] = True + + kvstore.set(key='amqp_config_tracker', value=tracker) + kvstore.flush() + + return f(*args, **kwargs) + return _validate_amqp_config_tracker + + +def configure_amqp(username, vhost, relation_id, admin=False): + """Configure rabbitmq server. + + This function creates user/password, vhost and sets user permissions. It + also enabales mirroring queues if requested. + + Calls to rabbitmqctl are costly and as such we aim to limit them by only + doing them if we detect that a settings needs creating or updating. To + achieve this we track what we set by storing key/value pairs associated + with a particular relation id in a local database. + + Since this function is only supposed to be called by the cluster leader, + the database is expected to be invalidated if it exists and we are no + longer leader so as to ensure that a leader switch results in a + rabbitmq configuraion consistent with the current leader's view. + + :param username: client username. + :param vhost: vhost name. + :param relation_id: optional relation id used to identify the context of + this operation. This should always be provided + so that we can track what has been set. + :param admin: boolean value defining whether the new user is admin. + :returns: user password + """ + log("Configuring rabbitmq for user '{}' vhost '{}' (rid={})". + format(username, vhost, relation_id), DEBUG) + + if not relation_id: + raise Exception("Invalid relation id '{}' provided to " + "{}()".format(relation_id, configure_amqp.__name__)) + # get and update service password password = rabbit.get_rabbit_password(username) + expected = {'username': username, 'vhost': vhost, + 'mirroring-queues': config('mirroring-queues')} + kvstore = kv() + tracker = kvstore.get('amqp_config_tracker') or {} + val = tracker.get(relation_id) + if val == expected and not val.get('stale'): + log("Rabbit already configured for relation " + "'{}'".format(relation_id), DEBUG) + return password + else: + tracker[relation_id] = expected + # update vhost rabbit.create_vhost(vhost) rabbit.create_user(username, password, admin) @@ -132,6 +196,9 @@ def configure_amqp(username, vhost, admin=False): if config('mirroring-queues'): rabbit.set_ha_mode(vhost, 'all') + kvstore.set(key='amqp_config_tracker', value=tracker) + kvstore.flush() + return password @@ -147,91 +214,95 @@ def update_clients(): amqp_changed(relation_id=rid, remote_unit=unit) +@validate_amqp_config_tracker @hooks.hook('amqp-relation-changed') def amqp_changed(relation_id=None, remote_unit=None): + singleset = set(['username', 'vhost']) host_addr = rabbit.get_unit_ip() - # TODO: Simplify what the non-leader needs to do - if not is_leader() and rabbit.client_node_is_ready(): - # NOTE(jamespage) clear relation to deal with data being - # removed from peer storage - relation_clear(relation_id) - # Each unit needs to set the db information otherwise if the unit - # with the info dies the settings die with it Bug# 1355848 - exc_list = ['hostname', 'private-address'] - for rel_id in relation_ids('amqp'): - peerdb_settings = peer_retrieve_by_prefix(rel_id, - exc_list=exc_list) - peerdb_settings['hostname'] = host_addr - peerdb_settings['private-address'] = host_addr - if 'password' in peerdb_settings: - relation_set(relation_id=rel_id, **peerdb_settings) - - log('amqp_changed(): Deferring amqp_changed' - ' to the leader.') - + if rabbit.leader_node_is_ready(): + relation_settings = {'hostname': host_addr, + 'private-address': host_addr} # NOTE: active/active case if config('prefer-ipv6'): - relation_settings = {'private-address': host_addr} - relation_set(relation_id=relation_id, - relation_settings=relation_settings) + relation_settings['private-address'] = host_addr - return + current = relation_get(rid=relation_id, unit=remote_unit) + if singleset.issubset(current): + if not all([current.get('username'), current.get('vhost')]): + log('Relation not ready.', DEBUG) + return - # Bail if not completely ready - if not rabbit.leader_node_is_ready(): - return + # Provide credentials to relations. If password is already + # available on peer relation then use it instead of reconfiguring. + username = current['username'] + vhost = current['vhost'] + admin = current.get('admin', False) + amqp_rid = relation_id or get_relation_id() + password = configure_amqp(username, vhost, amqp_rid, admin=admin) + relation_settings['password'] = password + else: + # NOTE(hopem): we should look at removing this code since i don't + # think it's ever used anymore and stems from the days + # when we needed to ensure consistency between + # peerstorage (replaced by leader get/set) and amqp + # relations. + queues = {} + for k, v in current.iteritems(): + amqp_rid = k.split('_')[0] + x = '_'.join(k.split('_')[1:]) + if amqp_rid not in queues: + queues[amqp_rid] = {} - relation_settings = {} - settings = relation_get(rid=relation_id, unit=remote_unit) + queues[amqp_rid][x] = v - singleset = set(['username', 'vhost']) + for amqp_rid in queues: + if singleset.issubset(queues[amqp_rid]): + username = queues[amqp_rid]['username'] + vhost = queues[amqp_rid]['vhost'] + password = configure_amqp(username, vhost, amqp_rid, + admin=admin) + key = '_'.join([amqp_rid, 'password']) + relation_settings[key] = password - if singleset.issubset(settings): - if None in [settings['username'], settings['vhost']]: - log('amqp_changed(): Relation not ready.') - return + ssl_utils.configure_client_ssl(relation_settings) - relation_settings['password'] = configure_amqp( - username=settings['username'], - vhost=settings['vhost'], - admin=settings.get('admin', False)) - else: - queues = {} - for k, v in settings.iteritems(): - amqp = k.split('_')[0] - x = '_'.join(k.split('_')[1:]) - if amqp not in queues: - queues[amqp] = {} - queues[amqp][x] = v - for amqp in queues: - if singleset.issubset(queues[amqp]): - relation_settings[ - '_'.join([amqp, 'password'])] = configure_amqp( - queues[amqp]['username'], - queues[amqp]['vhost']) + if is_clustered(): + relation_settings['clustered'] = 'true' + # NOTE(dosaboy): this stanza can be removed once we fully remove + # deprecated HA support. + if is_relation_made('ha'): + # active/passive settings + relation_settings['vip'] = config('vip') + # or ha-vip-only to support active/active, but + # accessed via a VIP for older clients. + if config('ha-vip-only') is True: + relation_settings['ha-vip-only'] = 'true' - relation_settings['hostname'] = \ - relation_settings['private-address'] = \ - rabbit.get_unit_ip() + # set if need HA queues or not + if cmp_pkgrevno('rabbitmq-server', '3.0.1') < 0: + relation_settings['ha_queues'] = True - ssl_utils.configure_client_ssl(relation_settings) + log("Updating relation {} keys {}" + .format(relation_id or get_relation_id(), + ','.join(relation_settings.keys())), DEBUG) + peer_store_and_set(relation_id=relation_id, + relation_settings=relation_settings) + elif not is_leader() and rabbit.client_node_is_ready(): + log("Propagating peer settings to all amqp relations", DEBUG) - if is_clustered(): - relation_settings['clustered'] = 'true' - if is_relation_made('ha'): - # active/passive settings - relation_settings['vip'] = config('vip') - # or ha-vip-only to support active/active, but - # accessed via a VIP for older clients. - if config('ha-vip-only') is True: - relation_settings['ha-vip-only'] = 'true' + # NOTE(jamespage) clear relation to deal with data being + # removed from peer storage. + relation_clear(relation_id) - # set if need HA queues or not - if cmp_pkgrevno('rabbitmq-server', '3.0.1') < 0: - relation_settings['ha_queues'] = True - peer_store_and_set(relation_id=relation_id, - relation_settings=relation_settings) + # Each unit needs to set the db information otherwise if the unit + # with the info dies the settings die with it Bug# 1355848 + for rel_id in relation_ids('amqp'): + peerdb_settings = peer_retrieve_by_prefix(rel_id) + if 'password' in peerdb_settings: + peerdb_settings['hostname'] = host_addr + peerdb_settings['private-address'] = host_addr + relation_set(relation_id=rel_id, **peerdb_settings) @hooks.hook('cluster-relation-joined') @@ -387,7 +458,7 @@ def ha_joined(): log('ha_joined: No ceph relation yet, deferring.') return - ctxt = {rabbit.ENV_CONF: rabbit.CONFIG_FILES[rabbit.ENV_CONF]} + ctxt = {rabbit.ENV_CONF: rabbit.CONFIG_FILES[rabbit.ENV_CONF]} rabbit.ConfigRenderer(ctxt).write(rabbit.ENV_CONF) relation_settings = {} diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py index a94c9c60..73451927 100644 --- a/tests/basic_deployment.py +++ b/tests/basic_deployment.py @@ -633,6 +633,27 @@ class RmqBasicDeployment(OpenStackAmuletDeployment): u.log.info('OK\n') + def check_unit_rmq_cluster_nodes(self, sentry, unit_node_names): + unit_name = sentry.info['unit_name'] + nodes = [] + errors = [] + str_stat = u.get_rmq_cluster_status(sentry) + # make the interesting part of rabbitmqctl cluster_status output + # json-parseable. + if 'nodes,[{disc,' in str_stat: + pos_start = str_stat.find('nodes,[{disc,') + 13 + pos_end = str_stat.find(']}]},', pos_start) + 1 + str_nodes = str_stat[pos_start:pos_end].replace("'", '"') + nodes = json.loads(str_nodes) + for node in nodes: + if node not in unit_node_names: + errors.append('Cluster registration check failed on {}: ' + '{} should not be registered with RabbitMQ ' + 'after unit removal.\n' + ''.format(unit_name, node)) + + return errors + def test_901_remove_unit(self): """Test if a unit correctly cleans up by removing itself from the RabbitMQ cluster on removal""" @@ -655,22 +676,16 @@ class RmqBasicDeployment(OpenStackAmuletDeployment): errors = [] for sentry in sentry_units: - unit_name = sentry.info['unit_name'] - nodes = [] - str_stat = u.get_rmq_cluster_status(sentry) - # make the interesting part of rabbitmqctl cluster_status output - # json-parseable. - if 'nodes,[{disc,' in str_stat: - pos_start = str_stat.find('nodes,[{disc,') + 13 - pos_end = str_stat.find(']}]},', pos_start) + 1 - str_nodes = str_stat[pos_start:pos_end].replace("'", '"') - nodes = json.loads(str_nodes) - for node in nodes: - if node not in unit_node_names: - errors.append('Cluster registration check failed on {}: ' - '{} should not be registered with RabbitMQ ' - 'after unit removal.\n' - ''.format(unit_name, node)) + e = self.check_unit_rmq_cluster_nodes(sentry, unit_node_names) + if e: + # NOTE: cluster status may not have been updated yet so wait a + # little and try one more time. Need to find a better way to do + # this. + time.sleep(10) + e = self.check_unit_rmq_cluster_nodes(sentry, unit_node_names) + if e: + errors.append(e) + if errors: amulet.raise_status(amulet.FAIL, msg=errors) u.log.debug('OK') diff --git a/unit_tests/test_rabbitmq_server_relations.py b/unit_tests/test_rabbitmq_server_relations.py index 70acda9a..22d62fad 100644 --- a/unit_tests/test_rabbitmq_server_relations.py +++ b/unit_tests/test_rabbitmq_server_relations.py @@ -13,10 +13,14 @@ # limitations under the License. import os +import shutil import sys +import tempfile from test_utils import CharmTestCase -from mock import patch, MagicMock +from mock import patch, MagicMock, call + +from charmhelpers.core.unitdata import Storage os.environ['JUJU_UNIT_NAME'] = 'UNIT_TEST/0' # noqa - needed for import @@ -195,3 +199,82 @@ class RelationUtil(CharmTestCase): rabbitmq_server_relations.update_clients() mock_amqp_changed.assert_called_with(relation_id='amqp:0', remote_unit='client/0') + + @patch.object(rabbitmq_server_relations, 'is_leader') + @patch.object(rabbitmq_server_relations.rabbit, 'set_ha_mode') + @patch.object(rabbitmq_server_relations.rabbit, 'get_rabbit_password') + @patch.object(rabbitmq_server_relations.rabbit, 'create_vhost') + @patch.object(rabbitmq_server_relations.rabbit, 'create_user') + @patch.object(rabbitmq_server_relations.rabbit, 'grant_permissions') + @patch.object(rabbitmq_server_relations, 'config', lambda *args: True) + def test_configure_amqp(self, mock_grant_permissions, mock_create_vhost, + mock_create_user, mock_get_rabbit_password, + mock_set_ha_mode, mock_is_leader): + mock_is_leader.return_value = True + tmpdir = tempfile.mkdtemp() + try: + db_path = '{}/kv.db'.format(tmpdir) + rid = 'amqp:1' + store = Storage(db_path) + with patch('charmhelpers.core.unitdata._KV', store): + # Check .set + with patch.object(store, 'set') as mock_set: + rabbitmq_server_relations.configure_amqp('user_foo', + 'vhost_blah', rid) + + d = {rid: {"username": "user_foo", "vhost": "vhost_blah", + "mirroring-queues": True}} + mock_set.assert_has_calls([call(key='amqp_config_tracker', + value=d)]) + + for m in [mock_grant_permissions, mock_create_vhost, + mock_create_user, mock_set_ha_mode]: + self.assertTrue(m.called) + m.reset_mock() + + # Check .get + with patch.object(store, 'get') as mock_get: + mock_get.return_value = d + rabbitmq_server_relations.configure_amqp('user_foo', + 'vhost_blah', rid) + mock_set.assert_has_calls([call(key='amqp_config_tracker', + value=d)]) + for m in [mock_grant_permissions, mock_create_vhost, + mock_create_user, mock_set_ha_mode]: + self.assertFalse(m.called) + + # Check invalid relation id + self.assertRaises(Exception, + rabbitmq_server_relations.configure_amqp, + 'user_foo', 'vhost_blah', None, admin=True) + + # Test writing data + d = {} + for rid, user in [('amqp:1', 'userA'), ('amqp:2', 'userB')]: + rabbitmq_server_relations.configure_amqp(user, + 'vhost_blah', rid) + + d.update({rid: {"username": user, "vhost": "vhost_blah", + "mirroring-queues": True}}) + self.assertEqual(store.get('amqp_config_tracker'), d) + + @rabbitmq_server_relations.validate_amqp_config_tracker + def fake_configure_amqp(*args, **kwargs): + return rabbitmq_server_relations.configure_amqp(*args, + **kwargs) + + # Test invalidating data + mock_is_leader.return_value = False + d['amqp:2']['stale'] = True + for rid, user in [('amqp:1', 'userA'), ('amqp:3', 'userC')]: + fake_configure_amqp(user, 'vhost_blah', rid) + d[rid] = {"username": user, "vhost": "vhost_blah", + "mirroring-queues": True, 'stale': True} + # Since this is a dummy case we need to toggle the stale + # values. + del d[rid]['stale'] + self.assertEqual(store.get('amqp_config_tracker'), d) + d[rid]['stale'] = True + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir)