From 7da5956a9b6c3b37ae27667a03a61f228f0ce21f Mon Sep 17 00:00:00 2001 From: Jeff Peeler Date: Wed, 15 Jan 2014 11:06:12 -0500 Subject: [PATCH] Make AMQP based RPC consumer threads more robust This is a backport from oslo-incubator, commit 22ec8ff616a799085239e3e529daeeefea6366c4: bug 1189711 Should RPC consume_in_thread() be more fault tolerant? There are unprotected holes in the thread kicked off by RPC consume_in_thread such that an exception will kill the thread. This exists for both the service (TopicConsumer) and the new reply proxy (DirectConsumer) consumers. This patch plugs those holes as close to the base of the consumer thread as possible by catching all non-caught exceptions and retrying with sleeps between retries and some pacing of the log output to prevent log flooding. Change-Id: Ic7b286933a51d070e73eae452b93f8a011eb05c0 --- heat/openstack/common/excutils.py | 31 +++++++++++++++++++++++++ heat/openstack/common/rpc/impl_kombu.py | 2 ++ heat/openstack/common/rpc/impl_qpid.py | 2 ++ 3 files changed, 35 insertions(+) diff --git a/heat/openstack/common/excutils.py b/heat/openstack/common/excutils.py index 890b466a75..6438866959 100644 --- a/heat/openstack/common/excutils.py +++ b/heat/openstack/common/excutils.py @@ -22,6 +22,7 @@ Exception related utilities. import contextlib import logging import sys +import time import traceback from heat.openstack.common.gettextutils import _ @@ -49,3 +50,33 @@ def save_and_reraise_exception(): traceback.format_exception(type_, value, tb)) raise raise type_, value, tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + if exc.message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + exc.message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = exc.message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py index 886fbcfee4..77e8e99789 100644 --- a/heat/openstack/common/rpc/impl_kombu.py +++ b/heat/openstack/common/rpc/impl_kombu.py @@ -30,6 +30,7 @@ import kombu.entity import kombu.messaging from oslo.config import cfg +from heat.openstack.common import excutils from heat.openstack.common.gettextutils import _ from heat.openstack.common import network_utils from heat.openstack.common.rpc import amqp as rpc_amqp @@ -723,6 +724,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py index 88c531abff..5a688b8505 100644 --- a/heat/openstack/common/rpc/impl_qpid.py +++ b/heat/openstack/common/rpc/impl_qpid.py @@ -24,6 +24,7 @@ import eventlet import greenlet from oslo.config import cfg +from heat.openstack.common import excutils from heat.openstack.common.gettextutils import _ from heat.openstack.common import importutils from heat.openstack.common import jsonutils @@ -541,6 +542,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume()