From c872f8d67045fb287850d8cdc1ba4b865c84c19c Mon Sep 17 00:00:00 2001 From: Numan Siddique Date: Mon, 24 Feb 2014 18:04:11 +0530 Subject: [PATCH] Adds unit test cases to impl_qpid Change-Id: I0c056efd9fb8b48d02a11dfb969ee98b736ba017 Closes-Bug: #1255239 --- test-requirements.txt | 1 + tests/test_qpid.py | 667 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 668 insertions(+) create mode 100644 tests/test_qpid.py diff --git a/test-requirements.txt b/test-requirements.txt index c7b3c2654..6cc413005 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -22,3 +22,4 @@ coverage>=3.6 # this is required for the docs build jobs sphinx>=1.1.2,<1.2 oslosphinx +qpid-python diff --git a/tests/test_qpid.py b/tests/test_qpid.py new file mode 100644 index 000000000..3f3fa7726 --- /dev/null +++ b/tests/test_qpid.py @@ -0,0 +1,667 @@ +# Copyright (C) 2014 eNovance SAS +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random +import thread +import threading +import time + +import qpid +import testscenarios + +from oslo.config import cfg +from oslo import messaging +from oslo.messaging._drivers import impl_qpid as qpid_driver +from tests import utils as test_utils + + +load_tests = testscenarios.load_tests_apply_scenarios + +QPID_BROKER = 'localhost:5672' + + +class TestQpidDriverLoad(test_utils.BaseTestCase): + + def setUp(self): + super(TestQpidDriverLoad, self).setUp() + self.messaging_conf.transport_driver = 'qpid' + + def test_driver_load(self): + transport = messaging.get_transport(self.conf) + self.assertIsInstance(transport._driver, qpid_driver.QpidDriver) + + +def _is_qpidd_service_running(): + + """this function checks if the qpid service is running or not.""" + + qpid_running = True + try: + broker = QPID_BROKER + connection = qpid.messaging.Connection(broker) + connection.open() + except Exception: + # qpid service is not runnung. + qpid_running = False + else: + connection.close() + + return qpid_running + + +class TestQpidInvalidTopologyVersion(test_utils.BaseTestCase): + """Unit test cases to test invalid qpid topology version.""" + + scenarios = [ + ('direct', dict(consumer_cls=qpid_driver.DirectConsumer, + publisher_cls=qpid_driver.DirectPublisher)), + ('topic', dict(consumer_cls=qpid_driver.TopicConsumer, + publisher_cls=qpid_driver.TopicPublisher)), + ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer, + publisher_cls=qpid_driver.FanoutPublisher)), + ] + + def setUp(self): + super(TestQpidInvalidTopologyVersion, self).setUp() + + self.qpid_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), + cfg.IntOpt('qpid_topology_version', + default=-1, + help='qpid topology version'), + cfg.StrOpt('control_exchange', + default='openstack', + help='AMQP exchange to connect to if using Qpid'), + ] + + self.qpid_conf = cfg.ConfigOpts() + self.qpid_conf.register_opts(self.qpid_opts) + + self.fake_qpid = not _is_qpidd_service_running() + + if self.fake_qpid: + self.session = get_fake_qpid_session() + else: + self.broker = QPID_BROKER + # create connection from the qpid.messaging + self.connection = qpid.messaging.Connection(self.broker) + self.connection.open() + self.session = self.connection.session() + + def tearDown(self): + self.qpid_conf.unregister_opts(self.qpid_opts) + super(TestQpidInvalidTopologyVersion, self).tearDown() + + if self.fake_qpid: + _fake_session.flush_exchanges() + else: + self.connection.close() + + def test_invalid_topology_version(self): + def consumer_callback(msg): + pass + + msgid_or_topic = 'test' + + # not using self.assertRaises because + # 1. qpid driver raises Exception(msg) for invalid topology version + # 2. flake8 - H202 assertRaises Exception too broad + exception_msg = ("Invalid value for qpid_topology_version: %d" % + self.qpid_conf.qpid_topology_version) + recvd_exc_msg = '' + + try: + self.consumer_cls(self.qpid_conf, self.session, msgid_or_topic, + consumer_callback) + except Exception as e: + recvd_exc_msg = e.message + + self.assertEqual(exception_msg, recvd_exc_msg) + + recvd_exc_msg = '' + try: + self.publisher_cls(self.qpid_conf, self.session, msgid_or_topic) + except Exception as e: + recvd_exc_msg = e.message + + self.assertEqual(exception_msg, recvd_exc_msg) + + +class TestQpidDirectConsumerPublisher(test_utils.BaseTestCase): + """Unit test cases to test DirectConsumer and Direct Publisher.""" + + _n_qpid_topology = [ + ('v1', dict(qpid_topology=1)), + ('v2', dict(qpid_topology=2)), + ] + + _n_msgs = [ + ('single', dict(no_msgs=1)), + ('multiple', dict(no_msgs=10)), + ] + + @classmethod + def generate_scenarios(cls): + cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology, + cls._n_msgs) + + def setUp(self): + super(TestQpidDirectConsumerPublisher, self).setUp() + + self.qpid_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), + cfg.IntOpt('qpid_topology_version', + default=self.qpid_topology, + help='qpid topology version'), + ] + + self.qpid_conf = cfg.ConfigOpts() + self.qpid_conf.register_opts(self.qpid_opts) + self.fake_qpid = not _is_qpidd_service_running() + + if self.fake_qpid: + self.session_receive = get_fake_qpid_session() + self.session_send = get_fake_qpid_session() + else: + self.broker = QPID_BROKER + # create connection from the qpid.messaging + # connection for the Consumer. + self.con_receive = qpid.messaging.Connection(self.broker) + self.con_receive.open() + # session to receive the messages + self.session_receive = self.con_receive.session() + + # connection for sending the message + self.con_send = qpid.messaging.Connection(self.broker) + self.con_send.open() + # session to send the messages + self.session_send = self.con_send.session() + + # list to store the expected messages and + # the actual received messages + self._expected = [] + self._messages = [] + + def tearDown(self): + self.qpid_conf.unregister_opts(self.qpid_opts) + super(TestQpidDirectConsumerPublisher, self).tearDown() + + if self.fake_qpid: + _fake_session.flush_exchanges() + else: + self.con_receive.close() + self.con_send.close() + + def consumer_callback(self, msg): + # This function will be called by the DirectConsumer + # when any message is received. + # Append the received message into the messages list + # so that the received messages can be validated + # with the expected messages + if isinstance(msg, dict): + self._messages.append(msg['content']) + else: + self._messages.append(msg) + + def test_qpid_direct_consumer_producer(self): + self.msgid = str(random.randint(1, 100)) + + # create a DirectConsumer and DirectPublisher class objects + self.dir_cons = qpid_driver.DirectConsumer(self.qpid_conf, + self.session_receive, + self.msgid, + self.consumer_callback) + self.dir_pub = qpid_driver.DirectPublisher(self.qpid_conf, + self.session_send, + self.msgid) + + def try_send_msg(no_msgs): + for i in range(no_msgs): + self._expected.append(str(i)) + snd_msg = {'content_type': 'text/plain', 'content': str(i)} + self.dir_pub.send(snd_msg) + + def try_receive_msg(no_msgs): + for i in range(no_msgs): + self.dir_cons.consume() + + thread1 = threading.Thread(target=try_receive_msg, + args=(self.no_msgs,)) + thread2 = threading.Thread(target=try_send_msg, + args=(self.no_msgs,)) + + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + self.assertEqual(len(self._messages), self.no_msgs) + self.assertEqual(self._messages, self._expected) + + +TestQpidDirectConsumerPublisher.generate_scenarios() + + +class TestQpidTopicAndFanout(test_utils.BaseTestCase): + """Unit Test cases to test TopicConsumer and + TopicPublisher classes of the qpid driver + and FanoutConsumer and FanoutPublisher classes + of the qpid driver + """ + + _n_qpid_topology = [ + ('v1', dict(qpid_topology=1)), + ('v2', dict(qpid_topology=2)), + ] + + _n_msgs = [ + ('single', dict(no_msgs=1)), + ('multiple', dict(no_msgs=10)), + ] + + _n_senders = [ + ('single', dict(no_senders=1)), + ('multiple', dict(no_senders=10)), + ] + + _n_receivers = [ + ('single', dict(no_receivers=1)), + ] + _exchange_class = [ + ('topic', dict(consumer_cls=qpid_driver.TopicConsumer, + publisher_cls=qpid_driver.TopicPublisher, + topic='topictest.test', + receive_topic='topictest.test')), + ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer, + publisher_cls=qpid_driver.FanoutPublisher, + topic='fanouttest', + receive_topic='fanouttest')), + ] + + @classmethod + def generate_scenarios(cls): + cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology, + cls._n_msgs, + cls._n_senders, + cls._n_receivers, + cls._exchange_class) + + def setUp(self): + self.qpid_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), + cfg.IntOpt('qpid_topology_version', + default=self.qpid_topology, + help='qpid topology version'), + cfg.StrOpt('control_exchange', + default='openstack', + help='AMQP exchange to connect to if using Qpid'), + ] + + super(TestQpidTopicAndFanout, self).setUp() + self.qpid_conf = cfg.ConfigOpts() + self.qpid_conf.register_opts(self.qpid_opts) + + self._fake_qpid = not _is_qpidd_service_running() + + if self._fake_qpid: + self.session_receive = get_fake_qpid_session() + self.session_send = get_fake_qpid_session() + else: + self.broker = QPID_BROKER + # connection for the Consumer. + self.con_receive = qpid.messaging.Connection(self.broker) + self.con_receive.open() + # session to receive the messages + self.session_receive = self.con_receive.session() + + # connection for sending the message + self.con_send = qpid.messaging.Connection(self.broker) + self.con_send.open() + # session to send the messages + self.session_send = self.con_send.session() + + # to store the expected messages and the + # actual received messages + self._expected = {} + self._messages = {} + + self._senders = [] + self._receivers = [] + + self._sender_threads = [] + self._receiver_threads = [] + + def tearDown(self): + self.qpid_conf.unregister_opts(self.qpid_opts) + super(TestQpidTopicAndFanout, self).tearDown() + if self._fake_qpid: + _fake_session.flush_exchanges() + else: + self.con_receive.close() + self.con_send.close() + + def consumer_callback(self, msg): + """callback function called by the ConsumerBase class of + qpid driver. + Message will be received in the format x-y + where x is the sender id and y is the msg number of the sender + extract the sender id 'x' and store the msg 'x-y' with 'x' as + the key + """ + + if isinstance(msg, dict): + msgcontent = msg['content'] + else: + msgcontent = msg + + splitmsg = msgcontent.split('-') + key = thread.get_ident() + + if key not in self._messages: + self._messages[key] = dict() + + tdict = self._messages[key] + + if splitmsg[0] not in tdict: + tdict[splitmsg[0]] = [] + + tdict[splitmsg[0]].append(msgcontent) + + def _try_send_msg(self, sender_id, no_msgs): + for i in range(no_msgs): + sendmsg = '%s-%s' % (str(sender_id), str(i)) + key = str(sender_id) + # Store the message in the self._expected for each sender. + # This will be used later to + # validate the test by comparing it with the + # received messages by all the receivers + if key not in self._expected: + self._expected[key] = [] + self._expected[key].append(sendmsg) + send_dict = {'content_type': 'text/plain', 'content': sendmsg} + self._senders[sender_id].send(send_dict) + + def _try_receive_msg(self, receiver_id, no_msgs): + for i in range(self.no_senders * no_msgs): + no_of_attempts = 0 + + # ConsumerBase.consume blocks indefinitely until a message + # is received. + # So qpid_receiver.available() is called before calling + # ConsumerBase.consume() so that we are not + # blocked indefinitely + qpid_receiver = self._receivers[receiver_id].get_receiver() + while no_of_attempts < 50: + if qpid_receiver.available() > 0: + self._receivers[receiver_id].consume() + break + no_of_attempts += 1 + time.sleep(0.05) + + def test_qpid_topic_and_fanout(self): + for receiver_id in range(self.no_receivers): + consumer = self.consumer_cls(self.qpid_conf, + self.session_receive, + self.receive_topic, + self.consumer_callback) + self._receivers.append(consumer) + + # create receivers threads + thread = threading.Thread(target=self._try_receive_msg, + args=(receiver_id, self.no_msgs,)) + self._receiver_threads.append(thread) + + for sender_id in range(self.no_senders): + publisher = self.publisher_cls(self.qpid_conf, + self.session_send, + self.topic) + self._senders.append(publisher) + + # create sender threads + thread = threading.Thread(target=self._try_send_msg, + args=(sender_id, self.no_msgs,)) + self._sender_threads.append(thread) + + for thread in self._receiver_threads: + thread.start() + + for thread in self._sender_threads: + thread.start() + + for thread in self._receiver_threads: + thread.join() + + for thread in self._sender_threads: + thread.join() + + # Each receiver should receive all the messages sent by + # the sender(s). + # So, Iterate through each of the receiver items in + # self._messages and compare with the expected messages + # messages. + + self.assertEqual(len(self._expected), self.no_senders) + self.assertEqual(len(self._messages), self.no_receivers) + + for key, messages in self._messages.iteritems(): + self.assertEqual(self._expected, messages) + +TestQpidTopicAndFanout.generate_scenarios() + + +def synchronized(func): + func.__lock__ = threading.Lock() + + def synced_func(*args, **kws): + with func.__lock__: + return func(*args, **kws) + + return synced_func + + +class FakeQpidMsgManager(object): + def __init__(self): + self._exchanges = {} + + @synchronized + def add_exchange(self, exchange): + if exchange not in self._exchanges: + self._exchanges[exchange] = {'msgs': [], 'consumers': {}} + + @synchronized + def add_exchange_consumer(self, exchange, consumer_id): + exchange_info = self._exchanges[exchange] + cons_dict = exchange_info['consumers'] + cons_dict[consumer_id] = 0 + + @synchronized + def add_exchange_msg(self, exchange, msg): + exchange_info = self._exchanges[exchange] + exchange_info['msgs'].append(msg) + + def get_exchange_msg(self, exchange, index): + exchange_info = self._exchanges[exchange] + return exchange_info['msgs'][index] + + def get_no_exch_msgs(self, exchange): + exchange_info = self._exchanges[exchange] + return len(exchange_info['msgs']) + + def get_exch_cons_index(self, exchange, consumer_id): + exchange_info = self._exchanges[exchange] + cons_dict = exchange_info['consumers'] + return cons_dict[consumer_id] + + @synchronized + def inc_consumer_index(self, exchange, consumer_id): + exchange_info = self._exchanges[exchange] + cons_dict = exchange_info['consumers'] + cons_dict[consumer_id] += 1 + +_fake_qpid_msg_manager = FakeQpidMsgManager() + + +class FakeQpidSessionSender(object): + def __init__(self, session, id, target, options): + self.session = session + self.id = id + self.target = target + self.options = options + + @synchronized + def send(self, object, sync=True, timeout=None): + _fake_qpid_msg_manager.add_exchange_msg(self.target, object) + + def close(self, timeout=None): + pass + + +class FakeQpidSessionReceiver(object): + + def __init__(self, session, id, source, options): + self.session = session + self.id = id + self.source = source + self.options = options + + @synchronized + def fetch(self, timeout=None): + if timeout is None: + # if timeout is not given, take a default time out + # of 30 seconds to avoid indefinite loop + _timeout = 30 + else: + _timeout = timeout + + deadline = time.time() + _timeout + while time.time() <= deadline: + index = _fake_qpid_msg_manager.get_exch_cons_index(self.source, + self.id) + try: + msg = _fake_qpid_msg_manager.get_exchange_msg(self.source, + index) + except IndexError: + pass + else: + _fake_qpid_msg_manager.inc_consumer_index(self.source, + self.id) + return qpid.messaging.Message(msg) + time.sleep(0.050) + + if timeout is None: + raise Exception('timed out waiting for reply') + + def close(self, timeout=None): + pass + + @synchronized + def available(self): + no_msgs = _fake_qpid_msg_manager.get_no_exch_msgs(self.source) + index = _fake_qpid_msg_manager.get_exch_cons_index(self.source, + self.id) + if no_msgs == 0 or index >= no_msgs: + return 0 + else: + return no_msgs - index + + +class FakeQpidSession(object): + + def __init__(self, connection=None, name=None, transactional=None): + self.connection = connection + self.name = name + self.transactional = transactional + self._receivers = {} + self.conf = None + self.url = None + self._senders = {} + self._sender_id = 0 + self._receiver_id = 0 + + @synchronized + def sender(self, target, **options): + exchange_key = self._extract_exchange_key(target) + _fake_qpid_msg_manager.add_exchange(exchange_key) + + sendobj = FakeQpidSessionSender(self, self._sender_id, + exchange_key, options) + self._senders[self._sender_id] = sendobj + self._sender_id = self._sender_id + 1 + return sendobj + + @synchronized + def receiver(self, source, **options): + exchange_key = self._extract_exchange_key(source) + _fake_qpid_msg_manager.add_exchange(exchange_key) + recvobj = FakeQpidSessionReceiver(self, self._receiver_id, + exchange_key, options) + self._receivers[self._receiver_id] = recvobj + _fake_qpid_msg_manager.add_exchange_consumer(exchange_key, + self._receiver_id) + self._receiver_id += 1 + return recvobj + + def acknowledge(self, message=None, disposition=None, sync=True): + pass + + @synchronized + def flush_exchanges(self): + _fake_qpid_msg_manager._exchanges = {} + + def _extract_exchange_key(self, exchange_msg): + """This function extracts a unique key for the exchange. + This key is used in the dictionary as a 'key' for + this exchange. + Eg. if the exchange_msg (for qpid topology version 1) + is 33/33 ; {"node": {"x-declare": {"auto-delete": true, .... + then 33 is returned as the key. + Eg 2. For topology v2, if the + exchange_msg is - amq.direct/44 ; {"link": {"x-dec....... + then 44 is returned + """ + # first check for ';' + semicolon_split = exchange_msg.split(';') + + # split the first item of semicolon_split with '/' + slash_split = semicolon_split[0].split('/') + # return the last element of the list as the key + key = slash_split[-1] + return key.strip() + +_fake_session = FakeQpidSession() + + +def get_fake_qpid_session(): + return _fake_session