Qpid: advance thru the list of brokers on reconnect

In Qpid implementation, when using multiple qpid_hosts, we don't want to
immediately retry failed connection for the same failed broker. This was not
the case in existing implementation though, where we've always attempted to
reconnect starting from the first broker in the list of candidates. So if the
first broker failed, we initiated reconnect to the same failed broker.

This change makes reconnect() implementation to select the next broker in the
list. This also means that non-failure reconnect attempts will also switch the
current broker. All in all, users should not rely on any particular order to
use brokers from the list, so this should not constitute an issue.

This is a backport of the following changes from oslo.messaging:
* I257c2ad6d7ebead356c0239134340975da6dbc07 (code change)
* I32e20c0e747e6489dab4b0358d422b8a9c6b982e (unit test).

Change-Id: Ia148baa6e1ec632789ac3621c85173c2c16f3918
Partial-Bug: 1261631
(cherry picked from commit 7fe95de20c)
This commit is contained in:
Ihar Hrachyshka 2014-01-16 18:22:49 +01:00
parent ad4dfa7171
commit 877f1cab32
2 changed files with 40 additions and 3 deletions

View File

@ -468,6 +468,10 @@ class Connection(object):
self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
brokers_count = len(self.brokers)
self.next_broker_indices = itertools.cycle(range(brokers_count))
self.connection_create(self.brokers[0])
self.reconnect()
@ -495,7 +499,6 @@ class Connection(object):
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
# Close the session if necessary
@ -505,8 +508,7 @@ class Connection(object):
except qpid_exceptions.ConnectionError:
pass
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
broker = self.brokers[next(self.next_broker_indices)]
try:
self.connection_create(broker)

View File

@ -24,6 +24,7 @@ import eventlet
eventlet.monkey_patch()
import fixtures
import mock
import mox
import time
import uuid
@ -775,6 +776,40 @@ class RpcQpidTestCase(tests.utils.BaseTestCase):
connection.reconnect()
connection.close()
def test_reconnect_order(self):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.config(qpid_hosts=brokers)
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
connection = impl_qpid.Connection(self.conf)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
# reached
for _ in range(brokers_count):
connection.reconnect()
connection.close()
expected = []
for broker in brokers:
expected.extend([mock.call(broker),
mock.call().open(),
mock.call().session(),
mock.call().opened(),
mock.call().opened().__nonzero__(),
mock.call().close()])
# the last one was closed with close(), not reconnect()
expected.extend([mock.call(brokers[0]),
mock.call().open(),
mock.call().session(),
mock.call().close()])
conn_mock.assert_has_calls(expected)
#
#from nova.tests.rpc import common