Do not leak _unique_id out of amqp drivers

In commit d8d2ad9 we delayed when a message's unique ID gets added to
the duplicate message cache in order to allow for message requeueing.

However, as part of this, we exposed the private _unique_id field
outside of the driver. This commit reverses that change by storing
the ID in the AMQPIncomingMessage object.

Change-Id: Ibeb7896de7ad9abf3c6a43495c1a87aabb762c0d
This commit is contained in:
Mark McLoughlin 2014-03-03 06:58:35 -08:00
parent d8d2ad95d7
commit 5464229e63
3 changed files with 15 additions and 16 deletions

View File

@ -318,16 +318,17 @@ class _MsgIdCache(object):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
if UNIQUE_ID in message_data:
msg_id = message_data.get(UNIQUE_ID)
if msg_id in self.prev_msgids:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def add(self, message_data):
if UNIQUE_ID in message_data:
try:
msg_id = message_data.pop(UNIQUE_ID)
if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
except KeyError:
return
if msg_id in self.prev_msgids:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
return msg_id
def add(self, msg_id):
if msg_id and msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
def _add_unique_id(msg):

View File

@ -31,10 +31,11 @@ LOG = logging.getLogger(__name__)
class AMQPIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, msg_id, reply_q):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
super(AMQPIncomingMessage, self).__init__(listener, ctxt,
dict(message))
self.unique_id = unique_id
self.msg_id = msg_id
self.reply_q = reply_q
self.acknowledge_callback = message.acknowledge
@ -67,7 +68,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
self._send_reply(conn, ending=True)
def acknowledge(self):
self.listener.msg_id_cache.add(self.message)
self.listener.msg_id_cache.add(self.unique_id)
self.acknowledge_callback()
def requeue(self):
@ -92,12 +93,13 @@ class AMQPListener(base.Listener):
# FIXME(markmc): logging isn't driver specific
rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
self.msg_id_cache.check_duplicate_message(message)
unique_id = self.msg_id_cache.check_duplicate_message(message)
ctxt = rpc_amqp.unpack_context(self.conf, message)
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),
message,
unique_id,
ctxt.msg_id,
ctxt.reply_q))

View File

@ -206,7 +206,6 @@ class TestSendReceive(test_utils.BaseTestCase):
senders[i].start()
received = listener.poll()
received.message.pop('_unique_id')
self.assertIsNotNone(received)
self.assertEqual(received.ctxt, self.ctxt)
self.assertEqual(received.message, {'tx_id': i})
@ -303,14 +302,12 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
senders[0].start()
msgs.append(listener.poll())
msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 0})
# Start the second guy, receive his message
senders[1].start()
msgs.append(listener.poll())
msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 1})
# Reply to both in order, making the second thread queue
@ -605,7 +602,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
producer.publish(msg)
received = listener.poll()
received.message.pop('_unique_id')
self.assertIsNotNone(received)
self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message)