[freyes, r=niedbalski] Adds support for mirrored queues when the configuration key 'mirroring-queues' is set to True.

This commit is contained in:
Felipe Reyes 2015-01-19 16:18:58 -03:00 committed by Jorge Niedbalski
commit 2fdd79b9c8
8 changed files with 230 additions and 21 deletions

View File

@ -1,2 +1,4 @@
bin
revision
.coverage
.venv

View File

@ -4,6 +4,17 @@ CHARM_DIR := $(PWD)
HOOKS_DIR := $(PWD)/hooks
TEST_PREFIX := PYTHONPATH=$(HOOKS_DIR)
clean:
rm -f .coverage
find . -name '*.pyc' -delete
rm -rf .venv
(which dh_clean && dh_clean) || true
.venv:
sudo apt-get install -y gcc python-dev python-virtualenv python-apt
virtualenv .venv --system-site-packages
.venv/bin/pip install -I -r test-requirements.txt
lint:
@flake8 --exclude hooks/charmhelpers hooks unit_tests
@charm proof
@ -20,11 +31,10 @@ publish: lint
bzr push lp:charms/rabbitmq-server
bzr push lp:charms/trusty/rabbitmq-server
functional_test:
@echo Starting functional tests...
unit_test: clean .venv
@echo Starting tests...
env CHARM_DIR=$(CHARM_DIR) $(TEST_PREFIX) .venv/bin/nosetests unit_tests/
functional_test:
@echo Starting amulet tests...
@juju test -v -p AMULET_HTTP_PROXY --timeout 900
unit_test:
@echo Starting unit tests...
CHARM_DIR=$(CHARM_DIR) $(TEST_PREFIX) nosetests unit_tests

View File

@ -80,6 +80,12 @@ options:
hacluster charm will keep rabbit in active/active setup, but in addition
it will deploy a VIP that can be used by services that cannot work
with mutiple AMQPs (like Glance in pre-Icehouse).
mirroring-queues:
type: boolean
default: True
description: |
When set to true the 'ha-mode: all' policy is applied to all the exchages
that match the expression '^(?!amq\.).*'
rbd-size:
type: string
default: 5G

View File

@ -50,18 +50,26 @@ HOSTS_FILE = '/etc/hosts'
_named_passwd = '/var/lib/charm/{}/{}.passwd'
def vhost_exists(vhost):
class RabbitmqError(Exception):
pass
def list_vhosts():
"""
Returns a list of all the available vhosts
"""
try:
cmd = [RABBITMQ_CTL, 'list_vhosts']
out = subprocess.check_output(cmd)
for line in out.split('\n')[1:]:
if line == vhost:
log('vhost (%s) already exists.' % vhost)
return True
return False
except:
output = subprocess.check_output([RABBITMQ_CTL, 'list_vhosts'])
return output.split('\n')[1:-2]
except Exception as ex:
# if no vhosts, just raises an exception
return False
log(str(ex), level='DEBUG')
return []
def vhost_exists(vhost):
return vhost in list_vhosts()
def create_vhost(vhost):
@ -109,6 +117,93 @@ def grant_permissions(user, vhost):
subprocess.check_call(cmd)
def set_policy(vhost, policy_name, match, value):
cmd = [RABBITMQ_CTL, 'set_policy', '-p', vhost,
policy_name, match, value]
log("setting policy: %s" % str(cmd), level='DEBUG')
subprocess.check_call(cmd)
def set_ha_mode(vhost, mode, params=None, sync_mode='automatic'):
"""Valid mode values:
* 'all': Queue is mirrored across all nodes in the cluster. When a new
node is added to the cluster, the queue will be mirrored to that node.
* 'exactly': Queue is mirrored to count nodes in the cluster.
* 'nodes': Queue is mirrored to the nodes listed in node names
More details at http://www.rabbitmq.com./ha.html
:param vhost: virtual host name
:param mode: ha mode
:param params: values to pass to the policy, possible values depend on the
mode chosen.
:param sync_mode: when `mode` is 'exactly' this used to indicate how the
sync has to be done
http://www.rabbitmq.com./ha.html#eager-synchronisation
"""
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log(("Mirroring queues cannot be enabled, only supported "
"in rabbitmq-server >= 3.0"), level='WARN')
log(("More information at http://www.rabbitmq.com/blog/"
"2012/11/19/breaking-things-with-rabbitmq-3-0"), level='INFO')
return
if mode == 'all':
value = '{"ha-mode": "all"}'
elif mode == 'exactly':
value = '{"ha-mode":"exactly","ha-params":%s,"ha-sync-mode":"%s"}' \
% (params, sync_mode)
elif mode == 'nodes':
value = '{"ha-mode":"nodes","ha-params":[%s]}' % ",".join(params)
else:
raise RabbitmqError(("Unknown mode '%s', known modes: "
"all, exactly, nodes"))
log("Setting HA policy to vhost '%s'" % vhost, level='INFO')
set_policy(vhost, 'HA', '^(?!amq\.).*', value)
def clear_ha_mode(vhost, name='HA', force=False):
"""
Clear policy from the `vhost` by `name`
"""
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log(("Mirroring queues not supported "
"in rabbitmq-server >= 3.0"), level='WARN')
log(("More information at http://www.rabbitmq.com/blog/"
"2012/11/19/breaking-things-with-rabbitmq-3-0"), level='INFO')
return
log("Clearing '%s' policy from vhost '%s'" % (name, vhost), level='INFO')
try:
subprocess.check_call([RABBITMQ_CTL, 'clear_policy', '-p', vhost,
name])
except subprocess.CalledProcessError as ex:
if not force:
raise ex
def set_all_mirroring_queues(enable):
"""
:param enable: if True then enable mirroring queue for all the vhosts,
otherwise the HA policy is removed
"""
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
log(("Mirroring queues not supported "
"in rabbitmq-server >= 3.0"), level='WARN')
log(("More information at http://www.rabbitmq.com/blog/"
"2012/11/19/breaking-things-with-rabbitmq-3-0"), level='INFO')
return
for vhost in list_vhosts():
if enable:
set_ha_mode(vhost, 'all')
else:
clear_ha_mode(vhost, force=True)
def service(action):
cmd = ['service', 'rabbitmq-server', action]
subprocess.check_call(cmd)
@ -180,10 +275,6 @@ def cluster_with():
cmd = [RABBITMQ_CTL, 'start_app']
subprocess.check_call(cmd)
log('Host clustered with %s.' % node)
if cmp_pkgrevno('rabbitmq-server', '3.0.1') >= 0:
cmd = [RABBITMQ_CTL, 'set_policy', 'HA',
'^(?!amq\.).*', '{"ha-mode": "all"}']
subprocess.check_call(cmd)
return True
except:
log('Failed to cluster with %s.' % node)

View File

@ -88,6 +88,12 @@ def configure_amqp(username, vhost, admin=False):
rabbit.create_user(username, password, admin)
rabbit.grant_permissions(username, vhost)
# NOTE(freyes): after rabbitmq-server 3.0 the method to define HA in the
# queues is different
# http://www.rabbitmq.com/blog/2012/11/19/breaking-things-with-rabbitmq-3-0
if config('mirroring-queues'):
rabbit.set_ha_mode(vhost, 'all')
return password
@ -629,6 +635,8 @@ def config_changed():
configure_rabbit_ssl()
rabbit.set_all_mirroring_queues(config('mirroring-queues'))
if is_relation_made("ha"):
ha_is_active_active = config("ha-vip-only")

5
setup.cfg Normal file
View File

@ -0,0 +1,5 @@
[nosetests]
verbosity=2
with-coverage=1
cover-erase=1
cover-package=hooks

3
test-requirements.txt Normal file
View File

@ -0,0 +1,3 @@
nose
testtools
mock

View File

@ -0,0 +1,84 @@
#!/usr/bin/python
#
# This Amulet test deploys rabbitmq-server
#
# Note: We use python2, because pika doesn't support python3
import amulet
import pika
import telnetlib
# The number of seconds to wait for the environment to setup.
seconds = 1200
d = amulet.Deployment(series="trusty")
# Add the rabbitmq-server charm to the deployment.
d.add('rabbitmq-server', units=2)
# Create a configuration.
configuration = {'mirroring-queues': True}
d.configure('rabbitmq-server', configuration)
d.expose('rabbitmq-server')
try:
d.setup(timeout=seconds)
d.sentry.wait(seconds)
except amulet.helpers.TimeoutError:
message = 'The environment did not setup in %d seconds.' % seconds
amulet.raise_status(amulet.SKIP, msg=message)
except:
raise
rabbit_unit = d.sentry.unit['rabbitmq-server/0']
rabbit_unit2 = d.sentry.unit['rabbitmq-server/1']
commands = ['service rabbitmq-server status',
'rabbitmqctl cluster_status']
for cmd in commands:
output, code = rabbit_unit.run(cmd)
message = cmd + ' | exit code: %d.' % code
print(message)
print(output)
if code != 0:
amulet.raise_status(amulet.FAIL, msg=message)
rabbit_addr1 = rabbit_unit.info["public-address"]
rabbit_port = "5672"
rabbit_url = 'amqp://guest:guest@%s:%s/%%2F' % (rabbit_addr1, rabbit_port)
print('Connecting to %s' % rabbit_url)
conn1 = pika.BlockingConnection(pika.connection.URLParameters(rabbit_url))
channel = conn1.channel()
print('Declaring queue')
channel.queue_declare(queue='hello')
orig_msg = 'Hello World!'
print('Publishing message: %s' % orig_msg)
channel.basic_publish(exchange='',
routing_key='hello',
body=orig_msg)
print('stopping rabbit in unit 0')
rabbit_unit.run('service rabbitmq-server stop')
print('Consuming message from second unit')
rabbit_addr2 = rabbit_unit2.info["public-address"]
rabbit_url2 = 'amqp://guest:guest@%s:%s/%%2F' % (rabbit_addr2, rabbit_port)
conn2 = pika.BlockingConnection(pika.connection.URLParameters(rabbit_url2))
channel2 = conn2.channel()
method_frame, header_frame, body = channel2.basic_get('hello')
if method_frame:
print(method_frame, header_frame, body)
assert body == orig_msg, '%s != %s' % (body, orig_msg)
channel2.basic_ack(method_frame.delivery_tag)
else:
raise Exception('No message returned')
# check the management plugin is running
mgmt_port = "15672"
print('Checking management port')
telnetlib.Telnet(rabbit_addr1, mgmt_port)