rabbit: smarter declaration of the notif. queue
The NotifyPublisher was redeclaring again and again, the same exchange and queue each times a notification is sent. This change fixes that by caching the already declared exchange and queue for each channel. Also, to make the test pass. 'Connection.ensure' have been updated to have the same behavior for amqp and memory driver about kombu recoverable_errors. And the hostname and port of the memory driver are set to not fail when we print a log message. Closes bug: #1437902 Change-Id: I20d133ac67b8a8a4c51d51b6a1b2369aa44ffe2f
This commit is contained in:
parent
415db68b67
commit
1ba55c03b5
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
|
@ -295,23 +296,33 @@ class DeclareQueuePublisher(Publisher):
|
|||
them yet. If the future consumer binds the default queue it can retrieve
|
||||
missing messages.
|
||||
"""
|
||||
# FIXME(sileht): The side effect of this is that we declare again and
|
||||
# again the same queue, and generate a lot of useless rabbit traffic.
|
||||
# https://bugs.launchpad.net/oslo.messaging/+bug/1437902
|
||||
|
||||
DECLARED_QUEUES = collections.defaultdict(set)
|
||||
|
||||
def send(self, conn, msg, timeout=None):
|
||||
queue = kombu.entity.Queue(
|
||||
channel=conn.channel,
|
||||
exchange=self.exchange,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete,
|
||||
name=self.routing_key,
|
||||
routing_key=self.routing_key,
|
||||
queue_arguments=self.queue_arguments)
|
||||
queue.declare()
|
||||
queue_indentifier = (self.exchange_name,
|
||||
self.routing_key)
|
||||
# NOTE(sileht): We only do it once per reconnection
|
||||
# the Connection._set_current_channel() is responsible to clear
|
||||
# this cache
|
||||
if queue_indentifier not in self.DECLARED_QUEUES[conn.channel]:
|
||||
queue = kombu.entity.Queue(
|
||||
channel=conn.channel,
|
||||
exchange=self.exchange,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete,
|
||||
name=self.routing_key,
|
||||
routing_key=self.routing_key,
|
||||
queue_arguments=self.queue_arguments)
|
||||
queue.declare()
|
||||
self.DECLARED_QUEUES[conn.channel].add(queue_indentifier)
|
||||
super(DeclareQueuePublisher, self).send(
|
||||
conn, msg, timeout)
|
||||
|
||||
@classmethod
|
||||
def reset_cache(cls, channel):
|
||||
cls.DECLARED_QUEUES.pop(channel, None)
|
||||
|
||||
|
||||
class RetryOnMissingExchangePublisher(Publisher):
|
||||
"""Publisher that retry during 60 seconds if the exchange is missing."""
|
||||
|
@ -577,6 +588,9 @@ class Connection(object):
|
|||
if self._url.startswith('memory://'):
|
||||
# Kludge to speed up tests.
|
||||
self.connection.transport.polling_interval = 0.0
|
||||
# Fixup logging
|
||||
self.connection.hostname = "memory_driver"
|
||||
self.connection.port = 1234
|
||||
self._poll_timeout = 0.05
|
||||
|
||||
# FIXME(markmc): use oslo sslutils when it is available as a library
|
||||
|
@ -715,8 +729,18 @@ class Connection(object):
|
|||
self._set_current_channel(channel)
|
||||
method()
|
||||
|
||||
recoverable_errors = (self.connection.recoverable_channel_errors +
|
||||
self.connection.recoverable_connection_errors)
|
||||
# NOTE(sileht): Some dummy driver like the in-memory one doesn't
|
||||
# have notion of recoverable connection, so we must raise the original
|
||||
# exception like kombu does in this case.
|
||||
has_modern_errors = hasattr(
|
||||
self.connection.transport, 'recoverable_connection_errors',
|
||||
)
|
||||
if has_modern_errors:
|
||||
recoverable_errors = (
|
||||
self.connection.recoverable_channel_errors +
|
||||
self.connection.recoverable_connection_errors)
|
||||
else:
|
||||
recoverable_errors = ()
|
||||
|
||||
try:
|
||||
autoretry_method = self.connection.autoretry(
|
||||
|
@ -756,6 +780,7 @@ class Connection(object):
|
|||
NOTE(sileht): Must be called within the connection lock
|
||||
"""
|
||||
if self.channel is not None and new_channel != self.channel:
|
||||
DeclareQueuePublisher.reset_cache(self.channel)
|
||||
self.connection.maybe_close_channel(self.channel)
|
||||
self.channel = new_channel
|
||||
|
||||
|
|
|
@ -173,6 +173,54 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
|
|||
heartbeat=0, failover_strategy="shuffle")
|
||||
|
||||
|
||||
class RaiseOnNoExchangePublisher(rabbit_driver.Publisher):
|
||||
passive = True
|
||||
|
||||
|
||||
class TestRabbitPublisher(test_utils.BaseTestCase):
|
||||
|
||||
def test_declared_queue_publisher(self):
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
'kombu+memory:////')
|
||||
self.addCleanup(transport.cleanup)
|
||||
|
||||
p1 = RaiseOnNoExchangePublisher(
|
||||
self.conf.oslo_messaging_rabbit,
|
||||
exchange_name='foobar',
|
||||
routing_key='foobar',
|
||||
type='topic',
|
||||
durable=False,
|
||||
auto_delete=False)
|
||||
|
||||
p2 = rabbit_driver.DeclareQueuePublisher(
|
||||
self.conf.oslo_messaging_rabbit,
|
||||
exchange_name='foobar',
|
||||
routing_key='foobar',
|
||||
type='topic',
|
||||
durable=False,
|
||||
auto_delete=False)
|
||||
|
||||
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
|
||||
conn = pool_conn.connection
|
||||
exc = conn.connection.channel_errors[0]
|
||||
# Ensure the exchange does not exists
|
||||
self.assertRaises(exc, conn.publisher_send, p1, {})
|
||||
# Creates it
|
||||
conn.publisher_send(p2, {})
|
||||
# Ensure it creates it
|
||||
conn.publisher_send(p1, {})
|
||||
|
||||
with mock.patch('kombu.messaging.Producer',
|
||||
side_effect=exc):
|
||||
# Shoud reset the cache and ensures the exchange does
|
||||
# not exitsts
|
||||
self.assertRaises(exc, conn.publisher_send, p1, {})
|
||||
# Recreates it
|
||||
conn.publisher_send(p2, {})
|
||||
# Ensure it have been recreated
|
||||
conn.publisher_send(p1, {})
|
||||
|
||||
|
||||
class TestRabbitConsume(test_utils.BaseTestCase):
|
||||
|
||||
def test_consume_timeout(self):
|
||||
|
|
Loading…
Reference in New Issue