rabbit: smarter declaration of the notif. queue

The NotifyPublisher was redeclaring again and again,
the same exchange and queue each times a notification is sent.

This change fixes that by caching the already declared exchange
and queue for each channel.

Also, to make the test pass. 'Connection.ensure' have been updated
to have the same behavior for amqp and memory driver about
kombu recoverable_errors. And the hostname and port of the memory
driver are set to not fail when we print a log message.

Closes bug: #1437902

Change-Id: I20d133ac67b8a8a4c51d51b6a1b2369aa44ffe2f
This commit is contained in:
Mehdi Abaakouk 2015-04-30 23:33:39 +02:00 committed by Mehdi Abaakouk
parent 415db68b67
commit 1ba55c03b5
2 changed files with 87 additions and 14 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import logging
import os
@ -295,23 +296,33 @@ class DeclareQueuePublisher(Publisher):
them yet. If the future consumer binds the default queue it can retrieve
missing messages.
"""
# FIXME(sileht): The side effect of this is that we declare again and
# again the same queue, and generate a lot of useless rabbit traffic.
# https://bugs.launchpad.net/oslo.messaging/+bug/1437902
DECLARED_QUEUES = collections.defaultdict(set)
def send(self, conn, msg, timeout=None):
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
queue_indentifier = (self.exchange_name,
self.routing_key)
# NOTE(sileht): We only do it once per reconnection
# the Connection._set_current_channel() is responsible to clear
# this cache
if queue_indentifier not in self.DECLARED_QUEUES[conn.channel]:
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
self.DECLARED_QUEUES[conn.channel].add(queue_indentifier)
super(DeclareQueuePublisher, self).send(
conn, msg, timeout)
@classmethod
def reset_cache(cls, channel):
cls.DECLARED_QUEUES.pop(channel, None)
class RetryOnMissingExchangePublisher(Publisher):
"""Publisher that retry during 60 seconds if the exchange is missing."""
@ -577,6 +588,9 @@ class Connection(object):
if self._url.startswith('memory://'):
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
# Fixup logging
self.connection.hostname = "memory_driver"
self.connection.port = 1234
self._poll_timeout = 0.05
# FIXME(markmc): use oslo sslutils when it is available as a library
@ -715,8 +729,18 @@ class Connection(object):
self._set_current_channel(channel)
method()
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
# NOTE(sileht): Some dummy driver like the in-memory one doesn't
# have notion of recoverable connection, so we must raise the original
# exception like kombu does in this case.
has_modern_errors = hasattr(
self.connection.transport, 'recoverable_connection_errors',
)
if has_modern_errors:
recoverable_errors = (
self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
else:
recoverable_errors = ()
try:
autoretry_method = self.connection.autoretry(
@ -756,6 +780,7 @@ class Connection(object):
NOTE(sileht): Must be called within the connection lock
"""
if self.channel is not None and new_channel != self.channel:
DeclareQueuePublisher.reset_cache(self.channel)
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel

View File

@ -173,6 +173,54 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
heartbeat=0, failover_strategy="shuffle")
class RaiseOnNoExchangePublisher(rabbit_driver.Publisher):
passive = True
class TestRabbitPublisher(test_utils.BaseTestCase):
def test_declared_queue_publisher(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
p1 = RaiseOnNoExchangePublisher(
self.conf.oslo_messaging_rabbit,
exchange_name='foobar',
routing_key='foobar',
type='topic',
durable=False,
auto_delete=False)
p2 = rabbit_driver.DeclareQueuePublisher(
self.conf.oslo_messaging_rabbit,
exchange_name='foobar',
routing_key='foobar',
type='topic',
durable=False,
auto_delete=False)
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
exc = conn.connection.channel_errors[0]
# Ensure the exchange does not exists
self.assertRaises(exc, conn.publisher_send, p1, {})
# Creates it
conn.publisher_send(p2, {})
# Ensure it creates it
conn.publisher_send(p1, {})
with mock.patch('kombu.messaging.Producer',
side_effect=exc):
# Shoud reset the cache and ensures the exchange does
# not exitsts
self.assertRaises(exc, conn.publisher_send, p1, {})
# Recreates it
conn.publisher_send(p2, {})
# Ensure it have been recreated
conn.publisher_send(p1, {})
class TestRabbitConsume(test_utils.BaseTestCase):
def test_consume_timeout(self):