Merge "Added list-unconsumed-queues action"
This commit is contained in:
commit
b5e72994f2
|
@ -27,3 +27,11 @@ complete-cluster-series-upgrade:
|
|||
cluster the upgrade is complete cluster wide.
|
||||
This action should be performed on the current leader. Note the leader may
|
||||
have changed during the series upgrade process.
|
||||
list-unconsumed-queues:
|
||||
description: |-
|
||||
list queues which currently have zero consumers, results are like:
|
||||
unconsumed-queue-count: "2"
|
||||
unconsumed-queues:
|
||||
$vhost:
|
||||
"0": queue_name1 - 0
|
||||
"1": $queue_name - $num_messages
|
||||
|
|
|
@ -13,9 +13,10 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from collections import OrderedDict
|
||||
from subprocess import check_output, CalledProcessError
|
||||
import sys
|
||||
|
||||
|
@ -46,6 +47,8 @@ from hooks.rabbit_utils import (
|
|||
pause_unit_helper,
|
||||
resume_unit_helper,
|
||||
assess_status,
|
||||
list_vhosts,
|
||||
vhost_queue_info,
|
||||
)
|
||||
|
||||
|
||||
|
@ -112,11 +115,44 @@ def complete_cluster_series_upgrade(args):
|
|||
assess_status(ConfigRenderer(CONFIG_FILES))
|
||||
|
||||
|
||||
def list_unconsumed_queues(args):
|
||||
"""List queues which are unconsumed in RabbitMQ"""
|
||||
count = 0
|
||||
for vhost in list_vhosts():
|
||||
try:
|
||||
queue_info_dict = vhost_queue_info(vhost)
|
||||
except CalledProcessError as e:
|
||||
# if no queues, just raises an exception
|
||||
action_set({'output': e.output,
|
||||
'return-code': e.returncode})
|
||||
action_fail("Failed to query RabbitMQ vhost {} queues"
|
||||
"".format(vhost))
|
||||
return False
|
||||
|
||||
for queue in queue_info_dict:
|
||||
if queue['consumers'] == 0:
|
||||
vhostqueue = "unconsumed-queues.{}".format(count)
|
||||
value = OrderedDict((
|
||||
('vhost', vhost),
|
||||
('name', queue['name']),
|
||||
('messages', queue['messages']),
|
||||
))
|
||||
action_set({vhostqueue: json.dumps(value)})
|
||||
count += 1
|
||||
|
||||
action_set({'unconsumed-queue-count': count})
|
||||
|
||||
|
||||
# A dictionary of all the defined actions to callables (which take
|
||||
# parsed arguments).
|
||||
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status,
|
||||
"check-queues": check_queues,
|
||||
"complete-cluster-series-upgrade": complete_cluster_series_upgrade}
|
||||
ACTIONS = {
|
||||
"pause": pause,
|
||||
"resume": resume,
|
||||
"cluster-status": cluster_status,
|
||||
"check-queues": check_queues,
|
||||
"complete-cluster-series-upgrade": complete_cluster_series_upgrade,
|
||||
"list-unconsumed-queues": list_unconsumed_queues,
|
||||
}
|
||||
|
||||
|
||||
def main(args):
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
actions.py
|
|
@ -191,6 +191,34 @@ def list_vhosts():
|
|||
return []
|
||||
|
||||
|
||||
def vhost_queue_info(vhost):
|
||||
"""Provide a list of queue info objects for the given vhost in RabbitMQ
|
||||
Each object provides name (str), messages (int), and consumers (int)
|
||||
@raises CalledProcessError on failure to list_queues of the vhost
|
||||
"""
|
||||
cmd = [RABBITMQ_CTL, '-p', vhost, 'list_queues',
|
||||
'name', 'messages', 'consumers']
|
||||
output = subprocess.check_output(cmd).decode('utf-8')
|
||||
|
||||
queue_info = []
|
||||
# NOTE(jamespage): Earlier rabbitmqctl versions append "...done"
|
||||
# to the output of list_queues
|
||||
if '...done' in output:
|
||||
queues = output.split('\n')[1:-2]
|
||||
else:
|
||||
queues = output.split('\n')[1:-1]
|
||||
|
||||
for queue in queues:
|
||||
[qname, qmsgs, qconsumers] = queue.split()
|
||||
queue_info.append({
|
||||
'name': qname,
|
||||
'messages': int(qmsgs),
|
||||
'consumers': int(qconsumers)
|
||||
})
|
||||
|
||||
return queue_info
|
||||
|
||||
|
||||
def vhost_exists(vhost):
|
||||
return vhost in list_vhosts()
|
||||
|
||||
|
|
|
@ -722,3 +722,32 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
|
|||
|
||||
action_id = u.run_action(self.rmq0_sentry, "check-queues")
|
||||
assert u.wait_on_action(action_id), "Check queues action failed."
|
||||
|
||||
def test_913_list_unconsumed_queues(self):
|
||||
""" rabbitmqctl list-unconsumed-queues action can be returned. """
|
||||
u.log.debug('Checking list-unconsumed-queues action...')
|
||||
|
||||
self._test_rmq_amqp_messages_all_units([self.rmq0_sentry])
|
||||
action_id = u.run_action(self.rmq0_sentry, "list-unconsumed-queues")
|
||||
assert u.wait_on_action(action_id), \
|
||||
"list-unconsumed-queues action failed."
|
||||
|
||||
result = amulet.actions.get_action_output(action_id, full_output=True)
|
||||
queue_count = int(result['results']['unconsumed-queue-count'])
|
||||
assert queue_count > 0, 'Did not find any unconsumed queues.'
|
||||
|
||||
queue_name = 'test' # publish_amqp_message_by_unit default queue name
|
||||
for i in range(queue_count):
|
||||
queue_data = json.loads(
|
||||
result['results']['unconsumed-queues'][str(i)])
|
||||
if queue_data['name'] == queue_name:
|
||||
break
|
||||
else:
|
||||
assert False, 'Did not find expected queue in result.'
|
||||
|
||||
# Since we just reused _test_rmq_amqp_messages_all_units, we should
|
||||
# have created the queue if it didn't already exist, but all messages
|
||||
# should have already been consumed.
|
||||
assert queue_data['messages'] == 0, 'Found unexpected message count.'
|
||||
|
||||
u.log.debug('OK')
|
||||
|
|
|
@ -111,6 +111,77 @@ class CheckQueuesTestCase(CharmTestCase):
|
|||
'-p', '/'])
|
||||
|
||||
|
||||
class ListUnconsumedQueuesTestCase(CharmTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ListUnconsumedQueuesTestCase, self).setUp(
|
||||
actions, ["list_vhosts", "vhost_queue_info", "action_set",
|
||||
"action_fail"])
|
||||
|
||||
def test_list_unconsumed_queues(self):
|
||||
self.list_vhosts.return_value = ['/']
|
||||
self.vhost_queue_info.return_value = [
|
||||
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
|
||||
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
|
||||
actions.list_unconsumed_queues([])
|
||||
|
||||
self.list_vhosts.assert_called()
|
||||
self.vhost_queue_info.assert_called_once_with('/')
|
||||
calls = [
|
||||
mock.call({
|
||||
"unconsumed-queues.0":
|
||||
'{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}),
|
||||
mock.call({'unconsumed-queue-count': 1})
|
||||
]
|
||||
self.action_set.assert_has_calls(calls)
|
||||
|
||||
def test_list_multiple_vhosts_unconsumed_queues(self):
|
||||
self.list_vhosts.return_value = ['/', 'other_vhost']
|
||||
self.vhost_queue_info.return_value = [
|
||||
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
|
||||
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
|
||||
actions.list_unconsumed_queues([])
|
||||
|
||||
self.list_vhosts.assert_called()
|
||||
calls = [
|
||||
mock.call({
|
||||
"unconsumed-queues.0":
|
||||
'{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}),
|
||||
mock.call({
|
||||
"unconsumed-queues.1":
|
||||
'{"vhost": "other_vhost", "name": "unconsumed_queue", '
|
||||
'"messages": 1}'}),
|
||||
mock.call({'unconsumed-queue-count': 2})
|
||||
]
|
||||
self.action_set.assert_has_calls(calls)
|
||||
|
||||
def test_list_unconsumed_queues_no_unconsumed(self):
|
||||
self.list_vhosts.return_value = ['/']
|
||||
self.vhost_queue_info.return_value = [
|
||||
{'name': 'consumed_queue', 'messages': 1, 'consumers': 1},
|
||||
{'name': 'consumed_queue2', 'messages': 0, 'consumers': 1}]
|
||||
actions.list_unconsumed_queues([])
|
||||
|
||||
self.list_vhosts.assert_called()
|
||||
self.vhost_queue_info.assert_called_once_with('/')
|
||||
self.action_set.assert_called_once_with({'unconsumed-queue-count': 0})
|
||||
|
||||
def test_list_unconsumed_queues_exception(self):
|
||||
self.vhost_queue_info.side_effect = \
|
||||
actions.CalledProcessError(1, "Failure")
|
||||
self.list_vhosts.return_value = ['/']
|
||||
self.vhost_queue_info.return_value = [
|
||||
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
|
||||
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
|
||||
actions.list_unconsumed_queues([])
|
||||
|
||||
self.list_vhosts.assert_called()
|
||||
self.vhost_queue_info.assert_called_once_with('/')
|
||||
self.action_set.assert_called()
|
||||
self.action_fail.assert_called_once_with(
|
||||
"Failed to query RabbitMQ vhost / queues")
|
||||
|
||||
|
||||
class MainTestCase(CharmTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
|
|
@ -99,6 +99,21 @@ RABBITMQCTL_CLUSTERSTATUS_SOLO = b"""Cluster status of node 'rabbit@juju-devel3-
|
|||
{partitions,[]}]
|
||||
"""
|
||||
|
||||
RABBITMQCTL_LIST_QUEUES = b"""Listing queues ...
|
||||
a_sample_queue 0 1
|
||||
cinder-scheduler.cinder 0 1
|
||||
cinder-fanout-12345 250 0
|
||||
myqueue 0 1
|
||||
...done
|
||||
"""
|
||||
|
||||
RABBITMQCTL_LIST_VHOSTS = b"""Listing vhosts ...
|
||||
/
|
||||
landscape
|
||||
openstack
|
||||
...done
|
||||
"""
|
||||
|
||||
|
||||
class UtilsTests(CharmTestCase):
|
||||
def setUp(self):
|
||||
|
@ -193,6 +208,28 @@ class UtilsTests(CharmTestCase):
|
|||
['rabbit@juju-devel3-machine-14',
|
||||
'rabbit@juju-devel3-machine-19'])
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_list_vhosts(self, mock_subprocess):
|
||||
'''Ensure list_vhosts parses output into the proper list'''
|
||||
mock_subprocess.check_output.return_value = \
|
||||
RABBITMQCTL_LIST_VHOSTS
|
||||
self.assertEqual(rabbit_utils.list_vhosts(),
|
||||
['/', 'landscape', 'openstack'])
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_vhost_queue_info(self, mock_subprocess):
|
||||
'''Ensure vhost_queue_info parses output into the proper format/info'''
|
||||
mock_subprocess.check_output.return_value = \
|
||||
RABBITMQCTL_LIST_QUEUES
|
||||
self.assertEqual(rabbit_utils.vhost_queue_info('openstack'),
|
||||
[{'name': 'a_sample_queue', 'messages': 0,
|
||||
'consumers': 1},
|
||||
{'name': 'cinder-scheduler.cinder', 'messages': 0,
|
||||
'consumers': 1},
|
||||
{'name': 'cinder-fanout-12345', 'messages': 250,
|
||||
'consumers': 0},
|
||||
{'name': 'myqueue', 'messages': 0, 'consumers': 1}])
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
def test_nodes_solo(self, mock_subprocess):
|
||||
'''Ensure cluster_status can be parsed for a single unit deployment'''
|
||||
|
|
Loading…
Reference in New Issue