Leave RabbitMQ cluster gracefully on unit removal

Make the leader periodically check for, and forget, nodes left behind by
abruptly removed units in the update-status hook. (See the detailed
explanation in the stop function docstring.)

Add a function to get the list of all nodes registered with
RabbitMQ. The function takes a modifier to limit the list to nodes
currently running.

Change the existing running_nodes() function to call the new function
with that modifier.
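
For illustration, the new helper might be used roughly like this (a
minimal sketch of the intended call sites, assuming the rabbit_utils
module layout referenced by the unit tests):

    from rabbit_utils import nodes

    all_nodes = nodes()                      # every node registered in the cluster
    active_nodes = nodes(get_running=True)   # only nodes currently running
    departed = [n for n in all_nodes if n not in active_nodes]
    # running_nodes() becomes a thin wrapper around nodes(get_running=True).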

Update amulet test for beam process name with multiple CPUs. The
test infrastructure now presents test instances with more than one
CPU core.

Change-Id: I7eacf9839cd69539d82a76b1ea023e29ba1f5df9
Closes-Bug: #1679449
Frode Nordahl 2017-04-20 11:48:01 +02:00
parent 8de2bae310
commit 08b10513c5
4 changed files with 248 additions and 21 deletions


@ -47,6 +47,7 @@ from charmhelpers.contrib.network.ip import (
from charmhelpers.core.hookenv import (
relation_ids,
related_units,
relations_for_id,
log, ERROR,
WARNING,
INFO, DEBUG,
@ -333,7 +334,8 @@ def set_all_mirroring_queues(enable):
def rabbitmqctl(action, *args):
''' Run rabbitmqctl with action and args. This function uses
subprocess.check_call. For uses that need check_output
use a direct subprocess call or rabbitmqctl_normalized_output
function.
'''
cmd = []
# wait will run forever. Timeout in a reasonable amount of time
@ -346,6 +348,25 @@ def rabbitmqctl(action, *args):
subprocess.check_call(cmd)
def rabbitmqctl_normalized_output(*args):
''' Run rabbitmqctl with args. Normalize output by removing
whitespace and return it to caller for further processing.
'''
cmd = [RABBITMQ_CTL]
cmd.extend(args)
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
# Output is in Erlang External Term Format (ETF). The amount of whitespace
# (including newlines in the middle of data structures) in the output
# depends on the data presented. ETF resembles JSON, but it is not.
# Writing our own parser is a bit out of scope, enabling management-plugin
# to use REST interface might be overkill at this stage.
#
# Removing whitespace will let our simple pattern matching work and is a
# compromise.
return out.translate(None, ' \t\n')
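
# A rough, standalone illustration (hypothetical host names) of how the
# normalized output can then be consumed by simple pattern matching,
# mirroring the approach used by nodes() further down:
import re

sample = ("[{nodes,[{disc,['rabbit@host-a','rabbit@host-b']},"
          "{ram,['rabbit@host-c']}]},"
          "{running_nodes,['rabbit@host-a','rabbit@host-b']},"
          "{partitions,[]}]")
status = {}
for m in re.finditer("{([^,]+),(?!\[{)\[([^\]]*)", sample):
    status[m.group(1)] = [x.replace("'", '').strip()
                          for x in m.group(2).split(',')]
# status['disc'] -> ['rabbit@host-a', 'rabbit@host-b']
# status['ram'] -> ['rabbit@host-c']
# status['running_nodes'] -> ['rabbit@host-a', 'rabbit@host-b']
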
def wait_app():
''' Wait until rabbitmq has fully started '''
run_dir = '/var/run/rabbitmq/'
@ -431,15 +452,52 @@ def cluster_with():
return False
def check_cluster_memberships():
''' Iterate over RabbitMQ node list, compare it to charm cluster
relationships, and forget about any nodes previously abruptly removed
from the cluster '''
for rid in relation_ids('cluster'):
for node in nodes():
if not any(rel.get('clustered', None) == node.split('@')[1]
for rel in relations_for_id(relid=rid)) and \
node not in running_nodes():
log("check_cluster_memberships(): '{}' in nodes but not in "
"charm relations or running_nodes, telling RabbitMQ to "
"forget about it.".format(node), level=DEBUG)
forget_cluster_node(node)
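
# Illustrative data shapes (hypothetical hostnames): the 'clustered' relation
# setting carries the bare hostname while RabbitMQ node names carry a
# 'rabbit@' prefix, hence the node.split('@')[1] comparison above.
relations = [{'clustered': 'juju-machine-1'}, {'clustered': 'juju-machine-2'}]
cluster_nodes = ['rabbit@juju-machine-1', 'rabbit@juju-machine-2',
                 'rabbit@juju-machine-3']
stale = [n for n in cluster_nodes
         if not any(r.get('clustered') == n.split('@')[1] for r in relations)]
# stale == ['rabbit@juju-machine-3'] -> candidate for forget_cluster_node(),
# provided it is not in running_nodes().
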
def forget_cluster_node(node):
''' Remove previously departed node from cluster '''
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log('rabbitmq-server version < 3.0.0, '
'forget_cluster_node not supported.', level=DEBUG)
return
try:
rabbitmqctl('forget_cluster_node', node)
except subprocess.CalledProcessError as e:
if e.returncode == 2:
log("Unable to remove node '{}' from cluster. It is either still "
"running or already removed. (Output: '{}')"
"".format(node, e.output), level=ERROR)
return
else:
raise
log("Removed previously departed node from cluster: '{}'."
"".format(node), level=INFO)
def leave_cluster():
''' Leave cluster gracefully '''
try:
rabbitmqctl('stop_app')
rabbitmqctl('reset')
start_app()
log('Successfully left cluster gracefully.')
except:
# error, no nodes available for clustering
log('Cannot leave cluster, we might be the last disc-node in the '
'cluster.', level=ERROR)
raise
@ -681,18 +739,27 @@ def services():
return list(set(_services))
@cached
def nodes(get_running=False):
''' Get list of nodes registered in the RabbitMQ cluster '''
out = rabbitmqctl_normalized_output('cluster_status')
cluster_status = {}
for m in re.finditer("{([^,]+),(?!\[{)\[([^\]]*)", out):
state = m.group(1)
items = m.group(2).split(',')
items = [x.replace("'", '').strip() for x in items]
cluster_status.update({state: items})
if get_running:
return cluster_status.get('running_nodes', [])
return cluster_status.get('disc', []) + cluster_status.get('ram', [])
@cached
def running_nodes():
''' Determine the current set of running nodes in the RabbitMQ cluster '''
return nodes(get_running=True)
@cached


@ -311,6 +311,32 @@ def cluster_changed(relation_id=None, remote_unit=None):
update_nrpe_checks()
@hooks.hook('stop')
def stop():
"""Gracefully remove ourself from RabbitMQ cluster before unit is removed
If RabbitMQ have objections to node removal, for example because of this
being the only disc node to leave the cluster, the operation will fail and
unit removal will be blocked with error for operator to investigate.
In the event of a unit being forcefully or abrubtly removed from the
cluster without a chance to remove itself, it will be left behind as a
stopped node in the RabbitMQ cluster. Having a dormant no longer existing
stopped node lying around will cause trouble in the event that all RabbitMQ
nodes are shut down. In such a situation the cluster most likely will not
start again without operator intervention as RabbitMQ will want to
interrogate the now non-existing stopped node about any queue it thinks it
would be most likely to have authoritative knowledge about.
For this reason any abruptly removed nodes will be cleaned up periodically
by the leader unit during its update-status hook run.
This call is placed in stop hook and not in the cluster-relation-departed
hook because the latter is not called on the unit being removed.
"""
rabbit.leave_cluster()
def update_cookie(leaders_cookie=None):
# sync cookie
if leaders_cookie:
@ -666,6 +692,21 @@ def pre_install_hooks():
def update_status():
log('Updating status.')
# leader check for previously unsuccessful cluster departures
#
# This must be done here and not in the cluster-relation-departed hook. At
# the point in time the cluster-relation-departed hook is called we know
# that a unit is departing. We also know that RabbitMQ will not have
# noticed its departure yet. We cannot remove a node pre-emptively.
#
# In the normal case the departing node should remove itself from the
# cluster in its stop hook. Here we clean up the ones that, for whatever
# reason, were unable to clean up after themselves.
#
# Have a look at the docstring of the stop() function for detailed
# explanation.
if is_leader():
rabbit.check_cluster_memberships()
if __name__ == '__main__':
try:


@ -265,9 +265,10 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
u.log.debug('Checking system services on units...')
# Beam and epmd sometimes briefly have more than one PID,
# Process is named 'beam' with 1 cpu; 'beam.smp' for >1 cpu.
# True checks for at least 1.
rmq_processes = {
'beam.smp': True,
'epmd': True,
}
@ -602,6 +603,48 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
u.log.info('OK\n')
def test_901_remove_unit(self):
"""Test if a unit correctly cleans up by removing itself from the
RabbitMQ cluster on removal"""
u.log.debug('Checking that units correctly clean up after themselves '
'on unit removal...')
configs = {'rabbitmq-server': {'min-cluster-size': '2'}}
super(RmqBasicDeployment, self)._configure_services(configs)
self.d.sentry.wait(timeout=900)
u.rmq_wait_for_cluster(self)
self.d.remove_unit(self.rmq2_sentry.info['unit_name'])
self.d.sentry.wait(timeout=900)
u.rmq_wait_for_cluster(self)
sentry_units = self._get_rmq_sentry_units()[:-1]
unit_host_names = u.get_unit_hostnames(sentry_units)
unit_node_names = []
for unit in unit_host_names:
unit_node_names.append('rabbit@{}'.format(unit_host_names[unit]))
errors = []
for sentry in sentry_units:
unit_name = sentry.info['unit_name']
nodes = []
str_stat = u.get_rmq_cluster_status(sentry)
# make the interesting part of rabbitmqctl cluster_status output
# json-parseable.
if 'nodes,[{disc,' in str_stat:
pos_start = str_stat.find('nodes,[{disc,') + 13
pos_end = str_stat.find(']}]},', pos_start) + 1
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
nodes = json.loads(str_nodes)
for node in nodes:
if node not in unit_node_names:
errors.append('Cluster registration check failed on {}: '
'{} should not be registered with RabbitMQ '
'after unit removal.\n'
''.format(unit_name, node))
if errors:
amulet.raise_status(amulet.FAIL, msg=errors)
u.log.debug('OK')
def test_910_pause_and_resume(self):
"""The services can be paused and resumed. """
u.log.debug('Checking pause and resume actions...')


@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import mock
import os
import subprocess
import sys
import tempfile
from functools import wraps
from test_utils import CharmTestCase
@ -85,10 +87,13 @@ class ConfigRendererTests(CharmTestCase):
RABBITMQCTL_CLUSTERSTATUS_RUNNING = """Cluster status of node 'rabbit@juju-devel3-machine-19' ...
[{nodes,
[{disc,
['rabbit@juju-devel3-machine-14','rabbit@juju-devel3-machine-19']},
{ram,
['rabbit@juju-devel3-machine-42']}]},
{running_nodes,
['rabbit@juju-devel3-machine-14','rabbit@juju-devel3-machine-19']},
{cluster_name,<<"rabbit@juju-devel3-machine-14.openstacklocal">>},
{partitions,[]}]
"""
@ -175,6 +180,16 @@ class UtilsTests(CharmTestCase):
mock_running_nodes.return_value = ['a', 'b']
self.assertTrue(rabbit_utils.clustered())
@mock.patch('rabbit_utils.subprocess')
def test_nodes(self, mock_subprocess):
'''Ensure cluster_status can be parsed for a clustered deployment'''
mock_subprocess.check_output.return_value = \
RABBITMQCTL_CLUSTERSTATUS_RUNNING
self.assertEqual(rabbit_utils.nodes(),
['rabbit@juju-devel3-machine-14',
'rabbit@juju-devel3-machine-19',
'rabbit@juju-devel3-machine-42'])
@mock.patch('rabbit_utils.subprocess')
def test_running_nodes(self, mock_subprocess):
'''Ensure cluster_status can be parsed for a clustered deployment'''
@ -184,6 +199,14 @@ class UtilsTests(CharmTestCase):
['rabbit@juju-devel3-machine-14',
'rabbit@juju-devel3-machine-19'])
@mock.patch('rabbit_utils.subprocess')
def test_nodes_solo(self, mock_subprocess):
'''Ensure cluster_status can be parsed for a single unit deployment'''
mock_subprocess.check_output.return_value = \
RABBITMQCTL_CLUSTERSTATUS_SOLO
self.assertEqual(rabbit_utils.nodes(),
['rabbit@juju-devel3-machine-14'])
@mock.patch('rabbit_utils.subprocess')
def test_running_nodes_solo(self, mock_subprocess):
'''Ensure cluster_status can be parsed for a single unit deployment'''
@ -581,3 +604,56 @@ class UtilsTests(CharmTestCase):
def test_get_managment_port(self, mock_get_upstream_version):
mock_get_upstream_version.return_value = '3.5.7'
self.assertEqual(rabbit_utils.get_managment_port(), 15672)
@mock.patch('rabbit_utils.rabbitmqctl')
@mock.patch('rabbit_utils.cmp_pkgrevno')
def test_forget_cluster_node_old_rabbitmq(self, mock_cmp_pkgrevno,
mock_rabbitmqctl):
mock_cmp_pkgrevno.return_value = -1
rabbit_utils.forget_cluster_node('a')
self.assertFalse(mock_rabbitmqctl.called)
@mock.patch('rabbit_utils.log')
@mock.patch('subprocess.check_call')
@mock.patch('rabbit_utils.cmp_pkgrevno')
def test_forget_cluster_node_subprocess_fails(self, mock_cmp_pkgrevno,
mock_check_call,
mock_log):
mock_cmp_pkgrevno.return_value = 0
def raise_error(x):
raise subprocess.CalledProcessError(2, x)
mock_check_call.side_effect = raise_error
rabbit_utils.forget_cluster_node('a')
mock_log.assert_called_with("Unable to remove node 'a' from cluster. "
"It is either still running or already "
"removed. (Output: 'None')", level='ERROR')
@mock.patch('rabbit_utils.rabbitmqctl')
@mock.patch('rabbit_utils.cmp_pkgrevno')
def test_forget_cluster_node(self, mock_cmp_pkgrevno, mock_rabbitmqctl):
mock_cmp_pkgrevno.return_value = 1
rabbit_utils.forget_cluster_node('a')
mock_rabbitmqctl.assert_called_with('forget_cluster_node', 'a')
@mock.patch('rabbit_utils.forget_cluster_node')
@mock.patch('rabbit_utils.relations_for_id')
@mock.patch('rabbit_utils.subprocess')
@mock.patch('rabbit_utils.relation_ids')
def test_check_cluster_memberships(self, mock_relation_ids,
mock_subprocess,
mock_relations_for_id,
mock_forget_cluster_node):
mock_relation_ids.return_value = [0]
mock_subprocess.check_output.return_value = \
RABBITMQCTL_CLUSTERSTATUS_RUNNING
mock_relations_for_id.return_value = [
{'clustered': 'juju-devel3-machine-14'},
{'clustered': 'juju-devel3-machine-19'},
{'dummy-entry': 'to validate behaviour on relations without '
'clustered key in dict'},
]
rabbit_utils.check_cluster_memberships()
mock_forget_cluster_node.assert_called_with(
'rabbit@juju-devel3-machine-42')