Make AMQP based RPC consumer threads more robust

This is a backport from oslo-incubator,
commit 22ec8ff616a799085239e3e529daeeefea6366c4:

bug 1189711 Should RPC consume_in_thread() be more fault tolerant?

There are unprotected holes in the thread kicked off by RPC
consume_in_thread such that an exception will kill the thread.
This exists for both the service (TopicConsumer) and the new
reply proxy (DirectConsumer) consumers. This patch plugs
those holes as close to the base of the consumer thread as
possible by catching all non-caught exceptions and retrying
with sleeps between retries and some pacing of the log
output to prevent log flooding.

Change-Id: Ic7b286933a51d070e73eae452b93f8a011eb05c0
This commit is contained in:
Jeff Peeler 2014-01-15 11:06:12 -05:00
parent 6a044107e6
commit 7da5956a9b
3 changed files with 35 additions and 0 deletions

View File

@ -22,6 +22,7 @@ Exception related utilities.
import contextlib
import logging
import sys
import time
import traceback
from heat.openstack.common.gettextutils import _
@ -49,3 +50,33 @@ def save_and_reraise_exception():
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func

View File

@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import network_utils
from heat.openstack.common.rpc import amqp as rpc_amqp
@ -723,6 +724,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()

View File

@ -24,6 +24,7 @@ import eventlet
import greenlet
from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
@ -541,6 +542,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()