From 948c05417c7a44f85d3e77bae35e02f5c31834d7 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Fri, 21 Feb 2014 11:50:45 +0100 Subject: [PATCH] Add transport reconnection retries When an RPC client tries to make an RPC call and the server is unreachable, the RPC call hangs until the server comes back. In most cases this is the desired behavior. But sometimes we may prefer that the library raises an exception after a certain number of retries. For example in ceilometer, when publishing a storage.objects.incoming.bytes sample from the Swift middleware to an AMQP topic, you might not want to block the Swift client if the AMQP broker is unavailable - instead, you might have a queueing policy whereby if a single reconnection attempt fails we queue the sample in memory and try again when another sample is to be published. This patch is the oslo.messaging part that allows this. Closes bug #1282639 Co-Authored-By: Ala Rezmerita Change-Id: I32086d0abf141c368343bf225d4b021da496c020 --- oslo/messaging/_drivers/amqpdriver.py | 15 ++-- oslo/messaging/_drivers/impl_fake.py | 5 +- oslo/messaging/_drivers/impl_qpid.py | 85 ++++++++++++------- oslo/messaging/_drivers/impl_rabbit.py | 113 ++++++++++++++----------- oslo/messaging/_drivers/impl_zmq.py | 5 +- oslo/messaging/exceptions.py | 7 +- oslo/messaging/rpc/client.py | 38 ++++++--- oslo/messaging/transport.py | 5 +- tests/test_qpid.py | 72 ++++++++++++++++ tests/test_rabbit.py | 58 ++++++++++--- tests/test_rpc_client.py | 39 ++++++++- tests/test_transport.py | 6 +- 12 files changed, 326 insertions(+), 122 deletions(-) diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index d990e90a7..6d0b583fc 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -332,7 +332,7 @@ class AMQPDriverBase(base.BaseDriver): def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - envelope=True, notify=False): + envelope=True, notify=False, retry=None): # 
FIXME(markmc): remove this temporary hack class Context(object): @@ -364,15 +364,16 @@ class AMQPDriverBase(base.BaseDriver): with self._get_connection() as conn: if notify: conn.notify_send(self._get_exchange(target), - target.topic, msg) + target.topic, msg, retry=retry) elif target.fanout: - conn.fanout_send(target.topic, msg) + conn.fanout_send(target.topic, msg, retry=retry) else: topic = target.topic if target.server: topic = '%s.%s' % (target.topic, target.server) conn.topic_send(exchange_name=self._get_exchange(target), - topic=topic, msg=msg, timeout=timeout) + topic=topic, msg=msg, timeout=timeout, + retry=retry) if wait_for_reply: result = self._waiter.wait(msg_id, timeout) @@ -383,8 +384,10 @@ class AMQPDriverBase(base.BaseDriver): if wait_for_reply: self._waiter.unlisten(msg_id) - def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): - return self._send(target, ctxt, message, wait_for_reply, timeout) + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + retry=None): + return self._send(target, ctxt, message, wait_for_reply, timeout, + retry=retry) def send_notification(self, target, ctxt, message, version): return self._send(target, ctxt, message, diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 2fa0bc494..b99c1b306 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -162,7 +162,10 @@ class FakeDriver(base.BaseDriver): return None - def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + retry=None): + # NOTE(sileht): retry doesn't need to be implemented, the fake + # transport always works return self._send(target, ctxt, message, wait_for_reply, timeout) def send_notification(self, target, ctxt, message, version): diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index def074baf..fd36acffc 
100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -25,6 +25,7 @@ import six from oslo.messaging._drivers import amqp as rpc_amqp from oslo.messaging._drivers import amqpdriver from oslo.messaging._drivers import common as rpc_common +from oslo.messaging import exceptions from oslo.messaging.openstack.common import importutils from oslo.messaging.openstack.common import jsonutils from oslo.messaging.openstack.common import network_utils @@ -487,7 +488,7 @@ class Connection(object): self.reconnect() - def connection_create(self, broker): + def _connect(self, broker): # Create the connection - this does not open the connection self.connection = qpid_messaging.Connection(broker['host']) @@ -502,6 +503,7 @@ class Connection(object): self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.transport = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay + self.connection.open() def _register_consumer(self, consumer): self.consumers[str(consumer.get_receiver())] = consumer @@ -509,29 +511,52 @@ class Connection(object): def _lookup_consumer(self, receiver): return self.consumers[str(receiver)] - def reconnect(self): - """Handles reconnecting and re-establishing sessions and queues.""" - delay = 1 - while True: - # Close the session if necessary - if self.connection is not None and self.connection.opened(): - try: - self.connection.close() - except qpid_exceptions.MessagingError: - pass - - broker = six.next(self.brokers) - + def _disconnect(self): + # Close the session if necessary + if self.connection is not None and self.connection.opened(): try: - self.connection_create(broker) - self.connection.open() + self.connection.close() + except qpid_exceptions.MessagingError: + pass + self.connection = None + + def reconnect(self, retry=None): + """Handles reconnecting and re-establishing sessions and queues. + Will retry up to retry number of times. 
+ retry = None or -1 means to retry forever + retry = 0 means no retry + retry = N means N retries + """ + delay = 1 + attempt = 0 + loop_forever = False + if retry is None or retry < 0: + loop_forever = True + + while True: + self._disconnect() + + attempt += 1 + broker = six.next(self.brokers) + try: + self._connect(broker) except qpid_exceptions.MessagingError as e: - msg_dict = dict(e=e, delay=delay, broker=broker['host']) - msg = _("Unable to connect to AMQP server on %(broker)s: " - "%(e)s. Sleeping %(delay)s seconds") % msg_dict - LOG.error(msg) - time.sleep(delay) - delay = min(delay + 1, 5) + msg_dict = dict(e=e, + delay=delay, + retry=retry, + broker=broker) + if not loop_forever and attempt > retry: + msg = _('Unable to connect to AMQP server on ' + '%(broker)s after %(retry)d ' + 'tries: %(e)s') % msg_dict + LOG.error(msg) + raise exceptions.MessageDeliveryFailure(msg) + else: + msg = _("Unable to connect to AMQP server on %(broker)s: " + "%(e)s. Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(delay + 1, 5) else: LOG.info(_('Connected to AMQP server on %s'), broker['host']) break @@ -548,7 +573,7 @@ class Connection(object): LOG.debug("Re-established AMQP queues") - def ensure(self, error_callback, method, *args, **kwargs): + def ensure(self, error_callback, method, retry=None, *args, **kwargs): while True: try: return method(*args, **kwargs) @@ -556,7 +581,7 @@ class Connection(object): qpid_exceptions.MessagingError) as e: if error_callback: error_callback(e) - self.reconnect() + self.reconnect(retry=retry) def close(self): """Close/release this connection.""" @@ -614,7 +639,7 @@ class Connection(object): raise StopIteration yield self.ensure(_error_callback, _consume) - def publisher_send(self, cls, topic, msg, **kwargs): + def publisher_send(self, cls, topic, msg, retry=None, **kwargs): """Send to a publisher based on the publisher class.""" def _connect_error(exc): @@ -626,7 +651,7 @@ class 
Connection(object): publisher = cls(self.conf, self.session, topic=topic, **kwargs) publisher.send(msg) - return self.ensure(_connect_error, _publisher_send) + return self.ensure(_connect_error, _publisher_send, retry=retry) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -652,7 +677,7 @@ class Connection(object): """Send a 'direct' message.""" self.publisher_send(DirectPublisher, topic=msg_id, msg=msg) - def topic_send(self, exchange_name, topic, msg, timeout=None): + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" # # We want to create a message with attributes, e.g. a TTL. We @@ -666,11 +691,11 @@ class Connection(object): # qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message, - exchange_name=exchange_name) + exchange_name=exchange_name, retry=retry) - def fanout_send(self, topic, msg): + def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" - self.publisher_send(FanoutPublisher, topic=topic, msg=msg) + self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry) def notify_send(self, exchange_name, topic, msg, **kwargs): """Send a notify message on a topic.""" diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index f7ca5e41c..cfd506d68 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -31,6 +31,7 @@ import six from oslo.messaging._drivers import amqp as rpc_amqp from oslo.messaging._drivers import amqpdriver from oslo.messaging._drivers import common as rpc_common +from oslo.messaging import exceptions from oslo.messaging.openstack.common import network_utils # FIXME(markmc): remove this @@ -539,27 +540,8 @@ class Connection(object): been declared before if we are reconnecting. Exceptions should be handled by the caller. 
""" - if self.connection: - LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % broker) - try: - # XXX(nic): when reconnecting to a RabbitMQ cluster - # with mirrored queues in use, the attempt to release the - # connection can hang "indefinitely" somewhere deep down - # in Kombu. Blocking the thread for a bit prior to - # release seems to kludge around the problem where it is - # otherwise reproduceable. - if self.conf.kombu_reconnect_delay > 0: - LOG.info(_("Delaying reconnect for %1.1f seconds...") % - self.conf.kombu_reconnect_delay) - time.sleep(self.conf.kombu_reconnect_delay) - - self.connection.release() - except self.connection_errors: - pass - # Setting this in case the next statement fails, though - # it shouldn't be doing any network operations, yet. - self.connection = None + LOG.info(_("Connecting to AMQP server on " + "%(hostname)s:%(port)d") % broker) self.connection = kombu.connection.BrokerConnection(**broker) self.connection_errors = self.connection.connection_errors self.channel_errors = self.connection.channel_errors @@ -578,17 +560,47 @@ class Connection(object): LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % broker) - def reconnect(self): + def _disconnect(self): + if self.connection: + # XXX(nic): when reconnecting to a RabbitMQ cluster + # with mirrored queues in use, the attempt to release the + # connection can hang "indefinitely" somewhere deep down + # in Kombu. Blocking the thread for a bit prior to + # release seems to kludge around the problem where it is + # otherwise reproduceable. + if self.conf.kombu_reconnect_delay > 0: + LOG.info(_("Delaying reconnect for %1.1f seconds...") % + self.conf.kombu_reconnect_delay) + time.sleep(self.conf.kombu_reconnect_delay) + + try: + self.connection.release() + except self.connection_errors: + pass + self.connection = None + + def reconnect(self, retry=None): """Handles reconnecting and re-establishing queues. 
- Will retry up to self.max_retries number of times. - self.max_retries = 0 means to retry forever. + Will retry up to retry number of times. + retry = None means use the value of rabbit_max_retries + retry = -1 means to retry forever + retry = 0 means no retry + retry = N means N retries Sleep between tries, starting at self.interval_start seconds, backing off self.interval_stepping number of seconds each attempt. """ attempt = 0 + loop_forever = False + if retry is None: + retry = self.max_retries + if retry is None or retry < 0: + loop_forever = True + while True: + self._disconnect() + broker = six.next(self.brokers) attempt += 1 try: @@ -610,30 +622,30 @@ class Connection(object): log_info = {} log_info['err_str'] = e - log_info['max_retries'] = self.max_retries + log_info['retry'] = retry or 0 log_info.update(broker) - if self.max_retries and attempt == self.max_retries: + if not loop_forever and attempt > retry: msg = _('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' + '%(hostname)s:%(port)d after %(retry)d ' 'tries: %(err_str)s') % log_info LOG.error(msg) - raise rpc_common.RPCException(msg) + raise exceptions.MessageDeliveryFailure(msg) + else: + if attempt == 1: + sleep_time = self.interval_start or 1 + elif attempt > 1: + sleep_time += self.interval_stepping - if attempt == 1: - sleep_time = self.interval_start or 1 - elif attempt > 1: - sleep_time += self.interval_stepping - if self.interval_max: sleep_time = min(sleep_time, self.interval_max) - log_info['sleep_time'] = sleep_time - LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' - 'unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) - time.sleep(sleep_time) + log_info['sleep_time'] = sleep_time + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. 
Trying again in ' + '%(sleep_time)d seconds.') % log_info) + time.sleep(sleep_time) - def ensure(self, error_callback, method, *args, **kwargs): + def ensure(self, error_callback, method, retry=None, *args, **kwargs): while True: try: return method(*args, **kwargs) @@ -657,7 +669,7 @@ class Connection(object): raise if error_callback: error_callback(e) - self.reconnect() + self.reconnect(retry=retry) def get_channel(self): """Convenience call for bin/clear_rabbit_queues.""" @@ -665,8 +677,9 @@ class Connection(object): def close(self): """Close/release this connection.""" - self.connection.release() - self.connection = None + if self.connection: + self.connection.release() + self.connection = None def reset(self): """Reset a connection so it can be used again.""" @@ -722,7 +735,8 @@ class Connection(object): raise StopIteration yield self.ensure(_error_callback, _consume) - def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): + def publisher_send(self, cls, topic, msg, timeout=None, retry=None, + **kwargs): """Send to a publisher based on the publisher class.""" def _error_callback(exc): @@ -734,7 +748,7 @@ class Connection(object): publisher = cls(self.conf, self.channel, topic=topic, **kwargs) publisher.send(msg, timeout) - self.ensure(_error_callback, _publish) + self.ensure(_error_callback, _publish, retry=retry) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. 
@@ -760,14 +774,14 @@ class Connection(object): """Send a 'direct' message.""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, exchange_name, topic, msg, timeout=None): + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" self.publisher_send(TopicPublisher, topic, msg, timeout, - exchange_name=exchange_name) + exchange_name=exchange_name, retry=retry) - def fanout_send(self, topic, msg): + def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" - self.publisher_send(FanoutPublisher, topic, msg) + self.publisher_send(FanoutPublisher, topic, msg, retry=retry) def notify_send(self, exchange_name, topic, msg, **kwargs): """Send a notify message on a topic.""" @@ -786,7 +800,8 @@ class Connection(object): class RabbitDriver(amqpdriver.AMQPDriverBase): - def __init__(self, conf, url, default_exchange=None, + def __init__(self, conf, url, + default_exchange=None, allowed_remote_exmods=[]): conf.register_opts(rabbit_opts) conf.register_opts(rpc_amqp.amqp_opts) diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 210380afb..a98a0130e 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -939,7 +939,10 @@ class ZmqDriver(base.BaseDriver): if wait_for_reply: return reply[-1] - def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + retry=None): + # NOTE(sileht): retry is not implemented because this driver never + # retry anything return self._send(target, ctxt, message, wait_for_reply, timeout) def send_notification(self, target, ctxt, message, version): diff --git a/oslo/messaging/exceptions.py b/oslo/messaging/exceptions.py index 7eba1c6b9..21d14e997 100644 --- a/oslo/messaging/exceptions.py +++ b/oslo/messaging/exceptions.py @@ -13,7 +13,8 @@ # License for the specific language governing permissions 
and limitations # under the License. -__all__ = ['MessagingException', 'MessagingTimeout', 'InvalidTarget'] +__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure', + 'InvalidTarget'] class MessagingException(Exception): @@ -24,6 +25,10 @@ class MessagingTimeout(MessagingException): """Raised if message sending times out.""" +class MessageDeliveryFailure(MessagingException): + """Raised if message sending failed after the asked retry.""" + + class InvalidTarget(MessagingException, ValueError): """Raised if a target does not meet certain pre-conditions.""" diff --git a/oslo/messaging/rpc/client.py b/oslo/messaging/rpc/client.py index 30d24508d..8abb156c4 100644 --- a/oslo/messaging/rpc/client.py +++ b/oslo/messaging/rpc/client.py @@ -84,13 +84,14 @@ class _CallContext(object): _marker = object() def __init__(self, transport, target, serializer, - timeout=None, version_cap=None): + timeout=None, version_cap=None, retry=None): self.conf = transport.conf self.transport = transport self.target = target self.serializer = serializer self.timeout = timeout + self.retry = retry self.version_cap = version_cap super(_CallContext, self).__init__() @@ -129,7 +130,7 @@ class _CallContext(object): if self.version_cap: self._check_version_cap(msg.get('version')) try: - self.transport._send(self.target, ctxt, msg) + self.transport._send(self.target, ctxt, msg, retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -147,7 +148,8 @@ class _CallContext(object): try: result = self.transport._send(self.target, msg_ctxt, msg, - wait_for_reply=True, timeout=timeout) + wait_for_reply=True, timeout=timeout, + retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) return self.serializer.deserialize_entity(ctxt, result) @@ -156,7 +158,7 @@ class _CallContext(object): def _prepare(cls, base, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, 
server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker): + timeout=_marker, version_cap=_marker, retry=_marker): """Prepare a method invocation context. See RPCClient.prepare().""" kwargs = dict( exchange=exchange, @@ -171,21 +173,23 @@ class _CallContext(object): if timeout is cls._marker: timeout = base.timeout + if retry is cls._marker: + retry = base.retry if version_cap is cls._marker: version_cap = base.version_cap return _CallContext(base.transport, target, base.serializer, - timeout, version_cap) + timeout, version_cap, retry) def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker): + timeout=_marker, version_cap=_marker, retry=_marker): """Prepare a method invocation context. See RPCClient.prepare().""" return self._prepare(self, exchange, topic, namespace, version, server, fanout, - timeout, version_cap) + timeout, version_cap, retry) class RPCClient(object): @@ -245,7 +249,7 @@ class RPCClient(object): """ def __init__(self, transport, target, - timeout=None, version_cap=None, serializer=None): + timeout=None, version_cap=None, serializer=None, retry=None): """Construct an RPC client. 
:param transport: a messaging transport handle @@ -258,6 +262,11 @@ class RPCClient(object): :type version_cap: str :param serializer: an optional entity serializer :type serializer: Serializer + :param retry: an optional default connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int """ self.conf = transport.conf self.conf.register_opts(_client_opts) @@ -265,6 +274,7 @@ class RPCClient(object): self.transport = transport self.target = target self.timeout = timeout + self.retry = retry self.version_cap = version_cap self.serializer = serializer or msg_serializer.NoOpSerializer() @@ -274,7 +284,7 @@ class RPCClient(object): def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker): + timeout=_marker, version_cap=_marker, retry=_marker): """Prepare a method invocation context. Use this method to override client properties for an individual method @@ -300,11 +310,16 @@ class RPCClient(object): :type timeout: int or float :param version_cap: raise a RPCVersionCapError version exceeds this cap :type version_cap: str + :param retry: an optional connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int """ return _CallContext._prepare(self, exchange, topic, namespace, version, server, fanout, - timeout, version_cap) + timeout, version_cap, retry) def cast(self, ctxt, method, **kwargs): """Invoke a method and return immediately. 
@@ -321,6 +336,7 @@ class RPCClient(object): :type method: str :param kwargs: a dict of method arguments :type kwargs: dict + :raises: MessageDeliveryFailure """ self.prepare().cast(ctxt, method, **kwargs) @@ -356,7 +372,7 @@ class RPCClient(object): :type method: str :param kwargs: a dict of method arguments :type kwargs: dict - :raises: MessagingTimeout, RemoteError + :raises: MessagingTimeout, RemoteError, MessageDeliveryFailure """ return self.prepare().call(ctxt, method, **kwargs) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index 51da01de7..9df148625 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -80,13 +80,14 @@ class Transport(object): def _require_driver_features(self, requeue=False): self._driver.require_features(requeue=requeue) - def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + retry=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) return self._driver.send(target, ctxt, message, wait_for_reply=wait_for_reply, - timeout=timeout) + timeout=timeout, retry=retry) def _send_notification(self, target, ctxt, message, version): if not target.topic: diff --git a/tests/test_qpid.py b/tests/test_qpid.py index 976d5eb37..08b7a2226 100644 --- a/tests/test_qpid.py +++ b/tests/test_qpid.py @@ -753,3 +753,75 @@ _fake_session = FakeQpidSession() def get_fake_qpid_session(): return _fake_session + + +class QPidHATestCase(test_utils.BaseTestCase): + + def setUp(self): + super(QPidHATestCase, self).setUp() + self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] + + self.config(qpid_hosts=self.brokers, + qpid_username=None, + qpid_password=None) + + hostname_sets = set() + self.info = {'attempt': 0, + 'fail': False} + + def _connect(myself, broker): + # do as little work that is enough to pass connection attempt + myself.connection = mock.Mock() + hostname = 
broker['host'] + self.assertNotIn(hostname, hostname_sets) + hostname_sets.add(hostname) + + self.info['attempt'] += 1 + if self.info['fail']: + raise qpid.messaging.exceptions.ConnectionError + + # just make sure connection instantiation does not fail with an + # exception + self.stubs.Set(qpid_driver.Connection, '_connect', _connect) + + # starting from the first broker in the list + url = messaging.TransportURL.parse(self.conf, None) + self.connection = qpid_driver.Connection(self.conf, url) + self.addCleanup(self.connection.close) + + self.info.update({'attempt': 0, + 'fail': True}) + hostname_sets.clear() + + def test_reconnect_order(self): + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.reconnect, + retry=len(self.brokers) - 1) + self.assertEqual(len(self.brokers), self.info['attempt']) + + def test_ensure_four_retries(self): + mock_callback = mock.Mock( + side_effect=qpid.messaging.exceptions.ConnectionError) + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.ensure, None, mock_callback, + retry=4) + self.assertEqual(5, self.info['attempt']) + self.assertEqual(1, mock_callback.call_count) + + def test_ensure_one_retry(self): + mock_callback = mock.Mock( + side_effect=qpid.messaging.exceptions.ConnectionError) + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.ensure, None, mock_callback, + retry=1) + self.assertEqual(2, self.info['attempt']) + self.assertEqual(1, mock_callback.call_count) + + def test_ensure_no_retry(self): + mock_callback = mock.Mock( + side_effect=qpid.messaging.exceptions.ConnectionError) + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.ensure, None, mock_callback, + retry=0) + self.assertEqual(1, self.info['attempt']) + self.assertEqual(1, mock_callback.call_count) diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index d42a0f507..f665b444e 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -20,6 +20,7 @@ import uuid import 
fixtures import kombu +import mock import testscenarios from oslo import messaging @@ -630,14 +631,14 @@ TestReplyWireFormat.generate_scenarios() class RpcKombuHATestCase(test_utils.BaseTestCase): - def test_reconnect_order(self): - brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] - brokers_count = len(brokers) - - self.config(rabbit_hosts=brokers, - rabbit_max_retries=1) + def setUp(self): + super(RpcKombuHATestCase, self).setUp() + self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] + self.config(rabbit_hosts=self.brokers) hostname_sets = set() + self.info = {'attempt': 0, + 'fail': False} def _connect(myself, params): # do as little work that is enough to pass connection attempt @@ -646,22 +647,51 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): hostname = params['hostname'] self.assertNotIn(hostname, hostname_sets) - hostname_sets.add(hostname) + self.info['attempt'] += 1 + if self.info['fail']: + raise IOError('fake fail') + # just make sure connection instantiation does not fail with an # exception self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - connection = rabbit_driver.Connection(self.conf, url) + self.connection = rabbit_driver.Connection(self.conf, url) + self.addCleanup(self.connection.close) - # now that we have connection object, revert to the real 'connect' - # implementation - self.stubs.UnsetAll() + self.info.update({'attempt': 0, + 'fail': True}) + hostname_sets.clear() - for i in range(brokers_count): - self.assertRaises(driver_common.RPCException, connection.reconnect) + def test_reconnect_order(self): + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.reconnect, + retry=len(self.brokers) - 1) + self.assertEqual(len(self.brokers), self.info['attempt']) - connection.close() + def test_ensure_four_retry(self): + mock_callback = mock.Mock(side_effect=IOError) + 
self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.ensure, None, mock_callback, + retry=4) + self.assertEqual(5, self.info['attempt']) + self.assertEqual(1, mock_callback.call_count) + + def test_ensure_one_retry(self): + mock_callback = mock.Mock(side_effect=IOError) + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.ensure, None, mock_callback, + retry=1) + self.assertEqual(2, self.info['attempt']) + self.assertEqual(1, mock_callback.call_count) + + def test_ensure_no_retry(self): + mock_callback = mock.Mock(side_effect=IOError) + self.assertRaises(messaging.MessageDeliveryFailure, + self.connection.ensure, None, mock_callback, + retry=0) + self.assertEqual(1, self.info['attempt']) + self.assertEqual(1, mock_callback.call_count) diff --git a/tests/test_rpc_client.py b/tests/test_rpc_client.py index c42a3455d..0735f86c4 100644 --- a/tests/test_rpc_client.py +++ b/tests/test_rpc_client.py @@ -56,7 +56,7 @@ class TestCastCall(test_utils.BaseTestCase): self.mox.StubOutWithMock(transport, '_send') msg = dict(method='foo', args=self.args) - kwargs = {} + kwargs = {'retry': None} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -197,7 +197,7 @@ class TestCastToTarget(test_utils.BaseTestCase): msg['namespace'] = self.expect['namespace'] if 'version' in self.expect: msg['version'] = self.expect['version'] - transport._send(expect_target, {}, msg) + transport._send(expect_target, {}, msg, retry=None) self.mox.ReplayAll() @@ -243,7 +243,7 @@ class TestCallTimeout(test_utils.BaseTestCase): self.mox.StubOutWithMock(transport, '_send') msg = dict(method='foo', args={}) - kwargs = dict(wait_for_reply=True, timeout=self.expect) + kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None) transport._send(messaging.Target(), {}, msg, **kwargs) self.mox.ReplayAll() @@ -253,6 +253,36 @@ class TestCallTimeout(test_utils.BaseTestCase): client.call({}, 'foo') +class TestCallRetry(test_utils.BaseTestCase): + + 
scenarios = [ + ('all_none', dict(ctor=None, prepare=_notset, expect=None)), + ('ctor', dict(ctor=21, prepare=_notset, expect=21)), + ('ctor_zero', dict(ctor=0, prepare=_notset, expect=0)), + ('prepare', dict(ctor=None, prepare=21, expect=21)), + ('prepare_override', dict(ctor=10, prepare=21, expect=21)), + ('prepare_zero', dict(ctor=None, prepare=0, expect=0)), + ] + + def test_call_retry(self): + transport = _FakeTransport(self.conf) + client = messaging.RPCClient(transport, messaging.Target(), + retry=self.ctor) + + self.mox.StubOutWithMock(transport, '_send') + + msg = dict(method='foo', args={}) + kwargs = dict(wait_for_reply=True, timeout=60, + retry=self.expect) + transport._send(messaging.Target(), {}, msg, **kwargs) + + self.mox.ReplayAll() + + if self.prepare is not _notset: + client = client.prepare(retry=self.prepare) + client.call({}, 'foo') + + class TestSerializer(test_utils.BaseTestCase): scenarios = [ @@ -282,6 +312,7 @@ class TestSerializer(test_utils.BaseTestCase): msg = dict(method='foo', args=dict([(k, 's' + v) for k, v in self.args.items()])) kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {} + kwargs['retry'] = None transport._send(messaging.Target(), dict(user='alice'), msg, @@ -367,7 +398,7 @@ class TestVersionCap(test_utils.BaseTestCase): if target.version is not None: msg['version'] = target.version - kwargs = {} + kwargs = {'retry': None} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None diff --git a/tests/test_transport.py b/tests/test_transport.py index 46f308ce9..8c8fab507 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -270,7 +270,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): self.mox.StubOutWithMock(t._driver, 'send') t._driver.send(self._target, 'ctxt', 'message', wait_for_reply=None, - timeout=None) + timeout=None, retry=None) self.mox.ReplayAll() t._send(self._target, 'ctxt', 'message') @@ -281,12 +281,12 @@ class 
TestTransportMethodArgs(test_utils.BaseTestCase): self.mox.StubOutWithMock(t._driver, 'send') t._driver.send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout') + timeout='timeout', retry='retry') self.mox.ReplayAll() t._send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout') + timeout='timeout', retry='retry') def test_send_notification(self): t = transport.Transport(_FakeDriver(cfg.CONF))