charm-rabbitmq-server/scripts/check_rabbitmq.py

252 lines
8.7 KiB
Python
Executable File

#!/usr/bin/python
#
# #
# # # # # # #
# # # # # # #
# # # # # # #
# # # # # # # #
# # # # # # # # #
# ##### #### #### ####
# This file is managed by juju. Do not make local changes.
# Copyright (C) 2009, 2012 Canonical
# All Rights Reserved
#
# tests RabbitMQ operation
""" test rabbitmq functionality """
import os
import sys
import signal
import socket
try:
from amqplib import client_0_8 as amqp
except ImportError:
print "CRITICAL: amqplib not found"
sys.exit(2)
from optparse import OptionParser
ROUTE_KEY = "test_mq"
def alarm_handler(signum, frame):
print "TIMEOUT waiting for all queued messages to be delivered"
os._exit(1)
def get_connection(host_port, user, password, vhost, ssl, ssl_ca):
""" connect to the amqp service """
if options.verbose:
print "Connection to %s requested" % host_port
try:
params = {'host': host_port, 'userid': user, 'password': password,
'virtual_host': vhost, 'insist': False}
if ssl:
params['ssl'] = {'ca_certs': ssl_ca}
ret = amqp.Connection(**params)
except (socket.error, TypeError), e:
print "ERROR: Could not connect to RabbitMQ server %s:%d" % (
options.host, options.port)
if options.verbose:
print e
raise
sys.exit(2)
except Exception as ex:
print("ERROR: Unknown error connecting to RabbitMQ server %s:%d: %s"
% (options.host, options.port, ex))
if options.verbose:
raise
sys.exit(3)
return ret
def setup_exchange(conn, exchange_name, exchange_type):
""" create an exchange """
# see if we already have the exchange
must_create = False
chan = conn.channel()
try:
chan.exchange_declare(exchange=exchange_name, type=exchange_type,
passive=True)
except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
if e.amqp_reply_code == 404:
must_create = True
# amqplib kills the channel on error.... we dispose of it too
chan.close()
chan = conn.channel()
else:
raise
# now create the exchange if needed
if must_create:
chan.exchange_declare(exchange=exchange_name, type=exchange_type,
durable=False, auto_delete=False,)
if options.verbose:
print "Created new exchange %s (%s)" % (
exchange_name, exchange_type)
else:
if options.verbose:
print "Exchange %s (%s) is already declared" % (
exchange_name, exchange_type)
chan.close()
return must_create
class Consumer(object):
""" message consumer class """
_quit = False
def __init__(self, conn, exname):
self.exname = exname
self.connection = conn
self.name = "%s_queue" % exname
def setup(self):
""" sets up the queue and links it to the exchange """
if options.verbose:
print self.name, "setup"
chan = self.connection.channel()
# setup the queue
chan.queue_declare(queue=self.name, durable=False,
exclusive=False, auto_delete=False)
chan.queue_bind(queue=self.name, exchange=self.exname,
routing_key=ROUTE_KEY)
chan.queue_purge(self.name)
chan.close()
def check_end(self, msg):
""" checks if this is an end request """
return msg.body.startswith("QUIT")
def loop(self, timeout=5):
""" main loop for the consumer client """
consumer_tag = "callback_%s" % self.name
chan = self.connection.channel()
def callback(msg):
""" callback for message received """
if options.verbose:
print "Client %s saw this message: '%s'" % (self.name, msg.body)
if self.check_end(msg): # we have been asked to quit
self._quit = True
chan.basic_consume(queue=self.name, no_ack=True, callback=callback,
consumer_tag=consumer_tag)
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(timeout)
while True:
chan.wait()
if self._quit:
break
# cancel alarm for receive wait
signal.alarm(0)
chan.basic_cancel(consumer_tag)
chan.close()
return self._quit
def send_message(chan, exname, counter=None, message=None):
""" publish a message on the exchange """
if not message:
message = "This is test message %d" % counter
msg = amqp.Message(message)
chan.basic_publish(msg, exchange=exname, routing_key=ROUTE_KEY)
if options.verbose:
print "Sent message: %s" % message
def main_loop(conn, exname):
""" demo code to send/receive a few messages """
# first, set up a few consumers
# setup the queue that would collect the messages
consumer = Consumer(conn, exname)
consumer.setup()
# open up our own connection and start sending messages
chan = conn.channel()
# loop a few messages
for i in range(options.messages):
send_message(chan, exname, i)
# signal end of test
send_message(chan, exname, message="QUIT")
chan.close()
# loop around for a while waiting for messages to be picked up
return consumer.loop(timeout=options.timeout)
def main(host, port, exname, extype, user, password, vhost, ssl, ssl_ca):
""" setup the connection and the communication channel """
sys.stdout = os.fdopen(os.dup(1), "w", 0)
host_port = "%s:%s" % (host, port)
conn = get_connection(host_port, user, password, vhost, ssl, ssl_ca)
if setup_exchange(conn, exname, extype):
if options.verbose:
print "Created %s exchange of type %s" % (exname, extype)
else:
if options.verbose:
print "Reusing existing exchange %s of type %s" % (exname, extype)
ret = main_loop(conn, exname)
try:
conn.close()
except socket.error:
# when using SSL socket.shutdown() fails inside amqplib.
pass
return ret
if __name__ == '__main__':
parser = OptionParser()
parser.add_option("--host", dest="host",
help="RabbitMQ host [default=%default]",
metavar="HOST", default="localhost")
parser.add_option("--port", dest="port", type="int",
help="port RabbitMQ is running on [default=%default]",
metavar="PORT", default=5672)
parser.add_option("--exchange", dest="exchange",
help="Exchange name to use [default=%default]",
default="test_exchange", metavar="EXCHANGE")
parser.add_option("--type", dest="type",
help="EXCHANGE type [default=%default]",
metavar="TYPE", default="fanout")
parser.add_option("-v", "--verbose", default=False, action="store_true",
help="verbose run")
parser.add_option("-m", "--messages", dest="messages", type="int",
help="send NUM messages for testing [default=%default]",
metavar="NUM", default=10)
parser.add_option("-t", "--timeout", dest="timeout", type="int",
help="wait TIMEOUT sec for loop test [default=%default]",
metavar="TIMEOUT", default=5)
parser.add_option("-u", "--user", dest="user", default="guest",
help="RabbitMQ user [default=%default]",
metavar="USER")
parser.add_option("-p", "--password", dest="password", default="guest",
help="RabbitMQ password [default=%default]",
metavar="PASSWORD")
parser.add_option("--vhost", dest="vhost", default="/",
help="RabbitMQ vhost [default=%default]",
metavar="VHOST")
parser.add_option("--ssl", dest="ssl", default=False, action="store_true",
help="Connect using SSL")
parser.add_option("--ssl-ca", metavar="FILE", dest="ssl_ca",
help="SSL CA certificate path")
(options, args) = parser.parse_args()
if options.verbose:
print """
Using AMQP setup: host:port=%s:%d exchange_name=%s exchange_type=%s
""" % (options.host, options.port, options.exchange, options.type)
ret = main(options.host, options.port, options.exchange, options.type,
options.user, options.password, options.vhost, options.ssl,
options.ssl_ca)
if ret:
print "Ok: sent and received %d test messages" % options.messages
sys.exit(0)
print "ERROR: Could not send/receive test messages"
sys.exit(3)