Properly handle transport URL config on the client

On the client side, in the rabbit and qpid drivers, we use a connection
pool to avoid opening a connection for each message we send. However,
there is only currently one connection pool per process:

 def get_connection_pool(conf, connection_cls):
     with _pool_create_sem:
         # Make sure only one thread tries to create the connection pool.
         if not connection_cls.pool:
             connection_cls.pool = ConnectionPool(conf, connection_cls)
     return connection_cls.pool

This is a nasty artifact of the original RPC having no conectp of a
transport context - everything was a global. We'll fix this soon enough.

In the meantime, we need to make sure we only use this connection pool
where we're not using the default transport configuration from the
config file - i.e. where we supply a transport URL.

The use case here is cells - we send messages to a remote cell by
connecting to it using a transport URL. In our devstack testing, the
two cells are on the same Rabbit broker but under different virtual
hosts. Because we were always using the connection pool on the client
side, we were seeing both cells always send messages to the '/' virtual
host.

Note - avoiding the connection pool in the case of cells is the same
behaviour as the current RPC code:

 def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     ...
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:

Change-Id: I2f35b45ef237bb85ab8faf58a408c03fcb1de9d7
This commit is contained in:
Mark McLoughlin 2013-10-23 07:28:58 +01:00
parent 9853fc6ac5
commit 7914181398
2 changed files with 25 additions and 14 deletions

View File

@ -309,10 +309,16 @@ class AMQPDriverBase(base.BaseDriver):
return sp
def _get_connection(self, pooled=True):
# FIXME(markmc): we don't yet have a connection pool for each
# Transport instance, so we'll only use the pool with the
# transport configuration from the config file
server_params = self._server_params or None
if server_params:
pooled = False
return rpc_amqp.ConnectionContext(self.conf,
self._connection_pool,
pooled=pooled,
server_params=self._server_params)
server_params=server_params)
def _get_reply_q(self):
with self._reply_q_lock:

View File

@ -48,7 +48,7 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None, expected={})),
('none', dict(url=None, expected=None)),
('empty',
dict(url='rabbit:///',
expected=dict(virtual_host=''))),
@ -84,28 +84,33 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
def test_transport_url(self):
self._server_params = []
cnx_init = rabbit_driver.Connection.__init__
passed_params = []
def record_params(self, conf, server_params=None):
passed_params.append(server_params)
return cnx_init(self, conf, server_params)
def record_params(cnx, conf, server_params=None):
self._server_params.append(server_params)
return cnx_init(cnx, conf, server_params)
def dummy_send(cnx, topic, msg, timeout=None):
pass
self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
self.stubs.Set(rabbit_driver.Connection, 'topic_send', dummy_send)
transport = messaging.get_transport(self.conf, self.url)
self._driver = messaging.get_transport(self.conf, self.url)._driver
self._target = messaging.Target(topic='testtopic')
driver = transport._driver
def test_transport_url_listen(self):
self._driver.listen(self._target)
self.assertEqual(self._server_params[0], self.expected)
target = messaging.Target(topic='testtopic')
driver.listen(target)
self.assertEqual(passed_params[0], self.expected)
def test_transport_url_send(self):
self._driver.send(self._target, {}, {})
self.assertEqual(self._server_params[0], self.expected)
class TestSendReceive(test_utils.BaseTestCase):