diff --git a/oslo_messaging/_drivers/amqp1_driver/drivertasks.py b/oslo_messaging/_drivers/amqp1_driver/drivertasks.py index c36cce597..74e3d3f61 100644 --- a/oslo_messaging/_drivers/amqp1_driver/drivertasks.py +++ b/oslo_messaging/_drivers/amqp1_driver/drivertasks.py @@ -94,11 +94,10 @@ class ListenTask(controller.Task): class ReplyTask(controller.Task): """A task that sends 'response' message to 'address'. """ - def __init__(self, address, response, log_failure): + def __init__(self, address, response): super(ReplyTask, self).__init__() self._address = address self._response = response - self._log_failure = log_failure self._wakeup = threading.Event() def wait(self): diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index ed94bccc5..f02159222 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -50,14 +50,13 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): self.stopwatch = timeutils.StopWatch() self.stopwatch.start() - def _send_reply(self, conn, reply=None, failure=None, log_failure=True): + def _send_reply(self, conn, reply=None, failure=None): if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id): return if failure: - failure = rpc_common.serialize_remote_exception(failure, - log_failure) + failure = rpc_common.serialize_remote_exception(failure) # NOTE(sileht): ending can be removed in N*, see Listener.wait() # for more detail. msg = {'result': reply, 'failure': failure, 'ending': True, @@ -74,7 +73,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): 'elapsed': self.stopwatch.elapsed()}) conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if not self.msg_id: # NOTE(Alexei_987) not sending reply, if msg_id is empty # because reply should not be expected by caller side @@ -96,8 +95,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): try: with self.listener.driver._get_connection( rpc_common.PURPOSE_SEND) as conn: - self._send_reply(conn, reply, failure, - log_failure=log_failure) + self._send_reply(conn, reply, failure) return except rpc_amqp.AMQPDestinationNotFound: if timer.check_return() > 0: diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 78a379284..d21320998 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -92,7 +92,7 @@ class IncomingMessage(object): class RpcIncomingMessage(IncomingMessage): @abc.abstractmethod - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Send a reply or failure back to the client.""" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 7b446d74d..7ac1e4dff 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -162,18 +162,15 @@ class Connection(object): raise NotImplementedError() -def serialize_remote_exception(failure_info, log_failure=True): +def serialize_remote_exception(failure_info): """Prepares exception data to be sent over rpc. Failure_info should be a sys.exc_info() tuple. """ tb = traceback.format_exception(*failure_info) + failure = failure_info[1] - if log_failure: - LOG.error(_LE("Returning exception %s to caller"), - six.text_type(failure)) - LOG.error(tb) kwargs = {} if hasattr(failure, 'kwargs'): diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 792152cf0..46fce39cd 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -98,13 +98,13 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): self._reply_to = message.reply_to self._correlation_id = message.id - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Schedule a ReplyTask to send the reply.""" if self._reply_to: response = marshal_response(reply=reply, failure=failure) response.correlation_id = self._correlation_id LOG.debug("Replying to %s", self._correlation_id) - task = drivertasks.ReplyTask(self._reply_to, response, log_failure) + task = drivertasks.ReplyTask(self._reply_to, response) self.listener.driver._ctrl.add_task(task) else: LOG.debug("Ignoring reply as no reply address available") diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index f35980493..f25e8b7d1 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -30,7 +30,7 @@ class FakeIncomingMessage(base.RpcIncomingMessage): self.requeue_callback = requeue self._reply_q = reply_q - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if self._reply_q: failure = failure[1] if failure else None self._reply_q.put((reply, failure)) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 7264d5fda..b2150f198 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -240,7 +240,7 @@ class OsloKafkaMessage(base.RpcIncomingMessage): def requeue(self): LOG.warning(_LW("requeue is not supported")) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): LOG.warning(_LW("reply is not supported")) diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 3ca4b0104..2802bedb1 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -175,13 +175,11 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage): self.reply_q = properties.reply_to self.msg_id = properties.correlation_id - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Send back reply to the RPC client :param reply: Dictionary, reply. In case of exception should be None :param failure: Tuple, should be a sys.exc_info() tuple. Should be None if RPC request was successfully processed. - :param log_failure: Boolean, not used in this implementation. - It present here to be compatible with driver API :return RpcReplyPikaIncomingMessage, message with reply """ diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py index 9342bafb0..b6a7b7563 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py @@ -18,8 +18,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_names class Response(object): def __init__(self, id=None, type=None, message_id=None, - reply_id=None, reply_body=None, - failure=None, log_failure=None): + reply_id=None, reply_body=None, failure=None): self._id = id self._type = type @@ -27,7 +26,6 @@ class Response(object): self._reply_id = reply_id self._reply_body = reply_body self._failure = failure - self._log_failure = log_failure @property def id_(self): @@ -53,18 +51,13 @@ class Response(object): def failure(self): return self._failure - @property - def log_failure(self): - return self._log_failure - def to_dict(self): return {zmq_names.FIELD_ID: self._id, zmq_names.FIELD_TYPE: self._type, zmq_names.FIELD_MSG_ID: self._message_id, zmq_names.FIELD_REPLY_ID: self._reply_id, zmq_names.FIELD_REPLY: self._reply_body, - zmq_names.FIELD_FAILURE: self._failure, - zmq_names.FIELD_LOG_FAILURE: self._log_failure} + zmq_names.FIELD_FAILURE: self._failure} def __str__(self): return str(self.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 6fbebf63f..f1f5d6018 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -37,7 +37,7 @@ class DealerIncomingMessage(base.RpcIncomingMessage): def __init__(self, context, message): super(DealerIncomingMessage, self).__init__(context, message) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages""" def acknowledge(self): @@ -55,16 +55,14 @@ class DealerIncomingRequest(base.RpcIncomingMessage): self.reply_id = reply_id self.message_id = message_id - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if failure is not None: - failure = rpc_common.serialize_remote_exception(failure, - log_failure) + failure = rpc_common.serialize_remote_exception(failure) response = zmq_response.Response(type=zmq_names.REPLY_TYPE, message_id=self.message_id, reply_id=self.reply_id, reply_body=reply, - failure=failure, - log_failure=log_failure) + failure=failure) LOG.debug("Replying %s", self.message_id) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index a69602982..719c24e4e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -31,7 +31,7 @@ class PullIncomingMessage(base.RpcIncomingMessage): def __init__(self, context, message): super(PullIncomingMessage, self).__init__(context, message) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages.""" def acknowledge(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 8eb3dad7a..da487f5c9 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -37,7 +37,7 @@ class RouterIncomingMessage(base.RpcIncomingMessage): self.msg_id = msg_id self.message = message - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages""" def acknowledge(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 0d1c5213e..6aa8ec4eb 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -34,7 +34,7 @@ class SubIncomingMessage(base.RpcIncomingMessage): def __init__(self, context, message): super(SubIncomingMessage, self).__init__(context, message) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages.""" def acknowledge(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index e3bd186d2..2dc8ec309 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -39,16 +39,14 @@ class ZmqIncomingRequest(base.RpcIncomingMessage): self.received = None self.poller = poller - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if failure is not None: - failure = rpc_common.serialize_remote_exception(failure, - log_failure) + failure = rpc_common.serialize_remote_exception(failure) response = zmq_response.Response(type=zmq_names.REPLY_TYPE, message_id=self.request.message_id, reply_id=self.reply_id, reply_body=reply, - failure=failure, - log_failure=log_failure) + failure=failure) LOG.debug("Replying %s", (str(self.request.message_id))) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index ae477e6df..51f68c6e8 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -20,7 +20,6 @@ zmq = zmq_async.import_zmq() FIELD_TYPE = 'type' FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' -FIELD_LOG_FAILURE = 'log_failure' FIELD_ID = 'id' FIELD_MSG_ID = 'message_id' FIELD_MSG_TYPE = 'msg_type' diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 2fbdda77f..c51ffb92c 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -132,15 +132,14 @@ class RPCServer(msg_server.MessageHandlingServer): try: res = self.dispatcher.dispatch(message) except rpc_dispatcher.ExpectedException as e: - LOG.debug(u'Expected exception during message handling (%s)', - e.exc_info[1]) failure = e.exc_info - except Exception as e: + LOG.debug(u'Expected exception during message handling (%s)', e) + except Exception: # current sys.exc_info() content can be overriden - # by another exception raise by a log handler during + # by another exception raised by a log handler during # LOG.exception(). So keep a copy and delete it later. failure = sys.exc_info() - LOG.exception(_LE('Exception during handling message')) + LOG.exception(_LE('Exception during message handling')) try: if failure is None: diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index e4f86917f..eb686f7f3 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -451,14 +451,6 @@ class TestSendReceive(test_utils.BaseTestCase): senders = [] replies = [] msgs = [] - errors = [] - - def stub_error(msg, *a, **kw): - if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): - a = a[0] - errors.append(str(msg) % a) - - self.stubs.Set(driver_common.LOG, 'error', stub_error) def send_and_wait_for_reply(i): try: @@ -500,8 +492,7 @@ class TestSendReceive(test_utils.BaseTestCase): raise ZeroDivisionError except Exception: failure = sys.exc_info() - msgs[i].reply(failure=failure, - log_failure=not self.expected) + msgs[i].reply(failure=failure) elif self.rx_id: msgs[i].reply({'rx_id': i}) else: @@ -519,11 +510,6 @@ class TestSendReceive(test_utils.BaseTestCase): else: self.assertEqual(self.reply, reply) - if not self.timeout and self.failure and not self.expected: - self.assertTrue(len(errors) > 0, errors) - else: - self.assertEqual(0, len(errors), errors) - TestSendReceive.generate_scenarios() diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 9d0cb6000..62a547f20 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -21,6 +21,7 @@ import testscenarios import mock import oslo_messaging +from oslo_messaging.rpc import server as rpc_server_module from oslo_messaging import server as server_module from oslo_messaging.tests import utils as test_utils @@ -326,6 +327,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): def ping(self, ctxt, arg): raise ValueError(arg) + debugs = [] + errors = [] + + def stub_debug(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + debugs.append(str(msg) % a) + + def stub_error(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + errors.append(str(msg) % a) + + self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug) + self.stubs.Set(rpc_server_module.LOG, 'error', stub_error) + server_thread = self._setup_server(transport, TestEndpoint()) client = self._setup_client(transport) @@ -334,6 +351,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): except Exception as ex: self.assertIsInstance(ex, ValueError) self.assertEqual('dsfoo', str(ex)) + self.assertTrue(len(debugs) == 0) + self.assertTrue(len(errors) > 0) else: self.assertTrue(False) @@ -342,6 +361,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): def test_expected_failure(self): transport = oslo_messaging.get_transport(self.conf, url='fake:') + debugs = [] + errors = [] + + def stub_debug(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + debugs.append(str(msg) % a) + + def stub_error(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + errors.append(str(msg) % a) + + self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug) + self.stubs.Set(rpc_server_module.LOG, 'error', stub_error) + class TestEndpoint(object): @oslo_messaging.expected_exceptions(ValueError) def ping(self, ctxt, arg): @@ -355,6 +390,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): except Exception as ex: self.assertIsInstance(ex, ValueError) self.assertEqual('dsfoo', str(ex)) + self.assertTrue(len(debugs) > 0) + self.assertTrue(len(errors) == 0) else: self.assertTrue(False) diff --git a/oslo_messaging/tests/test_exception_serialization.py b/oslo_messaging/tests/test_exception_serialization.py index c1079c0a4..ca4f92b02 100644 --- a/oslo_messaging/tests/test_exception_serialization.py +++ b/oslo_messaging/tests/test_exception_serialization.py @@ -61,11 +61,6 @@ def add_remote_postfix(ex): class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): - _log_failure = [ - ('log_failure', dict(log_failure=True)), - ('do_not_log_failure', dict(log_failure=False)), - ] - _add_remote = [ ('add_remote', dict(add_remote=True)), ('do_not_add_remote', dict(add_remote=False)), @@ -100,27 +95,19 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): @classmethod def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._log_failure, - cls._add_remote, + cls.scenarios = testscenarios.multiply_scenarios(cls._add_remote, cls._exception_types) def setUp(self): super(SerializeRemoteExceptionTestCase, self).setUp() def test_serialize_remote_exception(self): - errors = [] - - def stub_error(msg, *a, **kw): - if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): - a = a[0] - errors.append(str(msg) % a) - - self.stubs.Set(exceptions.LOG, 'error', stub_error) - try: try: raise self.cls(*self.args, **self.kwargs) except Exception as ex: + # Note: in Python 3 ex variable will be cleared at the end of + # the except clause, so explicitly make an extra copy of it cls_error = ex if self.add_remote: ex = add_remote_postfix(ex) @@ -128,8 +115,7 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): except Exception: exc_info = sys.exc_info() - serialized = exceptions.serialize_remote_exception( - exc_info, log_failure=self.log_failure) + serialized = exceptions.serialize_remote_exception(exc_info) failure = jsonutils.loads(serialized) @@ -143,11 +129,6 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): tb = cls_error.__class__.__name__ + ': ' + self.msg self.assertIn(tb, ''.join(failure['tb'])) - if self.log_failure: - self.assertTrue(len(errors) > 0, errors) - else: - self.assertEqual(0, len(errors), errors) - SerializeRemoteExceptionTestCase.generate_scenarios()