kombu: fix driver loading with kombu+qpid scheme
When a url looks like: kombu+qpid:///host:port/ the driver fail to load. This change fixes that, adds a warning message that this kind of URL is experimental and not yet supported. Also our Consumer code use internal kombu API that can be optionnal implemented by the kombu transport (message_to_python), so check that this one exists before using it. In the future, we should use kombu.messaging.Consumer/Publisher instead of Consumer/Publisher implementation to avoid such hack... Change-Id: I066d57c23bff922c5734ab036b6ca8e1608e5c6a
This commit is contained in:
parent
2a35b290b1
commit
3c40cee36c
|
@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver
|
|||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._i18n import _
|
||||
from oslo_messaging._i18n import _LI
|
||||
from oslo_messaging._i18n import _LW
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
|
||||
|
@ -213,8 +214,10 @@ class ConsumerBase(object):
|
|||
if not callback:
|
||||
raise ValueError("No callback defined")
|
||||
|
||||
def _callback(raw_message):
|
||||
message = self.channel.message_to_python(raw_message)
|
||||
def _callback(message):
|
||||
m2p = getattr(self.channel, 'message_to_python', None)
|
||||
if m2p:
|
||||
message = m2p(message)
|
||||
self._callback_handler(message, callback)
|
||||
|
||||
self.queue.consume(*args, callback=_callback, **options)
|
||||
|
@ -469,9 +472,13 @@ class Connection(object):
|
|||
"driver instead.")
|
||||
self._url = 'memory://%s/' % virtual_host
|
||||
elif url.hosts:
|
||||
if url.transport.startswith('kombu+'):
|
||||
LOG.warn(_LW('Selecting the kombu transport through the '
|
||||
'transport url (%s) is a experimental feature '
|
||||
'and this is not yet supported.') % url.transport)
|
||||
for host in url.hosts:
|
||||
transport = url.transport.replace('kombu+', '')
|
||||
transport = url.transport.replace('rabbit', 'amqp')
|
||||
transport = transport.replace('rabbit', 'amqp')
|
||||
self._url += '%s%s://%s:%s@%s:%s/%s' % (
|
||||
";" if self._url else '',
|
||||
transport,
|
||||
|
@ -506,14 +513,12 @@ class Connection(object):
|
|||
failover_strategy="shuffle")
|
||||
|
||||
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'),
|
||||
{'hostname': self.connection.hostname,
|
||||
'port': self.connection.port})
|
||||
self.connection.info())
|
||||
# NOTE(sileht): just ensure the connection is setuped at startup
|
||||
self.ensure(error_callback=None,
|
||||
method=lambda channel: True)
|
||||
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
|
||||
{'hostname': self.connection.hostname,
|
||||
'port': self.connection.port})
|
||||
self.connection.info())
|
||||
|
||||
if self._url.startswith('memory://'):
|
||||
# Kludge to speed up tests.
|
||||
|
@ -599,16 +604,15 @@ class Connection(object):
|
|||
interval = (self.conf.kombu_reconnect_delay + interval
|
||||
if self.conf.kombu_reconnect_delay > 0 else interval)
|
||||
|
||||
info = {'hostname': self.connection.hostname,
|
||||
'port': self.connection.port,
|
||||
'err_str': exc, 'sleep_time': interval}
|
||||
info = {'err_str': exc, 'sleep_time': interval}
|
||||
info.update(self.connection.info())
|
||||
|
||||
if 'Socket closed' in six.text_type(exc):
|
||||
LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
|
||||
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
|
||||
' the connection. Check login credentials:'
|
||||
' %(err_str)s'), info)
|
||||
else:
|
||||
LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
|
||||
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
|
||||
'unreachable: %(err_str)s. Trying again in '
|
||||
'%(sleep_time)d seconds.'), info)
|
||||
|
||||
|
|
|
@ -130,6 +130,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
|
|||
expected=["amqp://user:password@host:10/virtual_host",
|
||||
"amqp://user2:password2@host2:12/virtual_host"]
|
||||
)),
|
||||
('qpid',
|
||||
dict(url='kombu+qpid://user:password@host:10/virtual_host',
|
||||
expected=['qpid://user:password@host:10/virtual_host'])),
|
||||
('rabbit',
|
||||
dict(url='kombu+rabbit://user:password@host:10/virtual_host',
|
||||
expected=['amqp://user:password@host:10/virtual_host'])),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
@ -143,7 +149,13 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
|
|||
self.addCleanup(transport.cleanup)
|
||||
driver = transport._driver
|
||||
|
||||
urls = driver._get_connection()._url.split(";")
|
||||
# NOTE(sileht): some kombu transport can depend on library that
|
||||
# we don't want to depend yet, because selecting the transport
|
||||
# is experimental, only amqp is supported
|
||||
# for example kombu+qpid depends of qpid-tools
|
||||
# so, mock the connection.info to skip call to qpid-tools
|
||||
with mock.patch('kombu.connection.Connection.info'):
|
||||
urls = driver._get_connection()._url.split(";")
|
||||
self.assertEqual(sorted(self.expected), sorted(urls))
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue