Add transport reconnection retries

When a rpc client try to make a RPC call and the server is unreachable
The rpc call hang until the server come back.

In most case this is the desired behavior.

But sometimes, we can prefer that the library raise an exception after a
certain number of retries.

For example in ceilometer, when publishing a
storage.objects.incoming.bytes sample from the Swift middleware to an
AMQP topic, you might not want to block the Swift client if the AMQP broker
is unavailable - instead, you might have a queueing policy whereby
if a single reconection attempt fails we queue the sample in memory and
try again when another sample is to be published.

This patch is the oslo.messaging part that allow this.

Closes bug #1282639
Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>

Change-Id: I32086d0abf141c368343bf225d4b021da496c020
This commit is contained in:
Mehdi Abaakouk 2014-02-21 11:50:45 +01:00
parent 7d927e5e10
commit 948c05417c
12 changed files with 326 additions and 122 deletions

View File

@ -332,7 +332,7 @@ class AMQPDriverBase(base.BaseDriver):
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None,
envelope=True, notify=False):
envelope=True, notify=False, retry=None):
# FIXME(markmc): remove this temporary hack
class Context(object):
@ -364,15 +364,16 @@ class AMQPDriverBase(base.BaseDriver):
with self._get_connection() as conn:
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg)
target.topic, msg, retry=retry)
elif target.fanout:
conn.fanout_send(target.topic, msg)
conn.fanout_send(target.topic, msg, retry=retry)
else:
topic = target.topic
if target.server:
topic = '%s.%s' % (target.topic, target.server)
conn.topic_send(exchange_name=self._get_exchange(target),
topic=topic, msg=msg, timeout=timeout)
topic=topic, msg=msg, timeout=timeout,
retry=retry)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout)
@ -383,8 +384,10 @@ class AMQPDriverBase(base.BaseDriver):
if wait_for_reply:
self._waiter.unlisten(msg_id)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
return self._send(target, ctxt, message, wait_for_reply, timeout,
retry=retry)
def send_notification(self, target, ctxt, message, version):
return self._send(target, ctxt, message,

View File

@ -162,7 +162,10 @@ class FakeDriver(base.BaseDriver):
return None
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
# transport always works
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send_notification(self, target, ctxt, message, version):

View File

@ -25,6 +25,7 @@ import six
from oslo.messaging._drivers import amqp as rpc_amqp
from oslo.messaging._drivers import amqpdriver
from oslo.messaging._drivers import common as rpc_common
from oslo.messaging import exceptions
from oslo.messaging.openstack.common import importutils
from oslo.messaging.openstack.common import jsonutils
from oslo.messaging.openstack.common import network_utils
@ -487,7 +488,7 @@ class Connection(object):
self.reconnect()
def connection_create(self, broker):
def _connect(self, broker):
# Create the connection - this does not open the connection
self.connection = qpid_messaging.Connection(broker['host'])
@ -502,6 +503,7 @@ class Connection(object):
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
self.connection.open()
def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer
@ -509,29 +511,52 @@ class Connection(object):
def _lookup_consumer(self, receiver):
return self.consumers[str(receiver)]
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues."""
delay = 1
while True:
# Close the session if necessary
if self.connection is not None and self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.MessagingError:
pass
broker = six.next(self.brokers)
def _disconnect(self):
# Close the session if necessary
if self.connection is not None and self.connection.opened():
try:
self.connection_create(broker)
self.connection.open()
self.connection.close()
except qpid_exceptions.MessagingError:
pass
self.connection = None
def reconnect(self, retry=None):
"""Handles reconnecting and re-establishing sessions and queues.
Will retry up to retry number of times.
retry = None or -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
"""
delay = 1
attempt = 0
loop_forever = False
if retry is None or retry < 0:
loop_forever = True
while True:
self._disconnect()
attempt += 1
broker = six.next(self.brokers)
try:
self._connect(broker)
except qpid_exceptions.MessagingError as e:
msg_dict = dict(e=e, delay=delay, broker=broker['host'])
msg = _("Unable to connect to AMQP server on %(broker)s: "
"%(e)s. Sleeping %(delay)s seconds") % msg_dict
LOG.error(msg)
time.sleep(delay)
delay = min(delay + 1, 5)
msg_dict = dict(e=e,
delay=delay,
retry=retry,
broker=broker)
if not loop_forever and attempt > retry:
msg = _('Unable to connect to AMQP server on '
'%(broker)s after %(retry)d '
'tries: %(e)s') % msg_dict
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
else:
msg = _("Unable to connect to AMQP server on %(broker)s: "
"%(e)s. Sleeping %(delay)s seconds") % msg_dict
LOG.error(msg)
time.sleep(delay)
delay = min(delay + 1, 5)
else:
LOG.info(_('Connected to AMQP server on %s'), broker['host'])
break
@ -548,7 +573,7 @@ class Connection(object):
LOG.debug("Re-established AMQP queues")
def ensure(self, error_callback, method, *args, **kwargs):
def ensure(self, error_callback, method, retry=None, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
@ -556,7 +581,7 @@ class Connection(object):
qpid_exceptions.MessagingError) as e:
if error_callback:
error_callback(e)
self.reconnect()
self.reconnect(retry=retry)
def close(self):
"""Close/release this connection."""
@ -614,7 +639,7 @@ class Connection(object):
raise StopIteration
yield self.ensure(_error_callback, _consume)
def publisher_send(self, cls, topic, msg, **kwargs):
def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
"""Send to a publisher based on the publisher class."""
def _connect_error(exc):
@ -626,7 +651,7 @@ class Connection(object):
publisher = cls(self.conf, self.session, topic=topic, **kwargs)
publisher.send(msg)
return self.ensure(_connect_error, _publisher_send)
return self.ensure(_connect_error, _publisher_send, retry=retry)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@ -652,7 +677,7 @@ class Connection(object):
"""Send a 'direct' message."""
self.publisher_send(DirectPublisher, topic=msg_id, msg=msg)
def topic_send(self, exchange_name, topic, msg, timeout=None):
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
#
# We want to create a message with attributes, e.g. a TTL. We
@ -666,11 +691,11 @@ class Connection(object):
#
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message,
exchange_name=exchange_name)
exchange_name=exchange_name, retry=retry)
def fanout_send(self, topic, msg):
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic=topic, msg=msg)
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, **kwargs):
"""Send a notify message on a topic."""

View File

@ -31,6 +31,7 @@ import six
from oslo.messaging._drivers import amqp as rpc_amqp
from oslo.messaging._drivers import amqpdriver
from oslo.messaging._drivers import common as rpc_common
from oslo.messaging import exceptions
from oslo.messaging.openstack.common import network_utils
# FIXME(markmc): remove this
@ -539,27 +540,8 @@ class Connection(object):
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % broker)
try:
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
# connection can hang "indefinitely" somewhere deep down
# in Kombu. Blocking the thread for a bit prior to
# release seems to kludge around the problem where it is
# otherwise reproduceable.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)
self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
LOG.info(_("Connecting to AMQP server on "
"%(hostname)s:%(port)d") % broker)
self.connection = kombu.connection.BrokerConnection(**broker)
self.connection_errors = self.connection.connection_errors
self.channel_errors = self.connection.channel_errors
@ -578,17 +560,47 @@ class Connection(object):
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
broker)
def reconnect(self):
def _disconnect(self):
if self.connection:
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
# connection can hang "indefinitely" somewhere deep down
# in Kombu. Blocking the thread for a bit prior to
# release seems to kludge around the problem where it is
# otherwise reproduceable.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)
try:
self.connection.release()
except self.connection_errors:
pass
self.connection = None
def reconnect(self, retry=None):
"""Handles reconnecting and re-establishing queues.
Will retry up to self.max_retries number of times.
self.max_retries = 0 means to retry forever.
Will retry up to retry number of times.
retry = None means use the value of rabbit_max_retries
retry = -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
Sleep between tries, starting at self.interval_start
seconds, backing off self.interval_stepping number of seconds
each attempt.
"""
attempt = 0
loop_forever = False
if retry is None:
retry = self.max_retries
if retry is None or retry < 0:
loop_forever = True
while True:
self._disconnect()
broker = six.next(self.brokers)
attempt += 1
try:
@ -610,30 +622,30 @@ class Connection(object):
log_info = {}
log_info['err_str'] = e
log_info['max_retries'] = self.max_retries
log_info['retry'] = retry or 0
log_info.update(broker)
if self.max_retries and attempt == self.max_retries:
if not loop_forever and attempt > retry:
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'%(hostname)s:%(port)d after %(retry)d '
'tries: %(err_str)s') % log_info
LOG.error(msg)
raise rpc_common.RPCException(msg)
raise exceptions.MessageDeliveryFailure(msg)
else:
if attempt == 1:
sleep_time = self.interval_start or 1
elif attempt > 1:
sleep_time += self.interval_stepping
if attempt == 1:
sleep_time = self.interval_start or 1
elif attempt > 1:
sleep_time += self.interval_stepping
if self.interval_max:
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
log_info['sleep_time'] = sleep_time
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
def ensure(self, error_callback, method, retry=None, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
@ -657,7 +669,7 @@ class Connection(object):
raise
if error_callback:
error_callback(e)
self.reconnect()
self.reconnect(retry=retry)
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues."""
@ -665,8 +677,9 @@ class Connection(object):
def close(self):
"""Close/release this connection."""
self.connection.release()
self.connection = None
if self.connection:
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again."""
@ -722,7 +735,8 @@ class Connection(object):
raise StopIteration
yield self.ensure(_error_callback, _consume)
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
**kwargs):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
@ -734,7 +748,7 @@ class Connection(object):
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
self.ensure(_error_callback, _publish, retry=retry)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@ -760,14 +774,14 @@ class Connection(object):
"""Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, exchange_name, topic, msg, timeout=None):
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
self.publisher_send(TopicPublisher, topic, msg, timeout,
exchange_name=exchange_name)
exchange_name=exchange_name, retry=retry)
def fanout_send(self, topic, msg):
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, **kwargs):
"""Send a notify message on a topic."""
@ -786,7 +800,8 @@ class Connection(object):
class RabbitDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url, default_exchange=None,
def __init__(self, conf, url,
default_exchange=None,
allowed_remote_exmods=[]):
conf.register_opts(rabbit_opts)
conf.register_opts(rpc_amqp.amqp_opts)

View File

@ -939,7 +939,10 @@ class ZmqDriver(base.BaseDriver):
if wait_for_reply:
return reply[-1]
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
# NOTE(sileht): retry is not implemented because this driver never
# retry anything
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send_notification(self, target, ctxt, message, version):

View File

@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
__all__ = ['MessagingException', 'MessagingTimeout', 'InvalidTarget']
__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
'InvalidTarget']
class MessagingException(Exception):
@ -24,6 +25,10 @@ class MessagingTimeout(MessagingException):
"""Raised if message sending times out."""
class MessageDeliveryFailure(MessagingException):
"""Raised if message sending failed after the asked retry."""
class InvalidTarget(MessagingException, ValueError):
"""Raised if a target does not meet certain pre-conditions."""

View File

@ -84,13 +84,14 @@ class _CallContext(object):
_marker = object()
def __init__(self, transport, target, serializer,
timeout=None, version_cap=None):
timeout=None, version_cap=None, retry=None):
self.conf = transport.conf
self.transport = transport
self.target = target
self.serializer = serializer
self.timeout = timeout
self.retry = retry
self.version_cap = version_cap
super(_CallContext, self).__init__()
@ -129,7 +130,7 @@ class _CallContext(object):
if self.version_cap:
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, ctxt, msg)
self.transport._send(self.target, ctxt, msg, retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@ -147,7 +148,8 @@ class _CallContext(object):
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout)
wait_for_reply=True, timeout=timeout,
retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
return self.serializer.deserialize_entity(ctxt, result)
@ -156,7 +158,7 @@ class _CallContext(object):
def _prepare(cls, base,
exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker):
timeout=_marker, version_cap=_marker, retry=_marker):
"""Prepare a method invocation context. See RPCClient.prepare()."""
kwargs = dict(
exchange=exchange,
@ -171,21 +173,23 @@ class _CallContext(object):
if timeout is cls._marker:
timeout = base.timeout
if retry is cls._marker:
retry = base.retry
if version_cap is cls._marker:
version_cap = base.version_cap
return _CallContext(base.transport, target,
base.serializer,
timeout, version_cap)
timeout, version_cap, retry)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker):
timeout=_marker, version_cap=_marker, retry=_marker):
"""Prepare a method invocation context. See RPCClient.prepare()."""
return self._prepare(self,
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap)
timeout, version_cap, retry)
class RPCClient(object):
@ -245,7 +249,7 @@ class RPCClient(object):
"""
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None):
timeout=None, version_cap=None, serializer=None, retry=None):
"""Construct an RPC client.
:param transport: a messaging transport handle
@ -258,6 +262,11 @@ class RPCClient(object):
:type version_cap: str
:param serializer: an optional entity serializer
:type serializer: Serializer
:param retry: an optional default connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
self.conf = transport.conf
self.conf.register_opts(_client_opts)
@ -265,6 +274,7 @@ class RPCClient(object):
self.transport = transport
self.target = target
self.timeout = timeout
self.retry = retry
self.version_cap = version_cap
self.serializer = serializer or msg_serializer.NoOpSerializer()
@ -274,7 +284,7 @@ class RPCClient(object):
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker):
timeout=_marker, version_cap=_marker, retry=_marker):
"""Prepare a method invocation context.
Use this method to override client properties for an individual method
@ -300,11 +310,16 @@ class RPCClient(object):
:type timeout: int or float
:param version_cap: raise a RPCVersionCapError version exceeds this cap
:type version_cap: str
:param retry: an optional connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
return _CallContext._prepare(self,
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap)
timeout, version_cap, retry)
def cast(self, ctxt, method, **kwargs):
"""Invoke a method and return immediately.
@ -321,6 +336,7 @@ class RPCClient(object):
:type method: str
:param kwargs: a dict of method arguments
:type kwargs: dict
:raises: MessageDeliveryFailure
"""
self.prepare().cast(ctxt, method, **kwargs)
@ -356,7 +372,7 @@ class RPCClient(object):
:type method: str
:param kwargs: a dict of method arguments
:type kwargs: dict
:raises: MessagingTimeout, RemoteError
:raises: MessagingTimeout, RemoteError, MessageDeliveryFailure
"""
return self.prepare().call(ctxt, method, **kwargs)

View File

@ -80,13 +80,14 @@ class Transport(object):
def _require_driver_features(self, requeue=False):
self._driver.require_features(requeue=requeue)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply,
timeout=timeout)
timeout=timeout, retry=retry)
def _send_notification(self, target, ctxt, message, version):
if not target.topic:

View File

@ -753,3 +753,75 @@ _fake_session = FakeQpidSession()
def get_fake_qpid_session():
return _fake_session
class QPidHATestCase(test_utils.BaseTestCase):
def setUp(self):
super(QPidHATestCase, self).setUp()
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(qpid_hosts=self.brokers,
qpid_username=None,
qpid_password=None)
hostname_sets = set()
self.info = {'attempt': 0,
'fail': False}
def _connect(myself, broker):
# do as little work that is enough to pass connection attempt
myself.connection = mock.Mock()
hostname = broker['host']
self.assertNotIn(hostname, hostname_sets)
hostname_sets.add(hostname)
self.info['attempt'] += 1
if self.info['fail']:
raise qpid.messaging.exceptions.ConnectionError
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(qpid_driver.Connection, '_connect', _connect)
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
self.connection = qpid_driver.Connection(self.conf, url)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
'fail': True})
hostname_sets.clear()
def test_reconnect_order(self):
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.reconnect,
retry=len(self.brokers) - 1)
self.assertEqual(len(self.brokers), self.info['attempt'])
def test_ensure_four_retries(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=4)
self.assertEqual(5, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=1)
self.assertEqual(2, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=0)
self.assertEqual(1, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)

View File

@ -20,6 +20,7 @@ import uuid
import fixtures
import kombu
import mock
import testscenarios
from oslo import messaging
@ -630,14 +631,14 @@ TestReplyWireFormat.generate_scenarios()
class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_reconnect_order(self):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.config(rabbit_hosts=brokers,
rabbit_max_retries=1)
def setUp(self):
super(RpcKombuHATestCase, self).setUp()
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(rabbit_hosts=self.brokers)
hostname_sets = set()
self.info = {'attempt': 0,
'fail': False}
def _connect(myself, params):
# do as little work that is enough to pass connection attempt
@ -646,22 +647,51 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
hostname = params['hostname']
self.assertNotIn(hostname, hostname_sets)
hostname_sets.add(hostname)
self.info['attempt'] += 1
if self.info['fail']:
raise IOError('fake fail')
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
connection = rabbit_driver.Connection(self.conf, url)
self.connection = rabbit_driver.Connection(self.conf, url)
self.addCleanup(self.connection.close)
# now that we have connection object, revert to the real 'connect'
# implementation
self.stubs.UnsetAll()
self.info.update({'attempt': 0,
'fail': True})
hostname_sets.clear()
for i in range(brokers_count):
self.assertRaises(driver_common.RPCException, connection.reconnect)
def test_reconnect_order(self):
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.reconnect,
retry=len(self.brokers) - 1)
self.assertEqual(len(self.brokers), self.info['attempt'])
connection.close()
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=4)
self.assertEqual(5, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=1)
self.assertEqual(2, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=0)
self.assertEqual(1, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)

View File

@ -56,7 +56,7 @@ class TestCastCall(test_utils.BaseTestCase):
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args=self.args)
kwargs = {}
kwargs = {'retry': None}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@ -197,7 +197,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
msg['namespace'] = self.expect['namespace']
if 'version' in self.expect:
msg['version'] = self.expect['version']
transport._send(expect_target, {}, msg)
transport._send(expect_target, {}, msg, retry=None)
self.mox.ReplayAll()
@ -243,7 +243,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=self.expect)
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None)
transport._send(messaging.Target(), {}, msg, **kwargs)
self.mox.ReplayAll()
@ -253,6 +253,36 @@ class TestCallTimeout(test_utils.BaseTestCase):
client.call({}, 'foo')
class TestCallRetry(test_utils.BaseTestCase):
scenarios = [
('all_none', dict(ctor=None, prepare=_notset, expect=None)),
('ctor', dict(ctor=21, prepare=_notset, expect=21)),
('ctor_zero', dict(ctor=0, prepare=_notset, expect=0)),
('prepare', dict(ctor=None, prepare=21, expect=21)),
('prepare_override', dict(ctor=10, prepare=21, expect=21)),
('prepare_zero', dict(ctor=None, prepare=0, expect=0)),
]
def test_call_retry(self):
transport = _FakeTransport(self.conf)
client = messaging.RPCClient(transport, messaging.Target(),
retry=self.ctor)
self.mox.StubOutWithMock(transport, '_send')
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=60,
retry=self.expect)
transport._send(messaging.Target(), {}, msg, **kwargs)
self.mox.ReplayAll()
if self.prepare is not _notset:
client = client.prepare(retry=self.prepare)
client.call({}, 'foo')
class TestSerializer(test_utils.BaseTestCase):
scenarios = [
@ -282,6 +312,7 @@ class TestSerializer(test_utils.BaseTestCase):
msg = dict(method='foo',
args=dict([(k, 's' + v) for k, v in self.args.items()]))
kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
kwargs['retry'] = None
transport._send(messaging.Target(),
dict(user='alice'),
msg,
@ -367,7 +398,7 @@ class TestVersionCap(test_utils.BaseTestCase):
if target.version is not None:
msg['version'] = target.version
kwargs = {}
kwargs = {'retry': None}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None

View File

@ -270,7 +270,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply=None,
timeout=None)
timeout=None, retry=None)
self.mox.ReplayAll()
t._send(self._target, 'ctxt', 'message')
@ -281,12 +281,12 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
self.mox.StubOutWithMock(t._driver, 'send')
t._driver.send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout')
timeout='timeout', retry='retry')
self.mox.ReplayAll()
t._send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout')
timeout='timeout', retry='retry')
def test_send_notification(self):
t = transport.Transport(_FakeDriver(cfg.CONF))