Fix _poll_connection not timeout issue (1/2)
_poll_connection could fall into a loop waiting for a reply message, if
rabbit dies and up. This commit will set up a rpc_response_timeout timer
for one connection polling; so the rpc will finally jump out with a
timeout exception which is expected in such scenario.
Related bug: #1338732
The title of the commit in master was:
rabbit: more precise iterconsume timeout
but this was changed since it didn't describe the actual change.
This commit resolved some conflicts due to cherry-pick.
Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502
(cherry picked from commit 023b7f44e2
)
This commit is contained in:
parent
569046e426
commit
858353394d
|
@ -26,6 +26,7 @@ from oslo import messaging
|
|||
from oslo.messaging._drivers import amqp as rpc_amqp
|
||||
from oslo.messaging._drivers import base
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging.openstack.common.gettextutils import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -202,6 +203,11 @@ class ReplyWaiter(object):
|
|||
def unlisten(self, msg_id):
|
||||
self.waiters.remove(msg_id)
|
||||
|
||||
@staticmethod
|
||||
def _raise_timeout_exception(msg_id):
|
||||
raise messaging.MessagingTimeout(
|
||||
_('Timed out waiting for a reply to message ID %s.') % msg_id)
|
||||
|
||||
def _process_reply(self, data):
|
||||
result = None
|
||||
ending = False
|
||||
|
@ -216,7 +222,7 @@ class ReplyWaiter(object):
|
|||
result = data['result']
|
||||
return result, ending
|
||||
|
||||
def _poll_connection(self, msg_id, timeout):
|
||||
def _poll_connection(self, msg_id, timer):
|
||||
while True:
|
||||
while self.incoming:
|
||||
message_data = self.incoming.pop(0)
|
||||
|
@ -227,15 +233,15 @@ class ReplyWaiter(object):
|
|||
|
||||
self.waiters.put(incoming_msg_id, message_data)
|
||||
|
||||
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
|
||||
try:
|
||||
self.conn.consume(limit=1, timeout=timeout)
|
||||
except rpc_common.Timeout:
|
||||
raise messaging.MessagingTimeout('Timed out waiting for a '
|
||||
'reply to message ID %s'
|
||||
% msg_id)
|
||||
self._raise_timeout_exception(msg_id)
|
||||
|
||||
def _poll_queue(self, msg_id, timeout):
|
||||
message = self.waiters.get(msg_id, timeout)
|
||||
def _poll_queue(self, msg_id, timer):
|
||||
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
|
||||
message = self.waiters.get(msg_id, timeout=timeout)
|
||||
if message is self.waiters.WAKE_UP:
|
||||
return None, None, True # lock was released
|
||||
|
||||
|
@ -264,6 +270,8 @@ class ReplyWaiter(object):
|
|||
# have the first thread take responsibility for passing replies not
|
||||
# intended for itself to the appropriate thread.
|
||||
#
|
||||
timer = rpc_common.DecayingTimer(duration=timeout)
|
||||
timer.start()
|
||||
final_reply = None
|
||||
while True:
|
||||
if self.conn_lock.acquire(False):
|
||||
|
@ -282,7 +290,7 @@ class ReplyWaiter(object):
|
|||
|
||||
# Now actually poll the connection
|
||||
while True:
|
||||
reply, ending = self._poll_connection(msg_id, timeout)
|
||||
reply, ending = self._poll_connection(msg_id, timer)
|
||||
if not ending:
|
||||
final_reply = reply
|
||||
else:
|
||||
|
@ -295,7 +303,7 @@ class ReplyWaiter(object):
|
|||
self.waiters.wake_all(msg_id)
|
||||
else:
|
||||
# We're going to wait for the first thread to pass us our reply
|
||||
reply, ending, trylock = self._poll_queue(msg_id, timeout)
|
||||
reply, ending, trylock = self._poll_queue(msg_id, timer)
|
||||
if trylock:
|
||||
# The first thread got its reply, let's try and take over
|
||||
# the responsibility for polling
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
import copy
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import six
|
||||
|
@ -347,3 +348,27 @@ def deserialize_msg(msg):
|
|||
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
|
||||
|
||||
return raw_msg
|
||||
|
||||
|
||||
class DecayingTimer(object):
|
||||
def __init__(self, duration=None):
|
||||
self._duration = duration
|
||||
self._ends_at = None
|
||||
|
||||
def start(self):
|
||||
if self._duration is not None:
|
||||
self._ends_at = time.time() + max(0, self._duration)
|
||||
|
||||
def check_return(self, timeout_callback, *args, **kwargs):
|
||||
if self._duration is None:
|
||||
return None
|
||||
if self._ends_at is None:
|
||||
raise RuntimeError(_("Can not check/return a timeout from a timer"
|
||||
" that has not been started."))
|
||||
|
||||
maximum = kwargs.pop('maximum', None)
|
||||
left = self._ends_at - time.time()
|
||||
if left <= 0:
|
||||
timeout_callback(*args, **kwargs)
|
||||
|
||||
return left if maximum is None else min(left, maximum)
|
||||
|
|
|
@ -719,14 +719,18 @@ class Connection(object):
|
|||
def iterconsume(self, limit=None, timeout=None):
|
||||
"""Return an iterator that will consume from all queues/consumers."""
|
||||
|
||||
timer = rpc_common.DecayingTimer(duration=timeout)
|
||||
timer.start()
|
||||
|
||||
def _raise_timeout(exc):
|
||||
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
||||
raise rpc_common.Timeout()
|
||||
|
||||
def _error_callback(exc):
|
||||
if isinstance(exc, socket.timeout):
|
||||
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s'),
|
||||
exc)
|
||||
self.do_consume = True
|
||||
timer.check_return(_raise_timeout, exc)
|
||||
LOG.exception(_('Failed to consume message from queue: %s'),
|
||||
exc)
|
||||
self.do_consume = True
|
||||
|
||||
def _consume():
|
||||
if self.do_consume:
|
||||
|
@ -736,7 +740,14 @@ class Connection(object):
|
|||
queue.consume(nowait=True)
|
||||
queues_tail.consume(nowait=False)
|
||||
self.do_consume = False
|
||||
return self.connection.drain_events(timeout=timeout)
|
||||
|
||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
||||
while True:
|
||||
try:
|
||||
return self.connection.drain_events(timeout=poll_timeout)
|
||||
except socket.timeout as exc:
|
||||
poll_timeout = timer.check_return(_raise_timeout, exc,
|
||||
maximum=1)
|
||||
|
||||
for iteration in itertools.count(0):
|
||||
if limit and iteration >= limit:
|
||||
|
|
Loading…
Reference in New Issue