From 6672c481363f9cf35b82546a5fc057a716d607b5 Mon Sep 17 00:00:00 2001 From: Drew Freiberger Date: Tue, 16 May 2017 16:59:48 -0500 Subject: [PATCH] Added list-unconsumed-queues action Add status inquiry list-unconsumed-queues action to allow operators to determine which queues are not being consumed in each RMQ vhost. Useful for troubleshooting message queue volume alerts. Closes-Bug: 1767437 Change-Id: Icdd0b8c4db607701bc5e33d86e263b6a5f1bb7f5 --- actions.yaml | 8 ++++ actions/actions.py | 44 ++++++++++++++++++-- actions/list-unconsumed-queues | 1 + hooks/rabbit_utils.py | 28 +++++++++++++ tests/basic_deployment.py | 29 ++++++++++++++ unit_tests/test_actions.py | 71 +++++++++++++++++++++++++++++++++ unit_tests/test_rabbit_utils.py | 37 +++++++++++++++++ 7 files changed, 214 insertions(+), 4 deletions(-) create mode 120000 actions/list-unconsumed-queues diff --git a/actions.yaml b/actions.yaml index f71441ef..54ee142e 100644 --- a/actions.yaml +++ b/actions.yaml @@ -27,3 +27,11 @@ complete-cluster-series-upgrade: cluster the upgrade is complete cluster wide. This action should be performed on the current leader. Note the leader may have changed during the series upgrade process. +list-unconsumed-queues: + description: |- + list queues which currently have zero consumers, results are like: + unconsumed-queue-count: "2" + unconsumed-queues: + $vhost: + "0": queue_name1 - 0 + "1": $queue_name - $num_messages diff --git a/actions/actions.py b/actions/actions.py index 689b8c8d..25b5a791 100755 --- a/actions/actions.py +++ b/actions/actions.py @@ -13,9 +13,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import os import re +from collections import OrderedDict from subprocess import check_output, CalledProcessError import sys @@ -46,6 +47,8 @@ from hooks.rabbit_utils import ( pause_unit_helper, resume_unit_helper, assess_status, + list_vhosts, + vhost_queue_info, ) @@ -112,11 +115,44 @@ def complete_cluster_series_upgrade(args): assess_status(ConfigRenderer(CONFIG_FILES)) +def list_unconsumed_queues(args): + """List queues which are unconsumed in RabbitMQ""" + count = 0 + for vhost in list_vhosts(): + try: + queue_info_dict = vhost_queue_info(vhost) + except CalledProcessError as e: + # if no queues, just raises an exception + action_set({'output': e.output, + 'return-code': e.returncode}) + action_fail("Failed to query RabbitMQ vhost {} queues" + "".format(vhost)) + return False + + for queue in queue_info_dict: + if queue['consumers'] == 0: + vhostqueue = "unconsumed-queues.{}".format(count) + value = OrderedDict(( + ('vhost', vhost), + ('name', queue['name']), + ('messages', queue['messages']), + )) + action_set({vhostqueue: json.dumps(value)}) + count += 1 + + action_set({'unconsumed-queue-count': count}) + + # A dictionary of all the defined actions to callables (which take # parsed arguments). -ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status, - "check-queues": check_queues, - "complete-cluster-series-upgrade": complete_cluster_series_upgrade} +ACTIONS = { + "pause": pause, + "resume": resume, + "cluster-status": cluster_status, + "check-queues": check_queues, + "complete-cluster-series-upgrade": complete_cluster_series_upgrade, + "list-unconsumed-queues": list_unconsumed_queues, +} def main(args): diff --git a/actions/list-unconsumed-queues b/actions/list-unconsumed-queues new file mode 120000 index 00000000..405a394e --- /dev/null +++ b/actions/list-unconsumed-queues @@ -0,0 +1 @@ +actions.py \ No newline at end of file diff --git a/hooks/rabbit_utils.py b/hooks/rabbit_utils.py index 315c1db1..23e6cd4d 100644 --- a/hooks/rabbit_utils.py +++ b/hooks/rabbit_utils.py @@ -191,6 +191,34 @@ def list_vhosts(): return [] +def vhost_queue_info(vhost): + """Provide a list of queue info objects for the given vhost in RabbitMQ + Each object provides name (str), messages (int), and consumers (int) + @raises CalledProcessError on failure to list_queues of the vhost + """ + cmd = [RABBITMQ_CTL, '-p', vhost, 'list_queues', + 'name', 'messages', 'consumers'] + output = subprocess.check_output(cmd).decode('utf-8') + + queue_info = [] + # NOTE(jamespage): Earlier rabbitmqctl versions append "...done" + # to the output of list_queues + if '...done' in output: + queues = output.split('\n')[1:-2] + else: + queues = output.split('\n')[1:-1] + + for queue in queues: + [qname, qmsgs, qconsumers] = queue.split() + queue_info.append({ + 'name': qname, + 'messages': int(qmsgs), + 'consumers': int(qconsumers) + }) + + return queue_info + + def vhost_exists(vhost): return vhost in list_vhosts() diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py index a2d9a64d..94523e18 100644 --- a/tests/basic_deployment.py +++ b/tests/basic_deployment.py @@ -722,3 +722,32 @@ class RmqBasicDeployment(OpenStackAmuletDeployment): action_id = u.run_action(self.rmq0_sentry, "check-queues") assert u.wait_on_action(action_id), "Check queues action failed." + + def test_913_list_unconsumed_queues(self): + """ rabbitmqctl list-unconsumed-queues action can be returned. """ + u.log.debug('Checking list-unconsumed-queues action...') + + self._test_rmq_amqp_messages_all_units([self.rmq0_sentry]) + action_id = u.run_action(self.rmq0_sentry, "list-unconsumed-queues") + assert u.wait_on_action(action_id), \ + "list-unconsumed-queues action failed." + + result = amulet.actions.get_action_output(action_id, full_output=True) + queue_count = int(result['results']['unconsumed-queue-count']) + assert queue_count > 0, 'Did not find any unconsumed queues.' + + queue_name = 'test' # publish_amqp_message_by_unit default queue name + for i in range(queue_count): + queue_data = json.loads( + result['results']['unconsumed-queues'][str(i)]) + if queue_data['name'] == queue_name: + break + else: + assert False, 'Did not find expected queue in result.' + + # Since we just reused _test_rmq_amqp_messages_all_units, we should + # have created the queue if it didn't already exist, but all messages + # should have already been consumed. + assert queue_data['messages'] == 0, 'Found unexpected message count.' + + u.log.debug('OK') diff --git a/unit_tests/test_actions.py b/unit_tests/test_actions.py index 4c7c89c6..0694a1b7 100644 --- a/unit_tests/test_actions.py +++ b/unit_tests/test_actions.py @@ -111,6 +111,77 @@ class CheckQueuesTestCase(CharmTestCase): '-p', '/']) +class ListUnconsumedQueuesTestCase(CharmTestCase): + + def setUp(self): + super(ListUnconsumedQueuesTestCase, self).setUp( + actions, ["list_vhosts", "vhost_queue_info", "action_set", + "action_fail"]) + + def test_list_unconsumed_queues(self): + self.list_vhosts.return_value = ['/'] + self.vhost_queue_info.return_value = [ + {'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0}, + {'name': 'consumed_queue', 'messages': 0, 'consumers': 1}] + actions.list_unconsumed_queues([]) + + self.list_vhosts.assert_called() + self.vhost_queue_info.assert_called_once_with('/') + calls = [ + mock.call({ + "unconsumed-queues.0": + '{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}), + mock.call({'unconsumed-queue-count': 1}) + ] + self.action_set.assert_has_calls(calls) + + def test_list_multiple_vhosts_unconsumed_queues(self): + self.list_vhosts.return_value = ['/', 'other_vhost'] + self.vhost_queue_info.return_value = [ + {'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0}, + {'name': 'consumed_queue', 'messages': 0, 'consumers': 1}] + actions.list_unconsumed_queues([]) + + self.list_vhosts.assert_called() + calls = [ + mock.call({ + "unconsumed-queues.0": + '{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}), + mock.call({ + "unconsumed-queues.1": + '{"vhost": "other_vhost", "name": "unconsumed_queue", ' + '"messages": 1}'}), + mock.call({'unconsumed-queue-count': 2}) + ] + self.action_set.assert_has_calls(calls) + + def test_list_unconsumed_queues_no_unconsumed(self): + self.list_vhosts.return_value = ['/'] + self.vhost_queue_info.return_value = [ + {'name': 'consumed_queue', 'messages': 1, 'consumers': 1}, + {'name': 'consumed_queue2', 'messages': 0, 'consumers': 1}] + actions.list_unconsumed_queues([]) + + self.list_vhosts.assert_called() + self.vhost_queue_info.assert_called_once_with('/') + self.action_set.assert_called_once_with({'unconsumed-queue-count': 0}) + + def test_list_unconsumed_queues_exception(self): + self.vhost_queue_info.side_effect = \ + actions.CalledProcessError(1, "Failure") + self.list_vhosts.return_value = ['/'] + self.vhost_queue_info.return_value = [ + {'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0}, + {'name': 'consumed_queue', 'messages': 0, 'consumers': 1}] + actions.list_unconsumed_queues([]) + + self.list_vhosts.assert_called() + self.vhost_queue_info.assert_called_once_with('/') + self.action_set.assert_called() + self.action_fail.assert_called_once_with( + "Failed to query RabbitMQ vhost / queues") + + class MainTestCase(CharmTestCase): def setUp(self): diff --git a/unit_tests/test_rabbit_utils.py b/unit_tests/test_rabbit_utils.py index 46402f6d..52a0b5b2 100644 --- a/unit_tests/test_rabbit_utils.py +++ b/unit_tests/test_rabbit_utils.py @@ -99,6 +99,21 @@ RABBITMQCTL_CLUSTERSTATUS_SOLO = b"""Cluster status of node 'rabbit@juju-devel3- {partitions,[]}] """ +RABBITMQCTL_LIST_QUEUES = b"""Listing queues ... +a_sample_queue 0 1 +cinder-scheduler.cinder 0 1 +cinder-fanout-12345 250 0 +myqueue 0 1 +...done +""" + +RABBITMQCTL_LIST_VHOSTS = b"""Listing vhosts ... +/ +landscape +openstack +...done +""" + class UtilsTests(CharmTestCase): def setUp(self): @@ -193,6 +208,28 @@ class UtilsTests(CharmTestCase): ['rabbit@juju-devel3-machine-14', 'rabbit@juju-devel3-machine-19']) + @mock.patch('rabbit_utils.subprocess') + def test_list_vhosts(self, mock_subprocess): + '''Ensure list_vhosts parses output into the proper list''' + mock_subprocess.check_output.return_value = \ + RABBITMQCTL_LIST_VHOSTS + self.assertEqual(rabbit_utils.list_vhosts(), + ['/', 'landscape', 'openstack']) + + @mock.patch('rabbit_utils.subprocess') + def test_vhost_queue_info(self, mock_subprocess): + '''Ensure vhost_queue_info parses output into the proper format/info''' + mock_subprocess.check_output.return_value = \ + RABBITMQCTL_LIST_QUEUES + self.assertEqual(rabbit_utils.vhost_queue_info('openstack'), + [{'name': 'a_sample_queue', 'messages': 0, + 'consumers': 1}, + {'name': 'cinder-scheduler.cinder', 'messages': 0, + 'consumers': 1}, + {'name': 'cinder-fanout-12345', 'messages': 250, + 'consumers': 0}, + {'name': 'myqueue', 'messages': 0, 'consumers': 1}]) + @mock.patch('rabbit_utils.subprocess') def test_nodes_solo(self, mock_subprocess): '''Ensure cluster_status can be parsed for a single unit deployment'''