Merge "Make AMQP based RPC consumer threads more robust" into stable/grizzly

This commit is contained in:
Jenkins 2014-02-01 03:12:55 +00:00 committed by Gerrit Code Review
commit 8f400c4845
3 changed files with 35 additions and 0 deletions

View File

@ -22,6 +22,7 @@ Exception related utilities.
import contextlib
import logging
import sys
import time
import traceback
from heat.openstack.common.gettextutils import _
@ -49,3 +50,33 @@ def save_and_reraise_exception():
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func

View File

@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import network_utils
from heat.openstack.common.rpc import amqp as rpc_amqp
@ -723,6 +724,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()

View File

@ -24,6 +24,7 @@ import eventlet
import greenlet
from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
@ -541,6 +542,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()