From a0648f070315b359fcf0a408eb316d9ac27adcc0 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 10 Jul 2018 12:03:03 -0400 Subject: [PATCH] Issue blocking ACK for RPC requests from the consumer thread The patch for https://review.openstack.org/#/c/436958/ fixed a threading problem by moving the ack back to the polling thread. However the RPC server expects to catch any failures of the ACK and abort the request. This patch adds the ACK error handling back to the polling thread. This patch is based heavily off the original work done by Mehdi Abaakouk (sileht). Change-Id: I708c3d6676b974d8daac6817c15f596cdf35817b Closes-Bug: #1695746 (cherry picked from commit 26b0be585a0b681d011edcb688750770bcdae199) --- oslo_messaging/_drivers/amqpdriver.py | 81 +++++++++++++++++++++++---- oslo_messaging/rpc/server.py | 3 + 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index e4d0617d9..47967d71c 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -167,8 +167,34 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): 'duration': duration}) return + def heartbeat(self): + with self.listener.driver._get_connection( + rpc_common.PURPOSE_SEND) as conn: + self._send_reply(conn, None, None, ending=False) + + # NOTE(sileht): Those have already be ack in RpcListener IO thread + # We keep them as noop until all drivers do the same def acknowledge(self): - self._message_operations_handler.do(self.message.acknowledge) + pass + + def requeue(self): + pass + + +class NotificationAMQPIncomingMessage(AMQPIncomingMessage): + def acknowledge(self): + def _do_ack(): + try: + self.message.acknowledge() + except Exception as exc: + # NOTE(kgiusti): this failure is likely due to a loss of the + # connection to the broker. Not much we can do in this case, + # especially considering the Notification has already been + # dispatched. This *could* result in message duplication + # (unacked msg is returned to the queue by the broker), but the + # driver tries to catch that using the msg_id_cache. + LOG.warning("Failed to acknowledge received message: %s", exc) + self._message_operations_handler.do(_do_ack) self.listener.msg_id_cache.add(self.unique_id) def requeue(self): @@ -178,12 +204,12 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # msg_id_cache, the message will be reconsumed, the only difference is # the message stay at the beginning of the queue instead of moving to # the end. - self._message_operations_handler.do(self.message.requeue) - - def heartbeat(self): - with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: - self._send_reply(conn, None, None, ending=False) + def _do_requeue(): + try: + self.message.requeue() + except Exception as exc: + LOG.warning("Failed to requeue received message: %s", exc) + self._message_operations_handler.do(_do_requeue) class ObsoleteReplyQueuesCache(object): @@ -256,7 +282,7 @@ class AMQPListener(base.PollStyleListener): else: LOG.debug("received message with unique_id: %s", unique_id) - self.incoming.append(AMQPIncomingMessage( + self.incoming.append(self.message_cls( self, ctxt.to_dict(), message, @@ -319,6 +345,41 @@ class AMQPListener(base.PollStyleListener): self.conn.close() +class RpcAMQPListener(AMQPListener): + message_cls = AMQPIncomingMessage + + def __call__(self, message): + # NOTE(kgiusti): In the original RPC implementation the RPC server + # would acknowledge the request THEN process it. The goal of this was + # to prevent duplication if the ack failed. Should the ack fail the + # request would be discarded since the broker would not remove the + # request from the queue since no ack was received. That would lead to + # the request being redelivered at some point. However this approach + # meant that the ack was issued from the dispatch thread, not the + # consumer thread, which is bad since kombu is not thread safe. So a + # change was made to schedule the ack to be sent on the consumer thread + # - breaking the ability to catch ack errors before dispatching the + # request. To fix this we do the actual ack here in the consumer + # callback and avoid the upcall if the ack fails. See + # https://bugs.launchpad.net/oslo.messaging/+bug/1695746 + # for all the gory details... + try: + message.acknowledge() + except Exception as exc: + LOG.warning("Discarding RPC request due to failed acknowlege: %s", + exc) + else: + # NOTE(kgiusti): be aware that even if the acknowledge call + # succeeds there is no guarantee the broker actually gets the ACK + # since acknowledge() simply writes the ACK to the socket (there is + # no ACK confirmation coming back from the broker) + super(RpcAMQPListener, self).__call__(message) + + +class NotificationAMQPListener(AMQPListener): + message_cls = NotificationAMQPIncomingMessage + + class ReplyWaiters(object): WAKE_UP = object() @@ -590,7 +651,7 @@ class AMQPDriverBase(base.BaseDriver): def listen(self, target, batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) - listener = AMQPListener(self, conn) + listener = RpcAMQPListener(self, conn) conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic=target.topic, @@ -608,7 +669,7 @@ class AMQPDriverBase(base.BaseDriver): batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) - listener = AMQPListener(self, conn) + listener = NotificationAMQPListener(self, conn) for target, priority in targets_and_priorities: conn.declare_topic_consumer( exchange_name=self._get_exchange(target), diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 18d5869d3..9dac110cc 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -152,6 +152,9 @@ class RPCServer(msg_server.MessageHandlingServer): def _process_incoming(self, incoming): message = incoming[0] + + # TODO(sileht): We should remove that at some point and do + # this directly in the driver try: message.acknowledge() except Exception: