If rabbit cluster is partioned show that in status

If rabbit cluster is partioned show that in status. This check
only works on focal+, prior to that the check is ignored.

Change-Id: Id45c969d37f8cb1c26d0f9834f4a79e7555dd03c
Closes-Bug: 1930417
This commit is contained in:
Liam Young 2021-06-17 13:36:42 +00:00
parent 453b8e979b
commit fbf3bda59a
2 changed files with 91 additions and 18 deletions

View File

@ -793,31 +793,61 @@ def services():
return list(set(_services))
def get_cluster_status(cmd_timeout=None):
"""Raturn rabbit cluster status
:param cmd_timeout: How long to give the command to complete.
:type cmd_timeout: int
:returns: Rabbitmq cluster status
:rtype: dict
:raises: NotImplementedError, subprocess.TimeoutExpired,
"""
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
cmd = [RABBITMQ_CTL, 'cluster_status', '--formatter=json']
output = subprocess.check_output(
cmd,
timeout=cmd_timeout).decode('utf-8')
return json.loads(output)
else:
# rabbitmqctl has not implemented the formatter option.
raise NotImplementedError
@cached
def nodes(get_running=False):
''' Get list of nodes registered in the RabbitMQ cluster '''
# NOTE(ajkavanagh): In focal and above, rabbitmq-server now has a
# --formatter option.
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
cmd = [RABBITMQ_CTL, 'cluster_status', '--formatter=json']
output = subprocess.check_output(cmd).decode('utf-8')
decoded = json.loads(output)
try:
status = get_cluster_status()
if get_running:
return decoded['running_nodes']
return decoded['disk_nodes'] + decoded['ram_nodes']
return status['running_nodes']
return status['disk_nodes'] + status['ram_nodes']
except NotImplementedError:
out = rabbitmqctl_normalized_output('cluster_status')
cluster_status = {}
for m in re.finditer(r"{([^,]+),(?!\[{)\[([^\]]*)", out):
state = m.group(1)
items = m.group(2).split(',')
items = [x.replace("'", '').strip() for x in items]
cluster_status.update({state: items})
out = rabbitmqctl_normalized_output('cluster_status')
cluster_status = {}
for m in re.finditer(r"{([^,]+),(?!\[{)\[([^\]]*)", out):
state = m.group(1)
items = m.group(2).split(',')
items = [x.replace("'", '').strip() for x in items]
cluster_status.update({state: items})
if get_running:
return cluster_status.get('running_nodes', [])
if get_running:
return cluster_status.get('running_nodes', [])
return cluster_status.get('disc', []) + cluster_status.get('ram', [])
return cluster_status.get('disc', []) + cluster_status.get('ram', [])
@cached
def is_partitioned():
"""Check whether rabbitmq cluster is partitioned.
:returns: Whether cluster is partitioned
:rtype: bool
:raises: NotImplementedError, subprocess.TimeoutExpired,
"""
status = get_cluster_status(cmd_timeout=60)
return status.get('partitions') != {}
@cached
@ -902,6 +932,13 @@ def assess_cluster_status(*args):
'node, remove with `forget-cluster-node` action'
.format(departed_node))
# Check if cluster is partitioned
try:
if peer_ids and len(related_units(peer_ids[0])) and is_partitioned():
return ('blocked', 'RabbitMQ is partitioned')
except (subprocess.TimeoutExpired, NotImplementedError):
pass
# General status check
if not wait_app():
return (

View File

@ -276,6 +276,33 @@ class UtilsTests(CharmTestCase):
['rabbit@juju-devel3-machine-14',
'rabbit@juju-devel3-machine-19'])
@mock.patch('rabbit_utils.caching_cmp_pkgrevno')
@mock.patch('rabbit_utils.subprocess')
def test_get_cluster_status(self, mock_subprocess, mock_cmp_pkgrevno):
mock_subprocess.check_output.return_value = b'{"status": "tip top"}'
mock_cmp_pkgrevno.return_value = -1
with self.assertRaises(NotImplementedError):
rabbit_utils.get_cluster_status()
mock_cmp_pkgrevno.return_value = 1
self.assertEqual(
rabbit_utils.get_cluster_status(),
{'status': 'tip top'})
mock_subprocess.check_output.reset_mock()
self.assertEqual(
rabbit_utils.get_cluster_status(cmd_timeout=42),
{'status': 'tip top'})
mock_subprocess.check_output.assert_called_once_with(
['/usr/sbin/rabbitmqctl', 'cluster_status', '--formatter=json'],
timeout=42)
@mock.patch('rabbit_utils.get_cluster_status')
def test_is_partitioned(self, get_cluster_status):
get_cluster_status.return_value = {'partitions': {}}
self.assertFalse(rabbit_utils.is_partitioned())
get_cluster_status.return_value = {
'partitions': {'node1': ['node2', 'node3']}}
self.assertTrue(rabbit_utils.is_partitioned())
@mock.patch('rabbit_utils.caching_cmp_pkgrevno')
@mock.patch('rabbit_utils.subprocess')
def test_list_vhosts(self, mock_subprocess, mock_cmp_pkgrevno):
@ -904,6 +931,7 @@ class UtilsTests(CharmTestCase):
'-p', 'test'
)
@mock.patch.object(rabbit_utils, 'is_partitioned')
@mock.patch.object(rabbit_utils, 'wait_app')
@mock.patch.object(rabbit_utils, 'check_cluster_memberships')
@mock.patch.object(rabbit_utils, 'clustered')
@ -913,8 +941,8 @@ class UtilsTests(CharmTestCase):
def test_assess_cluster_status(
self, rabbitmq_is_installed, is_unit_paused_set,
is_sufficient_peers, clustered, check_cluster_memberships,
wait_app):
wait_app, is_partitioned):
is_partitioned.return_value = False
self.relation_ids.return_value = ["cluster:1"]
self.related_units.return_value = ["rabbitmq-server/1"]
_min = 3
@ -966,8 +994,16 @@ class UtilsTests(CharmTestCase):
"Unable to determine if the rabbitmq service is up")
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
wait_app.return_value = True
is_partitioned.return_value = True
_expected = (
"blocked",
"RabbitMQ is partitioned")
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
# All OK
wait_app.return_value = True
is_partitioned.return_value = False
_expected = (
"active",
"message is ignored")