diff --git a/hooks/rabbit_utils.py b/hooks/rabbit_utils.py
index 7193c176..58199f79 100644
--- a/hooks/rabbit_utils.py
+++ b/hooks/rabbit_utils.py
@@ -47,6 +47,7 @@ from charmhelpers.contrib.network.ip import (
 from charmhelpers.core.hookenv import (
     relation_ids,
     related_units,
+    relations_for_id,
     log, ERROR,
     WARNING,
     INFO, DEBUG,
@@ -333,7 +334,8 @@ def set_all_mirroring_queues(enable):
 def rabbitmqctl(action, *args):
     ''' Run rabbitmqctl with action and args. This function uses
     subprocess.check_call. For uses that need check_output
-    use a direct subproecess call
+    use a direct subprocess call or the rabbitmqctl_normalized_output
+    function.
     '''
     cmd = []
     # wait will run for ever. Timeout in a reasonable amount of time
@@ -346,6 +348,25 @@ def rabbitmqctl(action, *args):
     subprocess.check_call(cmd)
 
 
+def rabbitmqctl_normalized_output(*args):
+    ''' Run rabbitmqctl with args. Normalize output by removing
+    whitespace and return it to caller for further processing.
+    '''
+    cmd = [RABBITMQ_CTL]
+    cmd.extend(args)
+    out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+
+    # Output is in Erlang External Term Format (ETF). The amount of whitespace
+    # (including newlines in the middle of data structures) in the output
+    # depends on the data presented. ETF resembles JSON, but it is not JSON.
+    # Writing our own parser is a bit out of scope, and enabling the
+    # management plugin just to use its REST interface would be overkill.
+    #
+    # Removing whitespace lets our simple pattern matching work and is a
+    # compromise.
+    return out.translate(None, ' \t\n')
+
+
 def wait_app():
     ''' Wait until rabbitmq has fully started '''
     run_dir = '/var/run/rabbitmq/'
@@ -431,15 +452,52 @@ def cluster_with():
     return False
 
 
-def break_cluster():
+def check_cluster_memberships():
+    ''' Iterate over RabbitMQ node list, compare it to charm cluster
+    relationships, and forget about any nodes previously abruptly removed
+    from the cluster '''
+    for rid in relation_ids('cluster'):
+        for node in nodes():
+            if not any(rel.get('clustered', None) == node.split('@')[1]
+                       for rel in relations_for_id(relid=rid)) and \
+                    node not in running_nodes():
+                log("check_cluster_memberships(): '{}' in nodes but not in "
+                    "charm relations or running_nodes, telling RabbitMQ to "
+                    "forget about it.".format(node), level=DEBUG)
+                forget_cluster_node(node)
+
+
+def forget_cluster_node(node):
+    ''' Remove previously departed node from cluster '''
+    if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
+        log('rabbitmq-server version < 3.0.0, '
+            'forget_cluster_node not supported.', level=DEBUG)
+        return
+    try:
+        rabbitmqctl('forget_cluster_node', node)
+    except subprocess.CalledProcessError, e:
+        if e.returncode == 2:
+            log("Unable to remove node '{}' from cluster. It is either still "
+                "running or already removed. (Output: '{}')"
+                "".format(node, e.output), level=ERROR)
+            return
+        else:
+            raise
+    log("Removed previously departed node from cluster: '{}'."
+ "".format(node), level=INFO) + + +def leave_cluster(): + ''' Leave cluster gracefully ''' try: rabbitmqctl('stop_app') rabbitmqctl('reset') start_app() - log('Cluster successfully broken.') + log('Successfully left cluster gracefully.') except: # error, no nodes available for clustering - log('Error breaking rabbit cluster', level=ERROR) + log('Cannot leave cluster, we might be the last disc-node in the ' + 'cluster.', level=ERROR) raise @@ -681,18 +739,27 @@ def services(): return list(set(_services)) +@cached +def nodes(get_running=False): + ''' Get list of nodes registered in the RabbitMQ cluster ''' + out = rabbitmqctl_normalized_output('cluster_status') + cluster_status = {} + for m in re.finditer("{([^,]+),(?!\[{)\[([^\]]*)", out): + state = m.group(1) + items = m.group(2).split(',') + items = [x.replace("'", '').strip() for x in items] + cluster_status.update({state: items}) + + if get_running: + return cluster_status.get('running_nodes', []) + + return cluster_status.get('disc', []) + cluster_status.get('ram', []) + + @cached def running_nodes(): - ''' Determine the current set of running rabbitmq-units in the cluster ''' - out = subprocess.check_output([RABBITMQ_CTL, 'cluster_status']) - - running_nodes = [] - m = re.search("\{running_nodes,\[(.*?)\]\}", out.strip(), re.DOTALL) - if m is not None: - running_nodes = m.group(1).split(',') - running_nodes = [x.replace("'", '').strip() for x in running_nodes] - - return running_nodes + ''' Determine the current set of running nodes in the RabbitMQ cluster ''' + return nodes(get_running=True) @cached diff --git a/hooks/rabbitmq_server_relations.py b/hooks/rabbitmq_server_relations.py index ea826a91..eed5e162 100755 --- a/hooks/rabbitmq_server_relations.py +++ b/hooks/rabbitmq_server_relations.py @@ -311,6 +311,32 @@ def cluster_changed(relation_id=None, remote_unit=None): update_nrpe_checks() +@hooks.hook('stop') +def stop(): + """Gracefully remove ourself from RabbitMQ cluster before unit is removed + + If RabbitMQ have objections to node removal, for example because of this + being the only disc node to leave the cluster, the operation will fail and + unit removal will be blocked with error for operator to investigate. + + In the event of a unit being forcefully or abrubtly removed from the + cluster without a chance to remove itself, it will be left behind as a + stopped node in the RabbitMQ cluster. Having a dormant no longer existing + stopped node lying around will cause trouble in the event that all RabbitMQ + nodes are shut down. In such a situation the cluster most likely will not + start again without operator intervention as RabbitMQ will want to + interrogate the now non-existing stopped node about any queue it thinks it + would be most likely to have authoritative knowledge about. + + For this reason any abruptly removed nodes will be cleaned up periodically + by the leader unit during its update-status hook run. + + This call is placed in stop hook and not in the cluster-relation-departed + hook because the latter is not called on the unit being removed. + """ + rabbit.leave_cluster() + + def update_cookie(leaders_cookie=None): # sync cookie if leaders_cookie: @@ -666,6 +692,21 @@ def pre_install_hooks(): def update_status(): log('Updating status.') + # leader check for previously unsuccessful cluster departures + # + # This must be done here and not in the cluster-relation-departed hook. At + # the point in time the cluster-relation-departed hook is called we know + # that a unit is departing. 
+    # departure yet, so we cannot remove the node pre-emptively there.
+    #
+    # In the normal case the departing node removes itself from the cluster
+    # in its stop hook. Here we clean up the ones that, for whatever reason,
+    # were unable to clean up after themselves.
+    #
+    # Have a look at the docstring of the stop() function for a detailed
+    # explanation.
+    if is_leader():
+        rabbit.check_cluster_memberships()
 
 if __name__ == '__main__':
     try:
diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py
index 91009574..54545ff3 100644
--- a/tests/basic_deployment.py
+++ b/tests/basic_deployment.py
@@ -265,6 +265,7 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
         u.log.debug('Checking system services on units...')
 
         # Beam and epmd sometimes briefly have more than one PID,
+        # Process is named 'beam' with 1 CPU; 'beam.smp' with more than one.
         # True checks for at least 1.
         rmq_processes = {
            'beam.smp': True,
@@ -602,6 +603,48 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
 
         u.log.info('OK\n')
 
+    def test_901_remove_unit(self):
+        """Test that a unit correctly cleans up by removing itself from the
+        RabbitMQ cluster on removal"""
+        u.log.debug('Checking that units correctly clean up after themselves '
+                    'on unit removal...')
+        configs = {'rabbitmq-server': {'min-cluster-size': '2'}}
+        super(RmqBasicDeployment, self)._configure_services(configs)
+        self.d.sentry.wait(timeout=900)
+        u.rmq_wait_for_cluster(self)
+
+        self.d.remove_unit(self.rmq2_sentry.info['unit_name'])
+        self.d.sentry.wait(timeout=900)
+        u.rmq_wait_for_cluster(self)
+
+        sentry_units = self._get_rmq_sentry_units()[:-1]
+        unit_host_names = u.get_unit_hostnames(sentry_units)
+        unit_node_names = []
+        for unit in unit_host_names:
+            unit_node_names.append('rabbit@{}'.format(unit_host_names[unit]))
+        errors = []
+
+        for sentry in sentry_units:
+            unit_name = sentry.info['unit_name']
+            nodes = []
+            str_stat = u.get_rmq_cluster_status(sentry)
+            # Make the interesting part of rabbitmqctl cluster_status output
+            # json-parseable.
+            if 'nodes,[{disc,' in str_stat:
+                pos_start = str_stat.find('nodes,[{disc,') + 13
+                pos_end = str_stat.find(']}]},', pos_start) + 1
+                str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
+                nodes = json.loads(str_nodes)
+            for node in nodes:
+                if node not in unit_node_names:
+                    errors.append('Cluster registration check failed on {}: '
+                                  '{} should not be registered with RabbitMQ '
+                                  'after unit removal.\n'
+                                  ''.format(unit_name, node))
+        if errors:
+            amulet.raise_status(amulet.FAIL, msg=errors)
+        u.log.debug('OK')
+
     def test_910_pause_and_resume(self):
         """The services can be paused and resumed. """
         u.log.debug('Checking pause and resume actions...')
diff --git a/unit_tests/test_rabbit_utils.py b/unit_tests/test_rabbit_utils.py
index 1ceea2c3..e4c3fde8 100644
--- a/unit_tests/test_rabbit_utils.py
+++ b/unit_tests/test_rabbit_utils.py
@@ -12,11 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
 import mock
 import os
-import tempfile
+import subprocess
 import sys
-import collections
+import tempfile
+
 from functools import wraps
 
 from test_utils import CharmTestCase
@@ -85,10 +87,13 @@ class ConfigRendererTests(CharmTestCase):
 
 
 RABBITMQCTL_CLUSTERSTATUS_RUNNING = """Cluster status of node 'rabbit@juju-devel3-machine-19' ...
-[{nodes,[{disc,['rabbit@juju-devel3-machine-14',
-                'rabbit@juju-devel3-machine-19']}]},
- {running_nodes,['rabbit@juju-devel3-machine-14',
-                 'rabbit@juju-devel3-machine-19']},
+[{nodes,
+     [{disc,
+          ['rabbit@juju-devel3-machine-14','rabbit@juju-devel3-machine-19']},
+      {ram,
+          ['rabbit@juju-devel3-machine-42']}]},
+ {running_nodes,
+     ['rabbit@juju-devel3-machine-14','rabbit@juju-devel3-machine-19']},
 {cluster_name,<<"rabbit@juju-devel3-machine-14.openstacklocal">>},
 {partitions,[]}]
 """
@@ -175,6 +180,16 @@ class UtilsTests(CharmTestCase):
         mock_running_nodes.return_value = ['a', 'b']
         self.assertTrue(rabbit_utils.clustered())
 
+    @mock.patch('rabbit_utils.subprocess')
+    def test_nodes(self, mock_subprocess):
+        '''Ensure cluster_status can be parsed for a clustered deployment'''
+        mock_subprocess.check_output.return_value = \
+            RABBITMQCTL_CLUSTERSTATUS_RUNNING
+        self.assertEqual(rabbit_utils.nodes(),
+                         ['rabbit@juju-devel3-machine-14',
+                          'rabbit@juju-devel3-machine-19',
+                          'rabbit@juju-devel3-machine-42'])
+
     @mock.patch('rabbit_utils.subprocess')
     def test_running_nodes(self, mock_subprocess):
         '''Ensure cluster_status can be parsed for a clustered deployment'''
@@ -184,6 +199,14 @@ class UtilsTests(CharmTestCase):
                          ['rabbit@juju-devel3-machine-14',
                           'rabbit@juju-devel3-machine-19'])
 
+    @mock.patch('rabbit_utils.subprocess')
+    def test_nodes_solo(self, mock_subprocess):
+        '''Ensure cluster_status can be parsed for a single unit deployment'''
+        mock_subprocess.check_output.return_value = \
+            RABBITMQCTL_CLUSTERSTATUS_SOLO
+        self.assertEqual(rabbit_utils.nodes(),
+                         ['rabbit@juju-devel3-machine-14'])
+
     @mock.patch('rabbit_utils.subprocess')
     def test_running_nodes_solo(self, mock_subprocess):
         '''Ensure cluster_status can be parsed for a single unit deployment'''
@@ -581,3 +604,56 @@ class UtilsTests(CharmTestCase):
     def test_get_managment_port(self, mock_get_upstream_version):
         mock_get_upstream_version.return_value = '3.5.7'
         self.assertEqual(rabbit_utils.get_managment_port(), 15672)
+
+    @mock.patch('rabbit_utils.rabbitmqctl')
+    @mock.patch('rabbit_utils.cmp_pkgrevno')
+    def test_forget_cluster_node_old_rabbitmq(self, mock_cmp_pkgrevno,
+                                              mock_rabbitmqctl):
+        mock_cmp_pkgrevno.return_value = -1
+        rabbit_utils.forget_cluster_node('a')
+        self.assertFalse(mock_rabbitmqctl.called)
+
+    @mock.patch('rabbit_utils.log')
+    @mock.patch('subprocess.check_call')
+    @mock.patch('rabbit_utils.cmp_pkgrevno')
+    def test_forget_cluster_node_subprocess_fails(self, mock_cmp_pkgrevno,
+                                                  mock_check_call,
+                                                  mock_log):
+        mock_cmp_pkgrevno.return_value = 0
+
+        def raise_error(x):
+            raise subprocess.CalledProcessError(2, x)
+        mock_check_call.side_effect = raise_error
+
+        rabbit_utils.forget_cluster_node('a')
+        mock_log.assert_called_with("Unable to remove node 'a' from cluster. "
+                                    "It is either still running or already "
+                                    "removed. (Output: 'None')", level='ERROR')
+
+    @mock.patch('rabbit_utils.rabbitmqctl')
+    @mock.patch('rabbit_utils.cmp_pkgrevno')
+    def test_forget_cluster_node(self, mock_cmp_pkgrevno, mock_rabbitmqctl):
+        mock_cmp_pkgrevno.return_value = 1
+        rabbit_utils.forget_cluster_node('a')
+        mock_rabbitmqctl.assert_called_with('forget_cluster_node', 'a')
+
+    @mock.patch('rabbit_utils.forget_cluster_node')
+    @mock.patch('rabbit_utils.relations_for_id')
+    @mock.patch('rabbit_utils.subprocess')
+    @mock.patch('rabbit_utils.relation_ids')
+    def test_check_cluster_memberships(self, mock_relation_ids,
+                                       mock_subprocess,
+                                       mock_relations_for_id,
+                                       mock_forget_cluster_node):
+        mock_relation_ids.return_value = [0]
+        mock_subprocess.check_output.return_value = \
+            RABBITMQCTL_CLUSTERSTATUS_RUNNING
+        mock_relations_for_id.return_value = [
+            {'clustered': 'juju-devel3-machine-14'},
+            {'clustered': 'juju-devel3-machine-19'},
+            {'dummy-entry': 'to validate behaviour on relations without '
+                            'clustered key in dict'},
+        ]
+        rabbit_utils.check_cluster_memberships()
+        mock_forget_cluster_node.assert_called_with(
+            'rabbit@juju-devel3-machine-42')
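
Illustration only, not part of the patch: a rough standalone sketch of how nodes() interprets cluster_status output once rabbitmqctl_normalized_output() has stripped the whitespace. The sample string and the 'rabbit@host-NN' names are invented for the example; real output also carries a leading status line plus cluster_name and partitions entries.

import re

# Trimmed, hypothetical stand-in for whitespace-normalized cluster_status
# output as produced by rabbitmqctl_normalized_output().
sample = ("[{nodes,[{disc,['rabbit@host-14','rabbit@host-19']},"
          "{ram,['rabbit@host-42']}]},"
          "{running_nodes,['rabbit@host-14','rabbit@host-19']}]")

cluster_status = {}
# Same pattern as nodes(): match "{<state>,[" while the (?!\[{) lookahead
# skips "{nodes,[{", which only wraps the nested disc/ram tuples.
for m in re.finditer(r"{([^,]+),(?!\[{)\[([^\]]*)", sample):
    state = m.group(1)
    items = [x.replace("'", '').strip() for x in m.group(2).split(',')]
    cluster_status[state] = items

print(cluster_status.get('disc', []) + cluster_status.get('ram', []))
# ['rabbit@host-14', 'rabbit@host-19', 'rabbit@host-42']
print(cluster_status.get('running_nodes', []))
# ['rabbit@host-14', 'rabbit@host-19']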