Select AMQP message broker at random

The rule of choosing AMQP message broker is that chose first
available one in order now. The order depends on what we set in
configuration file. That means all the connections will flock
to same message broker and that may lead out performance issue.
This patch randomizes the order of choosing message broker for
each connection to leverage broker cluster.

Change-Id: Ib5098e574d4ef81428065885e2295d0f87aba715
Partial-Bug: #1261631
This commit is contained in:
ChangBo Guo(gcb) 2014-03-21 09:49:32 +08:00
parent c90581937c
commit 8ae1880f7a
4 changed files with 11 additions and 14 deletions

View File

@ -16,6 +16,7 @@
import functools
import itertools
import logging
import random
import time
from oslo.config import cfg
@ -467,12 +468,13 @@ class Connection(object):
]
params = {
'qpid_hosts': self.conf.qpid_hosts,
'qpid_hosts': self.conf.qpid_hosts[:],
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
}
params.update(server_params or {})
random.shuffle(params['qpid_hosts'])
self.brokers = itertools.cycle(params['qpid_hosts'])
self.username = params['username']

View File

@ -15,6 +15,7 @@
import functools
import itertools
import logging
import random
import socket
import ssl
import time
@ -466,6 +467,7 @@ class Connection(object):
params_list.append(params)
random.shuffle(params_list)
self.params_list = itertools.cycle(params_list)
self.memory_transport = self.conf.fake_rabbit

View File

@ -410,8 +410,6 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
for _ in range(brokers_count):
connection.reconnect()
connection.close()
expected = []
for broker in brokers:
expected.extend([mock.call(broker),
@ -421,12 +419,7 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
mock.call().opened().__nonzero__(),
mock.call().close()])
# the last one was closed with close(), not reconnect()
expected.extend([mock.call(brokers[0]),
mock.call().open(),
mock.call().session(),
mock.call().close()])
conn_mock.assert_has_calls(expected)
conn_mock.assert_has_calls(expected, any_order=True)
def synchronized(func):

View File

@ -622,17 +622,17 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.conf.rabbit_hosts = brokers
self.conf.rabbit_max_retries = 1
info = {'attempt': 0}
hostname_sets = set()
def _connect(myself, params):
# do as little work that is enough to pass connection attempt
myself.connection = kombu.connection.BrokerConnection(**params)
myself.connection_errors = myself.connection.connection_errors
expected_broker = brokers[info['attempt'] % brokers_count]
self.assertEqual(expected_broker, params['hostname'])
hostname = params['hostname']
self.assertNotIn(hostname, hostname_sets)
info['attempt'] += 1
hostname_sets.add(hostname)
# just make sure connection instantiation does not fail with an
# exception
@ -645,7 +645,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# implementation
self.stubs.UnsetAll()
for i in range(len(brokers)):
for i in range(brokers_count):
self.assertRaises(driver_common.RPCException, connection.reconnect)
connection.close()