Merge "Issue blocking ACK for RPC requests from the consumer thread"

This commit is contained in:
Zuul 2018-07-30 15:48:21 +00:00 committed by Gerrit Code Review
commit 01a37733eb
2 changed files with 74 additions and 10 deletions

View File

@ -167,8 +167,34 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
'duration': duration})
return
def heartbeat(self):
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
self._send_reply(conn, None, None, ending=False)
# NOTE(sileht): Those have already be ack in RpcListener IO thread
# We keep them as noop until all drivers do the same
def acknowledge(self):
self._message_operations_handler.do(self.message.acknowledge)
pass
def requeue(self):
pass
class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
def acknowledge(self):
def _do_ack():
try:
self.message.acknowledge()
except Exception as exc:
# NOTE(kgiusti): this failure is likely due to a loss of the
# connection to the broker. Not much we can do in this case,
# especially considering the Notification has already been
# dispatched. This *could* result in message duplication
# (unacked msg is returned to the queue by the broker), but the
# driver tries to catch that using the msg_id_cache.
LOG.warning("Failed to acknowledge received message: %s", exc)
self._message_operations_handler.do(_do_ack)
self.listener.msg_id_cache.add(self.unique_id)
def requeue(self):
@ -178,12 +204,12 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
self._message_operations_handler.do(self.message.requeue)
def heartbeat(self):
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
self._send_reply(conn, None, None, ending=False)
def _do_requeue():
try:
self.message.requeue()
except Exception as exc:
LOG.warning("Failed to requeue received message: %s", exc)
self._message_operations_handler.do(_do_requeue)
class ObsoleteReplyQueuesCache(object):
@ -256,7 +282,7 @@ class AMQPListener(base.PollStyleListener):
else:
LOG.debug("received message with unique_id: %s", unique_id)
self.incoming.append(AMQPIncomingMessage(
self.incoming.append(self.message_cls(
self,
ctxt.to_dict(),
message,
@ -319,6 +345,41 @@ class AMQPListener(base.PollStyleListener):
self.conn.close()
class RpcAMQPListener(AMQPListener):
message_cls = AMQPIncomingMessage
def __call__(self, message):
# NOTE(kgiusti): In the original RPC implementation the RPC server
# would acknowledge the request THEN process it. The goal of this was
# to prevent duplication if the ack failed. Should the ack fail the
# request would be discarded since the broker would not remove the
# request from the queue since no ack was received. That would lead to
# the request being redelivered at some point. However this approach
# meant that the ack was issued from the dispatch thread, not the
# consumer thread, which is bad since kombu is not thread safe. So a
# change was made to schedule the ack to be sent on the consumer thread
# - breaking the ability to catch ack errors before dispatching the
# request. To fix this we do the actual ack here in the consumer
# callback and avoid the upcall if the ack fails. See
# https://bugs.launchpad.net/oslo.messaging/+bug/1695746
# for all the gory details...
try:
message.acknowledge()
except Exception as exc:
LOG.warning("Discarding RPC request due to failed acknowlege: %s",
exc)
else:
# NOTE(kgiusti): be aware that even if the acknowledge call
# succeeds there is no guarantee the broker actually gets the ACK
# since acknowledge() simply writes the ACK to the socket (there is
# no ACK confirmation coming back from the broker)
super(RpcAMQPListener, self).__call__(message)
class NotificationAMQPListener(AMQPListener):
message_cls = NotificationAMQPIncomingMessage
class ReplyWaiters(object):
WAKE_UP = object()
@ -590,7 +651,7 @@ class AMQPDriverBase(base.BaseDriver):
def listen(self, target, batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
listener = RpcAMQPListener(self, conn)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
@ -608,7 +669,7 @@ class AMQPDriverBase(base.BaseDriver):
batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
listener = NotificationAMQPListener(self, conn)
for target, priority in targets_and_priorities:
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),

View File

@ -152,6 +152,9 @@ class RPCServer(msg_server.MessageHandlingServer):
def _process_incoming(self, incoming):
message = incoming[0]
# TODO(sileht): We should remove that at some point and do
# this directly in the driver
try:
message.acknowledge()
except Exception: