Merge "Make AMQP based RPC consumer threads more robust" into stable/grizzly
This commit is contained in:
commit
8f400c4845
|
@ -22,6 +22,7 @@ Exception related utilities.
|
|||
import contextlib
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from heat.openstack.common.gettextutils import _
|
||||
|
@ -49,3 +50,33 @@ def save_and_reraise_exception():
|
|||
traceback.format_exception(type_, value, tb))
|
||||
raise
|
||||
raise type_, value, tb
|
||||
|
||||
|
||||
def forever_retry_uncaught_exceptions(infunc):
|
||||
def inner_func(*args, **kwargs):
|
||||
last_log_time = 0
|
||||
last_exc_message = None
|
||||
exc_count = 0
|
||||
while True:
|
||||
try:
|
||||
return infunc(*args, **kwargs)
|
||||
except Exception as exc:
|
||||
if exc.message == last_exc_message:
|
||||
exc_count += 1
|
||||
else:
|
||||
exc_count = 1
|
||||
# Do not log any more frequently than once a minute unless
|
||||
# the exception message changes
|
||||
cur_time = int(time.time())
|
||||
if (cur_time - last_log_time > 60 or
|
||||
exc.message != last_exc_message):
|
||||
logging.exception(
|
||||
_('Unexpected exception occurred %d time(s)... '
|
||||
'retrying.') % exc_count)
|
||||
last_log_time = cur_time
|
||||
last_exc_message = exc.message
|
||||
exc_count = 0
|
||||
# This should be a very rare event. In case it isn't, do
|
||||
# a sleep.
|
||||
time.sleep(1)
|
||||
return inner_func
|
||||
|
|
|
@ -30,6 +30,7 @@ import kombu.entity
|
|||
import kombu.messaging
|
||||
from oslo.config import cfg
|
||||
|
||||
from heat.openstack.common import excutils
|
||||
from heat.openstack.common.gettextutils import _
|
||||
from heat.openstack.common import network_utils
|
||||
from heat.openstack.common.rpc import amqp as rpc_amqp
|
||||
|
@ -723,6 +724,7 @@ class Connection(object):
|
|||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread"""
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
|
|
|
@ -24,6 +24,7 @@ import eventlet
|
|||
import greenlet
|
||||
from oslo.config import cfg
|
||||
|
||||
from heat.openstack.common import excutils
|
||||
from heat.openstack.common.gettextutils import _
|
||||
from heat.openstack.common import importutils
|
||||
from heat.openstack.common import jsonutils
|
||||
|
@ -541,6 +542,7 @@ class Connection(object):
|
|||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread"""
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
|
|
Loading…
Reference in New Issue