Merge "Leave RabbitMQ cluster gracefully on unit removal"
This commit is contained in:
commit
9fc021ca29
|
@ -47,6 +47,7 @@ from charmhelpers.contrib.network.ip import (
|
|||
from charmhelpers.core.hookenv import (
|
||||
relation_ids,
|
||||
related_units,
|
||||
relations_for_id,
|
||||
log, ERROR,
|
||||
WARNING,
|
||||
INFO, DEBUG,
|
||||
|
@ -333,7 +334,8 @@ def set_all_mirroring_queues(enable):
|
|||
def rabbitmqctl(action, *args):
|
||||
''' Run rabbitmqctl with action and args. This function uses
|
||||
subprocess.check_call. For uses that need check_output
|
||||
use a direct subproecess call
|
||||
use a direct subprocess call or rabbitmqctl_normalized_output
|
||||
function.
|
||||
'''
|
||||
cmd = []
|
||||
# wait will run for ever. Timeout in a reasonable amount of time
|
||||
|
@ -346,6 +348,25 @@ def rabbitmqctl(action, *args):
|
|||
subprocess.check_call(cmd)
|
||||
|
||||
|
||||
def rabbitmqctl_normalized_output(*args):
|
||||
''' Run rabbitmqctl with args. Normalize output by removing
|
||||
whitespace and return it to caller for further processing.
|
||||
'''
|
||||
cmd = [RABBITMQ_CTL]
|
||||
cmd.extend(args)
|
||||
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
|
||||
|
||||
# Output is in Erlang External Term Format (ETF). The amount of whitespace
|
||||
# (including newlines in the middle of data structures) in the output
|
||||
# depends on the data presented. ETF resembles JSON, but it is not.
|
||||
# Writing our own parser is a bit out of scope, enabling management-plugin
|
||||
# to use REST interface might be overkill at this stage.
|
||||
#
|
||||
# Removing whitespace will let our simple pattern matching work and is a
|
||||
# compromise.
|
||||
return out.translate(None, ' \t\n')
|
||||
|
||||
|
||||
def wait_app():
|
||||
''' Wait until rabbitmq has fully started '''
|
||||
run_dir = '/var/run/rabbitmq/'
|
||||
|
@ -431,15 +452,52 @@ def cluster_with():
|
|||
return False
|
||||
|
||||
|
||||
def break_cluster():
|
||||
def check_cluster_memberships():
|
||||
''' Iterate over RabbitMQ node list, compare it to charm cluster
|
||||
relationships, and forget about any nodes previously abruptly removed
|
||||
from the cluster '''
|
||||
for rid in relation_ids('cluster'):
|
||||
for node in nodes():
|
||||
if not any(rel.get('clustered', None) == node.split('@')[1]
|
||||
for rel in relations_for_id(relid=rid)) and \
|
||||
node not in running_nodes():
|
||||
log("check_cluster_memberships(): '{}' in nodes but not in "
|
||||
"charm relations or running_nodes, telling RabbitMQ to "
|
||||
"forget about it.".format(node), level=DEBUG)
|
||||
forget_cluster_node(node)
|
||||
|
||||
|
||||
def forget_cluster_node(node):
|
||||
''' Remove previously departed node from cluster '''
|
||||
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
|
||||
log('rabbitmq-server version < 3.0.0, '
|
||||
'forget_cluster_node not supported.', level=DEBUG)
|
||||
return
|
||||
try:
|
||||
rabbitmqctl('forget_cluster_node', node)
|
||||
except subprocess.CalledProcessError, e:
|
||||
if e.returncode == 2:
|
||||
log("Unable to remove node '{}' from cluster. It is either still "
|
||||
"running or already removed. (Output: '{}')"
|
||||
"".format(node, e.output), level=ERROR)
|
||||
return
|
||||
else:
|
||||
raise
|
||||
log("Removed previously departed node from cluster: '{}'."
|
||||
"".format(node), level=INFO)
|
||||
|
||||
|
||||
def leave_cluster():
|
||||
''' Leave cluster gracefully '''
|
||||
try:
|
||||
rabbitmqctl('stop_app')
|
||||
rabbitmqctl('reset')
|
||||
start_app()
|
||||
log('Cluster successfully broken.')
|
||||
log('Successfully left cluster gracefully.')
|
||||
except:
|
||||
# error, no nodes available for clustering
|
||||
log('Error breaking rabbit cluster', level=ERROR)
|
||||
log('Cannot leave cluster, we might be the last disc-node in the '
|
||||
'cluster.', level=ERROR)
|
||||
raise
|
||||
|
||||
|
||||
|
@ -681,18 +739,27 @@ def services():
|
|||
return list(set(_services))
|
||||
|
||||
|
||||
@cached
|
||||
def nodes(get_running=False):
|
||||
''' Get list of nodes registered in the RabbitMQ cluster '''
|
||||
out = rabbitmqctl_normalized_output('cluster_status')
|
||||
cluster_status = {}
|
||||
for m in re.finditer("{([^,]+),(?!\[{)\[([^\]]*)", out):
|
||||
state = m.group(1)
|
||||
items = m.group(2).split(',')
|
||||
items = [x.replace("'", '').strip() for x in items]
|
||||
cluster_status.update({state: items})
|
||||
|
||||
if get_running:
|
||||
return cluster_status.get('running_nodes', [])
|
||||
|
||||
return cluster_status.get('disc', []) + cluster_status.get('ram', [])
|
||||
|
||||
|
||||
@cached
|
||||
def running_nodes():
|
||||
''' Determine the current set of running rabbitmq-units in the cluster '''
|
||||
out = subprocess.check_output([RABBITMQ_CTL, 'cluster_status'])
|
||||
|
||||
running_nodes = []
|
||||
m = re.search("\{running_nodes,\[(.*?)\]\}", out.strip(), re.DOTALL)
|
||||
if m is not None:
|
||||
running_nodes = m.group(1).split(',')
|
||||
running_nodes = [x.replace("'", '').strip() for x in running_nodes]
|
||||
|
||||
return running_nodes
|
||||
''' Determine the current set of running nodes in the RabbitMQ cluster '''
|
||||
return nodes(get_running=True)
|
||||
|
||||
|
||||
@cached
|
||||
|
|
|
@ -311,6 +311,32 @@ def cluster_changed(relation_id=None, remote_unit=None):
|
|||
update_nrpe_checks()
|
||||
|
||||
|
||||
@hooks.hook('stop')
|
||||
def stop():
|
||||
"""Gracefully remove ourself from RabbitMQ cluster before unit is removed
|
||||
|
||||
If RabbitMQ have objections to node removal, for example because of this
|
||||
being the only disc node to leave the cluster, the operation will fail and
|
||||
unit removal will be blocked with error for operator to investigate.
|
||||
|
||||
In the event of a unit being forcefully or abrubtly removed from the
|
||||
cluster without a chance to remove itself, it will be left behind as a
|
||||
stopped node in the RabbitMQ cluster. Having a dormant no longer existing
|
||||
stopped node lying around will cause trouble in the event that all RabbitMQ
|
||||
nodes are shut down. In such a situation the cluster most likely will not
|
||||
start again without operator intervention as RabbitMQ will want to
|
||||
interrogate the now non-existing stopped node about any queue it thinks it
|
||||
would be most likely to have authoritative knowledge about.
|
||||
|
||||
For this reason any abruptly removed nodes will be cleaned up periodically
|
||||
by the leader unit during its update-status hook run.
|
||||
|
||||
This call is placed in stop hook and not in the cluster-relation-departed
|
||||
hook because the latter is not called on the unit being removed.
|
||||
"""
|
||||
rabbit.leave_cluster()
|
||||
|
||||
|
||||
def update_cookie(leaders_cookie=None):
|
||||
# sync cookie
|
||||
if leaders_cookie:
|
||||
|
@ -666,6 +692,21 @@ def pre_install_hooks():
|
|||
def update_status():
|
||||
log('Updating status.')
|
||||
|
||||
# leader check for previously unsuccessful cluster departures
|
||||
#
|
||||
# This must be done here and not in the cluster-relation-departed hook. At
|
||||
# the point in time the cluster-relation-departed hook is called we know
|
||||
# that a unit is departing. We also know that RabbitMQ will not have
|
||||
# noticed its departure yet. We cannot remove a node pre-emptively.
|
||||
#
|
||||
# In the normal case the departing node should remove itself from the
|
||||
# cluster in its stop hook. We clean up the ones that for whatever reason
|
||||
# are unable to clean up after themselves successfully here.
|
||||
#
|
||||
# Have a look at the docstring of the stop() function for detailed
|
||||
# explanation.
|
||||
if is_leader():
|
||||
rabbit.check_cluster_memberships()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
|
|
|
@ -265,6 +265,7 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
|
|||
u.log.debug('Checking system services on units...')
|
||||
|
||||
# Beam and epmd sometimes briefly have more than one PID,
|
||||
# Process is named 'beam' with 1 cpu; 'beam.smp' for >1 cpu.
|
||||
# True checks for at least 1.
|
||||
rmq_processes = {
|
||||
'beam.smp': True,
|
||||
|
@ -602,6 +603,48 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
|
|||
|
||||
u.log.info('OK\n')
|
||||
|
||||
def test_901_remove_unit(self):
|
||||
"""Test if a unit correctly cleans up by removing itself from the
|
||||
RabbitMQ cluster on removal"""
|
||||
u.log.debug('Checking that units correctly clean up after themselves '
|
||||
'on unit removal...')
|
||||
configs = {'rabbitmq-server': {'min-cluster-size': '2'}}
|
||||
super(RmqBasicDeployment, self)._configure_services(configs)
|
||||
self.d.sentry.wait(timeout=900)
|
||||
u.rmq_wait_for_cluster(self)
|
||||
|
||||
self.d.remove_unit(self.rmq2_sentry.info['unit_name'])
|
||||
self.d.sentry.wait(timeout=900)
|
||||
u.rmq_wait_for_cluster(self)
|
||||
|
||||
sentry_units = self._get_rmq_sentry_units()[:-1]
|
||||
unit_host_names = u.get_unit_hostnames(sentry_units)
|
||||
unit_node_names = []
|
||||
for unit in unit_host_names:
|
||||
unit_node_names.append('rabbit@{}'.format(unit_host_names[unit]))
|
||||
errors = []
|
||||
|
||||
for sentry in sentry_units:
|
||||
unit_name = sentry.info['unit_name']
|
||||
nodes = []
|
||||
str_stat = u.get_rmq_cluster_status(sentry)
|
||||
# make the interesting part of rabbitmqctl cluster_status output
|
||||
# json-parseable.
|
||||
if 'nodes,[{disc,' in str_stat:
|
||||
pos_start = str_stat.find('nodes,[{disc,') + 13
|
||||
pos_end = str_stat.find(']}]},', pos_start) + 1
|
||||
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||
nodes = json.loads(str_nodes)
|
||||
for node in nodes:
|
||||
if node not in unit_node_names:
|
||||
errors.append('Cluster registration check failed on {}: '
|
||||
'{} should not be registered with RabbitMQ '
|
||||
'after unit removal.\n'
|
||||
''.format(unit_name, node))
|
||||
if errors:
|
||||
amulet.raise_status(amulet.FAIL, msg=errors)
|
||||
u.log.debug('OK')
|
||||
|
||||
def test_910_pause_and_resume(self):
|
||||
"""The services can be paused and resumed. """
|
||||
u.log.debug('Checking pause and resume actions...')
|
||||
|
|
|
@ -12,11 +12,13 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import mock
|
||||
import os
|
||||
import tempfile
|
||||
import subprocess
|
||||
import sys
|
||||
import collections
|
||||
import tempfile
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from test_utils import CharmTestCase
|
||||
|
@ -85,10 +87,13 @@ class ConfigRendererTests(CharmTestCase):
|
|||
|
||||
|
||||
RABBITMQCTL_CLUSTERSTATUS_RUNNING = """Cluster status of node 'rabbit@juju-devel3-machine-19' ...
|
||||
[{nodes,[{disc,['rabbit@juju-devel3-machine-14',
|
||||
'rabbit@juju-devel3-machine-19']}]},
|
||||
{running_nodes,['rabbit@juju-devel3-machine-14',
|
||||
'rabbit@juju-devel3-machine-19']},
|
||||
[{nodes,
|
||||
[{disc,
|
||||
['rabbit@juju-devel3-machine-14','rabbit@juju-devel3-machine-19']},
|
||||
{ram,
|
||||
['rabbit@juju-devel3-machine-42']}]},
|
||||
{running_nodes,
|
||||
['rabbit@juju-devel3-machine-14','rabbit@juju-devel3-machine-19']},
|
||||
{cluster_name,<<"rabbit@juju-devel3-machine-14.openstacklocal">>},
|
||||
{partitions,[]}]
|
||||
"""
|
||||
|
@ -175,6 +180,16 @@ class UtilsTests(CharmTestCase):
|
|||
mock_running_nodes.return_value = ['a', 'b']
|
||||
self.assertTrue(rabbit_utils.clustered())
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_nodes(self, mock_subprocess):
|
||||
'''Ensure cluster_status can be parsed for a clustered deployment'''
|
||||
mock_subprocess.check_output.return_value = \
|
||||
RABBITMQCTL_CLUSTERSTATUS_RUNNING
|
||||
self.assertEqual(rabbit_utils.nodes(),
|
||||
['rabbit@juju-devel3-machine-14',
|
||||
'rabbit@juju-devel3-machine-19',
|
||||
'rabbit@juju-devel3-machine-42'])
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_running_nodes(self, mock_subprocess):
|
||||
'''Ensure cluster_status can be parsed for a clustered deployment'''
|
||||
|
@ -184,6 +199,14 @@ class UtilsTests(CharmTestCase):
|
|||
['rabbit@juju-devel3-machine-14',
|
||||
'rabbit@juju-devel3-machine-19'])
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_nodes_solo(self, mock_subprocess):
|
||||
'''Ensure cluster_status can be parsed for a single unit deployment'''
|
||||
mock_subprocess.check_output.return_value = \
|
||||
RABBITMQCTL_CLUSTERSTATUS_SOLO
|
||||
self.assertEqual(rabbit_utils.nodes(),
|
||||
['rabbit@juju-devel3-machine-14'])
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_running_nodes_solo(self, mock_subprocess):
|
||||
'''Ensure cluster_status can be parsed for a single unit deployment'''
|
||||
|
@ -581,3 +604,56 @@ class UtilsTests(CharmTestCase):
|
|||
def test_get_managment_port(self, mock_get_upstream_version):
|
||||
mock_get_upstream_version.return_value = '3.5.7'
|
||||
self.assertEqual(rabbit_utils.get_managment_port(), 15672)
|
||||
|
||||
@mock.patch('rabbit_utils.rabbitmqctl')
|
||||
@mock.patch('rabbit_utils.cmp_pkgrevno')
|
||||
def test_forget_cluster_node_old_rabbitmq(self, mock_cmp_pkgrevno,
|
||||
mock_rabbitmqctl):
|
||||
mock_cmp_pkgrevno.return_value = -1
|
||||
rabbit_utils.forget_cluster_node('a')
|
||||
self.assertFalse(mock_rabbitmqctl.called)
|
||||
|
||||
@mock.patch('rabbit_utils.log')
|
||||
@mock.patch('subprocess.check_call')
|
||||
@mock.patch('rabbit_utils.cmp_pkgrevno')
|
||||
def test_forget_cluster_node_subprocess_fails(self, mock_cmp_pkgrevno,
|
||||
mock_check_call,
|
||||
mock_log):
|
||||
mock_cmp_pkgrevno.return_value = 0
|
||||
|
||||
def raise_error(x):
|
||||
raise subprocess.CalledProcessError(2, x)
|
||||
mock_check_call.side_effect = raise_error
|
||||
|
||||
rabbit_utils.forget_cluster_node('a')
|
||||
mock_log.assert_called_with("Unable to remove node 'a' from cluster. "
|
||||
"It is either still running or already "
|
||||
"removed. (Output: 'None')", level='ERROR')
|
||||
|
||||
@mock.patch('rabbit_utils.rabbitmqctl')
|
||||
@mock.patch('rabbit_utils.cmp_pkgrevno')
|
||||
def test_forget_cluster_node(self, mock_cmp_pkgrevno, mock_rabbitmqctl):
|
||||
mock_cmp_pkgrevno.return_value = 1
|
||||
rabbit_utils.forget_cluster_node('a')
|
||||
mock_rabbitmqctl.assert_called_with('forget_cluster_node', 'a')
|
||||
|
||||
@mock.patch('rabbit_utils.forget_cluster_node')
|
||||
@mock.patch('rabbit_utils.relations_for_id')
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
@mock.patch('rabbit_utils.relation_ids')
|
||||
def test_check_cluster_memberships(self, mock_relation_ids,
|
||||
mock_subprocess,
|
||||
mock_relations_for_id,
|
||||
mock_forget_cluster_node):
|
||||
mock_relation_ids.return_value = [0]
|
||||
mock_subprocess.check_output.return_value = \
|
||||
RABBITMQCTL_CLUSTERSTATUS_RUNNING
|
||||
mock_relations_for_id.return_value = [
|
||||
{'clustered': 'juju-devel3-machine-14'},
|
||||
{'clustered': 'juju-devel3-machine-19'},
|
||||
{'dummy-entry': 'to validate behaviour on relations without '
|
||||
'clustered key in dict'},
|
||||
]
|
||||
rabbit_utils.check_cluster_memberships()
|
||||
mock_forget_cluster_node.assert_called_with(
|
||||
'rabbit@juju-devel3-machine-42')
|
||||
|
|
Loading…
Reference in New Issue