From 58b8ca09fc71f56ff5c09b768123336178bbd4e0 Mon Sep 17 00:00:00 2001 From: Jill Rouleau Date: Wed, 13 Sep 2017 17:21:11 -0700 Subject: [PATCH] Add action to check rmq queues Report back to the user any queues with greater than N messages. Change-Id: Ia39c05a4fe0ce74682af24ffe3d17d3006001b62 Closes-Bug: 1716981 --- actions.yaml | 19 ++++++++++++++++++- actions/actions.py | 30 +++++++++++++++++++++++++++++- actions/check-queues | 1 + tests/basic_deployment.py | 7 +++++++ unit_tests/test_actions.py | 34 ++++++++++++++++++++++++++++++++++ 5 files changed, 89 insertions(+), 2 deletions(-) create mode 120000 actions/check-queues diff --git a/actions.yaml b/actions.yaml index 2a78267f..8029a5ee 100644 --- a/actions.yaml +++ b/actions.yaml @@ -3,4 +3,21 @@ pause: resume: descrpition: Resume the rabbitmq unit. cluster-status: - description: Show the current cluster status. \ No newline at end of file + description: Show the current cluster status. +check-queues: + description: | + Show current queues, optionally only show queues with more than N messages + or queues from specified vhost. + params: + queue-depth: + type: integer + default: -1 + description: | + Only show queues with >= this many messages. -1 shows all. Note that + if the result exceeds command line length (1/4 ulimit -s) on the target + system this will fail (For ex; -1 in an openstack env) + See lp:1437366, lp:1274460 + vhost: + type: string + default: "/" + description: Show queues from the specified vhost. Eg; "openstack". diff --git a/actions/actions.py b/actions/actions.py index 94c81ce7..98e6140e 100755 --- a/actions/actions.py +++ b/actions/actions.py @@ -16,6 +16,8 @@ import os import sys +import re + from subprocess import ( check_output, CalledProcessError, @@ -26,6 +28,7 @@ sys.path.append('hooks/') from charmhelpers.core.hookenv import ( action_fail, action_set, + action_get, ) from rabbit_utils import ( @@ -62,9 +65,34 @@ def cluster_status(args): raise +def check_queues(args): + """Check for queues with greater than N messages. + Return those queues to the user.""" + queue_depth = (action_get('queue-depth')) + vhost = (action_get('vhost')) + result = [] + # rabbitmqctl's output contains lines we don't want, such as + # 'Listing queues ..' and '...done.', which may vary by release. + # Actual queue results *should* always look like 'test\t0' + queue_pattern = re.compile('.*\t[0-9]*') + try: + queues = check_output(['rabbitmqctl', 'list_queues', + '-p', vhost]).split('\n') + result = list({queue: size for (queue, size) in + [i.split('\t') for i in queues + if re.search(queue_pattern, i)] + if int(size) >= queue_depth}) + + action_set({'output': result, 'outcome': 'Success'}) + except CalledProcessError as e: + action_set({'output': e.output}) + action_fail('Failed to run rabbitmqctl list_queues') + + # A dictionary of all the defined actions to callables (which take # parsed arguments). -ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status} +ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status, + "check-queues": check_queues} def main(args): diff --git a/actions/check-queues b/actions/check-queues new file mode 120000 index 00000000..405a394e --- /dev/null +++ b/actions/check-queues @@ -0,0 +1 @@ +actions.py \ No newline at end of file diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py index d20f0749..a672c233 100644 --- a/tests/basic_deployment.py +++ b/tests/basic_deployment.py @@ -715,3 +715,10 @@ class RmqBasicDeployment(OpenStackAmuletDeployment): assert u.wait_on_action(action_id), "Cluster status action failed." u.log.debug('OK') + + def test_912_check_queues(self): + """ rabbitmqctl check_queues action can be returned. """ + u.log.debug('Checking cluster status action...') + + action_id = u.run_action(self.rmq0_sentry, "check-queues") + assert u.wait_on_action(action_id), "Check queues action failed." diff --git a/unit_tests/test_actions.py b/unit_tests/test_actions.py index d1c4af5d..8a8a4baa 100644 --- a/unit_tests/test_actions.py +++ b/unit_tests/test_actions.py @@ -77,6 +77,40 @@ class ClusterStatusTestCase(CharmTestCase): self.action_fail.assert_called() +class CheckQueuesTestCase(CharmTestCase): + TEST_QUEUE_RESULT = 'Listing queues ...\ntest\t0\ntest\t0\n""' + + def dummy_action_get(self, key): + action_values = {"queue-depth": -1, "vhost": "/"} + return action_values[key] + + def setUp(self): + super(CheckQueuesTestCase, self).setUp( + actions, ["check_output", "action_set", "action_fail", + "ConfigRenderer", "action_get"]) + + def test_check_queues(self): + self.action_get.side_effect = self.dummy_action_get + self.check_output.return_value = self.TEST_QUEUE_RESULT + + actions.check_queues([]) + self.check_output.assert_called_once_with(['rabbitmqctl', + 'list_queues', + '-p', "/"]) + self.action_set.assert_called() + + def test_check_queues_execption(self): + self.action_get.side_effect = self.dummy_action_get + self.check_output.return_value = self.TEST_QUEUE_RESULT + + self.check_output.side_effect = actions.CalledProcessError(1, + "Failure") + actions.check_queues([]) + self.check_output.assert_called_once_with(['rabbitmqctl', + 'list_queues', + '-p', '/']) + + class MainTestCase(CharmTestCase): def setUp(self):