Add action to check rmq queues

Report back to the user any queues with greater than N messages.

Change-Id: Ia39c05a4fe0ce74682af24ffe3d17d3006001b62
Closes-Bug: 1716981
This commit is contained in:
Jill Rouleau 2017-09-13 17:21:11 -07:00 committed by Jill Rouleau
parent 33e0d1d74b
commit 58b8ca09fc
5 changed files with 89 additions and 2 deletions

View File

@ -3,4 +3,21 @@ pause:
resume:
descrpition: Resume the rabbitmq unit.
cluster-status:
description: Show the current cluster status.
description: Show the current cluster status.
check-queues:
description: |
Show current queues, optionally only show queues with more than N messages
or queues from specified vhost.
params:
queue-depth:
type: integer
default: -1
description: |
Only show queues with >= this many messages. -1 shows all. Note that
if the result exceeds command line length (1/4 ulimit -s) on the target
system this will fail (For ex; -1 in an openstack env)
See lp:1437366, lp:1274460
vhost:
type: string
default: "/"
description: Show queues from the specified vhost. Eg; "openstack".

View File

@ -16,6 +16,8 @@
import os
import sys
import re
from subprocess import (
check_output,
CalledProcessError,
@ -26,6 +28,7 @@ sys.path.append('hooks/')
from charmhelpers.core.hookenv import (
action_fail,
action_set,
action_get,
)
from rabbit_utils import (
@ -62,9 +65,34 @@ def cluster_status(args):
raise
def check_queues(args):
"""Check for queues with greater than N messages.
Return those queues to the user."""
queue_depth = (action_get('queue-depth'))
vhost = (action_get('vhost'))
result = []
# rabbitmqctl's output contains lines we don't want, such as
# 'Listing queues ..' and '...done.', which may vary by release.
# Actual queue results *should* always look like 'test\t0'
queue_pattern = re.compile('.*\t[0-9]*')
try:
queues = check_output(['rabbitmqctl', 'list_queues',
'-p', vhost]).split('\n')
result = list({queue: size for (queue, size) in
[i.split('\t') for i in queues
if re.search(queue_pattern, i)]
if int(size) >= queue_depth})
action_set({'output': result, 'outcome': 'Success'})
except CalledProcessError as e:
action_set({'output': e.output})
action_fail('Failed to run rabbitmqctl list_queues')
# A dictionary of all the defined actions to callables (which take
# parsed arguments).
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status}
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status,
"check-queues": check_queues}
def main(args):

1
actions/check-queues Symbolic link
View File

@ -0,0 +1 @@
actions.py

View File

@ -715,3 +715,10 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
assert u.wait_on_action(action_id), "Cluster status action failed."
u.log.debug('OK')
def test_912_check_queues(self):
""" rabbitmqctl check_queues action can be returned. """
u.log.debug('Checking cluster status action...')
action_id = u.run_action(self.rmq0_sentry, "check-queues")
assert u.wait_on_action(action_id), "Check queues action failed."

View File

@ -77,6 +77,40 @@ class ClusterStatusTestCase(CharmTestCase):
self.action_fail.assert_called()
class CheckQueuesTestCase(CharmTestCase):
TEST_QUEUE_RESULT = 'Listing queues ...\ntest\t0\ntest\t0\n""'
def dummy_action_get(self, key):
action_values = {"queue-depth": -1, "vhost": "/"}
return action_values[key]
def setUp(self):
super(CheckQueuesTestCase, self).setUp(
actions, ["check_output", "action_set", "action_fail",
"ConfigRenderer", "action_get"])
def test_check_queues(self):
self.action_get.side_effect = self.dummy_action_get
self.check_output.return_value = self.TEST_QUEUE_RESULT
actions.check_queues([])
self.check_output.assert_called_once_with(['rabbitmqctl',
'list_queues',
'-p', "/"])
self.action_set.assert_called()
def test_check_queues_execption(self):
self.action_get.side_effect = self.dummy_action_get
self.check_output.return_value = self.TEST_QUEUE_RESULT
self.check_output.side_effect = actions.CalledProcessError(1,
"Failure")
actions.check_queues([])
self.check_output.assert_called_once_with(['rabbitmqctl',
'list_queues',
'-p', '/'])
class MainTestCase(CharmTestCase):
def setUp(self):