Added list-unconsumed-queues action

Add status inquiry list-unconsumed-queues action to allow operators to
determine which queues are not being consumed in each RMQ vhost. Useful for
troubleshooting message queue volume alerts.

Closes-Bug: 1767437
Change-Id: Icdd0b8c4db607701bc5e33d86e263b6a5f1bb7f5
This commit is contained in:
Drew Freiberger 2017-05-16 16:59:48 -05:00 committed by Paul Goins
parent 065914d696
commit 6672c48136
7 changed files with 214 additions and 4 deletions

View File

@ -27,3 +27,11 @@ complete-cluster-series-upgrade:
cluster the upgrade is complete cluster wide.
This action should be performed on the current leader. Note the leader may
have changed during the series upgrade process.
list-unconsumed-queues:
description: |-
list queues which currently have zero consumers, results are like:
unconsumed-queue-count: "2"
unconsumed-queues:
$vhost:
"0": queue_name1 - 0
"1": $queue_name - $num_messages

View File

@ -13,9 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
from collections import OrderedDict
from subprocess import check_output, CalledProcessError
import sys
@ -46,6 +47,8 @@ from hooks.rabbit_utils import (
pause_unit_helper,
resume_unit_helper,
assess_status,
list_vhosts,
vhost_queue_info,
)
@ -112,11 +115,44 @@ def complete_cluster_series_upgrade(args):
assess_status(ConfigRenderer(CONFIG_FILES))
def list_unconsumed_queues(args):
"""List queues which are unconsumed in RabbitMQ"""
count = 0
for vhost in list_vhosts():
try:
queue_info_dict = vhost_queue_info(vhost)
except CalledProcessError as e:
# if no queues, just raises an exception
action_set({'output': e.output,
'return-code': e.returncode})
action_fail("Failed to query RabbitMQ vhost {} queues"
"".format(vhost))
return False
for queue in queue_info_dict:
if queue['consumers'] == 0:
vhostqueue = "unconsumed-queues.{}".format(count)
value = OrderedDict((
('vhost', vhost),
('name', queue['name']),
('messages', queue['messages']),
))
action_set({vhostqueue: json.dumps(value)})
count += 1
action_set({'unconsumed-queue-count': count})
# A dictionary of all the defined actions to callables (which take
# parsed arguments).
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status,
"check-queues": check_queues,
"complete-cluster-series-upgrade": complete_cluster_series_upgrade}
ACTIONS = {
"pause": pause,
"resume": resume,
"cluster-status": cluster_status,
"check-queues": check_queues,
"complete-cluster-series-upgrade": complete_cluster_series_upgrade,
"list-unconsumed-queues": list_unconsumed_queues,
}
def main(args):

View File

@ -0,0 +1 @@
actions.py

View File

@ -191,6 +191,34 @@ def list_vhosts():
return []
def vhost_queue_info(vhost):
"""Provide a list of queue info objects for the given vhost in RabbitMQ
Each object provides name (str), messages (int), and consumers (int)
@raises CalledProcessError on failure to list_queues of the vhost
"""
cmd = [RABBITMQ_CTL, '-p', vhost, 'list_queues',
'name', 'messages', 'consumers']
output = subprocess.check_output(cmd).decode('utf-8')
queue_info = []
# NOTE(jamespage): Earlier rabbitmqctl versions append "...done"
# to the output of list_queues
if '...done' in output:
queues = output.split('\n')[1:-2]
else:
queues = output.split('\n')[1:-1]
for queue in queues:
[qname, qmsgs, qconsumers] = queue.split()
queue_info.append({
'name': qname,
'messages': int(qmsgs),
'consumers': int(qconsumers)
})
return queue_info
def vhost_exists(vhost):
return vhost in list_vhosts()

View File

@ -722,3 +722,32 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
action_id = u.run_action(self.rmq0_sentry, "check-queues")
assert u.wait_on_action(action_id), "Check queues action failed."
def test_913_list_unconsumed_queues(self):
""" rabbitmqctl list-unconsumed-queues action can be returned. """
u.log.debug('Checking list-unconsumed-queues action...')
self._test_rmq_amqp_messages_all_units([self.rmq0_sentry])
action_id = u.run_action(self.rmq0_sentry, "list-unconsumed-queues")
assert u.wait_on_action(action_id), \
"list-unconsumed-queues action failed."
result = amulet.actions.get_action_output(action_id, full_output=True)
queue_count = int(result['results']['unconsumed-queue-count'])
assert queue_count > 0, 'Did not find any unconsumed queues.'
queue_name = 'test' # publish_amqp_message_by_unit default queue name
for i in range(queue_count):
queue_data = json.loads(
result['results']['unconsumed-queues'][str(i)])
if queue_data['name'] == queue_name:
break
else:
assert False, 'Did not find expected queue in result.'
# Since we just reused _test_rmq_amqp_messages_all_units, we should
# have created the queue if it didn't already exist, but all messages
# should have already been consumed.
assert queue_data['messages'] == 0, 'Found unexpected message count.'
u.log.debug('OK')

View File

@ -111,6 +111,77 @@ class CheckQueuesTestCase(CharmTestCase):
'-p', '/'])
class ListUnconsumedQueuesTestCase(CharmTestCase):
def setUp(self):
super(ListUnconsumedQueuesTestCase, self).setUp(
actions, ["list_vhosts", "vhost_queue_info", "action_set",
"action_fail"])
def test_list_unconsumed_queues(self):
self.list_vhosts.return_value = ['/']
self.vhost_queue_info.return_value = [
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])
self.list_vhosts.assert_called()
self.vhost_queue_info.assert_called_once_with('/')
calls = [
mock.call({
"unconsumed-queues.0":
'{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}),
mock.call({'unconsumed-queue-count': 1})
]
self.action_set.assert_has_calls(calls)
def test_list_multiple_vhosts_unconsumed_queues(self):
self.list_vhosts.return_value = ['/', 'other_vhost']
self.vhost_queue_info.return_value = [
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])
self.list_vhosts.assert_called()
calls = [
mock.call({
"unconsumed-queues.0":
'{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}),
mock.call({
"unconsumed-queues.1":
'{"vhost": "other_vhost", "name": "unconsumed_queue", '
'"messages": 1}'}),
mock.call({'unconsumed-queue-count': 2})
]
self.action_set.assert_has_calls(calls)
def test_list_unconsumed_queues_no_unconsumed(self):
self.list_vhosts.return_value = ['/']
self.vhost_queue_info.return_value = [
{'name': 'consumed_queue', 'messages': 1, 'consumers': 1},
{'name': 'consumed_queue2', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])
self.list_vhosts.assert_called()
self.vhost_queue_info.assert_called_once_with('/')
self.action_set.assert_called_once_with({'unconsumed-queue-count': 0})
def test_list_unconsumed_queues_exception(self):
self.vhost_queue_info.side_effect = \
actions.CalledProcessError(1, "Failure")
self.list_vhosts.return_value = ['/']
self.vhost_queue_info.return_value = [
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])
self.list_vhosts.assert_called()
self.vhost_queue_info.assert_called_once_with('/')
self.action_set.assert_called()
self.action_fail.assert_called_once_with(
"Failed to query RabbitMQ vhost / queues")
class MainTestCase(CharmTestCase):
def setUp(self):

View File

@ -99,6 +99,21 @@ RABBITMQCTL_CLUSTERSTATUS_SOLO = b"""Cluster status of node 'rabbit@juju-devel3-
{partitions,[]}]
"""
RABBITMQCTL_LIST_QUEUES = b"""Listing queues ...
a_sample_queue 0 1
cinder-scheduler.cinder 0 1
cinder-fanout-12345 250 0
myqueue 0 1
...done
"""
RABBITMQCTL_LIST_VHOSTS = b"""Listing vhosts ...
/
landscape
openstack
...done
"""
class UtilsTests(CharmTestCase):
def setUp(self):
@ -193,6 +208,28 @@ class UtilsTests(CharmTestCase):
['rabbit@juju-devel3-machine-14',
'rabbit@juju-devel3-machine-19'])
@mock.patch('rabbit_utils.subprocess')
def test_list_vhosts(self, mock_subprocess):
'''Ensure list_vhosts parses output into the proper list'''
mock_subprocess.check_output.return_value = \
RABBITMQCTL_LIST_VHOSTS
self.assertEqual(rabbit_utils.list_vhosts(),
['/', 'landscape', 'openstack'])
@mock.patch('rabbit_utils.subprocess')
def test_vhost_queue_info(self, mock_subprocess):
'''Ensure vhost_queue_info parses output into the proper format/info'''
mock_subprocess.check_output.return_value = \
RABBITMQCTL_LIST_QUEUES
self.assertEqual(rabbit_utils.vhost_queue_info('openstack'),
[{'name': 'a_sample_queue', 'messages': 0,
'consumers': 1},
{'name': 'cinder-scheduler.cinder', 'messages': 0,
'consumers': 1},
{'name': 'cinder-fanout-12345', 'messages': 250,
'consumers': 0},
{'name': 'myqueue', 'messages': 0, 'consumers': 1}])
@mock.patch('rabbit_utils.subprocess')
def test_nodes_solo(self, mock_subprocess):
'''Ensure cluster_status can be parsed for a single unit deployment'''