Add transport reconnection retries
When an RPC client tries to make an RPC call and the server is unreachable, the RPC call hangs until the server comes back. In most cases this is the desired behavior. But sometimes, we may prefer that the library raise an exception after a certain number of retries. For example in ceilometer, when publishing a storage.objects.incoming.bytes sample from the Swift middleware to an AMQP topic, you might not want to block the Swift client if the AMQP broker is unavailable - instead, you might have a queueing policy whereby if a single reconnection attempt fails we queue the sample in memory and try again when another sample is to be published. This patch is the oslo.messaging part that allows this. Closes bug #1282639 Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com> Change-Id: I32086d0abf141c368343bf225d4b021da496c020
This commit is contained in:
parent
7d927e5e10
commit
948c05417c
|
@ -332,7 +332,7 @@ class AMQPDriverBase(base.BaseDriver):
|
|||
|
||||
def _send(self, target, ctxt, message,
|
||||
wait_for_reply=None, timeout=None,
|
||||
envelope=True, notify=False):
|
||||
envelope=True, notify=False, retry=None):
|
||||
|
||||
# FIXME(markmc): remove this temporary hack
|
||||
class Context(object):
|
||||
|
@ -364,15 +364,16 @@ class AMQPDriverBase(base.BaseDriver):
|
|||
with self._get_connection() as conn:
|
||||
if notify:
|
||||
conn.notify_send(self._get_exchange(target),
|
||||
target.topic, msg)
|
||||
target.topic, msg, retry=retry)
|
||||
elif target.fanout:
|
||||
conn.fanout_send(target.topic, msg)
|
||||
conn.fanout_send(target.topic, msg, retry=retry)
|
||||
else:
|
||||
topic = target.topic
|
||||
if target.server:
|
||||
topic = '%s.%s' % (target.topic, target.server)
|
||||
conn.topic_send(exchange_name=self._get_exchange(target),
|
||||
topic=topic, msg=msg, timeout=timeout)
|
||||
topic=topic, msg=msg, timeout=timeout,
|
||||
retry=retry)
|
||||
|
||||
if wait_for_reply:
|
||||
result = self._waiter.wait(msg_id, timeout)
|
||||
|
@ -383,8 +384,10 @@ class AMQPDriverBase(base.BaseDriver):
|
|||
if wait_for_reply:
|
||||
self._waiter.unlisten(msg_id)
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout,
|
||||
retry=retry)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version):
|
||||
return self._send(target, ctxt, message,
|
||||
|
|
|
@ -162,7 +162,10 @@ class FakeDriver(base.BaseDriver):
|
|||
|
||||
return None
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
# NOTE(sileht): retry doesn't need to be implemented, the fake
|
||||
# transport always works
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version):
|
||||
|
|
|
@ -25,6 +25,7 @@ import six
|
|||
from oslo.messaging._drivers import amqp as rpc_amqp
|
||||
from oslo.messaging._drivers import amqpdriver
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging import exceptions
|
||||
from oslo.messaging.openstack.common import importutils
|
||||
from oslo.messaging.openstack.common import jsonutils
|
||||
from oslo.messaging.openstack.common import network_utils
|
||||
|
@ -487,7 +488,7 @@ class Connection(object):
|
|||
|
||||
self.reconnect()
|
||||
|
||||
def connection_create(self, broker):
|
||||
def _connect(self, broker):
|
||||
# Create the connection - this does not open the connection
|
||||
self.connection = qpid_messaging.Connection(broker['host'])
|
||||
|
||||
|
@ -502,6 +503,7 @@ class Connection(object):
|
|||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||
self.connection.transport = self.conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||
self.connection.open()
|
||||
|
||||
def _register_consumer(self, consumer):
|
||||
self.consumers[str(consumer.get_receiver())] = consumer
|
||||
|
@ -509,29 +511,52 @@ class Connection(object):
|
|||
def _lookup_consumer(self, receiver):
|
||||
return self.consumers[str(receiver)]
|
||||
|
||||
def reconnect(self):
|
||||
"""Handles reconnecting and re-establishing sessions and queues."""
|
||||
delay = 1
|
||||
while True:
|
||||
# Close the session if necessary
|
||||
if self.connection is not None and self.connection.opened():
|
||||
try:
|
||||
self.connection.close()
|
||||
except qpid_exceptions.MessagingError:
|
||||
pass
|
||||
|
||||
broker = six.next(self.brokers)
|
||||
|
||||
def _disconnect(self):
|
||||
# Close the session if necessary
|
||||
if self.connection is not None and self.connection.opened():
|
||||
try:
|
||||
self.connection_create(broker)
|
||||
self.connection.open()
|
||||
self.connection.close()
|
||||
except qpid_exceptions.MessagingError:
|
||||
pass
|
||||
self.connection = None
|
||||
|
||||
def reconnect(self, retry=None):
|
||||
"""Handles reconnecting and re-establishing sessions and queues.
|
||||
Will retry up to retry number of times.
|
||||
retry = None or -1 means to retry forever
|
||||
retry = 0 means no retry
|
||||
retry = N means N retries
|
||||
"""
|
||||
delay = 1
|
||||
attempt = 0
|
||||
loop_forever = False
|
||||
if retry is None or retry < 0:
|
||||
loop_forever = True
|
||||
|
||||
while True:
|
||||
self._disconnect()
|
||||
|
||||
attempt += 1
|
||||
broker = six.next(self.brokers)
|
||||
try:
|
||||
self._connect(broker)
|
||||
except qpid_exceptions.MessagingError as e:
|
||||
msg_dict = dict(e=e, delay=delay, broker=broker['host'])
|
||||
msg = _("Unable to connect to AMQP server on %(broker)s: "
|
||||
"%(e)s. Sleeping %(delay)s seconds") % msg_dict
|
||||
LOG.error(msg)
|
||||
time.sleep(delay)
|
||||
delay = min(delay + 1, 5)
|
||||
msg_dict = dict(e=e,
|
||||
delay=delay,
|
||||
retry=retry,
|
||||
broker=broker)
|
||||
if not loop_forever and attempt > retry:
|
||||
msg = _('Unable to connect to AMQP server on '
|
||||
'%(broker)s after %(retry)d '
|
||||
'tries: %(e)s') % msg_dict
|
||||
LOG.error(msg)
|
||||
raise exceptions.MessageDeliveryFailure(msg)
|
||||
else:
|
||||
msg = _("Unable to connect to AMQP server on %(broker)s: "
|
||||
"%(e)s. Sleeping %(delay)s seconds") % msg_dict
|
||||
LOG.error(msg)
|
||||
time.sleep(delay)
|
||||
delay = min(delay + 1, 5)
|
||||
else:
|
||||
LOG.info(_('Connected to AMQP server on %s'), broker['host'])
|
||||
break
|
||||
|
@ -548,7 +573,7 @@ class Connection(object):
|
|||
|
||||
LOG.debug("Re-established AMQP queues")
|
||||
|
||||
def ensure(self, error_callback, method, *args, **kwargs):
|
||||
def ensure(self, error_callback, method, retry=None, *args, **kwargs):
|
||||
while True:
|
||||
try:
|
||||
return method(*args, **kwargs)
|
||||
|
@ -556,7 +581,7 @@ class Connection(object):
|
|||
qpid_exceptions.MessagingError) as e:
|
||||
if error_callback:
|
||||
error_callback(e)
|
||||
self.reconnect()
|
||||
self.reconnect(retry=retry)
|
||||
|
||||
def close(self):
|
||||
"""Close/release this connection."""
|
||||
|
@ -614,7 +639,7 @@ class Connection(object):
|
|||
raise StopIteration
|
||||
yield self.ensure(_error_callback, _consume)
|
||||
|
||||
def publisher_send(self, cls, topic, msg, **kwargs):
|
||||
def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
|
||||
"""Send to a publisher based on the publisher class."""
|
||||
|
||||
def _connect_error(exc):
|
||||
|
@ -626,7 +651,7 @@ class Connection(object):
|
|||
publisher = cls(self.conf, self.session, topic=topic, **kwargs)
|
||||
publisher.send(msg)
|
||||
|
||||
return self.ensure(_connect_error, _publisher_send)
|
||||
return self.ensure(_connect_error, _publisher_send, retry=retry)
|
||||
|
||||
def declare_direct_consumer(self, topic, callback):
|
||||
"""Create a 'direct' queue.
|
||||
|
@ -652,7 +677,7 @@ class Connection(object):
|
|||
"""Send a 'direct' message."""
|
||||
self.publisher_send(DirectPublisher, topic=msg_id, msg=msg)
|
||||
|
||||
def topic_send(self, exchange_name, topic, msg, timeout=None):
|
||||
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
|
||||
"""Send a 'topic' message."""
|
||||
#
|
||||
# We want to create a message with attributes, e.g. a TTL. We
|
||||
|
@ -666,11 +691,11 @@ class Connection(object):
|
|||
#
|
||||
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
|
||||
self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message,
|
||||
exchange_name=exchange_name)
|
||||
exchange_name=exchange_name, retry=retry)
|
||||
|
||||
def fanout_send(self, topic, msg):
|
||||
def fanout_send(self, topic, msg, retry=None):
|
||||
"""Send a 'fanout' message."""
|
||||
self.publisher_send(FanoutPublisher, topic=topic, msg=msg)
|
||||
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
|
||||
|
||||
def notify_send(self, exchange_name, topic, msg, **kwargs):
|
||||
"""Send a notify message on a topic."""
|
||||
|
|
|
@ -31,6 +31,7 @@ import six
|
|||
from oslo.messaging._drivers import amqp as rpc_amqp
|
||||
from oslo.messaging._drivers import amqpdriver
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging import exceptions
|
||||
from oslo.messaging.openstack.common import network_utils
|
||||
|
||||
# FIXME(markmc): remove this
|
||||
|
@ -539,27 +540,8 @@ class Connection(object):
|
|||
been declared before if we are reconnecting. Exceptions should
|
||||
be handled by the caller.
|
||||
"""
|
||||
if self.connection:
|
||||
LOG.info(_("Reconnecting to AMQP server on "
|
||||
"%(hostname)s:%(port)d") % broker)
|
||||
try:
|
||||
# XXX(nic): when reconnecting to a RabbitMQ cluster
|
||||
# with mirrored queues in use, the attempt to release the
|
||||
# connection can hang "indefinitely" somewhere deep down
|
||||
# in Kombu. Blocking the thread for a bit prior to
|
||||
# release seems to kludge around the problem where it is
|
||||
# otherwise reproduceable.
|
||||
if self.conf.kombu_reconnect_delay > 0:
|
||||
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
|
||||
self.conf.kombu_reconnect_delay)
|
||||
time.sleep(self.conf.kombu_reconnect_delay)
|
||||
|
||||
self.connection.release()
|
||||
except self.connection_errors:
|
||||
pass
|
||||
# Setting this in case the next statement fails, though
|
||||
# it shouldn't be doing any network operations, yet.
|
||||
self.connection = None
|
||||
LOG.info(_("Connecting to AMQP server on "
|
||||
"%(hostname)s:%(port)d") % broker)
|
||||
self.connection = kombu.connection.BrokerConnection(**broker)
|
||||
self.connection_errors = self.connection.connection_errors
|
||||
self.channel_errors = self.connection.channel_errors
|
||||
|
@ -578,17 +560,47 @@ class Connection(object):
|
|||
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
|
||||
broker)
|
||||
|
||||
def reconnect(self):
|
||||
def _disconnect(self):
|
||||
if self.connection:
|
||||
# XXX(nic): when reconnecting to a RabbitMQ cluster
|
||||
# with mirrored queues in use, the attempt to release the
|
||||
# connection can hang "indefinitely" somewhere deep down
|
||||
# in Kombu. Blocking the thread for a bit prior to
|
||||
# release seems to kludge around the problem where it is
|
||||
# otherwise reproduceable.
|
||||
if self.conf.kombu_reconnect_delay > 0:
|
||||
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
|
||||
self.conf.kombu_reconnect_delay)
|
||||
time.sleep(self.conf.kombu_reconnect_delay)
|
||||
|
||||
try:
|
||||
self.connection.release()
|
||||
except self.connection_errors:
|
||||
pass
|
||||
self.connection = None
|
||||
|
||||
def reconnect(self, retry=None):
|
||||
"""Handles reconnecting and re-establishing queues.
|
||||
Will retry up to self.max_retries number of times.
|
||||
self.max_retries = 0 means to retry forever.
|
||||
Will retry up to retry number of times.
|
||||
retry = None means use the value of rabbit_max_retries
|
||||
retry = -1 means to retry forever
|
||||
retry = 0 means no retry
|
||||
retry = N means N retries
|
||||
Sleep between tries, starting at self.interval_start
|
||||
seconds, backing off self.interval_stepping number of seconds
|
||||
each attempt.
|
||||
"""
|
||||
|
||||
attempt = 0
|
||||
loop_forever = False
|
||||
if retry is None:
|
||||
retry = self.max_retries
|
||||
if retry is None or retry < 0:
|
||||
loop_forever = True
|
||||
|
||||
while True:
|
||||
self._disconnect()
|
||||
|
||||
broker = six.next(self.brokers)
|
||||
attempt += 1
|
||||
try:
|
||||
|
@ -610,30 +622,30 @@ class Connection(object):
|
|||
|
||||
log_info = {}
|
||||
log_info['err_str'] = e
|
||||
log_info['max_retries'] = self.max_retries
|
||||
log_info['retry'] = retry or 0
|
||||
log_info.update(broker)
|
||||
|
||||
if self.max_retries and attempt == self.max_retries:
|
||||
if not loop_forever and attempt > retry:
|
||||
msg = _('Unable to connect to AMQP server on '
|
||||
'%(hostname)s:%(port)d after %(max_retries)d '
|
||||
'%(hostname)s:%(port)d after %(retry)d '
|
||||
'tries: %(err_str)s') % log_info
|
||||
LOG.error(msg)
|
||||
raise rpc_common.RPCException(msg)
|
||||
raise exceptions.MessageDeliveryFailure(msg)
|
||||
else:
|
||||
if attempt == 1:
|
||||
sleep_time = self.interval_start or 1
|
||||
elif attempt > 1:
|
||||
sleep_time += self.interval_stepping
|
||||
|
||||
if attempt == 1:
|
||||
sleep_time = self.interval_start or 1
|
||||
elif attempt > 1:
|
||||
sleep_time += self.interval_stepping
|
||||
if self.interval_max:
|
||||
sleep_time = min(sleep_time, self.interval_max)
|
||||
|
||||
log_info['sleep_time'] = sleep_time
|
||||
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
|
||||
'unreachable: %(err_str)s. Trying again in '
|
||||
'%(sleep_time)d seconds.') % log_info)
|
||||
time.sleep(sleep_time)
|
||||
log_info['sleep_time'] = sleep_time
|
||||
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
|
||||
'unreachable: %(err_str)s. Trying again in '
|
||||
'%(sleep_time)d seconds.') % log_info)
|
||||
time.sleep(sleep_time)
|
||||
|
||||
def ensure(self, error_callback, method, *args, **kwargs):
|
||||
def ensure(self, error_callback, method, retry=None, *args, **kwargs):
|
||||
while True:
|
||||
try:
|
||||
return method(*args, **kwargs)
|
||||
|
@ -657,7 +669,7 @@ class Connection(object):
|
|||
raise
|
||||
if error_callback:
|
||||
error_callback(e)
|
||||
self.reconnect()
|
||||
self.reconnect(retry=retry)
|
||||
|
||||
def get_channel(self):
|
||||
"""Convenience call for bin/clear_rabbit_queues."""
|
||||
|
@ -665,8 +677,9 @@ class Connection(object):
|
|||
|
||||
def close(self):
|
||||
"""Close/release this connection."""
|
||||
self.connection.release()
|
||||
self.connection = None
|
||||
if self.connection:
|
||||
self.connection.release()
|
||||
self.connection = None
|
||||
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again."""
|
||||
|
@ -722,7 +735,8 @@ class Connection(object):
|
|||
raise StopIteration
|
||||
yield self.ensure(_error_callback, _consume)
|
||||
|
||||
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
|
||||
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
|
||||
**kwargs):
|
||||
"""Send to a publisher based on the publisher class."""
|
||||
|
||||
def _error_callback(exc):
|
||||
|
@ -734,7 +748,7 @@ class Connection(object):
|
|||
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
|
||||
publisher.send(msg, timeout)
|
||||
|
||||
self.ensure(_error_callback, _publish)
|
||||
self.ensure(_error_callback, _publish, retry=retry)
|
||||
|
||||
def declare_direct_consumer(self, topic, callback):
|
||||
"""Create a 'direct' queue.
|
||||
|
@ -760,14 +774,14 @@ class Connection(object):
|
|||
"""Send a 'direct' message."""
|
||||
self.publisher_send(DirectPublisher, msg_id, msg)
|
||||
|
||||
def topic_send(self, exchange_name, topic, msg, timeout=None):
|
||||
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
|
||||
"""Send a 'topic' message."""
|
||||
self.publisher_send(TopicPublisher, topic, msg, timeout,
|
||||
exchange_name=exchange_name)
|
||||
exchange_name=exchange_name, retry=retry)
|
||||
|
||||
def fanout_send(self, topic, msg):
|
||||
def fanout_send(self, topic, msg, retry=None):
|
||||
"""Send a 'fanout' message."""
|
||||
self.publisher_send(FanoutPublisher, topic, msg)
|
||||
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
|
||||
|
||||
def notify_send(self, exchange_name, topic, msg, **kwargs):
|
||||
"""Send a notify message on a topic."""
|
||||
|
@ -786,7 +800,8 @@ class Connection(object):
|
|||
|
||||
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
|
||||
def __init__(self, conf, url, default_exchange=None,
|
||||
def __init__(self, conf, url,
|
||||
default_exchange=None,
|
||||
allowed_remote_exmods=[]):
|
||||
conf.register_opts(rabbit_opts)
|
||||
conf.register_opts(rpc_amqp.amqp_opts)
|
||||
|
|
|
@ -939,7 +939,10 @@ class ZmqDriver(base.BaseDriver):
|
|||
if wait_for_reply:
|
||||
return reply[-1]
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
# NOTE(sileht): retry is not implemented because this driver never
|
||||
# retry anything
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version):
|
||||
|
|
|
@ -13,7 +13,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
__all__ = ['MessagingException', 'MessagingTimeout', 'InvalidTarget']
|
||||
__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
|
||||
'InvalidTarget']
|
||||
|
||||
|
||||
class MessagingException(Exception):
|
||||
|
@ -24,6 +25,10 @@ class MessagingTimeout(MessagingException):
|
|||
"""Raised if message sending times out."""
|
||||
|
||||
|
||||
class MessageDeliveryFailure(MessagingException):
|
||||
"""Raised if message sending failed after the asked retry."""
|
||||
|
||||
|
||||
class InvalidTarget(MessagingException, ValueError):
|
||||
"""Raised if a target does not meet certain pre-conditions."""
|
||||
|
||||
|
|
|
@ -84,13 +84,14 @@ class _CallContext(object):
|
|||
_marker = object()
|
||||
|
||||
def __init__(self, transport, target, serializer,
|
||||
timeout=None, version_cap=None):
|
||||
timeout=None, version_cap=None, retry=None):
|
||||
self.conf = transport.conf
|
||||
|
||||
self.transport = transport
|
||||
self.target = target
|
||||
self.serializer = serializer
|
||||
self.timeout = timeout
|
||||
self.retry = retry
|
||||
self.version_cap = version_cap
|
||||
|
||||
super(_CallContext, self).__init__()
|
||||
|
@ -129,7 +130,7 @@ class _CallContext(object):
|
|||
if self.version_cap:
|
||||
self._check_version_cap(msg.get('version'))
|
||||
try:
|
||||
self.transport._send(self.target, ctxt, msg)
|
||||
self.transport._send(self.target, ctxt, msg, retry=self.retry)
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ClientSendError(self.target, ex)
|
||||
|
||||
|
@ -147,7 +148,8 @@ class _CallContext(object):
|
|||
|
||||
try:
|
||||
result = self.transport._send(self.target, msg_ctxt, msg,
|
||||
wait_for_reply=True, timeout=timeout)
|
||||
wait_for_reply=True, timeout=timeout,
|
||||
retry=self.retry)
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ClientSendError(self.target, ex)
|
||||
return self.serializer.deserialize_entity(ctxt, result)
|
||||
|
@ -156,7 +158,7 @@ class _CallContext(object):
|
|||
def _prepare(cls, base,
|
||||
exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker):
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
"""Prepare a method invocation context. See RPCClient.prepare()."""
|
||||
kwargs = dict(
|
||||
exchange=exchange,
|
||||
|
@ -171,21 +173,23 @@ class _CallContext(object):
|
|||
|
||||
if timeout is cls._marker:
|
||||
timeout = base.timeout
|
||||
if retry is cls._marker:
|
||||
retry = base.retry
|
||||
if version_cap is cls._marker:
|
||||
version_cap = base.version_cap
|
||||
|
||||
return _CallContext(base.transport, target,
|
||||
base.serializer,
|
||||
timeout, version_cap)
|
||||
timeout, version_cap, retry)
|
||||
|
||||
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker):
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
"""Prepare a method invocation context. See RPCClient.prepare()."""
|
||||
return self._prepare(self,
|
||||
exchange, topic, namespace,
|
||||
version, server, fanout,
|
||||
timeout, version_cap)
|
||||
timeout, version_cap, retry)
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
|
@ -245,7 +249,7 @@ class RPCClient(object):
|
|||
"""
|
||||
|
||||
def __init__(self, transport, target,
|
||||
timeout=None, version_cap=None, serializer=None):
|
||||
timeout=None, version_cap=None, serializer=None, retry=None):
|
||||
"""Construct an RPC client.
|
||||
|
||||
:param transport: a messaging transport handle
|
||||
|
@ -258,6 +262,11 @@ class RPCClient(object):
|
|||
:type version_cap: str
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
:param retry: an optional default connection retries configuration
|
||||
None or -1 means to retry forever
|
||||
0 means no retry
|
||||
N means N retries
|
||||
:type retry: int
|
||||
"""
|
||||
self.conf = transport.conf
|
||||
self.conf.register_opts(_client_opts)
|
||||
|
@ -265,6 +274,7 @@ class RPCClient(object):
|
|||
self.transport = transport
|
||||
self.target = target
|
||||
self.timeout = timeout
|
||||
self.retry = retry
|
||||
self.version_cap = version_cap
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
|
||||
|
@ -274,7 +284,7 @@ class RPCClient(object):
|
|||
|
||||
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker):
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
"""Prepare a method invocation context.
|
||||
|
||||
Use this method to override client properties for an individual method
|
||||
|
@ -300,11 +310,16 @@ class RPCClient(object):
|
|||
:type timeout: int or float
|
||||
:param version_cap: raise a RPCVersionCapError version exceeds this cap
|
||||
:type version_cap: str
|
||||
:param retry: an optional connection retries configuration
|
||||
None or -1 means to retry forever
|
||||
0 means no retry
|
||||
N means N retries
|
||||
:type retry: int
|
||||
"""
|
||||
return _CallContext._prepare(self,
|
||||
exchange, topic, namespace,
|
||||
version, server, fanout,
|
||||
timeout, version_cap)
|
||||
timeout, version_cap, retry)
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
"""Invoke a method and return immediately.
|
||||
|
@ -321,6 +336,7 @@ class RPCClient(object):
|
|||
:type method: str
|
||||
:param kwargs: a dict of method arguments
|
||||
:type kwargs: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self.prepare().cast(ctxt, method, **kwargs)
|
||||
|
||||
|
@ -356,7 +372,7 @@ class RPCClient(object):
|
|||
:type method: str
|
||||
:param kwargs: a dict of method arguments
|
||||
:type kwargs: dict
|
||||
:raises: MessagingTimeout, RemoteError
|
||||
:raises: MessagingTimeout, RemoteError, MessageDeliveryFailure
|
||||
"""
|
||||
return self.prepare().call(ctxt, method, **kwargs)
|
||||
|
||||
|
|
|
@ -80,13 +80,14 @@ class Transport(object):
|
|||
def _require_driver_features(self, requeue=False):
|
||||
self._driver.require_features(requeue=requeue)
|
||||
|
||||
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
if not target.topic:
|
||||
raise exceptions.InvalidTarget('A topic is required to send',
|
||||
target)
|
||||
return self._driver.send(target, ctxt, message,
|
||||
wait_for_reply=wait_for_reply,
|
||||
timeout=timeout)
|
||||
timeout=timeout, retry=retry)
|
||||
|
||||
def _send_notification(self, target, ctxt, message, version):
|
||||
if not target.topic:
|
||||
|
|
|
@ -753,3 +753,75 @@ _fake_session = FakeQpidSession()
|
|||
|
||||
def get_fake_qpid_session():
|
||||
return _fake_session
|
||||
|
||||
|
||||
class QPidHATestCase(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(QPidHATestCase, self).setUp()
|
||||
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
|
||||
self.config(qpid_hosts=self.brokers,
|
||||
qpid_username=None,
|
||||
qpid_password=None)
|
||||
|
||||
hostname_sets = set()
|
||||
self.info = {'attempt': 0,
|
||||
'fail': False}
|
||||
|
||||
def _connect(myself, broker):
|
||||
# do as little work that is enough to pass connection attempt
|
||||
myself.connection = mock.Mock()
|
||||
hostname = broker['host']
|
||||
self.assertNotIn(hostname, hostname_sets)
|
||||
hostname_sets.add(hostname)
|
||||
|
||||
self.info['attempt'] += 1
|
||||
if self.info['fail']:
|
||||
raise qpid.messaging.exceptions.ConnectionError
|
||||
|
||||
# just make sure connection instantiation does not fail with an
|
||||
# exception
|
||||
self.stubs.Set(qpid_driver.Connection, '_connect', _connect)
|
||||
|
||||
# starting from the first broker in the list
|
||||
url = messaging.TransportURL.parse(self.conf, None)
|
||||
self.connection = qpid_driver.Connection(self.conf, url)
|
||||
self.addCleanup(self.connection.close)
|
||||
|
||||
self.info.update({'attempt': 0,
|
||||
'fail': True})
|
||||
hostname_sets.clear()
|
||||
|
||||
def test_reconnect_order(self):
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.reconnect,
|
||||
retry=len(self.brokers) - 1)
|
||||
self.assertEqual(len(self.brokers), self.info['attempt'])
|
||||
|
||||
def test_ensure_four_retries(self):
|
||||
mock_callback = mock.Mock(
|
||||
side_effect=qpid.messaging.exceptions.ConnectionError)
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.ensure, None, mock_callback,
|
||||
retry=4)
|
||||
self.assertEqual(5, self.info['attempt'])
|
||||
self.assertEqual(1, mock_callback.call_count)
|
||||
|
||||
def test_ensure_one_retry(self):
|
||||
mock_callback = mock.Mock(
|
||||
side_effect=qpid.messaging.exceptions.ConnectionError)
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.ensure, None, mock_callback,
|
||||
retry=1)
|
||||
self.assertEqual(2, self.info['attempt'])
|
||||
self.assertEqual(1, mock_callback.call_count)
|
||||
|
||||
def test_ensure_no_retry(self):
|
||||
mock_callback = mock.Mock(
|
||||
side_effect=qpid.messaging.exceptions.ConnectionError)
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.ensure, None, mock_callback,
|
||||
retry=0)
|
||||
self.assertEqual(1, self.info['attempt'])
|
||||
self.assertEqual(1, mock_callback.call_count)
|
||||
|
|
|
@ -20,6 +20,7 @@ import uuid
|
|||
|
||||
import fixtures
|
||||
import kombu
|
||||
import mock
|
||||
import testscenarios
|
||||
|
||||
from oslo import messaging
|
||||
|
@ -630,14 +631,14 @@ TestReplyWireFormat.generate_scenarios()
|
|||
|
||||
class RpcKombuHATestCase(test_utils.BaseTestCase):
|
||||
|
||||
def test_reconnect_order(self):
|
||||
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
brokers_count = len(brokers)
|
||||
|
||||
self.config(rabbit_hosts=brokers,
|
||||
rabbit_max_retries=1)
|
||||
def setUp(self):
|
||||
super(RpcKombuHATestCase, self).setUp()
|
||||
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
self.config(rabbit_hosts=self.brokers)
|
||||
|
||||
hostname_sets = set()
|
||||
self.info = {'attempt': 0,
|
||||
'fail': False}
|
||||
|
||||
def _connect(myself, params):
|
||||
# do as little work that is enough to pass connection attempt
|
||||
|
@ -646,22 +647,51 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
|
|||
|
||||
hostname = params['hostname']
|
||||
self.assertNotIn(hostname, hostname_sets)
|
||||
|
||||
hostname_sets.add(hostname)
|
||||
|
||||
self.info['attempt'] += 1
|
||||
if self.info['fail']:
|
||||
raise IOError('fake fail')
|
||||
|
||||
# just make sure connection instantiation does not fail with an
|
||||
# exception
|
||||
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
|
||||
|
||||
# starting from the first broker in the list
|
||||
url = messaging.TransportURL.parse(self.conf, None)
|
||||
connection = rabbit_driver.Connection(self.conf, url)
|
||||
self.connection = rabbit_driver.Connection(self.conf, url)
|
||||
self.addCleanup(self.connection.close)
|
||||
|
||||
# now that we have connection object, revert to the real 'connect'
|
||||
# implementation
|
||||
self.stubs.UnsetAll()
|
||||
self.info.update({'attempt': 0,
|
||||
'fail': True})
|
||||
hostname_sets.clear()
|
||||
|
||||
for i in range(brokers_count):
|
||||
self.assertRaises(driver_common.RPCException, connection.reconnect)
|
||||
def test_reconnect_order(self):
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.reconnect,
|
||||
retry=len(self.brokers) - 1)
|
||||
self.assertEqual(len(self.brokers), self.info['attempt'])
|
||||
|
||||
connection.close()
|
||||
def test_ensure_four_retry(self):
|
||||
mock_callback = mock.Mock(side_effect=IOError)
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.ensure, None, mock_callback,
|
||||
retry=4)
|
||||
self.assertEqual(5, self.info['attempt'])
|
||||
self.assertEqual(1, mock_callback.call_count)
|
||||
|
||||
def test_ensure_one_retry(self):
|
||||
mock_callback = mock.Mock(side_effect=IOError)
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.ensure, None, mock_callback,
|
||||
retry=1)
|
||||
self.assertEqual(2, self.info['attempt'])
|
||||
self.assertEqual(1, mock_callback.call_count)
|
||||
|
||||
def test_ensure_no_retry(self):
|
||||
mock_callback = mock.Mock(side_effect=IOError)
|
||||
self.assertRaises(messaging.MessageDeliveryFailure,
|
||||
self.connection.ensure, None, mock_callback,
|
||||
retry=0)
|
||||
self.assertEqual(1, self.info['attempt'])
|
||||
self.assertEqual(1, mock_callback.call_count)
|
||||
|
|
|
@ -56,7 +56,7 @@ class TestCastCall(test_utils.BaseTestCase):
|
|||
self.mox.StubOutWithMock(transport, '_send')
|
||||
|
||||
msg = dict(method='foo', args=self.args)
|
||||
kwargs = {}
|
||||
kwargs = {'retry': None}
|
||||
if self.call:
|
||||
kwargs['wait_for_reply'] = True
|
||||
kwargs['timeout'] = None
|
||||
|
@ -197,7 +197,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
|
|||
msg['namespace'] = self.expect['namespace']
|
||||
if 'version' in self.expect:
|
||||
msg['version'] = self.expect['version']
|
||||
transport._send(expect_target, {}, msg)
|
||||
transport._send(expect_target, {}, msg, retry=None)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
|
@ -243,7 +243,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
|
|||
self.mox.StubOutWithMock(transport, '_send')
|
||||
|
||||
msg = dict(method='foo', args={})
|
||||
kwargs = dict(wait_for_reply=True, timeout=self.expect)
|
||||
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None)
|
||||
transport._send(messaging.Target(), {}, msg, **kwargs)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
@ -253,6 +253,36 @@ class TestCallTimeout(test_utils.BaseTestCase):
|
|||
client.call({}, 'foo')
|
||||
|
||||
|
||||
class TestCallRetry(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
('all_none', dict(ctor=None, prepare=_notset, expect=None)),
|
||||
('ctor', dict(ctor=21, prepare=_notset, expect=21)),
|
||||
('ctor_zero', dict(ctor=0, prepare=_notset, expect=0)),
|
||||
('prepare', dict(ctor=None, prepare=21, expect=21)),
|
||||
('prepare_override', dict(ctor=10, prepare=21, expect=21)),
|
||||
('prepare_zero', dict(ctor=None, prepare=0, expect=0)),
|
||||
]
|
||||
|
||||
def test_call_retry(self):
|
||||
transport = _FakeTransport(self.conf)
|
||||
client = messaging.RPCClient(transport, messaging.Target(),
|
||||
retry=self.ctor)
|
||||
|
||||
self.mox.StubOutWithMock(transport, '_send')
|
||||
|
||||
msg = dict(method='foo', args={})
|
||||
kwargs = dict(wait_for_reply=True, timeout=60,
|
||||
retry=self.expect)
|
||||
transport._send(messaging.Target(), {}, msg, **kwargs)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
if self.prepare is not _notset:
|
||||
client = client.prepare(retry=self.prepare)
|
||||
client.call({}, 'foo')
|
||||
|
||||
|
||||
class TestSerializer(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
|
@ -282,6 +312,7 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||
msg = dict(method='foo',
|
||||
args=dict([(k, 's' + v) for k, v in self.args.items()]))
|
||||
kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
|
||||
kwargs['retry'] = None
|
||||
transport._send(messaging.Target(),
|
||||
dict(user='alice'),
|
||||
msg,
|
||||
|
@ -367,7 +398,7 @@ class TestVersionCap(test_utils.BaseTestCase):
|
|||
if target.version is not None:
|
||||
msg['version'] = target.version
|
||||
|
||||
kwargs = {}
|
||||
kwargs = {'retry': None}
|
||||
if self.call:
|
||||
kwargs['wait_for_reply'] = True
|
||||
kwargs['timeout'] = None
|
||||
|
|
|
@ -270,7 +270,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
|
|||
self.mox.StubOutWithMock(t._driver, 'send')
|
||||
t._driver.send(self._target, 'ctxt', 'message',
|
||||
wait_for_reply=None,
|
||||
timeout=None)
|
||||
timeout=None, retry=None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
t._send(self._target, 'ctxt', 'message')
|
||||
|
@ -281,12 +281,12 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
|
|||
self.mox.StubOutWithMock(t._driver, 'send')
|
||||
t._driver.send(self._target, 'ctxt', 'message',
|
||||
wait_for_reply='wait_for_reply',
|
||||
timeout='timeout')
|
||||
timeout='timeout', retry='retry')
|
||||
self.mox.ReplayAll()
|
||||
|
||||
t._send(self._target, 'ctxt', 'message',
|
||||
wait_for_reply='wait_for_reply',
|
||||
timeout='timeout')
|
||||
timeout='timeout', retry='retry')
|
||||
|
||||
def test_send_notification(self):
|
||||
t = transport.Transport(_FakeDriver(cfg.CONF))
|
||||
|
|
Loading…
Reference in New Issue