diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index e4d0617d9..47967d71c 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -167,8 +167,34 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): 'duration': duration}) return + def heartbeat(self): + with self.listener.driver._get_connection( + rpc_common.PURPOSE_SEND) as conn: + self._send_reply(conn, None, None, ending=False) + + # NOTE(sileht): These have already been acked in the RpcListener IO thread. + # We keep them as no-ops until all drivers do the same. def acknowledge(self): - self._message_operations_handler.do(self.message.acknowledge) + pass + + def requeue(self): + pass + + +class NotificationAMQPIncomingMessage(AMQPIncomingMessage): + def acknowledge(self): + def _do_ack(): + try: + self.message.acknowledge() + except Exception as exc: + # NOTE(kgiusti): this failure is likely due to a loss of the + # connection to the broker. Not much we can do in this case, + # especially considering the Notification has already been + # dispatched. This *could* result in message duplication + # (unacked msg is returned to the queue by the broker), but the + # driver tries to catch that using the msg_id_cache. + LOG.warning("Failed to acknowledge received message: %s", exc) + self._message_operations_handler.do(_do_ack) self.listener.msg_id_cache.add(self.unique_id) def requeue(self): @@ -178,12 +204,12 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # msg_id_cache, the message will be reconsumed, the only difference is # the message stay at the beginning of the queue instead of moving to # the end.
- self._message_operations_handler.do(self.message.requeue) - - def heartbeat(self): - with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: - self._send_reply(conn, None, None, ending=False) + def _do_requeue(): + try: + self.message.requeue() + except Exception as exc: + LOG.warning("Failed to requeue received message: %s", exc) + self._message_operations_handler.do(_do_requeue) class ObsoleteReplyQueuesCache(object): @@ -256,7 +282,7 @@ class AMQPListener(base.PollStyleListener): else: LOG.debug("received message with unique_id: %s", unique_id) - self.incoming.append(AMQPIncomingMessage( + self.incoming.append(self.message_cls( self, ctxt.to_dict(), message, @@ -319,6 +345,41 @@ class AMQPListener(base.PollStyleListener): self.conn.close() +class RpcAMQPListener(AMQPListener): + message_cls = AMQPIncomingMessage + + def __call__(self, message): + # NOTE(kgiusti): In the original RPC implementation the RPC server + # would acknowledge the request THEN process it. The goal of this was + # to prevent duplication if the ack failed. Should the ack fail the + # request would be discarded since the broker would not remove the + # request from the queue since no ack was received. That would lead to + # the request being redelivered at some point. However this approach + # meant that the ack was issued from the dispatch thread, not the + # consumer thread, which is bad since kombu is not thread safe. So a + # change was made to schedule the ack to be sent on the consumer thread + # - breaking the ability to catch ack errors before dispatching the + # request. To fix this we do the actual ack here in the consumer + # callback and avoid the upcall if the ack fails. See + # https://bugs.launchpad.net/oslo.messaging/+bug/1695746 + # for all the gory details... 
+ try: + message.acknowledge() + except Exception as exc: + LOG.warning("Discarding RPC request due to failed acknowlege: %s", + exc) + else: + # NOTE(kgiusti): be aware that even if the acknowledge call + # succeeds there is no guarantee the broker actually gets the ACK + # since acknowledge() simply writes the ACK to the socket (there is + # no ACK confirmation coming back from the broker) + super(RpcAMQPListener, self).__call__(message) + + +class NotificationAMQPListener(AMQPListener): + message_cls = NotificationAMQPIncomingMessage + + class ReplyWaiters(object): WAKE_UP = object() @@ -590,7 +651,7 @@ class AMQPDriverBase(base.BaseDriver): def listen(self, target, batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) - listener = AMQPListener(self, conn) + listener = RpcAMQPListener(self, conn) conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic=target.topic, @@ -608,7 +669,7 @@ class AMQPDriverBase(base.BaseDriver): batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) - listener = AMQPListener(self, conn) + listener = NotificationAMQPListener(self, conn) for target, priority in targets_and_priorities: conn.declare_topic_consumer( exchange_name=self._get_exchange(target), diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 18d5869d3..9dac110cc 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -152,6 +152,9 @@ class RPCServer(msg_server.MessageHandlingServer): def _process_incoming(self, incoming): message = incoming[0] + + # TODO(sileht): We should remove this acknowledge() call at some point + # and do it directly in the driver try: message.acknowledge() except Exception: