RabbitMQ: advance thru the list of brokers on reconnect
In RabbitMQ implementation, when using multiple rabbit_hosts, we don't want to
immediately retry failed connection for the same failed broker. This was not
the case in existing implementation though, where we've always attempted to
reconnect starting from the first broker in the list of candidates. So if the
first broker failed, we initiated reconnect to the same failed broker.
This change makes reconnect() implementation to select the next broker in the
list. This also means that non-failure reconnect attempts will also switch the
current broker. All in all, users should not rely on any particular order to
use brokers from the list, so this should not constitute an issue.
This is a backport of 71c6866471d628b207d7c5b84bfd37cc9fed0898 from
oslo.messaging.
Change-Id: I67923cb024bbd143edc8edccf35b9b400df31eb3
Partial-Bug: 1261631
(cherry picked from commit abe3e5ee99
)
This commit is contained in:
parent
ad4dfa7171
commit
2887edbdba
|
@ -459,6 +459,9 @@ class Connection(object):
|
|||
|
||||
self.params_list = params_list
|
||||
|
||||
brokers_count = len(self.params_list)
|
||||
self.next_broker_indices = itertools.cycle(range(brokers_count))
|
||||
|
||||
self.memory_transport = self.conf.fake_rabbit
|
||||
|
||||
self.connection = None
|
||||
|
@ -529,7 +532,7 @@ class Connection(object):
|
|||
|
||||
attempt = 0
|
||||
while True:
|
||||
params = self.params_list[attempt % len(self.params_list)]
|
||||
params = self.params_list[next(self.next_broker_indices)]
|
||||
attempt += 1
|
||||
try:
|
||||
self._connect(params)
|
||||
|
|
|
@ -803,7 +803,8 @@ class RpcKombuHATestCase(test_base.BaseTestCase):
|
|||
|
||||
def connect(myself):
|
||||
if info['attempt'] < 5:
|
||||
# the word timeout is important (see impl_kombu.py:486)
|
||||
# the word timeout is important;
|
||||
# see impl_kombu.py:reconnect()
|
||||
raise Exception('connection timeout')
|
||||
super(kombu.connection.BrokerConnection, myself).connect()
|
||||
|
||||
|
@ -838,3 +839,35 @@ class RpcKombuHATestCase(test_base.BaseTestCase):
|
|||
with contextlib.closing(
|
||||
self.rpc.create_connection(self.FLAGS)) as conn:
|
||||
conn.declare_topic_consumer('a_topic', lambda *args: None)
|
||||
|
||||
def test_reconnect_order(self):
|
||||
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
brokers_count = len(brokers)
|
||||
|
||||
info = {'attempt': 0}
|
||||
|
||||
class MyConnection(kombu.connection.BrokerConnection):
|
||||
def __init__(myself, *args, **params):
|
||||
super(MyConnection, myself).__init__(*args, **params)
|
||||
self.assertEqual(params['hostname'],
|
||||
brokers[info['attempt'] % brokers_count])
|
||||
try:
|
||||
if info['attempt']:
|
||||
raise Exception('connection timeout')
|
||||
finally:
|
||||
info['attempt'] += 1
|
||||
|
||||
self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection)
|
||||
|
||||
self.config(rabbit_hosts=brokers, rabbit_max_retries=1)
|
||||
|
||||
# starting from the first broker in the list
|
||||
connection = self.rpc.create_connection(self.FLAGS)
|
||||
|
||||
# reconnect will advance to the next broker, one broker per attempt,
|
||||
# and finally wrap back to the start of the list once its end is
|
||||
# reached
|
||||
for i in range(1, brokers_count) + [0]:
|
||||
self.assertRaises(rpc_common.RPCException, connection.reconnect)
|
||||
|
||||
connection.close()
|
||||
|
|
Loading…
Reference in New Issue