Remove oslo namespace package

Blueprint remove-namespace-packages

Depends-on: I2eeef93ee2e61a721c69f62add819f93f62f077d
for openstack/ceilometer
Depends-on: I26390dd908769be5f1a5b70be22d3b98e3a45563
for openstack/ceilometermiddleware
Depends-on: Ifa8baab33cdb3e606cf175a8c29c3a4ef6c44480
for openstack/glance
Depends-on: I029c3260051aa48cfaae428c096c1ac7b43b2bd2
for openstack/ceilometermiddleware

Change-Id: I8c5595bbafa82db33f62fa47d214f5cb756a2639
This commit is contained in:
Doug Hellmann 2015-04-28 12:18:34 +00:00
parent 8a9b5d44ae
commit 03265410e0
51 changed files with 0 additions and 7333 deletions

View File

@ -1,16 +0,0 @@
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__import__('pkg_resources').declare_namespace(__name__)

View File

@ -1,38 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from .exceptions import *
from .localcontext import *
from .notify import *
from .rpc import *
from .serializer import *
from .server import *
from .target import *
from .transport import *
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=3,
)
deprecated()

View File

@ -1,16 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(dhellmann): This private package and these imports can be
# removed after heat fixes their tests. See
# https://bugs.launchpad.net/oslo.messaging/+bug/1410196.
from oslo_messaging._drivers.common import * # noqa

View File

@ -1,17 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._executors.base import * # noqa
# FIXME(dhellmann): Provide a dummy value so the mock in nova's unit
# test fixture works. See bug #1412841
POLL_TIMEOUT = 0.1

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.conffixture import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.exceptions import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.localcontext import * # noqa

View File

@ -1,28 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_listener',
'NotificationResult',
'PublishErrorsHandler',
'LoggingErrorNotificationHandler']
from .notifier import *
from .listener import *
from .log_handler import *
from .logger import *
from .dispatcher import NotificationResult
from oslo_messaging.notify import _impl_test

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.notify.dispatcher import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.notify.listener import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.notify.log_handler import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.notify.logger import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.notify.middleware import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.notify.notifier import * # noqa

View File

@ -1,32 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'ClientSendError',
'ExpectedException',
'NoSuchMethod',
'RPCClient',
'RPCDispatcher',
'RPCDispatcherError',
'RPCVersionCapError',
'RemoteError',
'UnsupportedVersion',
'expected_exceptions',
'get_rpc_server',
]
from .client import *
from .dispatcher import *
from .server import *

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.rpc.client import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.rpc.dispatcher import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.rpc.server import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.serializer import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.server import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.target import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.transport import * # noqa

View File

@ -18,11 +18,7 @@ classifier =
[files]
packages =
oslo
oslo.messaging
oslo_messaging
namespace_packages =
oslo
[entry_points]
console_scripts =

View File

@ -1,26 +0,0 @@
# Copyright 2014 eNovance
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Import oslotest before importing test submodules to setup six.moves for mock
import oslotest
try:
import eventlet
except ImportError:
pass
else:
# Ensure that eventlet monkey patching is enabled before loading the qpid
# module, otherwise qpid will hang
eventlet.monkey_patch()

View File

@ -1,850 +0,0 @@
# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
import random
import threading
import time
try:
import qpid
except ImportError:
qpid = None
from six.moves import _thread
import testscenarios
import testtools
from oslo import messaging
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import impl_qpid as qpid_driver
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
QPID_BROKER = 'localhost:5672'
class TestQpidDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestQpidDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'qpid'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, qpid_driver.QpidDriver)
def _is_qpidd_service_running():
"""this function checks if the qpid service is running or not."""
qpid_running = True
try:
broker = QPID_BROKER
connection = qpid.messaging.Connection(broker)
connection.open()
except Exception:
# qpid service is not running.
qpid_running = False
else:
connection.close()
return qpid_running
class _QpidBaseTestCase(test_utils.BaseTestCase):
@testtools.skipIf(qpid is None, "qpid not available")
def setUp(self):
super(_QpidBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'qpid'
self.fake_qpid = not _is_qpidd_service_running()
if self.fake_qpid:
self.session_receive = get_fake_qpid_session()
self.session_send = get_fake_qpid_session()
else:
self.broker = QPID_BROKER
# create connection from the qpid.messaging
# connection for the Consumer.
self.con_receive = qpid.messaging.Connection(self.broker)
self.con_receive.open()
# session to receive the messages
self.session_receive = self.con_receive.session()
# connection for sending the message
self.con_send = qpid.messaging.Connection(self.broker)
self.con_send.open()
# session to send the messages
self.session_send = self.con_send.session()
# list to store the expected messages and
# the actual received messages
self._expected = []
self._messages = []
self.initialized = True
def tearDown(self):
super(_QpidBaseTestCase, self).tearDown()
if self.initialized:
if self.fake_qpid:
_fake_session.flush_exchanges()
else:
self.con_receive.close()
self.con_send.close()
class TestQpidTransportURL(_QpidBaseTestCase):
scenarios = [
('none', dict(url=None,
expected=[dict(host='localhost:5672',
username='',
password='')])),
('empty',
dict(url='qpid:///',
expected=[dict(host='localhost:5672',
username='',
password='')])),
('localhost',
dict(url='qpid://localhost/',
expected=[dict(host='localhost',
username='',
password='')])),
('no_creds',
dict(url='qpid://host/',
expected=[dict(host='host',
username='',
password='')])),
('no_port',
dict(url='qpid://user:password@host/',
expected=[dict(host='host',
username='user',
password='password')])),
('full_url',
dict(url='qpid://user:password@host:10/',
expected=[dict(host='host:10',
username='user',
password='password')])),
('full_two_url',
dict(url='qpid://user:password@host:10,'
'user2:password2@host2:12/',
expected=[dict(host='host:10',
username='user',
password='password'),
dict(host='host2:12',
username='user2',
password='password2')
]
)),
]
@mock.patch.object(qpid_driver.Connection, 'reconnect')
def test_transport_url(self, *args):
transport = messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
brokers_params = driver._get_connection().brokers_params
self.assertEqual(sorted(self.expected,
key=operator.itemgetter('host')),
sorted(brokers_params,
key=operator.itemgetter('host')))
class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
"""Unit test cases to test invalid qpid topology version."""
scenarios = [
('direct', dict(consumer_cls=qpid_driver.DirectConsumer,
consumer_kwargs={},
publisher_cls=qpid_driver.DirectPublisher,
publisher_kwargs={})),
('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
consumer_kwargs={'exchange_name': 'openstack'},
publisher_cls=qpid_driver.TopicPublisher,
publisher_kwargs={'exchange_name': 'openstack'})),
('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
consumer_kwargs={},
publisher_cls=qpid_driver.FanoutPublisher,
publisher_kwargs={})),
]
def setUp(self):
super(TestQpidInvalidTopologyVersion, self).setUp()
self.config(qpid_topology_version=-1,
group='oslo_messaging_qpid')
def test_invalid_topology_version(self):
def consumer_callback(msg):
pass
msgid_or_topic = 'test'
# not using self.assertRaises because
# 1. qpid driver raises Exception(msg) for invalid topology version
# 2. flake8 - H202 assertRaises Exception too broad
exception_msg = ("Invalid value for qpid_topology_version: %d" %
self.conf.oslo_messaging_qpid.qpid_topology_version)
recvd_exc_msg = ''
try:
self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
msgid_or_topic,
consumer_callback,
**self.consumer_kwargs)
except Exception as e:
recvd_exc_msg = e.message
self.assertEqual(exception_msg, recvd_exc_msg)
recvd_exc_msg = ''
try:
self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=msgid_or_topic,
**self.publisher_kwargs)
except Exception as e:
recvd_exc_msg = e.message
self.assertEqual(exception_msg, recvd_exc_msg)
class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
"""Unit test cases to test DirectConsumer and Direct Publisher."""
_n_qpid_topology = [
('v1', dict(qpid_topology=1)),
('v2', dict(qpid_topology=2)),
]
_n_msgs = [
('single', dict(no_msgs=1)),
('multiple', dict(no_msgs=10)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
cls._n_msgs)
def consumer_callback(self, msg):
# This function will be called by the DirectConsumer
# when any message is received.
# Append the received message into the messages list
# so that the received messages can be validated
# with the expected messages
if isinstance(msg, dict):
self._messages.append(msg['content'])
else:
self._messages.append(msg)
def test_qpid_direct_consumer_producer(self):
self.msgid = str(random.randint(1, 100))
# create a DirectConsumer and DirectPublisher class objects
self.dir_cons = qpid_driver.DirectConsumer(
self.conf.oslo_messaging_qpid,
self.session_receive,
self.msgid,
self.consumer_callback)
self.dir_pub = qpid_driver.DirectPublisher(
self.conf.oslo_messaging_qpid,
self.session_send,
self.msgid)
def try_send_msg(no_msgs):
for i in range(no_msgs):
self._expected.append(str(i))
snd_msg = {'content_type': 'text/plain', 'content': str(i)}
self.dir_pub.send(snd_msg)
def try_receive_msg(no_msgs):
for i in range(no_msgs):
self.dir_cons.consume()
thread1 = threading.Thread(target=try_receive_msg,
args=(self.no_msgs,))
thread2 = threading.Thread(target=try_send_msg,
args=(self.no_msgs,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
self.assertEqual(self.no_msgs, len(self._messages))
self.assertEqual(self._expected, self._messages)
TestQpidDirectConsumerPublisher.generate_scenarios()
class TestQpidTopicAndFanout(_QpidBaseTestCase):
"""Unit Test cases to test TopicConsumer and
TopicPublisher classes of the qpid driver
and FanoutConsumer and FanoutPublisher classes
of the qpid driver
"""
_n_qpid_topology = [
('v1', dict(qpid_topology=1)),
('v2', dict(qpid_topology=2)),
]
_n_msgs = [
('single', dict(no_msgs=1)),
('multiple', dict(no_msgs=10)),
]
_n_senders = [
('single', dict(no_senders=1)),
('multiple', dict(no_senders=10)),
]
_n_receivers = [
('single', dict(no_receivers=1)),
]
_exchange_class = [
('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
consumer_kwargs={'exchange_name': 'openstack'},
publisher_cls=qpid_driver.TopicPublisher,
publisher_kwargs={'exchange_name': 'openstack'},
topic='topictest.test',
receive_topic='topictest.test')),
('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
consumer_kwargs={},
publisher_cls=qpid_driver.FanoutPublisher,
publisher_kwargs={},
topic='fanouttest',
receive_topic='fanouttest')),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
cls._n_msgs,
cls._n_senders,
cls._n_receivers,
cls._exchange_class)
def setUp(self):
super(TestQpidTopicAndFanout, self).setUp()
# to store the expected messages and the
# actual received messages
#
# NOTE(dhellmann): These are dicts, where the base class uses
# lists.
self._expected = {}
self._messages = {}
self._senders = []
self._receivers = []
self._sender_threads = []
self._receiver_threads = []
def consumer_callback(self, msg):
"""callback function called by the ConsumerBase class of
qpid driver.
Message will be received in the format x-y
where x is the sender id and y is the msg number of the sender
extract the sender id 'x' and store the msg 'x-y' with 'x' as
the key
"""
if isinstance(msg, dict):
msgcontent = msg['content']
else:
msgcontent = msg
splitmsg = msgcontent.split('-')
key = _thread.get_ident()
if key not in self._messages:
self._messages[key] = dict()
tdict = self._messages[key]
if splitmsg[0] not in tdict:
tdict[splitmsg[0]] = []
tdict[splitmsg[0]].append(msgcontent)
def _try_send_msg(self, sender_id, no_msgs):
for i in range(no_msgs):
sendmsg = '%s-%s' % (str(sender_id), str(i))
key = str(sender_id)
# Store the message in the self._expected for each sender.
# This will be used later to
# validate the test by comparing it with the
# received messages by all the receivers
if key not in self._expected:
self._expected[key] = []
self._expected[key].append(sendmsg)
send_dict = {'content_type': 'text/plain', 'content': sendmsg}
self._senders[sender_id].send(send_dict)
def _try_receive_msg(self, receiver_id, no_msgs):
for i in range(self.no_senders * no_msgs):
no_of_attempts = 0
# ConsumerBase.consume blocks indefinitely until a message
# is received.
# So qpid_receiver.available() is called before calling
# ConsumerBase.consume() so that we are not
# blocked indefinitely
qpid_receiver = self._receivers[receiver_id].get_receiver()
while no_of_attempts < 50:
if qpid_receiver.available() > 0:
self._receivers[receiver_id].consume()
break
no_of_attempts += 1
time.sleep(0.05)
def test_qpid_topic_and_fanout(self):
for receiver_id in range(self.no_receivers):
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
self.receive_topic,
self.consumer_callback,
**self.consumer_kwargs)
self._receivers.append(consumer)
# create receivers threads
thread = threading.Thread(target=self._try_receive_msg,
args=(receiver_id, self.no_msgs,))
self._receiver_threads.append(thread)
for sender_id in range(self.no_senders):
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=self.topic,
**self.publisher_kwargs)
self._senders.append(publisher)
# create sender threads
thread = threading.Thread(target=self._try_send_msg,
args=(sender_id, self.no_msgs,))
self._sender_threads.append(thread)
for thread in self._receiver_threads:
thread.start()
for thread in self._sender_threads:
thread.start()
for thread in self._receiver_threads:
thread.join()
for thread in self._sender_threads:
thread.join()
# Each receiver should receive all the messages sent by
# the sender(s).
# So, Iterate through each of the receiver items in
# self._messages and compare with the expected messages
# messages.
self.assertEqual(self.no_senders, len(self._expected))
self.assertEqual(self.no_receivers, len(self._messages))
for key, messages in self._messages.iteritems():
self.assertEqual(self._expected, messages)
TestQpidTopicAndFanout.generate_scenarios()
class AddressNodeMatcher(object):
def __init__(self, node):
self.node = node
def __eq__(self, address):
return address.split(';')[0].strip() == self.node
class TestDriverInterface(_QpidBaseTestCase):
"""Unit Test cases to test the amqpdriver with qpid
"""
def setUp(self):
super(TestDriverInterface, self).setUp()
self.config(qpid_topology_version=2,
group='oslo_messaging_qpid')
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
original_get_connection = self.driver._get_connection
p = mock.patch.object(self.driver, '_get_connection',
side_effect=lambda pooled=True:
original_get_connection(False))
p.start()
self.addCleanup(p.stop)
def test_listen_and_direct_send(self):
target = messaging.Target(exchange="exchange_test",
topic="topic_test",
server="server_test")
with mock.patch('qpid.messaging.Connection') as conn_cls:
conn = conn_cls.return_value
session = conn.session.return_value
session.receiver.side_effect = [mock.Mock(), mock.Mock(),
mock.Mock()]
listener = self.driver.listen(target)
listener.conn.direct_send("msg_id", {})
self.assertEqual(3, len(listener.conn.consumers))
expected_calls = [
mock.call(AddressNodeMatcher(
'amq.topic/topic/exchange_test/topic_test')),
mock.call(AddressNodeMatcher(
'amq.topic/topic/exchange_test/topic_test.server_test')),
mock.call(AddressNodeMatcher('amq.topic/fanout/topic_test')),
]
session.receiver.assert_has_calls(expected_calls)
session.sender.assert_called_with(
AddressNodeMatcher("amq.direct/msg_id"))
def test_send(self):
target = messaging.Target(exchange="exchange_test",
topic="topic_test",
server="server_test")
with mock.patch('qpid.messaging.Connection') as conn_cls:
conn = conn_cls.return_value
session = conn.session.return_value
self.driver.send(target, {}, {})
session.sender.assert_called_with(AddressNodeMatcher(
"amq.topic/topic/exchange_test/topic_test.server_test"))
def test_send_notification(self):
target = messaging.Target(exchange="exchange_test",
topic="topic_test.info")
with mock.patch('qpid.messaging.Connection') as conn_cls:
conn = conn_cls.return_value
session = conn.session.return_value
self.driver.send_notification(target, {}, {}, "2.0")
session.sender.assert_called_with(AddressNodeMatcher(
"amq.topic/topic/exchange_test/topic_test.info"))
class TestQpidReconnectOrder(test_utils.BaseTestCase):
"""Unit Test cases to test reconnection
"""
@testtools.skipIf(qpid is None, "qpid not available")
def test_reconnect_order(self):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.config(qpid_hosts=brokers,
group='oslo_messaging_qpid')
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
connection = qpid_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
# reached
for _ in range(brokers_count):
connection.reconnect()
expected = []
for broker in brokers:
expected.extend([mock.call("%s:5672" % broker),
mock.call().open(),
mock.call().session(),
mock.call().opened(),
mock.call().opened().__nonzero__(),
mock.call().close()])
conn_mock.assert_has_calls(expected, any_order=True)
def synchronized(func):
func.__lock__ = threading.Lock()
def synced_func(*args, **kws):
with func.__lock__:
return func(*args, **kws)
return synced_func
class FakeQpidMsgManager(object):
def __init__(self):
self._exchanges = {}
@synchronized
def add_exchange(self, exchange):
if exchange not in self._exchanges:
self._exchanges[exchange] = {'msgs': [], 'consumers': {}}
@synchronized
def add_exchange_consumer(self, exchange, consumer_id):
exchange_info = self._exchanges[exchange]
cons_dict = exchange_info['consumers']
cons_dict[consumer_id] = 0
@synchronized
def add_exchange_msg(self, exchange, msg):
exchange_info = self._exchanges[exchange]
exchange_info['msgs'].append(msg)
def get_exchange_msg(self, exchange, index):
exchange_info = self._exchanges[exchange]
return exchange_info['msgs'][index]
def get_no_exch_msgs(self, exchange):
exchange_info = self._exchanges[exchange]
return len(exchange_info['msgs'])
def get_exch_cons_index(self, exchange, consumer_id):
exchange_info = self._exchanges[exchange]
cons_dict = exchange_info['consumers']
return cons_dict[consumer_id]
@synchronized
def inc_consumer_index(self, exchange, consumer_id):
exchange_info = self._exchanges[exchange]
cons_dict = exchange_info['consumers']
cons_dict[consumer_id] += 1
_fake_qpid_msg_manager = FakeQpidMsgManager()
class FakeQpidSessionSender(object):
def __init__(self, session, id, target, options):
self.session = session
self.id = id
self.target = target
self.options = options
@synchronized
def send(self, object, sync=True, timeout=None):
_fake_qpid_msg_manager.add_exchange_msg(self.target, object)
def close(self, timeout=None):
pass
class FakeQpidSessionReceiver(object):
def __init__(self, session, id, source, options):
self.session = session
self.id = id
self.source = source
self.options = options
@synchronized
def fetch(self, timeout=None):
if timeout is None:
# if timeout is not given, take a default time out
# of 30 seconds to avoid indefinite loop
_timeout = 30
else:
_timeout = timeout
deadline = time.time() + _timeout
while time.time() <= deadline:
index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
self.id)
try:
msg = _fake_qpid_msg_manager.get_exchange_msg(self.source,
index)
except IndexError:
pass
else:
_fake_qpid_msg_manager.inc_consumer_index(self.source,
self.id)
return qpid.messaging.Message(msg)
time.sleep(0.050)
if timeout is None:
raise Exception('timed out waiting for reply')
def close(self, timeout=None):
pass
@synchronized
def available(self):
no_msgs = _fake_qpid_msg_manager.get_no_exch_msgs(self.source)
index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
self.id)
if no_msgs == 0 or index >= no_msgs:
return 0
else:
return no_msgs - index
class FakeQpidSession(object):
def __init__(self, connection=None, name=None, transactional=None):
self.connection = connection
self.name = name
self.transactional = transactional
self._receivers = {}
self.conf = None
self.url = None
self._senders = {}
self._sender_id = 0
self._receiver_id = 0
@synchronized
def sender(self, target, **options):
exchange_key = self._extract_exchange_key(target)
_fake_qpid_msg_manager.add_exchange(exchange_key)
sendobj = FakeQpidSessionSender(self, self._sender_id,
exchange_key, options)
self._senders[self._sender_id] = sendobj
self._sender_id = self._sender_id + 1
return sendobj
@synchronized
def receiver(self, source, **options):
exchange_key = self._extract_exchange_key(source)
_fake_qpid_msg_manager.add_exchange(exchange_key)
recvobj = FakeQpidSessionReceiver(self, self._receiver_id,
exchange_key, options)
self._receivers[self._receiver_id] = recvobj
_fake_qpid_msg_manager.add_exchange_consumer(exchange_key,
self._receiver_id)
self._receiver_id += 1
return recvobj
def acknowledge(self, message=None, disposition=None, sync=True):
pass
@synchronized
def flush_exchanges(self):
_fake_qpid_msg_manager._exchanges = {}
def _extract_exchange_key(self, exchange_msg):
"""This function extracts a unique key for the exchange.
This key is used in the dictionary as a 'key' for
this exchange.
Eg. if the exchange_msg (for qpid topology version 1)
is 33/33 ; {"node": {"x-declare": {"auto-delete": true, ....
then 33 is returned as the key.
Eg 2. For topology v2, if the
exchange_msg is - amq.direct/44 ; {"link": {"x-dec.......
then 44 is returned
"""
# first check for ';'
semicolon_split = exchange_msg.split(';')
# split the first item of semicolon_split with '/'
slash_split = semicolon_split[0].split('/')
# return the last element of the list as the key
key = slash_split[-1]
return key.strip()
def close(self):
pass
_fake_session = FakeQpidSession()
def get_fake_qpid_session():
return _fake_session
class QPidHATestCase(test_utils.BaseTestCase):
@testtools.skipIf(qpid is None, "qpid not available")
def setUp(self):
super(QPidHATestCase, self).setUp()
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(qpid_hosts=self.brokers,
qpid_username=None,
qpid_password=None,
group='oslo_messaging_qpid')
hostname_sets = set()
self.info = {'attempt': 0,
'fail': False}
def _connect(myself, broker):
# do as little work that is enough to pass connection attempt
myself.connection = mock.Mock()
hostname = broker['host']
self.assertNotIn(hostname, hostname_sets)
hostname_sets.add(hostname)
self.info['attempt'] += 1
if self.info['fail']:
raise qpid.messaging.exceptions.ConnectionError
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(qpid_driver.Connection, '_connect', _connect)
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
self.connection = qpid_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
'fail': True})
hostname_sets.clear()
def test_reconnect_order(self):
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.reconnect,
retry=len(self.brokers) - 1)
self.assertEqual(len(self.brokers), self.info['attempt'])
def test_ensure_four_retries(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=4)
self.assertEqual(5, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=1)
self.assertEqual(2, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=0)
self.assertEqual(1, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)

View File

@ -1,754 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import sys
import threading
import time
import uuid
import fixtures
import kombu
from oslotest import mockpatch
import testscenarios
from oslo import messaging
from oslo_config import cfg
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
from oslo_messaging.tests import utils as test_utils
from oslo_serialization import jsonutils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestDeprecatedRabbitDriverLoad, self).setUp(
conf=cfg.ConfigOpts())
self.messaging_conf.transport_driver = 'rabbit'
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
self.config(heartbeat_timeout_threshold=0,
group='oslo_messaging_rabbit')
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
driver = transport._driver
url = driver._get_connection()._url
self.assertIsInstance(driver, rabbit_driver.RabbitDriver)
self.assertEqual('memory:////', url)
class TestRabbitDriverLoad(test_utils.BaseTestCase):
scenarios = [
('rabbit', dict(transport_driver='rabbit',
url='amqp://guest:guest@localhost:5672//')),
('kombu', dict(transport_driver='kombu',
url='amqp://guest:guest@localhost:5672//')),
('rabbit+memory', dict(transport_driver='kombu+memory',
url='memory:///'))
]
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
def test_driver_load(self, fake_ensure, fake_reset):
self.config(heartbeat_timeout_threshold=0,
group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = self.transport_driver
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
driver = transport._driver
url = driver._get_connection()._url
self.assertIsInstance(driver, rabbit_driver.RabbitDriver)
self.assertEqual(self.url, url)
class TestRabbitConsume(test_utils.BaseTestCase):
def test_consume_timeout(self):
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 6
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
self.assertRaises(driver_common.Timeout,
conn.consume, timeout=3)
# kombu memory transport doesn't really raise error
# so just simulate a real driver behavior
conn.connection.connection.recoverable_channel_errors = (IOError,)
conn.declare_fanout_consumer("notif.info", lambda msg: True)
with mock.patch('kombu.connection.Connection.drain_events',
side_effect=IOError):
self.assertRaises(driver_common.Timeout,
conn.consume, timeout=3)
self.assertEqual(0, int(deadline - time.time()))
class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None,
expected=["amqp://guest:guest@localhost:5672//"])),
('memory', dict(url='kombu+memory:////',
expected=["memory:///"])),
('empty',
dict(url='rabbit:///',
expected=['amqp://guest:guest@localhost:5672/'])),
('localhost',
dict(url='rabbit://localhost/',
expected=['amqp://:@localhost:5672/'])),
('virtual_host',
dict(url='rabbit:///vhost',
expected=['amqp://guest:guest@localhost:5672/vhost'])),
('no_creds',
dict(url='rabbit://host/virtual_host',
expected=['amqp://:@host:5672/virtual_host'])),
('no_port',
dict(url='rabbit://user:password@host/virtual_host',
expected=['amqp://user:password@host:5672/virtual_host'])),
('full_url',
dict(url='rabbit://user:password@host:10/virtual_host',
expected=['amqp://user:password@host:10/virtual_host'])),
('full_two_url',
dict(url='rabbit://user:password@host:10,'
'user2:password2@host2:12/virtual_host',
expected=["amqp://user:password@host:10/virtual_host",
"amqp://user2:password2@host2:12/virtual_host"]
)),
('qpid',
dict(url='kombu+qpid://user:password@host:10/virtual_host',
expected=['qpid://user:password@host:10/virtual_host'])),
('rabbit',
dict(url='kombu+rabbit://user:password@host:10/virtual_host',
expected=['amqp://user:password@host:10/virtual_host'])),
('rabbit_ipv6',
dict(url='kombu+rabbit://u:p@[fd00:beef:dead:55::133]:10/vhost',
expected=['amqp://u:p@[fd00:beef:dead:55::133]:10/vhost'])),
('rabbit_ipv4',
dict(url='kombu+rabbit://user:password@10.20.30.40:10/vhost',
expected=['amqp://user:password@10.20.30.40:10/vhost'])),
]
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
self.config(heartbeat_timeout_threshold=0,
group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = 'rabbit'
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
def test_transport_url(self, fake_ensure_connection, fake_reset):
transport = messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
# NOTE(sileht): some kombu transport can depend on library that
# we don't want to depend yet, because selecting the transport
# is experimental, only amqp is supported
# for example kombu+qpid depends of qpid-tools
# so, mock the connection.info to skip call to qpid-tools
with mock.patch('kombu.connection.Connection.info'):
urls = driver._get_connection()._url.split(";")
self.assertEqual(sorted(self.expected), sorted(urls))
class TestSendReceive(test_utils.BaseTestCase):
_n_senders = [
('single_sender', dict(n_senders=1)),
('multiple_senders', dict(n_senders=10)),
]
_context = [
('empty_context', dict(ctxt={})),
('with_context', dict(ctxt={'user': 'mark'})),
]
_reply = [
('rx_id', dict(rx_id=True, reply=None)),
('none', dict(rx_id=False, reply=None)),
('empty_list', dict(rx_id=False, reply=[])),
('empty_dict', dict(rx_id=False, reply={})),
('false', dict(rx_id=False, reply=False)),
('zero', dict(rx_id=False, reply=0)),
]
_failure = [
('success', dict(failure=False)),
('failure', dict(failure=True, expected=False)),
('expected_failure', dict(failure=True, expected=True)),
]
_timeout = [
('no_timeout', dict(timeout=None)),
('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
cls._context,
cls._reply,
cls._failure,
cls._timeout)
def test_send_receive(self):
self.config(heartbeat_timeout_threshold=0,
group='oslo_messaging_rabbit')
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic='testtopic')
listener = driver.listen(target)
senders = []
replies = []
msgs = []
errors = []
def stub_error(msg, *a, **kw):
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
a = a[0]
errors.append(str(msg) % a)
self.stubs.Set(driver_common.LOG, 'error', stub_error)
def send_and_wait_for_reply(i):
try:
replies.append(driver.send(target,
self.ctxt,
{'tx_id': i},
wait_for_reply=True,
timeout=self.timeout))
self.assertFalse(self.failure)
self.assertIsNone(self.timeout)
except (ZeroDivisionError, messaging.MessagingTimeout) as e:
replies.append(e)
self.assertTrue(self.failure or self.timeout is not None)
while len(senders) < self.n_senders:
senders.append(threading.Thread(target=send_and_wait_for_reply,
args=(len(senders), )))
for i in range(len(senders)):
senders[i].start()
received = listener.poll()
self.assertIsNotNone(received)
self.assertEqual(self.ctxt, received.ctxt)
self.assertEqual({'tx_id': i}, received.message)
msgs.append(received)
# reply in reverse, except reply to the first guy second from last
order = list(range(len(senders) - 1, -1, -1))
if len(order) > 1:
order[-1], order[-2] = order[-2], order[-1]
for i in order:
if self.timeout is None:
if self.failure:
try:
raise ZeroDivisionError
except Exception:
failure = sys.exc_info()
# NOTE(noelbk) confirm that Publisher exchanges
# are always declared with passive=True
outer_self = self
test_exchange_was_called = [False]
old_init = kombu.entity.Exchange.__init__
def new_init(self, *args, **kwargs):
test_exchange_was_called[0] = True
outer_self.assertTrue(kwargs['passive'])
old_init(self, *args, **kwargs)
kombu.entity.Exchange.__init__ = new_init
try:
msgs[i].reply(failure=failure,
log_failure=not self.expected)
finally:
kombu.entity.Exchange.__init__ = old_init
self.assertTrue(test_exchange_was_called[0])
elif self.rx_id:
msgs[i].reply({'rx_id': i})
else:
msgs[i].reply(self.reply)
senders[i].join()
self.assertEqual(len(senders), len(replies))
for i, reply in enumerate(replies):
if self.timeout is not None:
self.assertIsInstance(reply, messaging.MessagingTimeout)
elif self.failure:
self.assertIsInstance(reply, ZeroDivisionError)
elif self.rx_id:
self.assertEqual({'rx_id': order[i]}, reply)
else:
self.assertEqual(self.reply, reply)
if not self.timeout and self.failure and not self.expected:
self.assertTrue(len(errors) > 0, errors)
else:
self.assertEqual(0, len(errors), errors)
TestSendReceive.generate_scenarios()
class TestPollAsync(test_utils.BaseTestCase):
def test_poll_timeout(self):
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic='testtopic')
listener = driver.listen(target)
received = listener.poll(timeout=0.050)
self.assertIsNone(received)
class TestRacyWaitForReply(test_utils.BaseTestCase):
def test_send_receive(self):
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic='testtopic')
listener = driver.listen(target)
senders = []
replies = []
msgs = []
wait_conditions = []
orig_reply_waiter = amqpdriver.ReplyWaiter.wait
def reply_waiter(self, msg_id, timeout):
if wait_conditions:
cond = wait_conditions.pop()
with cond:
cond.notify()
with cond:
cond.wait()
return orig_reply_waiter(self, msg_id, timeout)
self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
def send_and_wait_for_reply(i, wait_for_reply):
replies.append(driver.send(target,
{},
{'tx_id': i},
wait_for_reply=wait_for_reply,
timeout=None))
while len(senders) < 2:
t = threading.Thread(target=send_and_wait_for_reply,
args=(len(senders), True))
t.daemon = True
senders.append(t)
# test the case then msg_id is not set
t = threading.Thread(target=send_and_wait_for_reply,
args=(len(senders), False))
t.daemon = True
senders.append(t)
# Start the first guy, receive his message, but delay his polling
notify_condition = threading.Condition()
wait_conditions.append(notify_condition)
with notify_condition:
senders[0].start()
notify_condition.wait()
msgs.append(listener.poll())
self.assertEqual({'tx_id': 0}, msgs[-1].message)
# Start the second guy, receive his message
senders[1].start()
msgs.append(listener.poll())
self.assertEqual({'tx_id': 1}, msgs[-1].message)
# Reply to both in order, making the second thread queue
# the reply meant for the first thread
msgs[0].reply({'rx_id': 0})
msgs[1].reply({'rx_id': 1})
# Wait for the second thread to finish
senders[1].join()
# Start the 3rd guy, receive his message
senders[2].start()
msgs.append(listener.poll())
self.assertEqual({'tx_id': 2}, msgs[-1].message)
# Verify the _send_reply was not invoked by driver:
with mock.patch.object(msgs[2], '_send_reply') as method:
msgs[2].reply({'rx_id': 2})
self.assertEqual(method.call_count, 0)
# Wait for the 3rd thread to finish
senders[2].join()
# Let the first thread continue
with notify_condition:
notify_condition.notify()
# Wait for the first thread to finish
senders[0].join()
# Verify replies were received out of order
self.assertEqual(len(senders), len(replies))
self.assertEqual({'rx_id': 1}, replies[0])
self.assertIsNone(replies[1])
self.assertEqual({'rx_id': 0}, replies[2])
def _declare_queue(target):
connection = kombu.connection.BrokerConnection(transport='memory')
# Kludge to speed up tests.
connection.transport.polling_interval = 0.0
connection.connect()
channel = connection.channel()
# work around 'memory' transport bug in 1.1.3
channel._new_queue('ae.undeliver')
if target.fanout:
exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
type='fanout',
durable=False,
auto_delete=True)
queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
channel=channel,
exchange=exchange,
routing_key=target.topic)
if target.server:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
topic = '%s.%s' % (target.topic, target.server)
queue = kombu.entity.Queue(name=topic,
channel=channel,
exchange=exchange,
routing_key=topic)
else:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
queue = kombu.entity.Queue(name=target.topic,
channel=channel,
exchange=exchange,
routing_key=target.topic)
queue.declare()
return connection, channel, queue
class TestRequestWireFormat(test_utils.BaseTestCase):
_target = [
('topic_target',
dict(topic='testtopic', server=None, fanout=False)),
('server_target',
dict(topic='testtopic', server='testserver', fanout=False)),
# NOTE(markmc): https://github.com/celery/kombu/issues/195
('fanout_target',
dict(topic='testtopic', server=None, fanout=True,
skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
]
_msg = [
('empty_msg',
dict(msg={}, expected={})),
('primitive_msg',
dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
('complex_msg',
dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
]
_context = [
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
('user_project_ctxt',
dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
expected_ctxt={'_context_user': 'mark',
'_context_project': 'snarkybunch'})),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
cls._context,
cls._target)
def setUp(self):
super(TestRequestWireFormat, self).setUp()
self.uuids = []
self.orig_uuid4 = uuid.uuid4
self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
def mock_uuid4(self):
self.uuids.append(self.orig_uuid4())
return self.uuids[-1]
def test_request_wire_format(self):
if hasattr(self, 'skip_msg'):
self.skipTest(self.skip_msg)
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic=self.topic,
server=self.server,
fanout=self.fanout)
connection, channel, queue = _declare_queue(target)
self.addCleanup(connection.release)
driver.send(target, self.ctxt, self.msg)
msgs = []
def callback(msg):
msg = channel.message_to_python(msg)
msg.ack()
msgs.append(msg.payload)
queue.consume(callback=callback,
consumer_tag='1',
nowait=False)
connection.drain_events()
self.assertEqual(1, len(msgs))
self.assertIn('oslo.message', msgs[0])
received = msgs[0]
received['oslo.message'] = jsonutils.loads(received['oslo.message'])
# FIXME(markmc): add _msg_id and _reply_q check
expected_msg = {
'_unique_id': self.uuids[0].hex,
}
expected_msg.update(self.expected)
expected_msg.update(self.expected_ctxt)
expected = {
'oslo.version': '2.0',
'oslo.message': expected_msg,
}
self.assertEqual(expected, received)
TestRequestWireFormat.generate_scenarios()
def _create_producer(target):
connection = kombu.connection.BrokerConnection(transport='memory')
# Kludge to speed up tests.
connection.transport.polling_interval = 0.0
connection.connect()
channel = connection.channel()
# work around 'memory' transport bug in 1.1.3
channel._new_queue('ae.undeliver')
if target.fanout:
exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
type='fanout',
durable=False,
auto_delete=True)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=target.topic)
elif target.server:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
topic = '%s.%s' % (target.topic, target.server)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=topic)
else:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=target.topic)
return connection, producer
class TestReplyWireFormat(test_utils.BaseTestCase):
_target = [
('topic_target',
dict(topic='testtopic', server=None, fanout=False)),
('server_target',
dict(topic='testtopic', server='testserver', fanout=False)),
# NOTE(markmc): https://github.com/celery/kombu/issues/195
('fanout_target',
dict(topic='testtopic', server=None, fanout=True,
skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
]
_msg = [
('empty_msg',
dict(msg={}, expected={})),
('primitive_msg',
dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
('complex_msg',
dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
]
_context = [
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
('user_project_ctxt',
dict(ctxt={'_context_user': 'mark',
'_context_project': 'snarkybunch'},
expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
cls._context,
cls._target)
def test_reply_wire_format(self):
if hasattr(self, 'skip_msg'):
self.skipTest(self.skip_msg)
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic=self.topic,
server=self.server,
fanout=self.fanout)
listener = driver.listen(target)
connection, producer = _create_producer(target)
self.addCleanup(connection.release)
msg = {
'oslo.version': '2.0',
'oslo.message': {}
}
msg['oslo.message'].update(self.msg)
msg['oslo.message'].update(self.ctxt)
msg['oslo.message'].update({
'_msg_id': uuid.uuid4().hex,
'_unique_id': uuid.uuid4().hex,
'_reply_q': 'reply_' + uuid.uuid4().hex,
})
msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
producer.publish(msg)
received = listener.poll()
self.assertIsNotNone(received)
self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message)
TestReplyWireFormat.generate_scenarios()
class RpcKombuHATestCase(test_utils.BaseTestCase):
def setUp(self):
super(RpcKombuHATestCase, self).setUp()
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(rabbit_hosts=self.brokers,
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
kombu_reconnect_delay=0,
group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connect',
side_effect=self.kombu_connect))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.channel'))
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, mock_callback,
retry=4)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, mock_callback,
retry=1)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, mock_callback,
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)

View File

@ -1,440 +0,0 @@
# Copyright 2014 Canonical, Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import fixtures
import testtools
from six.moves import mock
try:
import zmq
except ImportError:
zmq = None
from oslo import messaging
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
LOG = logging.getLogger(__name__)
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
return port
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(ZmqBaseTestCase):
def test_start_stop_listener(self):
target = messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
result = listener.poll(0.01)
self.assertEqual(result, None)
def test_send_receive_raises(self):
"""Call() without method."""
target = messaging.Target(topic='testtopic')
self.driver.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage')
def test_send_receive_topic(self, mock_msg):
"""Call() with method."""
mock_msg.return_value = msg = mock.MagicMock()
msg.received = received = mock.MagicMock()
received.failure = False
received.reply = True
msg.condition = condition = mock.MagicMock()
condition.wait.return_value = True
target = messaging.Target(topic='testtopic')
self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_fanout(self, mock_call):
target = messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
# Also verifies fix for bug http://pad.lv/1301723
target = messaging.Target(topic='testtopic', server='localhost')
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
class TestZmqSocket(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqSocket, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertTrue(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
class TestZmqIncomingMessage(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqIncomingMessage, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
def test_zmqincomingmessage(self):
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
msg.reply("abc")
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertEqual(msg.received.reply, "abc")
msg.requeue()
class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
# No Fanout
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.PULL,
subscribe=None, in_bind=False)
# Reset for next bunch of checks
conn.reactor.register.reset_mock()
# Fanout
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context, fanout='subscriber.foo')
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.SUB,
subscribe='subscriber.foo',
in_bind=False)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
conn.reactor.register.reset_mock()
# Call again with same topic
conn.create_consumer(topic, context)
self.assertFalse(conn.reactor.register.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
self.assertTrue(conn.reactor.close.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
# Timeout = 0 should return straight away since the queue is empty
listener.poll(timeout=0)
def test_zmqlistener_w_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
message = {'namespace': 'name.space', 'method': m.fake_method,
'args': kwargs}
eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
self.driver.send(messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic.foo'
topic_reformat = 'testtopic-foo'
msg = 'jeronimo'
self.driver.send_notification(messaging.Target(topic=topic), context,
msg, False, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic_reformat, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen(self, mock_connection, mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
self.driver.listen(messaging.Target(topic=topic))
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen_for_notification(self, mock_connection,
mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
data = [(messaging.Target(topic=topic), 0)]
# NOTE(jamespage): Pooling not supported, just pass None for now.
self.driver.listen_for_notifications(data, None)
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)

View File

@ -1,69 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
# NOTE(jamespage) matchmaker tied directly to eventlet
# which is not yet py3 compatible - skip if import fails
matchmaker = (
importutils.try_import('oslo.messaging._drivers.matchmaker'))
@testtools.skipIf(not matchmaker, "matchmaker/eventlet unavailable")
class MatchmakerTest(test_utils.BaseTestCase):
def test_fanout_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.FanoutBinding(), matchmaker.DirectExchange())
self.assertEqual(matcher.queues('hello.world'), [])
self.assertEqual(
matcher.queues('fanout~fantasy.unicorn'),
[('fanout~fantasy.unicorn', 'unicorn')])
self.assertEqual(
matcher.queues('fanout~fantasy.pony'),
[('fanout~fantasy.pony', 'pony')])
def test_topic_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.TopicBinding(), matchmaker.StubExchange())
self.assertEqual(
matcher.queues('hello-world'), [('hello-world', None)])
def test_direct_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.DirectBinding(), matchmaker.StubExchange())
self.assertEqual(
matcher.queues('hello.server'), [('hello.server', None)])
self.assertEqual(matcher.queues('hello-world'), [])
def test_localhost_match(self):
matcher = matchmaker.MatchMakerLocalhost()
self.assertEqual(
matcher.queues('hello.server'), [('hello.server', 'server')])
# Gets remapped due to localhost exchange
# all bindings default to first match.
self.assertEqual(
matcher.queues('fanout~testing.server'),
[('fanout~testing.localhost', 'localhost')])
self.assertEqual(
matcher.queues('hello-world'),
[('hello-world.localhost', 'localhost')])

View File

@ -1,83 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
redis = importutils.try_import('redis')
matchmaker_redis = (
importutils.try_import('oslo.messaging._drivers.matchmaker_redis'))
def redis_available():
'''Helper to see if local redis server is running'''
if not redis:
return False
try:
c = redis.StrictRedis(socket_timeout=1)
c.ping()
return True
except redis.exceptions.ConnectionError:
return False
@testtools.skipIf(not matchmaker_redis, "matchmaker/eventlet unavailable")
@testtools.skipIf(not redis_available(), "redis unavailable")
class RedisMatchMakerTest(test_utils.BaseTestCase):
def setUp(self):
super(RedisMatchMakerTest, self).setUp()
self.ring_data = {
"conductor": ["controller1", "node1", "node2", "node3"],
"scheduler": ["controller1", "node1", "node2", "node3"],
"network": ["controller1", "node1", "node2", "node3"],
"cert": ["controller1"],
"console": ["controller1"],
"l3_agent.node1": ["node1"],
"consoleauth": ["controller1"]}
self.matcher = matchmaker_redis.MatchMakerRedis()
self.populate()
def tearDown(self):
super(RedisMatchMakerTest, self).tearDown()
c = redis.StrictRedis()
c.flushdb()
def populate(self):
for k, hosts in self.ring_data.items():
for h in hosts:
self.matcher.register(k, h)
def test_direct(self):
self.assertEqual(
self.matcher.queues('cert.controller1'),
[('cert.controller1', 'controller1')])
def test_register(self):
self.matcher.register('cert', 'keymaster')
self.assertEqual(
sorted(self.matcher.redis.smembers('cert')),
['cert.controller1', 'cert.keymaster'])
self.matcher.register('l3_agent.node1', 'node1')
self.assertEqual(
sorted(self.matcher.redis.smembers('l3_agent.node1')),
['l3_agent.node1.node1'])
def test_unregister(self):
self.matcher.unregister('conductor', 'controller1')
self.assertEqual(
sorted(self.matcher.redis.smembers('conductor')),
['conductor.node1', 'conductor.node2', 'conductor.node3'])

View File

@ -1,73 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
# NOTE(jamespage) matchmaker tied directly to eventlet
# which is not yet py3 compatible - skip if import fails
matchmaker_ring = (
importutils.try_import('oslo.messaging._drivers.matchmaker_ring'))
@testtools.skipIf(not matchmaker_ring, "matchmaker/eventlet unavailable")
class MatchmakerRingTest(test_utils.BaseTestCase):
def setUp(self):
super(MatchmakerRingTest, self).setUp()
self.ring_data = {
"conductor": ["controller1", "node1", "node2", "node3"],
"scheduler": ["controller1", "node1", "node2", "node3"],
"network": ["controller1", "node1", "node2", "node3"],
"cert": ["controller1"],
"console": ["controller1"],
"consoleauth": ["controller1"]}
self.matcher = matchmaker_ring.MatchMakerRing(self.ring_data)
def test_direct(self):
self.assertEqual(
self.matcher.queues('cert.controller1'),
[('cert.controller1', 'controller1')])
self.assertEqual(
self.matcher.queues('conductor.node1'),
[('conductor.node1', 'node1')])
def test_fanout(self):
self.assertEqual(
self.matcher.queues('fanout~conductor'),
[('fanout~conductor.controller1', 'controller1'),
('fanout~conductor.node1', 'node1'),
('fanout~conductor.node2', 'node2'),
('fanout~conductor.node3', 'node3')])
def test_bare_topic(self):
# Round robins through the hosts on the topic
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.controller1', 'controller1')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node1', 'node1')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node2', 'node2')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node3', 'node3')])
# Cycles loop
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.controller1', 'controller1')])

View File

@ -1,174 +0,0 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from oslo_utils import timeutils
import testscenarios
from oslo import messaging
from oslo.messaging.notify import dispatcher as notify_dispatcher
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
notification_msg = dict(
publisher_id="publisher_id",
event_type="compute.start",
payload={"info": "fuu"},
message_id="uuid",
timestamp=str(timeutils.utcnow())
)
class TestDispatcherScenario(test_utils.BaseTestCase):
scenarios = [
('no_endpoints',
dict(endpoints=[],
endpoints_expect_calls=[],
priority='info',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('one_endpoints',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('two_endpoints_only_one_match',
dict(endpoints=[['warn'], ['info']],
endpoints_expect_calls=[None, 'info'],
priority='info',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('two_endpoints_both_match',
dict(endpoints=[['debug', 'info'], ['info', 'debug']],
endpoints_expect_calls=['debug', 'debug'],
priority='debug',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('no_return_value',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn',
ex=None, return_value=None)),
('requeue',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
ex=None,
return_value=messaging.NotificationResult.REQUEUE)),
('exception',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
ex=Exception,
return_value=messaging.NotificationResult.HANDLED)),
]
def test_dispatcher(self):
endpoints = []
for endpoint_methods in self.endpoints:
e = mock.Mock(spec=endpoint_methods)
endpoints.append(e)
for m in endpoint_methods:
method = getattr(e, m)
if self.ex:
method.side_effect = self.ex()
else:
method.return_value = self.return_value
msg = notification_msg.copy()
msg['priority'] = self.priority
targets = [messaging.Target(topic='notifications')]
dispatcher = notify_dispatcher.NotificationDispatcher(
targets, endpoints, None, allow_requeue=True, pool=None)
# check it listen on wanted topics
self.assertEqual(sorted(set((targets[0], prio)
for prio in itertools.chain.from_iterable(
self.endpoints))),
sorted(dispatcher._targets_priorities))
incoming = mock.Mock(ctxt={}, message=msg)
callback = dispatcher(incoming)
callback.run()
callback.done()
# check endpoint callbacks are called or not
for i, endpoint_methods in enumerate(self.endpoints):
for m in endpoint_methods:
if m == self.endpoints_expect_calls[i]:
method = getattr(endpoints[i], m)
method.assert_called_once_with(
{},
msg['publisher_id'],
msg['event_type'],
msg['payload'], {
'timestamp': mock.ANY,
'message_id': mock.ANY
})
else:
self.assertEqual(0, endpoints[i].call_count)
if self.ex:
self.assertEqual(1, incoming.acknowledge.call_count)
self.assertEqual(0, incoming.requeue.call_count)
elif self.return_value == messaging.NotificationResult.HANDLED \
or self.return_value is None:
self.assertEqual(1, incoming.acknowledge.call_count)
self.assertEqual(0, incoming.requeue.call_count)
elif self.return_value == messaging.NotificationResult.REQUEUE:
self.assertEqual(0, incoming.acknowledge.call_count)
self.assertEqual(1, incoming.requeue.call_count)
class TestDispatcher(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.notify.dispatcher.LOG')
def test_dispatcher_unknown_prio(self, mylog):
msg = notification_msg.copy()
msg['priority'] = 'what???'
dispatcher = notify_dispatcher.NotificationDispatcher(
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
callback = dispatcher(mock.Mock(ctxt={}, message=msg))
callback.run()
callback.done()
mylog.warning.assert_called_once_with('Unknown priority "%s"',
'what???')
def test_dispatcher_executor_callback(self):
endpoint = mock.Mock(spec=['warn'])
endpoint_method = endpoint.warn
endpoint_method.return_value = messaging.NotificationResult.HANDLED
targets = [messaging.Target(topic='notifications')]
dispatcher = notify_dispatcher.NotificationDispatcher(
targets, [endpoint], None, allow_requeue=True)
msg = notification_msg.copy()
msg['priority'] = 'warn'
incoming = mock.Mock(ctxt={}, message=msg)
executor_callback = mock.Mock()
callback = dispatcher(incoming, executor_callback)
callback.run()
callback.done()
self.assertTrue(executor_callback.called)
self.assertEqual(executor_callback.call_args[0][0], endpoint_method)

View File

@ -1,411 +0,0 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
import testscenarios
from oslo import messaging
from oslo.messaging.notify import dispatcher
from oslo_config import cfg
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
class RestartableServerThread(object):
def __init__(self, server):
self.server = server
self.thread = None
def start(self):
if self.thread is None:
self.thread = threading.Thread(target=self.server.start)
self.thread.daemon = True
self.thread.start()
def stop(self):
if self.thread is not None:
# Check start() does nothing with a running listener
self.server.start()
self.server.stop()
self.server.wait()
self.thread.join(timeout=15)
ret = self.thread.isAlive()
self.thread = None
return ret
return True
class ListenerSetupMixin(object):
class ThreadTracker(object):
def __init__(self):
self._received_msgs = 0
self.threads = []
self.lock = threading.Lock()
def info(self, ctxt, publisher_id, event_type, payload, metadata):
# NOTE(sileht): this run into an other thread
with self.lock:
self._received_msgs += 1
def wait_for_messages(self, expect_messages):
while self._received_msgs < expect_messages:
time.sleep(0.01)
def stop(self):
for thread in self.threads:
thread.stop()
self.threads = []
def start(self, thread):
self.threads.append(thread)
thread.start()
def setUp(self):
self.trackers = {}
self.addCleanup(self._stop_trackers)
def _stop_trackers(self):
for pool in self.trackers:
self.trackers[pool].stop()
self.trackers = {}
def _setup_listener(self, transport, endpoints,
targets=None, pool=None):
if pool is None:
tracker_name = '__default__'
else:
tracker_name = pool
if targets is None:
targets = [messaging.Target(topic='testtopic')]
tracker = self.trackers.setdefault(
tracker_name, self.ThreadTracker())
listener = messaging.get_notification_listener(
transport, targets=targets, endpoints=[tracker] + endpoints,
allow_requeue=True, pool=pool)
thread = RestartableServerThread(listener)
tracker.start(thread)
return thread
def wait_for_messages(self, expect_messages, tracker_name='__default__'):
self.trackers[tracker_name].wait_for_messages(expect_messages)
def _setup_notifier(self, transport, topic='testtopic',
publisher_id='testpublisher'):
return messaging.Notifier(transport, topic=topic,
driver='messaging',
publisher_id=publisher_id)
class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
def __init__(self, *args):
super(TestNotifyListener, self).__init__(*args)
ListenerSetupMixin.__init__(self)
def setUp(self):
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
ListenerSetupMixin.setUp(self)
def test_constructor(self):
transport = messaging.get_transport(self.conf, url='fake:')
target = messaging.Target(topic='foo')
endpoints = [object()]
listener = messaging.get_notification_listener(transport, [target],
endpoints)
self.assertIs(listener.conf, self.conf)
self.assertIs(listener.transport, transport)
self.assertIsInstance(listener.dispatcher,
dispatcher.NotificationDispatcher)
self.assertIs(listener.dispatcher.endpoints, endpoints)
self.assertEqual('blocking', listener.executor)
def test_no_target_topic(self):
transport = messaging.get_transport(self.conf, url='fake:')
listener = messaging.get_notification_listener(transport,
[messaging.Target()],
[mock.Mock()])
try:
listener.start()
except Exception as ex:
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
else:
self.assertTrue(False)
def test_unknown_executor(self):
transport = messaging.get_transport(self.conf, url='fake:')
try:
messaging.get_notification_listener(transport, [], [],
executor='foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
self.assertEqual('foo', ex.executor)
else:
self.assertTrue(False)
def test_one_topic(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
listener_thread = self._setup_listener(transport, [endpoint])
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(1)
self.assertFalse(listener_thread.stop())
endpoint.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test message',
{'message_id': mock.ANY, 'timestamp': mock.ANY})
def test_two_topics(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
targets = [messaging.Target(topic="topic1"),
messaging.Target(topic="topic2")]
listener_thread = self._setup_listener(transport, [endpoint],
targets=targets)
notifier = self._setup_notifier(transport, topic='topic1')
notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
notifier = self._setup_notifier(transport, topic='topic2')
notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')
self.wait_for_messages(2)
self.assertFalse(listener_thread.stop())
endpoint.info.assert_has_calls([
mock.call({'ctxt': '1'}, 'testpublisher',
'an_event.start1', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
mock.call({'ctxt': '2'}, 'testpublisher',
'an_event.start2', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY})],
any_order=True)
def test_two_exchanges(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
targets = [messaging.Target(topic="topic",
exchange="exchange1"),
messaging.Target(topic="topic",
exchange="exchange2")]
listener_thread = self._setup_listener(transport, [endpoint],
targets=targets)
notifier = self._setup_notifier(transport, topic="topic")
def mock_notifier_exchange(name):
def side_effect(target, ctxt, message, version, retry):
target.exchange = name
return transport._driver.send_notification(target, ctxt,
message, version,
retry=retry)
transport._send_notification = mock.MagicMock(
side_effect=side_effect)
notifier.info({'ctxt': '0'},
'an_event.start', 'test message default exchange')
mock_notifier_exchange('exchange1')
notifier.info({'ctxt': '1'},
'an_event.start', 'test message exchange1')
mock_notifier_exchange('exchange2')
notifier.info({'ctxt': '2'},
'an_event.start', 'test message exchange2')
self.wait_for_messages(2)
self.assertFalse(listener_thread.stop())
endpoint.info.assert_has_calls([
mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start',
'test message exchange1',
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start',
'test message exchange2',
{'timestamp': mock.ANY, 'message_id': mock.ANY})],
any_order=True)
def test_two_endpoints(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
endpoint2 = mock.Mock()
endpoint2.info.return_value = messaging.NotificationResult.HANDLED
listener_thread = self._setup_listener(transport,
[endpoint1, endpoint2])
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test')
self.wait_for_messages(1)
self.assertFalse(listener_thread.stop())
endpoint1.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test', {
'timestamp': mock.ANY,
'message_id': mock.ANY})
endpoint2.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test', {
'timestamp': mock.ANY,
'message_id': mock.ANY})
def test_requeue(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
def side_effect_requeue(*args, **kwargs):
if endpoint.info.call_count == 1:
return messaging.NotificationResult.REQUEUE
return messaging.NotificationResult.HANDLED
endpoint.info.side_effect = side_effect_requeue
listener_thread = self._setup_listener(transport, [endpoint])
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test')
self.wait_for_messages(2)
self.assertFalse(listener_thread.stop())
endpoint.info.assert_has_calls([
mock.call({}, 'testpublisher', 'an_event.start', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
mock.call({}, 'testpublisher', 'an_event.start', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY})])
def test_two_pools(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
endpoint2 = mock.Mock()
endpoint2.info.return_value = None
targets = [messaging.Target(topic="topic")]
listener1_thread = self._setup_listener(transport, [endpoint1],
targets=targets, pool="pool1")
listener2_thread = self._setup_listener(transport, [endpoint2],
targets=targets, pool="pool2")
notifier = self._setup_notifier(transport, topic="topic")
notifier.info({'ctxt': '0'}, 'an_event.start', 'test message0')
notifier.info({'ctxt': '1'}, 'an_event.start', 'test message1')
self.wait_for_messages(2, "pool1")
self.wait_for_messages(2, "pool2")
self.assertFalse(listener2_thread.stop())
self.assertFalse(listener1_thread.stop())
def mocked_endpoint_call(i):
return mock.call({'ctxt': '%d' % i}, 'testpublisher',
'an_event.start', 'test message%d' % i,
{'timestamp': mock.ANY, 'message_id': mock.ANY})
endpoint1.info.assert_has_calls([mocked_endpoint_call(0),
mocked_endpoint_call(1)])
endpoint2.info.assert_has_calls([mocked_endpoint_call(0),
mocked_endpoint_call(1)])
def test_two_pools_three_listener(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
endpoint2 = mock.Mock()
endpoint2.info.return_value = None
endpoint3 = mock.Mock()
endpoint3.info.return_value = None
targets = [messaging.Target(topic="topic")]
listener1_thread = self._setup_listener(transport, [endpoint1],
targets=targets, pool="pool1")
listener2_thread = self._setup_listener(transport, [endpoint2],
targets=targets, pool="pool2")
listener3_thread = self._setup_listener(transport, [endpoint3],
targets=targets, pool="pool2")
def mocked_endpoint_call(i):
return mock.call({'ctxt': '%d' % i}, 'testpublisher',
'an_event.start', 'test message%d' % i,
{'timestamp': mock.ANY, 'message_id': mock.ANY})
notifier = self._setup_notifier(transport, topic="topic")
mocked_endpoint1_calls = []
for i in range(0, 25):
notifier.info({'ctxt': '%d' % i}, 'an_event.start',
'test message%d' % i)
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
self.wait_for_messages(25, 'pool2')
listener2_thread.stop()
for i in range(0, 25):
notifier.info({'ctxt': '%d' % i}, 'an_event.start',
'test message%d' % i)
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
self.wait_for_messages(50, 'pool2')
listener2_thread.start()
listener3_thread.stop()
for i in range(0, 25):
notifier.info({'ctxt': '%d' % i}, 'an_event.start',
'test message%d' % i)
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
self.wait_for_messages(75, 'pool2')
listener3_thread.start()
for i in range(0, 25):
notifier.info({'ctxt': '%d' % i}, 'an_event.start',
'test message%d' % i)
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
self.wait_for_messages(100, 'pool1')
self.wait_for_messages(100, 'pool2')
self.assertFalse(listener3_thread.stop())
self.assertFalse(listener2_thread.stop())
self.assertFalse(listener1_thread.stop())
self.assertEqual(100, endpoint1.info.call_count)
endpoint1.info.assert_has_calls(mocked_endpoint1_calls)
self.assertLessEqual(25, endpoint2.info.call_count)
self.assertLessEqual(25, endpoint3.info.call_count)
self.assertEqual(100, endpoint2.info.call_count +
endpoint3.info.call_count)
for call in mocked_endpoint1_calls:
self.assertIn(call, endpoint2.info.mock_calls +
endpoint3.info.mock_calls)

View File

@ -1,57 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo import messaging
from oslo.messaging.notify import log_handler
from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
"""Tests for log.PublishErrorsHandler"""
def setUp(self):
super(PublishErrorsHandlerTestCase, self).setUp()
self.publisherrorshandler = (log_handler.
PublishErrorsHandler(logging.ERROR))
def test_emit_cfg_log_notifier_in_notifier_drivers(self):
drivers = ['messaging', 'log']
self.config(notification_driver=drivers)
self.stub_flg = True
transport = test_notifier._FakeTransport(self.conf)
notifier = messaging.Notifier(transport)
def fake_notifier(*args, **kwargs):
self.stub_flg = False
self.stubs.Set(notifier, 'error', fake_notifier)
logrecord = logging.LogRecord(name='name', level='WARN',
pathname='/tmp', lineno=1, msg='Message',
args=None, exc_info=None)
self.publisherrorshandler.emit(logrecord)
self.assertTrue(self.stub_flg)
@mock.patch('oslo_messaging.notify.notifier.Notifier._notify')
def test_emit_notification(self, mock_notify):
logrecord = logging.LogRecord(name='name', level='ERROR',
pathname='/tmp', lineno=1, msg='Message',
args=None, exc_info=None)
self.publisherrorshandler.emit(logrecord)
self.assertEqual('error.publisher',
self.publisherrorshandler._notifier.publisher_id)
mock_notify.assert_called_with({}, 'error_notification',
{'error': 'Message'}, 'ERROR')

View File

@ -1,154 +0,0 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import logging
import logging.config
import os
import sys
from oslo_utils import timeutils
import testscenarios
from oslo import messaging
import oslo_messaging
from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
# Stolen from openstack.common.logging
logging.AUDIT = logging.INFO + 1
logging.addLevelName(logging.AUDIT, 'AUDIT')
class TestLogNotifier(test_utils.BaseTestCase):
scenarios = [
('debug', dict(priority='debug')),
('info', dict(priority='info')),
('warning', dict(priority='warning', queue='WARN')),
('warn', dict(priority='warn')),
('error', dict(priority='error')),
('critical', dict(priority='critical')),
('audit', dict(priority='audit')),
]
def setUp(self):
super(TestLogNotifier, self).setUp()
self.addCleanup(oslo_messaging.notify._impl_test.reset)
self.config(notification_driver=['test'])
# NOTE(jamespage) disable thread information logging for testing
# as this causes test failures when zmq tests monkey_patch via
# eventlet
logging.logThreads = 0
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
self.logger = messaging.LoggingNotificationHandler('test://')
mock_utcnow.return_value = datetime.datetime.utcnow()
levelno = getattr(logging, self.priority.upper(), 42)
record = logging.LogRecord('foo',
levelno,
'/foo/bar',
42,
'Something happened',
None,
None)
self.logger.emit(record)
context = oslo_messaging.notify._impl_test.NOTIFICATIONS[0][0]
self.assertEqual({}, context)
n = oslo_messaging.notify._impl_test.NOTIFICATIONS[0][1]
self.assertEqual(getattr(self, 'queue', self.priority.upper()),
n['priority'])
self.assertEqual('logrecord', n['event_type'])
self.assertEqual(str(timeutils.utcnow()), n['timestamp'])
self.assertEqual(None, n['publisher_id'])
self.assertEqual(
{'process': os.getpid(),
'funcName': None,
'name': 'foo',
'thread': None,
'levelno': levelno,
'processName': 'MainProcess',
'pathname': '/foo/bar',
'lineno': 42,
'msg': 'Something happened',
'exc_info': None,
'levelname': logging.getLevelName(levelno),
'extra': None},
n['payload'])
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
logging.config.dictConfig({
'version': 1,
'handlers': {
'notification': {
'class': 'oslo.messaging.LoggingNotificationHandler',
'level': self.priority.upper(),
'url': 'test://',
},
},
'loggers': {
'default': {
'handlers': ['notification'],
'level': self.priority.upper(),
},
},
})
mock_utcnow.return_value = datetime.datetime.utcnow()
levelno = getattr(logging, self.priority.upper())
logger = logging.getLogger('default')
lineno = sys._getframe().f_lineno + 1
logger.log(levelno, 'foobar')
n = oslo_messaging.notify._impl_test.NOTIFICATIONS[0][1]
self.assertEqual(getattr(self, 'queue', self.priority.upper()),
n['priority'])
self.assertEqual('logrecord', n['event_type'])
self.assertEqual(str(timeutils.utcnow()), n['timestamp'])
self.assertEqual(None, n['publisher_id'])
pathname = __file__
if pathname.endswith(('.pyc', '.pyo')):
pathname = pathname[:-1]
self.assertDictEqual(
n['payload'],
{'process': os.getpid(),
'funcName': 'test_logging_conf',
'name': 'default',
'thread': None,
'levelno': levelno,
'processName': 'MainProcess',
'pathname': pathname,
'lineno': lineno,
'msg': 'foobar',
'exc_info': None,
'levelname': logging.getLevelName(levelno),
'extra': None})

View File

@ -1,190 +0,0 @@
# Copyright 2013-2014 eNovance
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import webob
from oslo.messaging.notify import middleware
from oslo_messaging.tests import utils
from six.moves import mock
class FakeApp(object):
def __call__(self, env, start_response):
body = 'Some response'
start_response('200 OK', [
('Content-Type', 'text/plain'),
('Content-Length', str(sum(map(len, body))))
])
return [body]
class FakeFailingApp(object):
def __call__(self, env, start_response):
raise Exception("It happens!")
class NotifierMiddlewareTest(utils.BaseTestCase):
def test_notification(self):
m = middleware.RequestNotifier(FakeApp())
req = webob.Request.blank('/foo/bar',
environ={'REQUEST_METHOD': 'GET',
'HTTP_X_AUTH_TOKEN': uuid.uuid4()})
with mock.patch(
'oslo.messaging.notify.notifier.Notifier._notify') as notify:
m(req)
# Check first notification with only 'request'
call_args = notify.call_args_list[0][0]
self.assertEqual(call_args[1], 'http.request')
self.assertEqual(call_args[3], 'INFO')
self.assertEqual(set(call_args[2].keys()),
set(['request']))
request = call_args[2]['request']
self.assertEqual(request['PATH_INFO'], '/foo/bar')
self.assertEqual(request['REQUEST_METHOD'], 'GET')
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
request.keys())),
"WSGI fields are filtered out")
# Check second notification with request + response
call_args = notify.call_args_list[1][0]
self.assertEqual(call_args[1], 'http.response')
self.assertEqual(call_args[3], 'INFO')
self.assertEqual(set(call_args[2].keys()),
set(['request', 'response']))
request = call_args[2]['request']
self.assertEqual(request['PATH_INFO'], '/foo/bar')
self.assertEqual(request['REQUEST_METHOD'], 'GET')
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
request.keys())),
"WSGI fields are filtered out")
response = call_args[2]['response']
self.assertEqual(response['status'], '200 OK')
self.assertEqual(response['headers']['content-length'], '13')
def test_notification_response_failure(self):
m = middleware.RequestNotifier(FakeFailingApp())
req = webob.Request.blank('/foo/bar',
environ={'REQUEST_METHOD': 'GET',
'HTTP_X_AUTH_TOKEN': uuid.uuid4()})
with mock.patch(
'oslo.messaging.notify.notifier.Notifier._notify') as notify:
try:
m(req)
self.fail("Application exception has not been re-raised")
except Exception:
pass
# Check first notification with only 'request'
call_args = notify.call_args_list[0][0]
self.assertEqual(call_args[1], 'http.request')
self.assertEqual(call_args[3], 'INFO')
self.assertEqual(set(call_args[2].keys()),
set(['request']))
request = call_args[2]['request']
self.assertEqual(request['PATH_INFO'], '/foo/bar')
self.assertEqual(request['REQUEST_METHOD'], 'GET')
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
request.keys())),
"WSGI fields are filtered out")
# Check second notification with 'request' and 'exception'
call_args = notify.call_args_list[1][0]
self.assertEqual(call_args[1], 'http.response')
self.assertEqual(call_args[3], 'INFO')
self.assertEqual(set(call_args[2].keys()),
set(['request', 'exception']))
request = call_args[2]['request']
self.assertEqual(request['PATH_INFO'], '/foo/bar')
self.assertEqual(request['REQUEST_METHOD'], 'GET')
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
request.keys())),
"WSGI fields are filtered out")
exception = call_args[2]['exception']
self.assertIn('middleware.py', exception['traceback'][0])
self.assertIn('It happens!', exception['traceback'][-1])
self.assertEqual(exception['value'], "Exception('It happens!',)")
def test_process_request_fail(self):
def notify_error(context, publisher_id, event_type,
priority, payload):
raise Exception('error')
with mock.patch('oslo.messaging.notify.notifier.Notifier._notify',
notify_error):
m = middleware.RequestNotifier(FakeApp())
req = webob.Request.blank('/foo/bar',
environ={'REQUEST_METHOD': 'GET'})
m.process_request(req)
def test_process_response_fail(self):
def notify_error(context, publisher_id, event_type,
priority, payload):
raise Exception('error')
with mock.patch('oslo.messaging.notify.notifier.Notifier._notify',
notify_error):
m = middleware.RequestNotifier(FakeApp())
req = webob.Request.blank('/foo/bar',
environ={'REQUEST_METHOD': 'GET'})
m.process_response(req, webob.response.Response())
def test_ignore_req_opt(self):
m = middleware.RequestNotifier(FakeApp(),
ignore_req_list='get, PUT')
req = webob.Request.blank('/skip/foo',
environ={'REQUEST_METHOD': 'GET'})
req1 = webob.Request.blank('/skip/foo',
environ={'REQUEST_METHOD': 'PUT'})
req2 = webob.Request.blank('/accept/foo',
environ={'REQUEST_METHOD': 'POST'})
with mock.patch(
'oslo.messaging.notify.notifier.Notifier._notify') as notify:
# Check GET request does not send notification
m(req)
m(req1)
self.assertEqual(len(notify.call_args_list), 0)
# Check non-GET request does send notification
m(req2)
self.assertEqual(len(notify.call_args_list), 2)
call_args = notify.call_args_list[0][0]
self.assertEqual(call_args[1], 'http.request')
self.assertEqual(call_args[3], 'INFO')
self.assertEqual(set(call_args[2].keys()),
set(['request']))
request = call_args[2]['request']
self.assertEqual(request['PATH_INFO'], '/accept/foo')
self.assertEqual(request['REQUEST_METHOD'], 'POST')
call_args = notify.call_args_list[1][0]
self.assertEqual(call_args[1], 'http.response')
self.assertEqual(call_args[3], 'INFO')
self.assertEqual(set(call_args[2].keys()),
set(['request', 'response']))

View File

@ -1,540 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import logging
import sys
import uuid
import fixtures
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from stevedore import dispatch
from stevedore import extension
import testscenarios
import yaml
from oslo import messaging
from oslo.messaging.notify import notifier as msg_notifier
from oslo.messaging import serializer as msg_serializer
from oslo_messaging.notify import _impl_log
from oslo_messaging.notify import _impl_messaging
from oslo_messaging.notify import _impl_test
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeTransport(object):
def __init__(self, conf):
self.conf = conf
def _send_notification(self, target, ctxt, message, version, retry=None):
pass
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
"""Record logged exceptions and re-raise in cleanup.
The notifier just logs notification send errors so, for the sake of
debugging test failures, we record any exceptions logged and re-raise them
during cleanup.
"""
class FakeLogger(object):
def __init__(self):
self.exceptions = []
def exception(self, msg, *args, **kwargs):
self.exceptions.append(sys.exc_info()[1])
def setUp(self):
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
self.logger = self.FakeLogger()
def reraise_exceptions():
for ex in self.logger.exceptions:
raise ex
self.addCleanup(reraise_exceptions)
class TestMessagingNotifier(test_utils.BaseTestCase):
_v1 = [
('v1', dict(v1=True)),
('not_v1', dict(v1=False)),
]
_v2 = [
('v2', dict(v2=True)),
('not_v2', dict(v2=False)),
]
_publisher_id = [
('ctor_pub_id', dict(ctor_pub_id='test',
expected_pub_id='test')),
('prep_pub_id', dict(prep_pub_id='test.localhost',
expected_pub_id='test.localhost')),
('override', dict(ctor_pub_id='test',
prep_pub_id='test.localhost',
expected_pub_id='test.localhost')),
]
_topics = [
('no_topics', dict(topics=[])),
('single_topic', dict(topics=['notifications'])),
('multiple_topic2', dict(topics=['foo', 'bar'])),
]
_priority = [
('audit', dict(priority='audit')),
('debug', dict(priority='debug')),
('info', dict(priority='info')),
('warn', dict(priority='warn')),
('error', dict(priority='error')),
('sample', dict(priority='sample')),
('critical', dict(priority='critical')),
]
_payload = [
('payload', dict(payload={'foo': 'bar'})),
]
_context = [
('ctxt', dict(ctxt={'user': 'bob'})),
]
_retry = [
('unconfigured', dict()),
('None', dict(retry=None)),
('0', dict(retry=0)),
('5', dict(retry=5)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
cls._v2,
cls._publisher_id,
cls._topics,
cls._priority,
cls._payload,
cls._context,
cls._retry)
def setUp(self):
super(TestMessagingNotifier, self).setUp()
self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger
self.stubs.Set(_impl_messaging, 'LOG', self.logger)
self.stubs.Set(msg_notifier, '_LOG', self.logger)
@mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
drivers = []
if self.v1:
drivers.append('messaging')
if self.v2:
drivers.append('messagingv2')
self.config(notification_driver=drivers,
notification_topics=self.topics)
transport = _FakeTransport(self.conf)
if hasattr(self, 'ctor_pub_id'):
notifier = messaging.Notifier(transport,
publisher_id=self.ctor_pub_id)
else:
notifier = messaging.Notifier(transport)
prepare_kwds = {}
if hasattr(self, 'retry'):
prepare_kwds['retry'] = self.retry
if hasattr(self, 'prep_pub_id'):
prepare_kwds['publisher_id'] = self.prep_pub_id
if prepare_kwds:
notifier = notifier.prepare(**prepare_kwds)
self.mox.StubOutWithMock(transport, '_send_notification')
message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn(message_id)
mock_utcnow.return_value = datetime.datetime.utcnow()
message = {
'message_id': str(message_id),
'publisher_id': self.expected_pub_id,
'event_type': 'test.notify',
'priority': self.priority.upper(),
'payload': self.payload,
'timestamp': str(timeutils.utcnow()),
}
sends = []
if self.v1:
sends.append(dict(version=1.0))
if self.v2:
sends.append(dict(version=2.0))
for send_kwargs in sends:
for topic in self.topics:
if hasattr(self, 'retry'):
send_kwargs['retry'] = self.retry
else:
send_kwargs['retry'] = None
target = messaging.Target(topic='%s.%s' % (topic,
self.priority))
transport._send_notification(target, self.ctxt, message,
**send_kwargs).InAnyOrder()
self.mox.ReplayAll()
method = getattr(notifier, self.priority)
method(self.ctxt, 'test.notify', self.payload)
TestMessagingNotifier.generate_scenarios()
class TestSerializer(test_utils.BaseTestCase):
def setUp(self):
super(TestSerializer, self).setUp()
self.addCleanup(_impl_test.reset)
@mock.patch('oslo_utils.timeutils.utcnow')
def test_serializer(self, mock_utcnow):
transport = _FakeTransport(self.conf)
serializer = msg_serializer.NoOpSerializer()
notifier = messaging.Notifier(transport,
'test.localhost',
driver='test',
topic='test',
serializer=serializer)
message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn(message_id)
mock_utcnow.return_value = datetime.datetime.utcnow()
self.mox.StubOutWithMock(serializer, 'serialize_context')
self.mox.StubOutWithMock(serializer, 'serialize_entity')
serializer.serialize_context(dict(user='bob')).\
AndReturn(dict(user='alice'))
serializer.serialize_entity(dict(user='bob'), 'bar').AndReturn('sbar')
self.mox.ReplayAll()
notifier.info(dict(user='bob'), 'test.notify', 'bar')
message = {
'message_id': str(message_id),
'publisher_id': 'test.localhost',
'event_type': 'test.notify',
'priority': 'INFO',
'payload': 'sbar',
'timestamp': str(timeutils.utcnow()),
}
self.assertEqual([(dict(user='alice'), message, 'INFO', None)],
_impl_test.NOTIFICATIONS)
class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
self.config(notification_driver=['log'])
transport = _FakeTransport(self.conf)
notifier = messaging.Notifier(transport, 'test.localhost')
message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn(message_id)
mock_utcnow.return_value = datetime.datetime.utcnow()
message = {
'message_id': str(message_id),
'publisher_id': 'test.localhost',
'event_type': 'test.notify',
'priority': 'INFO',
'payload': 'bar',
'timestamp': str(timeutils.utcnow()),
}
logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(logging, 'getLogger')
logging.getLogger('oslo.messaging.notification.test.notify').\
AndReturn(logger)
logger.info(jsonutils.dumps(message))
self.mox.ReplayAll()
notifier.info({}, 'test.notify', 'bar')
def test_sample_priority(self):
# Ensure logger drops sample-level notifications.
driver = _impl_log.LogDriver(None, None, None)
logger = self.mox.CreateMock(
logging.getLogger('oslo.messaging.notification.foo'))
logger.sample = None
self.mox.StubOutWithMock(logging, 'getLogger')
logging.getLogger('oslo.messaging.notification.foo').\
AndReturn(logger)
self.mox.ReplayAll()
msg = {'event_type': 'foo'}
driver.notify(None, msg, "sample", None)
class TestRoutingNotifier(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingNotifier, self).setUp()
self.config(notification_driver=['routing'])
transport = _FakeTransport(self.conf)
self.notifier = messaging.Notifier(transport)
self.router = self.notifier._driver_mgr['routing'].obj
def _fake_extension_manager(self, ext):
return extension.ExtensionManager.make_test_instance(
[extension.Extension('test', None, None, ext), ])
def _empty_extension_manager(self):
return extension.ExtensionManager.make_test_instance([])
def test_should_load_plugin(self):
self.router.used_drivers = set(["zoo", "blah"])
ext = mock.MagicMock()
ext.name = "foo"
self.assertFalse(self.router._should_load_plugin(ext))
ext.name = "zoo"
self.assertTrue(self.router._should_load_plugin(ext))
def test_load_notifiers_no_config(self):
# default routing_notifier_config=""
self.router._load_notifiers()
self.assertEqual({}, self.router.routing_groups)
self.assertEqual(0, len(self.router.used_drivers))
def test_load_notifiers_no_extensions(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r""
config_file = mock.MagicMock()
config_file.return_value = routing_config
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=self._empty_extension_manager()):
with mock.patch('oslo_messaging.notify.'
'_impl_routing.LOG') as mylog:
self.router._load_notifiers()
self.assertFalse(mylog.debug.called)
self.assertEqual({}, self.router.routing_groups)
def test_load_notifiers_config(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r"""
group_1:
rpc : foo
group_2:
rpc : blah
"""
config_file = mock.MagicMock()
config_file.return_value = routing_config
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=self._fake_extension_manager(
mock.MagicMock())):
self.router._load_notifiers()
groups = list(self.router.routing_groups.keys())
groups.sort()
self.assertEqual(['group_1', 'group_2'], groups)
def test_get_drivers_for_message_accepted_events(self):
config = r"""
group_1:
rpc:
accepted_events:
- foo.*
- blah.zoo.*
- zip
"""
groups = yaml.load(config)
group = groups['group_1']
# No matching event ...
self.assertEqual([],
self.router._get_drivers_for_message(
group, "unknown", "info"))
# Child of foo ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, "foo.1", "info"))
# Foo itself ...
self.assertEqual([],
self.router._get_drivers_for_message(
group, "foo", "info"))
# Child of blah.zoo
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, "blah.zoo.zing", "info"))
def test_get_drivers_for_message_accepted_priorities(self):
config = r"""
group_1:
rpc:
accepted_priorities:
- info
- error
"""
groups = yaml.load(config)
group = groups['group_1']
# No matching priority
self.assertEqual([],
self.router._get_drivers_for_message(
group, None, "unknown"))
# Info ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, None, "info"))
# Error (to make sure the list is getting processed) ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, None, "error"))
def test_get_drivers_for_message_both(self):
config = r"""
group_1:
rpc:
accepted_priorities:
- info
accepted_events:
- foo.*
driver_1:
accepted_priorities:
- info
driver_2:
accepted_events:
- foo.*
"""
groups = yaml.load(config)
group = groups['group_1']
# Valid event, but no matching priority
self.assertEqual(['driver_2'],
self.router._get_drivers_for_message(
group, 'foo.blah', "unknown"))
# Valid priority, but no matching event
self.assertEqual(['driver_1'],
self.router._get_drivers_for_message(
group, 'unknown', "info"))
# Happy day ...
x = self.router._get_drivers_for_message(group, 'foo.blah', "info")
x.sort()
self.assertEqual(['driver_1', 'driver_2', 'rpc'], x)
def test_filter_func(self):
ext = mock.MagicMock()
ext.name = "rpc"
# Good ...
self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
None, ['foo', 'rpc']))
# Bad
self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
None, ['foo']))
def test_notify(self):
self.router.routing_groups = {'group_1': None, 'group_2': None}
drivers_mock = mock.MagicMock()
drivers_mock.side_effect = [['rpc'], ['foo']]
with mock.patch.object(self.router, 'plugin_manager') as pm:
with mock.patch.object(self.router, '_get_drivers_for_message',
drivers_mock):
self.notifier.info({}, 'my_event', {})
self.assertEqual(sorted(['rpc', 'foo']),
sorted(pm.map.call_args[0][6]))
def test_notify_filtered(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r"""
group_1:
rpc:
accepted_events:
- my_event
rpc2:
accepted_priorities:
- info
bar:
accepted_events:
- nothing
"""
config_file = mock.MagicMock()
config_file.return_value = routing_config
rpc_driver = mock.Mock()
rpc2_driver = mock.Mock()
bar_driver = mock.Mock()
pm = dispatch.DispatchExtensionManager.make_test_instance(
[extension.Extension('rpc', None, None, rpc_driver),
extension.Extension('rpc2', None, None, rpc2_driver),
extension.Extension('bar', None, None, bar_driver)],
)
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=pm):
self.notifier.info({}, 'my_event', {})
self.assertFalse(bar_driver.info.called)
rpc_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)
rpc2_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)

View File

View File

@ -1,519 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from oslo import messaging
from oslo.messaging import exceptions
from oslo.messaging import serializer as msg_serializer
from oslo_config import cfg
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeTransport(object):
def __init__(self, conf):
self.conf = conf
def _send(self, *args, **kwargs):
pass
class TestCastCall(test_utils.BaseTestCase):
scenarios = [
('cast_no_ctxt_no_args', dict(call=False, ctxt={}, args={})),
('call_no_ctxt_no_args', dict(call=True, ctxt={}, args={})),
('cast_ctxt_and_args',
dict(call=False,
ctxt=dict(user='testuser', project='testtenant'),
args=dict(bar='blaa', foobar=11.01))),
('call_ctxt_and_args',
dict(call=True,
ctxt=dict(user='testuser', project='testtenant'),
args=dict(bar='blaa', foobar=11.01))),
]
def test_cast_call(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
client = messaging.RPCClient(transport, messaging.Target())
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
transport._send(messaging.Target(), self.ctxt, msg, **kwargs)
self.mox.ReplayAll()
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
class TestCastToTarget(test_utils.BaseTestCase):
_base = [
('all_none', dict(ctor={}, prepare={}, expect={})),
('ctor_exchange',
dict(ctor=dict(exchange='testexchange'),
prepare={},
expect=dict(exchange='testexchange'))),
('prepare_exchange',
dict(ctor={},
prepare=dict(exchange='testexchange'),
expect=dict(exchange='testexchange'))),
('prepare_exchange_none',
dict(ctor=dict(exchange='testexchange'),
prepare=dict(exchange=None),
expect={})),
('both_exchange',
dict(ctor=dict(exchange='ctorexchange'),
prepare=dict(exchange='testexchange'),
expect=dict(exchange='testexchange'))),
('ctor_topic',
dict(ctor=dict(topic='testtopic'),
prepare={},
expect=dict(topic='testtopic'))),
('prepare_topic',
dict(ctor={},
prepare=dict(topic='testtopic'),
expect=dict(topic='testtopic'))),
('prepare_topic_none',
dict(ctor=dict(topic='testtopic'),
prepare=dict(topic=None),
expect={})),
('both_topic',
dict(ctor=dict(topic='ctortopic'),
prepare=dict(topic='testtopic'),
expect=dict(topic='testtopic'))),
('ctor_namespace',
dict(ctor=dict(namespace='testnamespace'),
prepare={},
expect=dict(namespace='testnamespace'))),
('prepare_namespace',
dict(ctor={},
prepare=dict(namespace='testnamespace'),
expect=dict(namespace='testnamespace'))),
('prepare_namespace_none',
dict(ctor=dict(namespace='testnamespace'),
prepare=dict(namespace=None),
expect={})),
('both_namespace',
dict(ctor=dict(namespace='ctornamespace'),
prepare=dict(namespace='testnamespace'),
expect=dict(namespace='testnamespace'))),
('ctor_version',
dict(ctor=dict(version='1.1'),
prepare={},
expect=dict(version='1.1'))),
('prepare_version',
dict(ctor={},
prepare=dict(version='1.1'),
expect=dict(version='1.1'))),
('prepare_version_none',
dict(ctor=dict(version='1.1'),
prepare=dict(version=None),
expect={})),
('both_version',
dict(ctor=dict(version='ctorversion'),
prepare=dict(version='1.1'),
expect=dict(version='1.1'))),
('ctor_server',
dict(ctor=dict(server='testserver'),
prepare={},
expect=dict(server='testserver'))),
('prepare_server',
dict(ctor={},
prepare=dict(server='testserver'),
expect=dict(server='testserver'))),
('prepare_server_none',
dict(ctor=dict(server='testserver'),
prepare=dict(server=None),
expect={})),
('both_server',
dict(ctor=dict(server='ctorserver'),
prepare=dict(server='testserver'),
expect=dict(server='testserver'))),
('ctor_fanout',
dict(ctor=dict(fanout=True),
prepare={},
expect=dict(fanout=True))),
('prepare_fanout',
dict(ctor={},
prepare=dict(fanout=True),
expect=dict(fanout=True))),
('prepare_fanout_none',
dict(ctor=dict(fanout=True),
prepare=dict(fanout=None),
expect={})),
('both_fanout',
dict(ctor=dict(fanout=True),
prepare=dict(fanout=False),
expect=dict(fanout=False))),
]
_prepare = [
('single_prepare', dict(double_prepare=False)),
('double_prepare', dict(double_prepare=True)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._base,
cls._prepare)
def setUp(self):
super(TestCastToTarget, self).setUp(conf=cfg.ConfigOpts())
def test_cast_to_target(self):
target = messaging.Target(**self.ctor)
expect_target = messaging.Target(**self.expect)
transport = _FakeTransport(self.conf)
client = messaging.RPCClient(transport, target)
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args={})
if 'namespace' in self.expect:
msg['namespace'] = self.expect['namespace']
if 'version' in self.expect:
msg['version'] = self.expect['version']
transport._send(expect_target, {}, msg, retry=None)
self.mox.ReplayAll()
if self.prepare:
client = client.prepare(**self.prepare)
if self.double_prepare:
client = client.prepare(**self.prepare)
client.cast({}, 'foo')
TestCastToTarget.generate_scenarios()
_notset = object()
class TestCallTimeout(test_utils.BaseTestCase):
scenarios = [
('all_none',
dict(confval=None, ctor=None, prepare=_notset, expect=None)),
('confval',
dict(confval=21.1, ctor=None, prepare=_notset, expect=21.1)),
('ctor',
dict(confval=None, ctor=21.1, prepare=_notset, expect=21.1)),
('ctor_zero',
dict(confval=None, ctor=0, prepare=_notset, expect=0)),
('prepare',
dict(confval=None, ctor=None, prepare=21.1, expect=21.1)),
('prepare_override',
dict(confval=None, ctor=10.1, prepare=21.1, expect=21.1)),
('prepare_zero',
dict(confval=None, ctor=None, prepare=0, expect=0)),
]
def test_call_timeout(self):
self.config(rpc_response_timeout=self.confval)
transport = _FakeTransport(self.conf)
client = messaging.RPCClient(transport, messaging.Target(),
timeout=self.ctor)
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None)
transport._send(messaging.Target(), {}, msg, **kwargs)
self.mox.ReplayAll()
if self.prepare is not _notset:
client = client.prepare(timeout=self.prepare)
client.call({}, 'foo')
class TestCallRetry(test_utils.BaseTestCase):
scenarios = [
('all_none', dict(ctor=None, prepare=_notset, expect=None)),
('ctor', dict(ctor=21, prepare=_notset, expect=21)),
('ctor_zero', dict(ctor=0, prepare=_notset, expect=0)),
('prepare', dict(ctor=None, prepare=21, expect=21)),
('prepare_override', dict(ctor=10, prepare=21, expect=21)),
('prepare_zero', dict(ctor=None, prepare=0, expect=0)),
]
def test_call_retry(self):
transport = _FakeTransport(self.conf)
client = messaging.RPCClient(transport, messaging.Target(),
retry=self.ctor)
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=60,
retry=self.expect)
transport._send(messaging.Target(), {}, msg, **kwargs)
self.mox.ReplayAll()
if self.prepare is not _notset:
client = client.prepare(retry=self.prepare)
client.call({}, 'foo')
class TestCallFanout(test_utils.BaseTestCase):
scenarios = [
('target', dict(prepare=_notset, target={'fanout': True})),
('prepare', dict(prepare={'fanout': True}, target={})),
('both', dict(prepare={'fanout': True}, target={'fanout': True})),
]
def test_call_fanout(self):
transport = _FakeTransport(self.conf)
client = messaging.RPCClient(transport,
messaging.Target(**self.target))
if self.prepare is not _notset:
client = client.prepare(**self.prepare)
self.assertRaises(exceptions.InvalidTarget,
client.call, {}, 'foo')
class TestSerializer(test_utils.BaseTestCase):
scenarios = [
('cast',
dict(call=False,
ctxt=dict(user='bob'),
args=dict(a='a', b='b', c='c'),
retval=None)),
('call',
dict(call=True,
ctxt=dict(user='bob'),
args=dict(a='a', b='b', c='c'),
retval='d')),
]
def test_call_serializer(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
serializer = msg_serializer.NoOpSerializer()
client = messaging.RPCClient(transport, messaging.Target(),
serializer=serializer)
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo',
args=dict([(k, 's' + v) for k, v in self.args.items()]))
kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
kwargs['retry'] = None
transport._send(messaging.Target(),
dict(user='alice'),
msg,
**kwargs).AndReturn(self.retval)
self.mox.StubOutWithMock(serializer, 'serialize_entity')
self.mox.StubOutWithMock(serializer, 'deserialize_entity')
self.mox.StubOutWithMock(serializer, 'serialize_context')
for arg in self.args:
serializer.serialize_entity(self.ctxt, arg).AndReturn('s' + arg)
if self.call:
serializer.deserialize_entity(self.ctxt, self.retval).\
AndReturn('d' + self.retval)
serializer.serialize_context(self.ctxt).AndReturn(dict(user='alice'))
self.mox.ReplayAll()
method = client.call if self.call else client.cast
retval = method(self.ctxt, 'foo', **self.args)
if self.retval is not None:
self.assertEqual('d' + self.retval, retval)
class TestVersionCap(test_utils.BaseTestCase):
_call_vs_cast = [
('call', dict(call=True)),
('cast', dict(call=False)),
]
_cap_scenarios = [
('all_none',
dict(cap=None, prepare_cap=_notset,
version=None, prepare_version=_notset,
success=True)),
('ctor_cap_ok',
dict(cap='1.1', prepare_cap=_notset,
version='1.0', prepare_version=_notset,
success=True)),
('ctor_cap_override_ok',
dict(cap='2.0', prepare_cap='1.1',
version='1.0', prepare_version='1.0',
success=True)),
('ctor_cap_override_none_ok',
dict(cap='1.1', prepare_cap=None,
version='1.0', prepare_version=_notset,
success=True)),
('ctor_cap_minor_fail',
dict(cap='1.0', prepare_cap=_notset,
version='1.1', prepare_version=_notset,
success=False)),
('ctor_cap_major_fail',
dict(cap='2.0', prepare_cap=_notset,
version=None, prepare_version='1.0',
success=False)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = (
testscenarios.multiply_scenarios(cls._call_vs_cast,
cls._cap_scenarios))
def test_version_cap(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
target = messaging.Target(version=self.version)
client = messaging.RPCClient(transport, target,
version_cap=self.cap)
if self.success:
self.mox.StubOutWithMock(transport, '_send')
if self.prepare_version is not _notset:
target = target(version=self.prepare_version)
msg = dict(method='foo', args={})
if target.version is not None:
msg['version'] = target.version
kwargs = {'retry': None}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
transport._send(target, {}, msg, **kwargs)
self.mox.ReplayAll()
prep_kwargs = {}
if self.prepare_cap is not _notset:
prep_kwargs['version_cap'] = self.prepare_cap
if self.prepare_version is not _notset:
prep_kwargs['version'] = self.prepare_version
if prep_kwargs:
client = client.prepare(**prep_kwargs)
method = client.call if self.call else client.cast
try:
method({}, 'foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.RPCVersionCapError, ex)
self.assertFalse(self.success)
else:
self.assertTrue(self.success)
TestVersionCap.generate_scenarios()
class TestCanSendVersion(test_utils.BaseTestCase):
scenarios = [
('all_none',
dict(cap=None, prepare_cap=_notset,
version=None, prepare_version=_notset,
can_send_version=_notset,
can_send=True)),
('ctor_cap_ok',
dict(cap='1.1', prepare_cap=_notset,
version='1.0', prepare_version=_notset,
can_send_version=_notset,
can_send=True)),
('ctor_cap_override_ok',
dict(cap='2.0', prepare_cap='1.1',
version='1.0', prepare_version='1.0',
can_send_version=_notset,
can_send=True)),
('ctor_cap_override_none_ok',
dict(cap='1.1', prepare_cap=None,
version='1.0', prepare_version=_notset,
can_send_version=_notset,
can_send=True)),
('ctor_cap_can_send_ok',
dict(cap='1.1', prepare_cap=None,
version='1.0', prepare_version=_notset,
can_send_version='1.1',
can_send=True)),
('ctor_cap_can_send_none_ok',
dict(cap='1.1', prepare_cap=None,
version='1.0', prepare_version=_notset,
can_send_version=None,
can_send=True)),
('ctor_cap_minor_fail',
dict(cap='1.0', prepare_cap=_notset,
version='1.1', prepare_version=_notset,
can_send_version=_notset,
can_send=False)),
('ctor_cap_major_fail',
dict(cap='2.0', prepare_cap=_notset,
version=None, prepare_version='1.0',
can_send_version=_notset,
can_send=False)),
]
def test_version_cap(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
target = messaging.Target(version=self.version)
client = messaging.RPCClient(transport, target,
version_cap=self.cap)
prep_kwargs = {}
if self.prepare_cap is not _notset:
prep_kwargs['version_cap'] = self.prepare_cap
if self.prepare_version is not _notset:
prep_kwargs['version'] = self.prepare_version
if prep_kwargs:
client = client.prepare(**prep_kwargs)
if self.can_send_version is not _notset:
can_send = client.can_send_version(version=self.can_send_version)
else:
can_send = client.can_send_version()
self.assertEqual(self.can_send, can_send)

View File

@ -1,179 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from oslo import messaging
from oslo.messaging import serializer as msg_serializer
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeEndpoint(object):
def __init__(self, target=None):
self.target = target
def foo(self, ctxt, **kwargs):
pass
def bar(self, ctxt, **kwargs):
pass
class TestDispatcher(test_utils.BaseTestCase):
scenarios = [
('no_endpoints',
dict(endpoints=[],
dispatch_to=None,
ctxt={}, msg=dict(method='foo'),
success=False, ex=messaging.UnsupportedVersion)),
('default_target',
dict(endpoints=[{}],
dispatch_to=dict(endpoint=0, method='foo'),
ctxt={}, msg=dict(method='foo'),
success=True, ex=None)),
('default_target_ctxt_and_args',
dict(endpoints=[{}],
dispatch_to=dict(endpoint=0, method='bar'),
ctxt=dict(user='bob'), msg=dict(method='bar',
args=dict(blaa=True)),
success=True, ex=None)),
('default_target_namespace',
dict(endpoints=[{}],
dispatch_to=dict(endpoint=0, method='foo'),
ctxt={}, msg=dict(method='foo', namespace=None),
success=True, ex=None)),
('default_target_version',
dict(endpoints=[{}],
dispatch_to=dict(endpoint=0, method='foo'),
ctxt={}, msg=dict(method='foo', version='1.0'),
success=True, ex=None)),
('default_target_no_such_method',
dict(endpoints=[{}],
dispatch_to=None,
ctxt={}, msg=dict(method='foobar'),
success=False, ex=messaging.NoSuchMethod)),
('namespace',
dict(endpoints=[{}, dict(namespace='testns')],
dispatch_to=dict(endpoint=1, method='foo'),
ctxt={}, msg=dict(method='foo', namespace='testns'),
success=True, ex=None)),
('namespace_mismatch',
dict(endpoints=[{}, dict(namespace='testns')],
dispatch_to=None,
ctxt={}, msg=dict(method='foo', namespace='nstest'),
success=False, ex=messaging.UnsupportedVersion)),
('version',
dict(endpoints=[dict(version='1.5'), dict(version='3.4')],
dispatch_to=dict(endpoint=1, method='foo'),
ctxt={}, msg=dict(method='foo', version='3.2'),
success=True, ex=None)),
('version_mismatch',
dict(endpoints=[dict(version='1.5'), dict(version='3.0')],
dispatch_to=None,
ctxt={}, msg=dict(method='foo', version='3.2'),
success=False, ex=messaging.UnsupportedVersion)),
]
def test_dispatcher(self):
endpoints = [mock.Mock(spec=_FakeEndpoint,
target=messaging.Target(**e))
for e in self.endpoints]
serializer = None
target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, endpoints, serializer)
def check_reply(reply=None, failure=None, log_failure=True):
if self.ex and failure is not None:
ex = failure[1]
self.assertFalse(self.success, ex)
self.assertIsNotNone(self.ex, ex)
self.assertIsInstance(ex, self.ex, ex)
if isinstance(ex, messaging.NoSuchMethod):
self.assertEqual(self.msg.get('method'), ex.method)
elif isinstance(ex, messaging.UnsupportedVersion):
self.assertEqual(self.msg.get('version', '1.0'),
ex.version)
if ex.method:
self.assertEqual(self.msg.get('method'), ex.method)
else:
self.assertTrue(self.success, failure)
self.assertIsNone(failure)
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
incoming.reply.side_effect = check_reply
callback = dispatcher(incoming)
callback.run()
callback.done()
for n, endpoint in enumerate(endpoints):
for method_name in ['foo', 'bar']:
method = getattr(endpoint, method_name)
if self.dispatch_to and n == self.dispatch_to['endpoint'] and \
method_name == self.dispatch_to['method']:
method.assert_called_once_with(
self.ctxt, **self.msg.get('args', {}))
else:
self.assertEqual(0, method.call_count)
self.assertEqual(1, incoming.reply.call_count)
class TestSerializer(test_utils.BaseTestCase):
scenarios = [
('no_args_or_retval',
dict(ctxt={}, dctxt={}, args={}, retval=None)),
('args_and_retval',
dict(ctxt=dict(user='bob'),
dctxt=dict(user='alice'),
args=dict(a='a', b='b', c='c'),
retval='d')),
]
def test_serializer(self):
endpoint = _FakeEndpoint()
serializer = msg_serializer.NoOpSerializer()
target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, [endpoint], serializer)
self.mox.StubOutWithMock(endpoint, 'foo')
args = dict([(k, 'd' + v) for k, v in self.args.items()])
endpoint.foo(self.dctxt, **args).AndReturn(self.retval)
self.mox.StubOutWithMock(serializer, 'serialize_entity')
self.mox.StubOutWithMock(serializer, 'deserialize_entity')
self.mox.StubOutWithMock(serializer, 'deserialize_context')
serializer.deserialize_context(self.ctxt).AndReturn(self.dctxt)
for arg in self.args:
serializer.deserialize_entity(self.dctxt, arg).AndReturn('d' + arg)
serializer.serialize_entity(self.dctxt, self.retval).\
AndReturn('s' + self.retval if self.retval else None)
self.mox.ReplayAll()
retval = dispatcher._dispatch(self.ctxt, dict(method='foo',
args=self.args))
if self.retval is not None:
self.assertEqual('s' + self.retval, retval)

View File

@ -1,503 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import testscenarios
from oslo import messaging
from oslo_config import cfg
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
class ServerSetupMixin(object):
class Server(object):
def __init__(self, transport, topic, server, endpoint, serializer):
target = messaging.Target(topic=topic, server=server)
self._server = messaging.get_rpc_server(transport,
target,
[endpoint, self],
serializer=serializer)
def stop(self, ctxt):
# Check start() does nothing with a running server
self._server.start()
self._server.stop()
self._server.wait()
def start(self):
self._server.start()
class TestSerializer(object):
def serialize_entity(self, ctxt, entity):
return ('s' + entity) if entity else entity
def deserialize_entity(self, ctxt, entity):
return ('d' + entity) if entity else entity
def serialize_context(self, ctxt):
return dict([(k, 's' + v) for k, v in ctxt.items()])
def deserialize_context(self, ctxt):
return dict([(k, 'd' + v) for k, v in ctxt.items()])
def __init__(self):
self.serializer = self.TestSerializer()
def _setup_server(self, transport, endpoint, topic=None, server=None):
server = self.Server(transport,
topic=topic or 'testtopic',
server=server or 'testserver',
endpoint=endpoint,
serializer=self.serializer)
thread = threading.Thread(target=server.start)
thread.daemon = True
thread.start()
return thread
def _stop_server(self, client, server_thread, topic=None):
if topic is not None:
client = client.prepare(topic=topic)
client.cast({}, 'stop')
server_thread.join(timeout=30)
def _setup_client(self, transport, topic='testtopic'):
return messaging.RPCClient(transport,
messaging.Target(topic=topic),
serializer=self.serializer)
class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def __init__(self, *args):
super(TestRPCServer, self).__init__(*args)
ServerSetupMixin.__init__(self)
def setUp(self):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
def test_constructor(self):
transport = messaging.get_transport(self.conf, url='fake:')
target = messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
server = messaging.get_rpc_server(transport, target, endpoints,
serializer=serializer)
self.assertIs(server.conf, self.conf)
self.assertIs(server.transport, transport)
self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer)
self.assertEqual('blocking', server.executor)
def test_server_wait_method(self):
transport = messaging.get_transport(self.conf, url='fake:')
target = messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
server = messaging.get_rpc_server(transport, target, endpoints,
serializer=serializer)
# Mocking executor
server._executor = mock.Mock()
# Here assigning executor's listener object to listener variable
# before calling wait method, beacuse in wait method we are
# setting executor to None.
listener = server._executor.listener
# call server wait method
server.wait()
self.assertIsNone(server._executor)
self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self):
transport = messaging.get_transport(self.conf, url='fake:')
server = messaging.get_rpc_server(transport,
messaging.Target(topic='testtopic'),
[])
try:
server.start()
except Exception as ex:
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
self.assertEqual('testtopic', ex.target.topic)
else:
self.assertTrue(False)
def test_no_server_topic(self):
transport = messaging.get_transport(self.conf, url='fake:')
target = messaging.Target(server='testserver')
server = messaging.get_rpc_server(transport, target, [])
try:
server.start()
except Exception as ex:
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
self.assertEqual('testserver', ex.target.server)
else:
self.assertTrue(False)
def _test_no_client_topic(self, call=True):
transport = messaging.get_transport(self.conf, url='fake:')
client = self._setup_client(transport, topic=None)
method = client.call if call else client.cast
try:
method({}, 'ping', arg='foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
self.assertIsNotNone(ex.target)
else:
self.assertTrue(False)
def test_no_client_topic_call(self):
self._test_no_client_topic(call=True)
def test_no_client_topic_cast(self):
self._test_no_client_topic(call=False)
def test_client_call_timeout(self):
transport = messaging.get_transport(self.conf, url='fake:')
finished = False
wait = threading.Condition()
class TestEndpoint(object):
def ping(self, ctxt, arg):
with wait:
if not finished:
wait.wait()
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
try:
client.prepare(timeout=0).call({}, 'ping', arg='foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.MessagingTimeout, ex)
else:
self.assertTrue(False)
with wait:
finished = True
wait.notify()
self._stop_server(client, server_thread)
def test_unknown_executor(self):
transport = messaging.get_transport(self.conf, url='fake:')
try:
messaging.get_rpc_server(transport, None, [], executor='foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
self.assertEqual('foo', ex.executor)
else:
self.assertTrue(False)
def test_cast(self):
transport = messaging.get_transport(self.conf, url='fake:')
class TestEndpoint(object):
def __init__(self):
self.pings = []
def ping(self, ctxt, arg):
self.pings.append(arg)
endpoint = TestEndpoint()
server_thread = self._setup_server(transport, endpoint)
client = self._setup_client(transport)
client.cast({}, 'ping', arg='foo')
client.cast({}, 'ping', arg='bar')
self._stop_server(client, server_thread)
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
def test_call(self):
transport = messaging.get_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
return arg
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
self.assertIsNone(client.call({}, 'ping', arg=None))
self.assertEqual(0, client.call({}, 'ping', arg=0))
self.assertEqual(False, client.call({}, 'ping', arg=False))
self.assertEqual([], client.call({}, 'ping', arg=[]))
self.assertEqual({}, client.call({}, 'ping', arg={}))
self.assertEqual('dsdsfoo', client.call({}, 'ping', arg='foo'))
self._stop_server(client, server_thread)
def test_direct_call(self):
transport = messaging.get_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
return arg
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
direct = client.prepare(server='testserver')
self.assertIsNone(direct.call({}, 'ping', arg=None))
self.assertEqual(0, client.call({}, 'ping', arg=0))
self.assertEqual(False, client.call({}, 'ping', arg=False))
self.assertEqual([], client.call({}, 'ping', arg=[]))
self.assertEqual({}, client.call({}, 'ping', arg={}))
self.assertEqual('dsdsfoo', direct.call({}, 'ping', arg='foo'))
self._stop_server(client, server_thread)
def test_context(self):
transport = messaging.get_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ctxt_check(self, ctxt, key):
return ctxt[key]
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
self.assertEqual('dsdsb',
client.call({'dsa': 'b'},
'ctxt_check',
key='a'))
self._stop_server(client, server_thread)
def test_failure(self):
transport = messaging.get_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
raise ValueError(arg)
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
try:
client.call({}, 'ping', arg='foo')
except Exception as ex:
self.assertIsInstance(ex, ValueError)
self.assertEqual('dsfoo', str(ex))
else:
self.assertTrue(False)
self._stop_server(client, server_thread)
def test_expected_failure(self):
transport = messaging.get_transport(self.conf, url='fake:')
class TestEndpoint(object):
@messaging.expected_exceptions(ValueError)
def ping(self, ctxt, arg):
raise ValueError(arg)
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
try:
client.call({}, 'ping', arg='foo')
except Exception as ex:
self.assertIsInstance(ex, ValueError)
self.assertEqual('dsfoo', str(ex))
else:
self.assertTrue(False)
self._stop_server(client, server_thread)
class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
_exchanges = [
('same_exchange', dict(exchange1=None, exchange2=None)),
('diff_exchange', dict(exchange1='x1', exchange2='x2')),
]
_topics = [
('same_topic', dict(topic1='t', topic2='t')),
('diff_topic', dict(topic1='t1', topic2='t2')),
]
_server = [
('same_server', dict(server1=None, server2=None)),
('diff_server', dict(server1='s1', server2='s2')),
]
_fanout = [
('not_fanout', dict(fanout1=None, fanout2=None)),
('fanout', dict(fanout1=True, fanout2=True)),
]
_method = [
('call', dict(call1=True, call2=True)),
('cast', dict(call1=False, call2=False)),
]
_endpoints = [
('one_endpoint',
dict(multi_endpoints=False,
expect1=['ds1', 'ds2'],
expect2=['ds1', 'ds2'])),
('two_endpoints',
dict(multi_endpoints=True,
expect1=['ds1'],
expect2=['ds2'])),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._exchanges,
cls._topics,
cls._server,
cls._fanout,
cls._method,
cls._endpoints)
# fanout call not supported
def filter_fanout_call(scenario):
params = scenario[1]
fanout = params['fanout1'] or params['fanout2']
call = params['call1'] or params['call2']
return not (call and fanout)
# listening multiple times on same topic/server pair not supported
def filter_same_topic_and_server(scenario):
params = scenario[1]
single_topic = params['topic1'] == params['topic2']
single_server = params['server1'] == params['server2']
return not (single_topic and single_server)
# fanout to multiple servers on same topic and exchange
# each endpoint will receive both messages
def fanout_to_servers(scenario):
params = scenario[1]
fanout = params['fanout1'] or params['fanout2']
single_exchange = params['exchange1'] == params['exchange2']
single_topic = params['topic1'] == params['topic2']
multi_servers = params['server1'] != params['server2']
if fanout and single_exchange and single_topic and multi_servers:
params['expect1'] = params['expect1'][:] + params['expect1']
params['expect2'] = params['expect2'][:] + params['expect2']
return scenario
# multiple endpoints on same topic and exchange
# either endpoint can get either message
def single_topic_multi_endpoints(scenario):
params = scenario[1]
single_exchange = params['exchange1'] == params['exchange2']
single_topic = params['topic1'] == params['topic2']
if single_topic and single_exchange and params['multi_endpoints']:
params['expect_either'] = (params['expect1'] +
params['expect2'])
params['expect1'] = params['expect2'] = []
else:
params['expect_either'] = []
return scenario
for f in [filter_fanout_call, filter_same_topic_and_server]:
cls.scenarios = filter(f, cls.scenarios)
for m in [fanout_to_servers, single_topic_multi_endpoints]:
cls.scenarios = map(m, cls.scenarios)
def __init__(self, *args):
super(TestMultipleServers, self).__init__(*args)
ServerSetupMixin.__init__(self)
def setUp(self):
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
def test_multiple_servers(self):
url1 = 'fake:///' + (self.exchange1 or '')
url2 = 'fake:///' + (self.exchange2 or '')
transport1 = messaging.get_transport(self.conf, url=url1)
if url1 != url2:
transport2 = messaging.get_transport(self.conf, url=url1)
else:
transport2 = transport1
class TestEndpoint(object):
def __init__(self):
self.pings = []
def ping(self, ctxt, arg):
self.pings.append(arg)
def alive(self, ctxt):
return 'alive'
if self.multi_endpoints:
endpoint1, endpoint2 = TestEndpoint(), TestEndpoint()
else:
endpoint1 = endpoint2 = TestEndpoint()
thread1 = self._setup_server(transport1, endpoint1,
topic=self.topic1, server=self.server1)
thread2 = self._setup_server(transport2, endpoint2,
topic=self.topic2, server=self.server2)
client1 = self._setup_client(transport1, topic=self.topic1)
client2 = self._setup_client(transport2, topic=self.topic2)
client1 = client1.prepare(server=self.server1)
client2 = client2.prepare(server=self.server2)
if self.fanout1:
client1.call({}, 'alive')
client1 = client1.prepare(fanout=True)
if self.fanout2:
client2.call({}, 'alive')
client2 = client2.prepare(fanout=True)
(client1.call if self.call1 else client1.cast)({}, 'ping', arg='1')
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
self.assertTrue(thread1.isAlive())
self._stop_server(client1.prepare(fanout=None),
thread1, topic=self.topic1)
self.assertTrue(thread2.isAlive())
self._stop_server(client2.prepare(fanout=None),
thread2, topic=self.topic2)
def check(pings, expect):
self.assertEqual(len(expect), len(pings))
for a in expect:
self.assertIn(a, pings)
if self.expect_either:
check(endpoint1.pings + endpoint2.pings, self.expect_either)
else:
check(endpoint1.pings, self.expect1)
check(endpoint2.pings, self.expect2)
TestMultipleServers.generate_scenarios()

View File

@ -1,738 +0,0 @@
# Copyright (C) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import select
import socket
import threading
import time
import uuid
import six
from six import moves
import testtools
from oslo import messaging
from oslo_messaging.tests import utils as test_utils
if six.PY2:
# NOTE(flaper87): pyngus currently doesn't support py34. It's
# on the works, though.
from oslo_messaging._drivers.protocols.amqp import driver as amqp_driver
import pyngus
LOG = logging.getLogger(__name__)
class _ListenerThread(threading.Thread):
"""Run a blocking listener in a thread."""
def __init__(self, listener, msg_count):
super(_ListenerThread, self).__init__()
self.listener = listener
self.msg_count = msg_count
self.messages = moves.queue.Queue()
self.daemon = True
self.start()
def run(self):
LOG.debug("Listener started")
while self.msg_count > 0:
in_msg = self.listener.poll()
self.messages.put(in_msg)
self.msg_count -= 1
if in_msg.message.get('method') == 'echo':
in_msg.reply(reply={'correlation-id':
in_msg.message.get('id')})
LOG.debug("Listener stopped")
def get_messages(self):
"""Returns a list of all received messages."""
msgs = []
try:
while True:
m = self.messages.get(False)
msgs.append(m)
except moves.queue.Empty:
pass
return msgs
@testtools.skipUnless(six.PY2, "No Py3K support yet")
class TestProtonDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestProtonDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'amqp'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver,
amqp_driver.ProtonDriver)
class _AmqpBrokerTestCase(test_utils.BaseTestCase):
@testtools.skipUnless(six.PY2, "No Py3K support yet")
def setUp(self):
super(_AmqpBrokerTestCase, self).setUp()
self._broker = FakeBroker()
self._broker_addr = "amqp://%s:%d" % (self._broker.host,
self._broker.port)
self._broker_url = messaging.TransportURL.parse(self.conf,
self._broker_addr)
self._broker.start()
def tearDown(self):
super(_AmqpBrokerTestCase, self).tearDown()
self._broker.stop()
class TestAmqpSend(_AmqpBrokerTestCase):
"""Test sending and receiving messages."""
def test_driver_unconnected_cleanup(self):
"""Verify the driver can cleanly shutdown even if never connected."""
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
driver.cleanup()
def test_listener_cleanup(self):
"""Verify unused listener can cleanly shutdown."""
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = messaging.Target(topic="test-topic")
listener = driver.listen(target)
self.assertIsInstance(listener, amqp_driver.ProtonListener)
driver.cleanup()
def test_send_no_reply(self):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = messaging.Target(topic="test-topic")
listener = _ListenerThread(driver.listen(target), 1)
rc = driver.send(target, {"context": True},
{"msg": "value"}, wait_for_reply=False)
self.assertIsNone(rc)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
self.assertEqual(listener.messages.get().message, {"msg": "value"})
driver.cleanup()
def test_send_exchange_with_reply(self):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target1 = messaging.Target(topic="test-topic", exchange="e1")
listener1 = _ListenerThread(driver.listen(target1), 1)
target2 = messaging.Target(topic="test-topic", exchange="e2")
listener2 = _ListenerThread(driver.listen(target2), 1)
rc = driver.send(target1, {"context": "whatever"},
{"method": "echo", "id": "e1"},
wait_for_reply=True,
timeout=30)
self.assertIsNotNone(rc)
self.assertEqual(rc.get('correlation-id'), 'e1')
rc = driver.send(target2, {"context": "whatever"},
{"method": "echo", "id": "e2"},
wait_for_reply=True,
timeout=30)
self.assertIsNotNone(rc)
self.assertEqual(rc.get('correlation-id'), 'e2')
listener1.join(timeout=30)
self.assertFalse(listener1.isAlive())
listener2.join(timeout=30)
self.assertFalse(listener2.isAlive())
driver.cleanup()
def test_messaging_patterns(self):
"""Verify the direct, shared, and fanout message patterns work."""
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target1 = messaging.Target(topic="test-topic", server="server1")
listener1 = _ListenerThread(driver.listen(target1), 4)
target2 = messaging.Target(topic="test-topic", server="server2")
listener2 = _ListenerThread(driver.listen(target2), 3)
shared_target = messaging.Target(topic="test-topic")
fanout_target = messaging.Target(topic="test-topic",
fanout=True)
# this should go to only one server:
driver.send(shared_target, {"context": "whatever"},
{"method": "echo", "id": "either-1"},
wait_for_reply=True)
self.assertEqual(self._broker.topic_count, 1)
self.assertEqual(self._broker.direct_count, 1) # reply
# this should go to the other server:
driver.send(shared_target, {"context": "whatever"},
{"method": "echo", "id": "either-2"},
wait_for_reply=True)
self.assertEqual(self._broker.topic_count, 2)
self.assertEqual(self._broker.direct_count, 2) # reply
# these should only go to listener1:
driver.send(target1, {"context": "whatever"},
{"method": "echo", "id": "server1-1"},
wait_for_reply=True)
driver.send(target1, {"context": "whatever"},
{"method": "echo", "id": "server1-2"},
wait_for_reply=True)
self.assertEqual(self._broker.direct_count, 6) # 2X(send+reply)
# this should only go to listener2:
driver.send(target2, {"context": "whatever"},
{"method": "echo", "id": "server2"},
wait_for_reply=True)
self.assertEqual(self._broker.direct_count, 8)
# both listeners should get a copy:
driver.send(fanout_target, {"context": "whatever"},
{"method": "echo", "id": "fanout"})
listener1.join(timeout=30)
self.assertFalse(listener1.isAlive())
listener2.join(timeout=30)
self.assertFalse(listener2.isAlive())
self.assertEqual(self._broker.fanout_count, 1)
listener1_ids = [x.message.get('id') for x in listener1.get_messages()]
listener2_ids = [x.message.get('id') for x in listener2.get_messages()]
self.assertTrue('fanout' in listener1_ids and
'fanout' in listener2_ids)
self.assertTrue('server1-1' in listener1_ids and
'server1-1' not in listener2_ids)
self.assertTrue('server1-2' in listener1_ids and
'server1-2' not in listener2_ids)
self.assertTrue('server2' in listener2_ids and
'server2' not in listener1_ids)
if 'either-1' in listener1_ids:
self.assertTrue('either-2' in listener2_ids and
'either-2' not in listener1_ids and
'either-1' not in listener2_ids)
else:
self.assertTrue('either-2' in listener1_ids and
'either-2' not in listener2_ids and
'either-1' in listener2_ids)
driver.cleanup()
def test_send_timeout(self):
"""Verify send timeout."""
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = messaging.Target(topic="test-topic")
listener = _ListenerThread(driver.listen(target), 1)
# the listener will drop this message:
try:
driver.send(target,
{"context": "whatever"},
{"method": "drop"},
wait_for_reply=True,
timeout=1.0)
except Exception as ex:
self.assertIsInstance(ex, messaging.MessagingTimeout, ex)
else:
self.assertTrue(False, "No Exception raised!")
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
driver.cleanup()
class TestAmqpNotification(_AmqpBrokerTestCase):
"""Test sending and receiving notifications."""
def test_notification(self):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
notifications = [(messaging.Target(topic="topic-1"), 'info'),
(messaging.Target(topic="topic-1"), 'error'),
(messaging.Target(topic="topic-2"), 'debug')]
nl = driver.listen_for_notifications(notifications, None)
# send one for each support version:
msg_count = len(notifications) * 2
listener = _ListenerThread(nl, msg_count)
targets = ['topic-1.info',
'topic-1.bad', # will raise MessagingDeliveryFailure
'bad-topic.debug', # will raise MessagingDeliveryFailure
'topic-1.error',
'topic-2.debug']
excepted_targets = []
exception_count = 0
for version in (1.0, 2.0):
for t in targets:
try:
driver.send_notification(messaging.Target(topic=t),
"context", {'target': t},
version)
except messaging.MessageDeliveryFailure:
exception_count += 1
excepted_targets.append(t)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
topics = [x.message.get('target') for x in listener.get_messages()]
self.assertEqual(len(topics), msg_count)
self.assertEqual(topics.count('topic-1.info'), 2)
self.assertEqual(topics.count('topic-1.error'), 2)
self.assertEqual(topics.count('topic-2.debug'), 2)
self.assertEqual(self._broker.dropped_count, 4)
self.assertEqual(exception_count, 4)
self.assertEqual(excepted_targets.count('topic-1.bad'), 2)
self.assertEqual(excepted_targets.count('bad-topic.debug'), 2)
driver.cleanup()
@testtools.skipUnless(six.PY2, "No Py3K support yet")
class TestAuthentication(test_utils.BaseTestCase):
def setUp(self):
super(TestAuthentication, self).setUp()
# for simplicity, encode the credentials as they would appear 'on the
# wire' in a SASL frame - username and password prefixed by zero.
user_credentials = ["\0joe\0secret"]
self._broker = FakeBroker(sasl_mechanisms="PLAIN",
user_credentials=user_credentials)
self._broker.start()
def tearDown(self):
super(TestAuthentication, self).tearDown()
self._broker.stop()
def test_authentication_ok(self):
"""Verify that username and password given in TransportHost are
accepted by the broker.
"""
addr = "amqp://joe:secret@%s:%d" % (self._broker.host,
self._broker.port)
url = messaging.TransportURL.parse(self.conf, addr)
driver = amqp_driver.ProtonDriver(self.conf, url)
target = messaging.Target(topic="test-topic")
listener = _ListenerThread(driver.listen(target), 1)
rc = driver.send(target, {"context": True},
{"method": "echo"}, wait_for_reply=True)
self.assertIsNotNone(rc)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
driver.cleanup()
def test_authentication_failure(self):
"""Verify that a bad password given in TransportHost is
rejected by the broker.
"""
addr = "amqp://joe:badpass@%s:%d" % (self._broker.host,
self._broker.port)
url = messaging.TransportURL.parse(self.conf, addr)
driver = amqp_driver.ProtonDriver(self.conf, url)
target = messaging.Target(topic="test-topic")
_ListenerThread(driver.listen(target), 1)
self.assertRaises(messaging.MessagingTimeout,
driver.send,
target, {"context": True},
{"method": "echo"},
wait_for_reply=True,
timeout=2.0)
driver.cleanup()
@testtools.skipUnless(six.PY2, "No Py3K support yet")
class TestFailover(test_utils.BaseTestCase):
def setUp(self):
super(TestFailover, self).setUp()
self._brokers = [FakeBroker(), FakeBroker()]
hosts = []
for broker in self._brokers:
hosts.append(messaging.TransportHost(hostname=broker.host,
port=broker.port))
self._broker_url = messaging.TransportURL(self.conf,
transport="amqp",
hosts=hosts)
def tearDown(self):
super(TestFailover, self).tearDown()
for broker in self._brokers:
if broker.isAlive():
broker.stop()
def test_broker_failover(self):
"""Simulate failover of one broker to another."""
self._brokers[0].start()
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = messaging.Target(topic="my-topic")
listener = _ListenerThread(driver.listen(target), 2)
rc = driver.send(target, {"context": "whatever"},
{"method": "echo", "id": "echo-1"},
wait_for_reply=True,
timeout=30)
self.assertIsNotNone(rc)
self.assertEqual(rc.get('correlation-id'), 'echo-1')
# 1 request msg, 1 response:
self.assertEqual(self._brokers[0].topic_count, 1)
self.assertEqual(self._brokers[0].direct_count, 1)
# fail broker 0 and start broker 1:
self._brokers[0].stop()
self._brokers[1].start()
deadline = time.time() + 30
responded = False
sequence = 2
while deadline > time.time() and not responded:
if not listener.isAlive():
# listener may have exited after replying to an old correlation
# id: restart new listener
listener = _ListenerThread(driver.listen(target), 1)
try:
rc = driver.send(target, {"context": "whatever"},
{"method": "echo",
"id": "echo-%d" % sequence},
wait_for_reply=True,
timeout=2)
self.assertIsNotNone(rc)
self.assertEqual(rc.get('correlation-id'),
'echo-%d' % sequence)
responded = True
except messaging.MessagingTimeout:
sequence += 1
self.assertTrue(responded)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
# note: stopping the broker first tests cleaning up driver without a
# connection active
self._brokers[1].stop()
driver.cleanup()
class FakeBroker(threading.Thread):
"""A test AMQP message 'broker'."""
if six.PY2:
class Connection(pyngus.ConnectionEventHandler):
"""A single AMQP connection."""
def __init__(self, server, socket_, name,
sasl_mechanisms, user_credentials):
"""Create a Connection using socket_."""
self.socket = socket_
self.name = name
self.server = server
self.connection = server.container.create_connection(name,
self)
self.connection.user_context = self
self.sasl_mechanisms = sasl_mechanisms
self.user_credentials = user_credentials
if sasl_mechanisms:
self.connection.pn_sasl.mechanisms(sasl_mechanisms)
self.connection.pn_sasl.server()
self.connection.open()
self.sender_links = set()
self.closed = False
def destroy(self):
"""Destroy the test connection."""
while self.sender_links:
link = self.sender_links.pop()
link.destroy()
self.connection.destroy()
self.connection = None
self.socket.close()
def fileno(self):
"""Allows use of this in a select() call."""
return self.socket.fileno()
def process_input(self):
"""Called when socket is read-ready."""
try:
pyngus.read_socket_input(self.connection, self.socket)
except socket.error:
pass
self.connection.process(time.time())
def send_output(self):
"""Called when socket is write-ready."""
try:
pyngus.write_socket_output(self.connection,
self.socket)
except socket.error:
pass
self.connection.process(time.time())
# Pyngus ConnectionEventHandler callbacks:
def connection_remote_closed(self, connection, reason):
"""Peer has closed the connection."""
self.connection.close()
def connection_closed(self, connection):
"""Connection close completed."""
self.closed = True # main loop will destroy
def connection_failed(self, connection, error):
"""Connection failure detected."""
self.connection_closed(connection)
def sender_requested(self, connection, link_handle,
name, requested_source, properties):
"""Create a new message source."""
addr = requested_source or "source-" + uuid.uuid4().hex
link = FakeBroker.SenderLink(self.server, self,
link_handle, addr)
self.sender_links.add(link)
def receiver_requested(self, connection, link_handle,
name, requested_target, properties):
"""Create a new message consumer."""
addr = requested_target or "target-" + uuid.uuid4().hex
FakeBroker.ReceiverLink(self.server, self,
link_handle, addr)
def sasl_step(self, connection, pn_sasl):
if self.sasl_mechanisms == 'PLAIN':
credentials = pn_sasl.recv()
if not credentials:
return # wait until some arrives
if credentials not in self.user_credentials:
# failed
return pn_sasl.done(pn_sasl.AUTH)
pn_sasl.done(pn_sasl.OK)
class SenderLink(pyngus.SenderEventHandler):
"""An AMQP sending link."""
def __init__(self, server, conn, handle, src_addr=None):
self.server = server
cnn = conn.connection
self.link = cnn.accept_sender(handle,
source_override=src_addr,
event_handler=self)
self.link.open()
self.routed = False
def destroy(self):
"""Destroy the link."""
self._cleanup()
if self.link:
self.link.destroy()
self.link = None
def send_message(self, message):
"""Send a message over this link."""
self.link.send(message)
def _cleanup(self):
if self.routed:
self.server.remove_route(self.link.source_address,
self)
self.routed = False
# Pyngus SenderEventHandler callbacks:
def sender_active(self, sender_link):
self.server.add_route(self.link.source_address, self)
self.routed = True
def sender_remote_closed(self, sender_link, error):
self._cleanup()
self.link.close()
def sender_closed(self, sender_link):
self.destroy()
class ReceiverLink(pyngus.ReceiverEventHandler):
"""An AMQP Receiving link."""
def __init__(self, server, conn, handle, addr=None):
self.server = server
cnn = conn.connection
self.link = cnn.accept_receiver(handle,
target_override=addr,
event_handler=self)
self.link.open()
self.link.add_capacity(10)
# ReceiverEventHandler callbacks:
def receiver_remote_closed(self, receiver_link, error):
self.link.close()
def receiver_closed(self, receiver_link):
self.link.destroy()
self.link = None
def message_received(self, receiver_link, message, handle):
"""Forward this message out the proper sending link."""
if self.server.forward_message(message):
self.link.message_accepted(handle)
else:
self.link.message_rejected(handle)
if self.link.capacity < 1:
self.link.add_capacity(10)
def __init__(self, server_prefix="exclusive",
broadcast_prefix="broadcast",
group_prefix="unicast",
address_separator=".",
sock_addr="", sock_port=0,
sasl_mechanisms="ANONYMOUS",
user_credentials=None):
"""Create a fake broker listening on sock_addr:sock_port."""
if not pyngus:
raise AssertionError("pyngus module not present")
threading.Thread.__init__(self)
self._server_prefix = server_prefix + address_separator
self._broadcast_prefix = broadcast_prefix + address_separator
self._group_prefix = group_prefix + address_separator
self._address_separator = address_separator
self._sasl_mechanisms = sasl_mechanisms
self._user_credentials = user_credentials
self._wakeup_pipe = os.pipe()
self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._my_socket.bind((sock_addr, sock_port))
self.host, self.port = self._my_socket.getsockname()
self.container = pyngus.Container("test_server_%s:%d"
% (self.host, self.port))
self._connections = {}
self._sources = {}
# count of messages forwarded, by messaging pattern
self.direct_count = 0
self.topic_count = 0
self.fanout_count = 0
self.dropped_count = 0
def start(self):
"""Start the server."""
LOG.debug("Starting Test Broker on %s:%d", self.host, self.port)
self._shutdown = False
self.daemon = True
self._my_socket.listen(10)
super(FakeBroker, self).start()
def stop(self):
"""Shutdown the server."""
LOG.debug("Stopping test Broker %s:%d", self.host, self.port)
self._shutdown = True
os.write(self._wakeup_pipe[1], "!")
self.join()
LOG.debug("Test Broker %s:%d stopped", self.host, self.port)
def run(self):
"""Process I/O and timer events until the broker is stopped."""
LOG.debug("Test Broker on %s:%d started", self.host, self.port)
while not self._shutdown:
readers, writers, timers = self.container.need_processing()
# map pyngus Connections back to _TestConnections:
readfd = [c.user_context for c in readers]
readfd.extend([self._my_socket, self._wakeup_pipe[0]])
writefd = [c.user_context for c in writers]
timeout = None
if timers:
# [0] == next expiring timer
deadline = timers[0].next_tick
now = time.time()
timeout = 0 if deadline <= now else deadline - now
readable, writable, ignore = select.select(readfd,
writefd,
[],
timeout)
worked = set()
for r in readable:
if r is self._my_socket:
# new inbound connection request received,
# create a new Connection for it:
client_socket, client_address = self._my_socket.accept()
name = str(client_address)
conn = FakeBroker.Connection(self, client_socket, name,
self._sasl_mechanisms,
self._user_credentials)
self._connections[conn.name] = conn
elif r is self._wakeup_pipe[0]:
os.read(self._wakeup_pipe[0], 512)
else:
r.process_input()
worked.add(r)
for t in timers:
now = time.time()
if t.next_tick > now:
break
t.process(now)
conn = t.user_context
worked.add(conn)
for w in writable:
w.send_output()
worked.add(w)
# clean up any closed connections:
while worked:
conn = worked.pop()
if conn.closed:
del self._connections[conn.name]
conn.destroy()
# Shutting down
self._my_socket.close()
for conn in self._connections.itervalues():
conn.destroy()
return 0
def add_route(self, address, link):
# route from address -> link[, link ...]
if address not in self._sources:
self._sources[address] = [link]
elif link not in self._sources[address]:
self._sources[address].append(link)
def remove_route(self, address, link):
if address in self._sources:
if link in self._sources[address]:
self._sources[address].remove(link)
if not self._sources[address]:
del self._sources[address]
def forward_message(self, message):
# returns True if message was routed
dest = message.address
if dest not in self._sources:
self.dropped_count += 1
return False
LOG.debug("Forwarding [%s]", dest)
# route "behavior" determined by prefix:
if dest.startswith(self._broadcast_prefix):
self.fanout_count += 1
for link in self._sources[dest]:
LOG.debug("Broadcast to %s", dest)
link.send_message(message)
elif dest.startswith(self._group_prefix):
# round-robin:
self.topic_count += 1
link = self._sources[dest].pop(0)
link.send_message(message)
LOG.debug("Send to %s", dest)
self._sources[dest].append(link)
else:
# unicast:
self.direct_count += 1
LOG.debug("Unicast to %s", dest)
self._sources[dest][0].send_message(message)
return True

View File

@ -1,307 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import six
import testscenarios
from oslo import messaging
from oslo_messaging._drivers import common as exceptions
from oslo_messaging.tests import utils as test_utils
from oslo_serialization import jsonutils
load_tests = testscenarios.load_tests_apply_scenarios
EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
class NovaStyleException(Exception):
format = 'I am Nova'
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
message = self.format % kwargs
super(NovaStyleException, self).__init__(message)
class KwargsStyleException(NovaStyleException):
format = 'I am %(who)s'
def add_remote_postfix(ex):
ex_type = type(ex)
message = str(ex)
str_override = lambda self: message
new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
{'__str__': str_override,
'__unicode__': str_override})
new_ex_type.__module__ = '%s_Remote' % ex.__class__.__module__
try:
ex.__class__ = new_ex_type
except TypeError:
ex.args = (message,) + ex.args[1:]
return ex
class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
_log_failure = [
('log_failure', dict(log_failure=True)),
('do_not_log_failure', dict(log_failure=False)),
]
_add_remote = [
('add_remote', dict(add_remote=True)),
('do_not_add_remote', dict(add_remote=False)),
]
_exception_types = [
('bog_standard', dict(cls=Exception,
args=['test'],
kwargs={},
clsname='Exception',
modname=EXCEPTIONS_MODULE,
msg='test')),
('nova_style', dict(cls=NovaStyleException,
args=[],
kwargs={},
clsname='NovaStyleException',
modname=__name__,
msg='I am Nova')),
('nova_style_with_msg', dict(cls=NovaStyleException,
args=['testing'],
kwargs={},
clsname='NovaStyleException',
modname=__name__,
msg='testing')),
('kwargs_style', dict(cls=KwargsStyleException,
args=[],
kwargs={'who': 'Oslo'},
clsname='KwargsStyleException',
modname=__name__,
msg='I am Oslo')),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._log_failure,
cls._add_remote,
cls._exception_types)
def setUp(self):
super(SerializeRemoteExceptionTestCase, self).setUp()
def test_serialize_remote_exception(self):
errors = []
def stub_error(msg, *a, **kw):
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
a = a[0]
errors.append(str(msg) % a)
self.stubs.Set(exceptions.LOG, 'error', stub_error)
try:
try:
raise self.cls(*self.args, **self.kwargs)
except Exception as ex:
cls_error = ex
if self.add_remote:
ex = add_remote_postfix(ex)
raise ex
except Exception:
exc_info = sys.exc_info()
serialized = exceptions.serialize_remote_exception(
exc_info, log_failure=self.log_failure)
failure = jsonutils.loads(serialized)
self.assertEqual(self.clsname, failure['class'], failure)
self.assertEqual(self.modname, failure['module'])
self.assertEqual(self.msg, failure['message'])
self.assertEqual([self.msg], failure['args'])
self.assertEqual(self.kwargs, failure['kwargs'])
# Note: _Remote prefix not stripped from tracebacks
tb = cls_error.__class__.__name__ + ': ' + self.msg
self.assertIn(tb, ''.join(failure['tb']))
if self.log_failure:
self.assertTrue(len(errors) > 0, errors)
else:
self.assertEqual(0, len(errors), errors)
SerializeRemoteExceptionTestCase.generate_scenarios()
class DeserializeRemoteExceptionTestCase(test_utils.BaseTestCase):
_standard_allowed = [__name__]
scenarios = [
('bog_standard',
dict(allowed=_standard_allowed,
clsname='Exception',
modname=EXCEPTIONS_MODULE,
cls=Exception,
args=['test'],
kwargs={},
str='test\ntraceback\ntraceback\n',
remote_name='Exception',
remote_args=('test\ntraceback\ntraceback\n', ),
remote_kwargs={})),
('nova_style',
dict(allowed=_standard_allowed,
clsname='NovaStyleException',
modname=__name__,
cls=NovaStyleException,
args=[],
kwargs={},
str='test\ntraceback\ntraceback\n',
remote_name='NovaStyleException_Remote',
remote_args=('I am Nova', ),
remote_kwargs={})),
('nova_style_with_msg',
dict(allowed=_standard_allowed,
clsname='NovaStyleException',
modname=__name__,
cls=NovaStyleException,
args=['testing'],
kwargs={},
str='test\ntraceback\ntraceback\n',
remote_name='NovaStyleException_Remote',
remote_args=('testing', ),
remote_kwargs={})),
('kwargs_style',
dict(allowed=_standard_allowed,
clsname='KwargsStyleException',
modname=__name__,
cls=KwargsStyleException,
args=[],
kwargs={'who': 'Oslo'},
str='test\ntraceback\ntraceback\n',
remote_name='KwargsStyleException_Remote',
remote_args=('I am Oslo', ),
remote_kwargs={})),
('not_allowed',
dict(allowed=[],
clsname='NovaStyleException',
modname=__name__,
cls=messaging.RemoteError,
args=[],
kwargs={},
str=("Remote error: NovaStyleException test\n"
"[%r]." % u'traceback\ntraceback\n'),
msg=("Remote error: NovaStyleException test\n"
"[%r]." % u'traceback\ntraceback\n'),
remote_name='RemoteError',
remote_args=(),
remote_kwargs={'exc_type': 'NovaStyleException',
'value': 'test',
'traceback': 'traceback\ntraceback\n'})),
('unknown_module',
dict(allowed=['notexist'],
clsname='Exception',
modname='notexist',
cls=messaging.RemoteError,
args=[],
kwargs={},
str=("Remote error: Exception test\n"
"[%r]." % u'traceback\ntraceback\n'),
msg=("Remote error: Exception test\n"
"[%r]." % u'traceback\ntraceback\n'),
remote_name='RemoteError',
remote_args=(),
remote_kwargs={'exc_type': 'Exception',
'value': 'test',
'traceback': 'traceback\ntraceback\n'})),
('unknown_exception',
dict(allowed=[],
clsname='FarcicalError',
modname=EXCEPTIONS_MODULE,
cls=messaging.RemoteError,
args=[],
kwargs={},
str=("Remote error: FarcicalError test\n"
"[%r]." % u'traceback\ntraceback\n'),
msg=("Remote error: FarcicalError test\n"
"[%r]." % u'traceback\ntraceback\n'),
remote_name='RemoteError',
remote_args=(),
remote_kwargs={'exc_type': 'FarcicalError',
'value': 'test',
'traceback': 'traceback\ntraceback\n'})),
('unknown_kwarg',
dict(allowed=[],
clsname='Exception',
modname=EXCEPTIONS_MODULE,
cls=messaging.RemoteError,
args=[],
kwargs={'foobar': 'blaa'},
str=("Remote error: Exception test\n"
"[%r]." % u'traceback\ntraceback\n'),
msg=("Remote error: Exception test\n"
"[%r]." % u'traceback\ntraceback\n'),
remote_name='RemoteError',
remote_args=(),
remote_kwargs={'exc_type': 'Exception',
'value': 'test',
'traceback': 'traceback\ntraceback\n'})),
('system_exit',
dict(allowed=[],
clsname='SystemExit',
modname=EXCEPTIONS_MODULE,
cls=messaging.RemoteError,
args=[],
kwargs={},
str=("Remote error: SystemExit test\n"
"[%r]." % u'traceback\ntraceback\n'),
msg=("Remote error: SystemExit test\n"
"[%r]." % u'traceback\ntraceback\n'),
remote_name='RemoteError',
remote_args=(),
remote_kwargs={'exc_type': 'SystemExit',
'value': 'test',
'traceback': 'traceback\ntraceback\n'})),
]
def test_deserialize_remote_exception(self):
failure = {
'class': self.clsname,
'module': self.modname,
'message': 'test',
'tb': ['traceback\ntraceback\n'],
'args': self.args,
'kwargs': self.kwargs,
}
serialized = jsonutils.dumps(failure)
ex = exceptions.deserialize_remote_exception(serialized, self.allowed)
self.assertIsInstance(ex, self.cls)
self.assertEqual(self.remote_name, ex.__class__.__name__)
self.assertEqual(self.str, six.text_type(ex))
if hasattr(self, 'msg'):
self.assertEqual(self.msg, six.text_type(ex))
self.assertEqual((self.msg,) + self.remote_args, ex.args)
else:
self.assertEqual(self.remote_args, ex.args)

View File

@ -1,66 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from oslo_messaging.tests import utils as test_utils
class TestExpectedExceptions(test_utils.BaseTestCase):
def test_exception(self):
e = None
try:
try:
raise ValueError()
except Exception:
raise messaging.ExpectedException()
except messaging.ExpectedException as e:
self.assertIsInstance(e, messaging.ExpectedException)
self.assertTrue(hasattr(e, 'exc_info'))
self.assertIsInstance(e.exc_info[1], ValueError)
def test_decorator_expected(self):
class FooException(Exception):
pass
@messaging.expected_exceptions(FooException)
def naughty():
raise FooException()
self.assertRaises(messaging.ExpectedException, naughty)
def test_decorator_expected_subclass(self):
class FooException(Exception):
pass
class BarException(FooException):
pass
@messaging.expected_exceptions(FooException)
def naughty():
raise BarException()
self.assertRaises(messaging.ExpectedException, naughty)
def test_decorator_unexpected(self):
class FooException(Exception):
pass
@messaging.expected_exceptions(FooException)
def really_naughty():
raise ValueError()
self.assertRaises(ValueError, really_naughty)

View File

@ -1,177 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from oslo import messaging
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TargetConstructorTestCase(test_utils.BaseTestCase):
scenarios = [
('all_none', dict(kwargs=dict())),
('exchange', dict(kwargs=dict(exchange='testexchange'))),
('topic', dict(kwargs=dict(topic='testtopic'))),
('namespace', dict(kwargs=dict(namespace='testnamespace'))),
('version', dict(kwargs=dict(version='3.4'))),
('server', dict(kwargs=dict(server='testserver'))),
('fanout', dict(kwargs=dict(fanout=True))),
]
def test_constructor(self):
target = messaging.Target(**self.kwargs)
for k in self.kwargs:
self.assertEqual(self.kwargs[k], getattr(target, k))
for k in ['exchange', 'topic', 'namespace',
'version', 'server', 'fanout']:
if k in self.kwargs:
continue
self.assertIsNone(getattr(target, k))
class TargetCallableTestCase(test_utils.BaseTestCase):
scenarios = [
('all_none', dict(attrs=dict(), kwargs=dict(), vals=dict())),
('exchange_attr', dict(attrs=dict(exchange='testexchange'),
kwargs=dict(),
vals=dict(exchange='testexchange'))),
('exchange_arg', dict(attrs=dict(),
kwargs=dict(exchange='testexchange'),
vals=dict(exchange='testexchange'))),
('topic_attr', dict(attrs=dict(topic='testtopic'),
kwargs=dict(),
vals=dict(topic='testtopic'))),
('topic_arg', dict(attrs=dict(),
kwargs=dict(topic='testtopic'),
vals=dict(topic='testtopic'))),
('namespace_attr', dict(attrs=dict(namespace='testnamespace'),
kwargs=dict(),
vals=dict(namespace='testnamespace'))),
('namespace_arg', dict(attrs=dict(),
kwargs=dict(namespace='testnamespace'),
vals=dict(namespace='testnamespace'))),
('version_attr', dict(attrs=dict(version='3.4'),
kwargs=dict(),
vals=dict(version='3.4'))),
('version_arg', dict(attrs=dict(),
kwargs=dict(version='3.4'),
vals=dict(version='3.4'))),
('server_attr', dict(attrs=dict(server='testserver'),
kwargs=dict(),
vals=dict(server='testserver'))),
('server_arg', dict(attrs=dict(),
kwargs=dict(server='testserver'),
vals=dict(server='testserver'))),
('fanout_attr', dict(attrs=dict(fanout=True),
kwargs=dict(),
vals=dict(fanout=True))),
('fanout_arg', dict(attrs=dict(),
kwargs=dict(fanout=True),
vals=dict(fanout=True))),
]
def test_callable(self):
target = messaging.Target(**self.attrs)
target = target(**self.kwargs)
for k in self.vals:
self.assertEqual(self.vals[k], getattr(target, k))
for k in ['exchange', 'topic', 'namespace',
'version', 'server', 'fanout']:
if k in self.vals:
continue
self.assertIsNone(getattr(target, k))
class TargetReprTestCase(test_utils.BaseTestCase):
scenarios = [
('all_none', dict(kwargs=dict(), repr='')),
('exchange', dict(kwargs=dict(exchange='testexchange'),
repr='exchange=testexchange')),
('topic', dict(kwargs=dict(topic='testtopic'),
repr='topic=testtopic')),
('namespace', dict(kwargs=dict(namespace='testnamespace'),
repr='namespace=testnamespace')),
('version', dict(kwargs=dict(version='3.4'),
repr='version=3.4')),
('server', dict(kwargs=dict(server='testserver'),
repr='server=testserver')),
('fanout', dict(kwargs=dict(fanout=True),
repr='fanout=True')),
('exchange_and_fanout', dict(kwargs=dict(exchange='testexchange',
fanout=True),
repr='exchange=testexchange, '
'fanout=True')),
]
def test_repr(self):
target = messaging.Target(**self.kwargs)
self.assertEqual('<Target ' + self.repr + '>', str(target))
_notset = object()
class EqualityTestCase(test_utils.BaseTestCase):
@classmethod
def generate_scenarios(cls):
attr = [
('exchange', dict(attr='exchange')),
('topic', dict(attr='topic')),
('namespace', dict(attr='namespace')),
('version', dict(attr='version')),
('server', dict(attr='server')),
('fanout', dict(attr='fanout')),
]
a = [
('a_notset', dict(a_value=_notset)),
('a_none', dict(a_value=None)),
('a_empty', dict(a_value='')),
('a_foo', dict(a_value='foo')),
('a_bar', dict(a_value='bar')),
]
b = [
('b_notset', dict(b_value=_notset)),
('b_none', dict(b_value=None)),
('b_empty', dict(b_value='')),
('b_foo', dict(b_value='foo')),
('b_bar', dict(b_value='bar')),
]
cls.scenarios = testscenarios.multiply_scenarios(attr, a, b)
for s in cls.scenarios:
s[1]['equals'] = (s[1]['a_value'] == s[1]['b_value'])
def test_equality(self):
a_kwargs = {self.attr: self.a_value}
b_kwargs = {self.attr: self.b_value}
a = messaging.Target(**a_kwargs)
b = messaging.Target(**b_kwargs)
if self.equals:
self.assertEqual(a, b)
self.assertFalse(a != b)
else:
self.assertNotEqual(a, b)
self.assertFalse(a == b)
EqualityTestCase.generate_scenarios()

View File

@ -1,367 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from mox3 import mox
import six
from stevedore import driver
import testscenarios
from oslo import messaging
from oslo.messaging import transport
from oslo_config import cfg
from oslo_messaging.tests import utils as test_utils
from oslo_messaging import transport as private_transport
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeDriver(object):
def __init__(self, conf):
self.conf = conf
def send(self, *args, **kwargs):
pass
def send_notification(self, *args, **kwargs):
pass
def listen(self, target):
pass
class _FakeManager(object):
def __init__(self, driver):
self.driver = driver
class GetTransportTestCase(test_utils.BaseTestCase):
scenarios = [
('rpc_backend',
dict(url=None, transport_url=None, rpc_backend='testbackend',
control_exchange=None, allowed=None, aliases=None,
expect=dict(backend='testbackend',
exchange=None,
url='testbackend:',
allowed=[]))),
('transport_url',
dict(url=None, transport_url='testtransport:', rpc_backend=None,
control_exchange=None, allowed=None, aliases=None,
expect=dict(backend='testtransport',
exchange=None,
url='testtransport:',
allowed=[]))),
('url_param',
dict(url='testtransport:', transport_url=None, rpc_backend=None,
control_exchange=None, allowed=None, aliases=None,
expect=dict(backend='testtransport',
exchange=None,
url='testtransport:',
allowed=[]))),
('control_exchange',
dict(url=None, transport_url=None, rpc_backend='testbackend',
control_exchange='testexchange', allowed=None, aliases=None,
expect=dict(backend='testbackend',
exchange='testexchange',
url='testbackend:',
allowed=[]))),
('allowed_remote_exmods',
dict(url=None, transport_url=None, rpc_backend='testbackend',
control_exchange=None, allowed=['foo', 'bar'], aliases=None,
expect=dict(backend='testbackend',
exchange=None,
url='testbackend:',
allowed=['foo', 'bar']))),
('rpc_backend_aliased',
dict(url=None, transport_url=None, rpc_backend='testfoo',
control_exchange=None, allowed=None,
aliases=dict(testfoo='testbackend'),
expect=dict(backend='testbackend',
exchange=None,
url='testbackend:',
allowed=[]))),
('transport_url_aliased',
dict(url=None, transport_url='testfoo:', rpc_backend=None,
control_exchange=None, allowed=None,
aliases=dict(testfoo='testtransport'),
expect=dict(backend='testtransport',
exchange=None,
url='testtransport:',
allowed=[]))),
('url_param_aliased',
dict(url='testfoo:', transport_url=None, rpc_backend=None,
control_exchange=None, allowed=None,
aliases=dict(testfoo='testtransport'),
expect=dict(backend='testtransport',
exchange=None,
url='testtransport:',
allowed=[]))),
]
def test_get_transport(self):
self.config(rpc_backend=self.rpc_backend,
control_exchange=self.control_exchange,
transport_url=self.transport_url)
self.mox.StubOutWithMock(driver, 'DriverManager')
invoke_args = [self.conf,
messaging.TransportURL.parse(self.conf,
self.expect['url'])]
invoke_kwds = dict(default_exchange=self.expect['exchange'],
allowed_remote_exmods=self.expect['allowed'])
drvr = _FakeDriver(self.conf)
driver.DriverManager('oslo.messaging.drivers',
self.expect['backend'],
invoke_on_load=True,
invoke_args=invoke_args,
invoke_kwds=invoke_kwds).\
AndReturn(_FakeManager(drvr))
self.mox.ReplayAll()
kwargs = dict(url=self.url)
if self.allowed is not None:
kwargs['allowed_remote_exmods'] = self.allowed
if self.aliases is not None:
kwargs['aliases'] = self.aliases
transport_ = messaging.get_transport(self.conf, **kwargs)
self.assertIsNotNone(transport_)
self.assertIs(transport_.conf, self.conf)
self.assertIs(transport_._driver, drvr)
class GetTransportSadPathTestCase(test_utils.BaseTestCase):
scenarios = [
('invalid_transport_url',
dict(url=None, transport_url='invalid', rpc_backend=None,
ex=dict(cls=messaging.InvalidTransportURL,
msg_contains='No scheme specified',
url='invalid'))),
('invalid_url_param',
dict(url='invalid', transport_url=None, rpc_backend=None,
ex=dict(cls=messaging.InvalidTransportURL,
msg_contains='No scheme specified',
url='invalid'))),
('driver_load_failure',
dict(url=None, transport_url=None, rpc_backend='testbackend',
ex=dict(cls=messaging.DriverLoadFailure,
msg_contains='Failed to load',
driver='testbackend'))),
]
def test_get_transport_sad(self):
self.config(rpc_backend=self.rpc_backend,
transport_url=self.transport_url)
if self.rpc_backend:
self.mox.StubOutWithMock(driver, 'DriverManager')
invoke_args = [self.conf,
messaging.TransportURL.parse(self.conf,
self.url)]
invoke_kwds = dict(default_exchange='openstack',
allowed_remote_exmods=[])
driver.DriverManager('oslo.messaging.drivers',
self.rpc_backend,
invoke_on_load=True,
invoke_args=invoke_args,
invoke_kwds=invoke_kwds).\
AndRaise(RuntimeError())
self.mox.ReplayAll()
try:
messaging.get_transport(self.conf, url=self.url)
self.assertFalse(True)
except Exception as ex:
ex_cls = self.ex.pop('cls')
ex_msg_contains = self.ex.pop('msg_contains')
self.assertIsInstance(ex, messaging.MessagingException)
self.assertIsInstance(ex, ex_cls)
self.assertIn(ex_msg_contains, six.text_type(ex))
for k, v in self.ex.items():
self.assertTrue(hasattr(ex, k))
self.assertEqual(v, str(getattr(ex, k)))
# FIXME(markmc): this could be used elsewhere
class _SetDefaultsFixture(fixtures.Fixture):
def __init__(self, set_defaults, opts, *names):
super(_SetDefaultsFixture, self).__init__()
self.set_defaults = set_defaults
self.opts = opts
self.names = names
def setUp(self):
super(_SetDefaultsFixture, self).setUp()
# FIXME(markmc): this comes from Id5c1f3ba
def first(seq, default=None, key=None):
if key is None:
key = bool
return next(six.moves.filter(key, seq), default)
def default(opts, name):
return first(opts, key=lambda o: o.name == name).default
orig_defaults = {}
for n in self.names:
orig_defaults[n] = default(self.opts, n)
def restore_defaults():
self.set_defaults(**orig_defaults)
self.addCleanup(restore_defaults)
class TestSetDefaults(test_utils.BaseTestCase):
def setUp(self):
super(TestSetDefaults, self).setUp(conf=cfg.ConfigOpts())
self.useFixture(_SetDefaultsFixture(messaging.set_transport_defaults,
private_transport._transport_opts,
'control_exchange'))
def test_set_default_control_exchange(self):
messaging.set_transport_defaults(control_exchange='foo')
self.mox.StubOutWithMock(driver, 'DriverManager')
invoke_kwds = mox.ContainsKeyValue('default_exchange', 'foo')
driver.DriverManager(mox.IgnoreArg(),
mox.IgnoreArg(),
invoke_on_load=mox.IgnoreArg(),
invoke_args=mox.IgnoreArg(),
invoke_kwds=invoke_kwds).\
AndReturn(_FakeManager(_FakeDriver(self.conf)))
self.mox.ReplayAll()
messaging.get_transport(self.conf)
class TestTransportMethodArgs(test_utils.BaseTestCase):
_target = messaging.Target(topic='topic', server='server')
def test_send_defaults(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply=None,
timeout=None, retry=None)
self.mox.ReplayAll()
t._send(self._target, 'ctxt', 'message')
def test_send_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout', retry='retry')
self.mox.ReplayAll()
t._send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout', retry='retry')
def test_send_notification(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send_notification')
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
retry=None)
self.mox.ReplayAll()
t._send_notification(self._target, 'ctxt', 'message', version=1.0)
def test_send_notification_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send_notification')
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
retry=5)
self.mox.ReplayAll()
t._send_notification(self._target, 'ctxt', 'message', version=1.0,
retry=5)
def test_listen(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'listen')
t._driver.listen(self._target)
self.mox.ReplayAll()
t._listen(self._target)
class TestTransportUrlCustomisation(test_utils.BaseTestCase):
def setUp(self):
super(TestTransportUrlCustomisation, self).setUp()
self.url1 = transport.TransportURL.parse(self.conf, "fake://vhost1")
self.url2 = transport.TransportURL.parse(self.conf, "fake://vhost2")
self.url3 = transport.TransportURL.parse(self.conf, "fake://vhost1")
def test_hash(self):
urls = {}
urls[self.url1] = self.url1
urls[self.url2] = self.url2
urls[self.url3] = self.url3
self.assertEqual(2, len(urls))
def test_eq(self):
self.assertEqual(self.url1, self.url3)
self.assertNotEqual(self.url1, self.url2)
class TestTransportHostCustomisation(test_utils.BaseTestCase):
def setUp(self):
super(TestTransportHostCustomisation, self).setUp()
self.host1 = transport.TransportHost("host1", 5662, "user", "pass")
self.host2 = transport.TransportHost("host1", 5662, "user", "pass")
self.host3 = transport.TransportHost("host1", 5663, "user", "pass")
self.host4 = transport.TransportHost("host1", 5662, "user2", "pass")
self.host5 = transport.TransportHost("host1", 5662, "user", "pass2")
self.host6 = transport.TransportHost("host2", 5662, "user", "pass")
def test_hash(self):
hosts = {}
hosts[self.host1] = self.host1
hosts[self.host2] = self.host2
hosts[self.host3] = self.host3
hosts[self.host4] = self.host4
hosts[self.host5] = self.host5
hosts[self.host6] = self.host6
self.assertEqual(5, len(hosts))
def test_eq(self):
self.assertEqual(self.host1, self.host2)
self.assertNotEqual(self.host1, self.host3)
self.assertNotEqual(self.host1, self.host4)
self.assertNotEqual(self.host1, self.host5)
self.assertNotEqual(self.host1, self.host6)

View File

@ -1,236 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from oslo import messaging
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TestParseURL(test_utils.BaseTestCase):
scenarios = [
('transport',
dict(url='foo:', aliases=None,
expect=dict(transport='foo'))),
('transport_aliased',
dict(url='bar:', aliases=dict(bar='foo'),
expect=dict(transport='foo'))),
('virtual_host_slash',
dict(url='foo:////', aliases=None,
expect=dict(transport='foo', virtual_host='/'))),
('virtual_host',
dict(url='foo:///bar', aliases=None,
expect=dict(transport='foo', virtual_host='bar'))),
('host',
dict(url='foo://host/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='host'),
]))),
('ipv6_host',
dict(url='foo://[ffff::1]/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='ffff::1'),
]))),
('port',
dict(url='foo://host:1234/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='host', port=1234),
]))),
('ipv6_port',
dict(url='foo://[ffff::1]:1234/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='ffff::1', port=1234),
]))),
('username',
dict(url='foo://u@host:1234/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='host', port=1234, username='u'),
]))),
('password',
dict(url='foo://u:p@host:1234/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='host', port=1234,
username='u', password='p'),
]))),
('creds_no_host',
dict(url='foo://u:p@/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(username='u', password='p'),
]))),
('multi_host',
dict(url='foo://u:p@host1:1234,host2:4321/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='host1', port=1234,
username='u', password='p'),
dict(host='host2', port=4321),
]))),
('multi_creds',
dict(url='foo://u1:p1@host1:1234,u2:p2@host2:4321/bar', aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='host1', port=1234,
username='u1', password='p1'),
dict(host='host2', port=4321,
username='u2', password='p2'),
]))),
('multi_creds_ipv6',
dict(url='foo://u1:p1@[ffff::1]:1234,u2:p2@[ffff::2]:4321/bar',
aliases=None,
expect=dict(transport='foo',
virtual_host='bar',
hosts=[
dict(host='ffff::1', port=1234,
username='u1', password='p1'),
dict(host='ffff::2', port=4321,
username='u2', password='p2'),
]))),
]
def test_parse_url(self):
self.config(rpc_backend=None)
url = messaging.TransportURL.parse(self.conf, self.url, self.aliases)
hosts = []
for host in self.expect.get('hosts', []):
hosts.append(messaging.TransportHost(host.get('host'),
host.get('port'),
host.get('username'),
host.get('password')))
expected = messaging.TransportURL(self.conf,
self.expect.get('transport'),
self.expect.get('virtual_host'),
hosts)
self.assertEqual(expected, url)
class TestFormatURL(test_utils.BaseTestCase):
scenarios = [
('rpc_backend',
dict(rpc_backend='testbackend',
transport=None,
virtual_host=None,
hosts=[],
aliases=None,
expected='testbackend:///')),
('rpc_backend_aliased',
dict(rpc_backend='testfoo',
transport=None,
virtual_host=None,
hosts=[],
aliases=dict(testfoo='testbackend'),
expected='testbackend:///')),
('transport',
dict(rpc_backend=None,
transport='testtransport',
virtual_host=None,
hosts=[],
aliases=None,
expected='testtransport:///')),
('transport_aliased',
dict(rpc_backend=None,
transport='testfoo',
virtual_host=None,
hosts=[],
aliases=dict(testfoo='testtransport'),
expected='testtransport:///')),
('virtual_host',
dict(rpc_backend=None,
transport='testtransport',
virtual_host='/vhost',
hosts=[],
aliases=None,
expected='testtransport:////vhost')),
('host',
dict(rpc_backend=None,
transport='testtransport',
virtual_host='/',
hosts=[
dict(hostname='host',
port=10,
username='bob',
password='secret'),
],
aliases=None,
expected='testtransport://bob:secret@host:10//')),
('multi_host',
dict(rpc_backend=None,
transport='testtransport',
virtual_host='',
hosts=[
dict(hostname='h1',
port=1000,
username='b1',
password='s1'),
dict(hostname='h2',
port=2000,
username='b2',
password='s2'),
],
aliases=None,
expected='testtransport://b1:s1@h1:1000,b2:s2@h2:2000/')),
('quoting',
dict(rpc_backend=None,
transport='testtransport',
virtual_host='/$',
hosts=[
dict(hostname='host',
port=10,
username='b$',
password='s&'),
],
aliases=None,
expected='testtransport://b%24:s%26@host:10//%24')),
]
def test_parse_url(self):
self.config(rpc_backend=self.rpc_backend)
hosts = []
for host in self.hosts:
hosts.append(messaging.TransportHost(host.get('hostname'),
host.get('port'),
host.get('username'),
host.get('password')))
url = messaging.TransportURL(self.conf,
self.transport,
self.virtual_host,
hosts,
self.aliases)
self.assertEqual(self.expected, str(url))

View File

@ -1,61 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import imp
import os
import warnings
from oslotest import base as test_base
import six
from six.moves import mock
class DeprecationWarningTest(test_base.BaseTestCase):
@mock.patch('warnings.warn')
def test_warning(self, mock_warn):
import oslo.messaging
imp.reload(oslo.messaging)
self.assertTrue(mock_warn.called)
args = mock_warn.call_args
self.assertIn('oslo_messaging', args[0][0])
self.assertIn('deprecated', args[0][0])
self.assertTrue(issubclass(args[0][1], DeprecationWarning))
def test_real_warning(self):
with warnings.catch_warnings(record=True) as warning_msgs:
warnings.resetwarnings()
warnings.simplefilter('always', DeprecationWarning)
import oslo.messaging
# Use a separate function to get the stack level correct
# so we know the message points back to this file. This
# corresponds to an import or reload, which isn't working
# inside the test under Python 3.3. That may be due to a
# difference in the import implementation not triggering
# warnings properly when the module is reloaded, or
# because the warnings module is mostly implemented in C
# and something isn't cleanly resetting the global state
# used to track whether a warning needs to be
# emitted. Whatever the cause, we definitely see the
# warnings.warn() being invoked on a reload (see the test
# above) and warnings are reported on the console when we
# run the tests. A simpler test script run outside of
# testr does correctly report the warnings.
def foo():
oslo.messaging.deprecated()
foo()
self.assertEqual(1, len(warning_msgs))
msg = warning_msgs[0]
self.assertIn('oslo_messaging', six.text_type(msg.message))
self.assertEqual('test_warning.py', os.path.basename(msg.filename))